bethaus-app/analytics.py
2025-03-31 17:37:39 +00:00

266 lines
9.9 KiB
Python

from flask import render_template, request, session
from datetime import datetime, timedelta
import geoip2.database
from urllib.parse import urlparse, unquote
from auth import require_secret
import os
import threading
import psycopg2
# In-memory list of recent file-access entries. return_file_access() prunes
# entries whose first element (parsed as an ISO-8601 timestamp string) is more
# than ten minutes old. No writer is visible in this part of the file --
# presumably other modules append to it; verify against callers.
file_access_temp = []
# Thread-safe singleton metaclass.
class SingletonMeta(type):
    """Metaclass that gives every class using it exactly one shared instance.

    The first call to ``SomeClass(...)`` constructs the instance; every later
    call returns that same object and ignores the arguments passed.
    """

    # Maps each class created with this metaclass to its sole instance.
    _instances = {}
    # One lock shared by all singleton classes. It is re-entrant so that a
    # singleton whose __init__ builds another singleton on the same thread
    # does not deadlock against itself.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
            return cls._instances[cls]
# Database class that only handles the connection.
class Database(metaclass=SingletonMeta):
    """Process-wide holder of one psycopg2 connection.

    Connection settings are read from the environment variables ``DB_NAME``,
    ``DB_USER``, ``DB_PASSWORD``, ``DB_HOST`` and ``DB_PORT`` (the port falls
    back to 5432 when unset). The connection is opened in the constructor and
    exposed as ``self.connection``.
    """

    def __init__(self):
        env = os.environ
        self.dbname = env.get('DB_NAME')
        self.user = env.get('DB_USER')
        self.password = env.get('DB_PASSWORD')
        self.host = env.get('DB_HOST')
        self.port = int(env.get('DB_PORT', 5432))

        connect_kwargs = {
            'dbname': self.dbname,
            'user': self.user,
            'password': self.password,
            'host': self.host,
            'port': self.port,
        }
        self.connection = psycopg2.connect(**connect_kwargs)
        # Enable autocommit so callers never have to call commit() themselves.
        self.connection.autocommit = True
# Create a global database instance.
# NOTE: Database.__init__ calls psycopg2.connect(), so importing this module
# opens the PostgreSQL connection immediately and needs the DB_* environment
# variables to be set beforehand.
log_db = Database()
def lookup_location(ip, reader):
    """Resolve ``ip`` to a ``(country, city)`` pair of names.

    ``reader`` must provide a ``city(ip)`` method whose result exposes
    ``country.name`` and ``city.name``. A missing or empty name is reported
    as ``"Unknown"``; if the lookup raises for any reason, both values are
    ``"Unknown"`` (the lookup is deliberately best-effort).
    """
    unknown = "Unknown"
    try:
        record = reader.city(ip)
        return record.country.name or unknown, record.city.name or unknown
    except Exception:
        return unknown, unknown
# Ordered (label, markers) pairs: the first entry with a marker found in the
# user agent wins. Order matters because Android user agents typically also
# contain "Linux" and iPhone/iPad ones typically also contain "Mac OS".
_DEVICE_MARKERS = (
    ('Android', ('Android',)),
    ('iOS', ('iPhone', 'iPad')),
    ('Windows', ('Windows',)),
    ('MacOS', ('Macintosh', 'Mac OS')),
    ('Linux', ('Linux',)),
)


def get_device_type(user_agent):
    """Classify device type based on user agent string.

    Returns one of ``'Android'``, ``'iOS'``, ``'Windows'``, ``'MacOS'``,
    ``'Linux'`` or ``'Other'``. A missing user agent (``None`` or an empty
    string, e.g. a NULL database column) is classified as ``'Other'``
    instead of raising ``TypeError``.
    """
    if not user_agent:
        return 'Other'
    for device, markers in _DEVICE_MARKERS:
        if any(marker in user_agent for marker in markers):
            return device
    return 'Other'
