bethaus-app/auth.py
2025-06-03 22:54:17 +02:00

444 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for, session
from functools import wraps
from datetime import datetime, date, timedelta
import io
import os
import json
import qrcode
import base64
from typing import Dict, Any
import zlib
import hmac
import hashlib
# Names of the JSON configuration files (opened relative to the working directory).
APP_CONFIG_FILENAME = 'app_config.json'
FOLDER_CONFIG_FILENAME = 'folder_secret_config.json'

# Read both files once at import time so the module-level ``app_config`` and
# ``folder_config`` objects exist before any function below is called.
with open(APP_CONFIG_FILENAME, 'r') as file:
    app_config = json.loads(file.read())
with open(FOLDER_CONFIG_FILENAME) as file:
    folder_config = json.loads(file.read())
# functions to be used by other modules
def load_folder_config():
    """Re-read the folder/secret configuration from disk.

    Parses ``FOLDER_CONFIG_FILENAME`` as JSON, stores the result in the
    module-level ``folder_config`` and returns it.
    """
    global folder_config
    with open(FOLDER_CONFIG_FILENAME) as config_file:
        refreshed = json.load(config_file)
    folder_config = refreshed
    return refreshed
def load_app_config():
    """Re-read the application configuration from disk.

    Parses ``APP_CONFIG_FILENAME`` as JSON, stores the result in the
    module-level ``app_config`` and returns it.
    """
    global app_config
    with open(APP_CONFIG_FILENAME, 'r') as config_file:
        refreshed = json.load(config_file)
    app_config = refreshed
    return refreshed
def return_folder_config():
    """Return the folder/secret configuration currently held by this module.

    No file access happens here; call ``load_folder_config`` to refresh it.
    """
    return folder_config
def return_app_config():
    """Return the application configuration currently held by this module.

    No file access happens here; call ``load_app_config`` to refresh it.
    """
    return app_config
def is_admin():
    """Report whether the current session carries the admin flag.

    Returns the value stored under ``'admin'`` in the session, or ``False``
    when that key is absent.
    """
    admin_flag = session.get('admin', False)
    return admin_flag
