bethaus-app/index_for_search.py
2025-05-11 12:50:05 +02:00

293 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import sqlite3
from datetime import datetime
import re
# Names of the SQLite database files and of the JSON configuration that maps
# base folder names to their on-disk paths.
SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'
# Connect to the search database.
# check_same_thread=False: the connection may be used from other threads
# (e.g. a web worker); SQLite's own locking still applies.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row
# Open access_log.db in read-only mode (SQLite URI syntax): this script only
# reads hit counts from the log and must never modify it.
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row
def init_db():
    """Initialize the search database schema.

    Creates the ``files`` table if it does not exist yet.  The
    ``UNIQUE(relative_path, filename)`` constraint lets the indexer use
    INSERT OR REPLACE so re-scanning a file updates its row instead of
    duplicating it.
    """
    cursor = search_db.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            relative_path TEXT,
            basefolder TEXT,
            filename TEXT,
            filetype TEXT,
            category TEXT,
            titel TEXT,
            name TEXT,
            performance_date TEXT,
            site TEXT,
            transcript TEXT,
            hitcount INTEGER DEFAULT 0,
            UNIQUE(relative_path, filename)
        )
    ''')
    search_db.commit()
def scan_dir(directory):
    """Yield ``os.DirEntry`` objects for every file below *directory*.

    Walks the tree depth-first with :func:`os.scandir` and never follows
    symlinks.  Directories named "Transkription" (any case) are pruned —
    they hold transcript files, not audio — and directories that raise
    ``PermissionError`` are silently skipped.
    """
    try:
        with os.scandir(directory) as entries:
            for item in entries:
                if item.is_file(follow_symlinks=False):
                    yield item
                elif item.is_dir(follow_symlinks=False):
                    # Prune transcript folders; everything else recurses.
                    if item.name.lower() != "transkription":
                        yield from scan_dir(item.path)
    except PermissionError:
        return
def get_hit_count(relative_path):
    """Return how many access-log rows reference *relative_path*.

    Counts matching ``rel_path`` entries in ``file_access_log`` of the
    read-only access-log database; returns 0 when there are none.
    """
    row = access_log_db.execute(
        "SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?",
        (relative_path,),
    ).fetchone()
    return row["hit_count"] if row else 0
def extract_date_from_string(string_with_date):
    """Extract the first date in *string_with_date* as ISO 'YYYY-MM-DD'.

    Recognized forms (returns None when nothing parses):
      * ``DD.MM.YYYY`` / ``D.M.YYYY`` — four-digit year last
      * ``YYYY.MM.DD``               — four-digit year first
      * ``DD.MM.YY``                 — ambiguous; falls back to ``YY.MM.DD``
      * ``YYYY-MM-DD``               — already ISO, validated and returned

    The original regex only matched dotted dates, so the ISO branch was
    unreachable; the pattern now also matches dashed ISO dates.
    """
    # Dotted X.Y.Z where each group has 1-4 digits, or a dashed ISO date.
    m = re.search(r'(\d{1,4}\.\d{1,2}\.\d{1,4}|\d{4}-\d{2}-\d{2})', string_with_date)
    if not m:
        return None
    date_str = m.group(1)
    # ISO with dashes: validate via strptime, normalize zero-padding.
    if '-' in date_str:
        try:
            return datetime.strptime(date_str, '%Y-%m-%d').strftime('%Y-%m-%d')
        except ValueError:
            return None
    parts = date_str.split('.')  # regex guarantees exactly three groups
    # 1) Unambiguous: last group is the four-digit year.
    if len(parts[2]) == 4:
        fmt = '%d.%m.%Y'
    # 2) Unambiguous: first group is the four-digit year.
    elif len(parts[0]) == 4:
        fmt = '%Y.%m.%d'
    # 3) Ambiguous XX.XX.XX: prefer DD.MM.YY, fall back to YY.MM.DD.
    elif all(len(p) == 2 for p in parts):
        try:
            return datetime.strptime(date_str, '%d.%m.%y').strftime('%Y-%m-%d')
        except ValueError:
            fmt = '%y.%m.%d'
    else:
        # e.g. three-digit groups: not a recognizable date.
        return None
    # Parse with whichever format we settled on.
    try:
        return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
    except ValueError:
        return None
def updatefileindex():
    """Rebuild the ``files`` index from the folders listed in FOLDER_CONFIG.

    For every configured base folder this:
      * scans the tree for audio files (skipping "Transkription" dirs),
      * derives category / titel / name / performance_date from the filename,
      * attaches transcript text and the access-log hit count,
      * deletes index rows for files no longer present on disk,
      * upserts the scanned rows via INSERT OR REPLACE on the unique key.

    Returns a short status string on success.
    """
    cursor = search_db.cursor()
    # Load folder configuration from JSON file.
    with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
        config_data = json.load(f)
    # Process each configured base folder.
    for config in config_data:
        for folder in config.get("folders", []):
            foldername = folder.get("foldername")
            print(f"Processing folder: {foldername}")
            raw_folderpath = folder.get("folderpath")
            norm_folderpath = os.path.normpath(raw_folderpath)
            # Precompute the length of the base folder path (plus one for the
            # separator) so relative parts can be taken by cheap slicing.
            base_len = len(norm_folderpath) + 1
            # Accumulated rows for the bulk insert, and the set of
            # (relative_path, filename) keys currently present on disk.
            scanned_files = []
            current_keys = set()
            for entry in scan_dir(norm_folderpath):
                transcript = None
                entry_path = os.path.normpath(entry.path)
                # Get relative part by slicing if possible (faster than relpath).
                if entry_path.startswith(norm_folderpath):
                    rel_part = entry_path[base_len:]
                else:
                    rel_part = os.path.relpath(entry_path, norm_folderpath)
                # Prepend the foldername so it becomes part of the stored
                # relative path; stored paths always use forward slashes.
                relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
                filetype = os.path.splitext(entry.name)[1].lower()
                if filetype not in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']:
                    # Skip non-audio files.
                    continue
                # Retrieve the hit count for this file.
                hit_count = get_hit_count(relative_path)
                category, titel, name, performance_date, site = None, None, None, None, None
                # Determine the site from the base folder name.
                if foldername == 'Gottesdienste Speyer':
                    site = 'Speyer'
                elif foldername == 'Gottesdienste Schwegenheim':
                    site = 'Schwegenheim'
                # Check for a corresponding transcript file in a sibling
                # "Transkription" folder; a missing or unreadable transcript is
                # treated as "no transcript" (best effort, never fatal).
                parent_dir = os.path.dirname(entry_path)
                transcript_dir = os.path.join(parent_dir, "Transkription")
                transcript_filename = os.path.splitext(entry.name)[0] + ".md"
                transcript_path = os.path.join(transcript_dir, transcript_filename)
                if os.path.exists(transcript_path):
                    try:
                        with open(transcript_path, 'r', encoding='utf-8') as tf:
                            transcript = tf.read()
                    except (OSError, UnicodeError):
                        transcript = None
                # Extract category and titel from the filename, which follows
                # the loose pattern "[track number -] category - titel [- name]".
                filename_ext = os.path.splitext(entry.name)[0]
                left_side, right_side = filename_ext.split('-', 1) if '-' in filename_ext else (filename_ext, None)
                try:
                    int(left_side.strip())
                    # First part is only a track number: drop it and split again.
                    previous_right_side = right_side
                    left_side, right_side = previous_right_side.split('-', 1) if '-' in previous_right_side else (previous_right_side, None)
                except (ValueError, TypeError):
                    # ValueError: first part is not a number.
                    # TypeError: the number was the whole filename (right_side
                    # is None).  Either way keep the original split.
                    pass
                # Map the category keyword to a canonical label.  Order matters:
                # e.g. 'gruppenlied' must be tested before the generic 'lied'.
                if 'predig' in left_side.lower():
                    category = 'Predigt'
                elif 'wort' in left_side.lower() or 'einladung' in left_side.lower():
                    category = 'Vorwort'
                elif 'chor' in left_side.lower():
                    category = 'Chor'
                elif 'orchester' in left_side.lower():
                    category = 'Orchester'
                elif 'gruppenlied' in left_side.lower() or 'jugendlied' in left_side.lower():
                    category = 'Gruppenlied'
                elif 'gemeinsam' in left_side.lower() or 'gesang' in left_side.lower() or 'lied' in left_side.lower():
                    category = 'Gemeinsamer Gesang'
                elif 'gedicht' in left_side.lower():
                    category = 'Gedicht'
                elif 'instrumental' in left_side.lower() or 'musikstück' in left_side.lower():
                    category = 'Instrumental'
                else:
                    category = None
                if right_side:
                    titel, name = right_side.split('-', 1) if '-' in right_side else (right_side, None)
                    if category == 'Predigt' or category == 'Vorwort' or category == 'Gedicht':
                        if not name:  # no titel, only a speaker name
                            name = titel
                            titel = None
                else:
                    titel = None
                    name = None
                performance_date = extract_date_from_string(relative_path)
                scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
                current_keys.add((relative_path, entry.name))
            # Remove database entries for files under this base folder that are
            # no longer on disk.  Stored paths always use '/', so the LIKE
            # pattern must too (using os.sep would silently match nothing on
            # Windows and leave stale rows behind).
            pattern = foldername + '/%'
            cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
            db_rows = cursor.fetchall()
            keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
            keys_to_delete = keys_in_db - current_keys
            for key in keys_to_delete:
                cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)
            # Bulk write the scanned files using INSERT OR REPLACE.
            cursor.executemany(
                "INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                scanned_files
            )
            # Commit changes after processing this base folder.
            search_db.commit()
    return "File index updated successfully"
def convert_dates(search_db,
                  date_formats=('%d.%m.%Y', '%d.%m.%y')):
    """Normalize ``performance_date`` values to ISO 'YYYY-MM-DD'.

    Given an open SQLite connection, scans every row of the ``files``
    table.  Values that look like a dotted day.month.year date are
    re-parsed with the formats in *date_formats* (first match wins) and
    rewritten in ISO form.  NULL, already-ISO, and unparseable values are
    left untouched.  Commits once and prints how many rows were rewritten.
    """
    # Cheap pre-filter so strptime is only attempted on plausible dates.
    dotted = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')
    cur = search_db.cursor()
    cur.execute("SELECT id, performance_date FROM files")
    updated = 0
    for row_id, value in cur.fetchall():
        if not value or not dotted.match(value):
            continue
        for fmt in date_formats:
            try:
                parsed = datetime.strptime(value, fmt)
            except ValueError:
                continue
            iso = parsed.strftime('%Y-%m-%d')
            # Only write when the value actually changes.
            if iso != value:
                cur.execute(
                    "UPDATE files SET performance_date = ? WHERE id = ?",
                    (iso, row_id),
                )
                updated += 1
            break  # first matching format wins
    search_db.commit()
    print(f"Converted {updated} rows to ISO format.")
if __name__ == "__main__":
convert_dates(search_db)
init_db() # Initialize the database schema if it doesn't exist
updatefileindex() # Update the file index
search_db.close() # Close the search database connection
access_log_db.close() # Close the access log connection
print("Database connections closed.")