import json
import os
import sqlite3

SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'

# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row

# Open access_log.db in read-only mode.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
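
# Note: a read-only (mode=ro) URI connection raises sqlite3.OperationalError
# at connect time if access_log.db does not exist, so the access log database
# must already be present when this script starts.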

def init_db():
    """Initializes the database with the required schema."""
    cursor = search_db.cursor()
    # Create table with the new 'hitcount' column.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            relative_path TEXT,
            filename TEXT,
            filetype TEXT,
            transcript TEXT,
            hitcount INTEGER DEFAULT 0,
            UNIQUE(relative_path, filename)
        )
    ''')
    search_db.commit()
    # If the table already existed, try to add the 'hitcount' column.
    try:
        cursor.execute("ALTER TABLE files ADD COLUMN hitcount INTEGER DEFAULT 0")
    except sqlite3.OperationalError:
        # Likely the column already exists, so we ignore this error.
        pass
    search_db.commit()

def scan_dir(directory):
    """Recursively scan directories using os.scandir for improved performance."""
    try:
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    # Skip transcription directories immediately.
                    if entry.name.lower() == "transkription":
                        continue
                    yield from scan_dir(entry.path)
                elif entry.is_file(follow_symlinks=False):
                    yield entry
    except PermissionError:
        # Silently skip directories the process is not allowed to read.
        return
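
# Example usage (hypothetical path): scan_dir() yields os.DirEntry objects,
# so callers can read .name and .path without extra stat calls:
#     for entry in scan_dir('D:/Archiv'):
#         print(entry.path)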

def get_hit_count(relative_path):
    """Returns the hit count for a given file from the access log database."""
    cursor = access_log_db.cursor()
    cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
    row = cursor.fetchone()
    return row["hit_count"] if row else 0
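
# Note: get_hit_count() assumes access_log.db contains a file_access_log
# table with a rel_path column matching the relative paths stored here. The
# table and column names are taken from the query above; the rest of that
# schema is not defined in this script.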

def updatefileindex():
    """Rebuilds the search index from the folders listed in folder_config.json."""
    cursor = search_db.cursor()

    # Load folder configuration from JSON file.
    with open("folder_config.json", "r", encoding="utf-8") as f:
        config_data = json.load(f)
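
    # The configuration is expected to be a list of objects, each carrying a
    # "folders" list of {"foldername", "folderpath"} entries. The field names
    # follow from the lookups below; the sample values are hypothetical:
    #     [
    #         {"folders": [{"foldername": "Archiv", "folderpath": "D:/Archiv"}]}
    #     ]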

    # Process each configured base folder.
    for config in config_data:
        for folder in config.get("folders", []):
            foldername = folder.get("foldername")
            raw_folderpath = folder.get("folderpath")
            norm_folderpath = os.path.normpath(raw_folderpath)
            # Precompute the length of the base folder path (plus one for the separator).
            base_len = len(norm_folderpath) + 1

            # Accumulate scanned file data and keys for this base folder.
            scanned_files = []  # Each entry: (relative_path, filename, filetype, transcript, hitcount)
            current_keys = set()

            for entry in scan_dir(norm_folderpath):
                entry_path = os.path.normpath(entry.path)
                # Get the relative part by slicing if possible.
                if entry_path.startswith(norm_folderpath):
                    rel_part = entry_path[base_len:]
                else:
                    rel_part = os.path.relpath(entry_path, norm_folderpath)
                # Prepend the foldername so it becomes part of the stored relative path.
                relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
                stem, extension = os.path.splitext(entry.name)
                filetype = extension.lower()
                transcript = None

                # Check for a corresponding transcript file in a sibling "Transkription" folder.
                parent_dir = os.path.dirname(entry_path)
                transcript_dir = os.path.join(parent_dir, "Transkription")
                transcript_path = os.path.join(transcript_dir, stem + ".md")
                if os.path.exists(transcript_path):
                    try:
                        with open(transcript_path, 'r', encoding='utf-8') as tf:
                            transcript = tf.read()
                    except Exception:
                        transcript = None

                # Retrieve the hit count for this file.
                hit_count = get_hit_count(relative_path)

                scanned_files.append((relative_path, entry.name, filetype, transcript, hit_count))
                current_keys.add((relative_path, entry.name))

            # Remove database entries for files under this base folder that
            # are no longer on disk. Stored relative paths always use forward
            # slashes, so the LIKE pattern must use '/' rather than os.sep.
            pattern = foldername + '/%'
            cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
            db_rows = cursor.fetchall()
            keys_in_db = {(row["relative_path"], row["filename"]) for row in db_rows}
            keys_to_delete = keys_in_db - current_keys
            for key in keys_to_delete:
                cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)

            # Bulk write the scanned files using INSERT OR REPLACE.
            cursor.executemany(
                "INSERT OR REPLACE INTO files (relative_path, filename, filetype, transcript, hitcount) VALUES (?, ?, ?, ?, ?)",
                scanned_files
            )

            # Commit changes after processing this base folder.
            search_db.commit()

    return "File index updated successfully"
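
# Example of how the finished index can be consumed, e.g. ranking search
# results by popularity (illustrative query only, not used by this script):
#     SELECT relative_path, filename FROM files
#     WHERE transcript LIKE '%<term>%'
#     ORDER BY hitcount DESC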

if __name__ == "__main__":
    init_db()  # Initialize the database schema if it doesn't exist.
    updatefileindex()  # Update the file index.
    search_db.close()  # Close the search database connection.
    access_log_db.close()  # Close the access log connection.
    print("Database connections closed.")