# Function to initialize the database.
def init_log_db():
    """Create the ``file_access_log`` table if it does not already exist.

    Runs a single ``CREATE TABLE IF NOT EXISTS`` statement on the shared
    ``log_db`` connection, so calling it repeatedly is harmless.
    """
    create_table_sql = '''
        CREATE TABLE IF NOT EXISTS file_access_log (
            id SERIAL PRIMARY KEY,
            timestamp TIMESTAMP,
            rel_path TEXT,
            filesize BIGINT,
            mime TEXT,
            ip_address TEXT,
            user_agent TEXT,
            device_id TEXT,
            cached BOOLEAN
        )
    '''
    with log_db.connection.cursor() as cur:
        cur.execute(create_table_sql)
# Logging function that uses the singleton connection.
def log_file_access(rel_path, filesize, mime, ip_address, user_agent, device_id, cached):
    """Insert one row into ``file_access_log`` and return its timestamp.

    The row is stamped with the current local time (a naive ``datetime``).
    All values are passed as bound query parameters.

    Returns:
        The timestamp that was stored, as an ISO-8601 string.
    """
    accessed_at = datetime.now()  # Passed to the driver as a datetime object.
    insert_sql = '''
        INSERT INTO file_access_log (timestamp, rel_path, filesize, mime, ip_address, user_agent, device_id, cached)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    '''
    row = (
        accessed_at,
        rel_path,
        filesize,
        mime,
        ip_address,
        user_agent,
        device_id,
        cached,
    )
    with log_db.connection.cursor() as cur:
        cur.execute(insert_sql, row)
    return accessed_at.isoformat()
def return_file_access():
    """Return the buffered access entries from the last ten minutes.

    Entries of the module-level ``file_access_temp`` list whose first
    element (an ISO-8601 timestamp string) is older than ten minutes are
    removed from the list in place. The pruned list object itself is then
    returned. When the buffer is empty a fresh empty list is returned
    instead.
    """
    if not file_access_temp:
        return []

    oldest_allowed = datetime.now() - timedelta(minutes=10)
    still_recent = []
    for entry in file_access_temp:
        if datetime.fromisoformat(entry[0]) >= oldest_allowed:
            still_recent.append(entry)

    # Slice-assign so every holder of the list object sees the pruned content.
    file_access_temp[:] = still_recent
    return file_access_temp
@require_secret
def connections():
    """Render the ``connections.html`` template.

    Wrapped by ``require_secret`` from the ``auth`` module -- presumably an
    access check; confirm its behaviour there.
    """
    return render_template('connections.html')
