publicquiz/app.py

458 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# --- app.py ---
import os
import io
import base64
import pandas as pd
import qrcode
import uuid
from flask import Flask, render_template, request, session, redirect, url_for, abort
from flask_socketio import SocketIO, emit, join_room
from dotenv import load_dotenv
# Global store of all games, keyed by the per-game secret. Each entry holds
# its own questions list plus the running state of that game.
games = {}
# Load environment vars (must run before SECRET_KEY is read below).
load_dotenv()
app = Flask(__name__)
# Session-signing key; falls back to the literal 'dev' when SECRET_KEY is unset.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev')
# manage_session ensures Flask session is available in SocketIO handlers
socketio = SocketIO(app, manage_session=True)
# In-memory game state.
# NOTE(review): every function visible in this file works on a local `game`
# taken from `games[secret]` (or passed in as a parameter), so this
# module-level dict looks like a leftover from a single-game version --
# confirm nothing else imports it before removing.
game = {
    'players': {},
    'order': [],
    'current_q': 0,
    'answered': [],
    'started': False
}
@app.route('/', methods=['GET'])
def host():
    """Serve the host page.

    Marks the session as the host, makes sure the session owns a game
    (creating the secret and the empty game entry on first visit) and then
    renders either the upload form (no questions yet) or the lobby page
    with the join URL and its QR code as a base64-encoded PNG.
    """
    session['role'] = 'host'

    # One short secret per browser session identifies "its" game.
    if not session.get('secret'):
        session['secret'] = uuid.uuid4().hex[:8]
    secret = session['secret']

    # Create the game entry on first use; questions are filled in by /upload.
    current = games.setdefault(secret, {
        'questions': None,
        'players': {}, 'order': [],
        'current_q': 0, 'answered': [],
        'started': False, 'finished': False,
        'revealed': False
    })

    # Without a question list there is nothing to host yet.
    if not current.get('questions'):
        return render_template('upload.html')

    # Encode the player join link as a QR code the template can inline.
    join_url = url_for('player', secret=secret, _external=True)
    png = io.BytesIO()
    qrcode.make(join_url).save(png, format='PNG')
    qr_data = base64.b64encode(png.getvalue()).decode('ascii')

    return render_template(
        'host.html',
        qr_data=qr_data,
        join_url=join_url,
        secret=secret
    )
@app.route('/upload', methods=['POST'])
def upload():
    """Accept the host's Excel question file and attach it to the session's game.

    The sheet must contain the columns ``question``, ``answer1``..``answer4``
    and ``correct``; ``correct`` names the answer column holding the right
    answer. On any validation problem the upload form is re-rendered with an
    error message; on success the host is redirected to the lobby.

    Returns:
        A rendered ``upload.html`` (with ``error``) or a redirect to ``host``.
    """
    session['role'] = 'host'
    secret = session.get('secret')
    if not secret or secret not in games:
        return redirect(url_for('host'))
    game = games[secret]

    file = request.files.get('file')
    if not file:
        return render_template('upload.html', error='Keine Datei ausgewählt')

    # Any parser failure (wrong format, corrupt file, missing engine) is
    # reported to the host the same way, hence the broad catch at this boundary.
    try:
        df = pd.read_excel(file)
    except Exception:
        return render_template('upload.html', error='Ungültige Excel-Datei')

    answer_cols = ['answer1', 'answer2', 'answer3', 'answer4']
    required = ['question','answer1','answer2','answer3','answer4','correct']
    if not all(col in df.columns for col in required):
        return render_template('upload.html', error=f'Fehler in Datei: Fehlende Spalten. Erforderlich: {required}')

    # An empty sheet would store an empty question list, which host() treats
    # as "nothing uploaded" and silently shows the upload form again.
    if df.empty:
        return render_template('upload.html', error='Fehler in Datei: Die Datei enthält keine Fragen.')

    # 'correct' must reference one of the answer columns.
    if not df['correct'].isin(answer_cols).all():
        return render_template('upload.html', error='Fehler in Datei: Antwortspalte "correct" muss einen der Werte "answer1", "answer2", "answer3" oder "answer4" enthalten.')

    # Blank cells are read as NaN, which would be shown to players as a
    # question/answer and cannot be emitted as valid JSON.
    if bool(df[required].isna().any().any()):
        return render_template('upload.html', error='Fehler in Datei: Leere Zellen in den Pflichtspalten sind nicht erlaubt.')

    # Normalise every cell to text: numeric or date cells are not plain
    # strings (and may not be JSON-serialisable), and answers echoed back by
    # clients are compared against 'correct' with ==.
    questions = []
    for _, row in df.iterrows():
        options = [str(row[col]) for col in answer_cols]
        questions.append({
            'question': str(row['question']),
            'options': options,
            'correct': str(row[row['correct']])
        })
    game['questions'] = questions

    # Upload and validation succeeded: the host view will now show the QR code.
    return redirect(url_for('host'))
