bethaus-app/search_db_analyzer.py
2026-01-26 17:10:03 +00:00

202 lines
6.2 KiB
Python

import logging
import os
import pathlib
import sqlite3
from typing import List, Dict, Any

from flask import render_template, request, jsonify

import auth
# Application configuration as returned by the auth module.
APP_CONFIG = auth.return_app_config()

# Directory that contains this module; search.db is looked up right next to it.
BASE_DIR = os.path.split(os.path.abspath(__file__))[0]
SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db")
# Filetype buckets: (label, extensions) pairs. Extensions are lowercase and
# carry a leading dot. The list order is the order used for reporting.
FILETYPE_BUCKETS = [
    (
        "Liedtexte",
        {".sng"},
    ),
    (
        "Image",
        {
            ".png", ".gif", ".bmp", ".webp",
            ".tiff", ".tif", ".svg", ".ico",
        },
    ),
    (
        "Video",
        {
            ".mp4", ".m4v", ".mov", ".avi", ".mkv", ".webm",
            ".mpg", ".mpeg", ".3gp", ".3g2", ".wmv",
        },
    ),
    (
        "Photo",
        {
            ".jpg", ".jpeg", ".heic", ".heif", ".dng", ".cr2",
            ".cr3", ".arw", ".nef", ".orf", ".rw2", ".raf",
        },
    ),
    (
        "Document",
        {
            ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt",
            ".rtf", ".md", ".odt", ".pages", ".key", ".note",
        },
    ),
    (
        "Table",
        {".xls", ".xlsx", ".csv", ".ods", ".tsv"},
    ),
    (
        "Audio",
        {
            ".mp3", ".wav", ".m4a", ".flac", ".ogg",
            ".aac", ".wma", ".aiff", ".alac",
        },
    ),
]

# Reporting order of all bucket labels; "Misc" is the catch-all and comes last.
FILETYPE_ORDER = [*(bucket[0] for bucket in FILETYPE_BUCKETS), "Misc"]
# Open search.db in read-only mode to avoid accidental writes.
# The URI is built with Path.as_uri() so that characters with a special
# meaning in URIs ("?", "#", "%") and Windows drive paths are encoded
# correctly instead of being misparsed as query/fragment parts.
search_db = sqlite3.connect(
    f"{pathlib.Path(SEARCH_DB_PATH).absolute().as_uri()}?mode=ro",
    uri=True,
    check_same_thread=False,
)
# Rows are returned as sqlite3.Row so columns can be accessed by name.
search_db.row_factory = sqlite3.Row
def _normalize_folder_path(folder_path: str) -> str:
"""Normalize folder paths to a consistent, slash-based format."""
return folder_path.replace("\\", "/").strip().strip("/")
def _bucket_for_filetype(filetype: str) -> str:
if not filetype:
return "Misc"
ft = str(filetype).strip().lower()
if not ft:
return "Misc"
if "/" in ft and not ft.startswith("."):
if ft.startswith("image/"):
return "Image"
if ft.startswith("video/"):
return "Video"
if ft.startswith("audio/"):
return "Audio"
if ft.startswith("text/") or ft in ("application/pdf",):
return "Document"
if not ft.startswith("."):
ft = f".{ft}"
for label, exts in FILETYPE_BUCKETS:
if ft in exts:
return label
return "Misc"
def _list_children(parent: str = "") -> List[str]:
    """Return the names of the next folder level below ``parent``.

    - parent == "": first level, taken from the ``basefolder`` column.
    - parent != "": distinct next path segment of ``relative_path`` below
      that parent. Files located directly in ``parent`` contribute nothing.

    Args:
        parent: Folder path; it is normalized before use.

    Returns:
        Folder names for the first level in database order, otherwise
        sorted alphabetically.
    """
    cursor = search_db.cursor()
    normalized = _normalize_folder_path(parent)
    if not normalized:
        rows = cursor.execute(
            "SELECT DISTINCT basefolder FROM files ORDER BY basefolder"
        ).fetchall()
        return [row["basefolder"] for row in rows if row["basefolder"]]

    prefix = normalized + "/"
    # "%" and "_" are LIKE wildcards. Folder names frequently contain "_",
    # so they must be escaped; otherwise rows from unrelated folders match
    # and are then sliced at the wrong offset below.
    like_pattern = (
        prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        + "%"
    )
    rows = cursor.execute(
        "SELECT relative_path FROM files WHERE relative_path LIKE ? ESCAPE '\\'",
        (like_pattern,),
    ).fetchall()

    prefix_len = len(prefix)
    children = set()
    for row in rows:
        # Strip the prefix and keep only the next segment. Without a further
        # "/" the row is a file directly under parent, not a sub-folder.
        next_seg, separator, _ = row["relative_path"][prefix_len:].partition("/")
        if separator and next_seg:
            children.add(next_seg)
    return sorted(children)
