bethaus-app/app.py
2025-03-17 23:46:54 +00:00

347 lines
13 KiB
Python
Executable File

import io
import json
import mimetypes
import os
import sqlite3
from datetime import datetime, date, timedelta
from functools import wraps
from urllib.parse import unquote

import diskcache
from flask import Flask, abort, render_template, send_file, url_for, jsonify, request, session, send_from_directory
from PIL import Image
# On-disk cache for served media; size_limit is given in bytes (32 * 1024**3).
cache = diskcache.Cache('./filecache', size_limit=32 * 1024**3)

app = Flask(__name__)

# Default media root. require_secret overwrites this with the 'file_root' of
# the secret that authorised the current request.
app.config['FILE_ROOT'] = r'/mp3_root'

# The session signing key should not live in source control: prefer the
# SECRET_KEY environment variable. The previous literal stays as the fallback
# so cookies signed by existing deployments remain valid.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY',
    '85c1117eb3a5f2c79f0ff395bada8ff8d9a257b99ef5e143',
)
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)

# The session cookie is sent with SameSite=None and the Secure flag.
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
def load_allowed_secrets(filename='allowed_secrets.json'):
    """Load the table of access secrets from a JSON file.

    The file is expected to hold a JSON object whose values are themselves
    objects. Wherever such an object has an ``expiry`` entry, its value is
    parsed as ``DD.MM.YYYY`` and replaced by a ``datetime.date``.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded mapping, with every ``expiry`` converted to a date.

    Raises:
        OSError: The file cannot be opened.
        json.JSONDecodeError: The file does not contain valid JSON.
        ValueError: An ``expiry`` value does not match ``DD.MM.YYYY``.
    """
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with open(filename, encoding='utf-8') as f:
        secrets = json.load(f)
    # Only the per-secret settings are needed here, not the secret strings.
    for settings in secrets.values():
        if 'expiry' in settings:
            settings['expiry'] = datetime.strptime(settings['expiry'], '%d.%m.%Y').date()
    return secrets
# Load the secret table once at import time; require_secret reads it back
# from app.config['ALLOWED_SECRETS'] on every request.
app.config['ALLOWED_SECRETS'] = load_allowed_secrets()
def require_secret(f):
    """Decorate a view so that it only runs for holders of a valid secret.

    A secret is looked up in ``app.config['ALLOWED_SECRETS']`` and counts as
    valid when its entry has an ``expiry`` date that is today or later.

    Resolution order:
      1. ``?secret=...`` in the query string. A known, valid secret is stored
         in the session and the view runs; a known but expired one yields the
         error page with status 403. An unknown one falls through to step 2.
      2. The secret stored in the session, handled the same way (without
         re-storing it).
      3. Otherwise the public ``index.html`` page is rendered.

    Before the view runs, ``app.config['FILE_ROOT']`` is set to the entry's
    ``file_root``. This is the application-wide config object, not
    per-request state.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        known_secrets = app.config['ALLOWED_SECRETS']
        current_day = date.today()

        def is_valid(entry):
            # Entries without an expiry date are never considered valid.
            expires_on = entry.get('expiry')
            return expires_on and current_day <= expires_on

        # 1) Secret passed explicitly in the URL.
        url_secret = request.args.get('secret')
        url_entry = known_secrets.get(url_secret) if url_secret is not None else None
        if url_entry:
            if not is_valid(url_entry):
                return render_template('error.html', message="Invalid or expired secret."), 403
            # Remember the secret so later requests work without the parameter.
            session['secret'] = url_secret
            session.permanent = True
            app.config['FILE_ROOT'] = url_entry.get('file_root')
            return f(*args, **kwargs)

        # 2) Secret remembered from an earlier request.
        stored_secret = session.get('secret')
        stored_entry = known_secrets.get(stored_secret) if stored_secret is not None else None
        if stored_entry:
            if not is_valid(stored_entry):
                return render_template('error.html', message="Invalid or expired secret."), 403
            session.permanent = True
            app.config['FILE_ROOT'] = stored_entry.get('file_root')
            return f(*args, **kwargs)

        # 3) Nothing usable: show the public landing page.
        return render_template('index.html')

    return decorated_function
@app.route('/static/icons/<string:size>.png')
def serve_resized_icon(size):
    """Serve ``static/logo.png`` scaled down to the size named in the URL.

    Args:
        size: The file-name stem from the URL. The part after the first
            hyphen must read ``<width>x<height>`` (for example
            ``icon-192x192``), with both values positive integers.

    Returns:
        A PNG response. When the requested box is at least as large as the
        logo in both dimensions the original file is sent unchanged;
        otherwise an RGBA copy shrunk with ``Image.thumbnail`` is sent.

    Raises:
        werkzeug.exceptions.NotFound: ``size`` does not have the expected
            shape (previously this surfaced as an unhandled exception).
    """
    try:
        width, height = (int(value) for value in size.split('-')[1].split('x'))
    except (IndexError, ValueError):
        # No hyphen, non-numeric parts, or not exactly two numbers.
        abort(404)
    if width <= 0 or height <= 0:
        abort(404)

    original_logo_path = os.path.join(app.root_path, 'static', 'logo.png')
    with Image.open(original_logo_path) as img:
        orig_width, orig_height = img.size
        if width >= orig_width and height >= orig_height:
            # Never upscale: hand out the original file as-is.
            return send_file(original_logo_path, mimetype='image/png')
        # convert() returns a new image, so the source file is left untouched
        # by the in-place thumbnail() below.
        resized_img = img.convert("RGBA")
        resized_img.thumbnail((width, height), Image.LANCZOS)

    # Encode the result into memory and send it from there.
    resized_bytes = io.BytesIO()
    resized_img.save(resized_bytes, format='PNG')
    resized_bytes.seek(0)
    return send_file(resized_bytes, mimetype='image/png')
@app.route('/sw.js')
def serve_sw():
    """Serve ``sw.js`` from the application's static folder under ``/sw.js``."""
    static_dir = os.path.join(app.root_path, 'static')
    return send_from_directory(static_dir, 'sw.js', mimetype='application/javascript')