@app.route('/new_game', methods=['POST'])
def new_game():
    """Discard the host's current game and send them back to the host page.

    Only sessions whose role is 'host' may call this (403 otherwise). The
    secret is removed from the session and its game from ``games``, so the
    old join link stops working; ``host`` then creates a fresh game.
    """
    if session.get('role') != 'host':
        abort(403)

    stale_secret = session.pop('secret', None)
    if stale_secret:
        # pop() with a default is a no-op when the game is already gone.
        games.pop(stale_secret, None)

    return redirect(url_for('host'))
@app.route('/player')
def player():
    """Render the player page for the game given by ``?secret=`` or the session.

    A secret in the URL takes precedence over the one remembered in the
    session. Whichever is used must name an existing game, otherwise the
    join page is rendered with an error and status 400. Switching to a
    different game clears this user's join data (but never touches the old
    game itself). The real view state is pushed later over Socket.IO.
    """
    session['role'] = 'player'

    def game_not_found():
        # Same response for "unknown secret" and "no secret at all".
        return render_template(
            'player.html',
            joined=False,
            state='join',
            options=[],
            answered=False,
            error='Spiel nicht gefunden'
        ), 400

    requested = request.args.get('secret')
    remembered = session.get('secret')

    # URL secret wins; fall back to the session's secret when none is given.
    secret = requested or remembered
    if not secret or secret not in games:
        return game_not_found()

    # Scanning the code of a different, valid game resets only this user's
    # join info; the previously joined game stays intact.
    if requested and requested != remembered:
        for key in ('joined', 'player_id', 'player_name'):
            session.pop(key, None)

    # Remember the game for the upcoming Socket.IO events.
    session['secret'] = secret

    # Nothing is emitted here; the socket handlers push the actual state
    # (the template always starts in 'join' and is switched via sync_state).
    return render_template(
        'player.html',
        joined=session.get('joined', False),
        state='join',
        options=[],
        answered=False
    )
@app.route('/player/join', methods=['POST'])
def do_join():
    """Register the submitted player name in the session and return to the player page.

    The form field ``name`` is trimmed; an empty or whitespace-only name
    becomes 'Anonymous'. The session must already reference a live game
    (set by ``player``); otherwise the join page is rendered with an error
    and status 400 and the session is NOT marked as joined.

    Returns:
        A redirect to ``player`` for the session's game, or the 400 error page.
    """
    name = (request.form.get('name') or '').strip() or 'Anonymous'
    session['role'] = 'player'

    # Make sure the player is still talking to a live game *before* marking
    # the session as joined, so a failed join never leaves 'joined' set
    # without a matching 'player_id'.
    secret = session.get('secret')
    if not secret or secret not in games:
        return render_template(
            'player.html',
            joined=False,
            state='join',
            options=[],
            answered=False,
            error='Spiel nicht gefunden'
        ), 400

    session['joined'] = True
    session['player_name'] = name
    # The id doubles as the player's private Socket.IO room name.
    session['player_id'] = str(uuid.uuid4())
    return redirect(url_for('player', secret=secret))
