bethaus-app/app.py

569 lines
21 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, abort
import os
from PIL import Image
import io
from functools import wraps
import mimetypes
import sqlite3
from datetime import datetime, date, timedelta
import diskcache
import json
import geoip2.database
from functools import lru_cache
from urllib.parse import urlparse, unquote
from werkzeug.middleware.proxy_fix import ProxyFix
cache = diskcache.Cache('./filecache', size_limit= 48 * 1024**3) # 48 GB limit
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
app.config['SECRET_KEY'] = '85c1117eb3a5f2c79f0ff395bada8ff8d9a257b99ef5e143'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
if os.environ.get('FLASK_ENV') == 'production':
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
with open('folder_config.json') as file:
app.config['folder_config'] = json.load(file)
def require_secret(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Your config list:
folder_config = app.config['folder_config']
def is_valid(config_item, provided_secret):
"""
Checks if today's date is <= validity date
AND if the provided secret matches config_item['secret'].
"""
folder_validity = config_item['validity']
# Convert string to a date if necessary:
if isinstance(folder_validity, str):
folder_validity = datetime.strptime(folder_validity, '%d.%m.%Y').date()
# Return whether it's still valid and secrets match:
return (
date.today() <= folder_validity and
provided_secret == config_item['secret']
)
# 1) Get secret from query params (if any)
args_secret = request.args.get('secret')
# 2) Initialize 'allowed_secrets' in the session if missing
if 'allowed_secrets' not in session:
session['allowed_secrets'] = []
# 3) If a new secret is provided, check if its valid, and add to session if so
if args_secret:
for config_item in folder_config:
if is_valid(config_item, args_secret):
if args_secret not in session['allowed_secrets']:
session['allowed_secrets'].append(args_secret)
session.permanent = True # Make the session permanent
# 4) Re-check validity of each secret in session['allowed_secrets']
# If a secret is no longer valid (or not in config), remove it.
for secret_in_session in session['allowed_secrets'][:]:
# Find the current config item with matching secret
config_item = next(
(c for c in folder_config if c['secret'] == secret_in_session),
None
)
# If the config item doesnt exist or is invalid, remove secret
if config_item is None or not is_valid(config_item, secret_in_session):
session['allowed_secrets'].remove(secret_in_session)
# 5) Build session['folders'] fresh from the valid secrets
session['folders'] = {}
for secret_in_session in session.get('allowed_secrets', []):
config_item = next(
(c for c in folder_config if c['secret'] == secret_in_session),
None
)
if config_item:
for folder_info in config_item['folders']:
session['folders'][folder_info['foldername']] = folder_info['folderpath']
# 6) If we have folders, proceed; otherwise show index
if session['folders']:
return f(*args, **kwargs)
else:
return render_template('index.html')
return decorated_function
@lru_cache(maxsize=10)
def get_cached_image(size):
dimensions = tuple(map(int, size.split('-')[1].split('x')))
original_logo_path = os.path.join(app.root_path, 'static', 'logo.png')
with Image.open(original_logo_path) as img:
img = img.convert("RGBA")
orig_width, orig_height = img.size
if dimensions[0] >= orig_width and dimensions[1] >= orig_height:
resized_img = img
else:
resized_img = img.copy()
resized_img.thumbnail(dimensions, Image.LANCZOS)
img_byte_arr = io.BytesIO()
resized_img.save(img_byte_arr, format='PNG')
return img_byte_arr.getvalue()
@app.route('/static/icons/<string:size>.png')
def serve_resized_icon(size):
cached_image_bytes = get_cached_image(size)
return send_file(
io.BytesIO(cached_image_bytes),
mimetype='image/png'
)
@app.route('/sw.js')
def serve_sw():
return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript')
def list_directory_contents(directory, subpath):
"""
List only the immediate contents of the given directory.
Also, if a "Transkription" subfolder exists, check for matching .md files for music files.
Skip folders that start with a dot.
"""
directories = []
files = []
transcription_dir = os.path.join(directory, "Transkription")
transcription_exists = os.path.isdir(transcription_dir)
# Define allowed file extensions.
allowed_music_exts = ('.mp3',)
allowed_image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
try:
for item in sorted(os.listdir(directory)):
# Skip hidden folders and files starting with a dot.
if item.startswith('.'):
continue
full_path = os.path.join(directory, item)
# Process directories.
if os.path.isdir(full_path):
# skip folder
skip_folder = ["Transkription", "@eaDir"]
if item in skip_folder:
continue
rel_path = os.path.join(subpath, item) if subpath else item
rel_path = rel_path.replace(os.sep, '/')
directories.append({'name': item, 'path': rel_path})
# Process files: either music or image files.
elif os.path.isfile(full_path) and (
item.lower().endswith(allowed_music_exts) or item.lower().endswith(allowed_image_exts)
):
rel_path = os.path.join(subpath, item) if subpath else item
rel_path = rel_path.replace(os.sep, '/')
# Determine the file type.
if item.lower().endswith(allowed_music_exts):
file_type = 'music'
else:
file_type = 'image'
file_entry = {'name': item, 'path': rel_path, 'file_type': file_type}
# Only check for transcription if it's a music file.
if file_type == 'music' and transcription_exists:
base_name = os.path.splitext(item)[0]
transcript_filename = base_name + '.md'
transcript_path = os.path.join(transcription_dir, transcript_filename)
if os.path.isfile(transcript_path):
file_entry['has_transcript'] = True
transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename) if subpath else os.path.join("Transkription", transcript_filename)
transcript_rel_path = transcript_rel_path.replace(os.sep, '/')
file_entry['transcript_url'] = url_for('get_transcript', subpath=transcript_rel_path)
else:
file_entry['has_transcript'] = False
else:
file_entry['has_transcript'] = False
files.append(file_entry)
except PermissionError:
pass
return directories, files
def generate_breadcrumbs(subpath=None):
breadcrumbs = [{'name': 'Home', 'path': ''}]
if subpath:
parts = subpath.split('/')
path_accum = ""
for part in parts:
path_accum = f"{path_accum}/{part}" if path_accum else part
breadcrumbs.append({'name': part, 'path': path_accum})
return breadcrumbs
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@require_secret
def api_browse(subpath):
if subpath == '': # root directory
foldernames = []
for foldername, folderpath in session['folders'].items():
foldernames.append({'name': foldername, 'path': foldername})
return jsonify({
'breadcrumbs': generate_breadcrumbs(),
'directories': foldernames,
'files': []
})
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
directory = os.path.join(base_path, *relative_parts)
if not os.path.isdir(directory):
return jsonify({'error': 'Directory not found'}), 404
directories, files = list_directory_contents(directory, subpath)
breadcrumbs = generate_breadcrumbs(subpath)
return jsonify({
'breadcrumbs': breadcrumbs,
'directories': directories,
'files': files
})
def lookup_location(ip, reader):
try:
response = reader.city(ip)
country = response.country.name if response.country.name else "Unknown"
city = response.city.name if response.city.name else "Unknown"
return country, city
except Exception:
return "Unknown", "Unknown"
# Helper function to classify device type based on user agent string
def get_device_type(user_agent):
if 'Android' in user_agent:
return 'Android'
elif 'iPhone' in user_agent or 'iPad' in user_agent:
return 'iOS'
elif 'Windows' in user_agent:
return 'Windows'
elif 'Macintosh' in user_agent or 'Mac OS' in user_agent:
return 'MacOS'
elif 'Linux' in user_agent:
return 'Linux'
else:
return 'Other'
def shorten_referrer(url):
segments = [seg for seg in url.split('/') if seg]
segment = segments[-1]
# Decode all percent-encoded characters (like %20, %2F, etc.)
segment_decoded = unquote(segment)
return segment_decoded
@app.route("/dashboard")
@require_secret
def dashboard():
timeframe = request.args.get('timeframe', 'today')
now = datetime.now()
if timeframe == 'today':
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
elif timeframe == '7days':
start = now - timedelta(days=7)
elif timeframe == '30days':
start = now - timedelta(days=30)
elif timeframe == '365days':
start = now - timedelta(days=365)
else:
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
conn = sqlite3.connect('access_log.db')
cursor = conn.cursor()
# Raw file access counts for the table (top files)
cursor.execute('''
SELECT full_path, COUNT(*) as access_count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY full_path
ORDER BY access_count DESC
LIMIT 20
''', (start.isoformat(),))
rows = cursor.fetchall()
# Daily access trend for a line chart
cursor.execute('''
SELECT date(timestamp) as date, COUNT(*) as count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY date
ORDER BY date
''', (start.isoformat(),))
daily_access_data = [dict(date=row[0], count=row[1]) for row in cursor.fetchall()]
# Top files for bar chart (limit to 10)
cursor.execute('''
SELECT full_path, COUNT(*) as access_count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY full_path
ORDER BY access_count DESC
LIMIT 10
''', (start.isoformat(),))
top_files_data = [dict(full_path=row[0], access_count=row[1]) for row in cursor.fetchall()]
# User agent distribution (aggregate by device type)
cursor.execute('''
SELECT user_agent, COUNT(*) as count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY user_agent
ORDER BY count DESC
''', (start.isoformat(),))
raw_user_agents = [dict(user_agent=row[0], count=row[1]) for row in cursor.fetchall()]
device_counts = {}
for entry in raw_user_agents:
device = get_device_type(entry['user_agent'])
device_counts[device] = device_counts.get(device, 0) + entry['count']
# Rename to user_agent_data for compatibility with the frontend
user_agent_data = [dict(device=device, count=count) for device, count in device_counts.items()]
# Referrer distribution (shorten links)
cursor.execute('''
SELECT referrer, COUNT(*) as count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY referrer
ORDER BY count DESC
LIMIT 10
''', (start.isoformat(),))
referrer_data = []
for row in cursor.fetchall():
raw_ref = row[0]
shortened = shorten_referrer(raw_ref) if raw_ref else "Direct/None"
referrer_data.append(dict(referrer=shortened, count=row[1]))
# Aggregate IP addresses with counts
cursor.execute('''
SELECT ip_address, COUNT(*) as count
FROM file_access_log
WHERE timestamp >= ?
GROUP BY ip_address
ORDER BY count DESC
LIMIT 20
''', (start.isoformat(),))
ip_rows = cursor.fetchall()
# Initialize GeoIP2 reader once for efficiency
reader = geoip2.database.Reader('GeoLite2-City.mmdb')
ip_data = []
for ip, count in ip_rows:
country, city = lookup_location(ip, reader)
ip_data.append(dict(ip=ip, count=count, country=country, city=city))
reader.close()
# Aggregate by city (ignoring entries without a city)
city_counts = {}
for entry in ip_data:
if entry['city']:
city_counts[entry['city']] = city_counts.get(entry['city'], 0) + entry['count']
city_data = [dict(city=city, count=count) for city, count in city_counts.items()]
# Summary stats
total_accesses = sum([row[1] for row in rows])
unique_files = len(rows)
cursor.execute('SELECT COUNT(DISTINCT ip_address) FROM file_access_log WHERE timestamp >= ?', (start.isoformat(),))
unique_ips = cursor.fetchone()[0]
conn.close()
return render_template("dashboard.html",
timeframe=timeframe,
rows=rows,
daily_access_data=daily_access_data,
top_files_data=top_files_data,
user_agent_data=user_agent_data,
referrer_data=referrer_data,
ip_data=ip_data,
city_data=city_data,
total_accesses=total_accesses,
unique_files=unique_files,
unique_ips=unique_ips)
def log_file_access(full_path):
"""
Log file access details to a SQLite database.
Records the timestamp, full file path, client IP, user agent, and referrer.
"""
# Connect to the database (this will create the file if it doesn't exist)
conn = sqlite3.connect('access_log.db')
cursor = conn.cursor()
# Create the table if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS file_access_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
full_path TEXT,
ip_address TEXT,
user_agent TEXT,
referrer TEXT
)
''')
# Gather information from the request
timestamp = datetime.now().isoformat()
ip_address = request.remote_addr
user_agent = request.headers.get('User-Agent')
referrer = request.headers.get('Referer')
# Insert the access record into the database
cursor.execute('''
INSERT INTO file_access_log (timestamp, full_path, ip_address, user_agent, referrer)
VALUES (?, ?, ?, ?, ?)
''', (timestamp, full_path, ip_address, user_agent, referrer))
conn.commit()
conn.close()
@app.route("/media/<path:subpath>")
@require_secret
def serve_file(subpath):
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
full_path = os.path.join(base_path, *relative_parts)
if not os.path.isfile(full_path):
app.logger.error(f"File not found: {full_path}")
return "File not found", 404
mime, _ = mimetypes.guess_type(full_path)
mime = mime or 'application/octet-stream'
if mime and mime.startswith('image/'):
pass # do not log access to images
else:
# HEAD request are coming in to initiate server caching.
# only log initial hits and not the reload of further file parts
range_header = request.headers.get('Range')
if request.method != 'HEAD' and (not range_header or range_header.startswith("bytes=0-")):
log_file_access(full_path)
# Check cache first (using diskcache)
response = None
cached = cache.get(subpath)
if cached:
cached_file_bytes, mime = cached
cached_file = io.BytesIO(cached_file_bytes)
response = send_file(cached_file, mimetype=mime)
else:
if mime and mime.startswith('image/'):
# Image processing branch (with caching)
try:
with Image.open(full_path) as img:
img.thumbnail((1200, 1200))
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG', quality=85)
img_bytes = img_bytes.getvalue()
cache.set(subpath, (img_bytes, mime))
response = send_file(io.BytesIO(img_bytes), mimetype=mime)
except Exception as e:
app.logger.error(f"Image processing failed for {subpath}: {e}")
abort(500)
else:
# Cache non-image files: read bytes and cache
try:
with open(full_path, 'rb') as f:
file_bytes = f.read()
cache.set(subpath, (file_bytes, mime))
response = send_file(io.BytesIO(file_bytes), mimetype=mime)
except Exception as e:
app.logger.error(f"Failed to read file {subpath}: {e}")
abort(500)
# Set Cache-Control header (browser caching for 1 day)
response.headers['Cache-Control'] = 'public, max-age=86400'
return response
@app.route("/transcript/<path:subpath>")
@require_secret
def get_transcript(subpath):
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
full_path = os.path.join(base_path, *relative_parts)
if not os.path.isfile(full_path):
return "Transcription not found", 404
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
@app.route("/crawl/<path:subpath>")
@require_secret
def crawl_and_cache(subpath):
"""
Crawls through a directory and caches each file.
For images, it creates a thumbnail (max 1200x1200) and caches the processed image.
For non-images, it simply reads and caches the file bytes.
"""
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
full_path = os.path.join(base_path, *relative_parts)
cached_files = [] # List to hold cached file relative paths
# Walk through all subdirectories and files
for root, dirs, files in os.walk(full_path):
for filename in files:
full_path_file = os.path.join(root, filename)
# Skip if this file is already in the cache
if cache.get(full_path_file):
continue
# Determine the MIME type
mime, _ = mimetypes.guess_type(full_path)
mime = mime or 'application/octet-stream'
# Process image files differently
if mime.startswith('image/'):
try:
with Image.open(full_path) as img:
# Create a thumbnail (max 1200x1200)
img.thumbnail((1200, 1200))
img_bytes_io = io.BytesIO()
# Save processed image as PNG
img.save(img_bytes_io, format='PNG', quality=85)
img_bytes = img_bytes_io.getvalue()
# Cache the processed image bytes along with its mime type
cache.set(full_path_file, (img_bytes, mime))
cached_files.append(full_path_file)
except Exception as e:
app.logger.error(f"Image processing failed for {full_path_file}: {e}")
else:
# Process non-image files
try:
with open(full_path_file, 'rb') as f:
file_bytes = f.read()
cache.set(full_path_file, (file_bytes, mime))
cached_files.append(full_path_file)
except Exception as e:
app.logger.error(f"Failed to read file {full_path_file}: {e}")
# Return the list of cached files as a JSON response
return json.dumps({"cached_files": cached_files}, indent=4), 200
# Catch-all route to serve the single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@require_secret
def index(path):
return render_template("app.html")
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0')