430 lines
13 KiB
Python
430 lines
13 KiB
Python
# --- app.py ---
|
||
import os
|
||
import io
|
||
import base64
|
||
import pandas as pd
|
||
import qrcode
|
||
import uuid
|
||
from flask import Flask, render_template, request, session, redirect, url_for, abort
|
||
from flask_socketio import SocketIO, emit, join_room
|
||
from dotenv import load_dotenv
|
||
|
||
# Registry of every running game, keyed by its per-game secret.
games = {}

# Pull settings from a local .env file into the process environment.
load_dotenv()

app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev')
# manage_session=True is passed so the Flask session can be used inside
# the Socket.IO handlers below.
socketio = SocketIO(app, manage_session=True)

# The spreadsheet is expected to have the columns: question,
# answer1..answer4, and correct (which holds the *name* of the answer
# column that is right, e.g. 'answer3').
df = pd.read_excel('quizzes/questions.xlsx')
questions = [
    {
        'question': row['question'],
        # exactly four options per question
        'options': [row[f'answer{i}'] for i in range(1, 5)],
        # resolve the column name stored in 'correct' to the answer text
        'correct': row[row['correct']],
    }
    for _, row in df.iterrows()
]

# Module-level single-game state. None of the handlers in this file read
# it: each of them binds its own local `game` from `games`.
game = dict(
    players={},
    order=[],
    current_q=0,
    answered=[],
    started=False,
)
|
||
|
||
@app.route('/')
def host():
    """Render the host page for this session's game, creating the game if needed.

    The game secret is kept in the session so that a page reload keeps
    pointing at the same game. The page receives a QR code (base64 PNG)
    and the plain URL that players use to join.
    """
    # A session that was not already acting as host may carry a secret it
    # picked up through the player join link. Reusing that secret here
    # would hand host control of somebody else's game to a player, so it
    # is dropped and a fresh game is created instead.
    if session.get('role') != 'host':
        session.pop('secret', None)
    session['role'] = 'host'

    # Manage the per-game secret in the session.
    secret = session.get('secret')
    if not secret:
        secret = uuid.uuid4().hex[:8]
        session['secret'] = secret

    # Ensure the game exists (it may be new, or the in-memory store may
    # no longer contain it).
    if secret not in games:
        games[secret] = {
            'players': {}, 'order': [],
            'current_q': 0, 'answered': [],
            'started': False, 'finished': False,
            'revealed': False,
        }

    # Build the join URL and encode it as a QR code PNG for the template.
    join_url = url_for('player', secret=secret, _external=True)
    img = qrcode.make(join_url)
    buf = io.BytesIO()
    img.save(buf, format='PNG')
    qr_data = base64.b64encode(buf.getvalue()).decode('ascii')

    return render_template(
        'host.html',
        qr_data=qr_data,
        join_url=join_url,
        secret=secret,
    )
|
||
|
||
|
||
@app.route('/new_game', methods=['POST'])
def new_game():
    """Discard the host's current game and redirect to a fresh one.

    Responds with 403 unless the session role is 'host'. The old secret
    is removed from both the session and the games registry, so it stops
    being valid; the host view then allocates a new secret.
    """
    if session.get('role') != 'host':
        abort(403)

    stale_secret = session.pop('secret', None)
    if stale_secret:
        # dropping the entry invalidates the old secret entirely
        games.pop(stale_secret, None)

    return redirect(url_for('host'))
|
||
|
||
@app.route('/player')
def player():
    """Render the player page for the game named by the URL or the session.

    A ``secret`` query parameter takes precedence over the one stored in
    the session. If the resulting secret is missing or does not name a
    live game, the join page is rendered with an error and status 400.
    Switching to a different valid game clears this session's join info;
    the previous game itself is left untouched.
    """
    def game_not_found():
        # join page with an error message, reported as a client error
        page = render_template(
            'player.html',
            joined=False,
            state='join',
            options=[],
            answered=False,
            error='Spiel nicht gefunden'
        )
        return page, 400

    session['role'] = 'player'

    requested = request.args.get('secret')
    previous = session.get('secret')

    # A secret given in the URL wins; otherwise fall back to the session.
    secret = requested or previous
    if not secret or secret not in games:
        return game_not_found()

    # Only forget the join info when the player really moved to a
    # different, valid game.
    if requested and requested != previous:
        for key in ('joined', 'player_id', 'player_name'):
            session.pop(key, None)

    # Remember the secret for later Socket.IO events.
    session['secret'] = secret

    # Nothing is emitted here; the template always starts in the 'join'
    # state and the real view is switched via sync_state over Socket.IO.
    return render_template(
        'player.html',
        joined=session.get('joined', False),
        state='join',
        options=[],
        answered=False
    )
