"""Session-based access control built on folder secrets and signed tokens.

Provides the ``require_secret`` / ``require_admin`` decorators, the views that
list and remove a visitor's share links (with QR codes), and the helpers that
generate and decode compact HMAC-signed tokens.
"""

import base64
import hashlib
import hmac
import io
import json
import os
import zlib
from datetime import datetime, date, timedelta
from functools import wraps
from typing import Dict, Any
from urllib.parse import urlencode

import qrcode
from flask import Flask, render_template, request, redirect, url_for, session

FOLDER_CONFIG_FILENAME = 'folder_secret_config.json'
APP_CONFIG_FILENAME = 'app_config.json'


def _read_json(filename):
    """Return the parsed JSON content of *filename*."""
    with open(filename, 'r') as file:
        return json.load(file)


# Initial read of the config files (runs once at import time).
app_config = _read_json(APP_CONFIG_FILENAME)
folder_config = _read_json(FOLDER_CONFIG_FILENAME)


# Functions to be used by other modules.
def load_folder_config():
    """Re-read the folder config from disk, cache it globally and return it."""
    global folder_config
    folder_config = _read_json(FOLDER_CONFIG_FILENAME)
    return folder_config


def load_app_config():
    """Re-read the app config from disk, cache it globally and return it."""
    global app_config
    app_config = _read_json(APP_CONFIG_FILENAME)
    return app_config


def return_folder_config():
    """Return the currently cached folder config (no disk access)."""
    return folder_config


def return_app_config():
    """Return the currently cached app config (no disk access)."""
    return app_config


def is_admin():
    """Check if the user is an admin based on the session."""
    return session.get('admin', False)


def _constant_time_equals(left, right):
    """Compare two strings in constant time; non-string values never match.

    The values are encoded to bytes first because ``hmac.compare_digest``
    raises ``TypeError`` for ``str`` arguments containing non-ASCII
    characters, and both values may originate from user input.
    """
    if not isinstance(left, str) or not isinstance(right, str):
        return False
    return hmac.compare_digest(
        left.encode('utf-8', 'surrogatepass'),
        right.encode('utf-8', 'surrogatepass'),
    )


def _is_valid_secret(config_item, provided_secret):
    """Return True if *provided_secret* matches *config_item* and is not expired.

    The secret is compared first so that the validity date is only parsed
    for the entry that actually matches. A ``validity`` given as a string
    must be in DD.MM.YYYY format; today's date must be <= that date.
    """
    if not _constant_time_equals(provided_secret, config_item['secret']):
        return False
    folder_validity = config_item['validity']
    # Convert string to a date if necessary.
    if isinstance(folder_validity, str):
        folder_validity = datetime.strptime(folder_validity, '%d.%m.%Y').date()
    return date.today() <= folder_validity


def _find_config_item(secret):
    """Return the first folder-config entry whose secret equals *secret*, else None."""
    return next((c for c in folder_config if c['secret'] == secret), None)


def _secret_still_valid(secret):
    """Return True if *secret* is still present in the config and unexpired."""
    config_item = _find_config_item(secret)
    return config_item is not None and _is_valid_secret(config_item, secret)


def _collect_folders(valid_secrets, valid_tokens):
    """Build the ``foldername -> folderpath`` map granted by secrets and tokens."""
    folders = {}
    for secret in valid_secrets:
        config_item = _find_config_item(secret)
        if config_item:
            for folder_info in config_item['folders']:
                folders[folder_info['foldername']] = folder_info['folderpath']
    for token in valid_tokens:
        token_item = decode_token(token)
        # A signed token without a "folders" entry simply grants nothing.
        for folder_info in token_item.get('folders') or []:
            folders[folder_info['foldername']] = folder_info['folderpath']
    return folders


def _render_permission_denied():
    """Render the permission page with the configured theme and a 403 status."""
    return render_template(
        'permission.html',
        title_short=app_config.get('TITLE_SHORT', 'Default Title'),
        title_long=app_config.get('TITLE_LONG', 'Default Title'),
        header_color=app_config.get('header_color', '#000'),
        header_text_color=app_config.get('header_text_color', '#fff'),
        main_text_color=app_config.get('main_text_color', '#000'),
        background_color=app_config.get('background_color', '#fff'),
        admin_enabled=is_admin(),
    ), 403  # Forbidden


