# bethaus-app/app.py
#
# 692 lines
# 24 KiB
# Python
# Executable File

import eventlet
eventlet.monkey_patch()
from flask import Flask, render_template, send_file, url_for, jsonify, request, session, send_from_directory, make_response, abort
import os
from PIL import Image, ImageOps
import io
from functools import wraps
import mimetypes
from datetime import datetime, date, timedelta
import diskcache
import threading
import time
from flask_socketio import SocketIO, emit
import geoip2.database
from functools import lru_cache
from urllib.parse import urlparse, unquote
from werkzeug.middleware.proxy_fix import ProxyFix
from pathlib import Path
import re
import qrcode
import base64
import search
import auth
import analytics as a
import folder_secret_config_editor as fsce
import helperfunctions as hf
import fnmatch
# Application settings are loaded once at import time; the caches, the Flask
# app and the session settings below are all configured from this mapping.
app_config = auth.return_app_config()
# Canonical media root. check_path() rejects any resolved path outside of it.
BASE_DIR = os.path.realpath(app_config['BASE_DIR'])
# One on-disk cache per media class. The configured limits are multiplied by
# 1024**3 before being passed as size_limit, i.e. they are expressed in GiB.
cache_audio = diskcache.Cache('./filecache_audio', size_limit= app_config['filecache_size_limit_audio'] * 1024**3)
cache_image = diskcache.Cache('./filecache_image', size_limit= app_config['filecache_size_limit_image'] * 1024**3)
cache_video = diskcache.Cache('./filecache_video', size_limit= app_config['filecache_size_limit_video'] * 1024**3)
cache_other = diskcache.Cache('./filecache_other', size_limit= app_config['filecache_size_limit_other'] * 1024**3)
app = Flask(__name__)
# Honour X-Forwarded-For / X-Forwarded-Proto from exactly one proxy hop.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
app.config['SECRET_KEY'] = app_config['SECRET_KEY']
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=90)
if os.environ.get('FLASK_ENV') == 'production':
    # The session cookie is sent cross-site (SameSite=None) and only over HTTPS.
    app.config['SESSION_COOKIE_SAMESITE'] = 'None'
    app.config['SESSION_COOKIE_SECURE'] = True
# Views implemented in sibling modules are registered here. Each one wrapped in
# auth.require_admin / auth.require_secret goes through that decorator first.
app.add_url_rule('/dashboard', view_func=auth.require_admin(a.dashboard))
app.add_url_rule('/file_access', view_func=auth.require_admin(a.file_access))
app.add_url_rule('/connections', view_func=auth.require_admin(a.connections))
app.add_url_rule('/mylinks', view_func=auth.require_secret(auth.mylinks))
app.add_url_rule('/songs_dashboard', view_func=auth.require_admin(a.songs_dashboard))
app.add_url_rule('/remove_secret', view_func=auth.remove_secret, methods=['POST'])
app.add_url_rule('/remove_token', view_func=auth.remove_token, methods=['POST'])
app.add_url_rule('/searchcommand', view_func=search.searchcommand, methods=['POST'])
app.add_url_rule('/admin/folder_secret_config_editor', view_func=auth.require_admin(fsce.folder_secret_config_editor), methods=['GET', 'POST'])
app.add_url_rule('/admin/folder_secret_config_editor/data', view_func=auth.require_admin(auth.load_folder_config))
app.add_url_rule('/admin/folder_secret_config_editor/action', view_func=auth.require_admin(fsce.folder_secret_config_action), methods=['POST'])
# Grab the HOST_RULE environment variable
host_rule = os.getenv("HOST_RULE", "")
# Use a regex to extract domain names between backticks in patterns like Host(`something`)
pattern = r"Host\(`([^`]+)`\)"
allowed_domains = re.findall(pattern, host_rule)
# NOTE(review): the extracted values are bare host names, and the list is empty
# when HOST_RULE is unset -- confirm both cases behave as intended for
# Flask-SocketIO's cors_allowed_origins.
socketio = SocketIO(
    app,
    async_mode='eventlet',
    cors_allowed_origins=allowed_domains
)
# Set to True by query_recent_connections() while it runs and reset when it exits.
background_thread_running = False
# Global variables to track the number of connected clients and the background thread
clients_connected = 0
# Not assigned anywhere else in the code visible here.
background_thread = None
# Guards the "is the poller already running?" check in handle_connect().
thread_lock = threading.Lock()
@lru_cache(maxsize=10)
def get_cached_image(size):
    """
    Return the custom logo as PNG bytes, scaled down to fit the requested box.

    ``size`` is a label whose part after the first ``-`` has the form
    ``<width>x<height>``. The logo is never enlarged: if the requested box is
    at least as large as the source image in both dimensions, the source is
    re-encoded as is. Results are memoised per ``size`` string.
    """
    target_box = tuple(int(value) for value in size.split('-')[1].split('x'))
    logo_file = os.path.join(app.root_path, 'custom_logo', 'logoB.png')
    with Image.open(logo_file) as source:
        logo = source.convert("RGBA")
        full_width, full_height = logo.size
        # Shrink only when the box is smaller than the logo in some dimension.
        needs_shrinking = target_box[0] < full_width or target_box[1] < full_height
        if needs_shrinking:
            output = logo.copy()
            output.thumbnail(target_box, Image.LANCZOS)
        else:
            output = logo
        encoded = io.BytesIO()
        output.save(encoded, format='PNG')
        return encoded.getvalue()
