add cache analyzer

lelo 2025-11-08 17:14:14 +00:00
parent 8ee07b57eb
commit 462581b698

cache_analyzer.py (new normal file, 472 lines)

@@ -0,0 +1,472 @@
#!/usr/bin/env python3
"""
cache_analyzer.py (READ-ONLY)
Analyzes DiskCache-backed caches (audio, image, video, other) by inspecting
their SQLite DBs in STRICT READ-ONLY mode (no writes, no cache opens).
What it shows per cache:
- Directory usage (filesystem)
- DB stats (page size/count, freelist, tables, row counts)
- Total payload (sum of size column)
- 10 oldest / 10 latest entries by Cache.access_time
- Top 10 largest entries
- Size stats: avg, median (approx), min, max
- Size distribution buckets
- Expiration: expired count, expiring next 24h and next 7d (if expire_time present)
- Entries per day (last 14 days) based on access_time
- Top 10 prefixes by bytes (prefix = text before first '/'), if key is TEXT
Usage:
python cache_analyzer.py
# override paths via env vars if needed:
FILECACHE_AUDIO=/path/to/filecache_audio python cache_analyzer.py
"""
import os
import sqlite3
import time
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple
# -------- Config (override with env vars if needed) --------
CACHE_DIRS = {
"audio": os.environ.get("FILECACHE_AUDIO", "./filecache_audio"),
"image": os.environ.get("FILECACHE_IMAGE", "./filecache_image"),
"video": os.environ.get("FILECACHE_VIDEO", "./filecache_video"),
"other": os.environ.get("FILECACHE_OTHER", "./filecache_other"),
}
# Column heuristics (actual names in your DB appear to include: access_time, expire_time, size, key)
TS_COL = "access_time" # recency/age is based solely on Cache.access_time
EXPIRE_COLS = ["expire_time"] # best-effort
SIZE_CANDIDATES = ["size", "bytes", "value_size", "data_size"]
KEY_CANDIDATES = ["key", "path", "name", "url", "k"]
def human_bytes(n: Optional[int]) -> str:
if n is None:
return "N/A"
step = 1024.0
for unit in ("B", "KB", "MB", "GB", "TB", "PB", "EB"):
if n < step:
return f"{n:.2f} {unit}"
n /= step
return f"{n:.2f} ZB"
def print_header(title: str) -> None:
line = "=" * max(60, len(title) + 8)
print(line)
print(f"📦 {title}")
print(line)
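# Sum the on-disk sizes of every file under the cache directory; files that
# disappear mid-walk are skipped rather than failing the whole report.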
def dir_usage_bytes(path: str) -> int:
total = 0
for root, _, files in os.walk(path):
for f in files:
fp = os.path.join(root, f)
try:
total += os.path.getsize(fp)
except OSError:
pass
return total
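# Open the SQLite file through a URI with mode=ro so the connection can never
# write; if the file is missing or cannot be opened, return None instead of raising.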
def open_ro(db_path: str) -> Optional[sqlite3.Connection]:
try:
return sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
except sqlite3.Error:
return None
def list_tables(cur: sqlite3.Cursor) -> List[str]:
cur.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name
""")
return [r[0] for r in cur.fetchall()]
def table_info(cur: sqlite3.Cursor, name: str) -> Dict[str, Dict[str, Any]]:
cur.execute(f"PRAGMA table_info({name})")
info = {}
for cid, cname, ctype, *_ in cur.fetchall():
info[cname] = {"cid": cid, "ctype": ctype}
return info
def pick_first_present(cols: List[str], options: List[str]) -> Optional[str]:
lower = {c.lower(): c for c in cols}
for opt in options:
if opt in lower:
return lower[opt]
return None
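# Timestamps are stored as Unix epoch seconds; values outside a plausible range
# (<= 0 or beyond ~year 5000) are shown raw instead of being formatted as UTC.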
def fmt_epoch(v: Optional[float]) -> str:
if v is None:
return "N/A"
try:
fv = float(v)
if fv <= 0 or fv > 1e11:
return str(v)
return datetime.utcfromtimestamp(fv).isoformat() + "Z"
except Exception:
return str(v)
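# Keys may be TEXT or BLOB; show a printable preview truncated to 120 characters,
# falling back to a hex dump for blobs that are not valid UTF-8.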
def key_preview(v: Any) -> str:
if v is None:
return "NULL"
if isinstance(v, (bytes, bytearray)):
try:
s = v.decode("utf-8", errors="strict")
return s[:120] + ("…" if len(s) > 120 else "")
except Exception:
hx = v.hex()
return "0x" + (hx[:120] + ("…" if len(hx) > 120 else ""))
s = str(v)
return s[:120] + ("…" if len(s) > 120 else "")
def median_via_sql(cur: sqlite3.Cursor, table: str, size_col: str) -> Optional[float]:
# Approximate median using LIMIT/OFFSET (works fine for typical cache sizes)
try:
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {size_col} IS NOT NULL")
n = cur.fetchone()[0] or 0
if n == 0:
return None
offset = (n - 1) // 2
cur.execute(f"""
SELECT {size_col} FROM {table}
WHERE {size_col} IS NOT NULL
ORDER BY {size_col} ASC
LIMIT 1 OFFSET {offset}
""")
row = cur.fetchone()
return float(row[0]) if row else None
except Exception:
return None
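# Print the oldest or newest `limit` rows ordered by the access_time column,
# including key preview and human-readable size when those columns exist.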
def top_entries(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str],
time_col: str, asc: bool, limit: int = 10):
order = "ASC" if asc else "DESC"
label = "Oldest" if asc else "Latest" # defined before the query so the except handler can use it
cols = [time_col]
if key_col:
cols.append(key_col)
if size_col:
cols.append(size_col)
col_list = ", ".join(cols)
try:
cur.execute(f"""
SELECT {col_list} FROM {table}
WHERE {time_col} IS NOT NULL
ORDER BY {time_col} {order}
LIMIT {limit}
""")
rows = cur.fetchall()
print(f" {label} {limit} by {time_col}:")
for r in rows:
i = 0
t_s = fmt_epoch(r[i]); i += 1
k_s = key_preview(r[i]) if key_col else "-"
i += 1 if key_col else 0
sz_s = human_bytes(int(r[i])) if (size_col and r[i] is not None) else "-"
print(f" time={t_s} key={k_s} size={sz_s}")
except Exception as e:
print(f" ({label.lower()} query error: {e})")
def top_largest(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
if not size_col:
print(" Top largest: (no size column)")
return
cols = [size_col]
if key_col:
cols.append(key_col)
col_list = ", ".join(cols)
try:
cur.execute(f"""
SELECT {col_list} FROM {table}
WHERE {size_col} IS NOT NULL
ORDER BY {size_col} DESC
LIMIT {limit}
""")
rows = cur.fetchall()
print(f" Top {limit} largest entries:")
for r in rows:
sz = human_bytes(int(r[0] or 0))
key_s = key_preview(r[1]) if key_col else "-"
print(f" size={sz} key={key_s}")
except Exception as e:
print(f" (largest query error: {e})")
def size_stats(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
if not size_col:
print(" Size stats: (no size column)")
return
try:
cur.execute(f"""
SELECT COUNT(*), SUM({size_col}), AVG({size_col}),
MIN({size_col}), MAX({size_col})
FROM {table}
WHERE {size_col} IS NOT NULL
""")
cnt, total_b, avg_b, min_b, max_b = cur.fetchone()
med_b = median_via_sql(cur, table, size_col)
print(" Size stats:")
print(f" entries: {cnt}")
print(f" total: {human_bytes(int(total_b or 0))}")
print(f" avg: {human_bytes(int(avg_b or 0)) if avg_b else 'N/A'}")
print(f" median: {human_bytes(int(med_b or 0)) if med_b else 'N/A'}")
print(f" min: {human_bytes(int(min_b or 0)) if min_b is not None else 'N/A'}")
print(f" max: {human_bytes(int(max_b or 0)) if max_b is not None else 'N/A'}")
except Exception as e:
print(f" (size stats error: {e})")
def size_distribution(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
if not size_col:
print(" Size distribution: (no size column)")
return
# Buckets: <1MB, 1-10MB, 10-100MB, 100MB-1GB, 1-5GB, >=5GB
try:
cur.execute(f"""
SELECT
SUM(CASE WHEN {size_col} < 1024*1024 THEN 1 ELSE 0 END) AS lt_1mb,
SUM(CASE WHEN {size_col} >= 1024*1024 AND {size_col} < 10*1024*1024 THEN 1 ELSE 0 END) AS _1_10mb,
SUM(CASE WHEN {size_col} >= 10*1024*1024 AND {size_col} < 100*1024*1024 THEN 1 ELSE 0 END) AS _10_100mb,
SUM(CASE WHEN {size_col} >= 100*1024*1024 AND {size_col} < 1024*1024*1024 THEN 1 ELSE 0 END) AS _100mb_1gb,
SUM(CASE WHEN {size_col} >= 1024*1024*1024 AND {size_col} < 5*1024*1024*1024 THEN 1 ELSE 0 END) AS _1_5gb,
SUM(CASE WHEN {size_col} >= 5*1024*1024*1024 THEN 1 ELSE 0 END) AS gte_5gb
FROM {table}
WHERE {size_col} IS NOT NULL
""")
(lt1, b1, b2, b3, b4, g5) = [int(x or 0) for x in cur.fetchone()]
print(" Size distribution (count):")
print(f" <1MB: {lt1}")
print(f" 110MB: {b1}")
print(f" 10100MB: {b2}")
print(f" 100MB1GB: {b3}")
print(f" 15GB: {b4}")
print(f" ≥5GB: {g5}")
except Exception as e:
print(f" (size distribution error: {e})")
def expiration_stats(cur: sqlite3.Cursor, table: str, expire_col: Optional[str], now_epoch: int):
if not expire_col:
print(" Expiration: (no expire column)")
return
try:
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} IS NOT NULL AND {expire_col} < ?", (now_epoch,))
expired = cur.fetchone()[0] or 0
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?", (now_epoch, now_epoch + 24*3600))
exp_24h = cur.fetchone()[0] or 0
cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?", (now_epoch, now_epoch + 7*24*3600))
exp_7d = cur.fetchone()[0] or 0
print(" Expiration (by expire_time):")
print(f" expired: {expired}")
print(f" expiring in 24h: {exp_24h}")
print(f" expiring in 7d: {exp_7d}")
except Exception as e:
print(f" (expiration stats error: {e})")
def entries_per_day(cur: sqlite3.Cursor, table: str, time_col: str, days: int = 14):
try:
cur.execute(f"""
SELECT date({time_col}, 'unixepoch') AS d, COUNT(*)
FROM {table}
WHERE {time_col} IS NOT NULL
GROUP BY d
ORDER BY d DESC
LIMIT {days}
""")
rows = cur.fetchall()
print(f" Entries per day (last {days} days):")
for d, c in rows:
print(f" {d}: {c}")
except Exception as e:
print(f" (entries-per-day error: {e})")
def top_prefixes_by_bytes(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
if not key_col or not size_col:
print(" Top prefixes: (need key and size columns)")
return
# Only consider TEXT keys (skip BLOB keys)
try:
cur.execute(f"""
SELECT
CASE
WHEN typeof({key_col})='text' AND instr({key_col}, '/')>0
THEN substr({key_col}, 1, instr({key_col}, '/')-1)
WHEN typeof({key_col})='text'
THEN {key_col}
ELSE '(non-text)'
END AS prefix,
SUM({size_col}) AS total_bytes,
COUNT(*) AS cnt
FROM {table}
WHERE {size_col} IS NOT NULL
GROUP BY prefix
ORDER BY total_bytes DESC
LIMIT {limit}
""")
rows = cur.fetchall()
print(f" Top {limit} prefixes by bytes:")
for prefix, total_b, cnt in rows:
print(f" {prefix}: {human_bytes(int(total_b or 0))} across {cnt} entries")
except Exception as e:
print(f" (top prefixes error: {e})")
def analyze_sqlite_db(db_path: str) -> None:
print(f" • DB path: {db_path}")
if not os.path.exists(db_path):
print(" Status: ❌ not found")
return
# filesystem size (db file)
try:
fsize = os.path.getsize(db_path)
print(f" File size: {human_bytes(fsize)}")
except Exception:
pass
conn = open_ro(db_path)
if not conn:
print(" Status: ❌ open failed (read-only)")
return
now_epoch = int(time.time())
try:
cur = conn.cursor()
# SQLite stats
cur.execute("PRAGMA page_size;")
page_size = cur.fetchone()[0]
cur.execute("PRAGMA page_count;")
page_count = cur.fetchone()[0]
cur.execute("PRAGMA freelist_count;")
freelist_count = cur.fetchone()[0]
db_size = page_size * page_count
print(f" Page size: {page_size} B")
print(f" Page count: {page_count}")
print(f" DB size (page_size * page_count): {human_bytes(db_size)}")
print(f" Freelist pages: {freelist_count}")
# tables
tables = list_tables(cur)
if not tables:
print(" Tables: (none)")
return
print(" Tables:")
for t in tables:
print(f" - {t}")
# row counts
print(" Row counts:")
for t in tables:
try:
cur.execute(f"SELECT COUNT(*) FROM {t}")
cnt = cur.fetchone()[0]
print(f" {t:<20} {cnt}")
except Exception as e:
print(f" {t:<20} (error: {e})")
# choose entries table (DiskCache default is 'Cache')
probe = next((n for n in tables if n.lower() == "cache"), None) or tables[0]
cols_info = table_info(cur, probe)
colnames = list(cols_info.keys())
lower = [c.lower() for c in colnames]
# chronology requires the 'access_time' column
time_col = next((c for c in colnames if c.lower() == TS_COL.lower()), None)
if not time_col:
print(" Note: No 'access_time' column found; skipping chronology-based lists.")
# choose size & key columns
size_col_l = pick_first_present(lower, [c.lower() for c in SIZE_CANDIDATES])
size_col = next((c for c in colnames if c.lower() == size_col_l), None) if size_col_l else None
key_col_l = pick_first_present(lower, [c.lower() for c in KEY_CANDIDATES])
key_col = next((c for c in colnames if c.lower() == key_col_l), None) if key_col_l else None
# print total payload
if size_col:
try:
cur.execute(f"SELECT SUM({size_col}) FROM {probe}")
total_b = cur.fetchone()[0]
print(f" Total payload: {human_bytes(int(total_b or 0))} (sum of {probe}.{size_col})")
except Exception as e:
print(f" Total payload: (error: {e})")
else:
print(f" Total payload: (no size column detected)")
# chronology lists (ONLY by access_time)
if time_col:
top_entries(cur, probe, key_col, size_col, time_col, asc=True, limit=10)
top_entries(cur, probe, key_col, size_col, time_col, asc=False, limit=10)
# largest entries
top_largest(cur, probe, key_col, size_col, limit=10)
# size stats & distribution
size_stats(cur, probe, size_col)
size_distribution(cur, probe, size_col)
# expiration
expire_col_l = pick_first_present(lower, [c.lower() for c in EXPIRE_COLS])
expire_col = next((c for c in colnames if c.lower() == expire_col_l), None) if expire_col_l else None
expiration_stats(cur, probe, expire_col, now_epoch)
# entries per day (last 14d) using access_time
if time_col:
entries_per_day(cur, probe, time_col, days=14)
# top prefixes by bytes (if key is TEXT-like)
top_prefixes_by_bytes(cur, probe, key_col, size_col, limit=10)
finally:
try:
conn.close()
except Exception:
pass
def analyze_cache(label: str, directory: str) -> None:
print_header(f"{label.upper()} CACHE — {directory}")
if not os.path.isdir(directory):
print("Directory status: ❌ not found")
return
# On-disk usage (directory walk, read-only)
try:
usage = dir_usage_bytes(directory)
print(f"Directory usage: {human_bytes(usage)}")
except Exception:
pass
# Read-only inspection of sqlite DB (DiskCache uses cache.db)
analyze_sqlite_db(os.path.join(directory, "cache.db"))
print() # spacer
def analyze_all_caches() -> None:
for label, path in CACHE_DIRS.items():
analyze_cache(label, path)
if __name__ == "__main__":
print_header("ALL CACHES OVERVIEW")
for k, v in CACHE_DIRS.items():
print(f"- {k:<5} -> {v}")
print()
analyze_all_caches()