520 lines
16 KiB
Python
520 lines
16 KiB
Python
# --- app.py ---
|
||
import os
|
||
import io
|
||
import base64
|
||
import pandas as pd
|
||
import qrcode
|
||
import uuid
|
||
import time, math
|
||
from flask import Flask, render_template, request, session, redirect, url_for, abort
|
||
from flask_socketio import SocketIO, emit, join_room
|
||
from dotenv import load_dotenv
|
||
|
||
# global store of all games, keyed by secret. Each entry will hold its own questions list.
|
||
games = {}
|
||
|
||
# Load environment vars
|
||
load_dotenv()
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev')
|
||
# manage_session ensures Flask session is available in SocketIO handlers
|
||
socketio = SocketIO(app, manage_session=True)
|
||
|
||
# In-memory game state
|
||
game = {
|
||
'players': {},
|
||
'order': [],
|
||
'current_q': 0,
|
||
'answered': [],
|
||
'started': False
|
||
}
|
||
|
||
@app.route('/', methods=['GET'])
|
||
def host():
|
||
# If this browser is already a player, don't let it hit the host view.
|
||
if session.get('role') == 'player':
|
||
secret = session.get('secret')
|
||
if secret:
|
||
return redirect(url_for('player', secret=secret))
|
||
return redirect(url_for('player'))
|
||
|
||
session['role'] = 'host'
|
||
# — manage per-game secret in session —
|
||
secret = session.get('secret')
|
||
if not secret:
|
||
secret = uuid.uuid4().hex[:8]
|
||
session['secret'] = secret
|
||
|
||
# ensure game exists (with questions placeholder)
|
||
if secret not in games:
|
||
games[secret] = {
|
||
'questions': None,
|
||
'players': {}, 'order': [],
|
||
'current_q': 0, 'answered': [],
|
||
'started': False, 'finished': False,
|
||
'revealed': False
|
||
}
|
||
game = games[secret]
|
||
# if no question file has been uploaded yet, show upload form
|
||
if not game.get('questions'):
|
||
return render_template('upload.html')
|
||
|
||
# build join URL + QR
|
||
join_url = url_for('player', secret=secret, _external=True)
|
||
img = qrcode.make(join_url)
|
||
buf = io.BytesIO()
|
||
img.save(buf, format='PNG')
|
||
qr_data = base64.b64encode(buf.getvalue()).decode('ascii')
|
||
|
||
return render_template(
|
||
'host.html',
|
||
qr_data=qr_data,
|
||
join_url=join_url,
|
||
secret=secret
|
||
)
|
||
|
||
@app.route('/upload', methods=['POST'])
|
||
def upload():
|
||
session['role'] = 'host'
|
||
secret = session.get('secret')
|
||
if not secret or secret not in games:
|
||
return redirect(url_for('host'))
|
||
|
||
game = games[secret]
|
||
|
||
# retrieve uploaded file
|
||
file = request.files.get('file')
|
||
if not file:
|
||
return render_template('upload.html', error='Keine Datei ausgewählt')
|
||
|
||
# try to read Excel
|
||
try:
|
||
df = pd.read_excel(file)
|
||
except Exception:
|
||
return render_template('upload.html', error='Ungültige Excel-Datei')
|
||
|
||
# enforce maximum of 100 questions
|
||
max_questions = 100
|
||
if df.shape[0] > max_questions:
|
||
return render_template(
|
||
'upload.html',
|
||
error=f'Maximal dürfen {max_questions} Fragen hochgeladen werden. Ihre Datei enthält {df.shape[0]}.')
|
||
|
||
# mandatory columns
|
||
required = ['question', 'answer1', 'answer2', 'answer3', 'answer4', 'correct']
|
||
missing = [col for col in required if col not in df.columns]
|
||
if missing:
|
||
return render_template(
|
||
'upload.html',
|
||
error=f"Fehler in Datei: Fehlende Spalte(n): {', '.join(missing)}"
|
||
)
|
||
|
||
# validate that 'correct' references one of the answer columns
|
||
valid_keys = ['answer1', 'answer2', 'answer3', 'answer4']
|
||
invalid_mask = ~df['correct'].isin(valid_keys)
|
||
if invalid_mask.any():
|
||
idx = invalid_mask.idxmax()
|
||
excel_line = idx + 2 # Header is line 1
|
||
bad_value = df.at[idx, 'correct']
|
||
return render_template(
|
||
'upload.html',
|
||
error=(
|
||
f'Fehler in Zeile {excel_line}: '
|
||
f'"correct" enthält "{bad_value}", '
|
||
f'muss einer der Werte {valid_keys} sein.'
|
||
)
|
||
)
|
||
|
||
# build per-game question list
|
||
questions = []
|
||
for i, (_, row) in enumerate(df.iterrows()):
|
||
col = row['correct']
|
||
questions.append({
|
||
'qid': i,
|
||
'question': str(row['question']),
|
||
'options': [str(row[f'answer{i}']) for i in range(1, 5)],
|
||
'correct': str(row[col])
|
||
})
|
||
|
||
game['questions'] = questions
|
||
# redirect to host view, which will show the QR code
|
||
return redirect(url_for('host'))
|
||
|
||
|
||
@app.route('/new_game', methods=['POST'])
|
||
def new_game():
|
||
if session.get('role') != 'host':
|
||
abort(403)
|
||
# remove the old game entirely (so its secret is no longer valid)
|
||
old_secret = session.pop('secret', None)
|
||
if old_secret and old_secret in games:
|
||
del games[old_secret]
|
||
return redirect(url_for('host'))
|
||
|
||
@app.route('/player')
|
||
def player():
|
||
session['role'] = 'player'
|
||
|
||
# 1) Grab any secret from the URL
|
||
new_secret = request.args.get('secret')
|
||
old_secret = session.get('secret')
|
||
|
||
# 2) If they provided a new secret, it *must* already exist or we reject immediately
|
||
if new_secret:
|
||
if new_secret not in games:
|
||
return render_template(
|
||
'player.html',
|
||
joined=False,
|
||
state='join',
|
||
options=[],
|
||
answered=False,
|
||
error='Spiel nicht gefunden'
|
||
), 400
|
||
# only clear join info if it really *is* a different, valid game
|
||
if new_secret != old_secret:
|
||
for k in ('joined','player_id','player_name'):
|
||
session.pop(k, None)
|
||
secret = new_secret
|
||
else:
|
||
secret = old_secret
|
||
|
||
# 3) We need *some* secret in session — and it must still be in our games dict
|
||
if not secret or secret not in games:
|
||
return render_template(
|
||
'player.html',
|
||
joined=False,
|
||
state='join',
|
||
options=[],
|
||
answered=False,
|
||
error='Spiel nicht gefunden'
|
||
), 400
|
||
|
||
# 4) Save it for future Socket.IO events
|
||
session['secret'] = secret
|
||
|
||
# At this point, we’ve wiped only *this user’s* join info if they scanned a new code.
|
||
# We did *not* delete games[old_secret], so old games remain intact.
|
||
|
||
joined = session.get('joined', False)
|
||
# We don’t emit() here; the socket handlers will PUSH the right state.
|
||
return render_template(
|
||
'player.html',
|
||
joined=joined,
|
||
state='join', # actual view switched via sync_state over Socket.IO
|
||
options=[],
|
||
answered=False
|
||
)
|
||
|
||
|
||
@app.route('/player/join', methods=['POST'])
|
||
def do_join():
|
||
name = (request.form.get('name') or 'Anonymous').strip()
|
||
if not name:
|
||
name = 'Anonymous'
|
||
|
||
session['role'] = 'player'
|
||
session['joined'] = True
|
||
|
||
# make sure the player is still talking to a live game
|
||
secret = session.get('secret')
|
||
if not secret or secret not in games:
|
||
return render_template(
|
||
'player.html',
|
||
joined=False,
|
||
state='join',
|
||
options=[],
|
||
answered=False,
|
||
error='Spiel nicht gefunden'
|
||
), 400
|
||
|
||
# ─── enforce unique name per game (case-insensitive) ───
|
||
existing_lower = {v['name'].strip().lower() for v in games[secret]['players'].values()}
|
||
if name.lower() in existing_lower:
|
||
# do NOT mark session as joined; force them back to join view with error
|
||
session['joined'] = False
|
||
return render_template(
|
||
'player.html',
|
||
joined=False,
|
||
state='join',
|
||
options=[],
|
||
answered=False,
|
||
error='Dieser Name ist bereits vergeben. Bitte wähle einen anderen.'
|
||
), 400
|
||
|
||
# proceed with normal join
|
||
session['player_name'] = name
|
||
session['player_id'] = str(uuid.uuid4())
|
||
return redirect(url_for('player', secret=session.get('secret')))
|
||
|
||
|
||
|
||
@socketio.on('connect')
|
||
def on_connect():
|
||
secret = session.get('secret')
|
||
if not secret or secret not in games:
|
||
return
|
||
game = games[secret]
|
||
role = session.get('role')
|
||
|
||
if role == 'host':
|
||
join_room('host')
|
||
|
||
# ─── if the game is already finished, just re-emit the final board ───
|
||
if game.get('finished'):
|
||
board = sorted(
|
||
[{'name': v['name'], 'score': v['score']} for v in game['players'].values()],
|
||
key=lambda x: -x['score']
|
||
)
|
||
emit('game_over', {'board': board}, room='host')
|
||
return
|
||
|
||
# otherwise fall back to normal roster / question logic
|
||
players = [game['players'][pid]['name'] for pid in game['order']]
|
||
emit('update_roster', {'players': players})
|
||
|
||
if game.get('started') and game['current_q'] < len(game['questions']):
|
||
q = game['questions'][game['current_q']]
|
||
emit('new_question', {'question': q['question']}, room='host')
|
||
return
|
||
|
||
# player logic
|
||
if role != 'player' or not session.get('joined'):
|
||
return
|
||
|
||
pid = session['player_id']
|
||
join_room('player')
|
||
join_room(pid)
|
||
|
||
if pid not in game['players']:
|
||
game['players'][pid] = {
|
||
'name': session.get('player_name'),
|
||
'score': 0
|
||
}
|
||
game['order'].append(pid)
|
||
socketio.emit(
|
||
'update_roster',
|
||
{'players': [game['players'][p]['name'] for p in game['order']]},
|
||
room='host'
|
||
)
|
||
|
||
# delegate to rejoin logic
|
||
_send_sync_state(pid, game)
|
||
|
||
|
||
|
||
@socketio.on('start_game')
|
||
def on_start_game():
|
||
secret = session.get('secret')
|
||
if session.get('role') != 'host' or not secret or secret not in games:
|
||
return
|
||
game = games[secret]
|
||
|
||
# if already through, reset
|
||
if game.get('started') or game['current_q'] >= len(game['questions']):
|
||
game.update({
|
||
'current_q': 0,
|
||
'answered': [],
|
||
'started': False,
|
||
'finished': False
|
||
})
|
||
for p in game['players'].values():
|
||
p['score'] = 0.0
|
||
|
||
game['started'] = True
|
||
socketio.emit('game_started', room='player')
|
||
_send_question(secret)
|
||
|
||
|
||
|
||
@socketio.on('get_roster')
|
||
def on_get_roster():
|
||
secret = session.get('secret')
|
||
if not secret or secret not in games:
|
||
return
|
||
game = games[secret]
|
||
roster = [game['players'][s]['name'] for s in game['order']]
|
||
emit('update_roster', {'players': roster})
|
||
|
||
|
||
@socketio.on('submit_answer')
|
||
def on_submit_answer(data):
|
||
secret = session.get('secret')
|
||
pid = session.get('player_id')
|
||
if not secret or secret not in games or pid not in games[secret]['players']:
|
||
return
|
||
game = games[secret]
|
||
|
||
curr = game['questions'][game['current_q']]
|
||
if int(data.get('qid', -1)) != int(curr['qid']):
|
||
return
|
||
|
||
if game.get('revealed'):
|
||
return
|
||
if any(pid == t[0] for t in game['answered']):
|
||
return
|
||
now = time.perf_counter()
|
||
q_started = game.get('q_started', now)
|
||
rt = max(0.0, now - q_started)
|
||
game['answered'].append((pid, data.get('answer'), rt))
|
||
_score_and_emit(secret)
|
||
|
||
|
||
@socketio.on('next_question')
|
||
def on_next_question():
|
||
secret = session.get('secret')
|
||
if session.get('role') != 'host' or not secret or secret not in games:
|
||
return
|
||
game = games[secret]
|
||
game['revealed'] = True
|
||
game['current_q'] += 1
|
||
if game['current_q'] < len(game['questions']):
|
||
_send_question(secret)
|
||
else:
|
||
game['finished'] = True
|
||
board = sorted(
|
||
[{'name':v['name'],'score':v['score']} for v in game['players'].values()],
|
||
key=lambda x: -x['score']
|
||
)
|
||
socketio.emit('game_over', {'board': board}, room='host')
|
||
for psid,pdata in game['players'].items():
|
||
placement = next(i+1 for i,p in enumerate(board) if p['name']==pdata['name'])
|
||
socketio.emit('game_over', {'placement': placement, 'score': pdata['score']}, room=psid)
|
||
|
||
|
||
@socketio.on('rejoin_game')
|
||
def on_rejoin_game():
|
||
secret = session.get('secret')
|
||
pid = session.get('player_id')
|
||
if not secret or secret not in games or not pid:
|
||
return
|
||
game = games[secret]
|
||
join_room('player')
|
||
join_room(pid)
|
||
_send_sync_state(pid, game)
|
||
|
||
|
||
@socketio.on('show_answer')
|
||
def on_show_answer():
|
||
secret = session.get('secret')
|
||
if session.get('role') != 'host' or not secret or secret not in games:
|
||
return
|
||
game = games[secret]
|
||
game['revealed'] = True
|
||
# find the correct answer text for current question
|
||
correct = game['questions'][game['current_q']]['correct']
|
||
# emit to host so they can display it
|
||
emit('answer_revealed', {'correct': correct}, room='host')
|
||
# emit to all players so they can highlight it
|
||
socketio.emit('answer_revealed', {'correct': correct}, room='player')
|
||
|
||
|
||
# helpers
|
||
|
||
def _send_sync_state(pid, game):
|
||
# finished?
|
||
if game.get('finished'):
|
||
board = sorted(
|
||
[{'name': v['name'], 'score': v['score']} for v in game['players'].values()],
|
||
key=lambda x: -x['score']
|
||
)
|
||
me = game['players'][pid]
|
||
placement = next(i+1 for i,p in enumerate(board) if p['name']==me['name'])
|
||
emit('sync_state', {
|
||
'state': 'end',
|
||
'score': me['score'],
|
||
'placement': placement
|
||
}, room=pid)
|
||
# waiting?
|
||
elif not game['started']:
|
||
emit('sync_state', {'state':'waiting'}, room=pid)
|
||
# question?
|
||
else:
|
||
idx = game['current_q']
|
||
q = game['questions'][idx]
|
||
answered = any(pid == t[0] for t in game['answered'])
|
||
emit('sync_state', {
|
||
'state': 'question',
|
||
'question': q['question'],
|
||
'options': q['options'],
|
||
'answered': answered
|
||
}, room=pid)
|
||
|
||
|
||
def _send_question(secret):
|
||
game = games[secret]
|
||
game['revealed'] = False
|
||
q = game['questions'][game['current_q']]
|
||
game['answered'].clear()
|
||
game['q_started'] = time.perf_counter()
|
||
game['q_deadline_s'] = 20
|
||
socketio.emit('new_question', {
|
||
'qid': q['qid'],
|
||
'question': q['question'],
|
||
'options': q['options']
|
||
}, room='player')
|
||
socketio.emit('new_question', {'question': q['question']}, room='host')
|
||
|
||
|
||
def _calc_points(rt_s: float, deadline_s: float, correct: bool) -> float:
|
||
if not correct or deadline_s <= 0:
|
||
return 0.0
|
||
if rt_s < 0:
|
||
rt_s = 0.0
|
||
if rt_s >= deadline_s:
|
||
return 0.0
|
||
|
||
BASE = 10
|
||
MIN = 1
|
||
frac = 1.0 - (rt_s / float(deadline_s))
|
||
pts = MIN + (BASE - MIN) * frac
|
||
return round(pts, 1) # ← keep a decimal, not an int
|
||
|
||
|
||
def _score_and_emit(secret):
|
||
game = games[secret]
|
||
per_q = []
|
||
|
||
correct = game['questions'][game['current_q']]['correct']
|
||
pid, ans, rt = game['answered'][-1] # NEW: includes rt
|
||
|
||
# NEW: time-based points for correct answers
|
||
pts = _calc_points(rt, game.get('q_deadline_s', 20), ans == correct)
|
||
if pts > 0:
|
||
game['players'][pid]['score'] += pts
|
||
per_q.append({
|
||
'name': game['players'][pid]['name'],
|
||
'points': pts
|
||
})
|
||
|
||
board = sorted(
|
||
[{'name': v['name'], 'score': v['score']} for v in game['players'].values()],
|
||
key=lambda x: -x['score']
|
||
)
|
||
|
||
socketio.emit('question_leader', {'results': per_q}, room='host')
|
||
socketio.emit('question_leader', {'results': per_q}, room='player')
|
||
socketio.emit('overall_leader', {'board': board}, room='host')
|
||
socketio.emit('overall_leader', {'board': board}, room='player')
|
||
|
||
|
||
# if that was the last question, finish up
|
||
if game['current_q'] >= len(game['questions']):
|
||
game['finished'] = True
|
||
socketio.emit('game_over', {'board': board}, room='host')
|
||
for psid, pdata in game['players'].items():
|
||
placement = next(i+1 for i, p in enumerate(board) if p['name'] == pdata['name'])
|
||
socketio.emit(
|
||
'game_over',
|
||
{'placement': placement, 'score': pdata['score']},
|
||
room=psid
|
||
)
|
||
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# show own IP address
|
||
import socket
|
||
hostname = socket.gethostname()
|
||
ip_address = socket.gethostbyname(hostname)
|
||
print(f"Running on http://{ip_address}:5000/")
|
||
socketio.run(app, host='0.0.0.0', port=5000) |