"""Flask + Socket.IO application that serves a browsable media library.

The module wires up:

* JSON directory listings (``/api/path/...``),
* cached media delivery (``/media/...``) backed by ``diskcache``,
* transcript delivery (``/transcript/...``),
* logo/icon helpers and the service worker,
* a Socket.IO feed that pushes recent file accesses to connected dashboards,
* a catch-all route that renders the single-page application template.

Folder roots available to a visitor are read from ``session['folders']``
(a mapping of display name -> base path on disk).
"""

import io
import json
import mimetypes
import os
import re
import threading
import time
from datetime import datetime, date, timedelta
from functools import lru_cache
from functools import wraps
from urllib.parse import urlparse, unquote

import diskcache
import geoip2.database
from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, abort
from flask_socketio import SocketIO, emit
from PIL import Image
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.utils import safe_join

import search
import auth
import analytics as a

with open("app_config.json", 'r') as file:
    app_config = json.load(file)

# One disk cache per media family; the size limits are configured in GiB.
cache_audio = diskcache.Cache('./filecache_audio', size_limit=app_config['filecache_size_limit_audio'] * 1024**3)
cache_image = diskcache.Cache('./filecache_image', size_limit=app_config['filecache_size_limit_image'] * 1024**3)
cache_video = diskcache.Cache('./filecache_video', size_limit=app_config['filecache_size_limit_video'] * 1024**3)
cache_other = diskcache.Cache('./filecache_other', size_limit=app_config['filecache_size_limit_other'] * 1024**3)

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)

app.config['SECRET_KEY'] = app_config['SECRET_KEY']
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
if os.environ.get('FLASK_ENV') == 'production':
    app.config['SESSION_COOKIE_SAMESITE'] = 'None'
    app.config['SESSION_COOKIE_SECURE'] = True

app.add_url_rule('/dashboard', view_func=a.dashboard)
app.add_url_rule('/connections', view_func=a.connections)
app.add_url_rule('/mylinks', view_func=auth.mylinks)
app.add_url_rule('/remove_secret', view_func=auth.remove_secret, methods=['POST'])
app.add_url_rule('/search', view_func=search.search, methods=['GET'])
app.add_url_rule('/searchcommand', view_func=search.searchcommand, methods=['POST'])
app.add_url_rule('/songs_dashboard', view_func=a.songs_dashboard)

# Grab the HOST_RULE environment variable.
host_rule = os.getenv("HOST_RULE", "")
# Use a regex to extract domain names between backticks in patterns like
# Host(`something`).
pattern = r"Host\(`([^`]+)`\)"
allowed_domains = re.findall(pattern, host_rule)

# NOTE(review): allowed_domains holds bare host names exactly as written in
# HOST_RULE, and is an empty list when HOST_RULE is unset. Confirm that this
# is what the Socket.IO server expects for ``cors_allowed_origins``.
socketio = SocketIO(
    app,
    async_mode='eventlet',
    cors_allowed_origins=allowed_domains
)
background_thread_running = False

# Global variables to track the number of connected clients and the
# background thread.
clients_connected = 0
background_thread = None
thread_lock = threading.Lock()

# Image modes handed to the JPEG encoder unchanged; every other mode
# (e.g. RGBA, P, LA) is converted to RGB before saving.
_JPEG_NATIVE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})


def _join_under_base(base_path, parts):
    """Join untrusted path ``parts`` onto ``base_path``.

    Returns the joined path, or ``None`` when ``base_path`` is missing/empty
    or when any part would escape the base directory (``..``, absolute
    components, alternative separators).
    """
    if not base_path:
        return None
    return safe_join(base_path, *parts)


def _format_connections(rows):
    """Convert rows from ``a.return_file_access()`` into dashboard dicts.

    Column positions (0: ISO timestamp, 1: path, 2: size, 3: MIME type,
    4: location, 5: user agent, 7: cached flag) are taken from how the rows
    are indexed here; the row schema itself is defined by the analytics module.
    """
    return [
        {
            'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'),
            'full_path': row[1],
            'filesize': row[2],
            'mime_typ': row[3],
            'location': row[4],
            'user_agent': row[5],
            'cached': row[7]
        }
        for row in rows
    ]


