bethaus-app/search_db_analyzer.py
2025-12-14 10:00:18 +00:00

128 lines
4.1 KiB
Python

import os
import sqlite3
from typing import List, Dict, Any
from flask import render_template, request, jsonify
import auth
APP_CONFIG = auth.return_app_config()
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db")
# Open search.db in read-only mode to avoid accidental writes.
search_db = sqlite3.connect(f"file:{SEARCH_DB_PATH}?mode=ro", uri=True, check_same_thread=False)
search_db.row_factory = sqlite3.Row
def _normalize_folder_path(folder_path: str) -> str:
"""Normalize folder paths to a consistent, slash-based format."""
return folder_path.replace("\\", "/").strip().strip("/")
def _list_children(parent: str = "") -> List[str]:
"""
Return the next folder level for the given parent.
- parent == "" → first level (basefolder column)
- parent != "" → distinct next segment in relative_path below that parent
"""
cursor = search_db.cursor()
normalized = _normalize_folder_path(parent)
if not normalized:
rows = cursor.execute("SELECT DISTINCT basefolder FROM files ORDER BY basefolder").fetchall()
return [row["basefolder"] for row in rows if row["basefolder"]]
prefix = normalized + "/"
rows = cursor.execute(
"SELECT relative_path FROM files WHERE relative_path LIKE ?",
(prefix + "%",)
).fetchall()
children = set()
plen = len(prefix)
for row in rows:
rel = row["relative_path"]
# Strip the prefix and keep only the next segment
remainder = rel[plen:]
if "/" in remainder:
next_seg = remainder.split("/", 1)[0]
else:
# File directly under parent; no deeper folder
continue
if next_seg:
children.add(next_seg)
return sorted(children)
def _query_counts(folder_path: str) -> Dict[str, Any]:
"""Run a grouped count query on search.db filtered by folder_path."""
normalized = _normalize_folder_path(folder_path)
params: List[str] = []
conditions: List[str] = []
if not normalized:
raise ValueError("Bitte einen Ordnerpfad angeben.")
# Match both basefolder and deeper paths inside that folder.
conditions.append("(relative_path LIKE ? OR basefolder = ?)")
params.extend([f"{normalized}/%", normalized])
where_sql = " AND ".join(conditions) if conditions else "1=1"
sql = f"""
SELECT COALESCE(category, 'Keine Kategorie') AS category_label,
COUNT(*) AS file_count
FROM files
WHERE {where_sql}
GROUP BY category_label
ORDER BY file_count DESC
"""
cursor = search_db.cursor()
rows = cursor.execute(sql, params).fetchall()
total = sum(row["file_count"] for row in rows)
categories = [
{"category": row["category_label"], "count": row["file_count"]}
for row in rows
]
return {"total": total, "categories": categories}
def search_db_analyzer():
"""Render the UI for analyzing search.db by folder."""
return render_template(
"search_db_analyzer.html",
admin_enabled=auth.is_admin(),
title_short=APP_CONFIG.get("TITLE_SHORT", "Default Title"),
title_long=APP_CONFIG.get("TITLE_LONG", "Default Title"),
)
def search_db_query():
"""Return grouped counts by category for a given folder path."""
payload = request.get_json(silent=True) or {}
folder_path = (payload.get("folder_path") or "").strip()
try:
result = _query_counts(folder_path)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
except Exception as exc: # pragma: no cover - defensive logging
return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500
return jsonify(result)
def search_db_folders():
"""Return next-level folder names for the given parent path (or basefolders)."""
parent = request.args.get("parent", "").strip()
try:
children = _list_children(parent)
except Exception as exc: # pragma: no cover - defensive logging
return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500
return jsonify({"children": children})