|
||
|
||
|
||
@app.route('/player/join', methods=['POST'])
def do_join():
    """Register the posted player name in the session and return to the player page.

    The form field ``name`` is trimmed; an empty or missing name becomes
    'Anonymous'. If the session has no secret or the secret does not name
    a live game, the join page is rendered with an error and status 400
    and the session is NOT marked as joined.
    """
    name = (request.form.get('name') or '').strip() or 'Anonymous'
    session['role'] = 'player'

    # Make sure the player is still talking to a live game *before*
    # recording any join state. Otherwise a failed join would leave
    # 'joined' set without a 'player_id'.
    secret = session.get('secret')
    if not secret or secret not in games:
        return render_template(
            'player.html',
            joined=False,
            state='join',
            options=[],
            answered=False,
            error='Spiel nicht gefunden'
        ), 400

    session['joined'] = True
    session['player_name'] = name
    session['player_id'] = str(uuid.uuid4())
    return redirect(url_for('player', secret=secret))
|
||
|
||
|
||
@socketio.on('connect')
def on_connect():
    """Attach a newly connected socket to its game and push the current state.

    Does nothing when the session has no secret for a live game.

    Host: joins the 'host' room, then receives either the final board
    (finished game) or the roster plus, if a question is running, that
    question.

    Player (only once joined with a player id): joins the 'player' room
    and a private room named after the player id, is added to the game on
    first connect (which also refreshes the host roster), and then gets a
    ``sync_state`` event.
    """
    secret = session.get('secret')
    if not secret or secret not in games:
        return
    game = games[secret]
    role = session.get('role')

    if role == 'host':
        join_room('host')

        # If the game is already finished, just re-emit the final board.
        if game.get('finished'):
            board = sorted(
                [{'name': v['name'], 'score': v['score']}
                 for v in game['players'].values()],
                key=lambda entry: -entry['score']
            )
            emit('game_over', {'board': board}, room='host')
            return

        # Otherwise fall back to the normal roster / question logic.
        players = [game['players'][pid]['name'] for pid in game['order']]
        emit('update_roster', {'players': players})

        if game.get('started') and game['current_q'] < len(questions):
            q = questions[game['current_q']]
            emit('new_question', {'question': q['question']}, room='host')
        return

    # Player logic. 'joined' can be set without a player id (e.g. a join
    # attempt against a game that no longer exists), so the id is read
    # defensively instead of indexing the session.
    pid = session.get('player_id')
    if role != 'player' or not session.get('joined') or not pid:
        return

    join_room('player')
    join_room(pid)

    if pid not in game['players']:
        game['players'][pid] = {
            'name': session.get('player_name'),
            'score': 0,
        }
        game['order'].append(pid)
        socketio.emit(
            'update_roster',
            {'players': [game['players'][p]['name'] for p in game['order']]},
            room='host'
        )

    # Delegate to the shared rejoin logic.
    _send_sync_state(pid, game)
|
||
|
||
|
||
|
||
@socketio.on('start_game')
def on_start_game():
    """Host event: start the game, or restart it from the first question.

    Ignored unless the session is a host with a live game. When the game
    was already started or has run past the last question, progress is
    reset and every player's score is set back to zero before starting.
    """
    secret = session.get('secret')
    is_host = session.get('role') == 'host'
    if not is_host or not secret or secret not in games:
        return
    game = games[secret]

    needs_reset = game.get('started') or game['current_q'] >= len(questions)
    if needs_reset:
        game.update(current_q=0, answered=[], started=False, finished=False)
        for entry in game['players'].values():
            entry['score'] = 0

    game['started'] = True
    socketio.emit('game_started', room='player')
    _send_question(secret)
|
||
|
||
|
||
|
||
@socketio.on('get_roster')
def on_get_roster():
    """Reply to the caller with the player names of its game, in join order.

    Ignored when the session has no secret for a live game.
    """
    secret = session.get('secret')
    game = games.get(secret) if secret else None
    if game is None:
        return
    names = []
    for pid in game['order']:
        names.append(game['players'][pid]['name'])
    emit('update_roster', {'players': names})
