438 lines
15 KiB
Python
438 lines
15 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, session
|
||
from functools import wraps
|
||
from datetime import datetime, date, timedelta
|
||
import io
|
||
import os
|
||
import json
|
||
import qrcode
|
||
import base64
|
||
from typing import Dict, Any
|
||
|
||
import zlib
|
||
import hmac
|
||
import hashlib
|
||
|
||
FOLDER_CONFIG_FILENAME = 'folder_secret_config.json'
|
||
APP_CONFIG_FILENAME = 'app_config.json'
|
||
|
||
# Populate the module-level configuration caches once, at import time.
# Both files must exist and contain valid JSON, otherwise the import fails.
with open(APP_CONFIG_FILENAME, 'r') as file:
    app_config = json.load(file)

with open(FOLDER_CONFIG_FILENAME, 'r') as file:
    folder_config = json.load(file)
|
||
|
||
# functions to be used by other modules
|
||
def load_folder_config():
    """Re-read the folder/secret configuration file and refresh the module cache.

    Returns:
        The freshly parsed JSON content, which is also stored in the
        module-level ``folder_config``.
    """
    global folder_config
    with open(FOLDER_CONFIG_FILENAME) as config_file:
        parsed = json.load(config_file)
    # Only replace the cached value once parsing has succeeded.
    folder_config = parsed
    return folder_config
|
||
|
||
def load_app_config():
    """Re-read the application configuration file and refresh the module cache.

    Returns:
        The freshly parsed JSON content, which is also stored in the
        module-level ``app_config``.
    """
    global app_config
    with open(APP_CONFIG_FILENAME, 'r') as config_file:
        parsed = json.load(config_file)
    # Only replace the cached value once parsing has succeeded.
    app_config = parsed
    return app_config
|
||
|
||
def return_folder_config():
    """Return the cached folder/secret configuration without touching the disk."""
    cached = folder_config
    return cached
|
||
|
||
def return_app_config():
    """Return the cached application configuration without touching the disk."""
    cached = app_config
    return cached
|
||
|
||
|
||
def is_admin():
    """Return the ``'admin'`` value stored in the session, or ``False`` if unset."""
    admin_flag = session.get('admin', False)
    return admin_flag
