"""Flask application that stores per-date player attendance and guest names."""
from datetime import datetime
|
|
from flask import Flask, render_template, request, jsonify
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy import func
|
|
import os
|
|
import logging
|
|
from flask_wtf import CSRFProtect
|
|
from flask_talisman import Talisman
|
|
|
|
app = Flask(__name__)

# Both settings are read from the environment at import time.
# NOTE(review): DATABASE_URL has no fallback, so the URI is None when the
# variable is unset, and SECRET_KEY falls back to a hard-coded placeholder --
# confirm that both variables are always set in deployed environments.
app.config.update(
    SQLALCHEMY_DATABASE_URI=os.environ.get('DATABASE_URL'),
    SECRET_KEY=os.environ.get('SECRET_KEY', 'change-this-secret-key'),
)

# CSRF Protection
csrf = CSRFProtect(app)
|
|
|
|
# Security Headers
# Sources that appear in more than one directive are named once here.
_CSP_SELF = "'self'"
_CSP_UNSAFE_INLINE = "'unsafe-inline'"
_CSP_UMAMI_ORIGIN = 'https://umami-ikow84gco0wcw8cgsc8o08g8.reflectonai.com'

# Content-Security-Policy handed to Talisman: one list of allowed sources per
# directive.
# NOTE(review): 'unsafe-inline' is allowed for both scripts and styles.
csp = {
    'default-src': [_CSP_SELF],
    'script-src': [
        _CSP_SELF,
        'https://cdn.jsdelivr.net/npm/chart.js',
        _CSP_UNSAFE_INLINE,
        _CSP_UMAMI_ORIGIN,
    ],
    'style-src': [_CSP_SELF, _CSP_UNSAFE_INLINE],
    'img-src': [_CSP_SELF, 'data:'],
    'connect-src': [_CSP_SELF, _CSP_UMAMI_ORIGIN],
}
Talisman(app, content_security_policy=csp)
|
|
|
|
# Logging Configuration
# Root logger: INFO and above, each record prefixed with timestamp and level.
_LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the helpers and routes in this file.
logger = logging.getLogger(__name__)
|
|
|
|
# Flask-SQLAlchemy handle bound to the app; the model classes in this file
# derive from db.Model and all queries go through db.session.
db = SQLAlchemy(app)
|
|
|
|
# Models
|
|
class Player(db.Model):
    """A player, identified by a unique name of at most 50 characters."""

    # Surrogate key; db_to_json orders players by this column.
    id = db.Column(db.Integer, primary_key=True)
    # Required and unique across all players.
    name = db.Column(db.String(50), unique=True, nullable=False)
|
|
|
|
class Date(db.Model):
    """A date column of the attendance grid, stored as its display string."""

    id = db.Column(db.Integer, primary_key=True)
    # Required and unique, e.g. '08/05/25'.
    date_str = db.Column(db.String(20), unique=True, nullable=False)
|
|
|
|
class Attendance(db.Model):
    """
    Attendance record for a player on a date.

    status: 'yes', 'no', 'maybe', or blank (None)
    """

    # At most one record per (date, player) pair.
    __table_args__ = (
        db.UniqueConstraint('date_id', 'player_id', name='_date_player_uc'),
    )

    id = db.Column(db.Integer, primary_key=True)
    # Required references to the Date and Player rows.
    date_id = db.Column(db.Integer, db.ForeignKey('date.id'), nullable=False)
    player_id = db.Column(db.Integer, db.ForeignKey('player.id'), nullable=False)
    # 'yes', 'no', 'maybe', or blank
    status = db.Column(db.String(10))
|
|
|
|
class GuestName(db.Model):
    """The guest's name for one date; at most one row per date."""

    id = db.Column(db.Integer, primary_key=True)
    # unique=True limits each date to a single guest name.
    date_id = db.Column(
        db.Integer, db.ForeignKey('date.id'), unique=True, nullable=False
    )
    name = db.Column(db.String(50), nullable=False)
