"""View functions and query helpers for analyzing ``search.db`` by folder.

The functions here read from a SQLite database file named ``search.db`` that
lives next to this module. Route registration is not done in this module.
"""

import os
import sqlite3
from typing import List, Dict, Any

from flask import render_template, request, jsonify

import auth

APP_CONFIG = auth.return_app_config()

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db")

# Filetype buckets (extension-based, lowercase, leading dot).
FILETYPE_BUCKETS = [
    ("Liedtexte", {".sng"}),
    ("Image", {".png", ".gif", ".bmp", ".webp", ".tiff", ".tif", ".svg", ".ico"}),
    ("Video", {
        ".mp4", ".m4v", ".mov", ".avi", ".mkv", ".webm",
        ".mpg", ".mpeg", ".3gp", ".3g2", ".wmv",
    }),
    ("Photo", {
        ".jpg", ".jpeg", ".heic", ".heif", ".dng", ".cr2",
        ".cr3", ".arw", ".nef", ".orf", ".rw2", ".raf",
    }),
    ("Document", {
        ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt",
        ".rtf", ".md", ".odt", ".pages", ".key", ".note",
    }),
    ("Table", {".xls", ".xlsx", ".csv", ".ods", ".tsv"}),
    ("Audio", {
        ".mp3", ".wav", ".m4a", ".flac", ".ogg",
        ".aac", ".wma", ".aiff", ".alac",
    }),
]
# Display order of the buckets; "Misc" collects everything unmatched.
FILETYPE_ORDER = [label for label, _ in FILETYPE_BUCKETS] + ["Misc"]

# Open search.db in read-only mode to avoid accidental writes.
search_db = sqlite3.connect(
    f"file:{SEARCH_DB_PATH}?mode=ro", uri=True, check_same_thread=False
)
search_db.row_factory = sqlite3.Row

# Escape character used together with "LIKE ... ESCAPE" in the queries below.
_LIKE_ESCAPE = "\\"


def _escape_like(value: str) -> str:
    """Escape LIKE wildcards in *value* so it is matched literally.

    The result is only correct in a query that declares ``_LIKE_ESCAPE`` as
    its escape character via an ``ESCAPE`` clause.
    """
    # The escape character itself must be doubled first, otherwise the
    # escapes inserted for "%" and "_" would be escaped a second time.
    return (
        value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", _LIKE_ESCAPE + "%")
        .replace("_", _LIKE_ESCAPE + "_")
    )


def _normalize_folder_path(folder_path: str) -> str:
    """Normalize folder paths to a consistent, slash-based format.

    Backslashes become forward slashes, surrounding whitespace is removed and
    leading/trailing slashes are stripped.
    """
    return folder_path.replace("\\", "/").strip().strip("/")


def _bucket_for_filetype(filetype: str) -> str:
    """Map a stored filetype value to one of the labels in FILETYPE_ORDER.

    Accepts either a MIME-like value (``image/png``) or a file extension with
    or without a leading dot. Anything unrecognized, including empty values,
    is reported as ``"Misc"``.
    """
    if not filetype:
        return "Misc"
    ft = str(filetype).strip().lower()
    if not ft:
        return "Misc"

    # MIME-like values are classified by their major type.
    if "/" in ft and not ft.startswith("."):
        if ft.startswith("image/"):
            return "Image"
        if ft.startswith("video/"):
            return "Video"
        if ft.startswith("audio/"):
            return "Audio"
        if ft.startswith("text/") or ft in ("application/pdf",):
            return "Document"
        # Unrecognized MIME-like values fall through to the extension lookup,
        # where they match nothing and end up in "Misc".

    if not ft.startswith("."):
        ft = f".{ft}"
    for label, exts in FILETYPE_BUCKETS:
        if ft in exts:
            return label
    return "Misc"


