# bethaus-app/index_for_search.py
import os
import json
import sqlite3

SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_permission_config.json'
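# FOLDER_CONFIG is expected to be a JSON list of configs, each providing a "folders"
# list whose entries carry a "foldername" and a "folderpath" (see updatefileindex below).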
# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row
# Open access_log.db in read-only mode.
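# Read-only mode will not create the file, so access_log.db must already exist and
# provide the file_access_log table that get_hit_count queries.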
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row


def init_db():
    """Initializes the database with the required schema."""
    cursor = search_db.cursor()
    # Create table with the new 'hitcount' and 'basefolder' columns.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            relative_path TEXT,
            basefolder TEXT,
            filename TEXT,
            filetype TEXT,
            transcript TEXT,
            hitcount INTEGER DEFAULT 0,
            UNIQUE(relative_path, filename)
        )
    ''')
    search_db.commit()
    # If the table already existed, try to add the new columns.
    try:
        cursor.execute("ALTER TABLE files ADD COLUMN hitcount INTEGER DEFAULT 0")
    except sqlite3.OperationalError:
        # Likely the column already exists, so we ignore this error.
        pass
    try:
        cursor.execute("ALTER TABLE files ADD COLUMN basefolder TEXT")
    except sqlite3.OperationalError:
        # Likely the column already exists, so we ignore this error.
        pass
    search_db.commit()


def scan_dir(directory):
    """Recursively scan directories using os.scandir for improved performance."""
    try:
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    # Skip transcription directories immediately.
                    if entry.name.lower() == "transkription":
                        continue
                    yield from scan_dir(entry.path)
                elif entry.is_file(follow_symlinks=False):
                    yield entry
    except PermissionError:
        return


def get_hit_count(relative_path):
    """Returns the hit count for a given file from the access log database."""
    cursor = access_log_db.cursor()
    cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
    row = cursor.fetchone()
    return row["hit_count"] if row else 0


def updatefileindex():
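    """Rebuild the files index from the folders configured in FOLDER_CONFIG."""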
    cursor = search_db.cursor()
    # Load folder configuration from JSON file.
    with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
        config_data = json.load(f)
    # Process each configured base folder.
    for config in config_data:
        for folder in config.get("folders", []):
            foldername = folder.get("foldername")
            print(f"Processing folder: {foldername}")
            raw_folderpath = folder.get("folderpath")
            norm_folderpath = os.path.normpath(raw_folderpath)
            # Precompute the length of the base folder path (plus one for the separator)
            base_len = len(norm_folderpath) + 1
            # Accumulate scanned file data and keys for this base folder.
            scanned_files = []  # Each entry: (relative_path, basefolder, filename, filetype, transcript, hitcount)
            current_keys = set()
            for entry in scan_dir(norm_folderpath):
                entry_path = os.path.normpath(entry.path)
                # Get relative part by slicing if possible.
                if entry_path.startswith(norm_folderpath):
                    rel_part = entry_path[base_len:]
                else:
                    rel_part = os.path.relpath(entry_path, norm_folderpath)
                # Prepend the foldername so it becomes part of the stored relative path.
                relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
                filetype = os.path.splitext(entry.name)[1].lower()
                transcript = None
                # Check for a corresponding transcript file in a sibling "Transkription" folder.
                parent_dir = os.path.dirname(entry_path)
                transcript_dir = os.path.join(parent_dir, "Transkription")
                transcript_filename = os.path.splitext(entry.name)[0] + ".md"
                transcript_path = os.path.join(transcript_dir, transcript_filename)
                if os.path.exists(transcript_path):
                    try:
                        with open(transcript_path, 'r', encoding='utf-8') as tf:
                            transcript = tf.read()
                    except Exception:
                        transcript = None
                # Retrieve the hit count for this file.
                hit_count = get_hit_count(relative_path)
                scanned_files.append((relative_path, foldername, entry.name, filetype, transcript, hit_count))
                current_keys.add((relative_path, entry.name))
            # Remove database entries for files under this base folder that are no longer on disk.
            # Stored relative paths always use '/' as the separator (see above), so build the
            # LIKE pattern with '/' rather than os.sep, which would not match on Windows.
            pattern = foldername + '/%'
cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
db_rows = cursor.fetchall()
keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
keys_to_delete = keys_in_db - current_keys
for key in keys_to_delete:
cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
# Bulk write the scanned files using INSERT OR REPLACE.
cursor.executemany(
"INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?)",
scanned_files
)
# Commit changes after processing this base folder.
search_db.commit()
return "File index updated successfully"


if __name__ == "__main__":
    init_db()  # Initialize the database schema if it doesn't exist
    updatefileindex()  # Update the file index
    search_db.close()  # Close the search database connection
    access_log_db.close()  # Close the access log connection
    print("Database connections closed.")