#!/usr/bin/env python3
"""
cache_analyzer.py (READ-ONLY)

Analyzes DiskCache-backed caches (audio, image, video, other) by inspecting
their SQLite DBs in STRICT READ-ONLY mode (no writes, no cache opens).

What it shows per cache:
- Directory usage (filesystem)
- DB stats (page size/count, freelist, tables, row counts)
- Total payload (sum of size column)
- 10 oldest / 10 latest entries by Cache.access_time
- Top 10 largest entries
- Size stats: avg, median (approx), min, max
- Size distribution buckets
- Expiration: expired count, expiring next 24h and next 7d (if expire_time present)
- Entries per day (last 14 days) based on access_time
- Top 10 prefixes by bytes (prefix = text before first '/'), if key is TEXT

Usage:
    python cache_analyzer.py

    # override paths via env vars if needed:
    FILECACHE_AUDIO=/path/to/filecache_audio python cache_analyzer.py
"""

import os
import sqlite3
import time
from datetime import datetime
from typing import Optional, Dict, Any, List

# -------- Config (override with env vars if needed) --------
CACHE_DIRS = {
    "audio": os.environ.get("FILECACHE_AUDIO", "./filecache_audio"),
    "image": os.environ.get("FILECACHE_IMAGE", "./filecache_image"),
    "video": os.environ.get("FILECACHE_VIDEO", "./filecache_video"),
    "other": os.environ.get("FILECACHE_OTHER", "./filecache_other"),
}

# Column heuristics (actual names in your DB appear to include: access_time,
# expire_time, size, key); see the schema sketch at the bottom of this file.
TS_COL = "access_time"         # per your request: ONLY use Cache.access_time for recency/age
EXPIRE_COLS = ["expire_time"]  # best-effort
SIZE_CANDIDATES = ["size", "bytes", "value_size", "data_size"]
KEY_CANDIDATES = ["key", "path", "name", "url", "k"]


def human_bytes(n: Optional[int]) -> str:
    if n is None:
        return "N/A"
    step = 1024.0
    for unit in ("B", "KB", "MB", "GB", "TB", "PB", "EB"):
        if n < step:
            return f"{n:.2f} {unit}"
        n /= step
    return f"{n:.2f} ZB"


def print_header(title: str) -> None:
    line = "=" * max(60, len(title) + 8)
    print(line)
    print(f"📦 {title}")
    print(line)


def dir_usage_bytes(path: str) -> int:
    total = 0
    for root, _, files in os.walk(path):
        for f in files:
            fp = os.path.join(root, f)
            try:
                total += os.path.getsize(fp)
            except OSError:
                pass
    return total


def open_ro(db_path: str) -> Optional[sqlite3.Connection]:
    try:
        return sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    except sqlite3.Error:
        return None


def list_tables(cur: sqlite3.Cursor) -> List[str]:
    cur.execute("""
        SELECT name FROM sqlite_master
        WHERE type='table' AND name NOT LIKE 'sqlite_%'
        ORDER BY name
    """)
    return [r[0] for r in cur.fetchall()]


def table_info(cur: sqlite3.Cursor, name: str) -> Dict[str, Dict[str, Any]]:
    cur.execute(f"PRAGMA table_info({name})")
    info = {}
    for cid, cname, ctype, *_ in cur.fetchall():
        info[cname] = {"cid": cid, "ctype": ctype}
    return info


def pick_first_present(cols: List[str], options: List[str]) -> Optional[str]:
    lower = {c.lower(): c for c in cols}
    for opt in options:
        if opt in lower:
            return lower[opt]
    return None


def fmt_epoch(v: Optional[float]) -> str:
    if v is None:
        return "N/A"
    try:
        fv = float(v)
        if fv <= 0 or fv > 1e11:
            return str(v)
        return datetime.utcfromtimestamp(fv).isoformat() + "Z"
    except Exception:
        return str(v)


def key_preview(v: Any) -> str:
    if v is None:
        return "NULL"
    if isinstance(v, (bytes, bytearray)):
        try:
            s = v.decode("utf-8", errors="strict")
            return s[:120] + ("…" if len(s) > 120 else "")
        except Exception:
            hx = v.hex()
            return "0x" + (hx[:120] + ("…" if len(hx) > 120 else ""))
    s = str(v)
    return s[:120] + ("…" if len(s) > 120 else "")


