215 lines
8.0 KiB
Python

import json
import logging
import os
from datetime import datetime

from flask import Flask, render_template, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_talisman import Talisman
from flask_wtf import CSRFProtect
from sqlalchemy import func
# Application object; connection string and signing key come from the environment.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL'),
    # NOTE(review): the fallback is a fixed placeholder value; presumably real
    # deployments always set SECRET_KEY in the environment -- confirm.
    SECRET_KEY=os.environ.get('SECRET_KEY', 'change-this-secret-key'),
)

# CSRF protection for the app (individual views may opt out via csrf.exempt).
csrf = CSRFProtect(app)

# Security headers, using Talisman's default settings.
Talisman(app)

# Logging: INFO and above, formatted as "timestamp level message".
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Database handle bound to the app; the models below derive from db.Model.
db = SQLAlchemy(app)
# Models
class Player(db.Model):
    """A player, identified by a required and unique name."""

    id = db.Column(db.Integer, primary_key=True)
    # Up to 50 characters; two players cannot share a name.
    name = db.Column(db.String(50), unique=True, nullable=False)
class Date(db.Model):
    """A date, stored as its display string rather than a date type."""

    id = db.Column(db.Integer, primary_key=True)
    # Required and unique, e.g. '08/05/25'.
    date_str = db.Column(db.String(20), unique=True, nullable=False)
class Attendance(db.Model):
    """
    One player's attendance answer for one date.

    ``status`` holds 'yes', 'no' or 'maybe'; it is nullable, so a blank
    answer is stored as None. The (date_id, player_id) pair is unique, so
    there is at most one record per player per date.
    """

    id = db.Column(db.Integer, primary_key=True)
    date_id = db.Column(db.Integer, db.ForeignKey('date.id'), nullable=False)
    player_id = db.Column(db.Integer, db.ForeignKey('player.id'), nullable=False)
    # 'yes', 'no', 'maybe', or blank (None)
    status = db.Column(db.String(10))

    __table_args__ = (
        db.UniqueConstraint('date_id', 'player_id', name='_date_player_uc'),
    )
class GuestName(db.Model):
    """The guest's name for a given date (at most one row per date)."""

    id = db.Column(db.Integer, primary_key=True)
    # Unique, so each date can carry only a single guest name.
    date_id = db.Column(db.Integer, db.ForeignKey('date.id'), unique=True, nullable=False)
    name = db.Column(db.String(50), nullable=False)
def get_initial_data():
    """Create any missing tables and seed the default players.

    Despite its name this function returns nothing: it calls
    ``db.create_all()`` and, only when the ``Player`` table is empty,
    inserts the eight default player names and commits.
    """
    # Ensure tables exist
    db.create_all()

    # If no players, insert defaults
    if Player.query.count() == 0:
        default_names = (
            "Alice", "Bob", "Charlie", "David",
            "Eve", "Frank", "Grace", "Hannah",
        )
        db.session.add_all([Player(name=name) for name in default_names])
        db.session.commit()

    # TODO: setting a default guest name for all dates was planned here but
    # never implemented. The placeholder used to issue a GuestName query on
    # every call and then do nothing, so the query has been dropped.
def parse_date(date_str):
    """Parse a ``DD/MM/YY`` string into a ``datetime``.

    Raises ``ValueError`` (from ``strptime``) if the string does not match
    the format or does not name a real calendar date.
    """
    day_month_year = '%d/%m/%y'
    return datetime.strptime(date_str, day_month_year)
def db_to_json():
    """Serialize the stored data into a JSON-ready dictionary.

    Returns:
        dict with keys:
          * ``players``    -- player names ordered by primary key
          * ``guest``      -- the fixed label ``"Guest"``
          * ``dates``      -- date strings sorted chronologically
          * ``attendance`` -- ``"<date_str>|<column index>"`` mapped to
            ``True`` (for 'yes'), ``'no'`` or ``'maybe'``; rows with any
            other status are omitted
          * ``guestNames`` -- date string mapped to that date's guest name

    Raises:
        ValueError: if a stored date string cannot be parsed by
            ``parse_date`` while sorting.
    """
    # Load players and dates once and index them by primary key, rather
    # than issuing one lookup query per attendance / guest-name row.
    player_rows = Player.query.order_by(Player.id).all()
    players = [p.name for p in player_rows]
    column_by_player_id = {p.id: idx for idx, p in enumerate(player_rows)}
    guest_column = len(players)  # guest is last

    date_str_by_id = {d.id: d.date_str for d in Date.query.all()}
    dates = sorted(date_str_by_id.values(), key=parse_date)

    # Stored status -> value used in the JSON payload.
    json_value_by_status = {'yes': True, 'no': 'no', 'maybe': 'maybe'}

    attendance = {}
    for att in Attendance.query.all():
        value = json_value_by_status.get(att.status)
        date_str = date_str_by_id.get(att.date_id)
        if value is None or date_str is None:
            # Blank/unknown status, or a row whose date no longer exists.
            continue
        col_idx = column_by_player_id.get(att.player_id, guest_column)
        attendance[f"{date_str}|{col_idx}"] = value

    guest_names = {
        date_str_by_id[g.date_id]: g.name
        for g in GuestName.query.all()
        if g.date_id in date_str_by_id
    }

    return {
        "players": players,
        "guest": "Guest",
        "dates": dates,
        "attendance": attendance,
        "guestNames": guest_names,
    }
