from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, abort
import os
from PIL import Image
import io
from functools import wraps, lru_cache
import mimetypes
import sqlite3
from datetime import datetime, date, timedelta
import diskcache
import threading
import json
import time
from flask_socketio import SocketIO
import geoip2.database
from urllib.parse import urlparse, unquote
from werkzeug.middleware.proxy_fix import ProxyFix

from auth import require_secret
import analytics as a

cache = diskcache.Cache('./filecache', size_limit=48 * 1024**3)  # 48 GB limit

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
app.config['SECRET_KEY'] = '85c1117eb3a5f2c79f0ff395bada8ff8d9a257b99ef5e143'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
if os.environ.get('FLASK_ENV') == 'production':
    app.config['SESSION_COOKIE_SAMESITE'] = 'None'
    app.config['SESSION_COOKIE_SECURE'] = True

# Analytics views live in analytics.py; register them on this app.
app.add_url_rule('/dashboard', view_func=a.dashboard)
app.add_url_rule('/network', view_func=a.network)

socketio = SocketIO(app)

# Global variables to track the number of connected clients and the background thread.
clients_connected = 0
background_thread = None
thread_lock = threading.Lock()


@lru_cache(maxsize=10)
def get_cached_image(size):
    """Return the logo resized to fit a size string such as 'icon-192x192', as PNG bytes."""
    dimensions = tuple(map(int, size.split('-')[1].split('x')))
    original_logo_path = os.path.join(app.root_path, 'static', 'logo.png')
    with Image.open(original_logo_path) as img:
        img = img.convert("RGBA")
        orig_width, orig_height = img.size
        if dimensions[0] >= orig_width and dimensions[1] >= orig_height:
            # Never upscale; serve the original resolution.
            resized_img = img
        else:
            resized_img = img.copy()
            resized_img.thumbnail(dimensions, Image.LANCZOS)
        img_byte_arr = io.BytesIO()
        resized_img.save(img_byte_arr, format='PNG')
        return img_byte_arr.getvalue()


def list_directory_contents(directory, subpath):
    """
    List only the immediate contents of the given directory.
    If a "Transkription" subfolder exists, check for matching .md files for music files.
    Skip folders and files that start with a dot.
    """
    directories = []
    files = []
    transcription_dir = os.path.join(directory, "Transkription")
    transcription_exists = os.path.isdir(transcription_dir)
    # Define the allowed file extensions.
    allowed_music_exts = ('.mp3',)
    allowed_image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
    try:
        for item in sorted(os.listdir(directory)):
            # Skip hidden folders and files starting with a dot.
            if item.startswith('.'):
                continue
            full_path = os.path.join(directory, item)
            # Process directories.
            if os.path.isdir(full_path):
                # Skip helper folders that should not be browsable.
                skip_folders = ["Transkription", "@eaDir"]
                if item in skip_folders:
                    continue
                rel_path = os.path.join(subpath, item) if subpath else item
                rel_path = rel_path.replace(os.sep, '/')
                directories.append({'name': item, 'path': rel_path})
            # Process files: either music or image files.
            elif os.path.isfile(full_path) and (
                item.lower().endswith(allowed_music_exts) or item.lower().endswith(allowed_image_exts)
            ):
                rel_path = os.path.join(subpath, item) if subpath else item
                rel_path = rel_path.replace(os.sep, '/')
                # Determine the file type.
                if item.lower().endswith(allowed_music_exts):
                    file_type = 'music'
                else:
                    file_type = 'image'
                file_entry = {'name': item, 'path': rel_path, 'file_type': file_type}
                # Only check for a transcription if it is a music file.
                if file_type == 'music' and transcription_exists:
                    base_name = os.path.splitext(item)[0]
                    transcript_filename = base_name + '.md'
                    transcript_path = os.path.join(transcription_dir, transcript_filename)
                    if os.path.isfile(transcript_path):
                        file_entry['has_transcript'] = True
                        transcript_rel_path = (
                            os.path.join(subpath, "Transkription", transcript_filename)
                            if subpath else os.path.join("Transkription", transcript_filename)
                        )
                        transcript_rel_path = transcript_rel_path.replace(os.sep, '/')
                        file_entry['transcript_url'] = url_for('get_transcript', subpath=transcript_rel_path)
                    else:
                        file_entry['has_transcript'] = False
                else:
                    file_entry['has_transcript'] = False
                files.append(file_entry)
    except PermissionError:
        pass
    return directories, files


def generate_breadcrumbs(subpath=None):
    breadcrumbs = [{'name': 'Home', 'path': ''}]
    if subpath:
        parts = subpath.split('/')
        path_accum = ""
        for part in parts:
            path_accum = f"{path_accum}/{part}" if path_accum else part
            breadcrumbs.append({'name': part, 'path': path_accum})
    return breadcrumbs


@app.route('/static/icons/<size>.png')
def serve_resized_icon(size):
    cached_image_bytes = get_cached_image(size)
    return send_file(
        io.BytesIO(cached_image_bytes),
        mimetype='image/png'
    )


@app.route('/sw.js')
def serve_sw():
    return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript')


# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@require_secret
def api_browse(subpath):
    if subpath == '':
        # Root directory: list the folders configured in the session.
        foldernames = []
        for foldername, folderpath in session['folders'].items():
            foldernames.append({'name': foldername, 'path': foldername})
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(),
            'directories': foldernames,
            'files': []
        })
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'][root]
    directory = os.path.join(base_path, *relative_parts)
    if not os.path.isdir(directory):
        return jsonify({'error': 'Directory not found'}), 404
    directories, files = list_directory_contents(directory, subpath)
    breadcrumbs = generate_breadcrumbs(subpath)
    return jsonify({
        'breadcrumbs': breadcrumbs,
        'directories': directories,
        'files': files
    })