def check_path(access_path: str) -> Path:
    """
    Validate that an absolute path points inside BASE_DIR and return it resolved.

    Symlinks and ".." components are resolved before the comparison, so the
    check applies to the real location on disk.

    Raises:
        ValueError: access_path is not absolute.
        PermissionError: the resolved path is neither BASE_DIR nor below it.
    """
    requested = Path(access_path)
    if not requested.is_absolute():
        raise ValueError(f"Path {access_path} is not a valid absolute path")
    resolved = requested.resolve()
    root = Path(BASE_DIR).resolve()
    # Allowed: the root itself, or anything that has the root as an ancestor.
    inside_root = resolved == root or root in resolved.parents
    if not inside_root:
        raise PermissionError(f"Access to {access_path} is forbidden")
    return resolved
def list_directory_contents(directory, subpath):
    """
    List only the immediate contents of the given directory.

    Also, if a "Transkription" subfolder exists, check for matching .md files
    for music files. Entries starting with a dot, blocked filenames and a few
    housekeeping folders are skipped. Symlinks are not followed, so symlinked
    entries are neither listed as directories nor as files.

    Args:
        directory: Directory on disk to scan.
        subpath: Client-facing path of that directory ('/'-separated); used to
            build the 'path' of every entry. May be empty or None.

    Returns:
        A ``(directories, files)`` tuple of lists of dicts. Directory entries
        carry 'name', 'path' and 'share'; file entries carry 'name', 'path',
        'file_type', 'has_transcript' and, when a transcript exists,
        'transcript_url'.
    """
    subpath = subpath or ''
    # Folder names taken from the folder config; content below one of them may be shared.
    shareable_roots = {
        folder['foldername']
        for item in auth.return_folder_config()
        for folder in item['folders']
    }
    # The share flag depends only on the first segment of subpath, so it is
    # computed once rather than once per directory entry.
    allow_share = subpath.split('/')[0] in shareable_roots
    transcription_dir = os.path.join(directory, "Transkription")
    transcription_exists = os.path.isdir(transcription_dir)
    # Define allowed file extensions.
    music_exts = ('.mp3',)
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')
    blocked_filenames = ['Thumbs.db', '*.mrk']
    skipped_dirs = {"Transkription", "@eaDir", ".ai"}
    directories = []
    files = []
    try:
        with os.scandir(directory) as it:
            for entry in sorted(it, key=lambda e: e.name):
                # Skip hidden files and directories.
                if entry.name.startswith('.'):
                    continue
                # Skip blocked_filenames using fnmatch for wildcards
                if any(fnmatch.fnmatch(entry.name, pattern) for pattern in blocked_filenames):
                    continue
                # os.path.join('', name) == name, so an empty subpath needs no special case.
                rel_path = os.path.join(subpath, entry.name).replace(os.sep, '/')
                if entry.is_dir(follow_symlinks=False):
                    if entry.name in skipped_dirs:
                        continue
                    directories.append({'name': entry.name, 'path': rel_path, 'share': allow_share})
                elif entry.is_file(follow_symlinks=False):
                    lower_name = entry.name.lower()
                    if lower_name.endswith(music_exts):
                        file_type = 'music'
                    elif lower_name.endswith(image_exts):
                        file_type = 'image'
                    else:
                        file_type = 'other'
                    file_entry = {
                        'name': entry.name,
                        'path': rel_path,
                        'file_type': file_type,
                        'has_transcript': False,
                    }
                    # Only audio files can have a transcript.
                    if file_type == 'music' and transcription_exists:
                        transcript_filename = os.path.splitext(entry.name)[0] + '.md'
                        if os.path.isfile(os.path.join(transcription_dir, transcript_filename)):
                            transcript_rel_path = os.path.join(subpath, "Transkription", transcript_filename)
                            file_entry['has_transcript'] = True
                            file_entry['transcript_url'] = url_for(
                                'get_transcript',
                                subpath=transcript_rel_path.replace(os.sep, '/')
                            )
                    files.append(file_entry)
    except PermissionError as e:
        # Best effort: return whatever was collected, but leave a trace in the log.
        app.logger.warning(f"Permission denied while listing {directory}: {e}")
    return directories, files