|
||
|
||
|
||
|
||
@socketio.on('submit_answer')
def on_submit_answer(data):
    """Player event: record this player's answer to the current question and score it.

    ``data`` is expected to be a dict with an ``'answer'`` key. The
    submission is ignored when:

    * the session has no live game or the player is not registered in it,
    * no question is open (game not started, already finished, answer
      already revealed, or question index out of range),
    * the payload is not a dict,
    * the player has already answered the current question.
    """
    secret = session.get('secret')
    pid = session.get('player_id')
    if not secret or secret not in games or pid not in games[secret]['players']:
        return
    game = games[secret]

    # Only accept answers while a question is actually open. Scoring
    # indexes `questions` with current_q, so it must be in range too.
    if not game.get('started') or game.get('finished') or game.get('revealed'):
        return
    if game['current_q'] >= len(questions):
        return

    # Client payloads are untrusted; anything but a dict has no answer.
    if not isinstance(data, dict):
        return

    # One answer per player per question.
    if any(pid == p for p, _ in game['answered']):
        return

    game['answered'].append((pid, data.get('answer')))
    _score_and_emit(secret)
|
||
|
||
|
||
@socketio.on('next_question')
def on_next_question():
    """Host event: advance to the next question, or end the game after the last one.

    Ignored unless the session is a host with a live game that has been
    started and is not yet finished, so repeated clicks cannot push the
    question index past the end or skip questions before the start.

    When the questions are exhausted the game is marked finished, the
    host room receives the full board, and every player receives their
    own placement and score in their private room.
    """
    secret = session.get('secret')
    if session.get('role') != 'host' or not secret or secret not in games:
        return
    game = games[secret]
    if not game.get('started') or game.get('finished'):
        return

    game['current_q'] += 1
    if game['current_q'] < len(questions):
        _send_question(secret)
        return

    # Mark finished and emit the final boards.
    game['finished'] = True
    # Rank by player id rather than by name: names are not unique (e.g.
    # several 'Anonymous' players), and a lookup by name would give all
    # of them the placement of the best one. sorted() is stable, so ties
    # keep their join order.
    ranked = sorted(game['players'].items(), key=lambda item: -item[1]['score'])
    board = [{'name': p['name'], 'score': p['score']} for _, p in ranked]
    socketio.emit('game_over', {'board': board}, room='host')
    for placement, (pid, pdata) in enumerate(ranked, start=1):
        socketio.emit(
            'game_over',
            {'placement': placement, 'score': pdata['score']},
            room=pid
        )
|
||
|
||
|
||
|
||
@socketio.on('rejoin_game')
def on_rejoin_game():
    """Player event: re-enter the player rooms and receive the current game state.

    Ignored when the session lacks a player id or a secret for a live game.
    """
    secret = session.get('secret')
    pid = session.get('player_id')
    has_live_game = bool(secret) and secret in games
    if not (has_live_game and pid):
        return

    # shared broadcast room plus the player's private room
    for room_name in ('player', pid):
        join_room(room_name)
    _send_sync_state(pid, games[secret])
|
||
|
||
|
||
@socketio.on('show_answer')
def on_show_answer():
    """Host event: reveal the correct answer of the current question.

    Ignored unless the session is a host with a live, started game whose
    current question index is valid. Once revealed, the game is flagged
    so that further answer submissions are rejected, and the correct
    answer text is sent to the 'host' and 'player' rooms.
    """
    secret = session.get('secret')
    if session.get('role') != 'host' or not secret or secret not in games:
        return
    game = games[secret]

    # Validate before touching any state: after the last question
    # current_q equals len(questions), and before the start there is no
    # open question whose answer could be shown.
    idx = game['current_q']
    if not game.get('started') or idx >= len(questions):
        return

    game['revealed'] = True
    correct = questions[idx]['correct']
    # To the host, so they can display it ...
    emit('answer_revealed', {'correct': correct}, room='host')
    # ... and to all players, so they can highlight it.
    socketio.emit('answer_revealed', {'correct': correct}, room='player')