@lru_cache(maxsize=10)
def get_cached_image(size):
    """Return PNG bytes of ``custom_logo/logoB.png`` fitted to ``size``.

    ``size`` has the form ``<prefix>-<width>x<height>``. The logo is never
    upscaled: if the requested box is at least as large as the original in
    both dimensions, the original (converted to RGBA) is returned.

    Raises:
        ValueError: if ``size`` is malformed or a dimension is not positive.
    """
    try:
        dimensions = tuple(map(int, size.split('-')[1].split('x')))
    except IndexError:
        raise ValueError(f"Malformed icon size: {size!r}") from None
    if len(dimensions) != 2 or min(dimensions) <= 0:
        raise ValueError(f"Malformed icon size: {size!r}")

    original_logo_path = os.path.join(app.root_path, 'custom_logo', 'logoB.png')

    with Image.open(original_logo_path) as img:
        img = img.convert("RGBA")
        orig_width, orig_height = img.size

        if dimensions[0] >= orig_width and dimensions[1] >= orig_height:
            resized_img = img
        else:
            resized_img = img.copy()
            resized_img.thumbnail(dimensions, Image.LANCZOS)

        img_byte_arr = io.BytesIO()
        resized_img.save(img_byte_arr, format='PNG')
        return img_byte_arr.getvalue()


def list_directory_contents(directory, subpath):
    """
    List only the immediate contents of the given directory.

    Also, if a "Transkription" subfolder exists, check for matching .md files
    for music files. Hidden entries (names starting with a dot) and symlinks
    are skipped, as are the "Transkription" and "@eaDir" folders.

    Args:
        directory: Absolute path of the directory on disk.
        subpath: The same directory expressed relative to the virtual root
            (used to build the ``path`` values returned to the client).

    Returns:
        A ``(directories, files)`` tuple of lists of dicts. Must be called
        inside a request/application context because ``url_for`` is used
        for transcript links.
    """
    directories = []
    files = []
    transcription_dir = os.path.join(directory, "Transkription")
    transcription_exists = os.path.isdir(transcription_dir)

    # Define allowed file extensions.
    music_exts = ('.mp3',)
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    try:
        with os.scandir(directory) as it:
            # Sorting by name if required.
            for entry in sorted(it, key=lambda e: e.name):
                # Skip hidden files and directories.
                if entry.name.startswith('.'):
                    continue

                rel_path = os.path.join(subpath, entry.name) if subpath else entry.name
                rel_path = rel_path.replace(os.sep, '/')

                if entry.is_dir(follow_symlinks=False):
                    if entry.name in ("Transkription", "@eaDir"):
                        continue
                    directories.append({'name': entry.name, 'path': rel_path})
                elif entry.is_file(follow_symlinks=False):
                    lower_name = entry.name.lower()

                    # TODO: implement file type filtering here (currently
                    # every file is listed and merely tagged with its type).
                    if lower_name.endswith(music_exts):
                        file_type = 'music'
                    elif lower_name.endswith(image_exts):
                        file_type = 'image'
                    else:
                        file_type = 'other'
                    file_entry = {'name': entry.name, 'path': rel_path, 'file_type': file_type}

                    # Only check for a transcription if it's an audio file.
                    file_entry['has_transcript'] = False
                    if file_type == 'music' and transcription_exists:
                        transcript_filename = os.path.splitext(entry.name)[0] + '.md'
                        transcript_path = os.path.join(transcription_dir, transcript_filename)
                        if os.path.isfile(transcript_path):
                            file_entry['has_transcript'] = True
                            if subpath:
                                transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename)
                            else:
                                transcript_rel_path = os.path.join("Transkription", transcript_filename)
                            file_entry['transcript_url'] = url_for(
                                'get_transcript',
                                subpath=transcript_rel_path.replace(os.sep, '/')
                            )
                    files.append(file_entry)
    except PermissionError:
        # Best effort: an unreadable directory yields whatever was collected
        # so far (normally nothing) instead of failing the request.
        app.logger.warning(f"Permission denied while listing {directory}")

    return directories, files


def generate_breadcrumbs(subpath=None):
    """Build the breadcrumb trail (always starting with 'Home') for ``subpath``."""
    breadcrumbs = [{'name': 'Home', 'path': ''}]
    if subpath:
        path_accum = ""
        for part in subpath.split('/'):
            path_accum = f"{path_accum}/{part}" if path_accum else part
            breadcrumbs.append({'name': part, 'path': path_accum})
    return breadcrumbs


@app.route('/icon/<size>.png')
def serve_resized_icon(size):
    """Serve the logo resized to ``size`` (``<prefix>-<width>x<height>``)."""
    try:
        cached_image_bytes = get_cached_image(size)
    except ValueError:
        # Malformed size segment in the URL.
        abort(400)
    response = send_file(
        io.BytesIO(cached_image_bytes),
        mimetype='image/png'
    )
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response