def generate_breadcrumbs(subpath=None):
    """
    Build the breadcrumb trail for a '/'-separated subpath.

    The trail always starts with the 'Home' entry (empty path). Every segment
    of subpath adds one entry whose 'path' is the accumulated path up to and
    including that segment. In the displayed name, 'toplist' is replaced by
    its German label.
    """
    trail = [{'name': 'Home', 'path': ''}]
    if not subpath:
        return trail
    accumulated = ""
    for segment in subpath.split('/'):
        if accumulated:
            accumulated = f"{accumulated}/{segment}"
        else:
            accumulated = segment
        label = segment.replace('toplist', 'oft angehört') if 'toplist' in segment else segment
        trail.append({'name': label, 'path': accumulated})
    return trail
@app.route('/icon/<string:size>.png')
def serve_resized_icon(size):
    """
    Serve the custom logo as a PNG scaled to the size named in the URL.

    The size label comes straight from the URL. A label that
    get_cached_image() cannot parse (missing '-' part, non-numeric or unusable
    dimensions) yields a 404 instead of an unhandled exception. Successful
    responses are cacheable for one day.
    """
    try:
        cached_image_bytes = get_cached_image(size)
    except (IndexError, ValueError):
        # Malformed size label: treat it like any other unknown resource.
        abort(404)
    response = send_file(
        io.BytesIO(cached_image_bytes),
        mimetype='image/png'
    )
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response
@app.route('/custom_logo/<string:filename>.png')
def custom_logo(filename):
    """
    Serve ``custom_logo/<filename>.png`` (relative to the working directory).

    Responds with 404 when the file does not exist; otherwise the PNG is
    returned from memory and marked cacheable for one day.
    """
    logo_path = os.path.join('custom_logo', f"{filename}.png")
    if not os.path.exists(logo_path):
        abort(404)
    with open(logo_path, 'rb') as handle:
        payload = io.BytesIO(handle.read())
    # Make sure the stream is positioned at the start before it is sent.
    payload.seek(0)
    response = send_file(payload, mimetype='image/png')
    response.headers['Cache-Control'] = 'public, max-age=86400'
    return response
@app.route('/sw.js')
def serve_sw():
    """Serve the service worker script from the static folder under the URL /sw.js."""
    static_dir = os.path.join(app.root_path, 'static')
    return send_from_directory(static_dir, 'sw.js', mimetype='application/javascript')
