1283 lines
44 KiB
Python
Executable File
1283 lines
44 KiB
Python
Executable File
import os
|
|
import sqlite3
|
|
|
|
# Use eventlet only in production; keep dev on threading to avoid monkey_patch issues with reloader
|
|
FLASK_ENV = os.environ.get('FLASK_ENV', 'production')
|
|
if FLASK_ENV == 'production':
|
|
import eventlet
|
|
eventlet.monkey_patch()
|
|
|
|
from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, make_response, abort
|
|
from PIL import Image, ImageOps, ExifTags
|
|
import io
|
|
from functools import wraps
|
|
import mimetypes
|
|
from datetime import datetime, date, timedelta
|
|
import diskcache
|
|
import threading
|
|
from flask_socketio import SocketIO, emit
|
|
import geoip2.database
|
|
from functools import lru_cache
|
|
from urllib.parse import urlparse, unquote
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from pathlib import Path
|
|
import re
|
|
import qrcode
|
|
import base64
|
|
import json
|
|
import io
|
|
import search
|
|
import auth
|
|
import analytics as a
|
|
import folder_secret_config_editor as fsce
|
|
import helperfunctions as hf
|
|
import search_db_analyzer as sdb
|
|
import fnmatch
|
|
import openpyxl
|
|
from collections import OrderedDict
|
|
|
|
# Application settings are loaded once, at import time, through the auth module.
app_config = auth.return_app_config()
# Canonical form of the configured root directory; check_path() validates
# requested paths against it.
BASE_DIR = os.path.realpath(app_config['BASE_DIR'])

# One on-disk cache per media class. Each configured limit is multiplied by
# 1024**3, i.e. the configuration values are expressed in GiB.
cache_audio = diskcache.Cache(
    './filecache_audio',
    size_limit=app_config['filecache_size_limit_audio'] * 1024**3,
)
cache_image = diskcache.Cache(
    './filecache_image',
    size_limit=app_config['filecache_size_limit_image'] * 1024**3,
)
cache_video = diskcache.Cache(
    './filecache_video',
    size_limit=app_config['filecache_size_limit_video'] * 1024**3,
)
cache_other = diskcache.Cache(
    './filecache_other',
    size_limit=app_config['filecache_size_limit_other'] * 1024**3,
)

# Insertion-ordered registry of request ids whose access was already logged.
# It is guarded by the lock below and capped at _LOGGED_REQUEST_IDS_MAX entries.
_logged_request_ids = OrderedDict()
_logged_request_ids_lock = threading.Lock()
_LOGGED_REQUEST_IDS_MAX = 2048
|
|
|
|
|
|
def _is_duplicate_request(req_id: str) -> bool:
    """Report whether ``req_id`` is already present in the logged-request registry.

    A falsy id (``None`` or an empty string) is never considered a duplicate.
    The registry is read while holding ``_logged_request_ids_lock``.
    """
    if req_id:
        with _logged_request_ids_lock:
            already_seen = req_id in _logged_request_ids
        return already_seen
    return False
|
|
|
|
|
|
def _mark_request_logged(req_id: str):
    """Remember ``req_id`` as logged; falsy ids are ignored.

    The id is (re)inserted as the newest entry. When the registry then holds
    more than ``_LOGGED_REQUEST_IDS_MAX`` ids, the oldest one is dropped.
    """
    if req_id:
        with _logged_request_ids_lock:
            registry = _logged_request_ids
            registry[req_id] = None
            # Re-marking an existing id refreshes its position to "newest".
            registry.move_to_end(req_id)
            if len(registry) > _LOGGED_REQUEST_IDS_MAX:
                registry.popitem(last=False)  # evict the oldest entry
|
|
|
|
app = Flask(__name__)
# Trust one proxy hop for the X-Forwarded-For and X-Forwarded-Proto headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)

app.config.update(
    SECRET_KEY=app_config['SECRET_KEY'],
    PERMANENT_SESSION_LIFETIME=timedelta(days=90),
)
if FLASK_ENV == 'production':
    # SameSite=None cookies are only accepted by browsers when also marked Secure.
    app.config.update(
        SESSION_COOKIE_SAMESITE='None',
        SESSION_COOKIE_SECURE=True,
    )

# Analytics pages (admin only) and the personal link overview (secret holders).
app.add_url_rule(
    '/dashboard',
    view_func=auth.require_admin(a.dashboard),
)
app.add_url_rule(
    '/file_access',
    view_func=auth.require_admin(a.file_access),
)
app.add_url_rule(
    '/connections',
    view_func=auth.require_admin(a.connections),
)
app.add_url_rule(
    '/mylinks',
    view_func=auth.require_secret(auth.mylinks),
)
app.add_url_rule(
    '/songs_dashboard',
    view_func=auth.require_admin(a.songs_dashboard),
)

# POST-only endpoints registered without an access wrapper at this level.
app.add_url_rule(
    '/remove_secret',
    view_func=auth.remove_secret,
    methods=['POST'],
)
app.add_url_rule(
    '/remove_token',
    view_func=auth.remove_token,
    methods=['POST'],
)
app.add_url_rule(
    '/searchcommand',
    view_func=search.searchcommand,
    methods=['POST'],
)

# Folder/secret configuration editor (admin only).
app.add_url_rule(
    '/admin/folder_secret_config_editor',
    view_func=auth.require_admin(fsce.folder_secret_config_editor),
    methods=['GET', 'POST'],
)
app.add_url_rule(
    '/admin/folder_secret_config_editor/data',
    view_func=auth.require_admin(auth.load_folder_config),
)
app.add_url_rule(
    '/admin/folder_secret_config_editor/action',
    view_func=auth.require_admin(fsce.folder_secret_config_action),
    methods=['POST'],
)
|
|
|
|
@app.route('/admin/generate_qr/<secret>')
@auth.require_admin
def generate_qr_code(secret):
    """Return a QR code (base64-encoded PNG) for the login link of ``secret``.

    The link has the form ``<scheme>://<host>?secret=<secret>`` and is built
    from the current request. The secret is percent-encoded so that characters
    such as ``&``, ``#`` or spaces cannot truncate or corrupt the query string
    stored in the QR code.

    Returns:
        JSON ``{"qr_code": "<base64 PNG>"}``.
    """
    # Local import: the file-level import only brings in urlparse/unquote.
    from urllib.parse import quote

    url = f"{request.scheme}://{request.host}?secret={quote(secret, safe='')}"

    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")

    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    # getvalue() returns the whole buffer regardless of the stream position.
    img_base64 = base64.b64encode(buffer.getvalue()).decode('ascii')
    return jsonify({'qr_code': img_base64})
|
|
|
|
# Search database analyzer (admin only).
app.add_url_rule(
    '/admin/search_db_analyzer',
    view_func=auth.require_admin(sdb.search_db_analyzer),
)
app.add_url_rule(
    '/admin/search_db_analyzer/query',
    view_func=auth.require_admin(sdb.search_db_query),
    methods=['POST'],
)
app.add_url_rule(
    '/admin/search_db_analyzer/folders',
    view_func=auth.require_admin(sdb.search_db_folders),
)

