# bethaus-app/index_for_search.py
import os
import json
import sqlite3
from datetime import datetime
import re
import helperfunctions as hf
from collections import defaultdict
SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'
IGNORED_DIRS = {"Transkription", "@eaDir", ".app", "#recycle"}
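# "Transkription" is skipped during traversal because its .md files are attached to the
# matching recordings as transcripts in updatefileindex(); "@eaDir" and "#recycle" look
# like NAS housekeeping folders (likely Synology) and carry no indexable content.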
# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row
# Open access_log.db in read-only mode.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
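# Note: mode=ro keeps the access log strictly read-only; sqlite3 raises OperationalError
# if the file is missing or a write is attempted on this connection.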
def log(message: str):
"""Small helper to ensure console output is flushed immediately."""
print(message, flush=True)
def skip_dir(name: str) -> bool:
"""Return True when a directory name should be skipped during traversal/logging."""
return name.startswith('.') or name in IGNORED_DIRS
def init_db():
"""Initializes the database with the required schema."""
cursor = search_db.cursor()
# Create table with the new 'hitcount' and 'basefolder' columns.
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
relative_path TEXT,
basefolder TEXT,
filename TEXT,
filetype TEXT,
category TEXT,
titel TEXT,
name TEXT,
performance_date TEXT,
site TEXT,
transcript TEXT,
hitcount INTEGER DEFAULT 0,
UNIQUE(relative_path, filename)
)
''')
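    # The UNIQUE(relative_path, filename) constraint is what the INSERT OR REPLACE
    # in updatefileindex() keys on when re-indexing existing files.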
search_db.commit()
def scan_dir(directory):
"""Recursively scan directories using os.scandir for improved performance."""
try:
with os.scandir(directory) as it:
for entry in it:
if entry.is_dir(follow_symlinks=False):
# Skip unwanted directories immediately.
if skip_dir(entry.name):
continue
yield from scan_dir(entry.path)
elif entry.is_file(follow_symlinks=False):
yield entry
except PermissionError:
log(f"Permission denied: {directory}")
return
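
# Illustrative usage (hypothetical root path):
#   for entry in scan_dir("/volume1/Aufnahmen"):
#       print(entry.path)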
def get_hit_count(relative_path):
"""Returns the hit count for a given file from the access log database."""
cursor = access_log_db.cursor()
cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
row = cursor.fetchone()
return row["hit_count"] if row else 0
def get_hit_counts_for_basefolder(basefolder: str) -> dict:
"""Return a map of rel_path -> hit_count for all files under a basefolder."""
cursor = access_log_db.cursor()
pattern = f"{basefolder}/%"
cursor.execute(
"SELECT rel_path, COUNT(*) AS hit_count FROM file_access_log WHERE rel_path LIKE ? GROUP BY rel_path",
(pattern,)
)
return {row["rel_path"]: row["hit_count"] for row in cursor.fetchall()}
def log_structure(root_path, max_depth=None, show_files=False):
"""
Log folder structure up to max_depth levels (root = depth 1).
If max_depth is None, traverse all depths. Files are logged only when show_files is True.
"""
depth_label = "all" if max_depth is None else f"<= {max_depth}"
log(f"Folder structure (depth {depth_label}) for '{root_path}':")
def _walk(path, depth):
if max_depth is not None and depth > max_depth:
return
try:
with os.scandir(path) as it:
entries = sorted(it, key=lambda e: (not e.is_dir(follow_symlinks=False), e.name.lower()))
for entry in entries:
if entry.is_dir(follow_symlinks=False):
if skip_dir(entry.name):
continue
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}/")
_walk(entry.path, depth + 1)
elif show_files:
indent = " " * (depth - 1)
log(f"{indent}- {entry.name}")
except PermissionError:
indent = " " * (depth - 1)
log(f"{indent}- [permission denied]")
_walk(root_path, depth=1)
def log_file(relative_path: str, filename: str):
"""Debug helper to log each file that is indexed."""
log(f" file: {relative_path} ({filename})")
def log_directory_batch(directory: str, files: list[str]):
"""Log file count for a directory without listing filenames."""
if not files:
return
log(f" Dir {directory or '/'}: {len(files)} files")
def updatefileindex():
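    """
    Rebuild the search index from the base folders listed in FOLDER_CONFIG:
    scan each configured folder, attach transcripts and prefetched hit counts,
    remove database rows for files that no longer exist on disk, and bulk-upsert
    the scanned files into the 'files' table (one commit per base folder).
    """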
cursor = search_db.cursor()
totals = {"folders": 0, "scanned": 0, "deleted": 0}
# Load folder configuration from JSON file.
with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
config_data = json.load(f)
# Process each configured base folder.
for config in config_data:
for folder in config.get("folders", []):
totals["folders"] += 1
foldername = folder.get("foldername")
log(f"Processing folder: {foldername}")
raw_folderpath = folder.get("folderpath")
norm_folderpath = os.path.normpath(raw_folderpath)
# Only log folder names up to 3 levels deep; suppress filenames
log_structure(norm_folderpath, max_depth=3, show_files=False)
# Precompute the length of the base folder path (plus one for the separator)
base_len = len(norm_folderpath) + 1
# Prefetch hit counts for this basefolder to avoid per-file queries
hitcount_map = get_hit_counts_for_basefolder(foldername)
# Accumulate scanned file data and keys for this base folder.
            scanned_files = []  # Each entry: (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount)
current_keys = set()
dir_files = defaultdict(list) # map of directory -> list of filenames
for entry in scan_dir(norm_folderpath):
transcript = None
entry_path = os.path.normpath(entry.path)
# Get relative part by slicing if possible.
if entry_path.startswith(norm_folderpath):
rel_part = entry_path[base_len:]
else:
rel_part = os.path.relpath(entry_path, norm_folderpath)
# Prepend the foldername so it becomes part of the stored relative path.
relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
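                # e.g. "Gottesdienste Speyer/2023/Predigt 05.03.23.mp3" (illustrative subpath and filename)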
filetype = os.path.splitext(entry.name)[1].lower()
# Retrieve the hit count for this file from pre-fetched map.
hit_count = hitcount_map.get(relative_path, 0)
# Determine the site
if foldername == 'Gottesdienste Speyer':
site = 'Speyer'
elif foldername == 'Gottesdienste Schwegenheim':
site = 'Schwegenheim'
else:
site = None
# Check for a corresponding transcript file in a sibling "Transkription" folder.
parent_dir = os.path.dirname(entry_path)
transcript_dir = os.path.join(parent_dir, "Transkription")
transcript_filename = os.path.splitext(entry.name)[0] + ".md"
transcript_path = os.path.join(transcript_dir, transcript_filename)
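                # e.g. <dir>/Predigt.mp3 -> <dir>/Transkription/Predigt.md (illustrative name)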
if os.path.exists(transcript_path):
try:
with open(transcript_path, 'r', encoding='utf-8') as tf:
transcript = tf.read()
                    except Exception as exc:
                        log(f"Could not read transcript {transcript_path}: {exc}")
                        transcript = None
category, titel, name = hf.extract_structure_from_string(entry.name)
performance_date = hf.extract_date_from_string(relative_path)
# Debug: batch file logging per directory
dir_files[os.path.dirname(relative_path)].append(entry.name)
scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
current_keys.add((relative_path, entry.name))
# After scanning, log grouped files per directory
for d, files in dir_files.items():
log_directory_batch(d, files)
# Progress indicator
dir_count = len(dir_files)
file_count = len(scanned_files)
log(f"Found {dir_count} folders and {file_count} files in '{foldername}'.")
log("updating database...")
# Remove database entries for files under this base folder that are no longer on disk.
            pattern = foldername + '/%'  # relative_path is stored with '/' separators
cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
db_rows = cursor.fetchall()
keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
keys_to_delete = keys_in_db - current_keys
deleted_count = len(keys_to_delete)
totals["deleted"] += deleted_count
for key in keys_to_delete:
cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
# Bulk write the scanned files using INSERT OR REPLACE.
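            # OR REPLACE resolves conflicts on UNIQUE(relative_path, filename) by deleting the
            # old row and inserting a fresh one, so a file's id can change between index runs.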
cursor.executemany(
"INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
scanned_files
)
# Commit changes after processing this base folder.
search_db.commit()
folder_scanned = len(scanned_files)
totals["scanned"] += folder_scanned
log(f"Indexed {folder_scanned} files (deleted {deleted_count}) in '{foldername}'")
log(f"Index update finished: folders={totals['folders']}, files indexed={totals['scanned']}, removed={totals['deleted']}")
return "File index updated successfully"
def convert_dates(search_db,
                  date_formats=('%d.%m.%Y', '%d.%m.%y')):
    """
    Given an open SQLite connection (search_db), for every row in table 'files':
    - Reads the date from performance_date (expects 'dd.mm.yyyy' or 'dd.mm.yy').
    - Parses it and reformats it to ISO 'YYYY-MM-DD'.
    - Updates the row (using id as the primary key).
    Only rows where the conversion succeeded are counted.
    """
# Regex to quickly filter out non-matching strings
date_regex = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')
cur = search_db.cursor()
# Fetch all rows with a non-null date
cur.execute("SELECT id, performance_date FROM files")
rows = cur.fetchall()
converted_count = 0
for pk, raw_date in rows:
if not raw_date or not date_regex.match(raw_date):
continue
for fmt in date_formats:
try:
dt = datetime.strptime(raw_date, fmt)
new_date = dt.strftime('%Y-%m-%d')
# Only update if the reformatted date is different
if new_date != raw_date:
cur.execute(
"UPDATE files SET performance_date = ? WHERE id = ?",
(new_date, pk)
)
converted_count += 1
break # stop trying other formats
except ValueError:
continue
search_db.commit()
print(f"Converted {converted_count} rows to ISO format.")
if __name__ == "__main__":
    init_db()                 # Initialize the database schema first so the 'files' table exists
    convert_dates(search_db)  # Normalize legacy performance_date values to ISO format
    updatefileindex()         # Update the file index
    search_db.close()         # Close the search database connection
    access_log_db.close()     # Close the access log connection
    log("Database connections closed.")