import os
import json
import sqlite3
from datetime import datetime
import re

SEARCH_DB_NAME = 'search.db'
ACCESS_LOG_DB_NAME = 'access_log.db'
FOLDER_CONFIG = 'folder_secret_config.json'
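
# Expected shape of folder_secret_config.json, inferred from the loops in
# updatefileindex() below (the folder path here is hypothetical):
# [
#   {
#     "folders": [
#       {"foldername": "Gottesdienste Speyer", "folderpath": "/data/speyer"}
#     ]
#   }
# ]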

# Connect to the search database. check_same_thread=False lets this
# module-level connection be used from other threads; callers must then
# serialize access themselves.
search_db = sqlite3.connect(SEARCH_DB_NAME, check_same_thread=False)
search_db.row_factory = sqlite3.Row

# Open access_log.db in read-only mode (SQLite URI filename, mode=ro).
access_log_db = sqlite3.connect(f'file:{ACCESS_LOG_DB_NAME}?mode=ro', uri=True)
access_log_db.row_factory = sqlite3.Row


def init_db():
    """Initializes the search database with the required schema."""
    cursor = search_db.cursor()
    # Create the table with the 'hitcount' and 'basefolder' columns. The
    # UNIQUE constraint is what the INSERT OR REPLACE in updatefileindex()
    # keys on.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            relative_path TEXT,
            basefolder TEXT,
            filename TEXT,
            filetype TEXT,
            category TEXT,
            titel TEXT,
            name TEXT,
            performance_date TEXT,
            site TEXT,
            transcript TEXT,
            hitcount INTEGER DEFAULT 0,
            UNIQUE(relative_path, filename)
        )
    ''')
    # If the table already existed, new columns can be added with a guarded
    # ALTER TABLE, e.g.:
    # try:
    #     cursor.execute("ALTER TABLE files ADD COLUMN category TEXT")
    # except sqlite3.OperationalError:
    #     # Likely the column already exists, so we ignore this error.
    #     pass
    search_db.commit()
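

# scan_dir yields os.DirEntry objects for every regular file below the given
# root, skipping "Transkription" folders. Usage sketch (hypothetical path):
#   for entry in scan_dir('/data/speyer'):
#       print(entry.path)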
def scan_dir(directory):
    """Recursively scans a directory using os.scandir for improved performance."""
    try:
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    # Skip transcription directories immediately.
                    if entry.name.lower() == "transkription":
                        continue
                    yield from scan_dir(entry.path)
                elif entry.is_file(follow_symlinks=False):
                    yield entry
    except PermissionError:
        return
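

# get_hit_count assumes access_log.db contains a 'file_access_log' table with
# at least a TEXT column 'rel_path', one row per recorded access (schema
# inferred from the query below; it is not created by this script).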
def get_hit_count(relative_path):
    """Returns the hit count for a given file from the access log database."""
    cursor = access_log_db.cursor()
    cursor.execute("SELECT COUNT(*) AS hit_count FROM file_access_log WHERE rel_path = ?", (relative_path,))
    row = cursor.fetchone()
    return row["hit_count"] if row else 0
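

# Illustrative inputs/outputs for extract_date_from_string, derived from the
# parsing rules below:
#   'Gottesdienst 03.10.2021' -> '2021-10-03'  (DD.MM.YYYY)
#   '2021.10.03 Predigt'      -> '2021-10-03'  (YYYY.MM.DD)
#   '03.10.21'                -> '2021-10-03'  (ambiguous; DD.MM.YY preferred)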
def extract_date_from_string(string_with_date):
    """Extracts the first dotted date from a string and returns it as ISO
    'YYYY-MM-DD', or None if no parseable date is found."""
    # Grab X.Y.Z where X, Y, Z are 1-4 digits each.
    m = re.search(r'(\d{1,4}\.\d{1,2}\.\d{1,4})', string_with_date)
    if not m:
        return None

    date_str = m.group(1)
    parts = date_str.split('.')

    # 1) Unambiguous "last group = YYYY"
    if len(parts) == 3 and len(parts[2]) == 4:
        fmt = '%d.%m.%Y'

    # 2) Unambiguous "first group = YYYY"
    elif len(parts) == 3 and len(parts[0]) == 4:
        fmt = '%Y.%m.%d'

    # 3) Ambiguous "XX.XX.XX" -> prefer DD.MM.YY, fall back to YY.MM.DD
    elif len(parts) == 3 and all(len(p) == 2 for p in parts):
        # Try last-group-as-year first.
        try:
            dt = datetime.strptime(date_str, '%d.%m.%y')
            return dt.strftime('%Y-%m-%d')
        except ValueError:
            # Fall back to first-group-as-year.
            fmt = '%y.%m.%d'

    else:
        # The regex above only matches dotted dates, so the former
        # dash-separated ISO handling here was unreachable; odd group
        # lengths are simply rejected.
        return None

    # Parse with whichever format we settled on.
    try:
        dt = datetime.strptime(date_str, fmt)
        return dt.strftime('%Y-%m-%d')
    except ValueError:
        return None


def updatefileindex():
    """Scans every configured base folder and synchronizes the 'files' table
    with the audio files found on disk."""
    cursor = search_db.cursor()

    # Load the folder configuration from the JSON file.
    with open(FOLDER_CONFIG, "r", encoding="utf-8") as f:
        config_data = json.load(f)

    # Process each configured base folder.
    for config in config_data:
        for folder in config.get("folders", []):
            foldername = folder.get("foldername")
            print(f"Processing folder: {foldername}")
            raw_folderpath = folder.get("folderpath")
            norm_folderpath = os.path.normpath(raw_folderpath)
            # Precompute the length of the base folder path (plus one for the separator).
            base_len = len(norm_folderpath) + 1

            # Accumulate scanned file data and keys for this base folder.
            scanned_files = []  # One tuple per file, matching the column list in the INSERT below.
            current_keys = set()
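            # Example of the path bookkeeping below (hypothetical values):
            #   foldername      = 'Gottesdienste Speyer'
            #   norm_folderpath = '/data/speyer'
            #   entry.path      = '/data/speyer/2024/01-Chor-Lobt den Herrn.mp3'
            #   relative_path   = 'Gottesdienste Speyer/2024/01-Chor-Lobt den Herrn.mp3'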
            for entry in scan_dir(norm_folderpath):
                transcript = None
                entry_path = os.path.normpath(entry.path)
                # Get the relative part by slicing if possible.
                if entry_path.startswith(norm_folderpath):
                    rel_part = entry_path[base_len:]
                else:
                    rel_part = os.path.relpath(entry_path, norm_folderpath)
                # Prepend the foldername so it becomes part of the stored relative path.
                relative_path = os.path.join(foldername, rel_part).replace(os.sep, '/')
                filetype = os.path.splitext(entry.name)[1].lower()

                if filetype not in ['.mp3', '.wav', '.ogg', '.m4a', '.flac']:
                    # Skip non-audio files.
                    continue

                # Retrieve the hit count for this file.
                hit_count = get_hit_count(relative_path)

                category, titel, name, performance_date, site = None, None, None, None, None

                # Determine the site from the configured folder name.
                if foldername == 'Gottesdienste Speyer':
                    site = 'Speyer'
                elif foldername == 'Gottesdienste Schwegenheim':
                    site = 'Schwegenheim'

                # Check for a corresponding transcript file in a sibling "Transkription" folder.
                parent_dir = os.path.dirname(entry_path)
                transcript_dir = os.path.join(parent_dir, "Transkription")
                transcript_filename = os.path.splitext(entry.name)[0] + ".md"
                transcript_path = os.path.join(transcript_dir, transcript_filename)
                if os.path.exists(transcript_path):
                    try:
                        with open(transcript_path, 'r', encoding='utf-8') as tf:
                            transcript = tf.read()
                    except Exception:
                        transcript = None
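
                # How a typical filename is decomposed below (hypothetical
                # example): '03-Predigt-Glaube-Max Mustermann.mp3'
                #   -> the leading track number '03' is stripped,
                #   -> left_side 'Predigt' selects category 'Predigt',
                #   -> right_side 'Glaube-Max Mustermann' yields
                #      titel='Glaube', name='Max Mustermann'.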
                # Extract category and titel from the filename.
                filename_stem = os.path.splitext(entry.name)[0]
                left_side, right_side = filename_stem.split('-', 1) if '-' in filename_stem else (filename_stem, None)
                # If the first dash-separated part is just a track number, drop it.
                if right_side is not None:
                    try:
                        int(left_side.strip())
                        left_side, right_side = right_side.split('-', 1) if '-' in right_side else (right_side, None)
                    except ValueError:
                        # First part is not a number; keep the split as-is.
                        pass

                left_lower = left_side.lower()
                if 'predig' in left_lower:
                    category = 'Predigt'
                elif 'wort' in left_lower or 'einladung' in left_lower:
                    category = 'Vorwort'
                elif 'chor' in left_lower:
                    category = 'Chor'
                elif 'orchester' in left_lower:
                    category = 'Orchester'
                elif 'gruppenlied' in left_lower or 'jugendlied' in left_lower:
                    category = 'Gruppenlied'
                elif 'gemeinsam' in left_lower or 'gesang' in left_lower or 'lied' in left_lower:
                    category = 'Gemeinsamer Gesang'
                elif 'gedicht' in left_lower:
                    category = 'Gedicht'
                elif 'instrumental' in left_lower or 'musikstück' in left_lower:
                    category = 'Instrumental'
                else:
                    category = None

                if right_side:
                    titel, name = right_side.split('-', 1) if '-' in right_side else (right_side, None)
                    if category in ('Predigt', 'Vorwort', 'Gedicht'):
                        if not name:  # no title given, only a speaker name
                            name = titel
                            titel = None
                else:
                    titel = None
                    name = None

                performance_date = extract_date_from_string(relative_path)

                scanned_files.append((relative_path, foldername, entry.name, filetype, category, titel, name, performance_date, site, transcript, hit_count))
                current_keys.add((relative_path, entry.name))

            # Remove database entries for files under this base folder that are no longer on disk.
            # Stored relative paths always use '/' (see the replace() above),
            # so the LIKE pattern must use '/' as well, not os.sep.
            pattern = foldername + '/%'
            cursor.execute("SELECT id, relative_path, filename FROM files WHERE relative_path LIKE ?", (pattern,))
            db_rows = cursor.fetchall()
            keys_in_db = set((row["relative_path"], row["filename"]) for row in db_rows)
            keys_to_delete = keys_in_db - current_keys
            for key in keys_to_delete:
                cursor.execute("DELETE FROM files WHERE relative_path = ? AND filename = ?", key)

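            # Note: INSERT OR REPLACE resolves conflicts on the
            # UNIQUE(relative_path, filename) constraint by deleting the old
            # row and inserting a new one, so the AUTOINCREMENT id of a file
            # changes on every refresh.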
            # Bulk write the scanned files using INSERT OR REPLACE.
            cursor.executemany(
                "INSERT OR REPLACE INTO files (relative_path, basefolder, filename, filetype, category, titel, name, performance_date, site, transcript, hitcount) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                scanned_files
            )

            # Commit changes after processing this base folder.
            search_db.commit()

    return "File index updated successfully"


def convert_dates(search_db,
                  date_formats=('%d.%m.%Y', '%d.%m.%y')):
    """
    Given an open SQLite connection, walks every row in table 'files':
      - Reads the date from performance_date (expects 'dd.mm.yyyy' or 'dd.mm.yy').
      - Parses it and reformats it to ISO 'YYYY-MM-DD'.
      - Updates the row (using id as the primary key).

    Only counts rows whose value actually changed.
    """
    # Regex to quickly filter out non-matching strings.
    date_regex = re.compile(r'^\d{1,2}\.\d{1,2}\.\d{2,4}$')

    cur = search_db.cursor()

    # Fetch all rows with a non-null date.
    cur.execute("SELECT id, performance_date FROM files WHERE performance_date IS NOT NULL")
    rows = cur.fetchall()

    converted_count = 0

    for pk, raw_date in rows:
        if not raw_date or not date_regex.match(raw_date):
            continue

        for fmt in date_formats:
            try:
                dt = datetime.strptime(raw_date, fmt)
                new_date = dt.strftime('%Y-%m-%d')
                # Only update if the reformatted date is different.
                if new_date != raw_date:
                    cur.execute(
                        "UPDATE files SET performance_date = ? WHERE id = ?",
                        (new_date, pk)
                    )
                    converted_count += 1
                break  # stop trying other formats
            except ValueError:
                continue

    search_db.commit()
    print(f"Converted {converted_count} rows to ISO format.")
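
# Example conversions performed by convert_dates (two-digit years follow
# strptime's %y pivot: 69-99 -> 19xx, 00-68 -> 20xx):
#   '31.12.99'   -> '1999-12-31'
#   '05.03.2020' -> '2020-03-05'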

if __name__ == "__main__":
    init_db()  # Initialize the database schema first, before any query runs
    convert_dates(search_db)  # Normalize any legacy non-ISO dates
    updatefileindex()  # Update the file index
    search_db.close()  # Close the search database connection
    access_log_db.close()  # Close the access log connection
    print("Database connections closed.")