bethaus-app/app.py
2025-03-17 21:37:03 +00:00

270 lines
10 KiB
Python
Executable File

from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory
import os
from PIL import Image
import io
from functools import wraps
import mimetypes
from datetime import datetime, date
from urllib.parse import unquote
import diskcache
import json
# On-disk cache for served media, stored in ./filecache relative to the working directory.
cache = diskcache.Cache('./filecache', size_limit=32 * 1024**3)  # 32 GB limit
app = Flask(__name__)
# Default FILE_ROOT path (raw string). require_secret overwrites this value
# with the file root configured for the secret that authorised the request.
app.config['FILE_ROOT'] = r'/mp3_root'
# SECURITY: the session-signing key should not live in version control.
# Set the SECRET_KEY environment variable in production; the literal below is
# kept only as a fallback so existing deployments and sessions keep working.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY',
    '85c1117eb3a5f2c79f0ff395bada8ff8d9a257b99ef5e143',
)
def load_allowed_secrets(filename='allowed_secrets.json'):
    """Load the secret table from a JSON file and parse its expiry dates.

    The file is expected to hold a JSON object whose values are objects.
    Any value carrying an ``expiry`` key has that entry converted, in place,
    from a ``DD.MM.YYYY`` string to a ``datetime.date``. Values without an
    ``expiry`` key are returned unchanged.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The decoded mapping, with expiry strings replaced by ``date`` objects.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the JSON is malformed or an expiry string does not
            match ``%d.%m.%Y``.
    """
    with open(filename) as handle:
        allowed = json.load(handle)

    for entry in allowed.values():
        if 'expiry' not in entry:
            continue
        entry['expiry'] = datetime.strptime(entry['expiry'], '%d.%m.%Y').date()
    return allowed
