bethaus-app/index_for_search.py
2026-01-24 16:07:44 +00:00

376 lines
16 KiB
Python
Executable File

import os
import json
import sqlite3
from datetime import datetime
from time import monotonic
import re
import helperfunctions as hf
SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'
TRANSCRIPT_DIRNAME = "Transkription"
TRANSCRIPT_EXT = ".md"
IGNORED_DIRS = {TRANSCRIPT_DIRNAME, "@eaDir", ".app", "#recycle"}
# Logging/progress tuning (keep output light by default)
LOG_STRUCTURE_DEPTH = int(os.getenv("INDEX_LOG_STRUCTURE_DEPTH", "0") or 0)
PROGRESS_EVERY_SECS = float(os.getenv("INDEX_PROGRESS_SECS", "30"))
PROGRESS_EVERY_FILES = int(os.getenv("INDEX_PROGRESS_FILES", "5000"))
MAX_ERROR_LOGS = int(os.getenv("INDEX_MAX_ERROR_LOGS", "5"))
# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row
# Open access_log.db in read-only mode.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
def log(message: str):
"""Small helper to ensure console output is flushed immediately."""
print(message, flush=True)
def log_permission_error(path: str, stats: dict):
"""Log permission errors sparingly to avoid noisy output."""
stats["perm_errors"] += 1
if stats["perm_errors"] <= MAX_ERROR_LOGS:
log(f"Permission denied: {path}")
elif stats["perm_errors"] == MAX_ERROR_LOGS + 1:
log("Further permission errors suppressed.")
def skip_dir(name: str) -> bool:
"""Return True when a directory name should be skipped during traversal/logging."""
return name.startswith('.') or name in IGNORED_DIRS
def format_duration(seconds: float) -> str:
total_secs = int(seconds)
minutes, secs = divmod(total_secs, 60)
return f"{minutes}m {secs:02d}s"
def init_db():
"""Initializes the database with the required schema."""
cursor = search_db.cursor()
# Create table with the new 'hitcount' and 'basefolder' columns.
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
relative_path TEXT,
basefolder TEXT,
filename TEXT,
filetype TEXT,
category TEXT,
titel TEXT,
name TEXT,
performance_date TEXT,
site TEXT,
transcript TEXT,
hitcount INTEGER DEFAULT 0,
UNIQUE(relative_path, filename)
)
''')
search_db.commit()
# If the table already existed, try to add the new columns.
# try:
# cursor.execute("ALTER TABLE files ADD COLUMN category TEXT")
# except sqlite3.OperationalError:
# # Likely the column already exists, so we ignore this error.
# pass
search_db.commit()
def scan_dir(directory: str, stats: dict):
"""Iteratively scan directories using os.scandir for improved performance."""
stack = [directory]
while stack:
current = stack.pop()
stats["dirs"] += 1
try:
with os.scandir(current) as it:
for entry in it:
try:
if entry.is_dir(follow_symlinks=False):
# Skip unwanted directories immediately.
if skip_dir(entry.name):
stats["skipped_dirs"] += 1
continue
stack.append(entry.path)
elif entry.is_file(follow_symlinks=False):
yield entry
except PermissionError:
log_permission_error(entry.path, stats)
except PermissionError:
log_permission_error(current, stats)
def get_hit_count(relative_path):
"""Returns the hit count for a given file from the access log database."""
cursor = access_log_db.cursor()
cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
row = cursor.fetchone()
return row["hit_count"] if row else 0
def get_hit_counts_for_basefolder(basefolder: str) -> dict:
"""Return a map of rel_path -> hit_count for all files under a basefolder."""
cursor = access_log_db.cursor()
pattern = f"{basefolder}/%"
cursor.execute(
"SELECT rel_path, COUNT(*) AS hit_count FROM file_access_log WHERE rel_path LIKE ? GROUP BY rel_path",
(pattern,)
)
return {row["rel_path"]: row["hit_count"] for row in cursor.fetchall()}
def build_transcript_index(transcript_dir: str, stats: dict):
"""Return a dict of basename -> transcript path for a transcript directory."""
try:
with os.scandir(transcript_dir) as it:
index = {}
for entry in it:
if not entry.is_file(follow_symlinks=False):
continue
name = entry.name
if not name.endswith(TRANSCRIPT_EXT):
continue
index[name[:-len(TRANSCRIPT_EXT)]] = entry.path
return index
except FileNotFoundError:
return None
except PermissionError:
log_permission_error(transcript_dir, stats)
return None
def log_structure(root_path, max_depth=None, show_files=False):
"""
Log folder structure up to max_depth levels (root = depth 1).
If max_depth is None, traverse all depths. Files are logged only when show_files is True.
"""
depth_label = "all" if max_depth is None else f"<= {max_depth}"
log(f"Folder structure (depth {depth_label}) for '{root_path}':")
def _walk(path, depth):
if max_depth is not None and depth > max_depth:
return
try:
with os.scandir(path) as it:
entries = sorted(it, key=lambda e: (not e.is_dir(follow_symlinks=False), e.name.lower()))
for entry in entries:
if entry.is_dir(follow_symlinks=False):
if skip_dir(entry.name):
continue
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}/")
_walk(entry.path, depth + 1)
elif show_files:
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}")
except PermissionError:
indent = " " * (depth - 1)
log(f"{indent}- [permission denied]")
_walk(root_path, depth=1)
def log_file(relative_path: str, filename: str):
"""Debug helper to log each file that is indexed."""
log(f" file: {relative_path} ({filename})")
def updatefileindex():
total_start = monotonic()
cursor = search_db.cursor()
totals = {"folders": 0, "scanned": 0, "deleted": 0}
# Load folder configuration from JSON file.
with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
config_data = json.load(f)
# Process each configured base folder.
for config in config_data:
for folder in config.get("folders", []):
totals["folders"] += 1
foldername = folder.get("foldername")
log(f"Processing folder: {foldername}")
raw_folderpath = folder.get("folderpath")
norm_folderpath = os.path.normpath(raw_folderpath)
# Optional shallow structure log (off by default)
if LOG_STRUCTURE_DEPTH > 0:
log_structure(norm_folderpath, max_depth=LOG_STRUCTURE_DEPTH, show_files=False)
# Precompute the length of the base folder path (plus one for the separator)
base_prefix = norm_folderpath + os.sep
base_len = len(base_prefix)
# Prefetch hit counts for this basefolder to avoid per-file queries
hitcount_map = get_hit_counts_for_basefolder(foldername)
# Accumulate scanned file data and keys for this base folder.
scanned_files = [] # Each entry: (relative_path, basefolder, filename, filetype, transcript, hitcount)
current_keys = set()
scan_stats = {"dirs": 0, "skipped_dirs": 0, "perm_errors": 0}
transcript_cache = {}
transcripts_read = 0
transcript_errors = 0
site = None
if foldername == 'Gottesdienste Speyer':
site = 'Speyer'
elif foldername == 'Gottesdienste Schwegenheim':
site = 'Schwegenheim'
start_time = monotonic()
last_log_time = start_time
next_log_count = PROGRESS_EVERY_FILES
scanned_count = 0
extract_structure = hf.extract_structure_from_string
extract_date = hf.extract_date_from_string
for entry in scan_dir(norm_folderpath, scan_stats):
transcript = None
scanned_count += 1
entry_path = entry.path
# Get relative part by slicing if possible.
if entry_path.startswith(base_prefix):
rel_part = entry_path[base_len:]
else:
rel_part = os.path.relpath(entry_path, norm_folderpath)
# Prepend the foldername so it becomes part of the stored relative path.
rel_part = rel_part.replace(os.sep, '/')
relative_path = f"{foldername}/{rel_part}"
name_root, name_ext = os.path.splitext(entry.name)
filetype = name_ext.lower()
# Retrieve the hit count for this file from pre-fetched map.
hit_count = hitcount_map.get(relative_path, 0)
# Check for a corresponding transcript file in a sibling "Transkription" folder.
parent_dir = os.path.dirname(entry_path)
transcript_index = transcript_cache.get(parent_dir)
if transcript_index is None and parent_dir not in transcript_cache:
transcript_dir = os.path.join(parent_dir, TRANSCRIPT_DIRNAME)
transcript_index = build_transcript_index(transcript_dir, scan_stats)
transcript_cache[parent_dir] = transcript_index
if transcript_index:
transcript_path = transcript_index.get(name_root)
if transcript_path:
try:
with open(transcript_path, 'r', encoding='utf-8') as tf:
transcript = tf.read()
transcripts_read += 1
except Exception:
transcript_errors += 1
category, titel, name = extract_structure(entry.name)
performance_date = extract_date(relative_path)
scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
current_keys.add((relative_path, entry.name))
# Light progress output
now = monotonic()
if scanned_count >= next_log_count or (now - last_log_time) >= PROGRESS_EVERY_SECS:
elapsed = max(now - start_time, 0.0001)
rate = scanned_count / elapsed
log(f" progress: {scanned_count} files, {scan_stats['dirs']} dirs, {rate:.1f} files/s")
last_log_time = now
next_log_count = scanned_count + PROGRESS_EVERY_FILES
# Progress indicator
dir_count = scan_stats["dirs"]
file_count = scanned_count
elapsed = max(monotonic() - start_time, 0.0001)
rate = file_count / elapsed
log(f"Scan summary for '{foldername}': {dir_count} dirs, {file_count} files, {rate:.1f} files/s")
if scan_stats["skipped_dirs"]:
log(f" skipped dirs: {scan_stats['skipped_dirs']}")
if scan_stats["perm_errors"]:
log(f" permission errors: {scan_stats['perm_errors']}")
if transcripts_read or transcript_errors:
log(f" transcripts: {transcripts_read} read, {transcript_errors} errors")
log("updating database...")
scan_duration = format_duration(elapsed)
# Remove database entries for files under this base folder that are no longer on disk.
pattern = foldername + os.sep + '%'
cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
db_rows = cursor.fetchall()
keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
keys_to_delete = keys_in_db - current_keys
deleted_count = len(keys_to_delete)
totals["deleted"] += deleted_count
for key in keys_to_delete:
cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
# Bulk write the scanned files using INSERT OR REPLACE.
cursor.executemany(
"INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
scanned_files
)
# Commit changes after processing this base folder.
search_db.commit()
folder_scanned = scanned_count
totals["scanned"] += folder_scanned
log(f"Indexed {folder_scanned} files (deleted {deleted_count}) in '{foldername}'")
log(f"Scan duration for '{foldername}': {scan_duration}")
total_elapsed = max(monotonic() - total_start, 0.0001)
log(f"Index update finished: folders={totals['folders']}, files indexed={totals['scanned']}, removed={totals['deleted']}")
log(f"Total index duration: {format_duration(total_elapsed)}")
return "File index updated successfully"
def convert_dates(search_db,
date_formats=('%d.%m.%Y', '%d.%m.%y')):
"""
Connects to the SQLite database at search_db, and for every row in table 'files':
- Reads the date from performance_date (expects 'dd.mm.yyyy' or 'dd.mm.yy').
- Parses it and reformats to ISO 'YYYY-MM-DD'.
- Updates the row (using id as primary key).
Only counts rows where the conversion was successful.
"""
# Regex to quickly filter out non-matching strings
date_regex = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')
cur = search_db.cursor()
# Fetch all rows with a non-null date
cur.execute("SELECT id, performance_date FROM files")
rows = cur.fetchall()
converted_count = 0
for pk, raw_date in rows:
if not raw_date or not date_regex.match(raw_date):
continue
for fmt in date_formats:
try:
dt = datetime.strptime(raw_date, fmt)
new_date = dt.strftime('%Y-%m-%d')
# Only update if the reformatted date is different
if new_date != raw_date:
cur.execute(
"UPDATE files SET performance_date = ? WHERE id = ?",
(new_date, pk)
)
converted_count += 1
break # stop trying other formats
except ValueError:
continue
search_db.commit()
print(f"Converted {converted_count} rows to ISO format.")
if __name__ == "__main__":
convert_dates(search_db)
init_db() # Initialize the database schema if it doesn't exist
updatefileindex() # Update the file index
search_db.close() # Close the search database connection
access_log_db.close() # Close the access log connection
print("Database connections closed.")