bethaus-app/cache_analyzer.py
#!/usr/bin/env python3
"""
cache_analyzer.py (READ-ONLY)
Analyzes DiskCache-backed caches (audio, image, video, other) by inspecting
their SQLite DBs in STRICT READ-ONLY mode (no writes, no cache opens).
What it shows per cache:
- Directory usage (filesystem)
- DB stats (page size/count, freelist, tables, row counts)
- Total payload (sum of size column)
- 10 oldest / 10 latest entries by Cache.access_time
- Top 10 largest entries
- Size stats: avg, median (approx), min, max
- Size distribution buckets
- Expiration: expired count, expiring next 24h and next 7d (if expire_time present)
- Entries per day (last 14 days) based on access_time
- Top 10 prefixes by bytes (prefix = text before first '/'), if key is TEXT
Usage:
python cache_analyzer.py
# override paths via env vars if needed:
FILECACHE_AUDIO=/path/to/filecache_audio python cache_analyzer.py
"""
import os
import sqlite3
import time
from datetime import datetime, timezone
from typing import Optional, Dict, Any, List, Tuple
# -------- Config (override with env vars if needed) --------
CACHE_DIRS = {
"audio": os.environ.get("FILECACHE_AUDIO", "./filecache_audio"),
"image": os.environ.get("FILECACHE_IMAGE", "./filecache_image"),
"video": os.environ.get("FILECACHE_VIDEO", "./filecache_video"),
"other": os.environ.get("FILECACHE_OTHER", "./filecache_other"),
}
# Column heuristics (actual names in your DB appear to include: access_time, expire_time, size, key)
TS_COL = "access_time" # per your request: ONLY use Cache.access_time for recency/age
EXPIRE_COLS = ["expire_time"] # best-effort
SIZE_CANDIDATES = ["size", "bytes", "value_size", "data_size"]
KEY_CANDIDATES = ["key", "path", "name", "url", "k"]
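# For reference: the column heuristics above are aimed at python-diskcache's default
# "Cache" table, which in common diskcache versions looks roughly like the sketch
# below. This is an assumption about the schema, not something this script verifies:
#
#   CREATE TABLE Cache (
#       rowid INTEGER PRIMARY KEY,
#       key BLOB, raw INTEGER,
#       store_time REAL, expire_time REAL, access_time REAL, access_count INTEGER,
#       tag BLOB, size INTEGER, mode INTEGER, filename TEXT, value BLOB
#   )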
def human_bytes(n: Optional[int]) -> str:
if n is None:
return "N/A"
step = 1024.0
for unit in ("B", "KB", "MB", "GB", "TB", "PB", "EB"):
if n < step:
return f"{n:.2f} {unit}"
n /= step
return f"{n:.2f} ZB"
def print_header(title: str) -> None:
line = "=" * max(60, len(title) + 8)
print(line)
print(f"📦 {title}")
print(line)
def dir_usage_bytes(path: str) -> int:
total = 0
for root, _, files in os.walk(path):
for f in files:
fp = os.path.join(root, f)
try:
total += os.path.getsize(fp)
except OSError:
pass
return total
def open_ro(db_path: str) -> Optional[sqlite3.Connection]:
try:
return sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
except sqlite3.Error:
return None
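# Note on "mode=ro": SQLite still takes shared read locks, so a busy writer can make
# reads briefly block or fail. If the DB were guaranteed not to change during analysis,
# a URI such as f"file:{db_path}?mode=ro&immutable=1" would skip locking entirely; it
# is deliberately not used here because these caches may be written concurrently.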
def list_tables(cur: sqlite3.Cursor) -> List[str]:
cur.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
return [r[0] for r in cur.fetchall()]
def table_info(cur: sqlite3.Cursor, name: str) -> Dict[str, Dict[str, Any]]:
cur.execute(f"PRAGMA table_info({name})")
info = {}
for cid, cname, ctype, *_ in cur.fetchall():
info[cname] = {"cid": cid, "ctype": ctype}
return info
def pick_first_present(cols: List[str], options: List[str]) -> Optional[str]:
lower = {c.lower(): c for c in cols}
for opt in options:
if opt in lower:
return lower[opt]
return None
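# Illustrative example: pick_first_present(["rowid", "key", "size"], ["size", "bytes"])
# returns "size" (the original column casing is preserved); callers below pass the
# options already lower-cased.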
def fmt_epoch(v: Optional[float]) -> str:
if v is None:
return "N/A"
try:
fv = float(v)
if fv <= 0 or fv > 1e11:
return str(v)
        return datetime.fromtimestamp(fv, tz=timezone.utc).isoformat().replace("+00:00", "Z")
except Exception:
return str(v)
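# Illustrative example: fmt_epoch(1700000000) -> "2023-11-14T22:13:20Z"; values outside
# a plausible epoch-seconds range (e.g. millisecond timestamps) are echoed back as-is.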
def key_preview(v: Any) -> str:
if v is None:
return "NULL"
    if isinstance(v, (bytes, bytearray)):
        try:
            s = v.decode("utf-8", errors="strict")
            return s[:120] + ("…" if len(s) > 120 else "")
        except Exception:
            hx = v.hex()
            return "0x" + (hx[:120] + ("…" if len(hx) > 120 else ""))
    s = str(v)
    return s[:120] + ("…" if len(s) > 120 else "")
def median_via_sql(cur: sqlite3.Cursor, table: str, size_col: str) -> Optional[float]:
# Approximate median using LIMIT/OFFSET (works fine for typical cache sizes)
try:
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {size_col} IS NOT NULL")
n = cur.fetchone()[0] or 0
if n == 0:
return None
offset = (n - 1) // 2
cur.execute(f"""
SELECT {size_col} FROM {table}
WHERE {size_col} IS NOT NULL
ORDER BY {size_col} ASC
LIMIT 1 OFFSET {offset}
""")
row = cur.fetchone()
return float(row[0]) if row else None
except Exception:
return None
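# The query above returns the lower-middle row, which is exact for odd counts and
# slightly low for even counts. For an exact median the two middle rows can be
# averaged; a sketch against an assumed "Cache" table with a "size" column
# (illustrative only, not used by this script):
#
#   SELECT AVG(size) FROM (
#       SELECT size FROM Cache WHERE size IS NOT NULL
#       ORDER BY size
#       LIMIT 2 - (SELECT COUNT(*) FROM Cache WHERE size IS NOT NULL) % 2
#       OFFSET (SELECT (COUNT(*) - 1) / 2 FROM Cache WHERE size IS NOT NULL)
#   )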
def top_entries(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str],
time_col: str, asc: bool, limit: int = 10):
order = "ASC" if asc else "DESC"
cols = [time_col]
if key_col:
cols.append(key_col)
if size_col:
cols.append(size_col)
col_list = ", ".join(cols)
try:
cur.execute(f"""
SELECT {col_list} FROM {table}
WHERE {time_col} IS NOT NULL
ORDER BY {time_col} {order}
LIMIT {limit}
""")
rows = cur.fetchall()
label = "Oldest" if asc else "Latest"
print(f" {label} {limit} by {time_col}:")
for r in rows:
i = 0
t_s = fmt_epoch(r[i]); i += 1
k_s = key_preview(r[i]) if key_col else "-"
i += 1 if key_col else 0
sz_s = human_bytes(int(r[i])) if (size_col and r[i] is not None) else "-"
print(f" time={t_s} key={k_s} size={sz_s}")
    except Exception as e:
        print(f" ({'oldest' if asc else 'latest'} query error: {e})")
def top_largest(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
if not size_col:
print(" Top largest: (no size column)")
return
cols = [size_col]
if key_col:
cols.append(key_col)
col_list = ", ".join(cols)
try:
cur.execute(f"""
SELECT {col_list} FROM {table}
WHERE {size_col} IS NOT NULL
ORDER BY {size_col} DESC
LIMIT {limit}
""")
rows = cur.fetchall()
print(f" Top {limit} largest entries:")
for r in rows:
sz = human_bytes(int(r[0] or 0))
key_s = key_preview(r[1]) if key_col else "-"
print(f" size={sz} key={key_s}")
except Exception as e:
print(f" (largest query error: {e})")
def size_stats(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
if not size_col:
print(" Size stats: (no size column)")
return
try:
cur.execute(f"""
SELECT COUNT(*), SUM({size_col}), AVG({size_col}),
MIN({size_col}), MAX({size_col})
FROM {table}
WHERE {size_col} IS NOT NULL
""")
cnt, total_b, avg_b, min_b, max_b = cur.fetchone()
med_b = median_via_sql(cur, table, size_col)
print(" Size stats:")
print(f" entries: {cnt}")
print(f" total: {human_bytes(int(total_b or 0))}")
print(f" avg: {human_bytes(int(avg_b or 0)) if avg_b else 'N/A'}")
print(f" median: {human_bytes(int(med_b or 0)) if med_b else 'N/A'}")
print(f" min: {human_bytes(int(min_b or 0)) if min_b is not None else 'N/A'}")
print(f" max: {human_bytes(int(max_b or 0)) if max_b is not None else 'N/A'}")
except Exception as e:
print(f" (size stats error: {e})")
def size_distribution(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
if not size_col:
print(" Size distribution: (no size column)")
return
# Buckets: <1MB, 1-10MB, 10-100MB, 100MB-1GB, 1-5GB, >=5GB
try:
cur.execute(f"""
SELECT
SUM(CASE WHEN {size_col} < 1024*1024 THEN 1 ELSE 0 END) AS lt_1mb,
SUM(CASE WHEN {size_col} >= 1024*1024 AND {size_col} < 10*1024*1024 THEN 1 ELSE 0 END) AS _1_10mb,
SUM(CASE WHEN {size_col} >= 10*1024*1024 AND {size_col} < 100*1024*1024 THEN 1 ELSE 0 END) AS _10_100mb,
SUM(CASE WHEN {size_col} >= 100*1024*1024 AND {size_col} < 1024*1024*1024 THEN 1 ELSE 0 END) AS _100mb_1gb,
SUM(CASE WHEN {size_col} >= 1024*1024*1024 AND {size_col} < 5*1024*1024*1024 THEN 1 ELSE 0 END) AS _1_5gb,
SUM(CASE WHEN {size_col} >= 5*1024*1024*1024 THEN 1 ELSE 0 END) AS gte_5gb
FROM {table}
WHERE {size_col} IS NOT NULL
""")
(lt1, b1, b2, b3, b4, g5) = [int(x or 0) for x in cur.fetchone()]
print(" Size distribution (count):")
print(f" <1MB: {lt1}")
print(f" 110MB: {b1}")
print(f" 10100MB: {b2}")
print(f" 100MB1GB: {b3}")
print(f" 15GB: {b4}")
print(f" ≥5GB: {g5}")
except Exception as e:
print(f" (size distribution error: {e})")
def expiration_stats(cur: sqlite3.Cursor, table: str, expire_col: Optional[str], now_epoch: int):
if not expire_col:
print(" Expiration: (no expire column)")
return
try:
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} IS NOT NULL AND {expire_col} < ?", (now_epoch,))
expired = cur.fetchone()[0] or 0
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?", (now_epoch, now_epoch + 24*3600))
exp_24h = cur.fetchone()[0] or 0
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?", (now_epoch, now_epoch + 7*24*3600))
exp_7d = cur.fetchone()[0] or 0
print(" Expiration (by expire_time):")
print(f" expired: {expired}")
print(f" expiring in 24h: {exp_24h}")
print(f" expiring in 7d: {exp_7d}")
except Exception as e:
print(f" (expiration stats error: {e})")
def entries_per_day(cur: sqlite3.Cursor, table: str, time_col: str, days: int = 14):
try:
cur.execute(f"""
SELECT date({time_col}, 'unixepoch') AS d, COUNT(*)
FROM {table}
WHERE {time_col} IS NOT NULL
GROUP BY d
ORDER BY d DESC
LIMIT {days}
""")
rows = cur.fetchall()
print(f" Entries per day (last {days} days):")
for d, c in rows:
print(f" {d}: {c}")
except Exception as e:
print(f" (entries-per-day error: {e})")
def top_prefixes_by_bytes(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
if not key_col or not size_col:
print(" Top prefixes: (need key and size columns)")
return
# Only consider TEXT keys (skip BLOB keys)
try:
cur.execute(f"""
SELECT
CASE
WHEN typeof({key_col})='text' AND instr({key_col}, '/')>0
THEN substr({key_col}, 1, instr({key_col}, '/')-1)
WHEN typeof({key_col})='text'
THEN {key_col}
ELSE '(non-text)'
END AS prefix,
SUM({size_col}) AS total_bytes,
COUNT(*) AS cnt
FROM {table}
WHERE {size_col} IS NOT NULL
GROUP BY prefix
ORDER BY total_bytes DESC
LIMIT {limit}
""")
rows = cur.fetchall()
print(f" Top {limit} prefixes by bytes:")
for prefix, total_b, cnt in rows:
print(f" {prefix}: {human_bytes(int(total_b or 0))} across {cnt} entries")
except Exception as e:
print(f" (top prefixes error: {e})")
def analyze_sqlite_db(db_path: str) -> None:
print(f" • DB path: {db_path}")
if not os.path.exists(db_path):
print(" Status: ❌ not found")
return
# filesystem size (db file)
try:
fsize = os.path.getsize(db_path)
print(f" File size: {human_bytes(fsize)}")
except Exception:
pass
conn = open_ro(db_path)
if not conn:
print(" Status: ❌ open failed (read-only)")
return
now_epoch = int(time.time())
try:
cur = conn.cursor()
# SQLite stats
cur.execute("PRAGMA page_size;")
page_size = cur.fetchone()[0]
cur.execute("PRAGMA page_count;")
page_count = cur.fetchone()[0]
cur.execute("PRAGMA freelist_count;")
freelist_count = cur.fetchone()[0]
        db_size = page_size * page_count
        print(f" Page size: {page_size} B")
        print(f" Page count: {page_count}")
        print(f" DB size (page_size x page_count): {human_bytes(db_size)}")
        print(f" Freelist pages: {freelist_count}")
# tables
tables = list_tables(cur)
if not tables:
print(" Tables: (none)")
return
print(" Tables:")
for t in tables:
print(f" - {t}")
# row counts
print(" Row counts:")
for t in tables:
try:
cur.execute(f"SELECT COUNT(*) FROM {t}")
cnt = cur.fetchone()[0]
print(f" {t:<20} {cnt}")
except Exception as e:
print(f" {t:<20} (error: {e})")
# choose entries table (DiskCache default is 'Cache')
probe = next((n for n in tables if n.lower() == "cache"), None) or tables[0]
cols_info = table_info(cur, probe)
colnames = list(cols_info.keys())
lower = [c.lower() for c in colnames]
# prefer 'access_time' (required by your request)
time_col = next((c for c in colnames if c.lower() == TS_COL.lower()), None)
if not time_col:
print(" Note: No 'access_time' column found; skipping chronology-based lists.")
# choose size & key columns
size_col_l = pick_first_present(lower, [c.lower() for c in SIZE_CANDIDATES])
size_col = next((c for c in colnames if c.lower() == size_col_l), None) if size_col_l else None
key_col_l = pick_first_present(lower, [c.lower() for c in KEY_CANDIDATES])
key_col = next((c for c in colnames if c.lower() == key_col_l), None) if key_col_l else None
# print total payload
if size_col:
try:
cur.execute(f"SELECT SUM({size_col}) FROM {probe}")
total_b = cur.fetchone()[0]
print(f" Total payload: {human_bytes(int(total_b or 0))} (sum of {probe}.{size_col})")
except Exception as e:
print(f" Total payload: (error: {e})")
else:
print(f" Total payload: (no size column detected)")
# chronology lists (ONLY by access_time)
if time_col:
top_entries(cur, probe, key_col, size_col, time_col, asc=True, limit=10)
top_entries(cur, probe, key_col, size_col, time_col, asc=False, limit=10)
# largest entries
top_largest(cur, probe, key_col, size_col, limit=10)
# size stats & distribution
size_stats(cur, probe, size_col)
size_distribution(cur, probe, size_col)
# expiration
expire_col_l = pick_first_present(lower, [c.lower() for c in EXPIRE_COLS])
expire_col = next((c for c in colnames if c.lower() == expire_col_l), None) if expire_col_l else None
expiration_stats(cur, probe, expire_col, now_epoch)
# entries per day (last 14d) using access_time
if time_col:
entries_per_day(cur, probe, time_col, days=14)
# top prefixes by bytes (if key is TEXT-like)
top_prefixes_by_bytes(cur, probe, key_col, size_col, limit=10)
finally:
try:
conn.close()
except Exception:
pass
def analyze_cache(label: str, directory: str) -> None:
print_header(f"{label.upper()} CACHE — {directory}")
if not os.path.isdir(directory):
print("Directory status: ❌ not found")
return
# On-disk usage (directory walk, read-only)
try:
usage = dir_usage_bytes(directory)
print(f"Directory usage: {human_bytes(usage)}")
except Exception:
pass
# Read-only inspection of sqlite DB (DiskCache uses cache.db)
analyze_sqlite_db(os.path.join(directory, "cache.db"))
print() # spacer
def analyze_all_caches() -> None:
for label, path in CACHE_DIRS.items():
analyze_cache(label, path)
if __name__ == "__main__":
print_header("ALL CACHES OVERVIEW")
for k, v in CACHE_DIRS.items():
print(f"- {k:<5} -> {v}")
print()
analyze_all_caches()