def require_secret(f):
    """Decorator that runs ``f`` only for visitors holding a valid credential.

    On every call the wrapper:

    1. reads ``secret``, ``token``, ``dltoken`` and ``admin`` from the query
       string;
    2. calls ``f`` right away when ``dltoken`` is a valid token (the session
       is not touched on that path);
    3. remembers newly presented valid secrets/tokens in the session and
       marks the session permanent;
    4. drops remembered secrets/tokens that are expired or no longer
       configured;
    5. rebuilds ``session['folders']`` (folder name -> folder path) from the
       credentials that remain, tokens overriding secrets on a name clash;
    6. sets ``session['admin']`` when an admin key is supplied and
       ``ADMIN_KEY`` is configured.

    If at least one folder is accessible ``f`` is called; otherwise the
    ``permission.html`` template is returned with HTTP status 403.
    """

    def _same_secret(expected, provided):
        """Constant-time equality check for two secret strings."""
        if not (isinstance(expected, str) and isinstance(provided, str)):
            return False
        # Compare bytes: hmac.compare_digest refuses ``str`` operands that
        # contain non-ASCII characters, and ``provided`` is untrusted input.
        return hmac.compare_digest(
            expected.encode('utf-8'), provided.encode('utf-8')
        )

    def _is_valid_secret(config_item, provided_secret):
        """True if the item's validity date has not passed and its secret matches."""
        validity = config_item.get('validity')
        if isinstance(validity, str):
            try:
                validity = datetime.strptime(validity, '%d.%m.%Y').date()
            except ValueError:
                # A malformed date in the config means "not valid", not a 500.
                return False
        if isinstance(validity, datetime):
            validity = validity.date()
        if not isinstance(validity, date):
            return False
        return (
            date.today() <= validity
            and _same_secret(config_item.get('secret'), provided_secret)
        )

    def _config_for(secret):
        """First folder-config entry whose secret equals ``secret``, else None."""
        return next(
            (c for c in folder_config if c.get('secret') == secret),
            None
        )

    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 1) Credentials presented with this request (if any).
        args_secret = request.args.get('secret')
        args_token = request.args.get('token')
        args_dltoken = request.args.get('dltoken')

        # 1b) A valid download token grants access on its own.
        if args_dltoken and is_valid_token(args_dltoken):
            return f(*args, **kwargs)

        # 2) Work on copies of the remembered credentials; they are written
        #    back by assignment below so the session registers the change
        #    (in-place mutation of a list stored in the session is not tracked).
        valid_secrets = list(session.get('valid_secrets') or [])
        valid_tokens = list(session.get('valid_tokens') or [])

        # 3) Remember newly presented credentials that are valid.
        if args_secret and any(
            _is_valid_secret(item, args_secret) for item in folder_config
        ):
            if args_secret not in valid_secrets:
                valid_secrets.append(args_secret)
            session.permanent = True  # Make the session permanent
        if args_token and is_valid_token(args_token):
            if args_token not in valid_tokens:
                valid_tokens.append(args_token)
            session.permanent = True  # Make the session permanent

        # 4) + 5) Re-check every remembered credential (this also catches
        #    ones from earlier requests that have expired or were removed
        #    from the config) and collect the folders of those that survive.
        folders = {}
        kept_secrets = []
        for stored_secret in valid_secrets:
            config_item = _config_for(stored_secret)
            if config_item is None or not _is_valid_secret(config_item, stored_secret):
                continue
            kept_secrets.append(stored_secret)
            for folder_info in config_item.get('folders') or []:
                folders[folder_info['foldername']] = folder_info['folderpath']

        kept_tokens = []
        for stored_token in valid_tokens:
            if not is_valid_token(stored_token):
                continue
            kept_tokens.append(stored_token)
            token_item = decode_token(stored_token)
            for folder_info in token_item.get('folders') or []:
                folders[folder_info['foldername']] = folder_info['folderpath']

        session['valid_secrets'] = kept_secrets
        session['valid_tokens'] = kept_tokens
        session['folders'] = folders

        # Admin flag: only evaluated when a key is both supplied and configured.
        args_admin = request.args.get('admin', None)
        config_admin = app_config.get('ADMIN_KEY', None)
        if args_admin and config_admin:
            # The key itself is never printed: it would end up in the logs.
            if _same_secret(config_admin, args_admin):
                print("Admin access granted")
                session['admin'] = True
            else:
                print("Admin access denied: invalid admin key")
                session['admin'] = False

        # 6) If we have folders, proceed; otherwise show the permission page.
        if session['folders']:
            # Assume that a visitor with a valid secret is OK with anonymous
            # tracking; the id is needed to tell apart devices that connect
            # from the same IP address.
            if 'device_id' not in session:
                session['device_id'] = os.urandom(32).hex()
            return f(*args, **kwargs)

        return render_template(
            'permission.html',
            title_short=app_config.get('TITLE_SHORT', 'Default Title'),
            title_long=app_config.get('TITLE_LONG', 'Default Title'),
            header_color=app_config.get('header_color', '#000'),
            header_text_color=app_config.get('header_text_color', '#fff'),
            main_text_color=app_config.get('main_text_color', '#000'),
            background_color=app_config.get('background_color', '#fff'),
            admin_enabled=is_admin()
        ), 403  # Forbidden

    return decorated_function
def require_admin(f):
    """Decorator that restricts ``f`` to sessions flagged as admin.

    The wrapper is itself wrapped by ``require_secret``, so that check runs
    first. If the session is not admin, the plain-text message below is
    returned with HTTP status 403 instead of calling ``f``.
    """
    @wraps(f)
    @require_secret
    def decorated_function(*args, **kwargs):
        if not is_admin():
            return "You don't have admin permission", 403
        return f(*args, **kwargs)

    return decorated_function
