From 77b922132099c47ee014703b1f01c9464ee7e2d5 Mon Sep 17 00:00:00 2001 From: lelo Date: Thu, 1 May 2025 12:01:14 +0200 Subject: [PATCH] add token based foldershare --- app.py | 1 + auth.py | 297 ++++++++++++++++++++++++++++++++++++----- keygen.py | 100 -------------- templates/mylinks.html | 57 ++++++-- 4 files changed, 308 insertions(+), 147 deletions(-) delete mode 100644 keygen.py diff --git a/app.py b/app.py index 0eeaead..17674ea 100755 --- a/app.py +++ b/app.py @@ -43,6 +43,7 @@ app.add_url_rule('/dashboard', view_func=a.dashboard) app.add_url_rule('/connections', view_func=a.connections) app.add_url_rule('/mylinks', view_func=auth.mylinks) app.add_url_rule('/remove_secret', view_func=auth.remove_secret, methods=['POST']) +app.add_url_rule('/remove_token', view_func=auth.remove_token, methods=['POST']) app.add_url_rule('/search', view_func=search.search, methods=['GET']) app.add_url_rule('/searchcommand', view_func=search.searchcommand, methods=['POST']) diff --git a/auth.py b/auth.py index e3ba65c..8daa476 100644 --- a/auth.py +++ b/auth.py @@ -6,21 +6,27 @@ import os import json import qrcode import base64 +from typing import Dict, Any + +import zlib +import hmac +import hashlib + -folder_config = {} with open("app_config.json", 'r') as file: app_config = json.load(file) +with open('folder_permission_config.json') as file: + folder_config = json.load(file) + + def require_secret(f): @wraps(f) def decorated_function(*args, **kwargs): - global folder_config - if not folder_config: - with open('folder_permission_config.json') as file: - folder_config = json.load(file) + global app_config, folder_config - def is_valid(config_item, provided_secret): + def is_valid_secret(config_item, provided_secret): """ Checks if today's date is <= validity date AND if the provided secret matches config_item['secret']. @@ -36,36 +42,50 @@ def require_secret(f): provided_secret == config_item['secret'] ) - # 1) Get secret from query params (if any) + # 1) Get secretand token from query params (if any) args_secret = request.args.get('secret') + args_token = request.args.get('token') - # 2) Initialize 'allowed_secrets' in the session if missing - if 'allowed_secrets' not in session: - session['allowed_secrets'] = [] + # 2) Initialize 'valid_secrets' in the session if missing + if 'valid_secrets' not in session: + session['valid_secrets'] = [] + + if 'valid_tokens' not in session: + session['valid_tokens'] = [] # 3) If a new secret is provided, check if it’s valid, and add to session if so if args_secret: for config_item in folder_config: - if is_valid(config_item, args_secret): - if args_secret not in session['allowed_secrets']: - session['allowed_secrets'].append(args_secret) + if is_valid_secret(config_item, args_secret): + if args_secret not in session['valid_secrets']: + session['valid_secrets'].append(args_secret) session.permanent = True # Make the session permanent + + if args_token: + if is_valid_token(args_token): + if args_token not in session['valid_tokens']: + session['valid_tokens'].append(args_token) + session.permanent = True # Make the session permanent - # 4) Re-check validity of each secret in session['allowed_secrets'] + # 4) Re-check validity of each secret and token to also catch previous ones # If a secret is no longer valid (or not in config), remove it. - for secret_in_session in session['allowed_secrets'][:]: + for secret_in_session in session['valid_secrets'][:]: # Find the current config item with matching secret config_item = next( (c for c in folder_config if c['secret'] == secret_in_session), None ) # If the config item doesn’t exist or is invalid, remove secret - if config_item is None or not is_valid(config_item, secret_in_session): - session['allowed_secrets'].remove(secret_in_session) + if config_item is None or not is_valid_secret(config_item, secret_in_session): + session['valid_secrets'].remove(secret_in_session) + + for token_in_session in session['valid_tokens'][:]: + if not is_valid_token(token_in_session): + session['valid_tokens'].remove(token_in_session) # 5) Build session['folders'] fresh from the valid secrets session['folders'] = {} - for secret_in_session in session.get('allowed_secrets', []): + for secret_in_session in session.get('valid_secrets', []): config_item = next( (c for c in folder_config if c['secret'] == secret_in_session), None @@ -74,6 +94,11 @@ def require_secret(f): for folder_info in config_item['folders']: session['folders'][folder_info['foldername']] = folder_info['folderpath'] + for token_in_session in session.get('valid_tokens', []): + token_item = decode_secret_key_compressed(token_in_session) + for folder_info in token_item['folders']: + session['folders'][folder_info['foldername']] = folder_info['folderpath'] + # 6) If we have folders, proceed; otherwise show index if session['folders']: # assume since visitor has a valid secret, they are ok with annonymous tracking @@ -100,16 +125,24 @@ def require_secret(f): @require_secret def mylinks(): - allowed_secrets = session.get('allowed_secrets', []) + scheme = request.scheme # current scheme (http or https) + valid_secrets = session.get('valid_secrets', []) + valid_tokens = session.get('valid_tokens', []) host = request.host secret_qr_codes = {} secret_folders = {} + secret_url = {} secret_valid_to = {} + token_qr_codes = {} + token_folders = {} + token_url = {} + token_valid_to = {} + # Build a QR code for each secret (using the URL with the secret as query parameter) - for secret in allowed_secrets: - url = f"https://{host}?secret={secret}" + for secret in valid_secrets: + url = f"{scheme}://{host}?secret={secret}" qr = qrcode.QRCode(version=1, box_size=10, border=4) qr.add_data(url) qr.make(fit=True) @@ -123,28 +156,218 @@ def mylinks(): # Lookup folder info and valid-to date for this secret from the global folder_config. config_item = next((c for c in folder_config if c['secret'] == secret), None) if config_item: - secret_folders[secret] = config_item['folders'] + secret_folders[secret] = config_item.get('folders') + secret_url[secret] = url secret_valid_to[secret] = config_item.get('validity', 'Unbekannt') - else: - secret_folders[secret] = [] - secret_valid_to[secret] = 'Unbekannt' + + for token in valid_tokens: + url = f"{scheme}://{host}?token={token}" + qr = qrcode.QRCode(version=1, box_size=10, border=4) + qr.add_data(url) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + buffer = io.BytesIO() + img.save(buffer, format="PNG") + buffer.seek(0) + img_base64 = base64.b64encode(buffer.getvalue()).decode('ascii') + token_qr_codes[token] = img_base64 + + token_item = decode_secret_key_compressed(token) + token_folders[token] = token_item.get('folders') + token_url[token] = url + token_valid_to[token] = token_item.get('validity', 'Unbekannt') return render_template('mylinks.html', - allowed_secrets=allowed_secrets, - secret_qr_codes=secret_qr_codes, - secret_folders=secret_folders, - secret_valid_to=secret_valid_to) + valid_secrets=valid_secrets, + secret_qr_codes=secret_qr_codes, + secret_folders=secret_folders, + secret_url=secret_url, + secret_valid_to=secret_valid_to, + + valid_tokens=valid_tokens, + token_qr_codes=token_qr_codes, + token_folders=token_folders, + token_url=token_url, + token_valid_to=token_valid_to + ) def remove_secret(): secret_to_remove = request.form.get('secret') - allowed_secrets = session.get('allowed_secrets', []) - if secret_to_remove in allowed_secrets: - allowed_secrets.remove(secret_to_remove) - session['allowed_secrets'] = allowed_secrets + valid_secrets = session.get('valid_secrets', []) + if secret_to_remove in valid_secrets: + valid_secrets.remove(secret_to_remove) + session['valid_secrets'] = valid_secrets return redirect(url_for('mylinks')) - return render_template('mylinks.html', - allowed_secrets=allowed_secrets, - secret_qr_codes=secret_qr_codes, - secret_folders=secret_folders) \ No newline at end of file +def remove_token(): + token_to_remove = request.form.get('token') + valid_tokens = session.get('valid_tokens', []) + if token_to_remove in valid_tokens: + valid_tokens.remove(token_to_remove) + session['valid_tokens'] = valid_tokens + return redirect(url_for('mylinks')) + +def is_valid_token(secret_key: str, hmac_length: int = 32) -> bool: + """ + Check whether the token is authentic _and_ its payload's "validity" + date has not yet passed. + + Returns True if: + - signature matches (via decode_secret_key_compressed) + - the payload contains a "validity" field in DD.MM.YYYY format + - today's date <= validity date + + Otherwise returns False. + """ + try: + payload = decode_secret_key_compressed(secret_key, hmac_length) + except ValueError: + return False + + validity = payload.get("validity") + if not isinstance(validity, str): + return False + + try: + day, month, year = map(int, validity.split('.')) + expiry = datetime(year, month, day).date() + except Exception: + # malformed date + return False + + return datetime.now().date() <= expiry + + +# Define your key map: long → short +KEY_MAP = { + "validity": "v", + "folders": "f", + "foldername": "n", + "folderpath": "p", +} +# Build the inverse map automatically +INV_KEY_MAP = {short: long for long, short in KEY_MAP.items()} + +def _shorten_keys(obj: Any, mapping: Dict[str,str]) -> Any: + """Recursively replace dict keys via mapping.""" + if isinstance(obj, dict): + return { + mapping.get(k, k): _shorten_keys(v, mapping) + for k, v in obj.items() + } + elif isinstance(obj, list): + return [_shorten_keys(v, mapping) for v in obj] + else: + return obj + +def generate_secret_key_compressed( + payload: dict, + hmac_length: int = 32 +) -> str: + """ + Compress & sign a JSON payload, with short keys and variable HMAC slice. + + Args: + payload: your original dict + hmac_length: how many chars of the URL-safe base64 HMAC to include + + Returns: + A token: base64(zlib(json_short)) + signature[:hmac_length] + length_hex(4) + """ + salt = app_config["SALT"] + + # 1) Shorten keys, dump to bytes + short = _shorten_keys(payload, KEY_MAP) + raw = json.dumps(short, separators=(',', ':'), ensure_ascii=False).encode('utf-8') + + # 2) Compress + comp = zlib.compress(raw, level=9) + + # 3) Base64-url encode, strip padding + encoded = base64.urlsafe_b64encode(comp).decode('utf-8').rstrip('=') + + # 4) 4-digit hex length of compressed bytes + length_hex = f"{len(comp):04X}" + + # 5) HMAC over (encoded + length_hex) + msg = (encoded + length_hex).encode('utf-8') + digest = hmac.new(salt.encode(), msg, hashlib.sha256).digest() + full_sig = base64.urlsafe_b64encode(digest).decode('utf-8') + signature = full_sig[:hmac_length] + + return f"{encoded}{signature}{length_hex}" + + +def decode_secret_key_compressed( + secret_key: str, + hmac_length: int = 32 +) -> dict: + """ + Reverse of generate_secret_key_compressed: + - verify signature (using same hmac_length) + - base64-decode → zlib-decompress → JSON + - restore original keys + """ + salt = app_config["SALT"] + min_len = hmac_length + 4 + if len(secret_key) < min_len: + raise ValueError("Key too short.") + + # Extract pieces + length_hex = secret_key[-4:] + signature = secret_key[-(hmac_length+4):-4] + encoded = secret_key[:-(hmac_length+4)] + + try: + comp_len = int(length_hex, 16) + except ValueError: + raise ValueError("Invalid length prefix.") + + # Verify HMAC + msg = (encoded + length_hex).encode('utf-8') + expected = hmac.new(salt.encode(), msg, hashlib.sha256).digest() + full_sig = base64.urlsafe_b64encode(expected).decode('utf-8') + if not hmac.compare_digest(signature, full_sig[:hmac_length]): + raise ValueError("Invalid signature.") + + # Decode & decompress + padding = '=' * (-len(encoded) % 4) + comp = base64.urlsafe_b64decode(encoded + padding) + if len(comp) != comp_len: + raise ValueError("Compressed-length mismatch.") + raw = zlib.decompress(comp).decode('utf-8') + + # Load JSON and restore long keys + short = json.loads(raw) + def _restore(obj: Any) -> Any: + if isinstance(obj, dict): + return { + INV_KEY_MAP.get(k, k): _restore(v) + for k, v in obj.items() + } + elif isinstance(obj, list): + return [_restore(v) for v in obj] + else: + return obj + + return _restore(short) + + +# --- example usage --- +if __name__ == '__main__': + payload = { + "validity": "31.12.2025", + "folders": [ + { + "foldername": "20.04.2025 Ostersonntag", + "folderpath": r"\\192.168.10.10\docker2\sync-bethaus\syncfiles\folders\Gottesdienste Speyer\2025\04. April\20.04.2025 Ostersonntag" + } + ] + } + + token = generate_secret_key_compressed(payload) + print("Token:", token) + + result = decode_secret_key_compressed(token) + print("Decoded payload:", result) \ No newline at end of file diff --git a/keygen.py b/keygen.py deleted file mode 100644 index 645162b..0000000 --- a/keygen.py +++ /dev/null @@ -1,100 +0,0 @@ -import hmac -import hashlib -import base64 - -salt = "UqEEf08yMDE3qIQodAGAyNQ2EPFhb26f2o85MTIyMeAAkqlANiXcF" - -def generate_secret_key(identifier: str, expiry: str) -> str: - """ - Generate a secret key with the following structure: - - encoded_data: Base64 encoding of (identifier + expiry) - - signature: 32 characters from the URL-safe base64 HMAC signature - - id_length_hex: 2-character hexadecimal number representing the length of the identifier - The HMAC is computed over (encoded_data + id_length_hex). - """ - global salt - if len(identifier) > 255: - raise ValueError("Identifier must be at most 255 characters long.") - if len(expiry) != 8: - raise ValueError("Expiry must be an 8-character string in DDMMYYYY format.") - - # Concatenate identifier and expiry, then base64 encode the result. - plain_data = identifier + expiry - # Remove padding for compactness; we'll add it back during decoding. - encoded_data = base64.urlsafe_b64encode(plain_data.encode()).decode().rstrip("=") - - # Format the length of the identifier as a 2-digit hexadecimal string. - id_length_hex = f"{len(identifier):02X}" - - # Build the message for HMAC: encoded_data + id_length_hex. - message = encoded_data + id_length_hex - - # Compute HMAC using SHA-256. - hmac_digest = hmac.new(salt.encode(), message.encode(), hashlib.sha256).digest() - - # Get a URL-safe base64 representation and take the first 32 characters as the signature. - signature = base64.urlsafe_b64encode(hmac_digest).decode()[:32] - - # Construct the final secret key. - secret_key = encoded_data + signature + id_length_hex - return secret_key - -def decode_secret_key(secret_key: str): - """ - Decode the secret key and extract the identifier and expiry. - - Token structure: - - encoded_data: secret_key[0 : (len(secret_key)-34)] (variable length) - - signature: secret_key[-34:-2] (32 characters) - - id_length_hex: secret_key[-2:] (2 characters) - - The HMAC is verified over (encoded_data + id_length_hex). Then, encoded_data is base64-decoded - (with proper padding added) to yield (identifier + expiry), where the last 8 characters represent expiry. - The identifier length is determined from id_length_hex. - """ - global salt - - if len(secret_key) < 34: # Must at least have signature (32) + id_length_hex (2) - raise ValueError("Invalid key length.") - - # Extract the last 2 characters: id_length_hex. - id_length_hex = secret_key[-2:] - try: - id_length = int(id_length_hex, 16) - except ValueError: - raise ValueError("Invalid identifier length prefix in key.") - - # The signature is the 32 characters preceding the last 2. - signature = secret_key[-34:-2] - # The remainder is the encoded_data. - encoded_data = secret_key[:-34] - - # Verify the signature. - message = encoded_data + id_length_hex - expected_hmac = hmac.new(salt.encode(), message.encode(), hashlib.sha256).digest() - expected_signature = base64.urlsafe_b64encode(expected_hmac).decode()[:32] - - if not hmac.compare_digest(signature, expected_signature): - raise ValueError("Invalid key signature.") - - # Base64-decode the encoded_data. - # Add back any missing '=' padding. - padding = '=' * (-len(encoded_data) % 4) - plain_data = base64.urlsafe_b64decode(encoded_data + padding).decode() - - # plain_data should be (identifier + expiry), where expiry is the last 8 characters. - if len(plain_data) != id_length + 8: - raise ValueError("Decoded data length does not match expected identifier length and expiry.") - - identifier = plain_data[:id_length] - expiry = plain_data[id_length:] - - return identifier, expiry - - -if __name__ == '__main__': - key = generate_secret_key("Besondere Gottesdienste", "30042025") - print(key) - identifier, expiry = decode_secret_key(key) - print("identifier:", identifier) - print("expiry:", expiry) diff --git a/templates/mylinks.html b/templates/mylinks.html index 030754f..abb3eac 100644 --- a/templates/mylinks.html +++ b/templates/mylinks.html @@ -41,9 +41,9 @@
- {% if allowed_secrets %} -
- {% for secret in allowed_secrets %} +
+ {% if valid_secrets %} + {% for secret in valid_secrets %}
QR Code for secret @@ -51,7 +51,7 @@
Geheimnis: {{ secret }}

- https://{{ request.host }}?secret={{ secret }} + {{ secret_url[secret] }}

@@ -77,12 +77,49 @@

{% endfor %} -
- {% else %} - - {% endif %} + {% endif %} + {% if valid_tokens %} + {% for token in valid_tokens %} +
+
+ QR Code for token +
+
Token-Link:
+

+ + {{ token_url[token] }} + +

+

+ Gültig bis: {{ token_valid_to[token] }} +

+
+ + +
+ {% if token_folders[token] %} +
Ordner
+
    + {% for folder in token_folders[token] %} +
  • + {{ folder.foldername }} +
  • + {% endfor %} +
+ {% else %} +

Keine Ordner für dieses Gemeimnis hinterlegt.

+ {% endif %} +
+
+
+ {% endfor %} + {% endif %} + {% if not valid_secrets and not valid_tokens %} + + {% endif %} +