@app.route("/media/<path:subpath>")
@require_secret
def serve_file(subpath):
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'][root]
    full_path = os.path.join(base_path, *relative_parts)
    if not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404

    mime, _ = mimetypes.guess_type(full_path)
    mime = mime or 'application/octet-stream'

    if mime.startswith('image/'):
        pass  # Do not log access to images.
    else:
        # HEAD requests come in to initiate server caching.
        # Only log initial hits, not reloads of further parts of a file (Range requests).
        range_header = request.headers.get('Range')
        if request.method != 'HEAD' and (not range_header or range_header.startswith("bytes=0-")):
            a.log_file_access(full_path)

    # Check the cache first (using diskcache).
    response = None
    cached = cache.get(subpath)
    if cached:
        cached_file_bytes, mime = cached
        cached_file = io.BytesIO(cached_file_bytes)
        response = send_file(cached_file, mimetype=mime)
    else:
        if mime.startswith('image/'):
            # Image processing branch (with caching).
            try:
                with Image.open(full_path) as img:
                    img.thumbnail((1200, 1200))
                    img_bytes = io.BytesIO()
                    img.save(img_bytes, format='PNG', quality=85)
                    img_bytes = img_bytes.getvalue()
                mime = 'image/png'  # The processed image is PNG regardless of the original format.
                cache.set(subpath, (img_bytes, mime))
                response = send_file(io.BytesIO(img_bytes), mimetype=mime)
            except Exception as e:
                app.logger.error(f"Image processing failed for {subpath}: {e}")
                abort(500)
        else:
            # Cache non-image files: read the bytes and cache them.
            try:
                with open(full_path, 'rb') as f:
                    file_bytes = f.read()
                cache.set(subpath, (file_bytes, mime))
                response = send_file(io.BytesIO(file_bytes), mimetype=mime)
            except Exception as e:
                app.logger.error(f"Failed to read file {subpath}: {e}")
                abort(500)

    # Set the Cache-Control header (browser caching for one day).
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response


@app.route("/transcript/<path:subpath>")
@require_secret
def get_transcript(subpath):
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'][root]
    full_path = os.path.join(base_path, *relative_parts)
    if not os.path.isfile(full_path):
        return "Transcription not found", 404
    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}


@app.route("/crawl/<path:subpath>")
@require_secret
def crawl_and_cache(subpath):
    """
    Crawl through a directory and cache each file.
    For images, create a thumbnail (max 1200x1200) and cache the processed image.
    For non-images, simply read and cache the file bytes.
""" root, *relative_parts = subpath.split('/') base_path = session['folders'][root] full_path = os.path.join(base_path, *relative_parts) cached_files = [] # List to hold cached file relative paths # Walk through all subdirectories and files for root, dirs, files in os.walk(full_path): for filename in files: full_path_file = os.path.join(root, filename) # Skip if this file is already in the cache if cache.get(full_path_file): continue # Determine the MIME type mime, _ = mimetypes.guess_type(full_path) mime = mime or 'application/octet-stream' # Process image files differently if mime.startswith('image/'): try: with Image.open(full_path) as img: # Create a thumbnail (max 1200x1200) img.thumbnail((1200, 1200)) img_bytes_io = io.BytesIO() # Save processed image as PNG img.save(img_bytes_io, format='PNG', quality=85) img_bytes = img_bytes_io.getvalue() # Cache the processed image bytes along with its mime type cache.set(full_path_file, (img_bytes, mime)) cached_files.append(full_path_file) except Exception as e: app.logger.error(f"Image processing failed for {full_path_file}: {e}") else: # Process non-image files try: with open(full_path_file, 'rb') as f: file_bytes = f.read() cache.set(full_path_file, (file_bytes, mime)) cached_files.append(full_path_file) except Exception as e: app.logger.error(f"Failed to read file {full_path_file}: {e}") # Return the list of cached files as a JSON response return json.dumps({"cached_files": cached_files}, indent=4), 200 def query_recent_connections(): global clients_connected last_connections = None # Initialize with None to ensure the first emit happens while clients_connected > 0: rows = a.return_file_access() # Convert rows to dictionaries for the client. connections = [ { 'timestamp': row[0], 'full_path': row[1], 'ip_address': row[2], 'user_agent': row[3], 'referrer': row[4] } for row in rows ] # Only emit if there's a change compared to the previous connections. if connections != last_connections: socketio.emit('recent_connections', connections) last_connections = connections.copy() # Store a copy of the current state time.sleep(1) print("No clients connected; stopping query thread.") @socketio.on('connect') def handle_connect(): global clients_connected, background_thread clients_connected += 1 print("Client connected. Total clients:", clients_connected) with thread_lock: # Start the background task if it's not already running. if background_thread is None or not background_thread.is_alive(): background_thread = socketio.start_background_task(query_recent_connections) print("Started background query task.") @socketio.on('disconnect') def handle_disconnect(): global clients_connected clients_connected -= 1 print("Client disconnected. Total clients:", clients_connected) # Catch-all route to serve the single-page application template. @app.route('/', defaults={'path': ''}) @app.route('/') @require_secret def index(path): return render_template("app.html") if __name__ == '__main__': socketio.run(app, debug=True, host='0.0.0.0')