@socketio.on('connect')
def on_connect():
    """Attach a freshly connected socket to its game and push the current state.

    Hosts join room 'host' and receive either the final board (finished
    game) or the roster plus, if a question is running, that question.
    Joined players join room 'player' and a private room named after their
    player id, are added to the game on first connect (which also refreshes
    the host's roster) and then receive their personal sync state.
    Connections without a valid game in the session are ignored.

    NOTE(review): the room names 'host' and 'player' are not qualified by
    the game secret, so they are shared by every game in ``games``.
    """
    secret = session.get('secret')
    game = games.get(secret) if secret else None
    if game is None:
        return

    role = session.get('role')

    if role == 'host':
        join_room('host')

        # Finished game: only the final board is of interest.
        if game.get('finished'):
            standings = [
                {'name': entry['name'], 'score': entry['score']}
                for entry in game['players'].values()
            ]
            standings.sort(key=lambda entry: -entry['score'])
            emit('game_over', {'board': standings}, room='host')
            return

        # Otherwise send the roster and, mid-game, the running question.
        roster = [game['players'][player_id]['name'] for player_id in game['order']]
        emit('update_roster', {'players': roster})
        if game.get('started') and game['current_q'] < len(game['questions']):
            running = game['questions'][game['current_q']]
            emit('new_question', {'question': running['question']}, room='host')
        return

    # Everything below is for players that completed the join form.
    if not (role == 'player' and session.get('joined')):
        return

    pid = session['player_id']
    join_room('player')
    join_room(pid)

    if pid not in game['players']:
        # First connection of this player: register and announce them.
        game['players'][pid] = {
            'name': session.get('player_name'),
            'score': 0
        }
        game['order'].append(pid)
        names = [game['players'][player_id]['name'] for player_id in game['order']]
        socketio.emit('update_roster', {'players': names}, room='host')

    # Same state push that an explicit rejoin triggers.
    _send_sync_state(pid, game)
@socketio.on('start_game')
def on_start_game():
    """Start (or restart) the host's game and broadcast the first question.

    Ignored unless the sender is a host with a live game that already has
    questions. If the game was started before, or its question index has
    run past the end, progress and all scores are reset first.
    """
    secret = session.get('secret')
    if session.get('role') != 'host' or not secret or secret not in games:
        return
    game = games[secret]

    # 'questions' is None until the upload succeeded; len(None) below would
    # raise, and there would be nothing to send anyway.
    if not game.get('questions'):
        return

    # If already through (or running), reset progress and scores.
    if game.get('started') or game['current_q'] >= len(game['questions']):
        game.update({
            'current_q': 0,
            'answered': [],
            'started': False,
            'finished': False
        })
        for p in game['players'].values():
            p['score'] = 0

    game['started'] = True
    socketio.emit('game_started', room='player')
    _send_question(secret)
@socketio.on('get_roster')
def on_get_roster():
    """Reply to the requesting socket with the player names in join order."""
    secret = session.get('secret')
    current = games.get(secret) if secret else None
    if current is None:
        return

    names = []
    for player_id in current['order']:
        names.append(current['players'][player_id]['name'])
    emit('update_roster', {'players': names})
@socketio.on('submit_answer')
def on_submit_answer(data):
    """Record a player's answer to the running question and score it.

    Args:
        data: Event payload; expected to be a dict with key ``'answer'``.

    The submission is ignored when the sender is not a registered player of
    a live game, when no question is currently open (game not started,
    finished, or answer already revealed), when the payload is malformed,
    or when the player has already answered this question.
    """
    secret = session.get('secret')
    pid = session.get('player_id')
    if not secret or secret not in games or pid not in games[secret]['players']:
        return
    game = games[secret]

    # Only accept answers while a question is actually open; otherwise a
    # client could score on question 0 before the start, or trigger an
    # out-of-range lookup after the last question.
    if not game.get('started') or game.get('finished') or game.get('revealed'):
        return
    questions = game.get('questions') or []
    if game['current_q'] >= len(questions):
        return

    # A malformed payload must not consume the player's single answer.
    if not isinstance(data, dict):
        return

    # One answer per player and question.
    if any(pid == p for p, _ in game['answered']):
        return

    game['answered'].append((pid, data.get('answer')))
    _score_and_emit(secret)
