bethaus-app/app.py
2025-05-01 19:04:18 +02:00

539 lines
19 KiB
Python
Executable File

import eventlet
eventlet.monkey_patch()
from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, make_response, abort
import os
from PIL import Image, ImageOps
import io
from functools import wraps
import mimetypes
from datetime import datetime, date, timedelta
import diskcache
import threading
import json
import time
from flask_socketio import SocketIO, emit
import geoip2.database
from functools import lru_cache
from urllib.parse import urlparse, unquote
from werkzeug.middleware.proxy_fix import ProxyFix
import re
import search
import auth
import analytics as a
with open("app_config.json", 'r') as file:
app_config = json.load(file)
with open('folder_permission_config.json') as file:
folder_config = json.load(file)
cache_audio = diskcache.Cache('./filecache_audio', size_limit= app_config['filecache_size_limit_audio'] * 1024**3)
cache_image = diskcache.Cache('./filecache_image', size_limit= app_config['filecache_size_limit_image'] * 1024**3)
cache_video = diskcache.Cache('./filecache_video', size_limit= app_config['filecache_size_limit_video'] * 1024**3)
cache_other = diskcache.Cache('./filecache_other', size_limit= app_config['filecache_size_limit_other'] * 1024**3)
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
app.config['SECRET_KEY'] = app_config['SECRET_KEY']
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
if os.environ.get('FLASK_ENV') == 'production':
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
app.add_url_rule('/dashboard', view_func=a.dashboard)
app.add_url_rule('/connections', view_func=a.connections)
app.add_url_rule('/mylinks', view_func=auth.mylinks)
app.add_url_rule('/remove_secret', view_func=auth.remove_secret, methods=['POST'])
app.add_url_rule('/remove_token', view_func=auth.remove_token, methods=['POST'])
app.add_url_rule('/search', view_func=search.search, methods=['GET'])
app.add_url_rule('/searchcommand', view_func=search.searchcommand, methods=['POST'])
app.add_url_rule('/songs_dashboard', view_func=a.songs_dashboard)
# Grab the HOST_RULE environment variable
host_rule = os.getenv("HOST_RULE", "")
# Use a regex to extract domain names between backticks in patterns like Host(`something`)
pattern = r"Host\(`([^`]+)`\)"
allowed_domains = re.findall(pattern, host_rule)
socketio = SocketIO(
app,
async_mode='eventlet',
cors_allowed_origins=allowed_domains
)
background_thread_running = False
# Global variables to track the number of connected clients and the background thread
clients_connected = 0
background_thread = None
thread_lock = threading.Lock()
@lru_cache(maxsize=10)
def get_cached_image(size):
dimensions = tuple(map(int, size.split('-')[1].split('x')))
original_logo_path = os.path.join(app.root_path, 'custom_logo', 'logoB.png')
with Image.open(original_logo_path) as img:
img = img.convert("RGBA")
orig_width, orig_height = img.size
if dimensions[0] >= orig_width and dimensions[1] >= orig_height:
resized_img = img
else:
resized_img = img.copy()
resized_img.thumbnail(dimensions, Image.LANCZOS)
img_byte_arr = io.BytesIO()
resized_img.save(img_byte_arr, format='PNG')
return img_byte_arr.getvalue()
def list_directory_contents(directory, subpath):
"""
List only the immediate contents of the given directory.
Also, if a "Transkription" subfolder exists, check for matching .md files for music files.
Skip folders that start with a dot.
"""
directories = []
files = []
transcription_dir = os.path.join(directory, "Transkription")
transcription_exists = os.path.isdir(transcription_dir)
# Define allowed file extensions.
music_exts = ('.mp3',)
image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
blocked_filenames = ['Thumbs.db']
try:
with os.scandir(directory) as it:
# Sorting by name if required.
for entry in sorted(it, key=lambda e: e.name):
# Skip hidden files and directories.
if entry.name.startswith('.'):
continue
# Skip blocked_filenames
if entry.name in blocked_filenames:
continue
if entry.is_dir(follow_symlinks=False):
if entry.name in ["Transkription", "@eaDir"]:
continue
rel_path = os.path.join(subpath, entry.name) if subpath else entry.name
directories.append({'name': entry.name, 'path': rel_path.replace(os.sep, '/')})
elif entry.is_file(follow_symlinks=False):
lower_name = entry.name.lower()
# implement file type filtering here !!!
#if lower_name.endswith(music_exts) or lower_name.endswith(image_exts):
rel_path = os.path.join(subpath, entry.name) if subpath else entry.name
if lower_name.endswith(music_exts):
file_type = 'music'
elif lower_name.endswith(image_exts):
file_type = 'image'
else:
file_type = 'other'
file_entry = {'name': entry.name, 'path': rel_path.replace(os.sep, '/'), 'file_type': file_type}
# Only check for transcription if it's a audio file.
if file_type == 'music' and transcription_exists:
base_name = os.path.splitext(entry.name)[0]
transcript_filename = base_name + '.md'
transcript_path = os.path.join(transcription_dir, transcript_filename)
if os.path.isfile(transcript_path):
file_entry['has_transcript'] = True
transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename) if subpath else os.path.join("Transkription", transcript_filename)
file_entry['transcript_url'] = url_for('get_transcript', subpath=transcript_rel_path.replace(os.sep, '/'))
else:
file_entry['has_transcript'] = False
else:
file_entry['has_transcript'] = False
files.append(file_entry)
except PermissionError:
pass
return directories, files
def generate_breadcrumbs(subpath=None):
breadcrumbs = [{'name': 'Home', 'path': ''}]
if subpath:
parts = subpath.split('/')
path_accum = ""
for part in parts:
path_accum = f"{path_accum}/{part}" if path_accum else part
breadcrumbs.append({'name': part, 'path': path_accum})
return breadcrumbs
@app.route('/icon/<string:size>.png')
def serve_resized_icon(size):
cached_image_bytes = get_cached_image(size)
response = send_file(
io.BytesIO(cached_image_bytes),
mimetype='image/png'
)
response.headers['Cache-Control'] = 'public, max-age=86400'
return response
@app.route('/custom_logo/<string:filename>.png')
def custom_logo(filename):
file_path = os.path.join('custom_logo', f"{filename}.png")
if not os.path.exists(file_path):
abort(404)
with open(file_path, 'rb') as file:
image_data = file.read()
# Create a BytesIO object using the binary image data
image_io = io.BytesIO(image_data)
image_io.seek(0) # Important: reset the stream position
response = send_file(image_io, mimetype='image/png')
response.headers['Cache-Control'] = 'public, max-age=86400'
return response
@app.route('/sw.js')
def serve_sw():
return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript')
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@auth.require_secret
def api_browse(subpath):
if subpath == '': # root directory
foldernames = []
for foldername, folderpath in session['folders'].items():
foldernames.append({'name': foldername, 'path': foldername})
return jsonify({
'breadcrumbs': generate_breadcrumbs(),
'directories': foldernames,
'files': []
})
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
directory = os.path.join(base_path, *relative_parts)
playfile = None
# Check if the constructed directory exists.
if not os.path.isdir(directory):
# Assume the last segment is a filename; remove it.
if relative_parts:
playfile = relative_parts.pop() # Get the filename.
directory = os.path.join(base_path, *relative_parts)
# Rebuild subpath to reflect the directory (without the file).
subpath = '/'.join([root] + relative_parts)
# If the parent directory still doesn't exist, return error.
if not os.path.isdir(directory):
return jsonify({'error': 'Directory not found'}), 404
directories, files = list_directory_contents(directory, subpath)
breadcrumbs = generate_breadcrumbs(subpath)
response = {
'breadcrumbs': breadcrumbs,
'directories': directories,
'files': files
}
# If a filename was selected include it.
if playfile:
response['playfile'] = os.path.join(subpath, playfile).replace(os.sep, '/')
return jsonify(response)
@app.route("/media/<path:subpath>")
@auth.require_secret
def serve_file(subpath):
# 1) Locate the real file on disk
root, *relative_parts = subpath.split('/')
base_path = session['folders'].get(root)
full_path = os.path.join(base_path or '', *relative_parts)
if not os.path.isfile(full_path):
app.logger.error(f"File not found: {full_path}")
return "File not found", 404
# 2) Prep request info
mime, _ = mimetypes.guess_type(full_path)
mime = mime or 'application/octet-stream'
is_cache_request = request.headers.get('X-Cache-Request') == 'true'
is_audio_get = mime.startswith('audio/') and request.method == 'GET'
ip_address = request.remote_addr
user_agent = request.headers.get('User-Agent')
# skip logging on cache hits or on audio GETs (per your rules)
do_log = (
not is_cache_request # skip if upstream CDN asked us to cache
and not is_audio_get # skip audio GETs
)
# 3) Pick cache
if mime.startswith('audio/'):
cache = cache_audio
elif mime.startswith('image/'):
cache = cache_image
elif mime.startswith('video/'):
cache = cache_video
else:
cache = cache_other
# 4) Image and thumbnail handling first
if mime.startswith('image/'):
small = request.args.get('thumbnail') == 'true'
name, ext = os.path.splitext(subpath)
orig_key = subpath
small_key = f"{name}_small{ext}"
cache_key = small_key if small else orig_key
try:
# Try to read the requested variant
with cache.read(cache_key) as reader:
file_path = reader.name
cached_hit = True
except KeyError:
if small: # do not create when thumbnail requested
response = make_response('', 204)
return response
cached_hit = False
# On miss: generate both full-size and small thumb, then cache
with Image.open(full_path) as orig:
img = ImageOps.exif_transpose(orig)
variants = {
orig_key: (1920, 1920),
small_key: ( 480, 480),
}
for key, size in variants.items():
thumb = img.copy()
thumb.thumbnail(size, Image.LANCZOS)
if thumb.mode in ("RGBA", "P"):
thumb = thumb.convert("RGB")
bio = io.BytesIO()
thumb.save(bio, format='JPEG', quality=85)
bio.seek(0)
cache.set(key, bio, read=True)
# Read back the variant we need
with cache.read(cache_key) as reader:
file_path = reader.name
# Serve the image variant
response = send_file(
file_path,
mimetype=mime,
conditional=True,
as_attachment=(request.args.get('download') == 'true'),
download_name=os.path.basename(orig_key)
)
response.headers['Content-Disposition'] = 'inline'
response.headers['Cache-Control'] = 'public, max-age=86400'
if do_log and not small:
a.log_file_access(
cache_key,
os.path.getsize(file_path),
mime,
ip_address,
user_agent,
session['device_id'],
cached_hit
)
return response
# 5) Non-image branch: ensure original is cached
try:
with cache.read(subpath) as reader:
file_path = reader.name
cached_hit = True
except KeyError:
cached_hit = False
try:
cache.set(subpath, open(full_path, 'rb'), read=True)
with cache.read(subpath) as reader:
file_path = reader.name
except Exception as e:
app.logger.error(f"Failed to cache file {subpath}: {e}")
abort(500)
# 6) Build response for non-image
filesize = os.path.getsize(file_path)
# Figure out download flag and filename
ask_download = request.args.get('download') == 'true'
filename = os.path.basename(full_path)
# Single send_file call with proper attachment handling
response = send_file(
file_path,
mimetype=mime,
conditional=True,
as_attachment=ask_download,
download_name=filename if ask_download else None
)
if not ask_download:
response.headers['Content-Disposition'] = 'inline'
response.headers['Cache-Control'] = 'public, max-age=86400'
# 7) Logging
if do_log:
a.log_file_access(
subpath,
filesize,
mime,
ip_address,
user_agent,
session['device_id'],
cached_hit
)
return response
@app.route("/transcript/<path:subpath>")
@auth.require_secret
def get_transcript(subpath):
root, *relative_parts = subpath.split('/')
base_path = session['folders'][root]
full_path = os.path.join(base_path, *relative_parts)
if not os.path.isfile(full_path):
return "Transcription not found", 404
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
@app.route("/create_share/<path:subpath>")
@auth.require_secret
def create_share(subpath):
scheme = request.scheme # current scheme (http or https)
if 'admin ' not in session and not session.get('admin'):
return "Unauthorized", 403
paths = {}
for item in folder_config:
for folder in item['folders']:
paths[folder['foldername']] = folder['folderpath']
# use folder config file and ignore validity to get full path
root, *relative_parts = subpath.split('/')
base_path = paths[root]
full_path = os.path.join(base_path, *relative_parts)
foldername = relative_parts[-1]
validity_date = (datetime.now() + timedelta(days=90)).strftime('%d.%m.%Y')
data = {
"validity": validity_date,
"folders": [
{
"foldername": foldername,
"folderpath": full_path
}
]
}
print (data)
token = auth.generate_secret_key_compressed(data)
content = f"""Ordner:\\
{foldername}\\
\\
Gültig bis:\\
{validity_date}\\
\\
Link:\\
`{scheme}://{request.host}?token={token}`"""
return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
def query_recent_connections():
global clients_connected, background_thread_running
background_thread_running = True
last_connections = None
try:
while clients_connected > 0:
rows = a.return_file_access()
connections = [
{
'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'),
'full_path': row[1],
'filesize': row[2],
'mime_typ': row[3],
'location': row[4],
'user_agent': row[5],
'cached': row[7]
}
for row in rows
]
if connections != last_connections:
socketio.emit('recent_connections', connections)
last_connections = connections.copy()
socketio.sleep(1)
finally:
background_thread_running = False
@socketio.on('connect')
def handle_connect(auth=None):
global clients_connected, background_thread_running
clients_connected += 1
print("Client connected. Total clients:", clients_connected)
with thread_lock:
if not background_thread_running:
socketio.start_background_task(query_recent_connections)
print("Started background query task.")
@socketio.on('disconnect')
def handle_disconnect():
global clients_connected
clients_connected -= 1
print("Client disconnected. Total clients:", clients_connected)
@socketio.on('request_initial_data')
def handle_request_initial_data():
rows = a.return_file_access()
connections = [
{
'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'),
'full_path': row[1],
'filesize' : row[2],
'mime_typ' : row[3],
'location': row[4],
'user_agent': row[5],
'cached': row[7]
}
for row in rows
]
emit('recent_connections', connections)
# Catch-all route to serve the single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@auth.require_secret
def index(path):
title_short = app_config.get('TITLE_SHORT', 'Default Title')
title_long = app_config.get('TITLE_LONG' , 'Default Title')
if 'admin' in session:
admin_enabled = session['admin']
return render_template("app.html",
title_short=title_short,
title_long=title_long,
admin_enabled=admin_enabled,
)
if __name__ == '__main__':
socketio.run(app, debug=True, host='0.0.0.0')