# --- app.py --- import os import io import base64 import pandas as pd import qrcode import uuid import time, math from flask import Flask, render_template, request, session, redirect, url_for, abort from flask_socketio import SocketIO, emit, join_room from dotenv import load_dotenv # global store of all games, keyed by secret. Each entry will hold its own questions list. games = {} # Load environment vars load_dotenv() app = Flask(__name__) app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev') # manage_session ensures Flask session is available in SocketIO handlers socketio = SocketIO(app, manage_session=True) # In-memory game state game = { 'players': {}, 'order': [], 'current_q': 0, 'answered': [], 'started': False } @app.route('/', methods=['GET']) def host(): # If this browser is already a player, don't let it hit the host view. if session.get('role') == 'player': secret = session.get('secret') if secret: return redirect(url_for('player', secret=secret)) return redirect(url_for('player')) session['role'] = 'host' # — manage per-game secret in session — secret = session.get('secret') if not secret: secret = uuid.uuid4().hex[:8] session['secret'] = secret # ensure game exists (with questions placeholder) if secret not in games: games[secret] = { 'questions': None, 'players': {}, 'order': [], 'current_q': 0, 'answered': [], 'started': False, 'finished': False, 'revealed': False } game = games[secret] # if no question file has been uploaded yet, show upload form if not game.get('questions'): return render_template('upload.html') # build join URL + QR join_url = url_for('player', secret=secret, _external=True) img = qrcode.make(join_url) buf = io.BytesIO() img.save(buf, format='PNG') qr_data = base64.b64encode(buf.getvalue()).decode('ascii') return render_template( 'host.html', qr_data=qr_data, join_url=join_url, secret=secret ) @app.route('/upload', methods=['POST']) def upload(): session['role'] = 'host' secret = session.get('secret') if not secret or secret not in games: return redirect(url_for('host')) game = games[secret] # retrieve uploaded file file = request.files.get('file') if not file: return render_template('upload.html', error='Keine Datei ausgewählt') # try to read Excel try: df = pd.read_excel(file) except Exception: return render_template('upload.html', error='Ungültige Excel-Datei') # enforce maximum of 100 questions max_questions = 100 if df.shape[0] > max_questions: return render_template( 'upload.html', error=f'Maximal dürfen {max_questions} Fragen hochgeladen werden. Ihre Datei enthält {df.shape[0]}.') # mandatory columns required = ['question', 'answer1', 'answer2', 'answer3', 'answer4', 'correct'] missing = [col for col in required if col not in df.columns] if missing: return render_template( 'upload.html', error=f"Fehler in Datei: Fehlende Spalte(n): {', '.join(missing)}" ) # validate that 'correct' references one of the answer columns valid_keys = ['answer1', 'answer2', 'answer3', 'answer4'] invalid_mask = ~df['correct'].isin(valid_keys) if invalid_mask.any(): idx = invalid_mask.idxmax() excel_line = idx + 2 # Header is line 1 bad_value = df.at[idx, 'correct'] return render_template( 'upload.html', error=( f'Fehler in Zeile {excel_line}: ' f'"correct" enthält "{bad_value}", ' f'muss einer der Werte {valid_keys} sein.' ) ) # build per-game question list questions = [] for _, row in df.iterrows(): col = row['correct'] questions.append({ 'question': str(row['question']), 'options': [str(row[f'answer{i}']) for i in range(1, 5)], 'correct': str(row[col]) }) game['questions'] = questions # redirect to host view, which will show the QR code return redirect(url_for('host')) @app.route('/new_game', methods=['POST']) def new_game(): if session.get('role') != 'host': abort(403) # remove the old game entirely (so its secret is no longer valid) old_secret = session.pop('secret', None) if old_secret and old_secret in games: del games[old_secret] return redirect(url_for('host')) @app.route('/player') def player(): session['role'] = 'player' # 1) Grab any secret from the URL new_secret = request.args.get('secret') old_secret = session.get('secret') # 2) If they provided a new secret, it *must* already exist or we reject immediately if new_secret: if new_secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 # only clear join info if it really *is* a different, valid game if new_secret != old_secret: for k in ('joined','player_id','player_name'): session.pop(k, None) secret = new_secret else: secret = old_secret # 3) We need *some* secret in session — and it must still be in our games dict if not secret or secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 # 4) Save it for future Socket.IO events session['secret'] = secret # At this point, we’ve wiped only *this user’s* join info if they scanned a new code. # We did *not* delete games[old_secret], so old games remain intact. joined = session.get('joined', False) # We don’t emit() here; the socket handlers will PUSH the right state. return render_template( 'player.html', joined=joined, state='join', # actual view switched via sync_state over Socket.IO options=[], answered=False ) @app.route('/player/join', methods=['POST']) def do_join(): name = (request.form.get('name') or 'Anonymous').strip() if not name: name = 'Anonymous' session['role'] = 'player' session['joined'] = True # make sure the player is still talking to a live game secret = session.get('secret') if not secret or secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 # ─── enforce unique name per game (case-insensitive) ─── existing_lower = {v['name'].strip().lower() for v in games[secret]['players'].values()} if name.lower() in existing_lower: # do NOT mark session as joined; force them back to join view with error session['joined'] = False return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Dieser Name ist bereits vergeben. Bitte wähle einen anderen.' ), 400 # proceed with normal join session['player_name'] = name session['player_id'] = str(uuid.uuid4()) return redirect(url_for('player', secret=session.get('secret'))) @socketio.on('connect') def on_connect(): secret = session.get('secret') if not secret or secret not in games: return game = games[secret] role = session.get('role') if role == 'host': join_room('host') # ─── if the game is already finished, just re-emit the final board ─── if game.get('finished'): board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) emit('game_over', {'board': board}, room='host') return # otherwise fall back to normal roster / question logic players = [game['players'][pid]['name'] for pid in game['order']] emit('update_roster', {'players': players}) if game.get('started') and game['current_q'] < len(game['questions']): q = game['questions'][game['current_q']] emit('new_question', {'question': q['question']}, room='host') return # player logic if role != 'player' or not session.get('joined'): return pid = session['player_id'] join_room('player') join_room(pid) if pid not in game['players']: game['players'][pid] = { 'name': session.get('player_name'), 'score': 0 } game['order'].append(pid) socketio.emit( 'update_roster', {'players': [game['players'][p]['name'] for p in game['order']]}, room='host' ) # delegate to rejoin logic _send_sync_state(pid, game) @socketio.on('start_game') def on_start_game(): secret = session.get('secret') if session.get('role') != 'host' or not secret or secret not in games: return game = games[secret] # if already through, reset if game.get('started') or game['current_q'] >= len(game['questions']): game.update({ 'current_q': 0, 'answered': [], 'started': False, 'finished': False }) for p in game['players'].values(): p['score'] = 0.0 game['started'] = True socketio.emit('game_started', room='player') _send_question(secret) @socketio.on('get_roster') def on_get_roster(): secret = session.get('secret') if not secret or secret not in games: return game = games[secret] roster = [game['players'][s]['name'] for s in game['order']] emit('update_roster', {'players': roster}) @socketio.on('submit_answer') def on_submit_answer(data): secret = session.get('secret') pid = session.get('player_id') if not secret or secret not in games or pid not in games[secret]['players']: return game = games[secret] if game.get('revealed'): return if any(pid == t[0] for t in game['answered']): return # capture reaction time now = time.perf_counter() q_started = game.get('q_started', now) rt = max(0.0, now - q_started) # store (pid, answer, rt) game['answered'].append((pid, data.get('answer'), rt)) _score_and_emit(secret) @socketio.on('next_question') def on_next_question(): secret = session.get('secret') if session.get('role') != 'host' or not secret or secret not in games: return game = games[secret] game['current_q'] += 1 if game['current_q'] < len(game['questions']): _send_question(secret) else: # mark finished & emit final boards game['finished'] = True board = sorted( [{'name':v['name'],'score':v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) socketio.emit('game_over', {'board': board}, room='host') for psid,pdata in game['players'].items(): placement = next(i+1 for i,p in enumerate(board) if p['name']==pdata['name']) socketio.emit( 'game_over', {'placement': placement, 'score': pdata['score']}, room=psid ) @socketio.on('rejoin_game') def on_rejoin_game(): secret = session.get('secret') pid = session.get('player_id') if not secret or secret not in games or not pid: return game = games[secret] join_room('player') join_room(pid) _send_sync_state(pid, game) @socketio.on('show_answer') def on_show_answer(): secret = session.get('secret') if session.get('role') != 'host' or not secret or secret not in games: return game = games[secret] game['revealed'] = True # find the correct answer text for current question correct = game['questions'][game['current_q']]['correct'] # emit to host so they can display it emit('answer_revealed', {'correct': correct}, room='host') # emit to all players so they can highlight it socketio.emit('answer_revealed', {'correct': correct}, room='player') # helpers def _send_sync_state(pid, game): # finished? if game.get('finished'): board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) me = game['players'][pid] placement = next(i+1 for i,p in enumerate(board) if p['name']==me['name']) emit('sync_state', { 'state': 'end', 'score': me['score'], 'placement': placement }, room=pid) # waiting? elif not game['started']: emit('sync_state', {'state':'waiting'}, room=pid) # question? else: idx = game['current_q'] q = game['questions'][idx] answered = any(pid == t[0] for t in game['answered']) emit('sync_state', { 'state': 'question', 'question': q['question'], 'options': q['options'], 'answered': answered }, room=pid) def _send_question(secret): game = games[secret] game['revealed'] = False q = game['questions'][game['current_q']] game['answered'].clear() # NEW: start monotonic timer for this question game['q_started'] = time.perf_counter() game['q_deadline_s'] = 20 # configurable per question if you want socketio.emit('new_question', {'question': q['question'], 'options': q['options']}, room='player') socketio.emit('new_question', {'question': q['question']}, room='host') def _calc_points(rt_s: float, deadline_s: float, correct: bool) -> float: if not correct or deadline_s <= 0: return 0.0 if rt_s < 0: rt_s = 0.0 if rt_s >= deadline_s: return 0.0 BASE = 10 MIN = 1 frac = 1.0 - (rt_s / float(deadline_s)) pts = MIN + (BASE - MIN) * frac return round(pts, 1) # ← keep a decimal, not an int def _score_and_emit(secret): game = games[secret] per_q = [] correct = game['questions'][game['current_q']]['correct'] pid, ans, rt = game['answered'][-1] # NEW: includes rt # NEW: time-based points for correct answers pts = _calc_points(rt, game.get('q_deadline_s', 20), ans == correct) if pts > 0: game['players'][pid]['score'] += pts per_q.append({ 'name': game['players'][pid]['name'], 'points': pts }) board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) socketio.emit('question_leader', {'results': per_q}, room='host') socketio.emit('question_leader', {'results': per_q}, room='player') socketio.emit('overall_leader', {'board': board}, room='host') socketio.emit('overall_leader', {'board': board}, room='player') # if that was the last question, finish up if game['current_q'] >= len(game['questions']): game['finished'] = True socketio.emit('game_over', {'board': board}, room='host') for psid, pdata in game['players'].items(): placement = next(i+1 for i, p in enumerate(board) if p['name'] == pdata['name']) socketio.emit( 'game_over', {'placement': placement, 'score': pdata['score']}, room=psid ) if __name__ == '__main__': # show own IP address import socket hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print(f"Running on http://{ip_address}:5000/") socketio.run(app, host='0.0.0.0', port=5000)