def list_directory_contents(directory, subpath):
    """List the immediate sub-directories and media files of ``directory``.

    Entries whose name starts with a dot are skipped, as is any directory
    named "Transkription" (case-insensitive). Only ``.mp3`` files (type
    ``'music'``) and ``.jpg/.jpeg/.png/.gif/.bmp`` files (type ``'image'``)
    are reported; extensions are matched case-insensitively.

    For a music file, when ``directory`` contains a "Transkription" folder
    holding ``<basename>.md``, the entry gets ``has_transcript=True`` and a
    ``transcript_url`` built with ``url_for('get_transcript', ...)``.

    Args:
        directory: Filesystem path of the directory to list.
        subpath: The same location as a '/'-separated path relative to the
            media root ('' for the root); used to build the ``path`` values.

    Returns:
        ``(directories, files)``, both sorted by name. Directories are
        ``{'name', 'path'}`` dicts; files are ``{'name', 'path', 'file_type',
        'has_transcript'}`` dicts, plus ``'transcript_url'`` when a transcript
        exists. Both lists are empty when the directory may not be read.
    """
    directories = []
    files = []
    transcription_dir = os.path.join(directory, "Transkription")
    transcription_exists = os.path.isdir(transcription_dir)
    # Define allowed file extensions.
    allowed_music_exts = ('.mp3',)
    allowed_image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    try:
        entries = sorted(os.listdir(directory))
    except PermissionError:
        # Still best-effort (the caller gets an empty listing), but leave a
        # trace instead of failing silently.
        app.logger.warning("Permission denied while listing directory: %s", directory)
        return directories, files

    for item in entries:
        # Skip hidden folders and files starting with a dot.
        if item.startswith('.'):
            continue

        full_path = os.path.join(directory, item)
        rel_path = os.path.join(subpath, item) if subpath else item
        rel_path = rel_path.replace(os.sep, '/')

        if os.path.isdir(full_path):
            # The transcript folder is an implementation detail, not content.
            if item.lower() != "transkription":
                directories.append({'name': item, 'path': rel_path})
            continue

        lowered = item.lower()
        if lowered.endswith(allowed_music_exts):
            file_type = 'music'
        elif lowered.endswith(allowed_image_exts):
            file_type = 'image'
        else:
            continue
        if not os.path.isfile(full_path):
            continue

        file_entry = {
            'name': item,
            'path': rel_path,
            'file_type': file_type,
            'has_transcript': False,
        }
        # Only music files can have a transcript.
        if file_type == 'music' and transcription_exists:
            transcript_filename = os.path.splitext(item)[0] + '.md'
            if os.path.isfile(os.path.join(transcription_dir, transcript_filename)):
                file_entry['has_transcript'] = True
                if subpath:
                    transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename)
                else:
                    transcript_rel_path = os.path.join("Transkription", transcript_filename)
                transcript_rel_path = transcript_rel_path.replace(os.sep, '/')
                file_entry['transcript_url'] = url_for('get_transcript', filename=transcript_rel_path)
        files.append(file_entry)

    return directories, files
