import os import sqlite3 from typing import List, Dict, Any from flask import render_template, request, jsonify import auth APP_CONFIG = auth.return_app_config() BASE_DIR = os.path.dirname(os.path.abspath(__file__)) SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db") # Open search.db in read-only mode to avoid accidental writes. search_db = sqlite3.connect(f"file:{SEARCH_DB_PATH}?mode=ro", uri=True, check_same_thread=False) search_db.row_factory = sqlite3.Row def _normalize_folder_path(folder_path: str) -> str: """Normalize folder paths to a consistent, slash-based format.""" return folder_path.replace("\\", "/").strip().strip("/") def _list_children(parent: str = "") -> List[str]: """ Return the next folder level for the given parent. - parent == "" → first level (basefolder column) - parent != "" → distinct next segment in relative_path below that parent """ cursor = search_db.cursor() normalized = _normalize_folder_path(parent) if not normalized: rows = cursor.execute("SELECT DISTINCT basefolder FROM files ORDER BY basefolder").fetchall() return [row["basefolder"] for row in rows if row["basefolder"]] prefix = normalized + "/" rows = cursor.execute( "SELECT relative_path FROM files WHERE relative_path LIKE ?", (prefix + "%",) ).fetchall() children = set() plen = len(prefix) for row in rows: rel = row["relative_path"] # Strip the prefix and keep only the next segment remainder = rel[plen:] if "/" in remainder: next_seg = remainder.split("/", 1)[0] else: # File directly under parent; no deeper folder continue if next_seg: children.add(next_seg) return sorted(children) def _query_counts(folder_path: str) -> Dict[str, Any]: """Run a grouped count query on search.db filtered by folder_path.""" normalized = _normalize_folder_path(folder_path) params: List[str] = [] conditions: List[str] = [] if not normalized: raise ValueError("Bitte einen Ordnerpfad angeben.") # Match both basefolder and deeper paths inside that folder. conditions.append("(relative_path LIKE ? OR basefolder = ?)") params.extend([f"{normalized}/%", normalized]) where_sql = " AND ".join(conditions) if conditions else "1=1" sql = f""" SELECT COALESCE(category, 'Keine Kategorie') AS category_label, COUNT(*) AS file_count FROM files WHERE {where_sql} GROUP BY category_label ORDER BY file_count DESC """ cursor = search_db.cursor() rows = cursor.execute(sql, params).fetchall() total = sum(row["file_count"] for row in rows) categories = [ {"category": row["category_label"], "count": row["file_count"]} for row in rows ] return {"total": total, "categories": categories} def search_db_analyzer(): """Render the UI for analyzing search.db by folder.""" return render_template( "search_db_analyzer.html", admin_enabled=auth.is_admin(), title_short=APP_CONFIG.get("TITLE_SHORT", "Default Title"), title_long=APP_CONFIG.get("TITLE_LONG", "Default Title"), ) def search_db_query(): """Return grouped counts by category for a given folder path.""" payload = request.get_json(silent=True) or {} folder_path = (payload.get("folder_path") or "").strip() try: result = _query_counts(folder_path) except ValueError as exc: return jsonify({"error": str(exc)}), 400 except Exception as exc: # pragma: no cover - defensive logging return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500 return jsonify(result) def search_db_folders(): """Return next-level folder names for the given parent path (or basefolders).""" parent = request.args.get("parent", "").strip() try: children = _list_children(parent) except Exception as exc: # pragma: no cover - defensive logging return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500 return jsonify({"children": children})