def median_via_sql(cur: sqlite3.Cursor,
                   table: str, size_col: str) -> Optional[float]:
    # Approximate median using LIMIT/OFFSET (works fine for typical cache sizes)
    try:
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {size_col} IS NOT NULL")
        n = cur.fetchone()[0] or 0
        if n == 0:
            return None
        offset = (n - 1) // 2
        cur.execute(f"""
            SELECT {size_col} FROM {table}
            WHERE {size_col} IS NOT NULL
            ORDER BY {size_col} ASC
            LIMIT 1 OFFSET {offset}
        """)
        row = cur.fetchone()
        return float(row[0]) if row else None
    except Exception:
        return None


def top_entries(cur: sqlite3.Cursor, table: str, key_col: Optional[str],
                size_col: Optional[str], time_col: str, asc: bool, limit: int = 10):
    order = "ASC" if asc else "DESC"
    label = "Oldest" if asc else "Latest"  # used by both the success and error paths
    cols = [time_col]
    if key_col:
        cols.append(key_col)
    if size_col:
        cols.append(size_col)
    col_list = ", ".join(cols)
    try:
        cur.execute(f"""
            SELECT {col_list} FROM {table}
            WHERE {time_col} IS NOT NULL
            ORDER BY {time_col} {order}
            LIMIT {limit}
        """)
        rows = cur.fetchall()
        print(f"  {label} {limit} by {time_col}:")
        for r in rows:
            i = 0
            t_s = fmt_epoch(r[i]); i += 1
            k_s = key_preview(r[i]) if key_col else "-"
            i += 1 if key_col else 0
            sz_s = human_bytes(int(r[i])) if (size_col and r[i] is not None) else "-"
            print(f"    time={t_s}  key={k_s}  size={sz_s}")
    except Exception as e:
        print(f"  ({label.lower()} query error: {e})")


def top_largest(cur: sqlite3.Cursor, table: str, key_col: Optional[str],
                size_col: Optional[str], limit: int = 10):
    if not size_col:
        print("  Top largest: (no size column)")
        return
    cols = [size_col]
    if key_col:
        cols.append(key_col)
    col_list = ", ".join(cols)
    try:
        cur.execute(f"""
            SELECT {col_list} FROM {table}
            WHERE {size_col} IS NOT NULL
            ORDER BY {size_col} DESC
            LIMIT {limit}
        """)
        rows = cur.fetchall()
        print(f"  Top {limit} largest entries:")
        for r in rows:
            sz = human_bytes(int(r[0] or 0))
            key_s = key_preview(r[1]) if key_col else "-"
            print(f"    size={sz}  key={key_s}")
    except Exception as e:
        print(f"  (largest query error: {e})")


