import sqlite3 from datetime import datetime, timedelta, time from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect, Header from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from fastapi.responses import FileResponse from typing import List, Dict, Optional import uuid import os import json import asyncio from starlette.websockets import WebSocketState from fpdf import FPDF import tempfile app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # SQLite database setup DATABASE_NAME = "school.db" SUBJECTS = ["De", "Ma", "NW", "En", "Gs", "Re", "WN", "Fr", "Sp", "Sn"] ROOM_NAMES = [ {"number": "1", "room_number": "0201", "info": "(Hörsaal)", "location": "Osttrakt"}, {"number": "2", "room_number": "0202", "info": "(NW)", "location": "Osttrakt"}, {"number": "3", "room_number": "0203", "info": "(NW)", "location": "Osttrakt"}, {"number": "4", "room_number": "0204", "info": "(NW)", "location": "Osttrakt"}, {"number": "5", "room_number": "0205", "info": "(NW)", "location": "Osttrakt"}, {"number": "6", "room_number": "0214", "info": "(NW)", "location": "Lichthof"}, {"number": "7", "room_number": "0215", "info": "(NW)", "location": "Lichthof"}, {"number": "8", "room_number": "0216", "info": "(NW)", "location": "Flur zum Lichthof"}, {"number": "9", "room_number": "0217", "info": "(NW)", "location": "Flur zum Lichthof"}, {"number": "10", "room_number": "0265", "info": "(NW)", "location": "Lichthof"}, {"number": "11", "room_number": "0266", "info": "(NW)", "location": "Flur zum Lichthof"}, {"number": "12", "room_number": "0267", "info": "(NW)", "location": "Flur zum Lichthof"}, {"number": "13", "room_number": "1860", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "14", "room_number": "1861", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "15", "room_number": "1862", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "16", "room_number": "1863", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "17", "room_number": "1864", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "18", "room_number": "1865", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "19", "room_number": "1866", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "20", "room_number": "1867", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "21", "room_number": "1868", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "22", "room_number": "1869", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "23", "room_number": "1870", "info": "Jahrgang Südwest", "location": "1. OG"}, {"number": "24", "room_number": "2860", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "25", "room_number": "2861", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "26", "room_number": "2862", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "27", "room_number": "2863", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "28", "room_number": "2864", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "29", "room_number": "2865", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "30", "room_number": "2866", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "31", "room_number": "2867", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "32", "room_number": "2868", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "33", "room_number": "2869", "info": "Jahrgang Südwest", "location": "2. OG"}, {"number": "34", "room_number": "2870", "info": "Jahrgang Südwest", "location": "2. OG"} ] room_name_dict = {room['number']: room for room in ROOM_NAMES} active_connections: Dict[str, WebSocket] = {} #CURRENT_TIME_HOUR_MINUTE = datetime.now().strftime("%H:%M") CURRENT_DATE = datetime.now().strftime("%Y-%m-%d") CURRENT_TIME_HOUR_MINUTE = "9:10" # Helper function to connect to the database def get_db_connection(): conn = sqlite3.connect(DATABASE_NAME) conn.row_factory = sqlite3.Row return conn # Database tables creation on startup def create_tables(): conn = get_db_connection() cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS rooms ( room_number TEXT NOT NULL, max_students INTEGER NOT NULL, teacher_name TEXT NOT NULL, is_open BOOLEAN NOT NULL, lesson_date TEXT NOT NULL, -- Can also be DATETIME if you need both date and time lesson_time TEXT NOT NULL, unique_id TEXT PRIMARY KEY, -- Ensure unique_id is the primary key UNIQUE(room_number, lesson_date, lesson_time) -- Composite unique constraint ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS student_joins ( student_username TEXT NOT NULL, room_unique_id TEXT NOT NULL, join_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (student_username, room_unique_id), FOREIGN KEY (student_username) REFERENCES students(username) ON DELETE CASCADE, FOREIGN KEY (room_unique_id) REFERENCES rooms(unique_id) ON DELETE CASCADE ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS students ( username TEXT PRIMARY KEY NOT NULL, password TEXT NOT NULL, first_name TEXT, last_name TEXT, user_class TEXT, unique_id TEXT ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS teachers ( username TEXT PRIMARY KEY, password TEXT, first_name TEXT, last_name TEXT, subjects TEXT ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, username TEXT, role TEXT ) ''') # Clear all rooms and sessions on startup cursor.execute("DELETE FROM rooms") cursor.execute("DELETE FROM sessions") cursor.execute("DELETE FROM student_joins") conn.commit() conn.close() # Models class RoomCreate(BaseModel): room_number: str max_students: int session_id: str lesson_time: str lesson_date: str class Student(BaseModel): username: str class RoomInfo(BaseModel): room_number: str max_students: int current_students: List[str] teacher_name: str class User_register(BaseModel): username: str first_name: str last_name: str user_class: str password: str class User(BaseModel): username: str password: str class Session(BaseModel): session_id: str class Teacher_User(BaseModel): username: str password: str teacher_secret_password: str first_name: str last_name: str subjects: str def current_role(session_id: str): conn = get_db_connection() cursor = conn.cursor() # Check if the session ID exists in the sessions table cursor.execute("SELECT role FROM sessions WHERE session_id = ?", (session_id,)) role: list[sqlite3.Row] = cursor.fetchone() conn.close() return role # Helper functions def get_current_user(session_id: str): conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM sessions WHERE session_id = ?", (session_id,)) session = cursor.fetchone() if session is None: raise HTTPException(status_code=404, detail="Session not found") conn.close() return session def subjects_avaible(liste, worte): for fach in worte: if fach not in liste: return False, fach # Gibt False und das nicht gefundene Fach zurück return True, None async def check_lesson_time(): print(f"Checking for the event at {datetime.now()}...") try: conn = get_db_connection() cursor = conn.cursor() cursor.execute( """ UPDATE rooms SET is_open = ? WHERE lesson_date = ? AND lesson_time = ? """, (False, CURRENT_DATE, CURRENT_TIME_HOUR_MINUTE) ) conn.commit() conn.close() event_happened = False # Replace with your actual condition if event_happened: print("Event occurred!") return True return False except: print("no rooms") # Calculate seconds until the next target time def seconds_until_next_interval(): now = datetime.now() next_minute = (now.minute // 5 + 1) * 5 # Next multiple of 5 minutes if next_minute >= 60: # Handle hour overflow next_hour = now.hour + 1 if now.hour < 23 else 0 next_time = datetime(now.year, now.month, now.day, next_hour, 0, 0) else: next_time = datetime(now.year, now.month, now.day, now.hour, next_minute, 0) return (next_time - now).total_seconds() # Background task to check the event based on real-world time async def periodic_event_checker(): while True: try: event_detected = await check_lesson_time() if event_detected: print("Event detected! Taking action.") # Add any action you want to take when the event occurs # You can also exit the loop if needed break sleep_time = seconds_until_next_interval() print(f"Sleeping for {sleep_time:.2f} seconds until the next check.") await asyncio.sleep(sleep_time) except Exception as e: print(f"Error in background task: {e}") await asyncio.sleep(5 * 60) # Retry after 5 minutes in case of an error import logging logging.basicConfig(level=logging.DEBUG) def is_open_update(unique_id: str): conn = get_db_connection() cursor = conn.cursor() # Check the current number of students in the room cursor.execute(""" SELECT COUNT(s.username) AS current_students, COALESCE(r.max_students, 100) AS max_students, COALESCE(r.is_open, 0) AS is_open FROM students s JOIN student_joins sj ON s.username = sj.student_username JOIN rooms r ON r.unique_id = sj.room_unique_id WHERE sj.room_unique_id = ? """, (unique_id,)) result = cursor.fetchone() if result is None: logging.debug(f"No room found with unique_id: {unique_id}") conn.close() return current_students = int(result['current_students']) max_students = int(result['max_students']) current_is_open = int(result['is_open']) logging.debug(f"current_students: {current_students}, max_students: {max_students}, current_is_open: {current_is_open}") # If the number of students exceeds or equals the max students, close the room if current_students >= max_students: if current_is_open != 0: # Check if the room is not already closed cursor.execute(""" UPDATE rooms SET is_open = 0 WHERE unique_id = ? """, (unique_id,)) conn.commit() else: if current_is_open != 1: # Check if the room is not already cursor.execute(""" UPDATE rooms SET is_open = 1 WHERE unique_id = ? """, (unique_id,)) conn.commit() conn.close() # Clear all data when the application starts @app.on_event("startup") def startup_event(): create_tables() asyncio.create_task(periodic_event_checker()) @app.post("/teacher/register") def teacher_register(user: Teacher_User): conn = get_db_connection() cursor = conn.cursor() # Check if username exists in either teachers or students cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,)) if cursor.fetchone(): conn.close() raise HTTPException(status_code=400, detail="Username already exists as a teacher") cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,)) if cursor.fetchone(): conn.close() raise HTTPException(status_code=400, detail="Username already exists as a student") if user.teacher_secret_password != "chrissi": conn.close() raise HTTPException(status_code=403, detail="Incorrect teacher secret password") teacher_subjects = [fach.strip() for fach in user.subjects.split(",")] result, error_subject = subjects_avaible(SUBJECTS, teacher_subjects) if result == False: return{"message": "Unavaible Subject", "subject": error_subject} cursor.execute("INSERT INTO teachers (username, password, first_name, last_name, subjects) VALUES (?, ?, ?, ?, ?)", (user.username, user.password, user.first_name, user.last_name, user.subjects)) conn.commit() # Create session for teacher session_id = str(uuid.uuid4()) cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "teacher")) conn.commit() conn.close() return {"message": "Teacher registered successfully", "session_id": session_id, "username": user.username} @app.post("/student/register") def student_register(user: User_register): conn = get_db_connection() cursor = conn.cursor() # Check if username exists in either students or teachers cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,)) if cursor.fetchone(): conn.close() raise HTTPException(status_code=400, detail="Benutzername bereits vergeben") cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,)) if cursor.fetchone(): conn.close() raise HTTPException(status_code=400, detail="Benutzername bereits vergeben") cursor.execute("INSERT INTO students (username, password, first_name, last_name, user_class) VALUES (?, ?, ?, ?, ?)", (user.username, user.password, user.first_name, user.last_name, user.user_class)) conn.commit() # Create session for student session_id = str(uuid.uuid4()) cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "student")) conn.commit() conn.close() return {"message": "Schüler erfolgreich registriert", "session_id": session_id, "username": user.username} @app.post("/login") def login(user: User): conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,)) teacher = cursor.fetchone() if teacher: if teacher["password"] != user.password: conn.close() raise HTTPException(status_code=403, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort") role = "teacher" else: cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,)) student = cursor.fetchone() if student: if student["password"] != user.password: conn.close() raise HTTPException(status_code=403, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort") role = "student" else: conn.close() raise HTTPException(status_code=404, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort") session_id = str(uuid.uuid4()) cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, role)) conn.commit() conn.close() return {"session_id": session_id, "username": user.username} @app.post("/teacher/create_room") def create_room(room: RoomCreate): lesson_date = datetime.strptime(room.lesson_date, "%Y-%m-%d") today = datetime.today().date() if lesson_date.date() < today: raise HTTPException(status_code=400, detail="Lesson date cannot be in the past") # Check if the lesson date is today, but the lesson time has already passed if lesson_date.date() == today: lesson_time = datetime.strptime(room.lesson_time, "%H:%M").time() current_time = datetime.now().time() #if lesson_time < current_time: # raise HTTPException(status_code=400, detail="Lesson time has already passed today") user = get_current_user(room.session_id) if user['role'] != "teacher": raise HTTPException(status_code=403, detail="Only teachers can create rooms") unique_id = str(uuid.uuid4()) conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM rooms WHERE room_number = ? AND lesson_date = ? AND lesson_time = ?", (room.room_number, room.lesson_date, room.lesson_time,)) if cursor.fetchone(): conn.close() raise HTTPException(status_code=400, detail="Room already exists") cursor.execute( "INSERT INTO rooms (room_number, max_students, teacher_name, is_open, lesson_date, lesson_time, unique_id) VALUES (?, ?, ?, ?, ?, ?, ?)", ( room.room_number, room.max_students, user['username'], True, room.lesson_date, room.lesson_time, unique_id, ) ) conn.commit() conn.close() return {"message": "Room created successfully", "unique_id": unique_id} @app.delete("/teacher/delete_room/") def delete_room(unique_id: str, session: Session): user = get_current_user(session.session_id) if user['role'] != "teacher": raise HTTPException(status_code=403, detail="Only teachers can delete rooms") conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,)) room = cursor.fetchone() if room is None: conn.close() raise HTTPException(status_code=404, detail="Room not found") if room["teacher_name"] != user['username']: conn.close() raise HTTPException(status_code=403, detail="You do not have permission to delete this room") cursor.execute("DELETE FROM rooms WHERE unique_id = ?", (unique_id,)) conn.commit() conn.close() return {"message": f"Room {unique_id} deleted successfully by {user['username']}"} @app.post("/teacher/room_students/") def get_room_students(unique_id: str, session: Session): user = get_current_user(session.session_id) if user['role'] != "teacher": raise HTTPException(status_code=403, detail="Only teachers can view students in their room") conn = get_db_connection() cursor = conn.cursor() # Check if the room exists and if it's owned by the current teacher cursor.execute("SELECT * FROM rooms WHERE unique_id = ? AND teacher_name = ?", (unique_id, user['username'])) room = cursor.fetchone() if room is None: conn.close() raise HTTPException(status_code=404, detail="Room not found or not owned by this teacher") # Fetch the students who are currently joined to the room cursor.execute(""" SELECT s.username FROM students s JOIN student_joins sj ON s.username = sj.student_username WHERE sj.room_unique_id = ? """, (unique_id,)) students = [row['username'] for row in cursor.fetchall()] conn.close() return {"unique_id": unique_id, "students": students} @app.websocket("/ws/student/my_room") async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)): await websocket.accept() # Validate session ID and get user user = get_current_user(session_id) if not user or user['role'] != "student": await websocket.close(code=403) return # Store active connection active_connections[user['username']] = websocket try: while True: # Simulate periodic room and teacher subjects fetch conn = get_db_connection() cursor = conn.cursor() # Fetch rooms the student has joined cursor.execute( """ SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students, r.lesson_date, r.lesson_time, t.first_name, t.last_name FROM student_joins sj JOIN rooms r ON sj.room_unique_id = r.unique_id JOIN teachers t ON r.teacher_name = t.username WHERE sj.student_username = ? """, (user['username'],) ) rooms = cursor.fetchall() # Fetch teacher subjects cursor.execute("SELECT username, subjects FROM teachers") teacher_subjects_raw = cursor.fetchall() teacher_subjects = {row[0]: row[1] for row in teacher_subjects_raw} rooms_list = [] for room in rooms: room_dict = dict(room) room_number = room_dict.get("room_number") teacher_name = room_dict.get("teacher_name") # Add additional room details from room_name_dict if room_number and room_number in room_name_dict: room_dict.update(room_name_dict[room_number]) # Add teacher subjects if available if teacher_name and teacher_name in teacher_subjects: room_dict["subjects"] = teacher_subjects[teacher_name] # Fetch current student count for the room cursor.execute( """ SELECT COUNT(s.username) as current_students FROM students s JOIN student_joins sj ON s.username = sj.student_username WHERE sj.room_unique_id = ? """, (room_dict["unique_id"],) ) current_students = cursor.fetchone()["current_students"] room_dict["current_students"] = current_students rooms_list.append(room_dict) conn.close() if rooms_list: await websocket.send_json({"rooms": rooms_list}) else: await websocket.send_json({"error": "Student has not joined any rooms"}) await asyncio.sleep(2) # Poll every 2 seconds except WebSocketDisconnect: del active_connections[user['username']] @app.websocket("/ws/open_rooms") async def websocket_open_rooms(websocket: WebSocket, session_id: Optional[str] = Header(None)): await websocket.accept() # Validate session ID and get user user = get_current_user(session_id) if not user or user['role'] != "student": await websocket.close(code=403) return # Store active connection active_connections[user['username']] = websocket try: while True: # Simulate periodic room fetch conn = sqlite3.connect('school.db') conn.row_factory = sqlite3.Row cursor = conn.cursor() # Fetch all rooms, both open and closed cursor.execute(""" SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students, COUNT(s.username) AS current_students, r.is_open, r.lesson_date, r.lesson_time, t.first_name, t.last_name, t.subjects, CASE WHEN EXISTS ( SELECT 1 FROM student_joins sj JOIN sessions ss ON sj.student_username = ss.username WHERE sj.room_unique_id = r.unique_id AND ss.session_id = ? ) THEN 1 ELSE 0 END AS joined FROM rooms r LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id LEFT JOIN students s ON sj.student_username = s.username LEFT JOIN teachers t ON r.teacher_name = t.username WHERE r.lesson_date = ? GROUP BY r.unique_id """, (session_id, CURRENT_DATE,)) all_rooms = [] for row in cursor.fetchall(): room = dict(row) room_number = str(room['room_number']) room_info = room_name_dict.get(room_number, {}) room.update({ 'room_number': int(room_number), 'room_name': room_info.get('room_number', ''), 'info': room_info.get('info', ''), 'location': room_info.get('location', ''), 'joined': bool(room['joined']), # Whether the user joined 'max_students': room['max_students'], # Maximum capacity 'current_students': room['current_students'], # Current count 'is_open': bool(room['is_open']), # Room open status }) all_rooms.append(room) conn.close() if all_rooms: await websocket.send_json({"rooms": all_rooms}) else: await websocket.send_json({"message": "No rooms available"}) await asyncio.sleep(1) except HTTPException: await websocket.send_json({"error": "Session not found"}) await websocket.close(code=404) return except WebSocketDisconnect: del active_connections[user['username']] @app.post("/student/join_room/") def join_room(unique_id: str, session: Session): user = get_current_user(session.session_id) if user['role'] != "student": raise HTTPException(status_code=403, detail="Nur Schüler können diesem Raum beitreten") conn = get_db_connection() cursor = conn.cursor() # Check if the room exists cursor.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,)) room = cursor.fetchone() if room is None: conn.close() raise HTTPException(status_code=404, detail="Raum nicht gefunden") # Check if the room is open if not room["is_open"]: conn.close() raise HTTPException(status_code=400, detail="Raum ist geschlossen") # Check if the student is already in a room at the same time (same lesson_date and lesson_time) cursor.execute(""" SELECT r.unique_id FROM student_joins sj JOIN rooms r ON sj.room_unique_id = r.unique_id WHERE sj.student_username = ? AND r.lesson_date = ? AND r.lesson_time = ? """, (user['username'], room["lesson_date"], room["lesson_time"])) conflicting_room = cursor.fetchone() if conflicting_room: conn.close() raise HTTPException(status_code=400, detail="Du bist bereits einem Raum für die gleiche Stunde zugewiesen") # Check the current number of students in the room cursor.execute(""" SELECT COUNT(sj.student_username) as current_students FROM student_joins sj WHERE sj.room_unique_id = ? """, (unique_id,)) current_students = cursor.fetchone()["current_students"] if current_students >= room["max_students"]: conn.close() raise HTTPException(status_code=400, detail="Raum ist voll") # Add the student to the room by inserting into student_joins table cursor.execute(""" INSERT INTO student_joins (student_username, room_unique_id) VALUES (?, ?) """, (user['username'], unique_id)) conn.commit() conn.close() is_open_update(unique_id) return {"message": f"Joined room {unique_id}"} @app.post("/student/leave_room/") def leave_room(unique_id: str, session: Session): user = get_current_user(session.session_id) if user['role'] != "student": raise HTTPException(status_code=403, detail="Only students can leave rooms") conn = get_db_connection() cursor = conn.cursor() # Check if the student is in the specified room by checking the student_joins table cursor.execute("SELECT * FROM student_joins WHERE student_username = ? AND room_unique_id = ?", (user['username'], unique_id)) student_join = cursor.fetchone() if not student_join: conn.close() raise HTTPException(status_code=400, detail="You are not in the specified room") # Remove the student from the room by deleting the entry from student_joins cursor.execute("DELETE FROM student_joins WHERE student_username = ? AND room_unique_id = ?", (user['username'], unique_id)) conn.commit() conn.close() is_open_update(unique_id) return {"message": f"You have left room {unique_id}"} @app.post("/check_session") def check_session(session: Session): conn = get_db_connection() cursor = conn.cursor() # Check if the session ID exists in the sessions table cursor.execute("SELECT * FROM sessions WHERE session_id = ?", (session.session_id,)) session_data = cursor.fetchone() conn.close() if session_data: return {"available": True} else: return {"available": False} @app.post("/check_role") def check_role(session: Session): session_data = current_role(session.session_id) if session_data: return {"role": session_data} else: return {"role": None} @app.websocket("/ws/teacher/open_rooms") async def websocket_list_open_rooms_teacher(websocket: WebSocket, session_id: Optional[str] = Header(None, alias="session-id")): await websocket.accept() try: # Get the current user and their role based on session_id info = get_current_user(session_id) # Convert info to a dictionary if it's a sqlite3.Row if isinstance(info, sqlite3.Row): info = dict(info) # Convert sqlite3.Row to dict role = current_role(session_id) teacher_username = info.get("username", "") # Access username from the info dictionary except Exception as e: print(f"Session validation error: {e}") try: await websocket.send_json({"error": "Session not found"}) except WebSocketDisconnect: pass # Ignore if the client has disconnected await websocket.close(code=4004) return role_final = ''.join([elt for elt in role]) # Check if the role is 'teacher' if role_final != "teacher": try: await websocket.send_json({"message": "You are not a Teacher"}) except WebSocketDisconnect: pass # Ignore if the client has disconnected await websocket.close(code=1008) return try: # Fetch and send room data for the teacher await send_room_data(websocket, teacher_username) while True: await asyncio.sleep(2) await send_room_data(websocket, teacher_username) except WebSocketDisconnect: print("WebSocket disconnected") except Exception as e: print(f"Unexpected error: {e}") try: await websocket.send_json({"error": "An internal server error occurred."}) except WebSocketDisconnect: pass # Ignore if the client has disconnected await websocket.close(code=1011) async def send_room_data(websocket: WebSocket, teacher_username: str): try: conn = get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() # Update the query to filter rooms by the teacher's username cursor.execute(""" SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students, COUNT(s.username) AS current_students FROM rooms r LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id LEFT JOIN students s ON sj.student_username = s.username WHERE r.is_open = 1 AND r.teacher_name = ? GROUP BY r.unique_id ORDER BY r.room_number """, (teacher_username,)) open_rooms = [] for row in cursor.fetchall(): room = dict(row) # Convert the row to a dictionary room_number = str(room['room_number']) room_info = room_name_dict.get(room_number, {}) room.update({ 'room_number': int(room_number), 'room_name': room_info.get('room_number', ''), 'info': room_info.get('info', ''), 'location': room_info.get('location', ''), }) open_rooms.append(room) conn.close() if open_rooms: try: await websocket.send_json({"open_rooms": open_rooms}) except WebSocketDisconnect: pass # Ignore if the client has disconnected else: try: await websocket.send_json({"message": "No open rooms available"}) except WebSocketDisconnect: pass # Ignore if the client has disconnected except Exception as e: print(f"Error fetching or sending room data: {e}") try: await websocket.send_json({"error": "Failed to fetch room data"}) except WebSocketDisconnect: pass # Ignore if the client has disconnected @app.get("/teacher/all_room_information") def all_room_information(): return ROOM_NAMES @app.get("/test") def test(): conn = get_db_connection() cursor = conn.cursor() # Check if the session ID exists in the sessions table cursor.execute("SELECT * FROM students WHERE username = ?", ("hi",)) session_data = cursor.fetchone() print(session_data) conn.close() if session_data: print(session_data) # Liste der Fächer ausgeben return {"subjects": session_data} # Liste der Fächer zurückgeben else: return {"error": "Benutzer nicht gefunden oder keine Fächer vorhanden"} @app.websocket("/ws/student/test") async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)): await websocket.accept() # Validate session ID user = get_current_user(session_id) for i in user: print(i) if not user: await websocket.close(code=403) return # Store connection active_connections[user['username']] = websocket counter = 0 # Initialize counter to 0 try: while True: # Send incrementing counter value every second counter += 1 await websocket.send_text(f"Counter: {counter}") await asyncio.sleep(1) # Wait 1 second before sending the next message except WebSocketDisconnect: del active_connections[user['username']] print(f"Connection closed for user: {user['username']}") def generate_pdf(file_path): class PDF(FPDF): def header(self): self.set_font("Arial", "B", 16) today = "01.01.2025" self.cell(0, 10, f"Attendance Sheet - {today}", ln=True, align="C") self.ln(10) pdf = PDF() pdf.add_page() pdf.set_font("Arial", size=12) # Column headers pdf.cell(80, 10, "Namen", border=1, align="C") pdf.cell(50, 10, "Anwesend", border=1, align="C") pdf.cell(50, 10, "Abwesend", border=1, align="C") pdf.ln() # Names list names = [ "Jasper Grevsmühl", "Papa Grevsmühl", "Clara Müller", "Anna Schmidt", "Max Mustermann", ] for name in names: pdf.cell(80, 10, name, border=1) pdf.cell(50, 10, "", border=1) pdf.cell(50, 10, "", border=1) pdf.ln() # Save the PDF to the provided file path pdf.output(file_path) # FastAPI route for downloading the PDF @app.get("/download/{filename}") def download(filename: str): # Prepare file path temp_dir = tempfile.mkdtemp() pdf_file_path = os.path.join(temp_dir, filename) # Generate the PDF file generate_pdf(pdf_file_path) # Return the generated file as a response return FileResponse(pdf_file_path)