def require_secret(f):
    """Decorator: run *f* only if the session grants access to at least one folder.

    On every call the wrapper
      1. reads ``secret``, ``token`` and ``admin`` from the query string,
      2. adds a newly provided, valid secret/token to the session,
      3. drops secrets/tokens from the session that are no longer valid,
      4. rebuilds ``session['folders']`` from what remains,
      5. updates ``session['admin']`` when an admin key was supplied,
      6. calls *f* if any folder is accessible, otherwise renders
         ``permission.html`` with HTTP status 403.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 1) Get secret and token from query params (if any).
        args_secret = request.args.get('secret')
        args_token = request.args.get('token')

        # 2) Work on copies; the session keys are reassigned below so the
        #    session always sees the change.
        valid_secrets = list(session.get('valid_secrets', []))
        valid_tokens = list(session.get('valid_tokens', []))

        # 3) If a new secret/token is provided and valid, remember it.
        if args_secret and args_secret not in valid_secrets:
            if any(_is_valid_secret(item, args_secret) for item in folder_config):
                valid_secrets.append(args_secret)
                session.permanent = True  # Make the session permanent
        if args_token and args_token not in valid_tokens:
            if is_valid_token(args_token):
                valid_tokens.append(args_token)
                session.permanent = True  # Make the session permanent

        # 4) Re-check every stored secret and token so that expired ones,
        #    or ones removed from the config, are dropped.
        valid_secrets = [s for s in valid_secrets if _secret_still_valid(s)]
        valid_tokens = [t for t in valid_tokens if is_valid_token(t)]
        session['valid_secrets'] = valid_secrets
        session['valid_tokens'] = valid_tokens

        # 5) Build session['folders'] fresh from the valid secrets and tokens.
        session['folders'] = _collect_folders(valid_secrets, valid_tokens)

        args_admin = request.args.get('admin', None)
        config_admin = app_config.get('ADMIN_KEY', None)
        if args_admin and config_admin:
            # Never echo the supplied key: it would end up in the logs.
            if _constant_time_equals(args_admin, config_admin):
                print("Admin access granted.")
                session['admin'] = True
            else:
                print("Admin access denied: wrong admin key.")
                session['admin'] = False

        # 6) If we have folders, proceed; otherwise show the permission page.
        if not session['folders']:
            return _render_permission_denied()

        # Assume that a visitor with a valid secret is ok with anonymous
        # tracking; this is required to tell apart devices connecting over
        # the same IP address.
        if 'device_id' not in session:
            session['device_id'] = os.urandom(32).hex()
        return f(*args, **kwargs)

    return decorated_function


def require_admin(f):
    """Decorator: like ``require_secret`` but additionally demands the admin flag.

    Returns a plain-text 403 response when the session is not an admin session.
    """
    @wraps(f)
    @require_secret
    def decorated_function(*args, **kwargs):
        if is_admin():
            return f(*args, **kwargs)
        return "You don't have admin permission", 403

    return decorated_function


# NOTE(review): because of @require_admin this function reads the Flask
# request/session, so it presumably has to be called from inside a request
# context - confirm against the callers.
@require_admin
def save_folder_config(data):
    """Replace the cached folder config with *data*, write it to disk, return it."""
    global folder_config
    folder_config = data
    with open(FOLDER_CONFIG_FILENAME, 'w') as file:
        json.dump(folder_config, file, indent=4)
    return folder_config


def _qr_png_base64(url):
    """Return a base64-encoded (ASCII) PNG of a QR code that encodes *url*."""
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode('ascii')


@require_secret
def mylinks():
    """Render ``mylinks.html`` listing the session's secrets and tokens.

    For every secret and token a share URL (current scheme and host plus
    the credential as query parameter) and a QR code for that URL are
    produced, together with the folders it grants and its validity date.
    """
    scheme = request.scheme  # current scheme (http or https)
    host = request.host
    valid_secrets = session.get('valid_secrets', [])
    valid_tokens = session.get('valid_tokens', [])

    secret_qr_codes = {}
    secret_folders = {}
    secret_url = {}
    secret_valid_to = {}

    token_qr_codes = {}
    token_folders = {}
    token_url = {}
    token_valid_to = {}

    for secret in valid_secrets:
        # urlencode keeps characters such as '&' or '#' in a secret from
        # breaking the generated link.
        url = f"{scheme}://{host}?{urlencode({'secret': secret})}"
        secret_qr_codes[secret] = _qr_png_base64(url)

        # Look up folder info and valid-to date in the global folder_config.
        config_item = _find_config_item(secret)
        if config_item:
            secret_folders[secret] = config_item.get('folders')
            secret_url[secret] = url
            secret_valid_to[secret] = config_item.get('validity', 'Unbekannt')

    for token in valid_tokens:
        url = f"{scheme}://{host}?{urlencode({'token': token})}"
        token_qr_codes[token] = _qr_png_base64(url)

        token_item = decode_token(token)
        token_folders[token] = token_item.get('folders')
        token_url[token] = url
        token_valid_to[token] = token_item.get('validity', 'Unbekannt')

    return render_template(
        'mylinks.html',
        valid_secrets=valid_secrets,
        secret_qr_codes=secret_qr_codes,
        secret_folders=secret_folders,
        secret_url=secret_url,
        secret_valid_to=secret_valid_to,
        valid_tokens=valid_tokens,
        token_qr_codes=token_qr_codes,
        token_folders=token_folders,
        token_url=token_url,
        token_valid_to=token_valid_to,
        admin_enabled=is_admin(),
        title_short=app_config.get('TITLE_SHORT', 'Default Title'),
        title_long=app_config.get('TITLE_LONG', 'Default Title'),
    )


def remove_secret():
    """Drop the secret posted as form field ``secret`` from the session.

    Always redirects to the ``mylinks`` view afterwards.
    """
    secret_to_remove = request.form.get('secret')
    valid_secrets = session.get('valid_secrets', [])
    if secret_to_remove in valid_secrets:
        # Reassign instead of mutating so the session registers the change.
        session['valid_secrets'] = [
            s for s in valid_secrets if s != secret_to_remove
        ]
    return redirect(url_for('mylinks'))


def remove_token():
    """Drop the token posted as form field ``token`` from the session.

    Always redirects to the ``mylinks`` view afterwards.
    """
    token_to_remove = request.form.get('token')
    valid_tokens = session.get('valid_tokens', [])
    if token_to_remove in valid_tokens:
        # Reassign instead of mutating so the session registers the change.
        session['valid_tokens'] = [
            t for t in valid_tokens if t != token_to_remove
        ]
    return redirect(url_for('mylinks'))


def is_valid_token(secret_key: str, hmac_length: int = 32) -> bool:
    """Check that a token is authentic and its "validity" date has not passed.

    Args:
        secret_key: the token string (as produced by ``generate_token``).
        hmac_length: number of signature characters embedded in the token.

    Returns:
        True if
          - the signature matches (via ``decode_token``),
          - the payload contains a "validity" string in DD.MM.YYYY format,
          - today's date <= validity date.
        False otherwise.
    """
    try:
        payload = decode_token(secret_key, hmac_length)
    except ValueError:
        return False

    if not isinstance(payload, dict):
        return False
    validity = payload.get("validity")
    if not isinstance(validity, str):
        return False

    try:
        day, month, year = map(int, validity.split('.'))
        expiry = date(year, month, day)
    except (ValueError, OverflowError):
        # Malformed or out-of-range date.
        return False

    return date.today() <= expiry


# Key map: long -> short (keeps tokens compact).
KEY_MAP = {
    "validity": "v",
    "folders": "f",
    "foldername": "n",
    "folderpath": "p",
}

# Build the inverse map automatically.
INV_KEY_MAP = {short: long for long, short in KEY_MAP.items()}


def _shorten_keys(obj: Any, mapping: Dict[str, str]) -> Any:
    """Recursively replace dict keys via *mapping*; unknown keys are kept.

    Used with ``KEY_MAP`` to shorten keys and with ``INV_KEY_MAP`` to
    restore them.
    """
    if isinstance(obj, dict):
        return {
            mapping.get(k, k): _shorten_keys(v, mapping)
            for k, v in obj.items()
        }
    if isinstance(obj, list):
        return [_shorten_keys(v, mapping) for v in obj]
    return obj


def generate_token(
    payload: dict,
    hmac_length: int = 32
) -> str:
    """Compress and sign a JSON payload, with short keys and variable HMAC slice.

    Args:
        payload: your original dict
        hmac_length: how many chars of the URL-safe base64 HMAC to include

    Returns:
        A token: base64(zlib(json_short)) + signature[:hmac_length] + length_hex(4)

    Raises:
        ValueError: if the compressed payload is larger than 0xFFFF bytes,
            because its length would not fit into the 4 hex digits that
            ``decode_token`` reads back.
    """
    salt = app_config["SALT"]

    # 1) Shorten keys, dump to bytes.
    short = _shorten_keys(payload, KEY_MAP)
    raw = json.dumps(short, separators=(',', ':'), ensure_ascii=False).encode('utf-8')

    # 2) Compress.
    comp = zlib.compress(raw, level=9)
    if len(comp) > 0xFFFF:
        raise ValueError("Payload too large: compressed size exceeds 65535 bytes.")

    # 3) Base64-url encode, strip padding.
    encoded = base64.urlsafe_b64encode(comp).decode('utf-8').rstrip('=')

    # 4) 4-digit hex length of compressed bytes.
    length_hex = f"{len(comp):04X}"

    # 5) HMAC over (encoded + length_hex).
    msg = (encoded + length_hex).encode('utf-8')
    digest = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    full_sig = base64.urlsafe_b64encode(digest).decode('utf-8')
    signature = full_sig[:hmac_length]

    return f"{encoded}{signature}{length_hex}"


def decode_token(
    secret_key: str,
    hmac_length: int = 32
) -> dict:
    """Reverse of ``generate_token``.

    Verifies the signature (using the same *hmac_length*), then
    base64-decodes, zlib-decompresses and JSON-parses the payload and
    restores the original long keys.

    Args:
        secret_key: the token string.
        hmac_length: number of signature characters embedded in the token.

    Returns:
        The decoded payload with long keys.

    Raises:
        ValueError: if the token is too short, the length field or the
            signature is invalid, or the payload cannot be decoded.
    """
    salt = app_config["SALT"]
    min_len = hmac_length + 4
    if len(secret_key) < min_len:
        raise ValueError("Key too short.")

    # Extract pieces: <encoded><signature><4 hex digits>.
    length_hex = secret_key[-4:]
    signature = secret_key[-(hmac_length + 4):-4]
    encoded = secret_key[:-(hmac_length + 4)]

    try:
        comp_len = int(length_hex, 16)
    except ValueError as err:
        raise ValueError("Invalid length prefix.") from err

    # Verify HMAC. Compare as bytes: compare_digest raises TypeError for
    # non-ASCII str, and the token comes straight from the query string.
    msg = (encoded + length_hex).encode('utf-8')
    expected = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    full_sig = base64.urlsafe_b64encode(expected).decode('utf-8')
    if not hmac.compare_digest(
        signature.encode('utf-8'),
        full_sig[:hmac_length].encode('utf-8'),
    ):
        raise ValueError("Invalid signature.")

    # Decode & decompress.
    padding = '=' * (-len(encoded) % 4)
    comp = base64.urlsafe_b64decode(encoded + padding)
    if len(comp) != comp_len:
        raise ValueError("Compressed-length mismatch.")

    try:
        raw = zlib.decompress(comp).decode('utf-8')
    except zlib.error as err:
        # zlib.error is not a ValueError; normalise it so callers only
        # have to handle one exception type.
        raise ValueError("Corrupt compressed payload.") from err

    # Load JSON and restore long keys.
    short = json.loads(raw)
    return _shorten_keys(short, INV_KEY_MAP)


# --- example usage ---
if __name__ == '__main__':
    payload = {
        "validity": "31.12.2025",
        "folders": [
            {
                "foldername": "20.04.2025 Ostersonntag",
                "folderpath": r"\\192.168.10.10\docker2\sync-bethaus\syncfiles\folders\Gottesdienste Speyer\2025\04. April\20.04.2025 Ostersonntag"
            }
        ]
    }

    token = generate_token(payload)
    print("Token:", token)

    result = decode_token(token)
    print("Decoded payload:", result)