# bethaus-app/index_for_search.py

import os
import json
import sqlite3
from datetime import datetime
import re
import helperfunctions as hf

SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'

# Connect to the search database.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row

# Open access_log.db in read-only mode.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
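
# Note: check_same_thread=False only disables sqlite3's thread-ownership
# check; callers sharing this connection across threads must still serialize
# access themselves.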


def init_db():
    """Initializes the database with the required schema."""
    cursor = search_db.cursor()
    # Create the table with the new 'hitcount' and 'basefolder' columns.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            relative_path TEXT,
            basefolder TEXT,
            filename TEXT,
            filetype TEXT,
            category TEXT,
            titel TEXT,
            name TEXT,
            performance_date TEXT,
            site TEXT,
            transcript TEXT,
            hitcount INTEGER DEFAULT 0,
            UNIQUE(relative_path, filename)
        )
    ''')
    # If the table already existed, new columns could be added like this:
    # try:
    #     cursor.execute("ALTER TABLE files ADD COLUMN category TEXT")
    # except sqlite3.OperationalError:
    #     # Likely the column already exists, so we ignore this error.
    #     pass
    search_db.commit()
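

# The commented-out migration above could be generalized. A minimal sketch of
# such a helper (hypothetical, not part of the original app) that adds a
# column only when it is missing:
def add_column_if_missing(db, table, column, decl):
    """Add `column` to `table` unless it already exists (sketch)."""
    # PRAGMA table_info returns one row per column; the name is field 1.
    existing = {row[1] for row in db.execute(f"PRAGMA table_info({table})")}
    if column not in existing:
        db.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
        db.commit()
# Example: add_column_if_missing(search_db, "files", "category", "TEXT")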


def scan_dir(directory):
    """Recursively scan directories using os.scandir for improved performance."""
    try:
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    # Skip transcription directories immediately.
                    if entry.name.lower() == "transkription":
                        continue
                    yield from scan_dir(entry.path)
                elif entry.is_file(follow_symlinks=False):
                    yield entry
    except PermissionError:
        return
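
# Note: follow_symlinks=False above means symlinked directories and files are
# skipped entirely rather than followed, which also guards against symlink
# loops during the recursive scan.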


def get_hit_count(relative_path):
    """Returns the hit count for a given file from the access log database."""
    cursor = access_log_db.cursor()
    cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
    row = cursor.fetchone()
    return row["hit_count"] if row else 0
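
# Note: the query above assumes access_log.db contains a 'file_access_log'
# table with at least a 'rel_path' column; nothing beyond that is defined in
# this module.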


def updatefileindex():
    """Scan all configured base folders and synchronize the 'files' table."""
    cursor = search_db.cursor()
    # Load the folder configuration from the JSON file.
    with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
        config_data = json.load(f)
    # Process each configured base folder.
    for config in config_data:
        for folder in config.get("folders", []):
            foldername = folder.get("foldername")
            print(f"Processing folder: {foldername}")
            raw_folderpath = folder.get("folderpath")
            norm_folderpath = os.path.normpath(raw_folderpath)
            # Precompute the length of the base folder path (plus one for the separator).
            base_len = len(norm_folderpath) + 1
            # Accumulate scanned file data and keys for this base folder. Each
            # entry mirrors the column list of the INSERT statement below.
            scanned_files = []
            current_keys = set()
            for entry in scan_dir(norm_folderpath):
                transcript = None
                entry_path = os.path.normpath(entry.path)
                # Get the relative part by slicing if possible.
                if entry_path.startswith(norm_folderpath):
                    rel_part = entry_path[base_len:]
                else:
                    rel_part = os.path.relpath(entry_path, norm_folderpath)
                # Prepend the foldername so it becomes part of the stored relative path.
                relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
                filetype = os.path.splitext(entry.name)[1].lower()
                if filetype not in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']:
                    # Skip non-audio files.
                    continue
                # Retrieve the hit count for this file.
                hit_count = get_hit_count(relative_path)
                category, titel, name, performance_date, site = None, None, None, None, None
                # Determine the site.
                if foldername == 'Gottesdienste Speyer':
                    site = 'Speyer'
                elif foldername == 'Gottesdienste Schwegenheim':
                    site = 'Schwegenheim'
                # Check for a corresponding transcript file in a sibling "Transkription" folder.
                parent_dir = os.path.dirname(entry_path)
                transcript_dir = os.path.join(parent_dir, "Transkription")
                transcript_filename = os.path.splitext(entry.name)[0] + ".md"
                transcript_path = os.path.join(transcript_dir, transcript_filename)
                if os.path.exists(transcript_path):
                    try:
                        with open(transcript_path, 'r', encoding='utf-8') as tf:
                            transcript = tf.read()
                    except Exception:
                        transcript = None
                # Extract category and titel from the filename.
                filename_ext = os.path.splitext(entry.name)[0]
                left_side, right_side = filename_ext.split('-', 1) if '-' in filename_ext else (filename_ext, None)
                try:
                    int(left_side.strip())
                except ValueError:
                    # First part is not a number; keep the initial split.
                    pass
                else:
                    # First part is only a number (e.g. a track index), so drop
                    # it and re-split the remainder.
                    if right_side is not None:
                        left_side, right_side = right_side.split('-', 1) if '-' in right_side else (right_side, None)
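                # For example, a filename stem like "03 - Predigt - Vergebung"
                # (hypothetical) now yields left_side " Predigt " and
                # right_side " Vergebung"; the surrounding whitespace is
                # tolerated by the substring checks below.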
                left_lower = left_side.lower()
                if 'predig' in left_lower:
                    category = 'Predigt'
                elif 'wort' in left_lower or 'einladung' in left_lower:
                    category = 'Vorwort'
                elif 'chor' in left_lower:
                    category = 'Chor'
                elif 'orchester' in left_lower:
                    category = 'Orchester'
                elif 'gruppenlied' in left_lower or 'jugendlied' in left_lower:
                    category = 'Gruppenlied'
                elif 'gemeinsam' in left_lower or 'gesang' in left_lower or 'lied' in left_lower:
                    category = 'Gemeinsamer Gesang'
                elif 'gedicht' in left_lower:
                    category = 'Gedicht'
                elif 'instrumental' in left_lower or 'musikstück' in left_lower:
                    category = 'Instrumental'
                else:
                    category = None
                if right_side:
                    titel, name = right_side.split('-', 1) if '-' in right_side else (right_side, None)
                    if category in ('Predigt', 'Vorwort', 'Gedicht'):
                        if not name:  # no title given, only a name
                            name = titel
                            titel = None
                    else:
                        titel = None
                        name = None
                performance_date = hf.extract_date_from_string(relative_path)
                scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
                current_keys.add((relative_path, entry.name))
            # Remove database entries for files under this base folder that are no longer on disk.
            # Stored paths always use '/' as the separator (see above), so build
            # the LIKE pattern with '/' rather than os.sep.
            pattern = foldername + '/%'
            cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
            db_rows = cursor.fetchall()
            keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
            keys_to_delete = keys_in_db - current_keys
            for key in keys_to_delete:
                cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
            # Bulk write the scanned files using INSERT OR REPLACE.
            cursor.executemany(
                "INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                scanned_files
            )
            # Commit changes after processing this base folder.
            search_db.commit()
    return "File index updated successfully"


def convert_dates(db, date_formats=('%d.%m.%Y', '%d.%m.%y')):
    """
    Given an open SQLite connection, for every row in table 'files':
      - Reads the date from performance_date (expects 'dd.mm.yyyy' or 'dd.mm.yy').
      - Parses it and reformats it to ISO 'YYYY-MM-DD'.
      - Updates the row (using id as the primary key).
    Only counts rows where the conversion was successful.
    """
    # Regex to quickly filter out non-matching strings.
    date_regex = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')
    cur = db.cursor()
    # Fetch all rows; NULL or non-matching dates are skipped below.
    cur.execute("SELECT id, performance_date FROM files")
    rows = cur.fetchall()
    converted_count = 0
    for pk, raw_date in rows:
        if not raw_date or not date_regex.match(raw_date):
            continue
        for fmt in date_formats:
            try:
                dt = datetime.strptime(raw_date, fmt)
                new_date = dt.strftime('%Y-%m-%d')
                # Only update if the reformatted date is different.
                if new_date != raw_date:
                    cur.execute(
                        "UPDATE files SET performance_date = ? WHERE id = ?",
                        (new_date, pk)
                    )
                    converted_count += 1
                break  # Stop trying other formats.
            except ValueError:
                continue
    db.commit()
    print(f"Converted {converted_count} rows to ISO format.")


if __name__ == "__main__":
    init_db()                 # Make sure the schema exists before anything touches it.
    updatefileindex()         # Update the file index.
    convert_dates(search_db)  # Normalize performance dates to ISO format afterwards.
    search_db.close()         # Close the search database connection.
    access_log_db.close()     # Close the access log connection.
    print("Database connections closed.")