# API endpoint for AJAX: returns JSON for a given directory.
@app.route('/api/path/', defaults={'subpath': ''})
@app.route('/api/path/<path:subpath>')
@auth.require_secret
def api_browse(subpath):
    """
    Return the JSON listing for a browse path.

    Handled cases, in order:
      * ''                  -> the folders available in the session
      * 'heute' / 'gestern' -> the folders reported for today / yesterday
      * 'toplist[/<cat>]'   -> toplist categories, or the top list of one category
      * '<root>/<...>'      -> the content of a directory inside a session
                               folder; if the path is not a directory, its last
                               segment is reported as 'playfile' and the parent
                               directory is listed instead.

    Errors are returned as ``{'error': ...}`` with status 403 (path outside
    BASE_DIR) or 404 (unknown root folder or directory).
    """
    toplist_categories = app_config.get('toplist_categories', None)
    if subpath == '':  # root directory
        foldernames = [
            {'name': foldername, 'path': foldername}
            for foldername in session['folders']
        ]
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(),
            'directories': foldernames,
            'files': [],
            'folder_today': a.return_folder_today(),
            'folder_yesterday': a.return_folder_yesterday(),
            'toplist_enabled': bool(toplist_categories)
        })
    if subpath == 'heute' or subpath == 'gestern':
        # NOTE(review): both names list today's folders when there are any and
        # fall back to yesterday's otherwise -- confirm this is the intended
        # behaviour for 'gestern'.
        recent_folders = a.return_folder_today() or a.return_folder_yesterday()
        foldernames = [
            {'name': item['rel_path'], 'path': item['rel_path']}
            for item in recent_folders
        ]
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(subpath),
            'directories': foldernames,
            'files': [],
            'folder_today': [],
            'folder_yesterday': []
        })
    if subpath.startswith('toplist'):
        foldernames = []
        files = []
        split_path = subpath.split('/')
        if split_path[0] == 'toplist':
            if len(split_path) == 1:
                # No configured categories simply yields an empty list.
                foldernames = [
                    {'name': categorie, 'path': 'toplist/' + categorie}
                    for categorie in (toplist_categories or [])
                ]
            else:
                files = hf.generate_top_list(split_path[1])
        return jsonify({
            'breadcrumbs': generate_breadcrumbs(subpath),
            'directories': foldernames,
            'files': files
        })
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    if base_path is None:
        # Root folder is not part of this session.
        return jsonify({'error': 'Directory not found'}), 404
    playfile = None
    try:
        directory = check_path(os.path.join(base_path, *relative_parts))
    except (ValueError, PermissionError) as e:
        return jsonify({'error': str(e)}), 403
    # Check if the constructed directory exists.
    if not os.path.isdir(directory):
        # Assume the last segment is a filename; remove it.
        if relative_parts:
            playfile = relative_parts.pop()
            # The shortened path is validated again before it is listed.
            try:
                directory = check_path(os.path.join(base_path, *relative_parts))
            except (ValueError, PermissionError) as e:
                return jsonify({'error': str(e)}), 403
            # Rebuild subpath to reflect the directory (without the file).
            subpath = '/'.join([root] + relative_parts)
        # If the parent directory still doesn't exist, return error.
        if not os.path.isdir(directory):
            return jsonify({'error': 'Directory not found'}), 404
    directories, files = list_directory_contents(directory, subpath)
    response = {
        'breadcrumbs': generate_breadcrumbs(subpath),
        'directories': directories,
        'files': files,
        'folder_today': a.return_folder_today(),
        'folder_yesterday': a.return_folder_yesterday()
    }
    # If a filename was selected include it.
    if playfile:
        response['playfile'] = os.path.join(subpath, playfile).replace(os.sep, '/')
    return jsonify(response)