def size_stats(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
    if not size_col:
        print("  Size stats: (no size column)")
        return
    try:
        cur.execute(f"""
            SELECT COUNT(*), SUM({size_col}), AVG({size_col}), MIN({size_col}), MAX({size_col})
            FROM {table} WHERE {size_col} IS NOT NULL
        """)
        cnt, total_b, avg_b, min_b, max_b = cur.fetchone()
        med_b = median_via_sql(cur, table, size_col)
        print("  Size stats:")
        print(f"    entries: {cnt}")
        print(f"    total:   {human_bytes(int(total_b or 0))}")
        print(f"    avg:     {human_bytes(int(avg_b or 0)) if avg_b else 'N/A'}")
        print(f"    median:  {human_bytes(int(med_b or 0)) if med_b else 'N/A'}")
        print(f"    min:     {human_bytes(int(min_b or 0)) if min_b is not None else 'N/A'}")
        print(f"    max:     {human_bytes(int(max_b or 0)) if max_b is not None else 'N/A'}")
    except Exception as e:
        print(f"  (size stats error: {e})")


def size_distribution(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
    if not size_col:
        print("  Size distribution: (no size column)")
        return
    # Buckets: <1MB, 1-10MB, 10-100MB, 100MB-1GB, 1-5GB, >=5GB
    try:
        cur.execute(f"""
            SELECT
                SUM(CASE WHEN {size_col} < 1024*1024 THEN 1 ELSE 0 END) AS lt_1mb,
                SUM(CASE WHEN {size_col} >= 1024*1024 AND {size_col} < 10*1024*1024 THEN 1 ELSE 0 END) AS _1_10mb,
                SUM(CASE WHEN {size_col} >= 10*1024*1024 AND {size_col} < 100*1024*1024 THEN 1 ELSE 0 END) AS _10_100mb,
                SUM(CASE WHEN {size_col} >= 100*1024*1024 AND {size_col} < 1024*1024*1024 THEN 1 ELSE 0 END) AS _100mb_1gb,
                SUM(CASE WHEN {size_col} >= 1024*1024*1024 AND {size_col} < 5*1024*1024*1024 THEN 1 ELSE 0 END) AS _1_5gb,
                SUM(CASE WHEN {size_col} >=
                    5*1024*1024*1024 THEN 1 ELSE 0 END) AS gte_5gb
            FROM {table} WHERE {size_col} IS NOT NULL
        """)
        (lt1, b1, b2, b3, b4, g5) = [int(x or 0) for x in cur.fetchone()]
        print("  Size distribution (count):")
        print(f"    <1MB:      {lt1}")
        print(f"    1–10MB:    {b1}")
        print(f"    10–100MB:  {b2}")
        print(f"    100MB–1GB: {b3}")
        print(f"    1–5GB:     {b4}")
        print(f"    ≥5GB:      {g5}")
    except Exception as e:
        print(f"  (size distribution error: {e})")


def expiration_stats(cur: sqlite3.Cursor, table: str, expire_col: Optional[str], now_epoch: int):
    if not expire_col:
        print("  Expiration: (no expire column)")
        return
    try:
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} IS NOT NULL AND {expire_col} < ?",
                    (now_epoch,))
        expired = cur.fetchone()[0] or 0
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?",
                    (now_epoch, now_epoch + 24*3600))
        exp_24h = cur.fetchone()[0] or 0
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?",
                    (now_epoch, now_epoch + 7*24*3600))
        exp_7d = cur.fetchone()[0] or 0
        print("  Expiration (by expire_time):")
        print(f"    expired:         {expired}")
        print(f"    expiring in 24h: {exp_24h}")
        print(f"    expiring in 7d:  {exp_7d}")
    except Exception as e:
        print(f"  (expiration stats error: {e})")


def entries_per_day(cur: sqlite3.Cursor, table: str, time_col: str, days: int = 14):
    try:
        cur.execute(f"""
            SELECT date({time_col}, 'unixepoch') AS d, COUNT(*)
            FROM {table}
            WHERE {time_col} IS NOT NULL
            GROUP BY d
            ORDER BY d DESC
            LIMIT {days}
        """)
        rows = cur.fetchall()
        print(f"  Entries per day (last {days} days):")
        for d, c in rows:
            print(f"    {d}: {c}")
    except Exception as e:
        print(f"  (entries-per-day error: {e})")


def top_prefixes_by_bytes(cur: sqlite3.Cursor, table: str, key_col: Optional[str],
                          size_col: Optional[str], limit: int = 10):
    if not key_col or not size_col:
        print("  Top prefixes: (need key and size columns)")
        return
    # Only consider TEXT keys (skip BLOB keys)
    try:
        cur.execute(f"""
            SELECT
                CASE
                    WHEN typeof({key_col})='text' AND instr({key_col}, '/')>0
                        THEN substr({key_col}, 1, instr({key_col}, '/')-1)
                    WHEN typeof({key_col})='text' THEN {key_col}
                    ELSE '(non-text)'
                END AS prefix,
                SUM({size_col}) AS total_bytes,
                COUNT(*) AS cnt
            FROM {table}
            WHERE {size_col} IS NOT NULL
            GROUP BY prefix
            ORDER BY total_bytes DESC
            LIMIT {limit}
        """)
        rows = cur.fetchall()
        print(f"  Top {limit} prefixes by bytes:")
        for prefix, total_b, cnt in rows:
            print(f"    {prefix}: {human_bytes(int(total_b or 0))} across {cnt} entries")
    except Exception as e:
        print(f"  (top prefixes error: {e})")


