From 462581b69885b15201fb6c88cc71168fb269bcbf Mon Sep 17 00:00:00 2001
From: lelo
Date: Sat, 8 Nov 2025 17:14:14 +0000
Subject: [PATCH] add cache analyzer

---
 cache_analyzer.py | 472 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 472 insertions(+)
 create mode 100644 cache_analyzer.py

diff --git a/cache_analyzer.py b/cache_analyzer.py
new file mode 100644
index 0000000..bf2e37a
--- /dev/null
+++ b/cache_analyzer.py
@@ -0,0 +1,472 @@
+#!/usr/bin/env python3
+"""
+cache_analyzer.py (READ-ONLY)
+
+Analyzes DiskCache-backed caches (audio, image, video, other) by inspecting
+their SQLite DBs in STRICT READ-ONLY mode (no writes, no cache opens).
+
+What it shows per cache:
+  - Directory usage (filesystem)
+  - DB stats (page size/count, freelist, tables, row counts)
+  - Total payload (sum of size column)
+  - 10 oldest / 10 latest entries by Cache.access_time
+  - Top 10 largest entries
+  - Size stats: avg, median (approx), min, max
+  - Size distribution buckets
+  - Expiration: expired count, expiring next 24h and next 7d (if expire_time present)
+  - Entries per day (last 14 days) based on access_time
+  - Top 10 prefixes by bytes (prefix = text before first '/'), if key is TEXT
+
+Usage:
+    python cache_analyzer.py
+    # override paths via env vars if needed:
+    FILECACHE_AUDIO=/path/to/filecache_audio python cache_analyzer.py
+"""
+
+import os
+import sqlite3
+import time
+from datetime import datetime
+from typing import Optional, Dict, Any, List, Tuple
+
+# -------- Config (override with env vars if needed) --------
+CACHE_DIRS = {
+    "audio": os.environ.get("FILECACHE_AUDIO", "./filecache_audio"),
+    "image": os.environ.get("FILECACHE_IMAGE", "./filecache_image"),
+    "video": os.environ.get("FILECACHE_VIDEO", "./filecache_video"),
+    "other": os.environ.get("FILECACHE_OTHER", "./filecache_other"),
+}
+
+# Column heuristics (actual names in your DB appear to include: access_time, expire_time, size, key)
+TS_COL = "access_time"          # per your request: ONLY use Cache.access_time for recency/age
+EXPIRE_COLS = ["expire_time"]   # best-effort
+SIZE_CANDIDATES = ["size", "bytes", "value_size", "data_size"]
+KEY_CANDIDATES = ["key", "path", "name", "url", "k"]
+
+
+def human_bytes(n: Optional[int]) -> str:
+    if n is None:
+        return "N/A"
+    step = 1024.0
+    for unit in ("B", "KB", "MB", "GB", "TB", "PB", "EB"):
+        if n < step:
+            return f"{n:.2f} {unit}"
+        n /= step
+    return f"{n:.2f} ZB"
+
+
+def print_header(title: str) -> None:
+    line = "=" * max(60, len(title) + 8)
+    print(line)
+    print(f"📦 {title}")
+    print(line)
+
+
+def dir_usage_bytes(path: str) -> int:
+    total = 0
+    for root, _, files in os.walk(path):
+        for f in files:
+            fp = os.path.join(root, f)
+            try:
+                total += os.path.getsize(fp)
+            except OSError:
+                pass
+    return total
+
+
+def open_ro(db_path: str) -> Optional[sqlite3.Connection]:
+    try:
+        return sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
+    except sqlite3.Error:
+        return None
+
+
+def list_tables(cur: sqlite3.Cursor) -> List[str]:
+    cur.execute("""
+        SELECT name FROM sqlite_master
+        WHERE type='table' AND name NOT LIKE 'sqlite_%'
+        ORDER BY name
+    """)
+    return [r[0] for r in cur.fetchall()]
+
+
+def table_info(cur: sqlite3.Cursor, name: str) -> Dict[str, Dict[str, Any]]:
+    cur.execute(f"PRAGMA table_info({name})")
+    info = {}
+    for cid, cname, ctype, *_ in cur.fetchall():
+        info[cname] = {"cid": cid, "ctype": ctype}
+    return info
+
+
+def pick_first_present(cols: List[str], options: List[str]) -> Optional[str]:
+    lower = {c.lower(): c for c in cols}
+    for opt in options:
+        if opt in lower:
+            return lower[opt]
+    return None
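+
+
+# Example (illustrative, assuming DiskCache's default Cache columns):
+#   pick_first_present(["rowid", "key", "store_time", "size"], SIZE_CANDIDATES) -> "size"
+#   pick_first_present(["rowid", "filename"], SIZE_CANDIDATES)                  -> None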
+
+
+def fmt_epoch(v: Optional[float]) -> str:
+    if v is None:
+        return "N/A"
+    try:
+        fv = float(v)
+        if fv <= 0 or fv > 1e11:
+            return str(v)
+        return datetime.utcfromtimestamp(fv).isoformat() + "Z"
+    except Exception:
+        return str(v)
+
+
+def key_preview(v: Any) -> str:
+    if v is None:
+        return "NULL"
+    if isinstance(v, (bytes, bytearray)):
+        try:
+            s = v.decode("utf-8", errors="strict")
+            return s[:120] + ("…" if len(s) > 120 else "")
+        except Exception:
+            hx = v.hex()
+            return "0x" + (hx[:120] + ("…" if len(hx) > 120 else ""))
+    s = str(v)
+    return s[:120] + ("…" if len(s) > 120 else "")
+
+
+def median_via_sql(cur: sqlite3.Cursor, table: str, size_col: str) -> Optional[float]:
+    # Approximate median using LIMIT/OFFSET (works fine for typical cache sizes)
+    try:
+        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {size_col} IS NOT NULL")
+        n = cur.fetchone()[0] or 0
+        if n == 0:
+            return None
+        offset = (n - 1) // 2
+        cur.execute(f"""
+            SELECT {size_col} FROM {table}
+            WHERE {size_col} IS NOT NULL
+            ORDER BY {size_col} ASC
+            LIMIT 1 OFFSET {offset}
+        """)
+        row = cur.fetchone()
+        return float(row[0]) if row else None
+    except Exception:
+        return None
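+
+
+# Worked example of the OFFSET math above (illustrative): for n = 5 non-NULL rows,
+# offset = (5 - 1) // 2 = 2, i.e. the 3rd row in ascending order (the exact median);
+# for n = 6, offset = 2 picks the lower of the two middle values, hence "approx"
+# for even row counts.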
+
+
+def top_entries(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str],
+                time_col: str, asc: bool, limit: int = 10):
+    order = "ASC" if asc else "DESC"
+    label = "Oldest" if asc else "Latest"
+    cols = [time_col]
+    if key_col:
+        cols.append(key_col)
+    if size_col:
+        cols.append(size_col)
+    col_list = ", ".join(cols)
+    try:
+        cur.execute(f"""
+            SELECT {col_list} FROM {table}
+            WHERE {time_col} IS NOT NULL
+            ORDER BY {time_col} {order}
+            LIMIT {limit}
+        """)
+        rows = cur.fetchall()
+        print(f"  {label} {limit} by {time_col}:")
+        for r in rows:
+            i = 0
+            t_s = fmt_epoch(r[i]); i += 1
+            k_s = key_preview(r[i]) if key_col else "-"
+            i += 1 if key_col else 0
+            sz_s = human_bytes(int(r[i])) if (size_col and r[i] is not None) else "-"
+            print(f"    time={t_s}  key={k_s}  size={sz_s}")
+    except Exception as e:
+        print(f"  ({label.lower()} query error: {e})")
+
+
+def top_largest(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
+    if not size_col:
+        print("  Top largest: (no size column)")
+        return
+    cols = [size_col]
+    if key_col:
+        cols.append(key_col)
+    col_list = ", ".join(cols)
+    try:
+        cur.execute(f"""
+            SELECT {col_list} FROM {table}
+            WHERE {size_col} IS NOT NULL
+            ORDER BY {size_col} DESC
+            LIMIT {limit}
+        """)
+        rows = cur.fetchall()
+        print(f"  Top {limit} largest entries:")
+        for r in rows:
+            sz = human_bytes(int(r[0] or 0))
+            key_s = key_preview(r[1]) if key_col else "-"
+            print(f"    size={sz}  key={key_s}")
+    except Exception as e:
+        print(f"  (largest query error: {e})")
+
+
+def size_stats(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
+    if not size_col:
+        print("  Size stats: (no size column)")
+        return
+    try:
+        cur.execute(f"""
+            SELECT COUNT(*), SUM({size_col}), AVG({size_col}),
+                   MIN({size_col}), MAX({size_col})
+            FROM {table}
+            WHERE {size_col} IS NOT NULL
+        """)
+        cnt, total_b, avg_b, min_b, max_b = cur.fetchone()
+        med_b = median_via_sql(cur, table, size_col)
+        print("  Size stats:")
+        print(f"    entries: {cnt}")
+        print(f"    total:   {human_bytes(int(total_b or 0))}")
+        print(f"    avg:     {human_bytes(int(avg_b or 0)) if avg_b else 'N/A'}")
+        print(f"    median:  {human_bytes(int(med_b or 0)) if med_b else 'N/A'}")
+        print(f"    min:     {human_bytes(int(min_b or 0)) if min_b is not None else 'N/A'}")
+        print(f"    max:     {human_bytes(int(max_b or 0)) if max_b is not None else 'N/A'}")
+    except Exception as e:
+        print(f"  (size stats error: {e})")
+
+
+def size_distribution(cur: sqlite3.Cursor, table: str, size_col: Optional[str]):
+    if not size_col:
+        print("  Size distribution: (no size column)")
+        return
+    # Buckets: <1MB, 1-10MB, 10-100MB, 100MB-1GB, 1-5GB, >=5GB
+    try:
+        cur.execute(f"""
+            SELECT
+                SUM(CASE WHEN {size_col} < 1024*1024 THEN 1 ELSE 0 END) AS lt_1mb,
+                SUM(CASE WHEN {size_col} >= 1024*1024 AND {size_col} < 10*1024*1024 THEN 1 ELSE 0 END) AS _1_10mb,
+                SUM(CASE WHEN {size_col} >= 10*1024*1024 AND {size_col} < 100*1024*1024 THEN 1 ELSE 0 END) AS _10_100mb,
+                SUM(CASE WHEN {size_col} >= 100*1024*1024 AND {size_col} < 1024*1024*1024 THEN 1 ELSE 0 END) AS _100mb_1gb,
+                SUM(CASE WHEN {size_col} >= 1024*1024*1024 AND {size_col} < 5*1024*1024*1024 THEN 1 ELSE 0 END) AS _1_5gb,
+                SUM(CASE WHEN {size_col} >= 5*1024*1024*1024 THEN 1 ELSE 0 END) AS gte_5gb
+            FROM {table}
+            WHERE {size_col} IS NOT NULL
+        """)
+        (lt1, b1, b2, b3, b4, g5) = [int(x or 0) for x in cur.fetchone()]
+        print("  Size distribution (count):")
+        print(f"    <1MB:       {lt1}")
+        print(f"    1–10MB:     {b1}")
+        print(f"    10–100MB:   {b2}")
+        print(f"    100MB–1GB:  {b3}")
+        print(f"    1–5GB:      {b4}")
+        print(f"    ≥5GB:       {g5}")
+    except Exception as e:
+        print(f"  (size distribution error: {e})")
+
+
+def expiration_stats(cur: sqlite3.Cursor, table: str, expire_col: Optional[str], now_epoch: int):
+    if not expire_col:
+        print("  Expiration: (no expire column)")
+        return
+    try:
+        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} IS NOT NULL AND {expire_col} < ?", (now_epoch,))
+        expired = cur.fetchone()[0] or 0
+        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?",
+                    (now_epoch, now_epoch + 24*3600))
+        exp_24h = cur.fetchone()[0] or 0
+        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {expire_col} >= ? AND {expire_col} < ?",
+                    (now_epoch, now_epoch + 7*24*3600))
+        exp_7d = cur.fetchone()[0] or 0
+        print("  Expiration (by expire_time):")
+        print(f"    expired:          {expired}")
+        print(f"    expiring in 24h:  {exp_24h}")
+        print(f"    expiring in 7d:   {exp_7d}")
+    except Exception as e:
+        print(f"  (expiration stats error: {e})")
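+
+
+# Notes on the counts above: "expiring in 7d" includes the 24h window, and (assuming
+# DiskCache's default schema) entries stored without an expiry have expire_time = NULL,
+# so they appear in none of the three counts.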
+
+
+def entries_per_day(cur: sqlite3.Cursor, table: str, time_col: str, days: int = 14):
+    try:
+        cur.execute(f"""
+            SELECT date({time_col}, 'unixepoch') AS d, COUNT(*)
+            FROM {table}
+            WHERE {time_col} IS NOT NULL
+            GROUP BY d
+            ORDER BY d DESC
+            LIMIT {days}
+        """)
+        rows = cur.fetchall()
+        print(f"  Entries per day (last {days} days):")
+        for d, c in rows:
+            print(f"    {d}: {c}")
+    except Exception as e:
+        print(f"  (entries-per-day error: {e})")
+
+
+def top_prefixes_by_bytes(cur: sqlite3.Cursor, table: str, key_col: Optional[str], size_col: Optional[str], limit: int = 10):
+    if not key_col or not size_col:
+        print("  Top prefixes: (need key and size columns)")
+        return
+    # Only consider TEXT keys (skip BLOB keys)
+    try:
+        cur.execute(f"""
+            SELECT
+                CASE
+                    WHEN typeof({key_col})='text' AND instr({key_col}, '/')>0
+                        THEN substr({key_col}, 1, instr({key_col}, '/')-1)
+                    WHEN typeof({key_col})='text'
+                        THEN {key_col}
+                    ELSE '(non-text)'
+                END AS prefix,
+                SUM({size_col}) AS total_bytes,
+                COUNT(*) AS cnt
+            FROM {table}
+            WHERE {size_col} IS NOT NULL
+            GROUP BY prefix
+            ORDER BY total_bytes DESC
+            LIMIT {limit}
+        """)
+        rows = cur.fetchall()
+        print(f"  Top {limit} prefixes by bytes:")
+        for prefix, total_b, cnt in rows:
+            print(f"    {prefix}: {human_bytes(int(total_b or 0))} across {cnt} entries")
+    except Exception as e:
+        print(f"  (top prefixes error: {e})")
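+
+
+# Illustrative example of the CASE above: TEXT keys such as 'podcast/ep01.mp3' and
+# 'podcast/ep02.mp3' (hypothetical names) group under the prefix 'podcast'; a TEXT
+# key with no '/' is its own prefix, and all BLOB keys fall under '(non-text)'.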
+
+
+def analyze_sqlite_db(db_path: str) -> None:
+    print(f"  • DB path: {db_path}")
+    if not os.path.exists(db_path):
+        print("    Status: ❌ not found")
+        return
+
+    # filesystem size (db file)
+    try:
+        fsize = os.path.getsize(db_path)
+        print(f"    File size: {human_bytes(fsize)}")
+    except Exception:
+        pass
+
+    conn = open_ro(db_path)
+    if not conn:
+        print("    Status: ❌ open failed (read-only)")
+        return
+
+    now_epoch = int(time.time())
+
+    try:
+        cur = conn.cursor()
+        # SQLite stats
+        cur.execute("PRAGMA page_size;")
+        page_size = cur.fetchone()[0]
+        cur.execute("PRAGMA page_count;")
+        page_count = cur.fetchone()[0]
+        cur.execute("PRAGMA freelist_count;")
+        freelist_count = cur.fetchone()[0]
+        db_bytes = page_size * page_count
+        print(f"    Page size:      {page_size} B")
+        print(f"    Page count:     {page_count}")
+        print(f"    DB size (pages × page size): {human_bytes(db_bytes)}")
+        print(f"    Freelist pages: {freelist_count}")
+
+        # tables
+        tables = list_tables(cur)
+        if not tables:
+            print("    Tables: (none)")
+            return
+
+        print("    Tables:")
+        for t in tables:
+            print(f"      - {t}")
+
+        # row counts
+        print("    Row counts:")
+        for t in tables:
+            try:
+                cur.execute(f"SELECT COUNT(*) FROM {t}")
+                cnt = cur.fetchone()[0]
+                print(f"      {t:<20} {cnt}")
+            except Exception as e:
+                print(f"      {t:<20} (error: {e})")
+
+        # choose entries table (DiskCache default is 'Cache')
+        probe = next((n for n in tables if n.lower() == "cache"), None) or tables[0]
+        cols_info = table_info(cur, probe)
+        colnames = list(cols_info.keys())
+        lower = [c.lower() for c in colnames]
+
+        # prefer 'access_time' (required by your request)
+        time_col = next((c for c in colnames if c.lower() == TS_COL.lower()), None)
+        if not time_col:
+            print("    Note: No 'access_time' column found; skipping chronology-based lists.")
+
+        # choose size & key columns
+        size_col_l = pick_first_present(lower, [c.lower() for c in SIZE_CANDIDATES])
+        size_col = next((c for c in colnames if c.lower() == size_col_l), None) if size_col_l else None
+        key_col_l = pick_first_present(lower, [c.lower() for c in KEY_CANDIDATES])
+        key_col = next((c for c in colnames if c.lower() == key_col_l), None) if key_col_l else None
+
+        # print total payload
+        if size_col:
+            try:
+                cur.execute(f"SELECT SUM({size_col}) FROM {probe}")
+                total_b = cur.fetchone()[0]
+                print(f"    Total payload: {human_bytes(int(total_b or 0))} (sum of {probe}.{size_col})")
+            except Exception as e:
+                print(f"    Total payload: (error: {e})")
+        else:
+            print("    Total payload: (no size column detected)")
+
+        # chronology lists (ONLY by access_time)
+        if time_col:
+            top_entries(cur, probe, key_col, size_col, time_col, asc=True, limit=10)
+            top_entries(cur, probe, key_col, size_col, time_col, asc=False, limit=10)
+
+        # largest entries
+        top_largest(cur, probe, key_col, size_col, limit=10)
+
+        # size stats & distribution
+        size_stats(cur, probe, size_col)
+        size_distribution(cur, probe, size_col)
+
+        # expiration
+        expire_col_l = pick_first_present(lower, [c.lower() for c in EXPIRE_COLS])
+        expire_col = next((c for c in colnames if c.lower() == expire_col_l), None) if expire_col_l else None
+        expiration_stats(cur, probe, expire_col, now_epoch)
+
+        # entries per day (last 14d) using access_time
+        if time_col:
+            entries_per_day(cur, probe, time_col, days=14)
+
+        # top prefixes by bytes (if key is TEXT-like)
+        top_prefixes_by_bytes(cur, probe, key_col, size_col, limit=10)
+
+    finally:
+        try:
+            conn.close()
+        except Exception:
+            pass
+
+
+def analyze_cache(label: str, directory: str) -> None:
+    print_header(f"{label.upper()} CACHE — {directory}")
+    if not os.path.isdir(directory):
+        print("Directory status: ❌ not found")
+        return
+
+    # On-disk usage (directory walk, read-only)
+    try:
+        usage = dir_usage_bytes(directory)
+        print(f"Directory usage: {human_bytes(usage)}")
+    except Exception:
+        pass
+
+    # Read-only inspection of sqlite DB (DiskCache uses cache.db)
+    analyze_sqlite_db(os.path.join(directory, "cache.db"))
+    print()  # spacer
+
+
+def analyze_all_caches() -> None:
+    for label, path in CACHE_DIRS.items():
+        analyze_cache(label, path)
+
+
+if __name__ == "__main__":
+    print_header("ALL CACHES OVERVIEW")
+    for k, v in CACHE_DIRS.items():
+        print(f"- {k:<5} -> {v}")
+    print()
+    analyze_all_caches()
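+
+# Example (illustrative) of analyzing a single cache from a REPL or another script,
+# using a hypothetical path:
+#
+#   from cache_analyzer import analyze_cache
+#   analyze_cache("audio", "/srv/filecache_audio")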