@socketio.on('next_question')
def on_next_question():
    """Advance the host's game to the next question, or finish it.

    Ignored unless the sender is a host with a live, started game that has
    questions. After the last question the game is marked finished, the
    full board goes to room 'host', and each player receives their own
    placement and score in their private room.
    """
    secret = session.get('secret')
    if session.get('role') != 'host' or not secret or secret not in games:
        return
    game = games[secret]

    # Advancing before the upload or before the start would skip question 0
    # (or fail on len(None)).
    if not game.get('questions') or not game.get('started'):
        return

    game['current_q'] += 1
    if game['current_q'] < len(game['questions']):
        _send_question(secret)
        return

    # Mark finished & emit final boards.
    game['finished'] = True
    # Rank by player id, not by name: two players may share a name (e.g. the
    # default 'Anonymous') and must still get distinct placements. sorted()
    # is stable, so ties keep their join order.
    ranking = sorted(game['players'].items(), key=lambda item: -item[1]['score'])
    board = [{'name': pdata['name'], 'score': pdata['score']} for _, pdata in ranking]
    socketio.emit('game_over', {'board': board}, room='host')
    for placement, (pid, pdata) in enumerate(ranking, start=1):
        socketio.emit(
            'game_over',
            {'placement': placement, 'score': pdata['score']},
            room=pid
        )
@socketio.on('rejoin_game')
def on_rejoin_game():
    """Re-attach a returning player's socket to its rooms and resend their state.

    Does nothing when the session has no player id or no live game.
    """
    pid = session.get('player_id')
    secret = session.get('secret')
    has_game = bool(secret) and secret in games
    if not has_game or not pid:
        return

    # Shared player room plus the private room named after the player id.
    for room_name in ('player', pid):
        join_room(room_name)

    _send_sync_state(pid, games[secret])
@socketio.on('show_answer')
def on_show_answer():
    """Reveal the correct answer of the running question to host and players.

    Ignored unless the sender is a host with a live game in which a
    question is currently active. Marks the question as revealed, which
    makes ``submit_answer`` reject further answers.
    """
    secret = session.get('secret')
    if session.get('role') != 'host' or not secret or secret not in games:
        return
    game = games[secret]

    # Without an active question there is nothing to reveal; the lookup
    # below would fail on missing questions or an index past the end.
    questions = game.get('questions') or []
    if not game.get('started') or game.get('finished') or game['current_q'] >= len(questions):
        return

    game['revealed'] = True
    # Correct answer text for the current question.
    correct = questions[game['current_q']]['correct']
    # Host displays it ...
    emit('answer_revealed', {'correct': correct}, room='host')
    # ... and players highlight it.
    socketio.emit('answer_revealed', {'correct': correct}, room='player')