@app.route("/media/<path:subpath>")
@auth.require_secret
def serve_file(subpath):
    """
    Serve a media file through the on-disk caches.

    The file is located either from the session folder named by the first
    path segment or, when a ``dltoken`` query parameter is present, from the
    'filename' stored in that token (served as a download). The resolved path
    must lie inside BASE_DIR.

    Images are served as cached JPEG renditions: a full variant (max.
    1920x1920) and a small one (max. 480x480, requested with
    ``thumbnail=true``). A thumbnail request never triggers generation; it
    gets an empty 204 response when nothing is cached yet. All other files are
    copied into the cache unchanged and served from there.

    Query parameters: ``dltoken``, ``thumbnail=true``, ``download=true``
    (images only).

    Returns 403 for invalid tokens/paths, 404 for missing files and 500 when
    a file cannot be cached.
    """
    # 1) Locate the real file on disk
    root, *relative_parts = subpath.split('/')
    dltoken = request.args.get('dltoken')
    if dltoken:
        as_attachment = True
        try:
            full_path = auth.decode_token(dltoken)['filename']
        except (KeyError, TypeError):
            # Token decoded to nothing usable or carries no 'filename'
            # (e.g. a folder token was passed as download token).
            return jsonify({'Unauthorized': 'Invalid download token'}), 403
    else:
        as_attachment = False
        base_path = session['folders'].get(root)
        # An unknown root yields a relative path, which check_path() rejects.
        full_path = os.path.join(base_path or '', *relative_parts)
    try:
        full_path = check_path(full_path)
    except (ValueError, PermissionError) as e:
        return jsonify({'Unauthorized': str(e)}), 403
    if not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404
    # 2) Prep request info
    mime, _ = mimetypes.guess_type(full_path)
    mime = mime or 'application/octet-stream'
    is_cache_request = request.headers.get('X-Cache-Request') == 'true'
    is_audio_get = mime.startswith('audio/') and request.method == 'GET'
    ip_address = request.remote_addr
    user_agent = request.headers.get('User-Agent')
    # Skip logging for requests flagged with X-Cache-Request and for audio GETs.
    do_log = not is_cache_request and not is_audio_get
    # 3) Pick cache
    if mime.startswith('audio/'):
        cache = cache_audio
    elif mime.startswith('image/'):
        cache = cache_image
    elif mime.startswith('video/'):
        cache = cache_video
    else:
        cache = cache_other
    # NOTE(review): cache entries are keyed by the URL subpath, not by the
    # resolved file. With a dltoken the served file comes from the token while
    # the key still comes from the URL -- confirm these cannot collide.
    # 4) Image and thumbnail handling first
    if mime.startswith('image/'):
        small = request.args.get('thumbnail') == 'true'
        download = request.args.get('download') == 'true'
        name, ext = os.path.splitext(subpath)
        orig_key = subpath
        small_key = f"{name}_small{ext}"
        cache_key = small_key if small else orig_key
        try:
            # Try to read the requested variant
            with cache.read(cache_key) as reader:
                file_path = reader.name
            cached_hit = True
        except KeyError:
            if small:  # do not create when thumbnail requested
                return make_response('', 204)
            cached_hit = False
            # On miss: generate both full-size and small thumb, then cache
            with Image.open(full_path) as orig:
                img = ImageOps.exif_transpose(orig)
                variants = {
                    orig_key: (1920, 1920),
                    small_key: (480, 480),
                }
                for key, size in variants.items():
                    thumb = img.copy()
                    thumb.thumbnail(size, Image.LANCZOS)
                    # Convert every mode other than RGB / L (e.g. RGBA, P, LA)
                    # to RGB so that the JPEG encoder accepts it.
                    if thumb.mode not in ("RGB", "L"):
                        thumb = thumb.convert("RGB")
                    bio = io.BytesIO()
                    thumb.save(bio, format='JPEG', quality=85)
                    bio.seek(0)
                    cache.set(key, bio, read=True)
            # Read back the variant we need
            with cache.read(cache_key) as reader:
                file_path = reader.name
        # NOTE(review): variants generated above are JPEG data but are served
        # with the mime type guessed from the original name -- confirm.
        response = send_file(
            file_path,
            mimetype=mime,
            conditional=True,
            as_attachment=download,
            download_name=os.path.basename(orig_key)
        )
        # For downloads keep the header built by send_file (it carries the
        # file name); only force inline display otherwise.
        if not download:
            response.headers['Content-Disposition'] = 'inline'
        response.headers['Cache-Control'] = 'public, max-age=86400'
        if do_log and not small:
            a.log_file_access(
                cache_key,
                os.path.getsize(file_path),
                mime,
                ip_address,
                user_agent,
                session['device_id'],
                cached_hit
            )
        return response
    # 5) Non-image branch: ensure original is cached
    try:
        with cache.read(subpath) as reader:
            file_path = reader.name
        cached_hit = True
    except KeyError:
        cached_hit = False
        try:
            # Close the source file once its content has been copied into the cache.
            with open(full_path, 'rb') as source:
                cache.set(subpath, source, read=True)
            with cache.read(subpath) as reader:
                file_path = reader.name
        except Exception as e:
            app.logger.error(f"Failed to cache file {subpath}: {e}")
            abort(500)
    # 6) Build response for non-image
    filesize = os.path.getsize(file_path)
    filename = os.path.basename(full_path)
    response = send_file(
        file_path,
        # Downloads are sent as opaque binary data.
        mimetype='application/octet-stream' if as_attachment else mime,
        conditional=True,
        as_attachment=as_attachment,
        download_name=filename if as_attachment else None
    )
    if as_attachment:
        # Keep send_file's "attachment; filename=..." header untouched.
        response.headers['X-Content-Type-Options'] = 'nosniff'
    else:
        response.headers['Content-Disposition'] = 'inline'
    response.headers['Cache-Control'] = 'public, max-age=86400'
    # 7) Logging
    if do_log:
        a.log_file_access(
            subpath,
            filesize,
            mime,
            ip_address,
            user_agent,
            session['device_id'],
            cached_hit
        )
    return response