@require_admin
def save_folder_config(data):
    """Persist ``data`` as the new folder/secret configuration.

    Writes ``data`` as indented JSON to ``FOLDER_CONFIG_FILENAME`` and then
    replaces the module-level ``folder_config`` with it.

    Returns:
        The new ``folder_config`` (the same object as ``data``).

    Raises:
        TypeError / ValueError: if ``data`` is not JSON-serializable; in that
            case neither the file nor ``folder_config`` is modified.
        OSError: if the file cannot be written.
    """
    global folder_config
    # Serialize BEFORE opening the file: mode 'w' truncates it immediately,
    # so a serialization error raised while streaming would leave a corrupt
    # configuration on disk.
    serialized = json.dumps(data, indent=4)
    with open(FOLDER_CONFIG_FILENAME, 'w') as file:
        file.write(serialized)
    # Swap the in-memory copy only once the write has succeeded, so memory
    # and disk cannot diverge on failure.
    folder_config = data
    return folder_config
def _qr_png_base64(url):
    """Render ``url`` as a QR code and return the PNG bytes base64-encoded as ASCII text."""
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode('ascii')


@require_secret
def mylinks():
    """Render ``mylinks.html`` listing every secret and token stored in the session.

    For each credential the template receives: a share URL carrying the
    credential as query parameter, that URL as a base64 PNG QR code, the
    folders it unlocks and its validity date (``'Unbekannt'`` if missing).
    """
    # Local import: keeps this block self-contained with respect to the
    # file's top-level import list.
    from urllib.parse import quote

    scheme = request.scheme  # current scheme (http or https)
    host = request.host
    valid_secrets = session.get('valid_secrets', [])
    valid_tokens = session.get('valid_tokens', [])

    secret_qr_codes = {}
    secret_folders = {}
    secret_url = {}
    secret_valid_to = {}
    token_qr_codes = {}
    token_folders = {}
    token_url = {}
    token_valid_to = {}

    for secret in valid_secrets:
        # Percent-encode the value: a secret containing '&', '#', '+' or
        # spaces would otherwise produce a broken link / QR code.
        url = f"{scheme}://{host}?secret={quote(secret, safe='')}"
        secret_qr_codes[secret] = _qr_png_base64(url)
        # Folder info and valid-to date come from the global folder_config.
        config_item = next((c for c in folder_config if c['secret'] == secret), None)
        if config_item:
            secret_folders[secret] = config_item.get('folders')
            secret_url[secret] = url
            secret_valid_to[secret] = config_item.get('validity', 'Unbekannt')

    for token in valid_tokens:
        url = f"{scheme}://{host}?token={quote(token, safe='')}"
        token_qr_codes[token] = _qr_png_base64(url)
        token_item = decode_token(token)
        token_folders[token] = token_item.get('folders')
        token_url[token] = url
        token_valid_to[token] = token_item.get('validity', 'Unbekannt')

    title_short = app_config.get('TITLE_SHORT', 'Default Title')
    title_long = app_config.get('TITLE_LONG', 'Default Title')
    return render_template(
        'mylinks.html',
        valid_secrets=valid_secrets,
        secret_qr_codes=secret_qr_codes,
        secret_folders=secret_folders,
        secret_url=secret_url,
        secret_valid_to=secret_valid_to,
        valid_tokens=valid_tokens,
        token_qr_codes=token_qr_codes,
        token_folders=token_folders,
        token_url=token_url,
        token_valid_to=token_valid_to,
        admin_enabled=is_admin(),
        title_short=title_short,
        title_long=title_long
    )
def remove_secret():
    """Forget the secret named in the submitted form, then go back to ``mylinks``.

    The secret is taken from the ``secret`` form field. If it is among the
    session's ``valid_secrets`` it is removed and the list is stored back;
    otherwise the session is left as it is.
    """
    target = request.form.get('secret')
    remembered = session.get('valid_secrets', [])
    try:
        remembered.remove(target)
    except ValueError:
        # Not remembered in this session: nothing to forget.
        pass
    else:
        session['valid_secrets'] = remembered
    return redirect(url_for('mylinks'))