def generate_breadcrumbs(subpath):
    """Build the breadcrumb trail for a '/'-separated relative path.

    Args:
        subpath: Path relative to the media root, e.g. ``'a/b'``. An empty
            value or ``None`` means the root.

    Returns:
        A list of ``{'name', 'path'}`` dicts. It always starts with the
        ``'Home'`` entry (path ``''``), followed by one entry per path
        segment whose ``path`` is the accumulated path up to that segment.
        Empty segments (from a leading, trailing or doubled slash) are
        ignored so that no breadcrumb with an empty name is produced.
    """
    breadcrumbs = [{'name': 'Home', 'path': ''}]
    path_accum = ""
    for part in (subpath or "").split('/'):
        if not part:
            continue
        path_accum = f"{path_accum}/{part}" if path_accum else part
        breadcrumbs.append({'name': part, 'path': path_accum})
    return breadcrumbs
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@require_secret
def api_browse(subpath):
    """Return the listing of one directory below the media root as JSON.

    Args:
        subpath: '/'-separated path relative to ``app.config['FILE_ROOT']``;
            '' lists the root itself.

    Returns:
        JSON with ``breadcrumbs``, ``directories`` and ``files``, or
        ``{'error': 'Directory not found'}`` with status 404 when the path is
        not an existing directory inside the media root.
    """
    file_root = os.path.abspath(app.config['FILE_ROOT'])
    directory = os.path.abspath(os.path.join(file_root, subpath.replace('/', os.sep)))

    # subpath comes straight from the URL: refuse anything that resolves to a
    # location outside the media root (e.g. through '..' segments).
    try:
        inside_root = os.path.commonpath([file_root, directory]) == file_root
    except ValueError:
        # Raised e.g. for paths on different drives; treat as outside.
        inside_root = False

    if not inside_root or not os.path.isdir(directory):
        return jsonify({'error': 'Directory not found'}), 404

    directories, files = list_directory_contents(directory, subpath)
    breadcrumbs = generate_breadcrumbs(subpath)
    return jsonify({
        'breadcrumbs': breadcrumbs,
        'directories': directories,
        'files': files
    })
@app.route("/access-log")
@require_secret
def access_log():
    """Render per-file access counts for a selectable time window.

    The ``timeframe`` query parameter selects the window: ``today`` (since
    local midnight), ``7days``, ``30days`` or ``365days``. Missing or unknown
    values behave like ``today``; the raw value is still passed on to the
    template.

    Returns:
        The rendered ``access_log.html`` with ``rows`` as a list of
        ``(full_path, access_count)`` tuples, most accessed first. ``rows``
        is empty when nothing has been logged yet.
    """
    timeframe = request.args.get('timeframe', 'today')
    now = datetime.now()

    # Rolling windows; everything else falls back to "since midnight".
    lookback = {
        '7days': timedelta(days=7),
        '30days': timedelta(days=30),
        '365days': timedelta(days=365),
    }
    if timeframe in lookback:
        start = now - lookback[timeframe]
    else:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    conn = sqlite3.connect('access_log.db')
    try:
        cursor = conn.cursor()
        # The table is only created by the first logged access; before that
        # the aggregate query would fail with "no such table".
        cursor.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'file_access_log'"
        )
        if cursor.fetchone() is None:
            rows = []
        else:
            # Timestamps are stored as ISO-8601 text, so a string comparison
            # against start.isoformat() selects the window.
            cursor.execute('''
                SELECT full_path, COUNT(*) as access_count
                FROM file_access_log
                WHERE timestamp >= ?
                GROUP BY full_path
                ORDER BY access_count DESC
            ''', (start.isoformat(),))
            rows = cursor.fetchall()
    finally:
        # Release the connection even if a query raises.
        conn.close()

    return render_template("access_log.html", rows=rows, timeframe=timeframe)