|
|
|
|
def get_initial_data():
    """Create the database tables and seed the default players if none exist.

    Called by the read endpoints before they query the database.  The default
    players are inserted and committed only when the ``Player`` table is empty.
    """
    default_players = (
        "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Hannah",
    )

    # Ensure tables exist
    db.create_all()

    # If no players, insert defaults
    if Player.query.count() == 0:
        db.session.add_all(Player(name=name) for name in default_players)
        db.session.commit()

    # NOTE: default guest names are not seeded.  An earlier placeholder branch
    # here queried GuestName on every call and then did nothing; it has been
    # removed to avoid the wasted query.
|
|
|
|
def parse_date(date_str):
    """Parse a ``DD/MM/YY`` string into a ``datetime``.

    Raises ``ValueError`` (from ``strptime``) when the string does not match.
    """
    day_month_year = '%d/%m/%y'
    parsed = datetime.strptime(date_str, day_month_year)
    return parsed
|
|
|
|
def db_to_json():
    """Serialise the whole database into the JSON shape used by the API.

    Returns a dict with:
      * ``players``    -- player names ordered by ``Player.id``;
      * ``guest``      -- the fixed label ``"Guest"``;
      * ``dates``      -- date strings sorted chronologically via ``parse_date``;
      * ``attendance`` -- ``"<date_str>|<column index>"`` mapped to ``True``
        (for 'yes'), ``'no'`` or ``'maybe'``; rows with any other status are
        omitted;
      * ``guestNames`` -- date string mapped to that date's guest name.

    Raises ``ValueError`` if a stored date string is not in ``DD/MM/YY`` form.
    """
    player_rows = Player.query.order_by(Player.id).all()
    players = [p.name for p in player_rows]
    guest = "Guest"

    # Load the lookup tables once instead of issuing two queries per
    # attendance row and a linear search through the player list.
    column_by_player_id = {p.id: idx for idx, p in enumerate(player_rows)}
    date_str_by_id = {d.id: d.date_str for d in Date.query.all()}
    dates = sorted(date_str_by_id.values(), key=parse_date)

    # Stored status -> value sent to the client ('yes' is sent as True).
    json_status = {'yes': True, 'no': 'no', 'maybe': 'maybe'}

    attendance = {}
    for att in Attendance.query.all():
        date_str = date_str_by_id.get(att.date_id)
        column = column_by_player_id.get(att.player_id)
        value = json_status.get(att.status)
        # Skip blank/unknown statuses and rows whose date or player row is
        # missing, rather than failing the whole export.
        if date_str is None or column is None or value is None:
            continue
        attendance[f"{date_str}|{column}"] = value

    guest_names = {
        date_str_by_id[g.date_id]: g.name
        for g in GuestName.query.all()
        if g.date_id in date_str_by_id
    }

    return {
        "players": players,
        "guest": guest,
        "dates": dates,
        "attendance": attendance,
        "guestNames": guest_names,
    }
|
|
|
|
import re
|
|
|
|
def validate_player_name(name):
    """Return True if *name* is an acceptable player name.

    A valid name is a string of 1 to 50 characters drawn from a-z, A-Z, 0-9,
    spaces, hyphens, and periods.  Non-string input (for example a number or
    ``None`` decoded from JSON) is rejected instead of raising ``TypeError``
    from the regex engine.
    """
    if not isinstance(name, str):
        return False
    return re.fullmatch(r'[a-zA-Z0-9 .-]{1,50}', name) is not None
|
|
|
|
def validate_guest_name(name):
    """Return True if *name* is an acceptable guest name.

    The rule matches the one for player names: a string of 1 to 50 characters
    drawn from a-z, A-Z, 0-9, spaces, hyphens, and periods.  Non-string input
    is rejected instead of raising ``TypeError`` from the regex engine.
    """
    if not isinstance(name, str):
        return False
    return re.fullmatch(r'[a-zA-Z0-9 .-]{1,50}', name) is not None