|
||
|
||
|
||
def require_secret(f):
    """Decorator that gates a view behind a valid folder secret or signed token.

    On every call the wrapper:

    1. reads ``secret``, ``token`` and ``admin`` from the query string,
    2. stores newly presented, valid secrets/tokens in the session,
    3. drops secrets/tokens from the session that are no longer valid,
    4. rebuilds ``session['folders']`` from what is still valid,
    5. updates ``session['admin']`` when an admin key was supplied,
    6. calls the wrapped function if at least one folder is accessible,
       otherwise renders ``index.html``.

    Secrets and the admin key are compared in constant time, and the admin
    key is never written to the log output.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        global app_config, folder_config

        def constant_time_equal(provided, expected):
            """Compare two strings without leaking the mismatch position."""
            if not isinstance(provided, str) or not isinstance(expected, str):
                return False
            # Compare bytes: compare_digest rejects non-ASCII str arguments.
            return hmac.compare_digest(
                provided.encode('utf-8'), expected.encode('utf-8')
            )

        def is_valid_secret(config_item, provided_secret):
            """
            Checks if today's date is <= validity date
            AND if the provided secret matches config_item['secret'].
            """
            folder_validity = config_item['validity']
            # Convert string to a date if necessary:
            if isinstance(folder_validity, str):
                folder_validity = datetime.strptime(folder_validity, '%d.%m.%Y').date()

            # Return whether it's still valid and secrets match:
            return (
                date.today() <= folder_validity and
                constant_time_equal(provided_secret, config_item['secret'])
            )

        # 1) Get secret and token from query params (if any)
        args_secret = request.args.get('secret')
        args_token = request.args.get('token')

        # 2) Initialize the credential lists in the session if missing
        if 'valid_secrets' not in session:
            session['valid_secrets'] = []

        if 'valid_tokens' not in session:
            session['valid_tokens'] = []

        # 3) If a new secret/token is provided, check it and remember it
        if args_secret:
            for config_item in folder_config:
                if is_valid_secret(config_item, args_secret):
                    if args_secret not in session['valid_secrets']:
                        session['valid_secrets'].append(args_secret)
                    session.permanent = True  # Make the session permanent

        if args_token:
            if is_valid_token(args_token):
                if args_token not in session['valid_tokens']:
                    session['valid_tokens'].append(args_token)
                session.permanent = True  # Make the session permanent

        # 4) Re-check validity of each secret and token to also catch previous ones.
        #    Iterate over copies because entries are removed while looping.
        for secret_in_session in session['valid_secrets'][:]:
            config_item = next(
                (c for c in folder_config if c['secret'] == secret_in_session),
                None
            )
            # If the config item doesn't exist or is invalid, remove secret
            if config_item is None or not is_valid_secret(config_item, secret_in_session):
                session['valid_secrets'].remove(secret_in_session)

        for token_in_session in session['valid_tokens'][:]:
            if not is_valid_token(token_in_session):
                session['valid_tokens'].remove(token_in_session)

        # 5) Build the folder map fresh from the valid secrets and tokens.
        #    Assigning it to the session marks the session as modified, so the
        #    in-place list changes above are saved as well.
        folders = {}
        for secret_in_session in session.get('valid_secrets', []):
            config_item = next(
                (c for c in folder_config if c['secret'] == secret_in_session),
                None
            )
            if config_item:
                for folder_info in config_item['folders']:
                    folders[folder_info['foldername']] = folder_info['folderpath']

        for token_in_session in session.get('valid_tokens', []):
            token_item = decode_secret_key_compressed(token_in_session)
            # A signed token without a folder list simply grants nothing.
            for folder_info in token_item.get('folders') or []:
                folders[folder_info['foldername']] = folder_info['folderpath']

        session['folders'] = folders

        args_admin = request.args.get('admin', None)
        config_admin = app_config.get('ADMIN_KEY', None)

        if args_admin and config_admin:
            # Never log the supplied key itself: it is a credential.
            if constant_time_equal(args_admin, config_admin):
                print("Admin access granted")
                session['admin'] = True
            else:
                print("Admin access denied: wrong admin key supplied")
                session['admin'] = False

        # 6) If we have folders, proceed; otherwise show index
        if session['folders']:
            # Assume that a visitor with a valid secret is ok with anonymous
            # tracking; the id distinguishes devices behind the same IP address.
            if 'device_id' not in session:
                session['device_id'] = os.urandom(32).hex()
            return f(*args, **kwargs)
        else:
            title_short = app_config.get('TITLE_SHORT', 'Default Title')
            title_long = app_config.get('TITLE_LONG', 'Default Title')
            header_color = app_config.get('header_color', '#000')
            header_text_color = app_config.get('header_text_color', '#fff')
            background_color = app_config.get('background_color', '#fff')
            main_text_color = app_config.get('main_text_color', '#000')
            return render_template('index.html',
                                   title_short=title_short,
                                   title_long=title_long,
                                   header_color=header_color,
                                   header_text_color=header_text_color,
                                   main_text_color=main_text_color,
                                   background_color=background_color,
                                   admin_enabled=is_admin()
                                   )
    return decorated_function
|
||
|
||
def require_admin(f):
    """Decorator that runs the ``require_secret`` checks and then requires admin rights.

    The wrapped function is only called when ``is_admin()`` is truthy;
    otherwise a plain-text 403 response is returned.
    """
    @wraps(f)
    @require_secret
    def admin_guard(*args, **kwargs):
        # Guard clause: reject non-admin sessions first.
        if not is_admin():
            return "You don't have admin permission", 403
        return f(*args, **kwargs)
    return admin_guard
|
||
|
||
@require_admin
def save_folder_config(data):
    """Persist a new folder/secret configuration and refresh the module cache.

    The data is serialised before the file is opened, so data that cannot be
    converted to JSON does not leave a truncated configuration file behind,
    and the in-memory cache is only replaced after the write succeeded.

    Args:
        data: JSON-serialisable configuration to store.

    Returns:
        The new value of the module-level ``folder_config``.

    Raises:
        TypeError: if ``data`` is not JSON-serialisable (nothing is written).
        OSError: if the configuration file cannot be written.
    """
    global folder_config
    # Serialise first: opening with 'w' truncates the existing file.
    serialized = json.dumps(data, indent=4)
    with open(FOLDER_CONFIG_FILENAME, 'w') as file:
        file.write(serialized)
    folder_config = data
    return folder_config
|
||
|
||
def _qr_png_base64(url):
    """Render ``url`` as a QR code and return the PNG image as base64 ASCII text."""
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode('ascii')


@require_secret
def mylinks():
    """Render ``mylinks.html`` with a share link and QR code per stored credential.

    For every secret and token held in the session a URL of the form
    ``<scheme>://<host>?secret=...`` / ``?token=...`` is built, with the value
    percent-encoded so that characters such as ``&``, ``#``, ``+`` or spaces
    survive the round trip through the query string. The accessible folders
    and the validity date are looked up per credential.
    """
    from urllib.parse import quote

    scheme = request.scheme  # current scheme (http or https)
    valid_secrets = session.get('valid_secrets', [])
    valid_tokens = session.get('valid_tokens', [])
    host = request.host

    secret_qr_codes = {}
    secret_folders = {}
    secret_url = {}
    secret_valid_to = {}

    token_qr_codes = {}
    token_folders = {}
    token_url = {}
    token_valid_to = {}

    # Build a QR code for each secret (using the URL with the secret as query parameter)
    for secret in valid_secrets:
        url = f"{scheme}://{host}?secret={quote(secret, safe='')}"
        secret_qr_codes[secret] = _qr_png_base64(url)

        # Lookup folder info and valid-to date for this secret from the global folder_config.
        config_item = next((c for c in folder_config if c['secret'] == secret), None)
        if config_item:
            secret_folders[secret] = config_item.get('folders')
            secret_url[secret] = url
            secret_valid_to[secret] = config_item.get('validity', 'Unbekannt')

    # Same for tokens; their folder list and validity are embedded in the token itself.
    for token in valid_tokens:
        url = f"{scheme}://{host}?token={quote(token, safe='')}"
        token_qr_codes[token] = _qr_png_base64(url)

        token_item = decode_secret_key_compressed(token)
        token_folders[token] = token_item.get('folders')
        token_url[token] = url
        token_valid_to[token] = token_item.get('validity', 'Unbekannt')

    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')

    return render_template('mylinks.html',
                           valid_secrets=valid_secrets,
                           secret_qr_codes=secret_qr_codes,
                           secret_folders=secret_folders,
                           secret_url=secret_url,
                           secret_valid_to=secret_valid_to,

                           valid_tokens=valid_tokens,
                           token_qr_codes=token_qr_codes,
                           token_folders=token_folders,
                           token_url=token_url,
                           token_valid_to=token_valid_to,
                           admin_enabled=is_admin(),

                           title_short=title_short,
                           title_long=title_long
                           )
|
||
|
||
|
||
def remove_secret():
    """Drop the secret named in the submitted form from the session, then redirect.

    The form field ``secret`` selects the entry; a value that is not stored
    in the session is ignored. Always redirects to the ``mylinks`` endpoint.
    """
    target = request.form.get('secret')
    stored = session.get('valid_secrets', [])
    try:
        stored.remove(target)
    except ValueError:
        # Not in the list: nothing to remove and nothing to write back.
        pass
    else:
        session['valid_secrets'] = stored
    return redirect(url_for('mylinks'))
|
||
|
||
def remove_token():
    """Drop the token named in the submitted form from the session, then redirect.

    The form field ``token`` selects the entry; a value that is not stored
    in the session is ignored. Always redirects to the ``mylinks`` endpoint.
    """
    target = request.form.get('token')
    stored = session.get('valid_tokens', [])
    try:
        stored.remove(target)
    except ValueError:
        # Not in the list: nothing to remove and nothing to write back.
        pass
    else:
        session['valid_tokens'] = stored
    return redirect(url_for('mylinks'))
|
||
|
||
def is_valid_token(secret_key: str, hmac_length: int = 32) -> bool:
    """
    Check whether the token is authentic _and_ its payload's "validity"
    date has not yet passed.

    Args:
        secret_key: the token string as presented by the client.
        hmac_length: number of signature characters embedded in the token;
            passed through to decode_secret_key_compressed.

    Returns True if:
      - the token decodes and its signature matches
        (via decode_secret_key_compressed)
      - the payload is a dict containing a "validity" string in
        DD.MM.YYYY format
      - today's date <= validity date

    Otherwise returns False. This function never raises for a malformed
    token, so it is safe to call on untrusted input.
    """
    try:
        payload = decode_secret_key_compressed(secret_key, hmac_length)
    except (ValueError, TypeError, zlib.error):
        # ValueError: malformed or forged token.
        # TypeError: hmac.compare_digest rejects non-ASCII str input.
        # zlib.error: signed but undecompressable data.
        return False

    if not isinstance(payload, dict):
        return False

    validity = payload.get("validity")
    if not isinstance(validity, str):
        return False

    try:
        day, month, year = map(int, validity.split('.'))
        expiry = date(year, month, day)
    except (ValueError, OverflowError):
        # malformed date
        return False

    return date.today() <= expiry
|
||
|
||
|
||
# Long payload key -> single-letter key used inside tokens to keep them short.
KEY_MAP = dict(
    validity="v",
    folders="f",
    foldername="n",
    folderpath="p",
)
# Inverse lookup (short -> long), derived so the two maps cannot drift apart.
INV_KEY_MAP = dict(zip(KEY_MAP.values(), KEY_MAP.keys()))
|
||
|
||
def _shorten_keys(obj: Any, mapping: Dict[str, str]) -> Any:
    """Return a copy of ``obj`` with dict keys renamed according to ``mapping``.

    Dicts and lists are walked recursively; keys missing from ``mapping``
    are kept as they are, and any other value is returned unchanged.
    """
    if isinstance(obj, list):
        return [_shorten_keys(item, mapping) for item in obj]
    if not isinstance(obj, dict):
        return obj

    renamed = {}
    for key, value in obj.items():
        renamed[mapping.get(key, key)] = _shorten_keys(value, mapping)
    return renamed
|
||
|
||
def generate_secret_key_compressed(
    payload: dict,
    hmac_length: int = 32
) -> str:
    """
    Compress & sign a JSON payload, with short keys and variable HMAC slice.

    Args:
        payload: your original dict
        hmac_length: how many chars of the URL-safe base64 HMAC to include

    Returns:
        A token: base64(zlib(json_short)) + signature[:hmac_length] + length_hex(4)

    Raises:
        ValueError: if the compressed payload is longer than 0xFFFF bytes and
            therefore does not fit the fixed 4-digit hex length field (the
            decoder always reads exactly the last 4 characters).
    """
    salt = app_config["SALT"]

    # 1) Shorten keys, dump to bytes
    short = _shorten_keys(payload, KEY_MAP)
    raw = json.dumps(short, separators=(',', ':'), ensure_ascii=False).encode('utf-8')

    # 2) Compress
    comp = zlib.compress(raw, level=9)
    if len(comp) > 0xFFFF:
        raise ValueError("Payload too large: compressed size exceeds 65535 bytes.")

    # 3) Base64-url encode, strip padding
    encoded = base64.urlsafe_b64encode(comp).decode('utf-8').rstrip('=')

    # 4) 4-digit hex length of compressed bytes
    length_hex = f"{len(comp):04X}"

    # 5) HMAC over (encoded + length_hex), so both parts are tamper-protected
    msg = (encoded + length_hex).encode('utf-8')
    digest = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    full_sig = base64.urlsafe_b64encode(digest).decode('utf-8')
    signature = full_sig[:hmac_length]

    return f"{encoded}{signature}{length_hex}"
|
||
|
||
|
||
def decode_secret_key_compressed(
    secret_key: str,
    hmac_length: int = 32
) -> dict:
    """
    Reverse of generate_secret_key_compressed:
      - verify signature (using same hmac_length)
      - base64-decode → zlib-decompress → JSON
      - restore original keys

    Args:
        secret_key: token laid out as encoded + signature + 4 hex digits.
        hmac_length: number of signature characters embedded in the token.

    Returns:
        The decoded payload with the long key names restored.

    Raises:
        ValueError: if the token is too short, its length field is not hex,
            the signature does not match, the decoded size disagrees with
            the length field, or the data cannot be decompressed/parsed.
    """
    salt = app_config["SALT"]
    min_len = hmac_length + 4
    if len(secret_key) < min_len:
        raise ValueError("Key too short.")

    # Extract pieces (layout: encoded | signature | length_hex)
    length_hex = secret_key[-4:]
    signature = secret_key[-min_len:-4]
    encoded = secret_key[:-min_len]

    try:
        comp_len = int(length_hex, 16)
    except ValueError:
        raise ValueError("Invalid length prefix.") from None

    # Verify HMAC
    msg = (encoded + length_hex).encode('utf-8')
    expected = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    full_sig = base64.urlsafe_b64encode(expected).decode('utf-8')
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str,
    # which an attacker-supplied token could otherwise trigger.
    if not hmac.compare_digest(
        signature.encode('utf-8'),
        full_sig[:hmac_length].encode('utf-8'),
    ):
        raise ValueError("Invalid signature.")

    # Decode & decompress
    padding = '=' * (-len(encoded) % 4)
    comp = base64.urlsafe_b64decode(encoded + padding)
    if len(comp) != comp_len:
        raise ValueError("Compressed-length mismatch.")
    try:
        raw = zlib.decompress(comp).decode('utf-8')
    except zlib.error as err:
        raise ValueError("Corrupt compressed payload.") from err

    # Load JSON and restore long keys
    short = json.loads(raw)

    def _restore(obj: Any) -> Any:
        """Recursively map short dict keys back to their long names."""
        if isinstance(obj, dict):
            return {
                INV_KEY_MAP.get(k, k): _restore(v)
                for k, v in obj.items()
            }
        elif isinstance(obj, list):
            return [_restore(v) for v in obj]
        else:
            return obj

    return _restore(short)
|
||
|
||
|
||
# --- example usage ---
|
||
if __name__ == '__main__':
    # Round-trip demo: build a token for one folder, then decode it again.
    example_folder = {
        "foldername": "20.04.2025 Ostersonntag",
        "folderpath": r"\\192.168.10.10\docker2\sync-bethaus\syncfiles\folders\Gottesdienste Speyer\2025\04. April\20.04.2025 Ostersonntag",
    }
    payload = {"validity": "31.12.2025", "folders": [example_folder]}

    token = generate_secret_key_compressed(payload)
    print("Token:", token)

    result = decode_secret_key_compressed(token)
    print("Decoded payload:", result)