@app.route('/custom_logo/<filename>.png')
def custom_logo(filename):
    """Serve ``custom_logo/<filename>.png`` (relative to the working directory)."""
    file_path = os.path.join('custom_logo', f"{filename}.png")
    if not os.path.isfile(file_path):
        abort(404)

    with open(file_path, 'rb') as file:
        image_data = file.read()

    # Create a BytesIO object using the binary image data.
    image_io = io.BytesIO(image_data)
    image_io.seek(0)  # Important: reset the stream position

    response = send_file(image_io, mimetype='image/png')
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response


@app.route('/sw.js')
def serve_sw():
    """Serve the service worker script from the static folder."""
    return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript')


# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@auth.require_secret
def api_browse(subpath):
    """Return breadcrumbs, sub-directories and files for ``subpath`` as JSON.

    An empty ``subpath`` lists the folder roots stored in the session. If
    ``subpath`` points at a file rather than a directory, the parent
    directory is listed and the file is reported as ``playfile``.
    Unknown roots, unsafe paths and missing directories yield a 404.
    """
    if subpath == '':  # root directory
        foldernames = [
            {'name': foldername, 'path': foldername}
            for foldername in session['folders']
        ]
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(),
            'directories': foldernames,
            'files': []
        })

    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    directory = _join_under_base(base_path, relative_parts)
    if directory is None:
        # Unknown root or a path that tries to leave the base folder.
        return jsonify({'error': 'Directory not found'}), 404

    playfile = None

    # Check if the constructed directory exists.
    if not os.path.isdir(directory):
        # Assume the last segment is a filename; remove it.
        if relative_parts:
            playfile = relative_parts.pop()  # Get the filename.
            directory = _join_under_base(base_path, relative_parts)
            # Rebuild subpath to reflect the directory (without the file).
            subpath = '/'.join([root] + relative_parts)

        # If the parent directory still doesn't exist, return error.
        if directory is None or not os.path.isdir(directory):
            return jsonify({'error': 'Directory not found'}), 404

    directories, files = list_directory_contents(directory, subpath)
    breadcrumbs = generate_breadcrumbs(subpath)

    response = {
        'breadcrumbs': breadcrumbs,
        'directories': directories,
        'files': files
    }

    # If a filename was selected include it.
    if playfile:
        response['playfile'] = os.path.join(subpath, playfile).replace(os.sep, '/')

    return jsonify(response)


@app.route("/media/<path:subpath>")
@auth.require_secret
def serve_file(subpath):
    """Serve a media file through the on-disk cache.

    Images are downscaled to fit 1920x1920 and re-encoded as JPEG before
    being cached; all other files are cached unchanged. ``?download=true``
    sends the file as an attachment. Accesses are logged via
    ``a.log_file_access`` except for requests carrying
    ``X-Cache-Request: true`` and non-HEAD requests for ``audio/mpeg``.
    """
    # 1) Locate the real file on disk.
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    # An unknown root must NOT fall back to a path relative to the working
    # directory, and '..' segments must not escape the base folder.
    full_path = _join_under_base(base_path, relative_parts)

    if full_path is None or not os.path.isfile(full_path):
        app.logger.error(f"File not found: {subpath}")
        return "File not found", 404

    # 2) Prep request info.
    mime, _ = mimetypes.guess_type(full_path)
    mime = mime or 'application/octet-stream'
    is_image = mime.startswith('image/')
    is_cache_request = request.headers.get('X-Cache-Request') == 'true'
    ip_address = request.remote_addr
    user_agent = request.headers.get('User-Agent')

    # Skip logging on cache requests or on audio GETs.
    do_log = not is_cache_request
    if mime == 'audio/mpeg' and request.method != 'HEAD':
        do_log = False

    # 3) Pick cache.
    if mime.startswith('audio/'):
        cache = cache_audio
    elif is_image:
        cache = cache_image
    elif mime.startswith('video/'):
        cache = cache_video
    else:
        cache = cache_other

    # 4) Ensure cached on-disk file and get its path.
    # NOTE(review): the cache key is the URL subpath only. If two sessions
    # can map the same root name to different folders, they would share
    # entries - confirm against how auth populates session['folders'].
    cached_hit = True
    try:
        with cache.read(subpath) as reader:
            file_path = reader.name
    except KeyError:
        # Cache miss.
        cached_hit = False
        if is_image:
            # 4a) Image branch: thumbnail & cache the JPEG.
            try:
                with Image.open(full_path) as img:
                    img.thumbnail((1920, 1920))
                    if img.mode not in _JPEG_NATIVE_MODES:
                        img = img.convert("RGB")
                    thumb_io = io.BytesIO()
                    img.save(thumb_io, format='JPEG', quality=85)
                    thumb_io.seek(0)

                # Write thumbnail into diskcache as a real file.
                cache.set(subpath, thumb_io, read=True)
                # Now re-open from cache to get the on-disk path.
                with cache.read(subpath) as reader:
                    file_path = reader.name

            except Exception as e:
                app.logger.error(f"Image processing failed for {subpath}: {e}")
                abort(500)
        else:
            # 4b) Non-image branch: cache original file.
            try:
                # Store the real file on diskcache; close the source handle
                # as soon as it has been copied.
                with open(full_path, 'rb') as source:
                    cache.set(subpath, source, read=True)

                # Read back to get its path.
                with cache.read(subpath) as reader:
                    file_path = reader.name

            except Exception as e:
                app.logger.error(f"Failed to cache file {subpath}: {e}")
                abort(500)

    filesize = os.path.getsize(file_path)

    # Every entry in the image cache was re-encoded as JPEG above, so
    # advertise that type instead of the original file's type.
    served_mime = 'image/jpeg' if is_image else mime

    # 5) Figure out download flag and filename.
    ask_download = request.args.get('download') == 'true'
    filename = os.path.basename(full_path)

    # 6) Single send_file call with proper attachment handling.
    response = send_file(
        file_path,
        mimetype=served_mime,
        conditional=True,
        as_attachment=ask_download,
        download_name=filename if ask_download else None
    )
    # Explicitly force inline if not downloading.
    if not ask_download:
        response.headers['Content-Disposition'] = 'inline'
    response.headers['Cache-Control'] = 'public, max-age=86400'

    # 7) Logging (with the original file's MIME type).
    if do_log:
        a.log_file_access(
            subpath,
            filesize,
            mime,
            ip_address,
            user_agent,
            session['device_id'],
            cached_hit
        )

    return response