|
|
|
|
def validate_date_str(date_str):
    """Return True if *date_str* is a real calendar date written as DD/MM/YY.

    The regex enforces the exact two-digit layout; ``strptime`` then rejects
    impossible dates such as ``99/99/99`` or ``31/04/25``.  Without the second
    check such strings could be stored and would later make ``parse_date``
    raise when the dates are sorted.  Non-string input is rejected.
    """
    if not isinstance(date_str, str):
        return False
    # Format: DD/MM/YY (strptime alone would also accept unpadded '8/5/25').
    if re.fullmatch(r'\d{2}/\d{2}/\d{2}', date_str) is None:
        return False
    try:
        datetime.strptime(date_str, '%d/%m/%y')
    except ValueError:
        return False
    return True
|
|
|
|
def _validate_payload(data):
    """Check an attendance payload and return its parts with defaults applied.

    Returns ``(players, guest, dates, attendance, guest_names)``.
    Raises ``ValueError`` with a specific message on the first problem found.
    """
    if not isinstance(data, dict):
        raise ValueError("Payload must be a JSON object.")

    players = data.get("players", [])
    dates = data.get("dates", [])
    attendance = data.get("attendance", {})
    guest_names = data.get("guestNames", {})
    guest = data.get("guest", "Guest")

    if not isinstance(players, list) or not isinstance(dates, list):
        raise ValueError("Players and dates must be lists.")
    if not isinstance(attendance, dict) or not isinstance(guest_names, dict):
        raise ValueError("Attendance and guest names must be objects.")
    if not isinstance(guest, str):
        raise ValueError("Invalid guest name.")

    # The isinstance checks keep a non-string entry from reaching the regex
    # validators; %r keeps untrusted text from injecting extra log lines.
    for name in players:
        if not isinstance(name, str) or not validate_player_name(name):
            logger.warning("Invalid player name: %r", name)
            raise ValueError("Invalid player name.")
    for date_str in dates:
        if not isinstance(date_str, str) or not validate_date_str(date_str):
            logger.warning("Invalid date string: %r", date_str)
            raise ValueError("Invalid date string.")
    for guest_name in guest_names.values():
        if not isinstance(guest_name, str) or not validate_guest_name(guest_name):
            logger.warning("Invalid guest name: %r", guest_name)
            raise ValueError("Invalid guest name.")

    return players, guest, dates, attendance, guest_names


def json_to_db(data):
    """Replace the stored attendance grid with the contents of *data*.

    *data* is the decoded JSON payload.  Recognised keys: ``players`` (list of
    names), ``guest`` (label of the extra last column), ``dates`` (list of
    ``DD/MM/YY`` strings), ``attendance`` (``"<date>|<column index>"`` mapped
    to ``True``/``'yes'``, ``'no'`` or ``'maybe'``) and ``guestNames`` (date
    string mapped to a name).  Missing keys default to empty values, and
    ``guest`` defaults to ``"Guest"``.

    All Attendance and GuestName rows are deleted and rebuilt.  Player and
    Date rows are only ever added, never removed.  Everything happens in one
    transaction that is rolled back on any failure.

    Raises:
        ValueError: with a specific message when the payload is invalid, or
            with a generic message (chained to the original exception) when
            the database operation fails.
    """
    # Validate before touching the database so the caller receives the
    # specific validation message rather than the generic failure below.
    players, guest, dates, attendance, guest_names = _validate_payload(data)

    try:
        # Clear existing attendance records
        db.session.query(Attendance).delete()
        db.session.query(GuestName).delete()

        # Update players - keep existing ones, add new ones.  Recording each
        # new row in the map also stops a repeated name being inserted twice.
        players_by_name = {p.name: p for p in Player.query.all()}
        for name in players:
            if name not in players_by_name:
                new_player = Player(name=name)
                db.session.add(new_player)
                players_by_name[name] = new_player

        # Update dates - keep existing ones, add new ones
        dates_by_str = {d.date_str: d for d in Date.query.all()}
        for date_str in dates:
            if date_str not in dates_by_str:
                new_date = Date(date_str=date_str)
                db.session.add(new_date)
                dates_by_str[date_str] = new_date

        # New rows have no primary key until they are flushed; do it
        # explicitly rather than relying on an incidental autoflush.
        db.session.flush()

        # Column order matches the client's grid: players first, guest last.
        # NOTE(review): a column is only stored when a Player row with that
        # name exists, so guest attendance is dropped unless such a row exists.
        columns = players + [guest]

        # Add attendance records (dict.fromkeys drops repeated dates, which
        # would otherwise violate the unique constraints).
        for date_str in dict.fromkeys(dates):
            date = dates_by_str[date_str]
            recorded_player_ids = set()
            for idx, player_name in enumerate(columns):
                status = attendance.get(f"{date_str}|{idx}")
                player = players_by_name.get(player_name)
                if not status or player is None:
                    continue
                if player.id in recorded_player_ids:
                    # Same player listed in two columns: keep the first.
                    continue
                # Convert boolean True to string 'yes'
                status_value = 'yes' if status is True else status
                # Only add if status is one of the valid values
                if status_value in ('yes', 'no', 'maybe'):
                    db.session.add(Attendance(
                        date_id=date.id,
                        player_id=player.id,
                        status=status_value,
                    ))
                    recorded_player_ids.add(player.id)

            # Add guest name if present
            guest_name = guest_names.get(date_str)
            if guest_name:
                db.session.add(GuestName(date_id=date.id, name=guest_name))

        # Explicit commit/rollback instead of ``with db.session.begin()``,
        # which raises if the session already has a transaction in progress.
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        logger.exception("Error during database operation")
        # Kept as ValueError so existing callers' handling is unchanged.
        raise ValueError("An error occurred while processing the data.") from e
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
|
|
|
@app.route('/reports')
def reports():
    """Render the ``reports.html`` template for ``/reports``."""
    page = render_template('reports.html')
    return page