@app.route("/transcript/<path:subpath>")
@auth.require_secret
def get_transcript(subpath):
    """
    Return the markdown transcript stored at subpath.

    The first path segment selects a session folder; the remaining segments
    are resolved below it. The resolved path is validated with check_path(),
    like the other file-serving views, so ".." segments cannot be used to read
    files outside BASE_DIR.

    Returns the file content as 'text/markdown', 403 for a path outside
    BASE_DIR, or 404 when the root folder or the file is unknown.
    """
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    if base_path is None:
        return "Transcription not found", 404
    try:
        full_path = check_path(os.path.join(base_path, *relative_parts))
    except (ValueError, PermissionError):
        return "Unauthorized", 403
    if not os.path.isfile(full_path):
        return "Transcription not found", 404
    with open(full_path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content, 200, {'Content-Type': 'text/markdown; charset=utf-8'}
@app.route("/create_token/<path:subpath>")
@auth.require_secret
def create_token(subpath):
    """
    Create a 90-day share token for a folder and render it with a QR code.

    Only sessions with a truthy 'admin' entry may create tokens. The folder is
    looked up in the folder config (not in the session), so the validity of
    the caller's own secrets is ignored when building the full path.

    Returns the rendered 'view_token.html' page, 403 for non-admins or paths
    outside BASE_DIR, or 404 when the root folder is not configured.
    """
    scheme = request.scheme  # current scheme (http or https)
    host = request.host
    # A session without the key and a session with a falsy value are both rejected.
    if not session.get('admin'):
        return "Unauthorized", 403
    paths = {
        folder['foldername']: folder['folderpath']
        for item in auth.return_folder_config()
        for folder in item['folders']
    }
    # use folder config file and ignore validity to get full path
    root, *relative_parts = subpath.split('/')
    base_path = paths.get(root)
    if base_path is None:
        return "Folder not found", 404
    full_path = os.path.join(base_path, *relative_parts)
    try:
        check_path(full_path)
    except (ValueError, PermissionError):
        return "Unauthorized", 403
    # The shared folder is named after its last path segment; a bare root
    # keeps the name of the root folder.
    foldername = relative_parts[-1] if relative_parts else root
    validity_date = (datetime.now() + timedelta(days=90)).strftime('%d.%m.%Y')
    data = {
        "validity": validity_date,
        "folders": [
            {
                "foldername": foldername,
                "folderpath": full_path
            }
        ]
    }
    token = auth.generate_token(data)
    url = f"{scheme}://{host}?token={token}"
    qr = qrcode.QRCode(version=1, box_size=10, border=4)
    qr.add_data(url)
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format="PNG")
    img_base64 = base64.b64encode(buffer.getvalue()).decode('ascii')
    # Decode the fresh token again so the page shows what the token contains.
    token_item = auth.decode_token(token)
    return render_template('view_token.html',
        token_qr_code=img_base64,
        token_folder=token_item.get('folders'),
        token_url=url,
        token_valid_to=token_item.get('validity', 'Unbekannt')
    )
@app.route("/create_dltoken/<path:subpath>")
@auth.require_secret
def create_dltoken(subpath):
    """
    Create a download link for a single file.

    The file is resolved from the session folder named by the first path
    segment and must exist inside BASE_DIR. The token stores the resolved file
    path together with today's date as validity.

    Returns the absolute /media/ URL carrying the token as ``dltoken``, 403
    for invalid paths, or 404 when the file does not exist.
    """
    from urllib.parse import quote

    scheme = request.scheme  # current scheme (http or https)
    host = request.host
    # 1) Locate the real file on disk
    root, *relative_parts = subpath.split('/')
    base_path = session['folders'].get(root)
    # An unknown root yields a relative path, which check_path() rejects.
    full_path = os.path.join(base_path or '', *relative_parts)
    try:
        full_path = check_path(full_path)
    except (ValueError, PermissionError) as e:
        return jsonify({'Unauthorized': str(e)}), 403
    if not os.path.isfile(full_path):
        app.logger.error(f"File not found: {full_path}")
        return "File not found", 404
    validity_date = datetime.now().strftime('%d.%m.%Y')
    data = {
        "validity": validity_date,
        "filename": str(full_path)
    }
    token = auth.generate_token(data)
    # subpath arrives URL-decoded; re-encode it so that characters such as
    # '#', '?', '%' or spaces in file names do not break the link.
    url = f"{scheme}://{host}/media/{quote(subpath)}?dltoken={token}"
    return url