def analyze_sqlite_db(db_path: str) -> None:
    print(f"  • DB path: {db_path}")
    if not os.path.exists(db_path):
        print("  Status: ❌ not found")
        return
    # filesystem size (db file)
    try:
        fsize = os.path.getsize(db_path)
        print(f"  File size: {human_bytes(fsize)}")
    except Exception:
        pass
    conn = open_ro(db_path)
    if not conn:
        print("  Status: ❌ open failed (read-only)")
        return
    now_epoch = int(time.time())
    try:
        cur = conn.cursor()
        # SQLite stats
        cur.execute("PRAGMA page_size;")
        page_size = cur.fetchone()[0]
        cur.execute("PRAGMA page_count;")
        page_count = cur.fetchone()[0]
        cur.execute("PRAGMA freelist_count;")
        freelist_count = cur.fetchone()[0]
        db_bytes = page_size * page_count  # whole DB file, not the cached payload
        print(f"  Page size:      {page_size} B")
        print(f"  Page count:     {page_count}")
        print(f"  DB size (pages): {human_bytes(db_bytes)}")
        print(f"  Freelist pages: {freelist_count}")
        # tables
        tables = list_tables(cur)
        if not tables:
            print("  Tables: (none)")
            return
        print("  Tables:")
        for t in tables:
            print(f"    - {t}")
        # row counts
        print("  Row counts:")
        for t in tables:
            try:
cur.execute(f"SELECT COUNT(*) FROM {t}") cnt = cur.fetchone()[0] print(f" {t:<20} {cnt}") except Exception as e: print(f" {t:<20} (error: {e})") # choose entries table (DiskCache default is 'Cache') probe = next((n for n in tables if n.lower() == "cache"), None) or tables[0] cols_info = table_info(cur, probe) colnames = list(cols_info.keys()) lower = [c.lower() for c in colnames] # prefer 'access_time' (required by your request) time_col = next((c for c in colnames if c.lower() == TS_COL.lower()), None) if not time_col: print(" Note: No 'access_time' column found; skipping chronology-based lists.") # choose size & key columns size_col_l = pick_first_present(lower, [c.lower() for c in SIZE_CANDIDATES]) size_col = next((c for c in colnames if c.lower() == size_col_l), None) if size_col_l else None key_col_l = pick_first_present(lower, [c.lower() for c in KEY_CANDIDATES]) key_col = next((c for c in colnames if c.lower() == key_col_l), None) if key_col_l else None # print total payload if size_col: try: cur.execute(f"SELECT SUM({size_col}) FROM {probe}") total_b = cur.fetchone()[0] print(f" Total payload: {human_bytes(int(total_b or 0))} (sum of {probe}.{size_col})") except Exception as e: print(f" Total payload: (error: {e})") else: print(f" Total payload: (no size column detected)") # chronology lists (ONLY by access_time) if time_col: top_entries(cur, probe, key_col, size_col, time_col, asc=True, limit=10) top_entries(cur, probe, key_col, size_col, time_col, asc=False, limit=10) # largest entries top_largest(cur, probe, key_col, size_col, limit=10) # size stats & distribution size_stats(cur, probe, size_col) size_distribution(cur, probe, size_col) # expiration expire_col_l = pick_first_present(lower, [c.lower() for c in EXPIRE_COLS]) expire_col = next((c for c in colnames if c.lower() == expire_col_l), None) if expire_col_l else None expiration_stats(cur, probe, expire_col, now_epoch) # entries per day (last 14d) using access_time if time_col: entries_per_day(cur, probe, time_col, days=14) # top prefixes by bytes (if key is TEXT-like) top_prefixes_by_bytes(cur, probe, key_col, size_col, limit=10) finally: try: conn.close() except Exception: pass def analyze_cache(label: str, directory: str) -> None: print_header(f"{label.upper()} CACHE — {directory}") if not os.path.isdir(directory): print("Directory status: ❌ not found") return # On-disk usage (directory walk, read-only) try: usage = dir_usage_bytes(directory) print(f"Directory usage: {human_bytes(usage)}") except Exception: pass # Read-only inspection of sqlite DB (DiskCache uses cache.db) analyze_sqlite_db(os.path.join(directory, "cache.db")) print() # spacer def analyze_all_caches() -> None: for label, path in CACHE_DIRS.items(): analyze_cache(label, path) if __name__ == "__main__": print_header("ALL CACHES OVERVIEW") for k, v in CACHE_DIRS.items(): print(f"- {k:<5} -> {v}") print() analyze_all_caches()