from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, abort import os from PIL import Image import io from functools import wraps import mimetypes from datetime import datetime, date, timedelta import diskcache import threading import json import time from flask_socketio import SocketIO, emit import geoip2.database from functools import lru_cache from urllib.parse import urlparse, unquote from werkzeug.middleware.proxy_fix import ProxyFix import re import search import auth import analytics as a with open("app_config.json", 'r') as file: app_config = json.load(file) cache_audio = diskcache.Cache('./filecache_audio', size_limit= app_config['filecache_size_limit_audio'] * 1024**3) cache_image = diskcache.Cache('./filecache_image', size_limit= app_config['filecache_size_limit_image'] * 1024**3) cache_video = diskcache.Cache('./filecache_video', size_limit= app_config['filecache_size_limit_video'] * 1024**3) cache_other = diskcache.Cache('./filecache_other', size_limit= app_config['filecache_size_limit_other'] * 1024**3) app = Flask(__name__) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1) app.config['SECRET_KEY'] = app_config['SECRET_KEY'] app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) if os.environ.get('FLASK_ENV') == 'production': app.config['SESSION_COOKIE_SAMESITE'] = 'None' app.config['SESSION_COOKIE_SECURE'] = True app.add_url_rule('/dashboard', view_func=a.dashboard) app.add_url_rule('/connections', view_func=a.connections) app.add_url_rule('/mylinks', view_func=auth.mylinks) app.add_url_rule('/remove_secret', view_func=auth.remove_secret, methods=['POST']) app.add_url_rule('/search', view_func=search.search, methods=['GET']) app.add_url_rule('/searchcommand', view_func=search.searchcommand, methods=['POST']) app.add_url_rule('/songs_dashboard', view_func=a.songs_dashboard) # Grab the HOST_RULE environment variable host_rule = os.getenv("HOST_RULE", "") # Use a regex to extract domain names between backticks in patterns like Host(`something`) pattern = r"Host\(`([^`]+)`\)" allowed_domains = re.findall(pattern, host_rule) socketio = SocketIO( app, async_mode='eventlet', cors_allowed_origins=allowed_domains ) background_thread_running = False # Global variables to track the number of connected clients and the background thread clients_connected = 0 background_thread = None thread_lock = threading.Lock() @lru_cache(maxsize=10) def get_cached_image(size): dimensions = tuple(map(int, size.split('-')[1].split('x'))) original_logo_path = os.path.join(app.root_path, 'custom_logo', 'logoB.png') with Image.open(original_logo_path) as img: img = img.convert("RGBA") orig_width, orig_height = img.size if dimensions[0] >= orig_width and dimensions[1] >= orig_height: resized_img = img else: resized_img = img.copy() resized_img.thumbnail(dimensions, Image.LANCZOS) img_byte_arr = io.BytesIO() resized_img.save(img_byte_arr, format='PNG') return img_byte_arr.getvalue() def list_directory_contents(directory, subpath): """ List only the immediate contents of the given directory. Also, if a "Transkription" subfolder exists, check for matching .md files for music files. Skip folders that start with a dot. """ directories = [] files = [] transcription_dir = os.path.join(directory, "Transkription") transcription_exists = os.path.isdir(transcription_dir) # Define allowed file extensions. music_exts = ('.mp3',) image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp') try: with os.scandir(directory) as it: # Sorting by name if required. for entry in sorted(it, key=lambda e: e.name): # Skip hidden files and directories. if entry.name.startswith('.'): continue if entry.is_dir(follow_symlinks=False): if entry.name in ["Transkription", "@eaDir"]: continue rel_path = os.path.join(subpath, entry.name) if subpath else entry.name directories.append({'name': entry.name, 'path': rel_path.replace(os.sep, '/')}) elif entry.is_file(follow_symlinks=False): lower_name = entry.name.lower() # implement file type filtering here !!! #if lower_name.endswith(music_exts) or lower_name.endswith(image_exts): rel_path = os.path.join(subpath, entry.name) if subpath else entry.name if lower_name.endswith(music_exts): file_type = 'music' elif lower_name.endswith(image_exts): file_type = 'image' else: file_type = 'other' file_entry = {'name': entry.name, 'path': rel_path.replace(os.sep, '/'), 'file_type': file_type} # Only check for transcription if it's a audio file. if file_type == 'music' and transcription_exists: base_name = os.path.splitext(entry.name)[0] transcript_filename = base_name + '.md' transcript_path = os.path.join(transcription_dir, transcript_filename) if os.path.isfile(transcript_path): file_entry['has_transcript'] = True transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename) if subpath else os.path.join("Transkription", transcript_filename) file_entry['transcript_url'] = url_for('get_transcript', subpath=transcript_rel_path.replace(os.sep, '/')) else: file_entry['has_transcript'] = False else: file_entry['has_transcript'] = False files.append(file_entry) except PermissionError: pass return directories, files def generate_breadcrumbs(subpath=None): breadcrumbs = [{'name': 'Home', 'path': ''}] if subpath: parts = subpath.split('/') path_accum = "" for part in parts: path_accum = f"{path_accum}/{part}" if path_accum else part breadcrumbs.append({'name': part, 'path': path_accum}) return breadcrumbs @app.route('/icon/.png') def serve_resized_icon(size): cached_image_bytes = get_cached_image(size) response = send_file( io.BytesIO(cached_image_bytes), mimetype='image/png' ) response.headers['Cache-Control'] = 'public, max-age=86400' return response @app.route('/custom_logo/.png') def custom_logo(filename): file_path = os.path.join('custom_logo', f"{filename}.png") if not os.path.exists(file_path): abort(404) with open(file_path, 'rb') as file: image_data = file.read() # Create a BytesIO object using the binary image data image_io = io.BytesIO(image_data) image_io.seek(0) # Important: reset the stream position response = send_file(image_io, mimetype='image/png') response.headers['Cache-Control'] = 'public, max-age=86400' return response @app.route('/sw.js') def serve_sw(): return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript') # API endpoint for AJAX: returns JSON for a given directory. @app.route('/api/path/', defaults={'subpath': ''}) @app.route('/api/path/') @auth.require_secret def api_browse(subpath): if subpath == '': # root directory foldernames = [] for foldername, folderpath in session['folders'].items(): foldernames.append({'name': foldername, 'path': foldername}) return jsonify({ 'breadcrumbs': generate_breadcrumbs(), 'directories': foldernames, 'files': [] }) root, *relative_parts = subpath.split('/') base_path = session['folders'][root] directory = os.path.join(base_path, *relative_parts) playfile = None # Check if the constructed directory exists. if not os.path.isdir(directory): # Assume the last segment is a filename; remove it. if relative_parts: playfile = relative_parts.pop() # Get the filename. directory = os.path.join(base_path, *relative_parts) # Rebuild subpath to reflect the directory (without the file). subpath = '/'.join([root] + relative_parts) # If the parent directory still doesn't exist, return error. if not os.path.isdir(directory): return jsonify({'error': 'Directory not found'}), 404 directories, files = list_directory_contents(directory, subpath) breadcrumbs = generate_breadcrumbs(subpath) response = { 'breadcrumbs': breadcrumbs, 'directories': directories, 'files': files } # If a filename was selected include it. if playfile: response['playfile'] = os.path.join(subpath, playfile).replace(os.sep, '/') return jsonify(response) @app.route("/media/") @auth.require_secret def serve_file(subpath): root, *relative_parts = subpath.split('/') base_path = session['folders'][root] full_path = os.path.join(base_path, *relative_parts) if not os.path.isfile(full_path): app.logger.error(f"File not found: {full_path}") return "File not found", 404 mime, _ = mimetypes.guess_type(full_path) mime = mime or 'application/octet-stream' range_header = request.headers.get('Range') ip_address = request.remote_addr user_agent = request.headers.get('User-Agent') is_cache_request = request.headers.get('X-Cache-Request') == 'true' # Check cache first (using diskcache) response = None # determine the cache to use based on the file type if mime and mime.startswith('audio/'): cache = cache_audio elif mime and mime.startswith('image/'): cache = cache_image elif mime and mime.startswith('video/'): cache = cache_video else: cache = cache_other # Check if the file is already cached if is_cache_request: logging = False else: logging = True cached = cache.get(subpath) if cached: cached_file_bytes, mime = cached cached_file = io.BytesIO(cached_file_bytes) filesize = len(cached_file.getbuffer()) response = send_file(cached_file, mimetype=mime) else: if mime and mime.startswith('image/'): # Image processing branch (with caching) try: with Image.open(full_path) as img: img.thumbnail((1920, 1920)) if img.mode in ("RGBA", "P"): img = img.convert("RGB") output_format = 'JPEG' output_mime = 'image/jpeg' save_kwargs = {'quality': 85} img_bytes_io = io.BytesIO() img.save(img_bytes_io, format=output_format, **save_kwargs) thumb_bytes = img_bytes_io.getvalue() filesize = len(thumb_bytes) cache.set(subpath, (thumb_bytes, output_mime)) response = send_file(io.BytesIO(thumb_bytes), mimetype=output_mime, conditional=True) except Exception as e: app.logger.error(f"Image processing failed for {subpath}: {e}") abort(500) else: # Cache non-image files: read bytes and cache try: with open(full_path, 'rb') as f: file_bytes = f.read() cache.set(subpath, (file_bytes, mime)) file_bytes_io = io.BytesIO(file_bytes) filesize = len(file_bytes_io.getbuffer()) response = send_file(file_bytes_io, mimetype=mime, conditional=True) except Exception as e: app.logger.error(f"Failed to read file {subpath}: {e}") abort(500) # Set Cache-Control header (browser caching for 1 day) response.headers['Cache-Control'] = 'public, max-age=86400' # special rules for audio files. # HEAD request checks if the audio file is available. GET requests coming from the audi player itself and can be made multiple times. # a HEAD request only for logging will be ignored because of rules before if mime and mime.startswith('audio/mpeg') and request.method != 'HEAD': logging = False if logging: a.log_file_access(subpath, filesize, mime, ip_address, user_agent, session['device_id'], bool(cached)) return response @app.route("/transcript/") @auth.require_secret def get_transcript(subpath): root, *relative_parts = subpath.split('/') base_path = session['folders'][root] full_path = os.path.join(base_path, *relative_parts) if not os.path.isfile(full_path): return "Transcription not found", 404 with open(full_path, 'r', encoding='utf-8') as f: content = f.read() return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'} @app.route("/crawl/") @auth.require_secret def crawl_and_cache(subpath): """ Crawls through a directory and caches each file. For images, it creates a thumbnail (max 1200x1200) and caches the processed image. For non-images, it simply reads and caches the file bytes. """ root, *relative_parts = subpath.split('/') base_path = session['folders'][root] full_path = os.path.join(base_path, *relative_parts) cached_files = [] # List to hold cached file relative paths # Walk through all subdirectories and files for root, dirs, files in os.walk(full_path): for filename in files: full_path_file = os.path.join(root, filename) # Skip if this file is already in the cache if cache.get(full_path_file): continue # Determine the MIME type mime, _ = mimetypes.guess_type(full_path) mime = mime or 'application/octet-stream' # Process image files differently if mime.startswith('image/'): try: with Image.open(full_path) as img: # Create a thumbnail (max 1200x1200) img.thumbnail((1200, 1200)) img_bytes_io = io.BytesIO() # Save processed image as PNG img.save(img_bytes_io, format='PNG', quality=85) img_bytes = img_bytes_io.getvalue() # Cache the processed image bytes along with its mime type cache.set(full_path_file, (img_bytes, mime)) cached_files.append(full_path_file) except Exception as e: app.logger.error(f"Image processing failed for {full_path_file}: {e}") else: # Process non-image files try: with open(full_path_file, 'rb') as f: file_bytes = f.read() cache.set(full_path_file, (file_bytes, mime)) cached_files.append(full_path_file) except Exception as e: app.logger.error(f"Failed to read file {full_path_file}: {e}") # Return the list of cached files as a JSON response return json.dumps({"cached_files": cached_files}, indent=4), 200 def query_recent_connections(): global clients_connected, background_thread_running background_thread_running = True last_connections = None try: while clients_connected > 0: rows = a.return_file_access() connections = [ { 'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'), 'full_path': row[1], 'filesize': row[2], 'mime_typ': row[3], 'location': row[4], 'user_agent': row[5], 'cached': row[7] } for row in rows ] if connections != last_connections: socketio.emit('recent_connections', connections) last_connections = connections.copy() socketio.sleep(1) finally: background_thread_running = False print("No clients connected; stopping query thread.") @socketio.on('connect') def handle_connect(auth=None): global clients_connected, background_thread_running clients_connected += 1 print("Client connected. Total clients:", clients_connected) with thread_lock: if not background_thread_running: socketio.start_background_task(query_recent_connections) print("Started background query task.") @socketio.on('disconnect') def handle_disconnect(): global clients_connected clients_connected -= 1 print("Client disconnected. Total clients:", clients_connected) @socketio.on('request_initial_data') def handle_request_initial_data(): rows = a.return_file_access() connections = [ { 'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'), 'full_path': row[1], 'filesize' : row[2], 'mime_typ' : row[3], 'location': row[4], 'user_agent': row[5], 'cached': row[7] } for row in rows ] emit('recent_connections', connections) # Catch-all route to serve the single-page application template. @app.route('/', defaults={'path': ''}) @app.route('/') @auth.require_secret def index(path): title_short = app_config.get('TITLE_SHORT', 'Default Title') title_long = app_config.get('TITLE_LONG' , 'Default Title') header_color = app_config.get('header_color' , '#000') header_text_color = app_config.get('header_text_color', '#fff') background_color = app_config.get('background_color', '#fff') main_text_color = app_config.get('main_text_color', '#000') return render_template("app.html", title_short=title_short, title_long=title_long, header_color=header_color, header_text_color=header_text_color, main_text_color=main_text_color, background_color=background_color, ) if __name__ == '__main__': socketio.run(app, debug=True, host='0.0.0.0')