Merge branch 'development' of gitea.centx.de:lelo/bethaus-app into development

This commit is contained in:
lelo 2025-07-24 09:47:01 +02:00
commit f3b44d1d5a
2 changed files with 81 additions and 21 deletions

View File

@ -5,6 +5,7 @@ import geoip2.database
from auth import require_secret from auth import require_secret
from collections import defaultdict from collections import defaultdict
import pandas as pd import pandas as pd
from typing import Optional, List, Tuple
import json import json
import os import os
import auth import auth
@ -701,9 +702,76 @@ def export_to_excel():
# Close the cursor and database connection # Close the cursor and database connection
cursor.close() cursor.close()
def search_and_replace_rel_path(
search: str,
replacement: str,
dry_run: bool = True,
limit_preview: int = 10
) -> Optional[List[Tuple[str, str]]]:
"""
Find all rel_path values containing `search`.
In dry_run mode:
- show up to `limit_preview` example renames
- show total count
- prompt you to apply them (yes/no)
If you answer yes (or if dry_run=False), performs the UPDATE.
Returns the list of (old_path, new_path) that were applied (or would be applied).
"""
cursor = log_db.cursor()
like_pattern = f'%{search}%'
# 1) Fetch matching rows
cursor.execute(
"SELECT id, rel_path FROM file_access_log WHERE rel_path LIKE ?;",
(like_pattern,)
)
rows = cursor.fetchall() # List of (id, old_path)
total = len(rows)
if total == 0:
print("No matching rel_path entries found.")
return []
# Build all renames
renames = [(old, old.replace(search, replacement)) for (_id, old) in rows]
# In dry_run: preview
if dry_run:
print(f"\nShowing up to {limit_preview} of {total} pending renames:\n")
for old, new in renames[:limit_preview]:
print(f" {old!r}{new!r}")
if total > limit_preview:
print(f" ...and {total - limit_preview} more\n")
print(f"Total: {total} renaming action(s) would be applied.\n")
ans = input("Apply these changes? (yes/no): ").strip().lower()
if ans in ('y', 'yes'):
# Switch to actual update
return search_and_replace_rel_path(search, replacement, dry_run=False)
else:
print("Aborted; no changes made.")
return renames
# If not dry_run, apply the UPDATE once
cursor.execute(
"""
UPDATE file_access_log
SET rel_path = replace(rel_path, ?, ?)
WHERE rel_path LIKE ?;
""",
(search, replacement, like_pattern)
)
log_db.commit()
updated = cursor.rowcount
print(f"Done: {updated} row(s) updated.")
return renames
if __name__ == "__main__": if __name__ == "__main__":
print("Running as a standalone script.") # Example usage: dry run first
export_to_excel() search_and_replace_rel_path(
print("Exported search_db to search_db.xlsx") search="2025-06-13 Freitag 16 Uhr",
replacement="2025-06-13 Eröffnungsgottesdienst Freitag 16 Uhr",
dry_run=True
)

View File

@ -8,6 +8,7 @@ import auth
app_config = auth.return_app_config() app_config = auth.return_app_config()
BASE_DIR = os.path.realpath(app_config['BASE_DIR']) BASE_DIR = os.path.realpath(app_config['BASE_DIR'])
CATEGORY_KEYWORDS = app_config['CATEGORY_KEYWORDS']
log_db = sqlite3.connect("access_log.db", check_same_thread=False) log_db = sqlite3.connect("access_log.db", check_same_thread=False)
@ -88,21 +89,6 @@ def extract_structure_from_string(input_string):
# first part not a number # first part not a number
pass pass
# define your mapping: category → list of trigger-words
CATEGORY_KEYWORDS = {
'Predigt': ['predig', 'thema'],
'Vorwort': ['wort', 'einladung', 'begrüßung', 'ansprache', 'einleitung', 'aufruf zum', 'zuruf zum'],
'Kinderchor': ['kinderchor'],
'Jugendchor': ['jugendchor'],
'Orchester': ['orchester', 'sinfonie', 'symphonie'],
'Chor': ['chor'],
'Gemeinsamer Gesang': ['gemeinsam', 'gemeindelied', 'gemeinsamer gesang'],
'Gruppenlied': ['gruppenlied', 'jugend', 'lied', 'musikgruppe'],
'Gedicht': ['gedicht'],
'Erzählung': ['vortrag', 'erzä', 'program'],
'Instrumental': ['instrumental', 'musikstück', 'harfenstück'],
}
# walk the dict in your desired priority order # walk the dict in your desired priority order
category = None category = None
text = left_side.lower() text = left_side.lower()
@ -170,7 +156,13 @@ def generate_top_list(category):
filelist = [] filelist = []
for record in records: for record in records:
rel_path = record['rel_path'] rel_path = record['rel_path']
if os.path.exists(os.path.join(BASE_DIR, rel_path)): # ensure file exists on disk // slow operation. maybe improve later
# Locate the real file on disk
root, *relative_parts = rel_path.split('/')
base_path = session['folders'].get(root)
full_path = os.path.join(base_path or '', *relative_parts)
if os.path.exists(os.path.join(BASE_DIR, full_path)): # ensure file exists on disk // slow operation. maybe improve later
filelist.append({ filelist.append({
'name': os.path.basename(rel_path), 'name': os.path.basename(rel_path),
'path': rel_path, 'path': rel_path,