def remove_token():
    """Forget the token named in the submitted form, then go back to ``mylinks``.

    The token is taken from the ``token`` form field. If it is among the
    session's ``valid_tokens`` it is removed and the list is stored back;
    otherwise the session is left as it is.
    """
    target = request.form.get('token')
    remembered = session.get('valid_tokens', [])
    try:
        remembered.remove(target)
    except ValueError:
        # Not remembered in this session: nothing to forget.
        pass
    else:
        session['valid_tokens'] = remembered
    return redirect(url_for('mylinks'))
def is_valid_token(secret_key: str, hmac_length: int = 32) -> bool:
    """Check that a token is authentic and that its validity date has not passed.

    Args:
        secret_key: the token string (despite the parameter name).
        hmac_length: number of signature characters embedded in the token;
            forwarded to ``decode_token``.

    Returns:
        True only if
          - ``decode_token`` accepts the token,
          - the payload is a dict with a string ``"validity"`` field in
            DD.MM.YYYY form, and
          - today's date <= that validity date.
        False in every other case; this function does not raise for bad
        token input.
    """
    try:
        payload = decode_token(secret_key, hmac_length)
    except (ValueError, TypeError, zlib.error):
        # ValueError: malformed, truncated or forged token.
        # TypeError: input of the wrong shape, e.g. None or non-ASCII text
        #            reaching hmac.compare_digest.
        # zlib.error: body that cannot be decompressed.
        # The token comes straight from the request, so none of these may
        # bubble up as a server error.
        return False

    if not isinstance(payload, dict):
        return False
    validity = payload.get("validity")
    if not isinstance(validity, str):
        return False

    try:
        day, month, year = map(int, validity.split('.'))
        expiry = date(year, month, day)
    except (ValueError, OverflowError):
        # malformed date (wrong field count, non-numeric, out of range)
        return False
    return date.today() <= expiry
# Payload key translation used inside tokens: long name -> one-letter name.
KEY_MAP = dict(
    validity="v",
    folders="f",
    foldername="n",
    folderpath="p",
)
# Reverse lookup (short -> long), derived from KEY_MAP so the two stay in sync.
INV_KEY_MAP = dict(zip(KEY_MAP.values(), KEY_MAP.keys()))
def _shorten_keys(obj: Any, mapping: Dict[str,str]) -> Any:
"""Recursively replace dict keys via mapping."""
if isinstance(obj, dict):
return {
mapping.get(k, k): _shorten_keys(v, mapping)
for k, v in obj.items()
}
elif isinstance(obj, list):
return [_shorten_keys(v, mapping) for v in obj]
else:
return obj
def generate_token(
    payload: dict,
    hmac_length: int = 32
) -> str:
    """
    Compress & sign a JSON payload, with short keys and variable HMAC slice.

    Args:
        payload: your original dict
        hmac_length: how many chars of the URL-safe base64 HMAC to include
            (1 up to the full encoded signature length)

    Returns:
        A token: base64(zlib(json_short)) + signature[:hmac_length] + length_hex(4)

    Raises:
        ValueError: if ``hmac_length`` is out of range, or if the compressed
            payload is longer than 0xFFFF bytes and therefore cannot be
            described by the fixed 4-digit hex length suffix.
    """
    salt = app_config["SALT"]
    # 1) Shorten keys, dump to bytes
    short = _shorten_keys(payload, KEY_MAP)
    raw = json.dumps(short, separators=(',', ':'), ensure_ascii=False).encode('utf-8')
    # 2) Compress
    comp = zlib.compress(raw, level=9)
    # The length suffix is exactly 4 hex digits; a longer value would shift
    # the field boundaries and produce a token that can never be decoded.
    if len(comp) > 0xFFFF:
        raise ValueError("Payload too large for a token.")
    # 3) Base64-url encode, strip padding
    encoded = base64.urlsafe_b64encode(comp).decode('utf-8').rstrip('=')
    # 4) 4-digit hex length of compressed bytes
    length_hex = f"{len(comp):04X}"
    # 5) HMAC over (encoded + length_hex)
    msg = (encoded + length_hex).encode('utf-8')
    digest = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    full_sig = base64.urlsafe_b64encode(digest).decode('utf-8')
    # A non-positive length would emit an unsigned token, and a length
    # beyond the signature would make the decoder slice the wrong fields.
    if not 1 <= hmac_length <= len(full_sig):
        raise ValueError(
            f"hmac_length must be between 1 and {len(full_sig)}."
        )
    signature = full_sig[:hmac_length]
    return f"{encoded}{signature}{length_hex}"
