bethaus-app/analytics.py
2025-03-23 12:55:49 +00:00

226 lines
7.9 KiB
Python

from flask import render_template, request, session
import sqlite3
from datetime import datetime, date, timedelta
import geoip2.database
from urllib.parse import urlparse, unquote
from auth import require_secret
# In-memory cache of recent access records, newest first; entries older
# than 10 minutes are pruned in place by return_file_access().
file_access_temp = []
def lookup_location(ip, reader):
    """Resolve an IP address to a (country, city) name pair via GeoIP2.

    Any failure — unknown address, malformed IP, reader error, or a
    response missing the expected attributes — degrades to the pair
    ("Unknown", "Unknown") instead of raising.
    """
    try:
        record = reader.city(ip)
        # A present-but-empty name (None or "") also maps to "Unknown".
        return (record.country.name or "Unknown",
                record.city.name or "Unknown")
    except Exception:
        return "Unknown", "Unknown"
def get_device_type(user_agent):
    """Map a raw User-Agent string to a coarse platform label.

    The first matching rule wins, preserving the original elif
    precedence (e.g. 'Android' is tested before 'Linux'); anything
    unmatched is reported as 'Other'.
    """
    rules = (
        (('Android',), 'Android'),
        (('iPhone', 'iPad'), 'iOS'),
        (('Windows',), 'Windows'),
        (('Macintosh', 'Mac OS'), 'MacOS'),
        (('Linux',), 'Linux'),
    )
    for tokens, label in rules:
        if any(token in user_agent for token in tokens):
            return label
    return 'Other'
def shorten_referrer(url):
    """Reduce a referrer URL to its last path segment, percent-decoded.

    Example: "https://example.com/docs/My%20File" -> "My File".

    Falls back to the decoded input itself when the URL contains no
    non-empty segment (e.g. "" or "/"), instead of raising IndexError
    on ``segments[-1]`` as the previous implementation did.
    """
    segments = [seg for seg in url.split('/') if seg]
    if not segments:
        # "", "/", "///", ... have no last segment to extract.
        return unquote(url)
    # Decode percent-escapes (%20, %2F, ...) for display.
    return unquote(segments[-1])
