# bethaus-app/analytics.py
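"""Analytics helpers for the bethaus-app Flask application: logs file
accesses to SQLite, resolves client IPs to locations via GeoLite2, and
renders the dashboard, connections, file-access and songs views."""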

import sqlite3
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import geoip2.database
import pandas as pd
from flask import render_template, request, session

import auth
import helperfunctions as hf
from auth import require_secret

file_access_temp = []
folder_today = []
folder_yesterday = []

app_config = auth.return_app_config()

# Create a single global connection to SQLite per database
log_db = sqlite3.connect("access_log.db", check_same_thread=False)
search_db = sqlite3.connect("search.db", check_same_thread=False)

# Geo location lookup database
geoReader = geoip2.database.Reader('GeoLite2-City.mmdb')
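# Note: file_access_temp, folder_today and folder_yesterday are plain
# module-level lists shared across requests, and the SQLite connections are
# opened with check_same_thread=False. Under a multi-threaded server this
# shared state is not synchronized; a lock would be a possible hardening
# step (an observation, not part of the original design).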
def init_log_db():
    """Create the file_access_log table if it doesn't already exist."""
    with log_db:
        log_db.execute('''
            CREATE TABLE IF NOT EXISTS file_access_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                rel_path TEXT,
                filesize INTEGER,
                mime TEXT,
                city TEXT,
                country TEXT,
                user_agent TEXT,
                device_id TEXT,
                cached BOOLEAN
            )
        ''')

init_log_db()
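# The dashboard queries below filter and group on `timestamp`; if the log
# grows large, an index along the lines of
#     CREATE INDEX IF NOT EXISTS idx_access_ts ON file_access_log (timestamp);
# should speed them up (a suggestion, not part of the original schema).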
def lookup_location(ip):
    """Resolve an IP address to (city, country), falling back to "Unknown"."""
    try:
        response = geoReader.city(ip)
        country = response.country.name if response.country.name else "Unknown"
        city = response.city.name if response.city.name else "Unknown"
        return city, country
    except Exception:
        return "Unknown", "Unknown"
def get_device_type(user_agent):
    """Classify device type based on user agent string."""
    if 'Android' in user_agent:
        return 'Android'
    elif 'iPhone' in user_agent or 'iPad' in user_agent:
        return 'iOS'
    elif 'Windows' in user_agent:
        return 'Windows'
    elif 'Macintosh' in user_agent or 'Mac OS' in user_agent:
        return 'MacOS'
    elif 'Linux' in user_agent:
        return 'Linux'
    else:
        return 'Other'
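# Example (hypothetical user agent): get_device_type('Mozilla/5.0 (Linux; Android 14)')
# returns 'Android'. The Android branch is deliberately checked before the Linux
# branch, because Android user agents usually also contain the token 'Linux'.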
def parse_timestamp(ts_str):
    try:
        # Try the normal ISO parsing.
        return datetime.fromisoformat(ts_str)
    except ValueError:
        # Fall back to manual parsing, e.g. for compact offsets like '+0200'
        # that older fromisoformat versions reject.
        # Find where the timezone offset starts: a '+' or '-' after the 'T'
        # time separator (searching from the right so the hyphens in the
        # date part are never mistaken for the offset sign).
        time_start = ts_str.find('T')
        for sign in ['+', '-']:
            pos = ts_str.rfind(sign)
            if pos != -1 and pos > time_start:
                # The base part is everything up to pos, then the tz part.
                base = ts_str[:pos]
                tz_part = ts_str[pos:]
                # Remove any colon from the tz part to simplify parsing.
                tz_clean = tz_part.replace(':', '')
                # Parse the base part, with or without fractional seconds.
                try:
                    dt = datetime.fromisoformat(base)
                except ValueError:
                    dt = datetime.strptime(base, '%Y-%m-%dT%H:%M:%S')
                # Extract hours and minutes from the tz portion.
                try:
                    offset_hours = int(tz_clean[1:3])
                    offset_minutes = int(tz_clean[3:5])
                except Exception:
                    raise ValueError(f"Unable to parse timezone from {ts_str}")
                offset = timedelta(hours=offset_hours, minutes=offset_minutes)
                if tz_clean[0] == '-':
                    offset = -offset
                # Return a timezone-aware datetime.
                return dt.replace(tzinfo=timezone(offset))
        # No recognizable offset found: re-raise the original error.
        raise
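# Example (hypothetical input): on interpreters whose fromisoformat rejects
# compact offsets, parse_timestamp('2024-05-01T12:00:00+0200') is handled by
# the fallback branch above and yields a datetime at UTC+02:00. Since Python
# 3.11 fromisoformat accepts such strings directly, so the fast path wins.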
def log_file_access(rel_path, filesize, mime, ip_address, user_agent, device_id, cached):
    """Insert a file access record into the database, prune temp entries older
    than 10 minutes, and track today's and yesterday's folders separately."""
    global file_access_temp, folder_today, folder_yesterday
    # Create a timezone-aware timestamp
    now = datetime.now(timezone.utc).astimezone()
    iso_ts = now.isoformat()
    # Convert the IP address to a location
    city, country = lookup_location(ip_address)
    with log_db:
        log_db.execute('''
            INSERT INTO file_access_log
            (timestamp, rel_path, filesize, mime, city, country, user_agent, device_id, cached)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', (iso_ts, rel_path, filesize, mime, city, country, user_agent, device_id, cached))
    # Prune temp entries older than 10 minutes
    cutoff = now - timedelta(minutes=10)
    file_access_temp[:] = [
        entry for entry in file_access_temp
        if parse_timestamp(entry[0]) >= cutoff
    ]
    # Keep only today's entries in folder_today
    today_str = iso_ts.split('T', 1)[0]
    folder_today[:] = [
        entry for entry in folder_today
        if entry['date_str'] == today_str
    ]
    # Keep only yesterday's entries in folder_yesterday
    yesterday_str = (now - timedelta(days=1)).isoformat().split('T', 1)[0]
    folder_yesterday[:] = [
        entry for entry in folder_yesterday
        if entry['date_str'] == yesterday_str
    ]
    # If this new access is from today, record it.
    # Compare the helper's YYYY-MM-DD string to today's ISO date string.
    date_from_path = hf.extract_date_from_string(rel_path)
    if date_from_path == today_str:
        # Get just the folder part (everything before the final '/')
        folder_path = rel_path.rsplit('/', 1)[0] if '/' in rel_path else rel_path
        # Only append if that folder isn't already in folder_today
        if not any(entry['rel_path'] == folder_path for entry in folder_today):
            folder_today.append({'date_str': today_str, 'rel_path': folder_path})
    # If this new access is from yesterday, record it
    if date_from_path == yesterday_str:
        folder_path = rel_path.rsplit('/', 1)[0] if '/' in rel_path else rel_path
        if not any(entry['rel_path'] == folder_path for entry in folder_yesterday):
            folder_yesterday.append({'date_str': yesterday_str, 'rel_path': folder_path})
    # Finally, insert the new access at the top of the temp log
    file_access_temp.insert(0, [
        iso_ts,
        rel_path,
        filesize,
        mime,
        f"{city}, {country}",
        user_agent,
        device_id,
        cached
    ])
    return True
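# Example call (all values hypothetical):
#   log_file_access('Speyer/2024-05-01/aufnahme.mp3', 1048576, 'audio/mpeg',
#                   '203.0.113.7', 'Mozilla/5.0 (Linux; Android 14)',
#                   'device-42', False)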
def return_folder_today():
    """
    Return only those folder_today entries whose first segment
    (up to the first '/') is in session['folders'].keys().
    """
    valid_keys = set(session.get('folders', {}).keys())
    filtered = []
    for entry in folder_today:
        # get the part before the first slash
        top_level = entry['rel_path'].split('/', 1)[0]
        # include only if this segment is one of the session keys
        if top_level in valid_keys:
            filtered.append(entry)
    return filtered
def return_folder_yesterday():
    """
    Return only those folder_yesterday entries whose first segment
    (up to the first '/') is in session['folders'].keys().
    """
    valid_keys = set(session.get('folders', {}).keys())
    filtered = []
    for entry in folder_yesterday:
        # get the part before the first slash
        top_level = entry['rel_path'].split('/', 1)[0]
        # include only if this segment is one of the session keys
        if top_level in valid_keys:
            filtered.append(entry)
    return filtered
def return_file_access():
    """Return recent file access logs from memory (the last 10 minutes)."""
    global file_access_temp
    if file_access_temp:
        # Create a timezone-aware cutoff time
        cutoff_time = datetime.now(timezone.utc).astimezone() - timedelta(minutes=10)
        # Only keep entries with timestamps greater than or equal to cutoff_time
        file_access_temp[:] = [
            entry for entry in file_access_temp
            if datetime.fromisoformat(entry[0]) >= cutoff_time
        ]
        return file_access_temp
    else:
        return []
def songs_dashboard():
    # Session & param handling
    if 'songs_dashboard_timeframe' not in session:
        session['songs_dashboard_timeframe'] = "30"
    timeframe_param = request.args.get("timeframe", session['songs_dashboard_timeframe'])
    session['songs_dashboard_timeframe'] = timeframe_param
    if 'songs_dashboard_category' not in session:
        session['songs_dashboard_category'] = "Gemeinsamer Gesang"
    category = request.args.get("category", session['songs_dashboard_category'])
    session['songs_dashboard_category'] = category
    if 'songs_dashboard_site' not in session:
        session['songs_dashboard_site'] = "Speyer"
    site = request.args.get("site", session['songs_dashboard_site'])
    session['songs_dashboard_site'] = site
    # Determine cutoff and today strings
    now = datetime.now()
    params = [category, site]
    date_clauses = []
    if timeframe_param != "all":
        cutoff = now - timedelta(days=int(timeframe_param))
        date_clauses.append("performance_date >= ?")
        params.append(cutoff.strftime("%Y-%m-%d"))
    # Filter out any future-dated rows at the DB level
    date_clauses.append("performance_date <= ?")
    params.append(now.strftime("%Y-%m-%d"))
    where_sql = " AND ".join(["category = ?", "site = ?"] + date_clauses)
    cursor = search_db.cursor()
    cursor.execute(
        f"SELECT titel, performance_date FROM files WHERE {where_sql}",
        params
    )
    rows = cursor.fetchall()
    # Aggregate counts and last-performed dates, with error logging
    performance_counts = defaultdict(int)
    last_performed_dates = {}
    for titel, perf_date_str in rows:
        if not perf_date_str:
            continue
        perf_date_str = perf_date_str.strip()
        try:
            perf_date = datetime.strptime(perf_date_str, "%Y-%m-%d")
        except ValueError:
            print(f"[songs_dashboard] bad date format for '{titel}': '{perf_date_str}'")
            continue
        performance_counts[titel] += 1
        prev = last_performed_dates.get(titel)
        if prev is None or perf_date > prev:
            last_performed_dates[titel] = perf_date
    # Build list for the template
    performance_data = []
    for titel, count in performance_counts.items():
        last_str = last_performed_dates[titel].strftime("%d.%m.%Y")
        performance_data.append({
            "titel": titel,
            "count": count,
            "last_performed": last_str
        })
    performance_data.sort(key=lambda x: x["count"], reverse=True)
    # Render
    return render_template(
        'songs_dashboard.html',
        timeframe=timeframe_param,
        performance_data=performance_data,
        site=site,
        category=category,
        admin_enabled=auth.is_admin(),
        title_short=app_config.get('TITLE_SHORT', 'Default Title'),
        title_long=app_config.get('TITLE_LONG', 'Default Title'),
    )
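# Note: songs_dashboard assumes the search.db `files` table provides at least
# the columns titel, performance_date (as YYYY-MM-DD text), category and site;
# the schema itself is presumably defined elsewhere in the app.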
@require_secret
def connections():
    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')
    return render_template('connections.html',
                           admin_enabled=auth.is_admin(),
                           title_short=title_short,
                           title_long=title_long)
@require_secret
def dashboard():
    if 'filetype' not in session:
        session['filetype'] = 'audio'
    if 'timeframe' not in session:
        session['timeframe'] = 'last24hours'
    session['filetype'] = request.args.get('filetype', session['filetype'])
    session['timeframe'] = request.args.get('timeframe', session['timeframe'])
    now = datetime.now()
    # default filetype if not found
    filetype = 'other'
    # Some simplistic sets to decide how we match the MIME type
    audio_list = ['mp3', 'wav', 'ton', 'audio']
    image_list = ['jpg', 'jpeg', 'image', 'photo', 'bild', 'foto']
    video_list = ['mp4', 'mov', 'wmv', 'avi', 'film', 'video']
    if session['filetype'].lower() in audio_list:
        filetype = 'audio/'
    elif session['filetype'].lower() in image_list:
        filetype = 'image/'
    elif session['filetype'].lower() in video_list:
        filetype = 'video/'
    # Determine start time based on session['timeframe']
    if session['timeframe'] == 'last24hours':
        start_dt = now - timedelta(hours=24)
    elif session['timeframe'] == '7days':
        start_dt = now - timedelta(days=7)
    elif session['timeframe'] == '30days':
        start_dt = now - timedelta(days=30)
    elif session['timeframe'] == '365days':
        start_dt = now - timedelta(days=365)
    else:
        start_dt = now - timedelta(hours=24)
    # We'll compare the textual timestamp (ISO 8601).
    start_str = start_dt.isoformat()
    # Build the SQL filter
    if filetype == 'other':
        # Exclude audio, image, video
        filetype_filter_sql = (
            "AND mime NOT LIKE 'audio/%' "
            "AND mime NOT LIKE 'image/%' "
            "AND mime NOT LIKE 'video/%' "
        )
        params_for_filter = (start_str,)
    else:
        # Filter for mimes that start with the given type
        filetype_filter_sql = "AND mime LIKE ?"
        params_for_filter = (start_str, filetype + '%')
    # 1. Top files by access count
    # (removed and moved to the file_access() function)
    # 2. Distinct device trend
    # Group by hour for "last24hours", by day for "7days"/"30days", by month for "365days"
    if session['timeframe'] == 'last24hours':
        # Group by hour: truncate the timestamp to YYYY-MM-DDTHH:00:00Z
        query = f'''
            SELECT strftime('%Y-%m-%dT%H:00:00Z', replace(timestamp, 'T', ' ')) AS bucket,
                   COUNT(DISTINCT device_id) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    elif session['timeframe'] in ('7days', '30days'):
        # Group by day: substr(timestamp, 1, 10) -> YYYY-MM-DD
        query = f'''
            SELECT substr(timestamp, 1, 10) AS bucket, COUNT(DISTINCT device_id) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    elif session['timeframe'] == '365days':
        # Group by month: substr(timestamp, 1, 7) -> YYYY-MM
        query = f'''
            SELECT substr(timestamp, 1, 7) AS bucket, COUNT(DISTINCT device_id) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    else:
        # Default: group by day
        query = f'''
            SELECT substr(timestamp, 1, 10) AS bucket, COUNT(DISTINCT device_id) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        distinct_device_data_rows = cursor.fetchall()
    distinct_device_data = [
        dict(bucket=r[0], count=r[1]) for r in distinct_device_data_rows
    ]
    # 3. Download trend
    # Group by hour for "last24hours", by day for "7days"/"30days", by month for "365days".
    if session['timeframe'] == 'last24hours':
        # Group by hour: truncate the timestamp to YYYY-MM-DDTHH:00:00Z
        query = f'''
            SELECT strftime('%Y-%m-%dT%H:00:00Z', replace(timestamp, 'T', ' ')) AS bucket,
                   COUNT(*) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    elif session['timeframe'] in ('7days', '30days'):
        # Group by day: substr(timestamp, 1, 10) -> YYYY-MM-DD
        query = f'''
            SELECT substr(timestamp, 1, 10) AS bucket, COUNT(*) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    elif session['timeframe'] == '365days':
        # Group by month: substr(timestamp, 1, 7) -> YYYY-MM
        query = f'''
            SELECT substr(timestamp, 1, 7) AS bucket, COUNT(*) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    else:
        # Default: group by day
        query = f'''
            SELECT substr(timestamp, 1, 10) AS bucket, COUNT(*) AS count
            FROM file_access_log
            WHERE timestamp >= ? {filetype_filter_sql}
            GROUP BY bucket
            ORDER BY bucket
        '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        timeframe_data_rows = cursor.fetchall()
    timeframe_data = [
        dict(bucket=r[0], count=r[1]) for r in timeframe_data_rows
    ]
    # 4. User agent distribution: count each user_agent once per device_id
    query = f'''
        SELECT user_agent, COUNT(DISTINCT device_id) AS count
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
        GROUP BY user_agent
        ORDER BY count DESC
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        raw_user_agents = cursor.fetchall()
    device_counts = {}
    for (ua, cnt) in raw_user_agents:
        device = get_device_type(ua)
        device_counts[device] = device_counts.get(device, 0) + cnt
    user_agent_data = [
        dict(device=d, count=c) for d, c in device_counts.items()
    ]
    # 5. Parent folder distribution
    query = f'''
        SELECT rel_path, COUNT(*) AS count
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
        GROUP BY rel_path
        ORDER BY count DESC
    '''
    folder_data_dict = {}
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        for (rp, c) in cursor.fetchall():
            if '/' in rp:
                parent_folder = rp.rsplit('/', 1)[0]
            else:
                parent_folder = "Root"
            folder_data_dict[parent_folder] = folder_data_dict.get(parent_folder, 0) + c
    folder_data = [dict(folder=f, count=cnt) for f, cnt in folder_data_dict.items()]
    folder_data.sort(key=lambda x: x['count'], reverse=True)
    folder_data = folder_data[:10]
    # 6. Aggregate locations with counts
    query = f'''
        SELECT city, country, COUNT(*) AS count
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
        GROUP BY city, country
        ORDER BY count DESC
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        locations = cursor.fetchall()
    # 7. Summary stats
    # total_accesses
    query = f'''
        SELECT COUNT(*)
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        total_accesses = cursor.fetchone()[0]
    # unique_files
    query = f'''
        SELECT COUNT(DISTINCT rel_path)
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        unique_files = cursor.fetchone()[0]
    # unique_user
    query = f'''
        SELECT COUNT(DISTINCT device_id)
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        unique_user = cursor.fetchone()[0]
    # Percentage of cached calls
    query = f'''
        SELECT (CAST(SUM(CASE WHEN cached = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*)) * 100
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        cached_percentage = cursor.fetchone()[0]
    if cached_percentage is not None:
        cached_percentage = f"{cached_percentage:.2f}"
    # 8. Process location data
    location_data_dict = {}
    for (city, country, cnt) in locations:
        key = (city, country)
        location_data_dict[key] = location_data_dict.get(key, 0) + cnt
    location_data = [
        dict(city=k[0], country=k[1], count=v)
        for k, v in location_data_dict.items()
    ]
    location_data.sort(key=lambda x: x['count'], reverse=True)
    location_data = location_data[:20]
    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')
    return render_template(
        "dashboard.html",
        timeframe=session['timeframe'],
        distinct_device_data=distinct_device_data,
        user_agent_data=user_agent_data,
        folder_data=folder_data,
        location_data=location_data,
        total_accesses=total_accesses,
        unique_files=unique_files,
        unique_user=unique_user,
        cached_percentage=cached_percentage,
        timeframe_data=timeframe_data,
        admin_enabled=auth.is_admin(),
        title_short=title_short,
        title_long=title_long
    )
@require_secret
def file_access():
    if 'timeframe' not in session:
        session['timeframe'] = 'last24hours'
    session['timeframe'] = request.args.get('timeframe', session['timeframe'])
    now = datetime.now()
    filetype = 'audio/'
    # Determine start time based on session['timeframe']
    if session['timeframe'] == 'last24hours':
        start_dt = now - timedelta(hours=24)
    elif session['timeframe'] == '7days':
        start_dt = now - timedelta(days=7)
    elif session['timeframe'] == '30days':
        start_dt = now - timedelta(days=30)
    elif session['timeframe'] == '365days':
        start_dt = now - timedelta(days=365)
    else:
        start_dt = now - timedelta(hours=24)
    # We'll compare the textual timestamp (ISO 8601).
    start_str = start_dt.isoformat()
    # Filter for mimes that start with the given type
    filetype_filter_sql = "AND mime LIKE ?"
    params_for_filter = (start_str, filetype + '%')
    # 1. Top files by access count
    query = f'''
        SELECT rel_path, COUNT(*) AS access_count
        FROM file_access_log
        WHERE timestamp >= ? {filetype_filter_sql}
        GROUP BY rel_path
        ORDER BY access_count DESC
        LIMIT 1000
    '''
    with log_db:
        cursor = log_db.execute(query, params_for_filter)
        rows = cursor.fetchall()
    # Convert rows to a list of dictionaries and add the category
    rows = [
        {
            'rel_path': rel_path,
            'access_count': access_count,
            'category': hf.extract_structure_from_string(rel_path)[0]
        }
        for rel_path, access_count in rows
    ]
    # Collect the categories present in the rows
    categories = sorted({r['category'] for r in rows if r['category'] is not None})
    all_categories = [None] + categories
    top20 = []
    for category in all_categories:
        label = category if category is not None else 'Keine Kategorie gefunden!'
        files = [r for r in rows if r['category'] == category][:20]
        top20.append({
            'category': label,
            'files': files
        })
    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')
    return render_template(
        "file_access.html",
        timeframe=session['timeframe'],
        top20=top20,
        admin_enabled=auth.is_admin(),
        title_short=title_short,
        title_long=title_long
    )
def export_to_excel():
    """Export search_db to an Excel file and store it locally."""
    # Query all data from the search_db
    query = "SELECT * FROM files"
    cursor = search_db.cursor()
    cursor.execute(query)
    rows = cursor.fetchall()
    # Get column names from the cursor description
    column_names = [description[0] for description in cursor.description]
    # Create a DataFrame and save it to an Excel file
    df = pd.DataFrame(rows, columns=column_names)
    df = df.drop(columns=['transcript'], errors='ignore')  # Drop the 'transcript' column if it exists
    df.to_excel("search_db.xlsx", index=False)
    # Close the cursor
    cursor.close()
if __name__ == "__main__":
    print("Running as a standalone script.")
    export_to_excel()
    print("Exported search_db to search_db.xlsx")