@app.route("/transcript/<path:subpath>")
@auth.require_secret
def get_transcript(subpath):
    """Return the transcript file at ``subpath`` as UTF-8 markdown text."""
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    full_path = _join_under_base(base_path, relative_parts)

    if full_path is None or not os.path.isfile(full_path):
        return "Transcription not found", 404

    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}


def query_recent_connections():
    """Background task: push recent file accesses while clients are connected.

    Polls ``a.return_file_access()`` once per second and emits a
    ``recent_connections`` event whenever the formatted result changes.
    Stops as soon as no client is connected and clears
    ``background_thread_running`` on exit.
    """
    global clients_connected, background_thread_running
    background_thread_running = True
    last_connections = None
    try:
        while clients_connected > 0:
            connections = _format_connections(a.return_file_access())

            if connections != last_connections:
                socketio.emit('recent_connections', connections)
                last_connections = connections

            socketio.sleep(1)
    finally:
        background_thread_running = False
        print("No clients connected; stopping query thread.")


@socketio.on('connect')
def handle_connect(auth=None):
    """Count the new client and start the polling task if it is not running.

    The ``auth`` parameter is unused; it shadows the ``auth`` module inside
    this function only.
    """
    global clients_connected, background_thread_running
    with thread_lock:
        clients_connected += 1
        print("Client connected. Total clients:", clients_connected)
        if not background_thread_running:
            # Set the flag before the task starts running so that a second
            # connect arriving in the meantime cannot spawn another task.
            background_thread_running = True
            try:
                socketio.start_background_task(query_recent_connections)
            except Exception:
                background_thread_running = False
                raise
            print("Started background query task.")


@socketio.on('disconnect')
def handle_disconnect():
    """Decrease the connected-client counter (never below zero)."""
    global clients_connected
    with thread_lock:
        clients_connected = max(0, clients_connected - 1)
        print("Client disconnected. Total clients:", clients_connected)


@socketio.on('request_initial_data')
def handle_request_initial_data():
    """Send the current list of recent file accesses to the requesting client."""
    connections = _format_connections(a.return_file_access())
    emit('recent_connections', connections)


# Catch-all route to serve the single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@auth.require_secret
def index(path):
    """Render the single-page application shell for any otherwise unmatched path."""
    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')

    return render_template(
        "app.html",
        title_short=title_short,
        title_long=title_long
    )


if __name__ == '__main__':
    # Never expose the interactive debugger on all interfaces in production.
    socketio.run(
        app,
        debug=os.environ.get('FLASK_ENV') != 'production',
        host='0.0.0.0'
    )