def log_file_access(full_path, ip_address, user_agent, referrer):
    """Persist one file-access event and return the recent-access list.

    Inserts a row (timestamp, full file path, client IP, user agent,
    referrer) into the SQLite access log, mirrors it into the in-memory
    recent-access cache, and returns the pruned cache from
    return_file_access().
    """
    global file_access_temp
    timestamp = datetime.now().isoformat()
    # Connect (creates access_log.db on first use).  try/finally
    # guarantees the connection is closed even if a statement raises —
    # the previous version leaked it on any sqlite error.
    conn = sqlite3.connect('access_log.db')
    try:
        cursor = conn.cursor()
        # Create the table on first use.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS file_access_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                full_path TEXT,
                ip_address TEXT,
                user_agent TEXT,
                referrer TEXT
            )
        ''')
        # Parameterized INSERT keeps untrusted UA/referrer strings safe.
        cursor.execute('''
            INSERT INTO file_access_log (timestamp, full_path, ip_address, user_agent, referrer)
            VALUES (?, ?, ?, ?, ?)
        ''', (timestamp, full_path, ip_address, user_agent, referrer))
        conn.commit()
    finally:
        conn.close()
    # Newest entries first, for the live "recent activity" view.
    file_access_temp.insert(0, [timestamp, full_path, ip_address, user_agent, referrer])
    return return_file_access()
def return_file_access():
    """Return the in-memory access cache, pruned to the last 10 minutes.

    Entries whose ISO timestamp is older than 10 minutes are dropped
    from file_access_temp in place; the (possibly emptied) shared list
    is returned.  An already-empty cache yields a fresh empty list.
    """
    global file_access_temp
    if not file_access_temp:
        return []
    # Keep only records logged within the last 10 minutes.
    cutoff = datetime.now() - timedelta(minutes=10)
    file_access_temp[:] = [
        record for record in file_access_temp
        if datetime.fromisoformat(record[0]) >= cutoff
    ]
    return file_access_temp
@require_secret
def network():
    """Render the network page; access is gated by require_secret."""
    return render_template('network.html')
@require_secret
def dashboard():
    """Render the analytics dashboard over a selectable timeframe.

    Query parameter ``timeframe`` selects the window: 'today' (default,
    since local midnight), '7days', '30days', or '365days'; unknown
    values fall back to 'today'.  All aggregates are computed from the
    SQLite access log and handed to dashboard.html.  The DB connection
    and the GeoIP reader are released even when a query fails (the
    previous version leaked both on any exception).
    """
    timeframe = request.args.get('timeframe', 'today')
    now = datetime.now()
    # Window start: N days back for relative views, otherwise
    # (including unrecognized values) local midnight today.
    window_days = {'7days': 7, '30days': 30, '365days': 365}
    if timeframe in window_days:
        start = now - timedelta(days=window_days[timeframe])
    else:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # Every query below filters on the same ISO-8601 lower bound;
    # compute it once instead of per query.
    since = (start.isoformat(),)
    conn = sqlite3.connect('access_log.db')
    try:
        cursor = conn.cursor()
        # Raw file access counts for the table (top 20 files).
        cursor.execute('''
            SELECT full_path, COUNT(*) as access_count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY full_path
            ORDER BY access_count DESC
            LIMIT 20
        ''', since)
        rows = cursor.fetchall()
        # Daily access trend for a line chart.
        cursor.execute('''
            SELECT date(timestamp) as date, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY date
            ORDER BY date
        ''', since)
        daily_access_data = [dict(date=row[0], count=row[1]) for row in cursor.fetchall()]
        # Top files for the bar chart.
        cursor.execute('''
            SELECT full_path, COUNT(*) as access_count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY full_path
            ORDER BY access_count DESC
            LIMIT 10
        ''', since)
        top_files_data = [dict(full_path=row[0], access_count=row[1]) for row in cursor.fetchall()]
        # User agent distribution, collapsed into coarse device types.
        cursor.execute('''
            SELECT user_agent, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY user_agent
            ORDER BY count DESC
        ''', since)
        device_counts = {}
        for user_agent, count in cursor.fetchall():
            device = get_device_type(user_agent)
            device_counts[device] = device_counts.get(device, 0) + count
        # Keep the name user_agent_data for compatibility with the frontend.
        user_agent_data = [dict(device=device, count=count)
                           for device, count in device_counts.items()]
        # Referrer distribution (shortened for display; NULL/empty
        # referrers become "Direct/None").
        cursor.execute('''
            SELECT referrer, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY referrer
            ORDER BY count DESC
            LIMIT 10
        ''', since)
        referrer_data = [
            dict(referrer=shorten_referrer(ref) if ref else "Direct/None", count=count)
            for ref, count in cursor.fetchall()
        ]
        # Top IP addresses, geolocated via the local GeoLite2 database.
        cursor.execute('''
            SELECT ip_address, COUNT(*) as count
            FROM file_access_log
            WHERE timestamp >= ?
            GROUP BY ip_address
            ORDER BY count DESC
            LIMIT 20
        ''', since)
        ip_rows = cursor.fetchall()
        # Open the GeoIP reader once for all rows; release it even if a
        # lookup raises.
        ip_data = []
        reader = geoip2.database.Reader('GeoLite2-City.mmdb')
        try:
            for ip, count in ip_rows:
                country, city = lookup_location(ip, reader)
                ip_data.append(dict(ip=ip, count=count, country=country, city=city))
        finally:
            reader.close()
        # Aggregate per city; only a falsy city is skipped ("Unknown"
        # still counts), matching the original behavior.
        city_counts = {}
        for entry in ip_data:
            if entry['city']:
                city_counts[entry['city']] = city_counts.get(entry['city'], 0) + entry['count']
        city_data = [dict(city=city, count=count) for city, count in city_counts.items()]
        # Summary stats via separate aggregate queries.
        cursor.execute('SELECT COUNT(*) FROM file_access_log WHERE timestamp >= ?', since)
        total_accesses = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(DISTINCT full_path) FROM file_access_log WHERE timestamp >= ?', since)
        unique_files = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(DISTINCT ip_address) FROM file_access_log WHERE timestamp >= ?', since)
        unique_ips = cursor.fetchone()[0]
    finally:
        # Always release the DB connection, even on query/geo errors.
        conn.close()
    return render_template("dashboard.html",
                           timeframe=timeframe,
                           rows=rows,
                           daily_access_data=daily_access_data,
                           top_files_data=top_files_data,
                           user_agent_data=user_agent_data,
                           referrer_data=referrer_data,
                           ip_data=ip_data,
                           city_data=city_data,
                           total_accesses=total_accesses,
                           unique_files=unique_files,
                           unique_ips=unique_ips)