import re
def validate_player_name(name):
    """Return True if *name* is an acceptable player name.

    A valid name is a string of 1 to 50 characters drawn from a-z, A-Z,
    0-9, spaces, hyphens and periods. Anything that is not a string
    (for example a JSON ``null`` or number) is rejected with False rather
    than raising ``TypeError``.
    """
    if not isinstance(name, str):
        return False
    return re.fullmatch(r'[a-zA-Z0-9 .-]{1,50}', name) is not None
def validate_guest_name(name):
    """Return True if *name* is an acceptable guest name.

    A valid name is a string of 1 to 50 characters drawn from a-z, A-Z,
    0-9, spaces, hyphens and periods. Anything that is not a string
    (for example a JSON ``null`` or number) is rejected with False rather
    than raising ``TypeError``.
    """
    if not isinstance(name, str):
        return False
    return re.fullmatch(r'[a-zA-Z0-9 .-]{1,50}', name) is not None
def validate_date_str(date_str):
    """Return True if *date_str* is a real date written as ``DD/MM/YY``.

    The regular expression enforces the exact zero-padded layout; the
    ``strptime`` call then rejects strings that have the right shape but
    name no real day (``99/99/99``, ``31/02/25``). Without that second
    check such a value could be stored and would later make
    chronological sorting of the stored dates raise. Non-string input
    returns False.
    """
    if not isinstance(date_str, str):
        return False
    if re.fullmatch(r'\d{2}/\d{2}/\d{2}', date_str) is None:
        return False
    try:
        datetime.strptime(date_str, '%d/%m/%y')
    except ValueError:
        return False
    return True
def _validate_payload(data):
    """Raise ValueError for the first invalid player name, date or guest name in *data*."""
    # Validate players
    for name in data.get("players", []):
        if not validate_player_name(name):
            logger.warning(f"Invalid player name: {name}")
            raise ValueError("Invalid player name.")
    # Validate dates
    for date_str in data.get("dates", []):
        if not validate_date_str(date_str):
            logger.warning(f"Invalid date string: {date_str}")
            raise ValueError("Invalid date string.")
    # Validate guest names
    for guest_name in data.get("guestNames", {}).values():
        if not validate_guest_name(guest_name):
            logger.warning(f"Invalid guest name: {guest_name}")
            raise ValueError("Invalid guest name.")


def _replace_stored_data(data):
    """Rewrite attendance and guest-name rows from *data*; must run inside an open transaction."""
    # Clear existing attendance records
    db.session.query(Attendance).delete()
    db.session.query(GuestName).delete()

    # Update players - keep existing ones, add new ones
    existing_players = {p.name: p for p in Player.query.all()}
    for name in data.get("players", []):
        if name not in existing_players:
            db.session.add(Player(name=name))

    # Update dates - keep existing ones, add new ones
    existing_dates = {d.date_str: d for d in Date.query.all()}
    for date_str in data.get("dates", []):
        if date_str not in existing_dates:
            date = Date(date_str=date_str)
            db.session.add(date)
            existing_dates[date_str] = date

    # Flush explicitly so newly added players and dates have primary keys
    # before they are referenced below, instead of depending on the
    # autoflush that the next query happens to trigger.
    db.session.flush()

    # Refresh player list after potential additions
    player_dict = {p.name: p for p in Player.query.all()}
    valid_statuses = ('yes', 'no', 'maybe')

    for date_str in data.get("dates", []):
        date = existing_dates[date_str]
        # Columns are the players in payload order, followed by the guest.
        # NOTE(review): an entry is stored only when its column name matches
        # a Player row, so the guest column is dropped unless a player with
        # that name exists -- confirm whether that is intended.
        for idx, player_name in enumerate(data["players"] + [data["guest"]]):
            key = f"{date_str}|{idx}"
            status = data["attendance"].get(key)
            if status and player_name in player_dict:
                player = player_dict[player_name]
                # Convert boolean True to string 'yes'
                status_value = 'yes' if status is True else status
                # Only add if status is one of the valid values
                if status_value in valid_statuses:
                    db.session.add(Attendance(
                        date_id=date.id,
                        player_id=player.id,
                        status=status_value,
                    ))
        # Add guest name if present
        guest_name = data.get("guestNames", {}).get(date_str)
        if guest_name:
            db.session.add(GuestName(date_id=date.id, name=guest_name))


