202 lines
6.2 KiB
Python
202 lines
6.2 KiB
Python
import logging
import os
import sqlite3
from typing import List, Dict, Any

from flask import render_template, request, jsonify

import auth
|
|
|
|
# Application settings as returned by the auth module (read via .get() below).
APP_CONFIG = auth.return_app_config()

# search.db is expected to sit in the same directory as this module.
_MODULE_FILE = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(_MODULE_FILE)
SEARCH_DB_PATH = os.path.join(BASE_DIR, "search.db")
|
|
|
|
# Ordered (label, extensions) pairs used to group files by type.
# Extensions are lowercase and carry a leading dot; the list order is the
# order in which buckets are reported (see FILETYPE_ORDER).
FILETYPE_BUCKETS = [
    (
        "Liedtexte",
        {".sng"},
    ),
    (
        "Image",
        {
            ".png", ".gif", ".bmp", ".webp",
            ".tiff", ".tif", ".svg", ".ico",
        },
    ),
    (
        "Video",
        {
            ".mp4", ".m4v", ".mov", ".avi", ".mkv", ".webm",
            ".mpg", ".mpeg", ".3gp", ".3g2", ".wmv",
        },
    ),
    (
        "Photo",
        {
            ".jpg", ".jpeg", ".heic", ".heif", ".dng", ".cr2",
            ".cr3", ".arw", ".nef", ".orf", ".rw2", ".raf",
        },
    ),
    (
        "Document",
        {
            ".pdf", ".doc", ".docx", ".ppt", ".pptx", ".txt",
            ".rtf", ".md", ".odt", ".pages", ".key", ".note",
        },
    ),
    (
        "Table",
        {".xls", ".xlsx", ".csv", ".ods", ".tsv"},
    ),
    (
        "Audio",
        {
            ".mp3", ".wav", ".m4a", ".flac", ".ogg",
            ".aac", ".wma", ".aiff", ".alac",
        },
    ),
]

# Bucket labels in display order, followed by the catch-all "Misc" bucket.
FILETYPE_ORDER = [*(bucket_label for bucket_label, _exts in FILETYPE_BUCKETS), "Misc"]
|
|
|
|
# The "mode=ro" URI parameter opens search.db read-only so this module cannot
# write to it by accident. check_same_thread=False lets the single connection
# be used from threads other than the one that created it.
_SEARCH_DB_URI = f"file:{SEARCH_DB_PATH}?mode=ro"
search_db = sqlite3.connect(_SEARCH_DB_URI, uri=True, check_same_thread=False)