def decode_token(
    secret_key: str,
    hmac_length: int = 32
) -> dict:
    """
    Reverse of generate_token:
      - verify signature (using same hmac_length)
      - base64-decode → zlib-decompress → JSON
      - restore original keys

    Args:
        secret_key: the token string (despite the parameter name).
        hmac_length: number of signature characters embedded in the token;
            must match the value used when the token was generated.

    Returns:
        The decoded payload with its long key names restored.

    Raises:
        ValueError: if the token is not a string, is too short, has a bad
            length suffix, fails signature verification, or its body cannot
            be decoded/decompressed/parsed; also if ``hmac_length`` is not
            positive.
    """
    salt = app_config["SALT"]
    # With a zero-length signature slice every token would "verify".
    if hmac_length < 1:
        raise ValueError("hmac_length must be positive.")
    if not isinstance(secret_key, str):
        raise ValueError("Key must be a string.")
    min_len = hmac_length + 4
    if len(secret_key) < min_len:
        raise ValueError("Key too short.")
    # Extract pieces: <encoded><signature: hmac_length chars><length: 4 hex chars>
    length_hex = secret_key[-4:]
    signature = secret_key[-min_len:-4]
    encoded = secret_key[:-min_len]
    try:
        comp_len = int(length_hex, 16)
    except ValueError:
        raise ValueError("Invalid length prefix.") from None
    # Verify HMAC
    msg = (encoded + length_hex).encode('utf-8')
    expected = hmac.new(salt.encode(), msg, hashlib.sha256).digest()
    expected_sig = base64.urlsafe_b64encode(expected)[:hmac_length]
    # Compare bytes, not str: hmac.compare_digest raises TypeError for str
    # operands containing non-ASCII characters, and the token is untrusted.
    if not hmac.compare_digest(signature.encode('utf-8'), expected_sig):
        raise ValueError("Invalid signature.")
    # Decode & decompress (only reached for correctly signed tokens)
    padding = '=' * (-len(encoded) % 4)
    comp = base64.urlsafe_b64decode(encoded + padding)
    if len(comp) != comp_len:
        raise ValueError("Compressed-length mismatch.")
    try:
        raw = zlib.decompress(comp).decode('utf-8')
    except zlib.error as err:
        # Keep the single documented failure type for callers.
        raise ValueError("Corrupt compressed payload.") from err
    # Load JSON and restore long keys (same recursive walk as shortening,
    # applied with the inverse map).
    short = json.loads(raw)
    return _shorten_keys(short, INV_KEY_MAP)
# --- example usage ---
if __name__ == '__main__':
    # Round-trip a sample payload: generate a token, print it, decode it
    # again and print the restored payload.
    payload = dict(
        validity="31.12.2025",
        folders=[
            dict(
                foldername="20.04.2025 Ostersonntag",
                folderpath=r"\\192.168.10.10\docker2\sync-bethaus\syncfiles\folders\Gottesdienste Speyer\2025\04. April\20.04.2025 Ostersonntag",
            ),
        ],
    )
    token = generate_token(payload)
    print("Token:", token)
    result = decode_token(token)
    print("Decoded payload:", result)