def _list_children(parent: str = "") -> List[str]:
    """Return the next folder level for the given parent.

    - parent == "": first level (distinct values of the basefolder column)
    - parent != "": distinct next path segment in relative_path below parent

    Only segments that are followed by a further "/" are returned, so files
    located directly in the parent are not listed. The result is sorted.
    """
    cursor = search_db.cursor()
    normalized = _normalize_folder_path(parent)

    if not normalized:
        rows = cursor.execute(
            "SELECT DISTINCT basefolder FROM files ORDER BY basefolder"
        ).fetchall()
        return [row["basefolder"] for row in rows if row["basefolder"]]

    prefix = normalized + "/"
    # Escape "%" and "_" so that folder names containing them (underscores are
    # common) do not act as wildcards and pull in rows from other folders.
    rows = cursor.execute(
        "SELECT relative_path FROM files WHERE relative_path LIKE ? ESCAPE '\\'",
        (_escape_like(prefix) + "%",),
    ).fetchall()

    children = set()
    plen = len(prefix)
    for row in rows:
        # Strip the prefix and keep only the next segment.
        remainder = row["relative_path"][plen:]
        if "/" not in remainder:
            # File directly under parent; no deeper folder.
            continue
        next_seg = remainder.split("/", 1)[0]
        if next_seg:
            children.add(next_seg)
    return sorted(children)


def _query_counts(folder_path: str) -> Dict[str, Any]:
    """Run grouped count queries on search.db filtered by folder_path.

    Returns a dict with:
    - "total": number of matching rows
    - "categories": list of {"category", "count"}, highest count first;
      rows without a category are labelled "Keine Kategorie"
    - "filetypes": list of {"type", "count"} in FILETYPE_ORDER, omitting
      buckets with no files

    Raises:
        ValueError: if folder_path is empty after normalization.
    """
    normalized = _normalize_folder_path(folder_path)
    if not normalized:
        raise ValueError("Bitte einen Ordnerpfad angeben.")

    # Match both basefolder and deeper paths inside that folder. The LIKE
    # pattern is escaped so "%" and "_" in the folder name match literally.
    where_sql = "(relative_path LIKE ? ESCAPE '\\' OR basefolder = ?)"
    params: List[str] = [f"{_escape_like(normalized)}/%", normalized]

    cursor = search_db.cursor()
    rows = cursor.execute(
        f"""
        SELECT COALESCE(category, 'Keine Kategorie') AS category_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY category_label
        ORDER BY file_count DESC
        """,
        params,
    ).fetchall()
    type_rows = cursor.execute(
        f"""
        SELECT COALESCE(filetype, '') AS filetype_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY filetype_label
        """,
        params,
    ).fetchall()

    total = sum(row["file_count"] for row in rows)
    categories = [
        {"category": row["category_label"], "count": row["file_count"]}
        for row in rows
    ]

    # Several raw filetype values can map to the same bucket, so accumulate.
    type_totals: Dict[str, int] = {}
    for row in type_rows:
        bucket = _bucket_for_filetype(row["filetype_label"])
        type_totals[bucket] = type_totals.get(bucket, 0) + row["file_count"]

    filetypes = [
        {"type": label, "count": type_totals[label]}
        for label in FILETYPE_ORDER
        if type_totals.get(label)
    ]

    return {"total": total, "categories": categories, "filetypes": filetypes}


def search_db_analyzer():
    """Render the UI for analyzing search.db by folder."""
    return render_template(
        "search_db_analyzer.html",
        admin_enabled=auth.is_admin(),
        title_short=APP_CONFIG.get("TITLE_SHORT", "Default Title"),
        title_long=APP_CONFIG.get("TITLE_LONG", "Default Title"),
    )


def search_db_query():
    """Return grouped counts by category and filetype for a folder path.

    Reads ``folder_path`` from the JSON request body. Responds with HTTP 400
    when the path is missing or empty and with HTTP 500 when the query fails.
    """
    payload = request.get_json(silent=True) or {}
    # A JSON body that is not an object, or a non-string folder_path, is
    # treated like a missing path so the client gets the 400 response below
    # instead of an unhandled AttributeError.
    raw_path = payload.get("folder_path") if isinstance(payload, dict) else None
    folder_path = raw_path.strip() if isinstance(raw_path, str) else ""

    try:
        result = _query_counts(folder_path)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:  # pragma: no cover - defensive logging
        return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500

    return jsonify(result)


def search_db_folders():
    """Return next-level folder names for the given parent path (or basefolders).

    Reads the optional ``parent`` query parameter; an empty value lists the
    first level. Responds with HTTP 500 when the lookup fails.
    """
    parent = request.args.get("parent", "").strip()
    try:
        children = _list_children(parent)
    except Exception as exc:  # pragma: no cover - defensive logging
        return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500
    return jsonify({"children": children})