# Rows are returned as sqlite3.Row so columns can be read by name.
search_db.row_factory = sqlite3.Row
|
|
|
|
|
|
def _normalize_folder_path(folder_path: str) -> str:
|
|
"""Normalize folder paths to a consistent, slash-based format."""
|
|
return folder_path.replace("\\", "/").strip().strip("/")
|
|
|
|
|
|
def _bucket_for_filetype(filetype: str) -> str:
|
|
if not filetype:
|
|
return "Misc"
|
|
ft = str(filetype).strip().lower()
|
|
if not ft:
|
|
return "Misc"
|
|
|
|
if "/" in ft and not ft.startswith("."):
|
|
if ft.startswith("image/"):
|
|
return "Image"
|
|
if ft.startswith("video/"):
|
|
return "Video"
|
|
if ft.startswith("audio/"):
|
|
return "Audio"
|
|
if ft.startswith("text/") or ft in ("application/pdf",):
|
|
return "Document"
|
|
|
|
if not ft.startswith("."):
|
|
ft = f".{ft}"
|
|
|
|
for label, exts in FILETYPE_BUCKETS:
|
|
if ft in exts:
|
|
return label
|
|
return "Misc"
|
|
|
|
|
|
def _list_children(parent: str = "") -> List[str]:
    """Return the names of the folders directly below ``parent``.

    - ``parent == ""``: the first level, taken from the distinct non-empty
      values of the ``basefolder`` column.
    - ``parent != ""``: the distinct next path segment of every
      ``relative_path`` that lies below that parent.

    Args:
        parent: Folder path; it is normalized with
            ``_normalize_folder_path`` before use.

    Returns:
        Sorted list of child folder names. Files located directly in
        ``parent`` do not contribute an entry.
    """
    cursor = search_db.cursor()

    normalized = _normalize_folder_path(parent)
    if not normalized:
        rows = cursor.execute(
            "SELECT DISTINCT basefolder FROM files ORDER BY basefolder"
        ).fetchall()
        return [row["basefolder"] for row in rows if row["basefolder"]]

    prefix = normalized + "/"
    # "%" and "_" are LIKE wildcards. Without escaping, a folder such as
    # "my_files" would also match "myXfiles/..." and the fixed-length slice
    # below would then cut those unrelated paths at the wrong position.
    like_prefix = (
        prefix.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    rows = cursor.execute(
        "SELECT relative_path FROM files WHERE relative_path LIKE ? ESCAPE '\\'",
        (like_prefix + "%",),
    ).fetchall()

    prefix_len = len(prefix)
    children = set()
    for row in rows:
        # Strip the parent prefix; only the first remaining segment matters.
        segment, separator, _deeper = row["relative_path"][prefix_len:].partition("/")
        # No separator means the entry is a file directly under the parent.
        if separator and segment:
            children.add(segment)

    return sorted(children)
|
|
|
|
|
|
def _query_counts(folder_path: str) -> Dict[str, Any]:
    """Count the files below a folder, grouped by category and by filetype.

    A row is included when its ``relative_path`` lies below the folder or
    when its ``basefolder`` equals the folder.

    Args:
        folder_path: Folder to analyze; it is normalized with
            ``_normalize_folder_path`` before use.

    Returns:
        A dict with three keys:
        ``"total"`` - number of matching files;
        ``"categories"`` - list of ``{"category", "count"}`` dicts ordered by
        descending count, with NULL categories reported as
        ``'Keine Kategorie'``;
        ``"filetypes"`` - list of ``{"type", "count"}`` dicts in
        ``FILETYPE_ORDER`` order, omitting buckets with no files.

    Raises:
        ValueError: If the normalized folder path is empty.
    """
    normalized = _normalize_folder_path(folder_path)
    if not normalized:
        raise ValueError("Bitte einen Ordnerpfad angeben.")

    # "%" and "_" are LIKE wildcards; escape them so that a folder name such
    # as "my_files" only matches itself and not e.g. "myXfiles".
    like_prefix = (
        normalized.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    # Match both the basefolder itself and deeper paths inside that folder.
    where_sql = "(relative_path LIKE ? ESCAPE '\\' OR basefolder = ?)"
    params = (f"{like_prefix}/%", normalized)

    cursor = search_db.cursor()

    category_rows = cursor.execute(
        f"""
        SELECT COALESCE(category, 'Keine Kategorie') AS category_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY category_label
        ORDER BY file_count DESC
        """,
        params,
    ).fetchall()

    type_rows = cursor.execute(
        f"""
        SELECT COALESCE(filetype, '') AS filetype_label,
               COUNT(*) AS file_count
        FROM files
        WHERE {where_sql}
        GROUP BY filetype_label
        """,
        params,
    ).fetchall()

    total = sum(row["file_count"] for row in category_rows)
    categories = [
        {"category": row["category_label"], "count": row["file_count"]}
        for row in category_rows
    ]

    # Several raw filetype values can land in the same bucket, so the
    # per-value counts are summed per bucket label.
    type_totals: Dict[str, int] = {}
    for row in type_rows:
        bucket = _bucket_for_filetype(row["filetype_label"])
        type_totals[bucket] = type_totals.get(bucket, 0) + row["file_count"]

    filetypes = [
        {"type": label, "count": type_totals[label]}
        for label in FILETYPE_ORDER
        if type_totals.get(label)
    ]

    return {"total": total, "categories": categories, "filetypes": filetypes}
|
|
|
|
|
|
def search_db_analyzer():
    """Render the page used to analyze search.db folder by folder.

    The template receives the current admin flag from ``auth.is_admin()``
    and the short/long titles from ``APP_CONFIG`` (each falling back to
    "Default Title" when the key is missing).
    """
    fallback_title = "Default Title"
    template_context = {
        "admin_enabled": auth.is_admin(),
        "title_short": APP_CONFIG.get("TITLE_SHORT", fallback_title),
        "title_long": APP_CONFIG.get("TITLE_LONG", fallback_title),
    }
    return render_template("search_db_analyzer.html", **template_context)
|
|
|
|
|
|
def search_db_query():
    """Return file counts grouped by category and filetype for a folder.

    Reads ``folder_path`` from the JSON request body and delegates to
    ``_query_counts``.

    Returns:
        The JSON result of ``_query_counts`` on success; a JSON
        ``{"error": ...}`` body with status 400 when the folder path is
        missing or invalid, or with status 500 when the query fails.
    """
    payload = request.get_json(silent=True)
    # The body is client-controlled: it may be absent, a non-object JSON value
    # (list, number, ...) or carry a non-string "folder_path". All of these are
    # treated as "no path given" so they end in the regular 400 response
    # instead of an unhandled AttributeError.
    raw_path = payload.get("folder_path") if isinstance(payload, dict) else None
    folder_path = raw_path.strip() if isinstance(raw_path, str) else ""

    try:
        result = _query_counts(folder_path)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    except Exception as exc:  # pragma: no cover - defensive logging
        # Record the traceback server-side; the response only carries the
        # exception text.
        logging.getLogger(__name__).exception(
            "search.db query failed for folder_path=%r", folder_path
        )
        return jsonify({"error": f"Abfrage fehlgeschlagen: {exc}"}), 500

    return jsonify(result)
|
|
|
|
|
|
def search_db_folders():
    """Return the next-level folder names below ``parent`` (or the basefolders).

    Reads the optional ``parent`` query parameter and delegates to
    ``_list_children``.

    Returns:
        JSON ``{"children": [...]}`` on success, or a JSON ``{"error": ...}``
        body with status 500 when the lookup fails.
    """
    parent = request.args.get("parent", "").strip()
    try:
        children = _list_children(parent)
    except Exception as exc:  # pragma: no cover - defensive logging
        # Record the traceback server-side; the response only carries the
        # exception text.
        logging.getLogger(__name__).exception(
            "Listing search.db folders failed for parent=%r", parent
        )
        return jsonify({"error": f"Ordner konnten nicht geladen werden: {exc}"}), 500

    return jsonify({"children": children})
|