def json_to_db(data):
    """Validate *data* and replace the stored attendance with it.

    *data* is read as a mapping with the keys ``players`` (list of names),
    ``guest`` (label for the extra last column), ``dates`` (list of
    ``DD/MM/YY`` strings), ``attendance`` (``"<date>|<column index>"``
    mapped to ``True``/'yes'/'no'/'maybe') and ``guestNames`` (date string
    mapped to a name). Existing players and dates are kept and new ones
    added; all attendance and guest-name rows are deleted and rebuilt.

    Raises:
        ValueError: with a specific message ("Invalid player name.",
            "Invalid date string." or "Invalid guest name.") when
            validation fails, or with a generic message for a malformed
            payload or a database failure. Database work runs in a single
            transaction, so a failure leaves the stored data unchanged.
    """
    generic_message = "An error occurred while processing the data."

    # Phase 1: validation. Its own ValueErrors carry messages meant for the
    # caller and must not be rewritten into the generic one.
    try:
        _validate_payload(data)
    except ValueError:
        raise
    except Exception as e:
        # Malformed structure, e.g. the payload is not a mapping.
        logger.error(f"Error during database operation: {e}")
        raise ValueError(generic_message) from e

    # Phase 2: one transaction for all writes; it is committed when the
    # block exits normally and rolled back if anything raises.
    # NOTE(review): begin() presumably requires that no transaction is
    # already open on this session -- confirm for the installed SQLAlchemy.
    try:
        with db.session.begin():
            _replace_stored_data(data)
    except Exception as e:
        logger.error(f"Error during database operation: {e}")
        raise ValueError(generic_message) from e
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
@app.route('/reports')
def reports():
    """Render the ``reports.html`` template for ``/reports``."""
    page = render_template('reports.html')
    return page
@app.route('/api/data', methods=['GET'])
def get_data():
    """Return the full stored dataset as a JSON response.

    Runs ``get_initial_data()`` first, then serializes the database with
    ``db_to_json()``.
    """
    get_initial_data()
    payload = db_to_json()
    return jsonify(payload)
@app.route('/api/data', methods=['POST'])
@csrf.exempt  # Remove this decorator to enforce CSRF on this endpoint (APIs may want to handle CSRF differently)
def update_data():
    """Store the JSON body of the request via ``json_to_db``.

    Responds with ``{"status": "success"}`` on success, with HTTP 400 and
    the error message when ``json_to_db`` raises ``ValueError``, and with
    HTTP 500 and a fixed message for any other exception.
    """
    payload = request.get_json()
    try:
        json_to_db(payload)
        success_body = {"status": "success"}
        return jsonify(success_body)
    except ValueError as err:
        rejection_body = {"status": "error", "message": str(err)}
        return jsonify(rejection_body), 400
    except Exception as err:
        logger.error(f"Unexpected error in /api/data POST: {err}")
        failure_body = {"status": "error", "message": "Internal server error."}
        return jsonify(failure_body), 500
# Optional: export from DB as JSON (for compatibility)
@app.route('/export-data')
def export_data():
    """Return the stored dataset as a downloadable JSON file.

    Runs ``get_initial_data()``, serializes the database with
    ``db_to_json()`` and sends it, indented by two spaces, as an
    attachment named ``attendance_data.json``.

    Uses the module-level ``json`` import, which the file previously
    lacked, so this view used to fail with ``NameError``.
    """
    from flask import Response
    get_initial_data()
    data = db_to_json()
    return Response(
        response=json.dumps(data, indent=2),
        mimetype='application/json',
        headers={'Content-Disposition': 'attachment;filename=attendance_data.json'},
    )
if __name__ == '__main__':
    # Direct-execution entry point: listen on all interfaces, taking the
    # port from the PORT environment variable (default 5000), debug off.
    listen_port = int(os.environ.get('PORT', 5000))
    app.run(debug=False, host='0.0.0.0', port=listen_port)