def log_file_access(full_path):
    """Append one access record for ``full_path`` to ``access_log.db``.

    The record holds the current local timestamp (ISO-8601 text), the given
    path, and the client address, ``User-Agent`` and ``Referer`` of the
    current Flask request. The ``file_access_log`` table is created on first
    use.

    Args:
        full_path: Path of the file that was accessed.

    Raises:
        sqlite3.Error: The database cannot be opened or written.
    """
    # Gather information from the request.
    timestamp = datetime.now().isoformat()
    ip_address = request.remote_addr
    user_agent = request.headers.get('User-Agent')
    referrer = request.headers.get('Referer')

    # Connecting creates the database file if it doesn't exist yet.
    conn = sqlite3.connect('access_log.db')
    try:
        # "with conn" commits on success and rolls back on error; it does not
        # close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS file_access_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    full_path TEXT,
                    ip_address TEXT,
                    user_agent TEXT,
                    referrer TEXT
                )
            ''')
            conn.execute('''
                INSERT INTO file_access_log (timestamp, full_path, ip_address, user_agent, referrer)
                VALUES (?, ?, ?, ?, ?)
            ''', (timestamp, full_path, ip_address, user_agent, referrer))
    finally:
        conn.close()
@app.route("/media/<path:filename>")
@require_secret
def serve_file(filename):
    """Serve a media file from below the media root, via the disk cache.

    Images are shrunk to fit 1200x1200 and re-encoded as PNG; every other
    file is sent unchanged. The bytes that are sent are stored in the disk
    cache together with their MIME type. Every non-HEAD request is recorded
    with ``log_file_access``.

    Args:
        filename: '/'-separated, possibly percent-encoded path relative to
            ``app.config['FILE_ROOT']``.

    Returns:
        The file response with a one-day ``Cache-Control`` header, or
        ``("File not found", 404)`` when the path is not an existing file
        inside the media root.

    Raises:
        werkzeug.exceptions.InternalServerError: The file cannot be read or
            the image cannot be processed.
    """
    file_root = os.path.abspath(app.config['FILE_ROOT'])
    decoded_filename = unquote(filename).replace('/', os.sep)
    full_path = os.path.abspath(os.path.join(file_root, decoded_filename))

    # Normalising alone does not stop '..' from leaving the media root;
    # verify that the resolved path is still inside it.
    try:
        inside_root = os.path.commonpath([file_root, full_path]) == file_root
    except ValueError:
        inside_root = False

    if not inside_root or not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404

    # Exclude HEAD requests from logging
    if request.method != 'HEAD':
        log_file_access(full_path)

    # Key the cache on the absolute path: the relative name alone is ambiguous
    # because FILE_ROOT differs between secrets.
    cached = cache.get(full_path)
    if cached:
        payload, mime = cached
    else:
        mime, _ = mimetypes.guess_type(full_path)
        mime = mime or 'application/octet-stream'

        if mime.startswith('image/'):
            # Image processing branch
            try:
                with Image.open(full_path) as img:
                    img.thumbnail((1200, 1200))
                    buffer = io.BytesIO()
                    img.save(buffer, format='PNG')
                payload = buffer.getvalue()
            except Exception as e:
                app.logger.error(f"Image processing failed for {filename}: {e}")
                abort(500)
            # The bytes are PNG whatever the source format was, so advertise
            # them as such instead of the source file's type.
            mime = 'image/png'
        else:
            try:
                with open(full_path, 'rb') as f:
                    payload = f.read()
            except OSError as e:
                app.logger.error(f"Failed to read file {filename}: {e}")
                abort(500)

        cache.set(full_path, (payload, mime))

    response = send_file(io.BytesIO(payload), mimetype=mime)
    # Set Cache-Control header (browser caching for 1 day)
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response
@app.route("/transcript/<path:filename>")
@require_secret
def get_transcript(filename):
    """Return the text of a transcript file below the media root.

    Args:
        filename: '/'-separated path relative to ``app.config['FILE_ROOT']``.

    Returns:
        The file content (read as UTF-8) with content type
        ``text/markdown``, or ``("Transcription not found", 404)`` when the
        path is not an existing file inside the media root.
    """
    file_root = os.path.abspath(app.config['FILE_ROOT'])
    full_path = os.path.abspath(os.path.join(file_root, filename.replace('/', os.sep)))

    # filename comes from the URL: never read anything that resolves to a
    # location outside the media root (e.g. through '..' segments).
    try:
        inside_root = os.path.commonpath([file_root, full_path]) == file_root
    except ValueError:
        inside_root = False

    if not inside_root or not os.path.isfile(full_path):
        return "Transcription not found", 404

    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
# Catch-all route: every URL not claimed by a more specific route gets the
# single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@require_secret
def index(path):
    """Render ``browse.html``; ``path`` is accepted but not used here."""
    page = render_template("browse.html")
    return page
if __name__ == "__main__":
    # The server listens on all interfaces. Debug mode enables the interactive
    # Werkzeug debugger, which lets anyone who can reach it execute code, so
    # it is only switched on by explicit opt-in through FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0')