|
|
|
|
@app.route('/api/data', methods=['GET'])
def get_data():
    """Return the current database contents as JSON.

    Runs ``get_initial_data()`` first, then serialises with ``db_to_json()``.
    """
    get_initial_data()
    snapshot = db_to_json()
    return jsonify(snapshot)
|
|
|
|
@app.route('/api/data', methods=['POST'])
@csrf.exempt  # Remove this line if you want to enforce CSRF on API endpoints (for APIs, you may want to handle CSRF differently)
def update_data():
    """Replace the stored data with the JSON object in the request body.

    Responses:
      * 200 ``{"status": "success"}`` when the data was saved;
      * 400 with an error message when the body is not a JSON object or
        ``json_to_db`` raises ``ValueError``;
      * 500 with a generic message on any other exception.
    """
    data = request.json

    # json_to_db expects a mapping; reject arrays, scalars and null up front
    # with a clear message.
    if not isinstance(data, dict):
        return jsonify({
            "status": "error",
            "message": "Request body must be a JSON object.",
        }), 400

    try:
        json_to_db(data)
    except ValueError as e:
        return jsonify({"status": "error", "message": str(e)}), 400
    except Exception:
        # logger.exception records the traceback, which a bare message loses.
        logger.exception("Unexpected error in /api/data POST")
        return jsonify({"status": "error", "message": "Internal server error."}), 500
    return jsonify({"status": "success"})
|
|
|
|
# Optional: export from DB as JSON (for compatibility)
|
|
@app.route('/export-data')
def export_data():
    """Return the database contents as a downloadable JSON file.

    The response is pretty-printed JSON served as an attachment named
    ``attendance_data.json``.
    """
    # ``json`` is not imported at module level, so without this local import
    # the json.dumps call below raises NameError on every request.
    import json

    from flask import Response

    get_initial_data()
    data = db_to_json()
    return Response(
        response=json.dumps(data, indent=2),
        mimetype='application/json',
        headers={'Content-Disposition': 'attachment;filename=attendance_data.json'},
    )
|
|
|
|
if __name__ == '__main__':
    import os

    # Listen on every interface; the port comes from PORT (default 5000).
    listen_port = int(os.getenv('PORT', 5000))
    app.run(host='0.0.0.0', port=listen_port, debug=False)
|