bethaus-app/index_for_search.py
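
"""Build and refresh the search index (search.db) for the Bethaus app.

Scans the base folders listed in folder_secret_config.json, pairs media files
with transcripts from sibling "Transkription" folders, pulls per-file hit
counts from access_log.db and writes the results into the 'files' table.
"""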
import os
import json
import sqlite3
from datetime import datetime
from time import monotonic
import re
from typing import Optional
import helperfunctions as hf
SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'
TRANSCRIPT_DIRNAME = "Transkription"
TRANSCRIPT_EXT = ".md"
IGNORED_DIRS = {TRANSCRIPT_DIRNAME, "@eaDir", ".app", "#recycle"}
# Logging/progress tuning (keep output light by default)
LOG_STRUCTURE_DEPTH = int(os.getenv("INDEX_LOG_STRUCTURE_DEPTH", "0") or 0)
PROGRESS_EVERY_SECS = float(os.getenv("INDEX_PROGRESS_SECS", "30"))
PROGRESS_EVERY_FILES = int(os.getenv("INDEX_PROGRESS_FILES", "5000"))
MAX_ERROR_LOGS = int(os.getenv("INDEX_MAX_ERROR_LOGS", "5"))
# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row
# Open access_log.db in read-only mode.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
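# mode=ro guarantees the indexer only ever reads the access log.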
def log(message: str):
"""Small helper to ensure console output is flushed immediately."""
print(message, flush=True)
def log_permission_error(path: str, stats: dict):
"""Log permission errors sparingly to avoid noisy output."""
stats["perm_errors"] += 1
if stats["perm_errors"] <= MAX_ERROR_LOGS:
log(f"Permission denied: {path}")
elif stats["perm_errors"] == MAX_ERROR_LOGS + 1:
log("Further permission errors suppressed.")
def skip_dir(name: str) -> bool:
"""Return True when a directory name should be skipped during traversal/logging."""
return name.startswith('.') or name in IGNORED_DIRS
def format_duration(seconds: float) -> str:
total_secs = int(seconds)
minutes, secs = divmod(total_secs, 60)
return f"{minutes}m {secs:02d}s"
def init_db():
"""Initializes the database with the required schema."""
cursor = search_db.cursor()
# Create table with the new 'hitcount' and 'basefolder' columns.
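    # UNIQUE(relative_path, filename) is what lets updatefileindex() upsert rows with INSERT OR REPLACE.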
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
relative_path TEXT,
basefolder TEXT,
filename TEXT,
filetype TEXT,
category TEXT,
titel TEXT,
name TEXT,
performance_date TEXT,
site TEXT,
transcript TEXT,
hitcount INTEGER DEFAULT 0,
UNIQUE(relative_path, filename)
)
''')
    # If the table already existed, new columns could be added here, e.g.:
    # try:
    #     cursor.execute("ALTER TABLE files ADD COLUMN category TEXT")
    # except sqlite3.OperationalError:
    #     # Likely the column already exists, so we ignore this error.
    #     pass
    search_db.commit()
def scan_dir(directory: str, stats: dict):
"""Iteratively scan directories using os.scandir for improved performance."""
stack = [directory]
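    # Depth-first traversal via an explicit stack; yields os.DirEntry objects for
    # regular files and records dir/skip/permission counts in the stats dict.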
while stack:
current = stack.pop()
stats["dirs"] += 1
try:
with os.scandir(current) as it:
for entry in it:
try:
if entry.is_dir(follow_symlinks=False):
# Skip unwanted directories immediately.
if skip_dir(entry.name):
stats["skipped_dirs"] += 1
continue
stack.append(entry.path)
elif entry.is_file(follow_symlinks=False):
yield entry
except PermissionError:
log_permission_error(entry.path, stats)
except PermissionError:
log_permission_error(current, stats)
def get_hit_count(relative_path):
"""Returns the hit count for a given file from the access log database."""
cursor = access_log_db.cursor()
cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
row = cursor.fetchone()
return row["hit_count"] if row else 0
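# updatefileindex() prefetches counts per basefolder with get_hit_counts_for_basefolder()
# below instead of issuing one such query per file.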
def get_hit_counts_for_basefolder(basefolder: str) -> dict:
"""Return a map of rel_path -> hit_count for all files under a basefolder."""
cursor = access_log_db.cursor()
pattern = f"{basefolder}/%"
cursor.execute(
"SELECT rel_path, COUNT(*) AS hit_count FROM file_access_log WHERE rel_path LIKE ? GROUP BY rel_path",
(pattern,)
)
return {row["rel_path"]: row["hit_count"] for row in cursor.fetchall()}
def build_transcript_index(transcript_dir: str, stats: dict):
    """Return a dict of basename -> transcript path for a transcript directory."""
    try:
        with os.scandir(transcript_dir) as it:
            index = {}
            for entry in it:
                if not entry.is_file(follow_symlinks=False):
                    continue
                name = entry.name
                if not name.endswith(TRANSCRIPT_EXT):
                    continue
                index[name[:-len(TRANSCRIPT_EXT)]] = entry.path
            return index
    except FileNotFoundError:
        return None
    except PermissionError:
        log_permission_error(transcript_dir, stats)
        return None
def read_text_file(path: str) -> Optional[str]:
    """Read a text file as UTF-8, falling back to cp1252; return None on failure."""
    try:
        with open(path, 'r', encoding='utf-8') as handle:
            return handle.read()
    except UnicodeDecodeError:
        try:
            with open(path, 'r', encoding='cp1252') as handle:
                return handle.read()
        except Exception:
            return None
    except Exception:
        return None
def log_structure(root_path, max_depth=None, show_files=False):
"""
Log folder structure up to max_depth levels (root = depth 1).
If max_depth is None, traverse all depths. Files are logged only when show_files is True.
"""
depth_label = "all" if max_depth is None else f"<= {max_depth}"
log(f"Folder structure (depth {depth_label}) for '{root_path}':")
def _walk(path, depth):
if max_depth is not None and depth > max_depth:
return
try:
with os.scandir(path) as it:
entries = sorted(it, key=lambda e: (not e.is_dir(follow_symlinks=False), e.name.lower()))
for entry in entries:
if entry.is_dir(follow_symlinks=False):
if skip_dir(entry.name):
continue
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}/")
_walk(entry.path, depth + 1)
elif show_files:
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}")
except PermissionError:
indent = " " * (depth - 1)
log(f"{indent}- [permission denied]")
_walk(root_path, depth=1)
def log_file(relative_path: str, filename: str):
"""Debug helper to log each file that is indexed."""
log(f" file: {relative_path} ({filename})")
def updatefileindex():
total_start = monotonic()
cursor = search_db.cursor()
totals = {"folders": 0, "scanned": 0, "deleted": 0}
# Load folder configuration from JSON file.
with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
config_data = json.load(f)
# Process each configured base folder.
for config in config_data:
for folder in config.get("folders", []):
totals["folders"] += 1
foldername = folder.get("foldername")
log(f"Processing folder: {foldername}")
raw_folderpath = folder.get("folderpath")
norm_folderpath = os.path.normpath(raw_folderpath)
# Optional shallow structure log (off by default)
if LOG_STRUCTURE_DEPTH > 0:
log_structure(norm_folderpath, max_depth=LOG_STRUCTURE_DEPTH, show_files=False)
# Precompute the length of the base folder path (plus one for the separator)
base_prefix = norm_folderpath + os.sep
base_len = len(base_prefix)
# Prefetch hit counts for this basefolder to avoid per-file queries
hitcount_map = get_hit_counts_for_basefolder(foldername)
# Accumulate scanned file data and keys for this base folder.
            scanned_files = []  # Each entry: (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount)
current_keys = set()
scan_stats = {"dirs": 0, "skipped_dirs": 0, "perm_errors": 0}
transcript_cache = {}
transcripts_read = 0
transcript_errors = 0
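            # Derive the site from well-known base folder names; all other folders keep site = None.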
site = None
if foldername == 'Gottesdienste Speyer':
site = 'Speyer'
elif foldername == 'Gottesdienste Schwegenheim':
site = 'Schwegenheim'
start_time = monotonic()
last_log_time = start_time
next_log_count = PROGRESS_EVERY_FILES
scanned_count = 0
last_log_count = 0
extract_structure = hf.extract_structure_from_string
extract_date = hf.extract_date_from_string
for entry in scan_dir(norm_folderpath, scan_stats):
transcript = None
scanned_count += 1
entry_path = entry.path
# Get relative part by slicing if possible.
if entry_path.startswith(base_prefix):
rel_part = entry_path[base_len:]
else:
rel_part = os.path.relpath(entry_path, norm_folderpath)
# Prepend the foldername so it becomes part of the stored relative path.
rel_part = rel_part.replace(os.sep, '/')
relative_path = f"{foldername}/{rel_part}"
name_root, name_ext = os.path.splitext(entry.name)
filetype = name_ext.lower()
# Retrieve the hit count for this file from pre-fetched map.
hit_count = hitcount_map.get(relative_path, 0)
# Check for a corresponding transcript file in a sibling "Transkription" folder.
parent_dir = os.path.dirname(entry_path)
transcript_index = transcript_cache.get(parent_dir)
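                # A cached value of None means the sibling "Transkription" folder does not
                # exist for this directory, so only build the index on a true cache miss.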
if transcript_index is None and parent_dir not in transcript_cache:
transcript_dir = os.path.join(parent_dir, TRANSCRIPT_DIRNAME)
transcript_index = build_transcript_index(transcript_dir, scan_stats)
transcript_cache[parent_dir] = transcript_index
if transcript_index:
transcript_path = transcript_index.get(name_root)
if transcript_path:
try:
with open(transcript_path, 'r', encoding='utf-8') as tf:
transcript = tf.read()
transcripts_read += 1
except Exception:
transcript_errors += 1
if transcript is None and filetype == '.sng':
sng_text = read_text_file(entry_path)
if sng_text is not None:
transcript = sng_text
transcripts_read += 1
else:
transcript_errors += 1
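                # Parse category/titel/name from the file name and the performance date
                # from the relative path (see helperfunctions).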
category, titel, name = extract_structure(entry.name)
performance_date = extract_date(relative_path)
scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
current_keys.add((relative_path, entry.name))
# Light progress output
now = monotonic()
if scanned_count >= next_log_count or (now - last_log_time) >= PROGRESS_EVERY_SECS:
window_elapsed = max(now - last_log_time, 0.0001)
window_count = scanned_count - last_log_count
window_rate = window_count / window_elapsed
log(f" progress: {scanned_count} files, {scan_stats['dirs']} dirs, {window_rate:.1f} files/s")
last_log_time = now
last_log_count = scanned_count
next_log_count = scanned_count + PROGRESS_EVERY_FILES
            # Per-folder scan summary
dir_count = scan_stats["dirs"]
file_count = scanned_count
elapsed = max(monotonic() - start_time, 0.0001)
avg_rate = file_count / elapsed
log(f"Scan summary for '{foldername}': {dir_count} dirs, {file_count} files, {avg_rate:.1f} files/s avg")
if scan_stats["skipped_dirs"]:
log(f" skipped dirs: {scan_stats['skipped_dirs']}")
if scan_stats["perm_errors"]:
log(f" permission errors: {scan_stats['perm_errors']}")
if transcripts_read or transcript_errors:
log(f" transcripts: {transcripts_read} read, {transcript_errors} errors")
log("updating database...")
scan_duration = format_duration(elapsed)
# Remove database entries for files under this base folder that are no longer on disk.
            pattern = foldername + '/%'  # stored relative_path values always use '/' as separator
cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
db_rows = cursor.fetchall()
keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
keys_to_delete = keys_in_db - current_keys
deleted_count = len(keys_to_delete)
totals["deleted"] += deleted_count
for key in keys_to_delete:
cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
# Bulk write the scanned files using INSERT OR REPLACE.
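            # OR REPLACE relies on the UNIQUE(relative_path, filename) constraint, so
            # re-scanned files overwrite their existing rows instead of duplicating them.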
cursor.executemany(
"INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
scanned_files
)
# Commit changes after processing this base folder.
search_db.commit()
folder_scanned = scanned_count
totals["scanned"] += folder_scanned
log(f"Indexed {folder_scanned} files (deleted {deleted_count}) in '{foldername}'")
log(f"Scan duration for '{foldername}': {scan_duration}")
total_elapsed = max(monotonic() - total_start, 0.0001)
log(f"Index update finished: folders={totals['folders']}, files indexed={totals['scanned']}, removed={totals['deleted']}")
log(f"Total index duration: {format_duration(total_elapsed)}")
return "File index updated successfully"
def convert_dates(search_db,
                  date_formats=('%d.%m.%Y', '%d.%m.%y')):
    """
    For every row in table 'files' of the given SQLite connection:
      - Reads the date from performance_date (expects 'dd.mm.yyyy' or 'dd.mm.yy').
      - Parses it and reformats it to ISO 'YYYY-MM-DD'.
      - Updates the row (using id as primary key).
    Only rows where the conversion succeeded are counted.
    """
# Regex to quickly filter out non-matching strings
date_regex = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')
cur = search_db.cursor()
    # Fetch all rows; entries without a matching date string are skipped below.
cur.execute("SELECT id, performance_date FROM files")
rows = cur.fetchall()
converted_count = 0
for pk, raw_date in rows:
if not raw_date or not date_regex.match(raw_date):
continue
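        # '%d.%m.%Y' is tried first; two-digit years fail that format and fall through to '%d.%m.%y'.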
for fmt in date_formats:
try:
dt = datetime.strptime(raw_date, fmt)
new_date = dt.strftime('%Y-%m-%d')
# Only update if the reformatted date is different
if new_date != raw_date:
cur.execute(
"UPDATE files SET performance_date = ? WHERE id = ?",
(new_date, pk)
)
converted_count += 1
break # stop trying other formats
except ValueError:
continue
search_db.commit()
print(f"Converted {converted_count} rows to ISO format.")
if __name__ == "__main__":
    init_db()                 # Ensure the schema exists before touching the 'files' table
    convert_dates(search_db)  # Normalize legacy dd.mm.yyyy dates to ISO format
    updatefileindex()         # Update the file index
search_db.close() # Close the search database connection
access_log_db.close() # Close the access log connection
print("Database connections closed.")