# Messages/News routes: storage locations (relative to the working directory).
MESSAGES_FILENAME = 'messages.json'
CALENDAR_DB = 'calendar.db'
|
|
|
|
def load_messages():
    """Read and return the content of the messages JSON file.

    Returns an empty list when the file does not exist or does not contain
    valid JSON.
    """
    try:
        handle = open(MESSAGES_FILENAME, 'r', encoding='utf-8')
    except FileNotFoundError:
        return []
    with handle:
        try:
            return json.load(handle)
        except json.JSONDecodeError:
            return []
|
|
|
|
def save_messages(messages):
    """Write ``messages`` to the messages JSON file, replacing its content.

    The output is UTF-8 with non-ASCII characters kept verbatim and a
    two-space indent.
    """
    serialized_options = {'ensure_ascii': False, 'indent': 2}
    with open(MESSAGES_FILENAME, 'w', encoding='utf-8') as out:
        json.dump(messages, out, **serialized_options)
|
|
|
|
|
|
def init_calendar_db():
    """Ensure the ``calendar_entries`` table exists and has a ``details`` column.

    Creates the table when missing and, for databases created before the
    ``details`` column was introduced, adds that column. The connection is
    always closed, even when one of the statements fails.
    """
    conn = sqlite3.connect(CALENDAR_DB)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS calendar_entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date TEXT NOT NULL,
                time TEXT,
                title TEXT NOT NULL,
                location TEXT,
                details TEXT,
                created_at TEXT NOT NULL
            )
            """
        )
        # Ensure details column exists for legacy DBs.
        # Index 1 of each PRAGMA table_info row is the column name.
        cols = [r[1] for r in conn.execute("PRAGMA table_info(calendar_entries)").fetchall()]
        if 'details' not in cols:
            conn.execute("ALTER TABLE calendar_entries ADD COLUMN details TEXT")
        # A single commit covers both the CREATE and the optional ALTER.
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def serialize_calendar_row(row):
    """Turn a calendar row (any mapping indexed by column name) into a plain dict.

    ``id``, ``date`` and ``title`` are copied as-is; falsy values of the
    optional columns ``time``, ``location`` and ``details`` become ``''``.
    """
    optional_columns = ('time', 'location', 'details')
    serialized = {}
    for column in ('id', 'date', 'time', 'title', 'location', 'details'):
        value = row[column]
        if column in optional_columns:
            value = value or ''
        serialized[column] = value
    return serialized
|
|
|
|
|
|
# Create or upgrade the calendar schema once, when this module is imported.
init_calendar_db()
|
|
|
|
@app.route('/api/messages', methods=['GET'])
@auth.require_secret
def get_messages():
    """Return all stored messages as JSON, newest first.

    Messages are ordered by their ``datetime`` value in descending order;
    messages without that key sort as if it were an empty string.
    """
    newest_first = sorted(
        load_messages(),
        key=lambda msg: msg.get('datetime', ''),
        reverse=True,
    )
    return jsonify(newest_first)
|
|
|
|
@app.route('/api/messages', methods=['POST'])
@auth.require_admin
def create_message():
    """Create a new message from the JSON request body.

    Recognised body keys: ``title``, ``content`` (both default to ``''``) and
    ``datetime`` (defaults to the current local time in ISO format). The new
    message gets the highest existing id plus one.

    Returns:
        The created message as JSON with status 201.
    """
    # An empty/null JSON body yields None; treat it like an empty object,
    # as the calendar endpoints do, instead of failing on ``None.get``.
    data = request.get_json() or {}
    messages = load_messages()

    # Generate new ID
    new_id = max((m.get('id', 0) for m in messages), default=0) + 1

    new_message = {
        'id': new_id,
        'title': data.get('title', ''),
        'content': data.get('content', ''),
        'datetime': data.get('datetime', datetime.now().isoformat()),
    }

    messages.append(new_message)
    save_messages(messages)

    return jsonify(new_message), 201
|
|
|
|
@app.route('/api/messages/<int:message_id>', methods=['PUT'])
@auth.require_admin
def update_message(message_id):
    """Update the message with id ``message_id`` from the JSON request body.

    Only the keys ``title``, ``content`` and ``datetime`` are taken from the
    body; keys that are absent leave the stored value untouched.

    Returns:
        The updated message as JSON, or a 404 error object when no message
        has the given id.
    """
    # An empty/null JSON body yields None; treat it like an empty object.
    data = request.get_json() or {}
    messages = load_messages()

    for message in messages:
        # .get(): a stored message without an id must not abort the lookup.
        if message.get('id') != message_id:
            continue
        for field in ('title', 'content', 'datetime'):
            if field in data:
                message[field] = data[field]
        save_messages(messages)
        return jsonify(message)

    return jsonify({'error': 'Message not found'}), 404
|
|
|
|
@app.route('/api/messages/<int:message_id>', methods=['DELETE'])
@auth.require_admin
def delete_message(message_id):
    """Delete the message with id ``message_id``.

    The operation is idempotent: the response is ``{"success": true}`` whether
    or not a message with that id existed.
    """
    # .get(): a stored message without an id is kept rather than raising KeyError.
    remaining = [m for m in load_messages() if m.get('id') != message_id]
    save_messages(remaining)

    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/calendar/<int:entry_id>', methods=['PUT'])
@auth.require_admin
def update_calendar_entry(entry_id):
    """Update the calendar entry ``entry_id`` from the JSON request body.

    ``date`` and ``title`` are required; ``time``, ``location`` and
    ``details`` default to ``''``.

    Returns:
        The updated entry as JSON, 400 when a required field is missing, or
        404 when no entry has the given id.
    """
    data = request.get_json() or {}
    entry_date = data.get('date')
    title = data.get('title')
    if not entry_date or not title:
        return jsonify({'error': 'date and title are required'}), 400

    time_value = data.get('time') or ''
    location = data.get('location') or ''
    details = data.get('details') or ''

    conn = sqlite3.connect(CALENDAR_DB)
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.execute(
            """
            UPDATE calendar_entries
            SET date = ?, time = ?, title = ?, location = ?, details = ?
            WHERE id = ?
            """,
            (entry_date, time_value, title, location, details, entry_id)
        )
        if cur.rowcount == 0:
            return jsonify({'error': 'not found'}), 404

        # Read the row back on the same connection so the response reflects
        # exactly what is stored, then make the change durable.
        row = conn.execute(
            "SELECT id, date, time, title, location, details FROM calendar_entries WHERE id = ?",
            (entry_id,)
        ).fetchone()
        conn.commit()
    finally:
        # Closing in ``finally`` releases the connection on every path,
        # including database errors.
        conn.close()
    return jsonify(serialize_calendar_row(row))
|
|
|
|
|
|
@app.route('/api/calendar/<int:entry_id>', methods=['DELETE'])
@auth.require_admin
def delete_calendar_entry(entry_id):
    """Delete the calendar entry ``entry_id``.

    Returns:
        ``{"success": true}``, or a 404 error object when no row was deleted.
    """
    conn = sqlite3.connect(CALENDAR_DB)
    try:
        cur = conn.execute("DELETE FROM calendar_entries WHERE id = ?", (entry_id,))
        conn.commit()
        deleted = cur.rowcount
    finally:
        # Release the connection even when the statement fails.
        conn.close()
    if deleted == 0:
        return jsonify({'error': 'not found'}), 404
    return jsonify({'success': True})
|
|
|
|
|
|
@app.route('/api/calendar/export', methods=['GET'])
@auth.require_admin
def export_calendar():
    """Export all calendar entries as an Excel workbook download.

    The sheet is named "Kalender" and has the columns ``date``, ``time``,
    ``title``, ``location`` and ``details``. Rows are ordered by date and
    time, and dates stored as ``YYYY-MM-DD`` are written as ``DD.MM.YYYY``.
    """
    conn = sqlite3.connect(CALENDAR_DB)
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """
            SELECT date, time, title, location, details
            FROM calendar_entries
            ORDER BY date ASC, time ASC
            """
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Kalender"
    ws.append(["date", "time", "title", "location", "details"])

    def fmt_date(date_str):
        """Reformat an ISO date for display; return other values unchanged."""
        try:
            return datetime.strptime(date_str, '%Y-%m-%d').strftime('%d.%m.%Y')
        except (TypeError, ValueError):
            # TypeError: value is not a string; ValueError: not YYYY-MM-DD.
            return date_str

    for row in rows:
        ws.append([
            fmt_date(row['date']),
            row['time'] or '',
            row['title'],
            row['location'] or '',
            row['details'] or '',
        ])

    output = io.BytesIO()
    wb.save(output)

    response = make_response(output.getvalue())
    response.headers['Content-Disposition'] = 'attachment; filename=kalender.xlsx'
    response.headers['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    return response
|
|
|
|
|
|
@app.route('/api/calendar/import', methods=['POST'])
@auth.require_admin
def import_calendar():
    """Import calendar entries from an uploaded Excel file, replacing existing entries.

    The upload is expected in the form field ``file``. The first row of the
    active sheet is the header; it must contain ``date`` and ``title`` and may
    contain ``time``, ``location`` and ``details`` (case-insensitive, other
    columns are ignored). Rows without a date are skipped. Dates may be Excel
    date cells, ``DD.MM.YYYY`` text or ``YYYY-MM-DD`` text and are always
    stored as zero-padded ``YYYY-MM-DD``.

    Returns:
        ``{"success": true, "count": <rows imported>}``, or a 400 error object
        for a missing/invalid file, a missing header column or an unparsable
        date. Nothing is written to the database in the error cases.
    """
    upload = request.files.get('file')
    if upload is None or upload.filename == '':
        return jsonify({'error': 'No file uploaded'}), 400

    try:
        wb = openpyxl.load_workbook(upload, data_only=True)
    except Exception as e:
        return jsonify({'error': f'Invalid Excel file: {e}'}), 400

    ws = wb.active
    if ws is None or not hasattr(ws, 'iter_rows'):
        # No active sheet, or the active sheet is not a cell worksheet.
        return jsonify({'error': 'Header must include at least date and title columns'}), 400

    # Default of () guards against a sheet that yields no header row at all.
    header_cells = next(ws.iter_rows(min_row=1, max_row=1, values_only=False), ())
    headers = [str(cell.value).strip().lower() if cell.value else '' for cell in header_cells]
    required = {'date', 'title'}
    optional = {'time', 'location', 'details'}
    all_allowed = required | optional

    # Map each recognised header to its first column index.
    header_map = {}
    for idx, h in enumerate(headers):
        if h in all_allowed and h not in header_map:
            header_map[h] = idx

    if not required.issubset(header_map):
        return jsonify({'error': 'Header must include at least date and title columns'}), 400

    entries = []
    for row in ws.iter_rows(min_row=2, values_only=True):
        entry = {}
        for key, col_idx in header_map.items():
            value = row[col_idx] if col_idx < len(row) else ''
            entry[key] = value if value is not None else ''

        # Normalize
        entry_date_raw = entry.get('date', '')
        entry_date = str(entry_date_raw).strip()
        if not entry_date:
            continue
        try:
            if isinstance(entry_date_raw, (datetime, date)):
                # Allow Excel date cells
                entry_date = entry_date_raw.strftime('%Y-%m-%d')
            elif '.' in entry_date:
                entry_date = datetime.strptime(entry_date, '%d.%m.%Y').strftime('%Y-%m-%d')
            else:
                # Expect yyyy-mm-dd. Re-format after parsing so that inputs
                # like "2024-1-5" are stored zero-padded; the calendar query
                # compares dates as text and depends on that.
                entry_date = datetime.strptime(entry_date, '%Y-%m-%d').strftime('%Y-%m-%d')
        except ValueError:
            return jsonify({'error': f'Invalid date format: {entry_date}'}), 400

        entries.append({
            'date': entry_date,
            'time': str(entry.get('time', '') or '').strip(),
            'title': str(entry.get('title', '') or '').strip(),
            'location': str(entry.get('location', '') or '').strip(),
            'details': str(entry.get('details', '') or '').strip(),
        })

    conn = sqlite3.connect(CALENDAR_DB)
    try:
        cur = conn.cursor()
        # Delete and insert run in one transaction; it is committed only
        # after every row has been inserted.
        cur.execute("DELETE FROM calendar_entries")
        cur.executemany(
            """
            INSERT INTO calendar_entries (date, time, title, location, details, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            [
                (
                    e['date'],
                    e['time'],
                    e['title'],
                    e['location'],
                    e['details'],
                    datetime.utcnow().isoformat(),
                )
                for e in entries
            ]
        )
        conn.commit()
    finally:
        conn.close()

    return jsonify({'success': True, 'count': len(entries)})