def _query_counts(folder_path: str) -> Dict[str, Any]:
    """Count the files below a folder, grouped by category and by filetype.

    A row matches when its ``relative_path`` lies inside the folder or its
    ``basefolder`` equals the folder.

    Args:
        folder_path: Folder to analyze; it is normalized before use.

    Returns:
        A dict with three keys:
        - "total": number of matching files.
        - "categories": list of {"category", "count"} dicts, most frequent
          first; rows without category are labeled 'Keine Kategorie'.
        - "filetypes": list of {"type", "count"} dicts in FILETYPE_ORDER,
          containing only buckets with a non-zero count.

    Raises:
        ValueError: If the normalized folder path is empty.
    """
    normalized = _normalize_folder_path(folder_path)
    if not normalized:
        raise ValueError("Bitte einen Ordnerpfad angeben.")

    # "%" and "_" are LIKE wildcards. Escape them so that e.g. "my_folder"
    # does not also count files stored below "myXfolder".
    escaped = (
        normalized.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    # Match both basefolder and deeper paths inside that folder.
    # Only this constant text is interpolated into the SQL below; all values
    # are passed as bound parameters.
    where_sql = "(relative_path LIKE ? ESCAPE '\\' OR basefolder = ?)"
    params: List[str] = [f"{escaped}/%", normalized]

    cursor = search_db.cursor()
    category_rows = cursor.execute(
        f"""
        SELECT COALESCE(category, 'Keine Kategorie') AS category_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY category_label
        ORDER BY file_count DESC
        """,
        params,
    ).fetchall()
    type_rows = cursor.execute(
        f"""
        SELECT COALESCE(filetype, '') AS filetype_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY filetype_label
        """,
        params,
    ).fetchall()

    total = sum(row["file_count"] for row in category_rows)
    categories = [
        {"category": row["category_label"], "count": row["file_count"]}
        for row in category_rows
    ]

    # Several raw filetype values can fall into the same bucket, so the
    # per-filetype counts are summed per bucket.
    type_totals: Dict[str, int] = {}
    for row in type_rows:
        bucket = _bucket_for_filetype(row["filetype_label"])
        type_totals[bucket] = type_totals.get(bucket, 0) + row["file_count"]

    filetypes = [
        {"type": label, "count": type_totals[label]}
        for label in FILETYPE_ORDER
        if type_totals.get(label)
    ]
    return {"total": total, "categories": categories, "filetypes": filetypes}
def search_db_analyzer():
    """Render the page used to analyze search.db folder by folder."""
    template_context = {
        "admin_enabled": auth.is_admin(),
        "title_short": APP_CONFIG.get("TITLE_SHORT", "Default Title"),
        "title_long": APP_CONFIG.get("TITLE_LONG", "Default Title"),
    }
    return render_template("search_db_analyzer.html", **template_context)
def search_db_query():
    """Return grouped counts for the folder path given in the JSON body.

    The request body is expected to be a JSON object with a string
    "folder_path". A missing, non-object or non-string value is treated as
    an empty path, which _query_counts rejects with a ValueError, so the
    client receives a 400 instead of an unhandled server error.

    Returns:
        The JSON result of _query_counts on success, a JSON error with
        status 400 for an invalid folder path, or a JSON error with
        status 500 for any other failure.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, number, ...) has no .get().
        payload = {}
    raw_path = payload.get("folder_path")
    folder_path = raw_path.strip() if isinstance(raw_path, str) else ""

    try:
        result = _query_counts(folder_path)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:  # pragma: no cover - defensive logging
        # Keep a server-side trace; the client only receives the message.
        logging.getLogger(__name__).exception(
            "search.db query failed for folder_path=%r", folder_path
        )
        return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500
    return jsonify(result)
def search_db_folders():
    """Return next-level folder names for the given parent path (or basefolders).

    The parent is read from the "parent" query parameter; when it is missing
    or empty, the first folder level is returned.

    Returns:
        JSON {"children": [...]} on success, or a JSON error with status 500
        when the lookup fails.
    """
    parent = request.args.get("parent", "").strip()
    try:
        children = _list_children(parent)
    except Exception as exc:  # pragma: no cover - defensive logging
        # Keep a server-side trace; the client only receives the message.
        logging.getLogger(__name__).exception(
            "Listing folders failed for parent=%r", parent
        )
        return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500
    return jsonify({"children": children})