# --- app.py --- import os import io import base64 import pandas as pd import qrcode import uuid from flask import Flask, render_template, request, session, redirect, url_for, abort from flask_socketio import SocketIO, emit, join_room from dotenv import load_dotenv # global store of all games, keyed by secret games = {} # Load environment vars load_dotenv() app = Flask(__name__) app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev') # manage_session ensures Flask session is available in SocketIO handlers socketio = SocketIO(app, manage_session=True) # Expect columns: question, answer1..answer4, correct (column name) df = pd.read_excel('quizzes/questions.xlsx') questions = [] for _, row in df.iterrows(): correct_col = row['correct'] # e.g. 'answer3' correct_text = row[correct_col] questions.append({ 'question': row['question'], 'options': [row[f'answer{i}'] for i in range(1,5)], # only four options 'correct': correct_text }) # In-memory game state game = { 'players': {}, 'order': [], 'current_q': 0, 'answered': [], 'started': False } @app.route('/') def host(): session['role'] = 'host' # — manage per-game secret in session — secret = session.get('secret') if not secret: secret = uuid.uuid4().hex[:8] session['secret'] = secret # ensure game exists if secret not in games: games[secret] = { 'players': {}, 'order': [], 'current_q': 0, 'answered': [], 'started': False, 'finished': False } # build join URL + QR join_url = url_for('player', secret=secret, _external=True) img = qrcode.make(join_url) buf = io.BytesIO() img.save(buf, format='PNG') qr_data = base64.b64encode(buf.getvalue()).decode('ascii') return render_template( 'host.html', qr_data=qr_data, join_url=join_url, secret=secret ) @app.route('/new_game', methods=['POST']) def new_game(): if session.get('role') != 'host': abort(403) # remove the old game entirely (so its secret is no longer valid) old_secret = session.pop('secret', None) if old_secret and old_secret in games: del games[old_secret] return redirect(url_for('host')) @app.route('/player') def player(): session['role'] = 'player' # 1) Grab any secret from the URL new_secret = request.args.get('secret') old_secret = session.get('secret') # 2) If they provided a new secret, it *must* already exist or we reject immediately if new_secret: if new_secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 # only clear join info if it really *is* a different, valid game if new_secret != old_secret: for k in ('joined','player_id','player_name'): session.pop(k, None) secret = new_secret else: secret = old_secret # 3) We need *some* secret in session — and it must still be in our games dict if not secret or secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 # 4) Save it for future Socket.IO events session['secret'] = secret # At this point, we’ve wiped only *this user’s* join info if they scanned a new code. # We did *not* delete games[old_secret], so old games remain intact. joined = session.get('joined', False) # We don’t emit() here; the socket handlers will PUSH the right state. return render_template( 'player.html', joined=joined, state='join', # actual view switched via sync_state over Socket.IO options=[], answered=False ) @app.route('/player/join', methods=['POST']) def do_join(): name = request.form.get('name') or 'Anonymous' session['role'] = 'player' session['joined'] = True # make sure the player is still talking to a live game secret = session.get('secret') if not secret or secret not in games: return render_template( 'player.html', joined=False, state='join', options=[], answered=False, error='Spiel nicht gefunden' ), 400 session['player_name'] = name session['player_id'] = str(uuid.uuid4()) return redirect( url_for('player', secret=session.get('secret')) ) @socketio.on('connect') def on_connect(): secret = session.get('secret') if not secret or secret not in games: return game = games[secret] role = session.get('role') if role == 'host': join_room('host') # ─── if the game is already finished, just re-emit the final board ─── if game.get('finished'): board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) emit('game_over', {'board': board}, room='host') return # otherwise fall back to normal roster / question logic players = [game['players'][pid]['name'] for pid in game['order']] emit('update_roster', {'players': players}) if game.get('started') and game['current_q'] < len(questions): q = questions[game['current_q']] emit('new_question', {'question': q['question']}, room='host') return # player logic if role != 'player' or not session.get('joined'): return pid = session['player_id'] join_room('player') join_room(pid) if pid not in game['players']: game['players'][pid] = { 'name': session.get('player_name'), 'score': 0 } game['order'].append(pid) socketio.emit( 'update_roster', {'players': [game['players'][p]['name'] for p in game['order']]}, room='host' ) # delegate to rejoin logic _send_sync_state(pid, game) @socketio.on('start_game') def on_start_game(): secret = session.get('secret') if session.get('role') != 'host' or not secret or secret not in games: return game = games[secret] # if already through, reset if game.get('started') or game['current_q'] >= len(questions): game.update({ 'current_q': 0, 'answered': [], 'started': False, 'finished': False }) for p in game['players'].values(): p['score'] = 0 game['started'] = True socketio.emit('game_started', room='player') _send_question(secret) @socketio.on('get_roster') def on_get_roster(): secret = session.get('secret') if not secret or secret not in games: return game = games[secret] roster = [game['players'][s]['name'] for s in game['order']] emit('update_roster', {'players': roster}) @socketio.on('submit_answer') def on_submit_answer(data): secret = session.get('secret') pid = session.get('player_id') if not secret or secret not in games or pid not in games[secret]['players']: return game = games[secret] if any(pid == p for p,_ in game['answered']): return game['answered'].append((pid, data.get('answer'))) _score_and_emit(secret) @socketio.on('next_question') def on_next_question(): secret = session.get('secret') if session.get('role') != 'host' or not secret or secret not in games: return game = games[secret] game['current_q'] += 1 if game['current_q'] < len(questions): _send_question(secret) else: # mark finished & emit final boards game['finished'] = True board = sorted( [{'name':v['name'],'score':v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) socketio.emit('game_over', {'board': board}, room='host') for psid,pdata in game['players'].items(): placement = next(i+1 for i,p in enumerate(board) if p['name']==pdata['name']) socketio.emit( 'game_over', {'placement': placement, 'score': pdata['score']}, room=psid ) @socketio.on('rejoin_game') def on_rejoin_game(): secret = session.get('secret') pid = session.get('player_id') if not secret or secret not in games or not pid: return game = games[secret] join_room('player') join_room(pid) _send_sync_state(pid, game) # helpers def _send_sync_state(pid, game): # finished? if game.get('finished'): board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) me = game['players'][pid] placement = next(i+1 for i,p in enumerate(board) if p['name']==me['name']) emit('sync_state', { 'state': 'end', 'score': me['score'], 'placement': placement }, room=pid) # waiting? elif not game['started']: emit('sync_state', {'state':'waiting'}, room=pid) # question? else: idx = game['current_q'] q = questions[idx] answered = any(pid == p for p,_ in game['answered']) emit('sync_state', { 'state': 'question', 'question': q['question'], 'options': q['options'], 'answered': answered }, room=pid) def _send_question(secret): game = games[secret] q = questions[game['current_q']] # clear previous answers game['answered'].clear() # broadcast socketio.emit( 'new_question', {'question': q['question'], 'options': q['options']}, room='player' ) socketio.emit( 'new_question', {'question': q['question']}, room='host' ) def _score_and_emit(secret): game = games[secret] score_map = {1: 4, 2: 3, 3: 2, 4: 1} per_q = [] # grab the current question’s correct answer correct = questions[game['current_q']]['correct'] # take only the most‐recent submission pid, ans = game['answered'][-1] # count how many *earlier* answers were correct prev_correct = sum(1 for p, a in game['answered'][:-1] if a == correct) rank = prev_correct + 1 # assign points only if this one is correct and within top‐4 if ans == correct and rank <= 4: pts = score_map[rank] game['players'][pid]['score'] += pts per_q.append({ 'name': game['players'][pid]['name'], 'points': pts }) # rebuild overall leaderboard board = sorted( [{'name': v['name'], 'score': v['score']} for v in game['players'].values()], key=lambda x: -x['score'] ) # emit just the new‐answer results, plus the overall board socketio.emit('question_leader', {'results': per_q}, room='host') socketio.emit('question_leader', {'results': per_q}, room='player') socketio.emit('overall_leader', {'board': board}, room='host') socketio.emit('overall_leader', {'board': board}, room='player') # if that was the last question, finish up if game['current_q'] >= len(questions): game['finished'] = True socketio.emit('game_over', {'board': board}, room='host') for psid, pdata in game['players'].items(): placement = next(i+1 for i, p in enumerate(board) if p['name'] == pdata['name']) socketio.emit( 'game_over', {'placement': placement, 'score': pdata['score']}, room=psid ) if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=5000)