|
|
|
|
@app.route('/api/calendar', methods=['GET'])
@auth.require_secret
def get_calendar_entries():
    """Return calendar entries between start and end date (inclusive).

    Query parameters ``start`` and ``end`` are compared as text against the
    stored ``date`` column. ``start`` defaults to today and ``end`` to four
    weeks from today (both in ISO format). Results are ordered by date, then
    time.
    """
    start = request.args.get('start') or date.today().isoformat()
    end = request.args.get('end') or (date.today() + timedelta(weeks=4)).isoformat()

    conn = sqlite3.connect(CALENDAR_DB)
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            """
            SELECT id, date, time, title, location, details
            FROM calendar_entries
            WHERE date BETWEEN ? AND ?
            ORDER BY date ASC, time ASC
            """,
            (start, end)
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return jsonify([serialize_calendar_row(r) for r in rows])
|
|
|
|
|
|
@app.route('/api/calendar', methods=['POST'])
@auth.require_admin
def create_calendar_entry():
    """Store a new calendar entry from the JSON request body.

    ``date`` and ``title`` are required; ``time``, ``location`` and
    ``details`` default to ``''``. ``created_at`` is set to the current UTC
    time in ISO format.

    Returns:
        The stored entry (including its new ``id``) with status 201, or a 400
        error object when a required field is missing.
    """
    data = request.get_json() or {}
    entry_date = data.get('date')
    title = data.get('title')
    if not entry_date or not title:
        return jsonify({'error': 'date and title are required'}), 400

    time_value = data.get('time') or ''
    location = data.get('location') or ''
    details = data.get('details') or ''

    conn = sqlite3.connect(CALENDAR_DB)
    try:
        cur = conn.execute(
            """
            INSERT INTO calendar_entries (date, time, title, location, details, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (entry_date, time_value, title, location, details, datetime.utcnow().isoformat())
        )
        conn.commit()
        new_id = cur.lastrowid
    finally:
        # Release the connection even when the insert fails.
        conn.close()

    return jsonify({
        'id': new_id,
        'date': entry_date,
        'time': time_value,
        'title': title,
        'location': location,
        'details': details,
    }), 201
|
|
|
|
# HOST_RULE is read from the environment; an unset variable is treated as "".
host_rule = os.environ.get("HOST_RULE", "")
# Extract every name written between backticks in patterns like Host(`something`).
pattern = r"Host\(`([^`]+)`\)"
allowed_domains = re.findall(pattern, host_rule)

# NOTE(review): allowed_domains holds the bare strings captured from HOST_RULE;
# confirm they match what Socket.IO compares against the request's Origin header.
socketio = SocketIO(
    app,
    async_mode='threading' if FLASK_ENV != 'production' else 'eventlet',
    cors_allowed_origins=allowed_domains,
)

# Module-level state for connected clients and the background thread.
background_thread_running = False
clients_connected = 0
background_thread = None
thread_lock = threading.Lock()
|
|
|
|
@lru_cache(maxsize=10)
def get_cached_image(size):
    """Return the custom logo as PNG bytes, shrunk to fit the requested size.

    ``size`` is a string whose part after the first ``-`` is
    ``<width>x<height>`` (for example ``"icon-192x192"``). The logo is read
    from ``custom_logo/logoB.png`` below the application root and converted
    to RGBA. It is only ever scaled down: when the requested box is at least
    as large as the logo in both dimensions, the logo is returned at its
    original size. Results for the ten most recently used ``size`` strings
    are memoised.

    Raises:
        IndexError: ``size`` contains no ``-``.
        ValueError: a dimension is not an integer.
    """
    requested = tuple(int(value) for value in size.split('-')[1].split('x'))
    logo_path = os.path.join(app.root_path, 'custom_logo', 'logoB.png')

    with Image.open(logo_path) as source:
        logo = source.convert("RGBA")

        logo_width, logo_height = logo.size
        needs_shrink = requested[0] < logo_width or requested[1] < logo_height
        if needs_shrink:
            result = logo.copy()
            # thumbnail() resizes in place and keeps the aspect ratio.
            result.thumbnail(requested, Image.LANCZOS)
        else:
            result = logo

    encoded = io.BytesIO()
    result.save(encoded, format='PNG')
    return encoded.getvalue()
|
|
|
|
def check_path(access_path: str) -> Path:
    """Validate that ``access_path`` points inside ``BASE_DIR``.

    Args:
        access_path: An absolute filesystem path.

    Returns:
        The fully resolved path (symlinks followed, ``..`` eliminated).

    Raises:
        ValueError: ``access_path`` is not absolute.
        PermissionError: the resolved path is not located under the resolved
            ``BASE_DIR``.
    """
    requested = Path(access_path)
    if not requested.is_absolute():
        raise ValueError(f"Path {access_path} is not a valid absolute path")

    resolved = requested.resolve()
    allowed_root = Path(BASE_DIR).resolve()

    try:
        # relative_to() raises ValueError when resolved is not below allowed_root.
        resolved.relative_to(allowed_root)
    except ValueError:
        raise PermissionError(f"Access to {access_path} is forbidden")
    else:
        return resolved
|
|
|
|
def list_directory_contents(directory, subpath):
    """List the immediate, visible contents of ``directory``.

    Args:
        directory: Filesystem path of the directory to scan.
        subpath: The same location as a ``/``-separated path relative to the
            virtual root; used to build the ``path`` of every entry.

    Returns:
        ``(directories, files)``, both sorted by name.

        * each directory is ``{'name', 'path', 'share'}``; ``share`` is True
          when the first segment of ``subpath`` is a folder name listed in
          the folder configuration;
        * each file is ``{'name', 'path', 'file_type', 'has_transcript'}``
          with ``file_type`` one of ``'music'``, ``'image'``, ``'other'``.
          For music files with a matching ``Transkription/<name>.md`` file,
          ``has_transcript`` is True and ``transcript_url`` is added.

    Skipped entries: names starting with a dot, names matching a blocked
    pattern (matched case-insensitively, so ``CLIP.MP4`` is blocked like
    ``clip.mp4``), helper directories, and symlinks. If scanning fails with a
    permission error, the entries collected so far are returned and a warning
    is logged.
    """
    directories = []
    files = []
    folder_config = auth.return_folder_config()
    transcription_dir = os.path.join(directory, "Transkription")
    transcription_exists = os.path.isdir(transcription_dir)

    # Folder names that are configured directly; anything below them is shareable.
    direct_directories = {
        folder['foldername']
        for item in folder_config
        for folder in item['folders']
    }

    # Define allowed file extensions.
    music_exts = ('.mp3',)
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    # Lower-case patterns; they are matched against the lower-cased name.
    blocked_patterns = (
        'thumbs.db',
        '*.mrk', '*.md',
        '*.arw', '*.cr2', '*.lrf',
        '*.mov', '*.mp4', '*.avi', '*.mkv',
    )
    # System/hidden helper folders.
    skipped_dirs = ("Transkription", "@eaDir", ".app", "#recycle")

    # The share flag depends only on subpath, so compute it once.
    allow_share = (subpath or '').split('/')[0] in direct_directories

    try:
        with os.scandir(directory) as it:
            for entry in sorted(it, key=lambda e: e.name):
                # Skip hidden files and directories.
                if entry.name.startswith('.'):
                    continue

                lower_name = entry.name.lower()
                # fnmatchcase on the lower-cased name gives the same
                # case-insensitive result on every platform.
                if any(fnmatch.fnmatchcase(lower_name, blocked) for blocked in blocked_patterns):
                    continue

                rel_path = os.path.join(subpath, entry.name) if subpath else entry.name
                rel_path = rel_path.replace(os.sep, '/')

                if entry.is_dir(follow_symlinks=False):
                    if entry.name in skipped_dirs:
                        continue
                    directories.append({'name': entry.name, 'path': rel_path, 'share': allow_share})

                elif entry.is_file(follow_symlinks=False):
                    if lower_name.endswith(music_exts):
                        file_type = 'music'
                    elif lower_name.endswith(image_exts):
                        file_type = 'image'
                    else:
                        file_type = 'other'

                    file_entry = {
                        'name': entry.name,
                        'path': rel_path,
                        'file_type': file_type,
                        'has_transcript': False,
                    }

                    # Only audio files can have a transcription.
                    if file_type == 'music' and transcription_exists:
                        transcript_filename = os.path.splitext(entry.name)[0] + '.md'
                        if os.path.isfile(os.path.join(transcription_dir, transcript_filename)):
                            if subpath:
                                transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename)
                            else:
                                transcript_rel_path = os.path.join("Transkription", transcript_filename)
                            file_entry['has_transcript'] = True
                            file_entry['transcript_url'] = url_for(
                                'get_transcript',
                                subpath=transcript_rel_path.replace(os.sep, '/'),
                            )
                    files.append(file_entry)
    except PermissionError as e:
        # Best effort: return what was collected, but leave a trace in the log.
        app.logger.warning(f"Cannot list {directory}: {e}")

    return directories, files
|
|
|
|
|
|
def generate_breadcrumbs(subpath=None):
    """Build the breadcrumb trail for a ``/``-separated ``subpath``.

    The trail always starts with ``{'name': 'Home', 'path': ''}``. Every
    non-empty path segment adds one crumb whose ``path`` is the path up to and
    including that segment. In displayed names, ``toplist`` is replaced by
    ``oft angehört``; the ``path`` keeps the original segment.

    Empty segments (from a trailing or doubled slash) are ignored, so
    ``'a/b/'`` yields the same trail as ``'a/b'`` instead of a nameless crumb.
    """
    breadcrumbs = [{'name': 'Home', 'path': ''}]
    if not subpath:
        return breadcrumbs

    path_accum = ""
    for part in subpath.split('/'):
        if not part:
            continue
        path_accum = f"{path_accum}/{part}" if path_accum else part
        breadcrumbs.append({
            'name': part.replace('toplist', 'oft angehört'),
            'path': path_accum,
        })
    return breadcrumbs
|
|
|
|
|
|
def human_readable_size(num_bytes):
    """Format a byte count as a short human-readable string (e.g. ``1.5 KB``).

    The value is divided by 1024 until it is below 1024 or the largest unit
    (TB) is reached. Whole numbers are printed without decimals, everything
    else with one decimal place. ``None`` and values that cannot be converted
    to a float yield ``"-"``.
    """
    if num_bytes is None:
        return "-"
    try:
        amount = float(num_bytes)
    except (TypeError, ValueError):
        return "-"

    units = ('B', 'KB', 'MB', 'GB', 'TB')
    index = 0
    # "not amount < 1024" (rather than ">=") keeps NaN scaling up to the last unit.
    while not amount < 1024 and index < len(units) - 1:
        amount /= 1024
        index += 1

    unit = units[index]
    if amount % 1:
        return f"{amount:.1f} {unit}"
    return f"{int(amount)} {unit}"
|
|
|
|
@app.route('/icon/<string:size>.png')
@app.route('/icons/<string:size>.png')  # legacy path
def serve_resized_icon(size):
    """Serve the custom logo as a PNG resized according to ``size``.

    ``size`` comes straight from the URL. When it cannot be parsed (no ``-``
    separator, non-numeric dimensions) or the logo file is missing, the
    request is answered with 404 instead of an unhandled server error.
    The response may be cached by clients for one day.
    """
    try:
        cached_image_bytes = get_cached_image(size)
    except (IndexError, ValueError, FileNotFoundError):
        # IndexError/ValueError: malformed size string; FileNotFoundError: no logo.
        abort(404)

    response = send_file(
        io.BytesIO(cached_image_bytes),
        mimetype='image/png'
    )
    response.headers['Cache-Control'] = 'public, max-age=86400, immutable'
    return response
|
|
|
|
@app.route('/custom_logo/<string:filename>.png')
def custom_logo(filename):
    """Serve ``custom_logo/<filename>.png`` (relative to the working directory).

    Responds with 404 when the file does not exist. The image is sent from an
    in-memory copy and may be cached by clients for one day.
    """
    logo_path = os.path.join('custom_logo', f"{filename}.png")
    if not os.path.exists(logo_path):
        abort(404)

    with open(logo_path, 'rb') as handle:
        # A fresh BytesIO starts at position 0, ready to be streamed.
        payload = io.BytesIO(handle.read())

    response = send_file(payload, mimetype='image/png')
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response
|
|
|
|
|
|
@app.route('/sw.js')
def serve_sw():
    """Serve ``static/sw.js`` under the top-level URL ``/sw.js`` as JavaScript."""
    static_dir = os.path.join(app.root_path, 'static')
    return send_from_directory(
        static_dir,
        'sw.js',
        mimetype='application/javascript',
    )
|
|
|
|
# API endpoint for AJAX: returns JSON for a given directory.
|
|
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@auth.require_secret
def api_browse(subpath):
    """Return the browsable content for ``subpath`` as JSON.

    ``subpath`` is interpreted as follows:

    * ``''``: the root; lists the folders stored in the session.
    * ``'heute'`` / ``'gestern'``: lists the folders reported by the
      analytics module for today, or for yesterday when today has none.
    * ``'toplist'`` / ``'toplist/<category>'``: the configured top-list
      categories, or the generated top list of one category.
    * anything else: ``<root>/<relative path>`` where ``<root>`` is a session
      folder name. If the path is not a directory, its last segment is
      treated as a file to play: the parent directory is listed and the file
      is reported as ``playfile``.

    Returns 403 when the resolved location is outside ``BASE_DIR`` and 404
    when the root or directory does not exist.
    """
    toplist_categories = app_config.get('toplist_categories', None)

    if subpath == '':  # root directory
        # session.get(): a session without folders lists nothing instead of failing.
        foldernames = [
            {'name': foldername, 'path': foldername}
            for foldername in session.get('folders', {})
        ]
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(),
            'directories': foldernames,
            'files': [],
            'folder_today': a.return_folder_today(),
            'folder_yesterday': a.return_folder_yesterday(),
            'toplist_enabled': bool(toplist_categories)
        })

    if subpath == 'heute' or subpath == 'gestern':
        # NOTE(review): both names return today's folders when there are any
        # and fall back to yesterday's otherwise; confirm that 'gestern' is
        # not meant to always show yesterday.
        recent = a.return_folder_today()
        if not recent:
            recent = a.return_folder_yesterday()
        foldernames = [
            {'name': item['rel_path'], 'path': item['rel_path']}
            for item in recent
        ]
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(subpath),
            'directories': foldernames,
            'files': [],
            'folder_today': [],
            'folder_yesterday': []
        })

    if subpath.startswith('toplist'):
        foldernames = []
        files = []
        split_path = subpath.split('/')

        if split_path[0] == 'toplist':
            if len(split_path) == 1:
                # "or []": no categories configured means an empty listing.
                foldernames = [
                    {'name': categorie, 'path': 'toplist/' + categorie}
                    for categorie in (toplist_categories or [])
                ]
            else:
                files = hf.generate_top_list(split_path[1])

        return jsonify({
            'breadcrumbs': generate_breadcrumbs(subpath),
            'directories': foldernames,
            'files': files
        })

    root, *relative_parts = subpath.split('/')
    base_path = session.get('folders', {}).get(root)
    if not base_path:
        app.logger.warning(f"Requested root '{root}' not found in session folders")
        return jsonify({'error': 'Directory not found'}), 404

    playfile = None

    try:
        directory = check_path(os.path.join(base_path, *relative_parts))
    except (ValueError, PermissionError) as e:
        return jsonify({'error': str(e)}), 403

    # Check if the constructed directory exists.
    if not os.path.isdir(directory):
        # Assume the last segment is a filename; remove it.
        if relative_parts:
            playfile = relative_parts.pop()  # Get the filename.
            # Rebuild subpath to reflect the directory (without the file).
            subpath = '/'.join([root] + relative_parts)
            # The parent is a different path than the one validated above
            # (symlinks or ".." segments may lead elsewhere), so validate it too.
            try:
                directory = check_path(os.path.join(base_path, *relative_parts))
            except (ValueError, PermissionError) as e:
                return jsonify({'error': str(e)}), 403
        # If the parent directory still doesn't exist, return error.
        if not os.path.isdir(directory):
            return jsonify({'error': 'Directory not found'}), 404

    directories, files = list_directory_contents(directory, subpath)

    response = {
        'breadcrumbs': generate_breadcrumbs(subpath),
        'directories': directories,
        'files': files,
        'folder_today': a.return_folder_today(),
        'folder_yesterday': a.return_folder_yesterday()
    }

    # If a filename was selected include it.
    if playfile:
        response['playfile'] = os.path.join(subpath, playfile).replace(os.sep, '/')

    return jsonify(response)
|
|
|
|
def _serve_file_is_range_prefetch(header):
    """Return True for a tiny ``bytes=0-N`` range request (at most 1024 bytes).

    Such requests are common player prefetches (e.g. on Apple devices). Any
    header that is missing, not a single numeric byte range, or not starting
    at offset 0 yields False.
    """
    if not header or not header.lower().startswith('bytes='):
        return False
    start_str, _, end_str = header.split('=', 1)[1].partition('-')
    if not start_str.isdigit() or not end_str.isdigit():
        return False
    try:
        start, end = int(start_str), int(end_str)
    except ValueError:
        # isdigit() accepts some characters that int() rejects.
        return False
    return start == 0 and end - start + 1 <= 1024


def _serve_file_cache_image_variants(cache, full_path, variants, exif_key):
    """Render down-scaled JPEG variants of an image and store them in ``cache``.

    Args:
        cache: The cache to write to.
        full_path: Path of the source image.
        variants: Mapping of cache key to the ``(width, height)`` bounding box.
        exif_key: The variant key that keeps the source EXIF data (minus the
            orientation tag, because the pixels are already rotated).
    """
    # Image modes the JPEG encoder can store directly; everything else
    # (RGBA, P, LA, I, F, ...) is converted to RGB before saving.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")

    with Image.open(full_path) as orig:
        img = ImageOps.exif_transpose(orig)
        exif_bytes = None
        try:
            exif = img.getexif()
            if exif:
                exif.pop(0x0112, None)  # Drop orientation tag since we already rotated the image
                exif_bytes = exif.tobytes()
        except Exception:
            # EXIF is optional: serve the image without it rather than failing.
            exif_bytes = None

        for key, size in variants.items():
            thumb = img.copy()
            thumb.thumbnail(size, Image.LANCZOS)
            if thumb.mode not in jpeg_modes:
                thumb = thumb.convert("RGB")
            bio = io.BytesIO()
            save_kwargs = {'format': 'JPEG', 'quality': 85}
            if key == exif_key and exif_bytes:
                save_kwargs['exif'] = exif_bytes
            thumb.save(bio, **save_kwargs)
            bio.seek(0)
            cache.set(key, bio, read=True)


@app.route("/media/<path:subpath>")
@auth.require_secret
def serve_file(subpath):
    """Serve a media file through the per-type disk cache and log the access.

    ``subpath`` is ``<root>/<relative path>`` with ``<root>`` a session folder
    name. With a ``dltoken`` query parameter the file path is taken from the
    decoded token instead and the file is sent as a download.

    Behaviour by case:

    * Plain ``HEAD`` requests are answered from file metadata only and do not
      populate the cache.
    * Images are served as down-scaled variants (1920px, or 480px with
      ``thumbnail=true``). Both variants are generated on the first
      full-size request; a thumbnail request for an uncached image returns
      204. ``download=true`` sends the image as an attachment.
    * All other files are copied into the cache on first access and then
      served from there, with range support.

    Accesses are logged except for ``X-Cache-Request`` prefetches, ``HEAD``
    requests, tiny range prefetches, thumbnails, and request ids that were
    already logged.

    Returns 403 when the path is outside ``BASE_DIR``, 404 when the file does
    not exist and 503 when the cache cannot be filled.
    """
    # 1) Locate the real file on disk
    root, *relative_parts = subpath.split('/')

    dltoken = request.args.get('dltoken')
    if dltoken:
        as_attachment = True
        # NOTE(review): the error behaviour of auth.decode_token for an
        # invalid token is not visible here; confirm it cannot return None.
        full_path = auth.decode_token(dltoken)['filename']
    else:
        as_attachment = False
        base_path = session.get('folders', {}).get(root)
        # An unknown root gives a relative path, which check_path rejects.
        full_path = os.path.join(base_path or '', *relative_parts)

    try:
        full_path = check_path(full_path)
    except (ValueError, PermissionError) as e:
        return jsonify({'Unauthorized': str(e)}), 403

    if not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404

    filesize = os.path.getsize(full_path)
    filename = os.path.basename(full_path)

    # 2) Prep request info
    mime, _ = mimetypes.guess_type(full_path)
    mime = mime or 'application/octet-stream'
    is_cache_request = request.headers.get('X-Cache-Request') == 'true'
    ip_address = request.remote_addr
    user_agent = request.headers.get('User-Agent')
    range_header = request.headers.get('Range', '')
    req_id = request.args.get('req') or request.headers.get('X-Request-Id')

    # Logging: log every client GET (cached or not), but skip CDN prefetches (X-Cache-Request)
    # and HEAD probes to avoid double-counting. Also skip tiny range-prefetches (e.g., Apple).
    do_log = (
        not is_cache_request
        and request.method != 'HEAD'
        and not _serve_file_is_range_prefetch(range_header)
    )

    # 3) Pick cache
    if mime.startswith('audio/'):
        cache = cache_audio
    elif mime.startswith('image/'):
        cache = cache_image
    elif mime.startswith('video/'):
        cache = cache_video
    else:
        cache = cache_other

    # Plain HEAD requests (without X-Cache-Request) should not populate the cache.
    # They are just probes and would otherwise turn the first real GET into a "cached hit".
    if request.method == 'HEAD' and not is_cache_request:
        response = make_response('', 200)
        response.headers['Content-Type'] = mime
        response.headers['Content-Length'] = str(filesize)
        response.headers['Accept-Ranges'] = 'bytes'
        response.headers['Cache-Control'] = 'public, max-age=86400'
        return response

    # 4) Image and thumbnail handling first
    if mime.startswith('image/'):
        small = request.args.get('thumbnail') == 'true'
        wants_download = request.args.get('download') == 'true'
        name, ext = os.path.splitext(subpath)
        orig_key = subpath
        small_key = f"{name}_small{ext}"
        cache_key = small_key if small else orig_key

        try:
            # Try to read the requested variant
            with cache.read(cache_key) as reader:
                file_path = reader.name
            cached_hit = True
        except KeyError:
            if small:  # do not create when thumbnail requested
                return make_response('', 204)

            cached_hit = False
            # On miss: generate both full-size and small thumb, then cache
            _serve_file_cache_image_variants(
                cache,
                full_path,
                {orig_key: (1920, 1920), small_key: (480, 480)},
                exif_key=orig_key,
            )
            # Read back the variant we need
            with cache.read(cache_key) as reader:
                file_path = reader.name

        # Serve the image variant
        # NOTE(review): variants written above are JPEG data while the
        # Content-Type stays the type guessed from the file name; confirm
        # whether other cache writers exist before changing it.
        response = send_file(
            file_path,
            mimetype=mime,
            conditional=True,
            as_attachment=wants_download,
            download_name=os.path.basename(orig_key)
        )
        if not wants_download:
            # Only force inline display when no download was requested;
            # otherwise keep the attachment header set by send_file.
            response.headers['Content-Disposition'] = 'inline'
        response.headers['Cache-Control'] = 'public, max-age=86400'

        if do_log and not small:
            if not _is_duplicate_request(req_id):
                a.log_file_access(
                    cache_key,
                    os.path.getsize(file_path),
                    mime,
                    ip_address,
                    user_agent,
                    session['device_id'],
                    cached_hit,
                    request.method
                )
                _mark_request_logged(req_id)
        return response

    # 5) Non-image branch: check if cached; if not, fill cache synchronously and then serve from cache
    try:
        with cache.read(subpath) as reader:
            file_path = reader.name
        cached_hit = True
    except KeyError:
        cached_hit = False
        try:
            # Copy full file into diskcache (blocks the request; avoids serving partial content)
            with open(full_path, 'rb') as source:
                cache.set(subpath, source, read=True)
            with cache.read(subpath) as reader:
                file_path = reader.name
            app.logger.info(f"Cached {subpath}")
        except Exception as e:
            app.logger.error(f"Cache fill failed for {subpath}: {e}")
            abort(503, description="Service temporarily unavailable - cache population failed")

    # 6) Build response for non-image
    response = send_file(
        file_path,
        # Downloads are sent as opaque binary so the browser saves them.
        mimetype='application/octet-stream' if as_attachment else mime,
        conditional=True,
        as_attachment=as_attachment,
        download_name=filename if as_attachment else None
    )

    if as_attachment:
        response.headers['X-Content-Type-Options'] = 'nosniff'
        # Keep the "attachment; filename=..." header produced by send_file so
        # the download is saved under the real file name.
    else:
        response.headers['Content-Disposition'] = 'inline'

    response.headers['Cache-Control'] = 'public, max-age=86400'

    if request.method == 'HEAD':
        response.set_data(b'')

    # 7) Logging
    if do_log:
        if not _is_duplicate_request(req_id):
            a.log_file_access(
                subpath,
                filesize,
                mime,
                ip_address,
                user_agent,
                session['device_id'],
                cached_hit,
                request.method
            )
            _mark_request_logged(req_id)
    return response
|
|
|
|
|
|
@app.route("/media_exif/<path:subpath>")
@auth.require_secret
def media_exif(subpath):
    """Return the EXIF metadata of an image as JSON.

    The first segment of ``subpath`` selects a folder from the session's
    ``folders`` mapping; the remaining segments are joined onto that folder's
    path and validated with ``check_path``.

    Responses:
        200 ``{'exif': {...}}`` -- tag name -> stringified value (empty dict
            when the image has no EXIF data or reading it failed).
        400 when the guessed MIME type is not ``image/*``.
        403 when ``check_path`` rejects the path.
        404 when the folder or the file does not exist.
    """
    root, *relative_parts = subpath.split('/')
    base_path = session.get('folders', {}).get(root)
    if not base_path:
        return jsonify({'error': 'Directory not found'}), 404

    full_path = os.path.join(base_path, *relative_parts)
    try:
        full_path = check_path(full_path)
    except (ValueError, PermissionError) as e:
        return jsonify({'error': str(e)}), 403

    if not os.path.isfile(full_path):
        return jsonify({'error': 'File not found'}), 404

    mime, _ = mimetypes.guess_type(full_path)
    if not mime or not mime.startswith('image/'):
        return jsonify({'error': 'Not an image'}), 400

    try:
        with Image.open(full_path) as img:
            exif_data = img.getexif()
            if not exif_data:
                return jsonify({'exif': {}})

            readable = {}

            ignore_tags = {'Orientation', 'ExifOffset', 'MakerNote', 'PrintImageMatching'}

            def add_items(exif_dict, tag_names):
                """Copy one IFD into ``readable`` using ``tag_names`` for id -> name."""
                for tag_id, value in exif_dict.items():
                    tag = tag_names.get(tag_id, str(tag_id))
                    if tag in ignore_tags:
                        continue
                    if isinstance(value, bytes):
                        # errors='ignore' means decoding never raises.
                        value = value.decode('utf-8', errors='ignore').strip()
                    readable[tag] = str(value)

            add_items(exif_data, ExifTags.TAGS)

            # Include Exif and GPS sub-IFDs when available to surface exposure/ISO/lens details.
            # GPS tag ids live in their own numbering space, so they must be
            # resolved through GPSTAGS; looking them up in TAGS yields wrong
            # or purely numeric names.
            try:
                if hasattr(ExifTags, 'IFD'):
                    sub_ifds = (
                        (ExifTags.IFD.Exif, ExifTags.TAGS),
                        (ExifTags.IFD.GPSInfo, ExifTags.GPSTAGS),
                    )
                    for ifd, tag_names in sub_ifds:
                        try:
                            add_items(exif_data.get_ifd(ifd), tag_names)
                        except Exception as ifd_error:
                            # Best effort: a broken sub-IFD must not discard
                            # the tags that were already collected.
                            app.logger.debug(f"EXIF sub-IFD {ifd} skipped for {subpath}: {ifd_error}")
                            continue
            except Exception as ifd_error:
                app.logger.debug(f"EXIF sub-IFD lookup skipped for {subpath}: {ifd_error}")
    except Exception as e:
        app.logger.warning(f"EXIF read failed for {subpath}: {e}")
        return jsonify({'exif': {}})

    return jsonify({'exif': readable})
|
|
|
|
|
|
|
|
@app.route("/transcript/<path:subpath>")
@auth.require_secret
def get_transcript(subpath):
    """Serve a transcript file as UTF-8 markdown text.

    The first segment of ``subpath`` selects a folder from the session's
    ``folders`` mapping; the remaining segments are joined onto that folder's
    path. The resulting path is validated with ``check_path`` in the same way
    as the other file-serving routes, so a crafted subpath cannot reach files
    outside the permitted folders.

    Responses:
        200 with the file content and ``text/markdown`` content type.
        403 when ``check_path`` rejects the path.
        404 when the folder is unknown or the file does not exist.
    """
    root, *relative_parts = subpath.split('/')

    # An unknown root is a plain "not found", not a server error.
    base_path = session.get('folders', {}).get(root)
    if not base_path:
        return "Transcription not found", 404

    full_path = os.path.join(base_path, *relative_parts)
    try:
        full_path = check_path(full_path)
    except (ValueError, PermissionError) as e:
        app.logger.warning(f"Transcript path rejected for {subpath}: {e}")
        return "Forbidden", 403

    if not os.path.isfile(full_path):
        return "Transcription not found", 404

    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
|
|
|
|
|
|
@app.route("/create_token/<path:subpath>")
@auth.require_secret
def create_token(subpath):
    """Create a 90-day share token for a folder and render it with a QR code.

    Only sessions with a truthy ``admin`` entry may use this route. The folder
    path is resolved from the folder configuration returned by
    ``auth.return_folder_config()`` (not from the session), so the validity of
    the caller's own secrets is ignored.

    Responses:
        200 rendered ``view_token.html`` with the token URL and QR code.
        403 when the session is not an admin session.
        404 when the first path segment is not a configured folder name.
    """
    scheme = request.scheme  # current scheme (http or https)
    host = request.host

    # The key merely being present is not enough: a falsy value must be
    # rejected as well.
    if not session.get('admin'):
        return "Unauthorized", 403

    folder_config = auth.return_folder_config()
    paths = {
        folder['foldername']: folder['folderpath']
        for item in folder_config
        for folder in item['folders']
    }

    # use folder config file and ignore validity to get full path
    # Empty segments (e.g. from a trailing slash) are dropped so they cannot
    # end up as an empty folder name.
    parts = [part for part in subpath.split('/') if part]
    if not parts or parts[0] not in paths:
        return "Folder not found", 404
    root, *relative_parts = parts

    full_path = os.path.join(paths[root], *relative_parts)
    # A token for the configured root itself is named after the root.
    foldername = relative_parts[-1] if relative_parts else root

    validity_date = (datetime.now() + timedelta(days=90)).strftime('%d.%m.%Y')
    data = {
        "validity": validity_date,
        "folders": [
            {
                "foldername": foldername,
                "folderpath": full_path
            }
        ]
    }

    token = auth.generate_token(data)

    url = f"{scheme}://{host}?token={token}"
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    img_base64 = base64.b64encode(buffer.getvalue()).decode('ascii')
    token_item = auth.decode_token(token)

    current_config = auth.return_app_config()
    return render_template('view_token.html',
                           token_qr_code=img_base64,
                           token_folder=token_item.get('folders'),
                           token_url=url,
                           token_valid_to=token_item.get('validity', 'Unbekannt'),
                           title_short=current_config.get('TITLE_SHORT', ''),
                           title_long=current_config.get('TITLE_LONG', '')
                           )
|
|
|
|
|
|
@app.route("/create_dltoken/<path:subpath>")
@auth.require_secret
def create_dltoken(subpath):
    """Return a ``/media/...`` URL carrying a download token for one file.

    The file is located through the session's ``folders`` mapping and
    validated with ``check_path``. The token payload holds the resolved file
    path and today's date (``%d.%m.%Y``) as its ``validity`` value.

    Responses:
        200 with the plain URL string.
        403 (JSON) when ``check_path`` rejects the path.
        404 when the resolved path is not a file.
    """
    # 1) Locate the real file on disk
    segments = subpath.split('/')
    folder_root = session['folders'].get(segments[0])
    candidate = os.path.join(folder_root or '', *segments[1:])

    try:
        resolved = check_path(candidate)
    except (ValueError, PermissionError) as e:
        return jsonify({'Unauthorized': str(e)}), 403

    if not os.path.isfile(resolved):
        app.logger.error(f"File not found: {resolved}")
        return "File not found", 404

    # 2) Sign a token for exactly this file
    payload = {
        "validity": datetime.now().strftime('%d.%m.%Y'),
        "filename": str(resolved)
    }
    token = auth.generate_token(payload)

    # 3) Build the absolute URL on the scheme/host the request arrived on
    return f"{request.scheme}://{request.host}/media/{subpath}?dltoken={token}"
|
|
|
|
|
|
def query_recent_connections():
    """Background task: push the recent file-access list to socket clients.

    Runs while ``clients_connected`` is above zero. Once per second it reads
    the rows from ``a.return_file_access()``, formats them, and emits a
    ``recent_connections`` event only when the formatted list differs from the
    one sent last. ``background_thread_running`` is set for the lifetime of the
    task and cleared again on exit, including exit by exception.
    """
    global clients_connected, background_thread_running

    def _as_entry(row):
        """Map one access-log row to the dict shape the client receives."""
        return {
            'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'),
            'full_path': row[1],
            'filesize': row[2],
            'filesize_human': human_readable_size(row[2]),
            'mime_typ': row[3],
            'location': row[4],
            'user_agent': row[5],
            'cached': row[7]
        }

    background_thread_running = True
    previously_sent = None
    try:
        while clients_connected > 0:
            snapshot = [_as_entry(row) for row in a.return_file_access()]

            # Skip the emit when nothing changed since the last push.
            if snapshot != previously_sent:
                socketio.emit('recent_connections', snapshot)
                previously_sent = list(snapshot)

            socketio.sleep(1)
    finally:
        background_thread_running = False
|
|
|
|
|
|
@socketio.on('connect')
def handle_connect(auth=None):
    """Register a new socket client and make sure the polling task is running.

    Args:
        auth: Optional payload supplied with the connect event; not used here.

    The client counter and the "task running" flag are both updated while
    holding ``thread_lock``. The flag is set *before* the task is spawned:
    if it were only set by the task itself, a second client connecting before
    the task got scheduled would start a duplicate poller.
    """
    global clients_connected, background_thread_running
    with thread_lock:
        clients_connected += 1
        print("Client connected. Total clients:", clients_connected)
        if not background_thread_running:
            background_thread_running = True
            try:
                socketio.start_background_task(query_recent_connections)
            except Exception:
                # Spawning failed; allow the next connect to try again.
                background_thread_running = False
                raise
            print("Started background query task.")
|
|
|
|
@socketio.on('disconnect')
def handle_disconnect():
    """Decrease the connected-client counter when a socket client leaves."""
    global clients_connected
    clients_connected = clients_connected - 1
    print("Client disconnected. Total clients:", clients_connected)
|
|
|
|
@socketio.on('request_initial_data')
def handle_request_initial_data():
    """Answer the requesting client with the current recent-access list.

    Reads the rows from ``a.return_file_access()``, formats each one, and
    emits them as a ``recent_connections`` event.
    """
    entries = []
    for record in a.return_file_access():
        accessed_at = datetime.fromisoformat(record[0])
        size = record[2]
        entries.append({
            'timestamp': accessed_at.strftime('%d.%m.%Y %H:%M:%S'),
            'full_path': record[1],
            'filesize': size,
            'filesize_human': human_readable_size(size),
            'mime_typ': record[3],
            'location': record[4],
            'user_agent': record[5],
            'cached': record[7]
        })
    emit('recent_connections', entries)
|
|
|
|
@socketio.on('request_map_data')
def handle_request_map_data():
    """Emit ``map_initial_data`` with the geolocated rows of the access log.

    Rows come from ``a.return_file_access()``. Only rows whose latitude and
    longitude are both truthy are included; missing trailing columns are
    treated as ``None``.
    """
    def _column(record, position):
        """Return ``record[position]``, or ``None`` when the row is too short."""
        return record[position] if len(record) > position else None

    points = []
    for record in a.return_file_access():
        # lat/lon are appended at indexes 10/11 in analytics.log_file_access
        latitude = _column(record, 10)
        longitude = _column(record, 11)
        if not (latitude and longitude):
            continue
        points.append({
            'timestamp': record[0],
            'city': _column(record, 8),
            'country': _column(record, 9),
            'lat': latitude,
            'lon': longitude
        })
    emit('map_initial_data', {'connections': points})
|
|
|
|
# Catch-all route to serve the single-page application template.
|
|
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@auth.require_secret
def index(path):
    """Render ``app.html`` for the root URL and any otherwise unmatched path.

    Args:
        path: The requested path; not used by the view itself.

    The Open Graph description lists the folder names of the current session
    when there are any, and falls back to a fixed text otherwise.
    """
    settings = auth.return_app_config()
    folder_names = list(session['folders'])

    # Generate dynamic Open Graph meta tags for Telegram preview.
    # If secret or token is provided, show folder names in description.
    if folder_names:
        description = ", ".join(folder_names)
    else:
        description = "... uns aber, die wir gerettet werden, ist es eine Gotteskraft."

    template_values = {
        'search_folders': folder_names,
        'title_short': settings.get('TITLE_SHORT', 'Default Title'),
        'title_long': settings.get('TITLE_LONG', 'Default Title'),
        'features': settings.get('FEATURES', None),
        'og_title': settings.get('TITLE_LONG', 'Default Title'),
        'og_description': description,
        'admin_enabled': auth.is_admin(),
    }
    return render_template("app.html", **template_values)
|
|
|
|
if __name__ == '__main__':
    # Debug mode enables the interactive Werkzeug debugger and the reloader.
    # Because the server binds to all interfaces, that must stay off unless
    # FLASK_ENV explicitly selects a non-production environment.
    socketio.run(app, debug=(FLASK_ENV != 'production'), host='0.0.0.0')
|