|
||
|
||
|
||
# helpers
|
||
|
||
def _send_sync_state(pid, game):
    """Emit a ``sync_state`` event describing ``game`` to the room named ``pid``.

    Args:
        pid: Player id; also the name of the player's private room.
        game: The game dict taken from ``games``.

    The emitted state is one of:

    * ``'end'`` with the player's score and placement (finished game),
    * ``'waiting'`` (game not started, or no valid current question),
    * ``'question'`` with the question text, its options, and whether
      this player has already answered it.

    Nothing is emitted for a finished game in which ``pid`` is not a
    registered player, since there is no score or placement to report.
    """
    if game.get('finished'):
        # Rank by player id, not by name: names are not unique.
        ranked = sorted(game['players'].items(), key=lambda item: -item[1]['score'])
        for placement, (player_id, pdata) in enumerate(ranked, start=1):
            if player_id == pid:
                emit('sync_state', {
                    'state': 'end',
                    'score': pdata['score'],
                    'placement': placement,
                }, room=pid)
                break
        return

    idx = game['current_q']
    if not game['started'] or idx >= len(questions):
        # Not started yet, or there is no question to show.
        emit('sync_state', {'state': 'waiting'}, room=pid)
        return

    q = questions[idx]
    answered = any(pid == p for p, _ in game['answered'])
    emit('sync_state', {
        'state': 'question',
        'question': q['question'],
        'options': q['options'],
        'answered': answered,
    }, room=pid)
|
||
|
||
|
||
def _send_question(secret):
    """Open the current question of game ``secret`` and broadcast it.

    Clears the 'revealed' flag and the list of recorded answers, then
    sends ``new_question`` to the 'player' room (text and options) and to
    the 'host' room (text only).
    """
    state = games[secret]
    state['revealed'] = False
    current = questions[state['current_q']]
    # answers from the previous question no longer apply
    state['answered'].clear()

    player_payload = {'question': current['question'], 'options': current['options']}
    host_payload = {'question': current['question']}
    socketio.emit('new_question', player_payload, room='player')
    socketio.emit('new_question', host_payload, room='host')
|
||
|
||
|
||
def _score_and_emit(secret):
    """Score the most recent answer of game ``secret`` and broadcast the standings.

    Expects that a ``(player_id, answer)`` tuple has just been appended
    to ``game['answered']``. Only that last submission is scored: the
    n-th correct answer to the current question earns 4, 3, 2 or 1 points
    for n = 1..4; later correct answers and wrong answers earn nothing.

    Emits ``question_leader`` (points awarded by this submission, possibly
    an empty list) and ``overall_leader`` (all players sorted by score,
    highest first) to both the 'host' and the 'player' room.

    Ending the game is not handled here. The original trailing
    "last question" branch required ``current_q >= len(questions)``,
    which can never hold once ``questions[current_q]`` has been indexed
    successfully above it, so that dead branch was removed.
    """
    game = games[secret]
    score_map = {1: 4, 2: 3, 3: 2, 4: 1}
    per_q = []

    # Correct answer text of the current question.
    correct = questions[game['current_q']]['correct']
    # Only the most recent submission is scored.
    pid, ans = game['answered'][-1]

    if ans == correct:
        # Rank among correct answers = earlier correct answers + 1.
        rank = 1 + sum(1 for _, a in game['answered'][:-1] if a == correct)
        pts = score_map.get(rank)  # None beyond the top four
        if pts is not None:
            game['players'][pid]['score'] += pts
            per_q.append({
                'name': game['players'][pid]['name'],
                'points': pts,
            })

    # Rebuild the overall leaderboard.
    board = sorted(
        [{'name': v['name'], 'score': v['score']} for v in game['players'].values()],
        key=lambda entry: -entry['score']
    )

    # Emit just the new-answer results, plus the overall board.
    socketio.emit('question_leader', {'results': per_q}, room='host')
    socketio.emit('question_leader', {'results': per_q}, room='player')
    socketio.emit('overall_leader', {'board': board}, room='host')
    socketio.emit('overall_leader', {'board': board}, room='player')
|
||
|
||
|
||
|
||
if __name__ == '__main__':
    # Print an address for convenience, then serve on all interfaces.
    import socket

    try:
        ip_address = socket.gethostbyname(socket.gethostname())
    except OSError:
        # The hostname may not resolve on some machines; that must not
        # prevent the server from starting just because of this message.
        ip_address = 'localhost'
    print(f"Running on http://{ip_address}:5000/")
    socketio.run(app, host='0.0.0.0', port=5000)