def query_recent_connections():
    """
    Background task: push the recent file accesses to all Socket.IO clients.

    Runs while at least one client is connected. Once per second the access
    rows are fetched and, only if they differ from the previous broadcast,
    emitted as 'recent_connections'. The module-level running flag is set on
    entry and cleared again when the task ends, whatever the reason.
    """
    global clients_connected, background_thread_running
    background_thread_running = True
    previous_snapshot = None
    try:
        while clients_connected > 0:
            snapshot = []
            for row in a.return_file_access():
                snapshot.append({
                    'timestamp': datetime.fromisoformat(row[0]).strftime('%d.%m.%Y %H:%M:%S'),
                    'full_path': row[1],
                    'filesize': row[2],
                    'mime_typ': row[3],
                    'location': row[4],
                    'user_agent': row[5],
                    'cached': row[7]
                })
            # Only broadcast when something changed since the last emit.
            if snapshot != previous_snapshot:
                socketio.emit('recent_connections', snapshot)
                previous_snapshot = snapshot.copy()
            socketio.sleep(1)
    finally:
        background_thread_running = False
@socketio.on('connect')
def handle_connect(auth=None):
    """
    Count a new Socket.IO client and make sure the polling task is running.

    The running flag is set here, under the lock, before the task is started.
    query_recent_connections() sets it only once it actually begins to run, so
    without this a second client connecting in between could start a second
    poller.

    Args:
        auth: Optional auth payload passed with the connect event; unused.
    """
    global clients_connected, background_thread_running
    clients_connected += 1
    print("Client connected. Total clients:", clients_connected)
    with thread_lock:
        if not background_thread_running:
            background_thread_running = True
            try:
                socketio.start_background_task(query_recent_connections)
            except Exception:
                # Starting failed: allow the next connect to try again.
                background_thread_running = False
                raise
            print("Started background query task.")
@socketio.on('disconnect')
def handle_disconnect():
    """Count down the connected Socket.IO clients when one of them disconnects."""
    global clients_connected
    clients_connected = clients_connected - 1
    print("Client disconnected. Total clients:", clients_connected)
@socketio.on('request_initial_data')
def handle_request_initial_data():
    """
    Answer a 'request_initial_data' event with the current file accesses.

    The rows are converted to the same dict layout that the background task
    broadcasts and are emitted as 'recent_connections' to the requesting client.
    """
    payload = []
    for row in a.return_file_access():
        accessed_at = datetime.fromisoformat(row[0])
        payload.append({
            'timestamp': accessed_at.strftime('%d.%m.%Y %H:%M:%S'),
            'full_path': row[1],
            'filesize' : row[2],
            'mime_typ' : row[3],
            'location': row[4],
            'user_agent': row[5],
            'cached': row[7]
        })
    emit('recent_connections', payload)
# Catch-all route to serve the single-page application template.
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
@auth.require_secret
def index(path):
    """
    Render the single-page application shell for any path not matched elsewhere.

    The app config is read again on every request; the template receives the
    session's folder names, the configured titles and the admin flag.
    """
    settings = auth.return_app_config()
    context = {
        'search_folders': list(session['folders'].keys()),
        'title_short': settings.get('TITLE_SHORT', 'Default Title'),
        'title_long': settings.get('TITLE_LONG', 'Default Title'),
        'admin_enabled': auth.is_admin(),
    }
    return render_template("app.html", **context)
if __name__ == '__main__':
    # The server listens on all interfaces, so debug mode is switched off
    # whenever FLASK_ENV marks the environment as production (the same
    # variable that controls the cookie settings above).
    debug_enabled = os.environ.get('FLASK_ENV') != 'production'
    socketio.run(app, debug=debug_enabled, host='0.0.0.0')