# helpers
def _send_sync_state(pid, game):
    """Emit a 'sync_state' event describing the game to one player's private room.

    Args:
        pid: Player id; also the name of the player's private room.
        game: The game dict (an entry of ``games``) the player belongs to.

    The payload's ``state`` is 'end' (with score and placement) for a
    finished game, 'waiting' before the start, and 'question' (with text,
    options and whether this player already answered) otherwise.
    """
    # Finished?
    if game.get('finished'):
        me = game['players'].get(pid)
        if me is None:
            # Unknown player: there is no score or placement to report.
            return
        # Rank by player id rather than by name so that players sharing a
        # name still get their own placement; sorted() is stable, so ties
        # keep join order.
        ranking = sorted(game['players'].items(), key=lambda item: -item[1]['score'])
        placement = next(pos for pos, (other, _) in enumerate(ranking, start=1) if other == pid)
        emit('sync_state', {
            'state': 'end',
            'score': me['score'],
            'placement': placement
        }, room=pid)
    # Waiting?
    elif not game['started']:
        emit('sync_state', {'state': 'waiting'}, room=pid)
    # Question running.
    else:
        q = game['questions'][game['current_q']]
        answered = any(pid == p for p, _ in game['answered'])
        emit('sync_state', {
            'state': 'question',
            'question': q['question'],
            'options': q['options'],
            'answered': answered
        }, room=pid)
def _send_question(secret):
    """Open the game's current question and broadcast it.

    Resets the 'revealed' flag, empties the list of received answers, then
    sends 'new_question' with text and options to room 'player' and with
    the text only to room 'host'.

    Args:
        secret: Key of the game in ``games``.

    NOTE(review): both rooms are shared by all games (the names are not
    qualified by ``secret``).
    """
    state = games[secret]
    state['revealed'] = False
    current = state['questions'][state['current_q']]

    # Answers of the previous question no longer count.
    state['answered'].clear()

    player_payload = {'question': current['question'], 'options': current['options']}
    host_payload = {'question': current['question']}
    socketio.emit('new_question', player_payload, room='player')
    socketio.emit('new_question', host_payload, room='host')
def _score_and_emit(secret):
    """Score the most recent answer of a game and broadcast the leaderboards.

    The newest entry of ``game['answered']`` is compared with the current
    question's correct answer. The first four correct answers earn 4, 3, 2
    and 1 points; later or wrong answers earn nothing. Afterwards the
    per-answer result ('question_leader') and the overall board
    ('overall_leader') are emitted to rooms 'host' and 'player'.

    Ending the game is handled by the 'next_question' handler; the former
    end-of-game branch here could never run, because the question lookup at
    the top already fails once the index is past the last question.

    Args:
        secret: Key of the game in ``games``.
    """
    game = games[secret]
    questions = game.get('questions') or []
    # Nothing to score without a submission or an active question.
    if not game['answered'] or game['current_q'] >= len(questions):
        return

    score_map = {1: 4, 2: 3, 3: 2, 4: 1}
    per_q = []

    correct = questions[game['current_q']]['correct']
    # Only the most recent submission is scored here.
    pid, ans = game['answered'][-1]
    # Rank among correct answers = number of earlier correct answers + 1.
    prev_correct = sum(1 for _, a in game['answered'][:-1] if a == correct)
    rank = prev_correct + 1

    # Points only for a correct answer within the top 4.
    if ans == correct and rank <= 4:
        pts = score_map[rank]
        game['players'][pid]['score'] += pts
        per_q.append({
            'name': game['players'][pid]['name'],
            'points': pts
        })

    # Rebuild the overall leaderboard (highest score first).
    board = sorted(
        [{'name': v['name'], 'score': v['score']} for v in game['players'].values()],
        key=lambda x: -x['score']
    )

    # Emit just the new answer's result, plus the overall board.
    socketio.emit('question_leader', {'results': per_q}, room='host')
    socketio.emit('question_leader', {'results': per_q}, room='player')
    socketio.emit('overall_leader', {'board': board}, room='host')
    socketio.emit('overall_leader', {'board': board}, room='player')
if __name__ == '__main__':
    # Print a URL with this machine's own address so players on the same
    # network know where to connect.
    import socket
    try:
        ip_address = socket.gethostbyname(socket.gethostname())
    except OSError:
        # The hostname may not resolve (gaierror is an OSError); the banner
        # is purely informational, so do not let it block startup.
        ip_address = '127.0.0.1'
    print(f"Running on http://{ip_address}:5000/")
    socketio.run(app, host='0.0.0.0', port=5000)