# Read the secret table once at import time from allowed_secrets.json (resolved
# against the current working directory); require_secret looks secrets up here.
app.config['ALLOWED_SECRETS'] = load_allowed_secrets()
def require_secret(f):
    """Decorator that gates a view behind a known, non-expired secret.

    Lookup order:
      1. ``?secret=...`` in the query string. A known, unexpired secret is
         stored in the session and the view runs; a known but expired one
         yields the error page with status 403. An unknown secret falls
         through to step 2.
      2. The secret stored in the session, handled the same way (without
         re-writing the session).
      3. Otherwise the public ``index.html`` page is rendered.

    Before the view runs, ``app.config['FILE_ROOT']`` is set to the
    ``file_root`` of the matching secret.

    NOTE(review): ``app.config`` is application-wide state, so concurrent
    requests authorised by different secrets may overwrite each other's
    FILE_ROOT. Confirm the deployment serialises requests or move this to
    per-request storage.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        known_secrets = app.config['ALLOWED_SECRETS']
        current_day = date.today()

        def is_valid(secret_data):
            # Entries without an expiry date are never considered valid.
            expires_on = secret_data.get('expiry')
            return expires_on and current_day <= expires_on

        def forbidden():
            return render_template('error.html', message="Invalid or expired secret."), 403

        # 1) Secret supplied in the URL.
        url_secret = request.args.get('secret')
        url_entry = known_secrets.get(url_secret) if url_secret is not None else None
        if url_entry:
            if not is_valid(url_entry):
                return forbidden()
            # Remember the secret so later requests work without the parameter.
            session['secret'] = url_secret
            app.config['FILE_ROOT'] = url_entry.get('file_root')
            return f(*args, **kwargs)

        # 2) Secret remembered in the session.
        stored_secret = session.get('secret')
        stored_entry = known_secrets.get(stored_secret) if stored_secret is not None else None
        if stored_entry:
            if not is_valid(stored_entry):
                return forbidden()
            app.config['FILE_ROOT'] = stored_entry.get('file_root')
            return f(*args, **kwargs)

        # 3) No usable secret anywhere: show the public landing page.
        return render_template('index.html')
    return decorated_function
@app.route('/static/icons/<string:size>.png')
def serve_resized_icon(size):
    """Serve ``static/logo.png`` scaled to the dimensions encoded in the URL.

    Args:
        size: URL segment of the form ``<prefix>-<width>x<height>``; the second
            dash-separated component is parsed as ``<width>x<height>``.

    Returns:
        The original logo when the requested box is at least as large as the
        logo in both dimensions, otherwise a PNG thumbnail that fits inside
        the requested box. A 400 response is returned when ``size`` cannot be
        parsed into two positive integers.
    """
    # Reject malformed sizes up front instead of failing with a 500.
    try:
        width_text, height_text = size.split('-')[1].split('x')
        dimensions = (int(width_text), int(height_text))
    except (IndexError, ValueError):
        return "Invalid icon size", 400
    if dimensions[0] <= 0 or dimensions[1] <= 0:
        return "Invalid icon size", 400

    original_logo_path = os.path.join(app.root_path, 'static', 'logo.png')
    with Image.open(original_logo_path) as img:
        orig_width, orig_height = img.size
        # Never upscale: a box that already contains the logo gets the original file.
        if dimensions[0] >= orig_width and dimensions[1] >= orig_height:
            return send_file(original_logo_path, mimetype='image/png')

        # convert() returns a new image, so it can be resized in place.
        resized_img = img.convert("RGBA")
        resized_img.thumbnail(dimensions, Image.LANCZOS)

        # Serialise the thumbnail into memory and rewind for send_file.
        resized_bytes = io.BytesIO()
        resized_img.save(resized_bytes, format='PNG')
        resized_bytes.seek(0)
    return send_file(resized_bytes, mimetype='image/png')
@app.route('/sw.js')
def serve_sw():
    """Serve ``static/sw.js`` at the ``/sw.js`` URL as JavaScript."""
    static_dir = os.path.join(app.root_path, 'static')
    return send_from_directory(static_dir, 'sw.js', mimetype='application/javascript')
def list_directory_contents(directory, subpath):
    """
    Collect the immediate sub-folders and media files of ``directory``.

    Entries whose name starts with a dot are ignored, as is any folder named
    "Transkription" (compared case-insensitively). Only files ending in a
    music extension (.mp3) or an image extension (.jpg, .jpeg, .png, .gif,
    .bmp) are reported. For music files, a same-named ``.md`` file inside the
    "Transkription" sub-folder marks the entry as having a transcript and adds
    its URL.

    ``subpath`` is the URL-style path of ``directory`` relative to the file
    root; it prefixes every returned ``path`` (forward slashes only).

    Returns a ``(directories, files)`` tuple of lists of dicts, both in
    sorted name order. A PermissionError while listing is swallowed and
    whatever was collected so far is returned.
    """
    music_suffixes = ('.mp3',)
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    transcript_folder = os.path.join(directory, "Transkription")
    has_transcript_folder = os.path.isdir(transcript_folder)

    def to_relative(*parts):
        # Build a forward-slash path below subpath (or from the root when it is empty).
        joined = os.path.join(subpath, *parts) if subpath else os.path.join(*parts)
        return joined.replace(os.sep, '/')

    folders = []
    entries = []
    try:
        for name in sorted(os.listdir(directory)):
            # Hidden files and folders are never listed.
            if name.startswith('.'):
                continue
            absolute = os.path.join(directory, name)
            lowered = name.lower()

            if os.path.isdir(absolute):
                # The transcript folder is an implementation detail, not content.
                if lowered != "transkription":
                    folders.append({'name': name, 'path': to_relative(name)})
                continue

            if not os.path.isfile(absolute):
                continue
            is_music = lowered.endswith(music_suffixes)
            if not (is_music or lowered.endswith(image_suffixes)):
                continue

            entry = {
                'name': name,
                'path': to_relative(name),
                'file_type': 'music' if is_music else 'image',
                'has_transcript': False,
            }
            # Transcripts only exist for music files.
            if is_music and has_transcript_folder:
                md_name = os.path.splitext(name)[0] + '.md'
                if os.path.isfile(os.path.join(transcript_folder, md_name)):
                    entry['has_transcript'] = True
                    entry['transcript_url'] = url_for(
                        'get_transcript',
                        filename=to_relative("Transkription", md_name),
                    )
            entries.append(entry)
    except PermissionError:
        pass
    return folders, entries
def generate_breadcrumbs(subpath):
    """Return the breadcrumb trail for a slash-separated ``subpath``.

    The trail always starts with a "Home" entry whose path is empty. Each
    following entry names one path segment and carries the cumulative path
    up to and including that segment. An empty or missing ``subpath``
    yields only the "Home" entry.
    """
    trail = [{'name': 'Home', 'path': ''}]
    if not subpath:
        return trail

    so_far = ""
    for segment in subpath.split('/'):
        # Only insert a separator once something has been accumulated.
        so_far = segment if not so_far else so_far + '/' + segment
        trail.append({'name': segment, 'path': so_far})
    return trail
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@require_secret
def api_browse(subpath):
    """Return the listing of ``subpath`` below the active file root as JSON.

    Args:
        subpath: Slash-separated path relative to ``app.config['FILE_ROOT']``;
            empty for the root itself.

    Returns:
        A JSON object with ``breadcrumbs``, ``directories`` and ``files``, or
        a JSON error with status 404 when the path is not a directory or
        resolves outside the file root.
    """
    file_root = os.path.abspath(app.config['FILE_ROOT'])
    directory = os.path.abspath(os.path.join(file_root, subpath.replace('/', os.sep)))

    # Refuse "..", absolute paths and anything else that escapes the file root.
    # The check is lexical so symlinks placed inside the root keep working.
    try:
        inside_root = os.path.commonpath([file_root, directory]) == file_root
    except ValueError:
        # Raised e.g. for paths on different drives; treat as outside.
        inside_root = False

    if not inside_root or not os.path.isdir(directory):
        return jsonify({'error': 'Directory not found'}), 404

    directories, files = list_directory_contents(directory, subpath)
    breadcrumbs = generate_breadcrumbs(subpath)
    return jsonify({
        'breadcrumbs': breadcrumbs,
        'directories': directories,
        'files': files
    })
@app.route("/media/<path:filename>")
@require_secret
def serve_file(filename):
    """Serve a media file from below the active file root, using the disk cache.

    Images are downscaled to fit within 1200x1200 and re-encoded as PNG; all
    other files are served byte-for-byte. The payload and its MIME type are
    stored in ``cache`` keyed by the resolved absolute path, so that two
    secrets with different file roots never share an entry for the same
    relative name.

    Args:
        filename: Slash-separated path relative to ``app.config['FILE_ROOT']``.

    Returns:
        The file response with a one-day ``Cache-Control`` header, a 404 when
        the file does not exist or resolves outside the file root, or a 500
        when reading or image processing fails.
    """
    root = os.path.abspath(app.config['FILE_ROOT'])
    decoded_filename = unquote(filename).replace('/', os.sep)
    full_path = os.path.abspath(os.path.join(root, decoded_filename))

    # Refuse "..", absolute paths and anything else that escapes the file root.
    # The check is lexical so symlinks placed inside the root keep working.
    try:
        inside_root = os.path.commonpath([root, full_path]) == root
    except ValueError:
        # Raised e.g. for paths on different drives; treat as outside.
        inside_root = False

    if not inside_root or not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404

    mime, _ = mimetypes.guess_type(full_path)
    mime = mime or 'application/octet-stream'

    # Key on the absolute path: FILE_ROOT depends on the caller's secret.
    cache_key = full_path

    # Check cache first (using diskcache)
    cached = cache.get(cache_key)
    if cached:
        app.logger.info(f"Cache hit for {filename}")
        payload, mime = cached
    elif mime.startswith('image/'):
        app.logger.info(f"Cache miss for {filename}")
        # Image processing branch (with caching)
        try:
            with Image.open(full_path) as img:
                img.thumbnail((1200, 1200))
                buffer = io.BytesIO()
                img.save(buffer, format='PNG')
            payload = buffer.getvalue()
        except Exception as e:
            app.logger.error(f"Image processing failed for {filename}: {e}")
            return "Image processing failed", 500
        # The bytes are PNG regardless of the source format, so advertise PNG.
        mime = 'image/png'
        cache.set(cache_key, (payload, mime))
    else:
        app.logger.info(f"Cache miss for {filename}")
        # Cache non-image files: read bytes and cache
        try:
            with open(full_path, 'rb') as f:
                payload = f.read()
        except OSError as e:
            app.logger.error(f"Failed to read file {filename}: {e}")
            return "Failed to read file", 500
        cache.set(cache_key, (payload, mime))

    response = send_file(io.BytesIO(payload), mimetype=mime)
    # Set Cache-Control header (browser caching for 1 day)
    # NOTE(review): 'public' lets shared proxies store secret-protected media;
    # consider 'private' if that is not intended.
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response
@app.route("/transcript/<path:filename>")
@require_secret
def get_transcript(filename):
    """Return a transcript file from below the active file root as Markdown.

    Args:
        filename: Slash-separated path relative to ``app.config['FILE_ROOT']``.

    Returns:
        The UTF-8 file content with a ``text/markdown`` content type, or a 404
        when the file does not exist or resolves outside the file root.
    """
    root = os.path.abspath(app.config['FILE_ROOT'])
    fs_filename = filename.replace('/', os.sep)
    full_path = os.path.abspath(os.path.join(root, fs_filename))

    # Refuse "..", absolute paths and anything else that escapes the file root.
    # The check is lexical so symlinks placed inside the root keep working.
    try:
        inside_root = os.path.commonpath([root, full_path]) == root
    except ValueError:
        # Raised e.g. for paths on different drives; treat as outside.
        inside_root = False

    if not inside_root or not os.path.isfile(full_path):
        return "Transcription not found", 404

    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
# Catch-all route: any URL not matched elsewhere gets the single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@require_secret
def index(path):
    """Render the single-page application template.

    ``path`` is accepted so the route matches any URL but is not used here.
    """
    spa_template = "browse.html"
    return render_template(spa_template)
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution from the browser,
    # so it must not be on by default for a server bound to all interfaces.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0')