from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory import os from PIL import Image import io from functools import wraps import mimetypes import sqlite3 from datetime import datetime, date, timedelta import diskcache import json import geoip2.database from functools import lru_cache from urllib.parse import urlparse, unquote from werkzeug.middleware.proxy_fix import ProxyFix cache = diskcache.Cache('./filecache', size_limit= 48 * 1024**3) # 32 GB limit app = Flask(__name__) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1) app.config['SECRET_KEY'] = '85c1117eb3a5f2c79f0ff395bada8ff8d9a257b99ef5e143' app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90) if os.environ.get('FLASK_ENV') == 'production': app.config['SESSION_COOKIE_SAMESITE'] = 'None' app.config['SESSION_COOKIE_SECURE'] = True def load_allowed_secrets(filename='allowed_secrets.json'): with open(filename) as f: secrets = json.load(f) for key, value in secrets.items(): if 'expiry' in value: value['expiry'] = datetime.strptime(value['expiry'], '%d.%m.%Y').date() return secrets def require_secret(f): @wraps(f) def decorated_function(*args, **kwargs): allowed_secrets = load_allowed_secrets() today = date.today() def is_valid(secret_data): expiry_date = secret_data.get('expiry') is_valid = expiry_date and today <= expiry_date return is_valid # Check if a secret was provided via GET parameter get_secret = request.args.get('secret') if get_secret is not None: secret_data = allowed_secrets.get(get_secret) if secret_data: if is_valid(secret_data): # Valid secret provided in URL: update session and config session['secret'] = get_secret session.permanent = True app.config['FILE_ROOT'] = secret_data.get('file_root') print("session:", session['secret']) return f(*args, **kwargs) else: # Secret provided via URL is expired or invalid return render_template('error.html', message="Invalid or expired secret."), 403 # If no secret provided via GET, check the session session_secret = session.get('secret') if session_secret is not None: secret_data = allowed_secrets.get(session_secret) if secret_data: if is_valid(secret_data): session.permanent = True app.config['FILE_ROOT'] = secret_data.get('file_root') return f(*args, **kwargs) else: # Session secret exists but is expired return render_template('error.html', message="Invalid or expired secret."), 403 # No secret provided at all; show the public index page return render_template('index.html') return decorated_function @lru_cache(maxsize=10) def get_cached_image(size): dimensions = tuple(map(int, size.split('-')[1].split('x'))) original_logo_path = os.path.join(app.root_path, 'static', 'logo.png') with Image.open(original_logo_path) as img: img = img.convert("RGBA") orig_width, orig_height = img.size if dimensions[0] >= orig_width and dimensions[1] >= orig_height: resized_img = img else: resized_img = img.copy() resized_img.thumbnail(dimensions, Image.LANCZOS) img_byte_arr = io.BytesIO() resized_img.save(img_byte_arr, format='PNG') return img_byte_arr.getvalue() @app.route('/static/icons/.png') def serve_resized_icon(size): cached_image_bytes = get_cached_image(size) return send_file( io.BytesIO(cached_image_bytes), mimetype='image/png' ) @app.route('/sw.js') def serve_sw(): return send_from_directory(os.path.join(app.root_path, 'static'), 'sw.js', mimetype='application/javascript') def list_directory_contents(directory, subpath): """ List only the immediate contents of the given directory. Also, if a "Transkription" subfolder exists, check for matching .md files for music files. Skip folders that start with a dot. """ directories = [] files = [] transcription_dir = os.path.join(directory, "Transkription") transcription_exists = os.path.isdir(transcription_dir) # Define allowed file extensions. allowed_music_exts = ('.mp3',) allowed_image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp') try: for item in sorted(os.listdir(directory)): # Skip hidden folders and files starting with a dot. if item.startswith('.'): continue full_path = os.path.join(directory, item) # Process directories. if os.path.isdir(full_path): # skip folder skip_folder = ["Transkription", "@eaDir"] if item in skip_folder: continue rel_path = os.path.join(subpath, item) if subpath else item rel_path = rel_path.replace(os.sep, '/') directories.append({'name': item, 'path': rel_path}) # Process files: either music or image files. elif os.path.isfile(full_path) and ( item.lower().endswith(allowed_music_exts) or item.lower().endswith(allowed_image_exts) ): rel_path = os.path.join(subpath, item) if subpath else item rel_path = rel_path.replace(os.sep, '/') # Determine the file type. if item.lower().endswith(allowed_music_exts): file_type = 'music' else: file_type = 'image' file_entry = {'name': item, 'path': rel_path, 'file_type': file_type} # Only check for transcription if it's a music file. if file_type == 'music' and transcription_exists: base_name = os.path.splitext(item)[0] transcript_filename = base_name + '.md' transcript_path = os.path.join(transcription_dir, transcript_filename) if os.path.isfile(transcript_path): file_entry['has_transcript'] = True transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename) if subpath else os.path.join("Transkription", transcript_filename) transcript_rel_path = transcript_rel_path.replace(os.sep, '/') file_entry['transcript_url'] = url_for('get_transcript', filename=transcript_rel_path) else: file_entry['has_transcript'] = False else: file_entry['has_transcript'] = False files.append(file_entry) except PermissionError: pass return directories, files def generate_breadcrumbs(subpath): breadcrumbs = [{'name': 'Home', 'path': ''}] if subpath: parts = subpath.split('/') path_accum = "" for part in parts: path_accum = f"{path_accum}/{part}" if path_accum else part breadcrumbs.append({'name': part, 'path': path_accum}) return breadcrumbs # API endpoint for AJAX: returns JSON for a given directory. @app.route('/api/path/', defaults={'subpath': ''}) @app.route('/api/path/') @require_secret def api_browse(subpath): file_root = app.config['FILE_ROOT'] directory = os.path.join(file_root, subpath.replace('/', os.sep)) if not os.path.isdir(directory): return jsonify({'error': 'Directory not found'}), 404 directories, files = list_directory_contents(directory, subpath) breadcrumbs = generate_breadcrumbs(subpath) return jsonify({ 'breadcrumbs': breadcrumbs, 'directories': directories, 'files': files }) def lookup_location(ip, reader): try: response = reader.city(ip) country = response.country.name if response.country.name else "Unknown" city = response.city.name if response.city.name else "Unknown" return country, city except Exception: return "Unknown", "Unknown" # Helper function to classify device type based on user agent string def get_device_type(user_agent): if 'Android' in user_agent: return 'Android' elif 'iPhone' in user_agent or 'iPad' in user_agent: return 'iOS' elif 'Windows' in user_agent: return 'Windows' elif 'Macintosh' in user_agent or 'Mac OS' in user_agent: return 'MacOS' elif 'Linux' in user_agent: return 'Linux' else: return 'Other' def shorten_referrer(url): segments = [seg for seg in url.split('/') if seg] segment = segments[-1] # Decode all percent-encoded characters (like %20, %2F, etc.) segment_decoded = unquote(segment) return segment_decoded @app.route("/dashboard") @require_secret def dashboard(): timeframe = request.args.get('timeframe', 'today') now = datetime.now() if timeframe == 'today': start = now.replace(hour=0, minute=0, second=0, microsecond=0) elif timeframe == '7days': start = now - timedelta(days=7) elif timeframe == '30days': start = now - timedelta(days=30) elif timeframe == '365days': start = now - timedelta(days=365) else: start = now.replace(hour=0, minute=0, second=0, microsecond=0) conn = sqlite3.connect('access_log.db') cursor = conn.cursor() # Raw file access counts for the table (top files) cursor.execute(''' SELECT full_path, COUNT(*) as access_count FROM file_access_log WHERE timestamp >= ? GROUP BY full_path ORDER BY access_count DESC LIMIT 20 ''', (start.isoformat(),)) rows = cursor.fetchall() # Daily access trend for a line chart cursor.execute(''' SELECT date(timestamp) as date, COUNT(*) as count FROM file_access_log WHERE timestamp >= ? GROUP BY date ORDER BY date ''', (start.isoformat(),)) daily_access_data = [dict(date=row[0], count=row[1]) for row in cursor.fetchall()] # Top files for bar chart (limit to 10) cursor.execute(''' SELECT full_path, COUNT(*) as access_count FROM file_access_log WHERE timestamp >= ? GROUP BY full_path ORDER BY access_count DESC LIMIT 10 ''', (start.isoformat(),)) top_files_data = [dict(full_path=row[0], access_count=row[1]) for row in cursor.fetchall()] # User agent distribution (aggregate by device type) cursor.execute(''' SELECT user_agent, COUNT(*) as count FROM file_access_log WHERE timestamp >= ? GROUP BY user_agent ORDER BY count DESC ''', (start.isoformat(),)) raw_user_agents = [dict(user_agent=row[0], count=row[1]) for row in cursor.fetchall()] device_counts = {} for entry in raw_user_agents: device = get_device_type(entry['user_agent']) device_counts[device] = device_counts.get(device, 0) + entry['count'] # Rename to user_agent_data for compatibility with the frontend user_agent_data = [dict(device=device, count=count) for device, count in device_counts.items()] # Referrer distribution (shorten links) cursor.execute(''' SELECT referrer, COUNT(*) as count FROM file_access_log WHERE timestamp >= ? GROUP BY referrer ORDER BY count DESC LIMIT 10 ''', (start.isoformat(),)) referrer_data = [] for row in cursor.fetchall(): raw_ref = row[0] shortened = shorten_referrer(raw_ref) if raw_ref else "Direct/None" referrer_data.append(dict(referrer=shortened, count=row[1])) # Aggregate IP addresses with counts cursor.execute(''' SELECT ip_address, COUNT(*) as count FROM file_access_log WHERE timestamp >= ? GROUP BY ip_address ORDER BY count DESC LIMIT 20 ''', (start.isoformat(),)) ip_rows = cursor.fetchall() # Initialize GeoIP2 reader once for efficiency reader = geoip2.database.Reader('GeoLite2-City.mmdb') ip_data = [] for ip, count in ip_rows: country, city = lookup_location(ip, reader) ip_data.append(dict(ip=ip, count=count, country=country, city=city)) reader.close() # Aggregate by city (ignoring entries without a city) city_counts = {} for entry in ip_data: if entry['city']: city_counts[entry['city']] = city_counts.get(entry['city'], 0) + entry['count'] city_data = [dict(city=city, count=count) for city, count in city_counts.items()] # Summary stats total_accesses = sum([row[1] for row in rows]) unique_files = len(rows) cursor.execute('SELECT COUNT(DISTINCT ip_address) FROM file_access_log WHERE timestamp >= ?', (start.isoformat(),)) unique_ips = cursor.fetchone()[0] conn.close() return render_template("dashboard.html", timeframe=timeframe, rows=rows, daily_access_data=daily_access_data, top_files_data=top_files_data, user_agent_data=user_agent_data, referrer_data=referrer_data, ip_data=ip_data, city_data=city_data, total_accesses=total_accesses, unique_files=unique_files, unique_ips=unique_ips) def log_file_access(full_path): """ Log file access details to a SQLite database. Records the timestamp, full file path, client IP, user agent, and referrer. """ # Connect to the database (this will create the file if it doesn't exist) conn = sqlite3.connect('access_log.db') cursor = conn.cursor() # Create the table if it doesn't exist cursor.execute(''' CREATE TABLE IF NOT EXISTS file_access_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, full_path TEXT, ip_address TEXT, user_agent TEXT, referrer TEXT ) ''') # Gather information from the request timestamp = datetime.now().isoformat() ip_address = request.remote_addr user_agent = request.headers.get('User-Agent') referrer = request.headers.get('Referer') # Insert the access record into the database cursor.execute(''' INSERT INTO file_access_log (timestamp, full_path, ip_address, user_agent, referrer) VALUES (?, ?, ?, ?, ?) ''', (timestamp, full_path, ip_address, user_agent, referrer)) conn.commit() conn.close() @app.route("/media/") @require_secret def serve_file(filename): decoded_filename = unquote(filename).replace('/', os.sep) full_path = os.path.normpath(os.path.join(app.config['FILE_ROOT'], decoded_filename)) if not os.path.isfile(full_path): app.logger.error(f"File not found: {full_path}") return "File not found", 404 mime, _ = mimetypes.guess_type(full_path) mime = mime or 'application/octet-stream' if mime and mime.startswith('image/'): pass # do not log access to images else: # HEAD request are coming in to initiate server caching. # only log initial hits and not the reload of further file parts range_header = request.headers.get('Range') if request.method != 'HEAD' and (not range_header or range_header.startswith("bytes=0-")): log_file_access(full_path) # Check cache first (using diskcache) response = None cached = cache.get(filename) if cached: cached_file_bytes, mime = cached cached_file = io.BytesIO(cached_file_bytes) response = send_file(cached_file, mimetype=mime) else: if mime and mime.startswith('image/'): # Image processing branch (with caching) try: with Image.open(full_path) as img: img.thumbnail((1200, 1200)) img_bytes = io.BytesIO() img.save(img_bytes, format='PNG', quality=85) img_bytes = img_bytes.getvalue() cache.set(filename, (img_bytes, mime)) response = send_file(io.BytesIO(img_bytes), mimetype=mime) except Exception as e: app.logger.error(f"Image processing failed for {filename}: {e}") abort(500) else: # Cache non-image files: read bytes and cache try: with open(full_path, 'rb') as f: file_bytes = f.read() cache.set(filename, (file_bytes, mime)) response = send_file(io.BytesIO(file_bytes), mimetype=mime) except Exception as e: app.logger.error(f"Failed to read file {filename}: {e}") abort(500) # Set Cache-Control header (browser caching for 1 day) response.headers['Cache-Control'] = 'public, max-age=86400' return response @app.route("/transcript/") @require_secret def get_transcript(filename): fs_filename = filename.replace('/', os.sep) full_path = os.path.join(app.config['FILE_ROOT'], fs_filename) if not os.path.isfile(full_path): return "Transcription not found", 404 with open(full_path, 'r', encoding='utf-8') as f: content = f.read() return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'} @app.route("/crawl/") @require_secret def crawl_and_cache(start_relative_path): """ Crawls through a directory (relative to app.config['FILE_ROOT']) and caches each file. For images, it creates a thumbnail (max 1200x1200) and caches the processed image. For non-images, it simply reads and caches the file bytes. :param start_relative_path: The folder (relative to FILE_ROOT) to start crawling. """ # Compute the absolute path for the starting directory base_dir = os.path.normpath(os.path.join(app.config['FILE_ROOT'], start_relative_path)) # Check that base_dir is under FILE_ROOT to prevent directory traversal if not base_dir.startswith(os.path.abspath(app.config['FILE_ROOT'])): return jsonify({"error": "Invalid path"}), 400 cached_files = [] # List to hold cached file relative paths # Walk through all subdirectories and files for root, dirs, files in os.walk(base_dir): for filename in files: full_path = os.path.join(root, filename) # Compute the relative key used for caching rel_key = os.path.relpath(full_path, app.config['FILE_ROOT']) # Skip if this file is already in the cache if cache.get(rel_key): continue # Determine the MIME type mime, _ = mimetypes.guess_type(full_path) mime = mime or 'application/octet-stream' # Process image files differently if mime.startswith('image/'): try: with Image.open(full_path) as img: # Create a thumbnail (max 1200x1200) img.thumbnail((1200, 1200)) img_bytes_io = io.BytesIO() # Save processed image as PNG img.save(img_bytes_io, format='PNG', quality=85) img_bytes = img_bytes_io.getvalue() # Cache the processed image bytes along with its mime type cache.set(rel_key, (img_bytes, mime)) cached_files.append(rel_key) except Exception as e: app.logger.error(f"Image processing failed for {rel_key}: {e}") else: # Process non-image files try: with open(full_path, 'rb') as f: file_bytes = f.read() cache.set(rel_key, (file_bytes, mime)) cached_files.append(rel_key) except Exception as e: app.logger.error(f"Failed to read file {rel_key}: {e}") # Return the list of cached files as a JSON response return json.dumps({"cached_files": cached_files}, indent=4), 200 # Catch-all route to serve the single-page application template. @app.route('/', defaults={'path': ''}) @app.route('/') @require_secret def index(path): return render_template("app.html") if __name__ == "__main__": app.run(debug=True, host='0.0.0.0')