@require_secret
def dashboard():
    """Render ``dashboard.html`` with access statistics for a timeframe.

    The ``timeframe`` query parameter selects the reporting window:
    ``today`` (since local midnight; the default), ``7days``, ``30days`` or
    ``365days``. Any other value uses the ``today`` window but is bucketed
    per day.

    Template variables:
        timeframe: the raw ``timeframe`` value from the request.
        rows: top 20 ``(rel_path, access_count)`` tuples.
        daily_access_data: ``{date, count}`` per calendar day.
        timeframe_data: ``{bucket, count}`` per hour (``today``), per month
            (``365days``) or per day (everything else).
        user_agent_data: ``{device, count}`` per device type.
        folder_data: top 10 ``{folder, count}`` by parent folder.
        location_data: top 20 ``{country, city, count}`` from GeoIP lookups.
        total_accesses, unique_files, unique_user: summary counters.
    """
    timeframe = request.args.get('timeframe', 'today')
    now = datetime.now()
    lookback_days = {'7days': 7, '30days': 30, '365days': 365}
    if timeframe in lookback_days:
        start = now - timedelta(days=lookback_days[timeframe])
    else:
        # 'today' and any unrecognised value both start at local midnight.
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    with log_db.connection.cursor() as cursor:
        # Raw file access counts for the table (top files)
        cursor.execute('''
            SELECT rel_path, COUNT(*) as access_count
            FROM file_access_log
            WHERE timestamp >= %s
            GROUP BY rel_path
            ORDER BY access_count DESC
            LIMIT 20
        ''', (start,))
        rows = cursor.fetchall()

        # Daily access trend for a line chart
        cursor.execute('''
            SELECT CAST(timestamp AS DATE) as date, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= %s
            GROUP BY CAST(timestamp AS DATE)
            ORDER BY date
        ''', (start,))
        daily_access_data = [dict(date=str(row[0]), count=row[1]) for row in cursor.fetchall()]

        # Aggregate download counts by time bucket according to the timeframe.
        if timeframe == 'today':
            # Group by hour using to_char
            cursor.execute('''
                SELECT to_char(timestamp, 'HH24') as bucket, COUNT(*) as count
                FROM file_access_log
                WHERE timestamp >= %s
                GROUP BY bucket
                ORDER BY bucket
            ''', (start,))
        elif timeframe == '365days':
            # Group by month using to_char
            cursor.execute('''
                SELECT to_char(timestamp, 'YYYY-MM') as bucket, COUNT(*) as count
                FROM file_access_log
                WHERE timestamp >= %s
                GROUP BY bucket
                ORDER BY bucket
            ''', (start,))
        else:
            # '7days', '30days' and any unrecognised timeframe: group by day
            cursor.execute('''
                SELECT CAST(timestamp AS DATE) as bucket, COUNT(*) as count
                FROM file_access_log
                WHERE timestamp >= %s
                GROUP BY bucket
                ORDER BY bucket
            ''', (start,))
        timeframe_data = [dict(bucket=row[0], count=row[1]) for row in cursor.fetchall()]

        # User agent distribution (aggregate by device type)
        cursor.execute('''
            SELECT user_agent, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= %s
            GROUP BY user_agent
            ORDER BY count DESC
        ''', (start,))
        device_counts = {}
        for user_agent, count in cursor.fetchall():
            # user_agent is a nullable column; classify NULL like an empty
            # string instead of letting the substring tests raise TypeError.
            device = get_device_type(user_agent or '')
            device_counts[device] = device_counts.get(device, 0) + count
        user_agent_data = [dict(device=device, count=count) for device, count in device_counts.items()]

        # Parent folder distribution
        cursor.execute('''
            SELECT rel_path, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= %s
            GROUP BY rel_path
            ORDER BY count DESC
        ''', (start,))
        folder_counts = {}
        for rel_path, count in cursor.fetchall():
            # A NULL path has no slash, so it is counted under "Root" like
            # any other path without a parent folder.
            rel_path = rel_path or ''
            parent_folder = rel_path.rsplit('/', 1)[0] if '/' in rel_path else "Root"
            folder_counts[parent_folder] = folder_counts.get(parent_folder, 0) + count
        folder_data = [dict(folder=folder, count=count) for folder, count in folder_counts.items()]
        folder_data.sort(key=lambda x: x['count'], reverse=True)
        folder_data = folder_data[:10]

        # Aggregate IP addresses with counts
        cursor.execute('''
            SELECT ip_address, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= %s
            GROUP BY ip_address
            ORDER BY count DESC
        ''', (start,))
        ip_rows = cursor.fetchall()

        # Summary stats, read in a single statement.
        cursor.execute('''
            SELECT COUNT(*), COUNT(DISTINCT rel_path), COUNT(DISTINCT device_id)
            FROM file_access_log
            WHERE timestamp >= %s
        ''', (start,))
        total_accesses, unique_files, unique_user = cursor.fetchone()

    # Process location data with GeoIP2. The context manager closes the
    # database file even if a lookup or the aggregation raises.
    location_counts = {}
    with geoip2.database.Reader('GeoLite2-City.mmdb') as reader:
        for ip, count in ip_rows:
            country, city = lookup_location(ip, reader)
            key = (country, city)
            location_counts[key] = location_counts.get(key, 0) + count
    location_data = [dict(country=key[0], city=key[1], count=value) for key, value in location_counts.items()]
    location_data.sort(key=lambda x: x['count'], reverse=True)
    location_data = location_data[:20]

    return render_template("dashboard.html",
                           timeframe=timeframe,
                           rows=rows,
                           daily_access_data=daily_access_data,
                           user_agent_data=user_agent_data,
                           folder_data=folder_data,
                           location_data=location_data,
                           total_accesses=total_accesses,
                           unique_files=unique_files,
                           unique_user=unique_user,
                           timeframe_data=timeframe_data)
# When executed as a script, create the file_access_log table if it is missing.
if __name__ == '__main__':
    init_log_db()