128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
import logging
import os
import sqlite3
from typing import List, Dict, Any

from flask import render_template, request, jsonify

import auth
|
|
|
|
# Application settings as returned by the auth module.
APP_CONFIG = auth.return_app_config()

# search.db is looked up in the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db")

# The URI form with mode=ro opens the database read-only, which guards
# against accidental writes. The connection lives at module level, so the
# same-thread check is switched off.
search_db = sqlite3.connect(
    "file:" + SEARCH_DB_PATH + "?mode=ro",
    uri=True,
    check_same_thread=False,
)
# Let result rows be indexed by column name.
search_db.row_factory = sqlite3.Row
|
|
|
|
|
|
def _normalize_folder_path(folder_path: str) -> str:
|
|
"""Normalize folder paths to a consistent, slash-based format."""
|
|
return folder_path.replace("\\", "/").strip().strip("/")
|
|
|
|
|
|
def _list_children(parent: str = "") -> List[str]:
    """
    Return the names of the folders one level below ``parent``.

    - parent == "" → first level: the distinct, non-empty values of the
      ``basefolder`` column, ordered by the query.
    - parent != "" → the distinct next path segment of every
      ``relative_path`` that lies below that parent, sorted.

    Files stored directly inside ``parent`` do not produce a child entry.
    """
    normalized = _normalize_folder_path(parent)
    cursor = search_db.cursor()

    if not normalized:
        rows = cursor.execute(
            "SELECT DISTINCT basefolder FROM files ORDER BY basefolder"
        ).fetchall()
        return [row["basefolder"] for row in rows if row["basefolder"]]

    prefix = normalized + "/"
    # "%" and "_" are LIKE wildcards. Without escaping, a parent such as
    # "my_dir" would also match "myXdir/..." and the fixed-length slice below
    # would then return segments from the wrong folder.
    like_prefix = (
        prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    rows = cursor.execute(
        "SELECT relative_path FROM files WHERE relative_path LIKE ? ESCAPE '\\'",
        (like_prefix + "%",),
    ).fetchall()

    children = set()
    plen = len(prefix)
    for row in rows:
        # Drop the parent prefix and look at what follows it.
        next_seg, separator, _rest = row["relative_path"][plen:].partition("/")
        # No separator means a file directly under the parent, not a folder.
        if separator and next_seg:
            children.add(next_seg)

    return sorted(children)
|
|
|
|
|
|
def _query_counts(folder_path: str) -> Dict[str, Any]:
    """Count the files of a folder in search.db, grouped by category.

    A row belongs to the folder when its ``relative_path`` starts with
    ``<folder>/`` or when its ``basefolder`` equals the folder.

    Returns a dict with:
    - ``"total"``: the sum of all per-category counts;
    - ``"categories"``: a list of ``{"category": ..., "count": ...}`` dicts,
      largest count first. Rows with a NULL category are reported under
      ``'Keine Kategorie'``.

    Raises:
        ValueError: if ``folder_path`` is empty after normalization.
    """
    normalized = _normalize_folder_path(folder_path)
    if not normalized:
        raise ValueError("Bitte einen Ordnerpfad angeben.")

    # "%" and "_" are LIKE wildcards. Escape them so that a folder such as
    # "my_dir" does not also count files stored under "myXdir/".
    like_prefix = (
        normalized.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    params: List[str] = [f"{like_prefix}/%", normalized]

    # Match both basefolder and deeper paths inside that folder.
    sql = """
        SELECT COALESCE(category, 'Keine Kategorie') AS category_label,
               COUNT(*) AS file_count
        FROM files
        WHERE (relative_path LIKE ? ESCAPE '\\' OR basefolder = ?)
        GROUP BY category_label
        ORDER BY file_count DESC
    """

    rows = search_db.cursor().execute(sql, params).fetchall()

    categories = [
        {"category": row["category_label"], "count": row["file_count"]}
        for row in rows
    ]
    total = sum(entry["count"] for entry in categories)

    return {"total": total, "categories": categories}
|
|
|
|
|
|
def search_db_analyzer():
    """Render the page used to analyze search.db folder by folder.

    The template receives the current admin flag and the short and long
    application titles, each falling back to "Default Title" when missing
    from the application config.
    """
    template_context = {
        "admin_enabled": auth.is_admin(),
        "title_short": APP_CONFIG.get("TITLE_SHORT", "Default Title"),
        "title_long": APP_CONFIG.get("TITLE_LONG", "Default Title"),
    }
    return render_template("search_db_analyzer.html", **template_context)
|
|
|
|
|
|
def search_db_query():
    """Return grouped counts by category for a given folder path.

    Reads ``folder_path`` from the JSON request body and responds with the
    result of ``_query_counts`` as JSON.

    Responses:
    - 200 with the counts on success;
    - 400 with ``{"error": ...}`` when the folder path is missing or invalid;
    - 500 with ``{"error": ...}`` when the query itself fails.
    """
    payload = request.get_json(silent=True)
    # A JSON body that is valid but not an object (e.g. a list) has no
    # ".get"; treat it like a missing body so the client receives a 400.
    if not isinstance(payload, dict):
        payload = {}

    raw_folder_path = payload.get("folder_path")
    # Non-string values (numbers, lists, ...) cannot name a folder; map them
    # to the empty path, which _query_counts rejects with a ValueError.
    folder_path = raw_folder_path.strip() if isinstance(raw_folder_path, str) else ""

    try:
        result = _query_counts(folder_path)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:  # pragma: no cover - defensive logging
        # Record the traceback; otherwise the failure is lost entirely.
        logging.getLogger(__name__).exception(
            "search.db query failed for folder_path=%r", folder_path
        )
        return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500

    return jsonify(result)
|
|
|
|
|
|
def search_db_folders():
    """Return next-level folder names for the given parent path (or basefolders).

    Reads the optional ``parent`` query-string argument; when it is missing
    or blank, the first folder level is returned.

    Responses:
    - 200 with ``{"children": [...]}`` on success;
    - 500 with ``{"error": ...}`` when the lookup fails.
    """
    parent = request.args.get("parent", "").strip()

    try:
        children = _list_children(parent)
    except Exception as exc:  # pragma: no cover - defensive logging
        # Record the traceback; otherwise the failure is lost entirely.
        logging.getLogger(__name__).exception(
            "Listing search.db folders failed for parent=%r", parent
        )
        return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500

    return jsonify({"children": children})
|