Files
schoolplanner/main.py

996 lines
36 KiB
Python
Raw Permalink Normal View History

2025-01-19 00:04:08 +01:00
import sqlite3
from datetime import datetime, timedelta, time
from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from fastapi.responses import FileResponse
from typing import List, Dict, Optional
import uuid
import os
import json
import asyncio
from starlette.websockets import WebSocketState
from fpdf import FPDF
import tempfile
# FastAPI application instance; every route and WebSocket handler below registers on it.
app = FastAPI()
# NOTE(review): CORS is fully open here (any origin, method and header, with
# credentials allowed) — presumably a development setting; confirm before deploying.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# SQLite database setup
# File name of the SQLite database opened by get_db_connection().
DATABASE_NAME = "school.db"
# Subject abbreviations a teacher may register with (validated in teacher_register).
SUBJECTS = ["De", "Ma", "NW", "En", "Gs", "Re", "WN", "Fr", "Sp", "Sn"]
# Static catalogue of rooms. "number" is the short key used for lookups (see
# room_name_dict); "room_number" is exposed to clients as the room's name;
# "info" and "location" are free-text descriptions.
ROOM_NAMES = [
{"number": "1", "room_number": "0201", "info": "(Hörsaal)", "location": "Osttrakt"},
{"number": "2", "room_number": "0202", "info": "(NW)", "location": "Osttrakt"},
{"number": "3", "room_number": "0203", "info": "(NW)", "location": "Osttrakt"},
{"number": "4", "room_number": "0204", "info": "(NW)", "location": "Osttrakt"},
{"number": "5", "room_number": "0205", "info": "(NW)", "location": "Osttrakt"},
{"number": "6", "room_number": "0214", "info": "(NW)", "location": "Lichthof"},
{"number": "7", "room_number": "0215", "info": "(NW)", "location": "Lichthof"},
{"number": "8", "room_number": "0216", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "9", "room_number": "0217", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "10", "room_number": "0265", "info": "(NW)", "location": "Lichthof"},
{"number": "11", "room_number": "0266", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "12", "room_number": "0267", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "13", "room_number": "1860", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "14", "room_number": "1861", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "15", "room_number": "1862", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "16", "room_number": "1863", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "17", "room_number": "1864", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "18", "room_number": "1865", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "19", "room_number": "1866", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "20", "room_number": "1867", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "21", "room_number": "1868", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "22", "room_number": "1869", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "23", "room_number": "1870", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "24", "room_number": "2860", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "25", "room_number": "2861", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "26", "room_number": "2862", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "27", "room_number": "2863", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "28", "room_number": "2864", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "29", "room_number": "2865", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "30", "room_number": "2866", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "31", "room_number": "2867", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "32", "room_number": "2868", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "33", "room_number": "2869", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "34", "room_number": "2870", "info": "Jahrgang Südwest", "location": "2. OG"}
]
# Lookup table: each ROOM_NAMES entry keyed by its "number" string.
room_name_dict = {room['number']: room for room in ROOM_NAMES}
# WebSocket connections registered by the student endpoints, keyed by username.
active_connections: Dict[str, WebSocket] = {}
#CURRENT_TIME_HOUR_MINUTE = datetime.now().strftime("%H:%M")
# NOTE(review): both values below are computed once at import time and are not
# reassigned anywhere in the code visible here, so they go stale in a long-running
# process. CURRENT_TIME_HOUR_MINUTE is additionally hard-coded (the live-clock
# variant is commented out above) — presumably a testing override; confirm.
CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
CURRENT_TIME_HOUR_MINUTE = "9:10"
# Helper function to connect to the database
def get_db_connection():
    """Open a new connection to the SQLite database named by ``DATABASE_NAME``.

    Rows come back as ``sqlite3.Row`` so callers can index columns by name.
    Foreign-key enforcement is switched on for the connection: SQLite leaves
    it off by default, which would turn the ``ON DELETE CASCADE`` clauses
    declared in ``create_tables`` into no-ops.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for closing it.
    """
    conn = sqlite3.connect(DATABASE_NAME)
    conn.row_factory = sqlite3.Row
    # The pragma is per-connection, so it has to be issued every time.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
# Database tables creation on startup
def create_tables():
    """Create every table that is missing, then wipe the transient data.

    The schema statements run first (rooms, student_joins, students, teachers,
    sessions); afterwards all rows of rooms, sessions and student_joins are
    deleted. Everything is committed in one go before the connection is closed.
    """
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS rooms (
            room_number TEXT NOT NULL,
            max_students INTEGER NOT NULL,
            teacher_name TEXT NOT NULL,
            is_open BOOLEAN NOT NULL,
            lesson_date TEXT NOT NULL, -- Can also be DATETIME if you need both date and time
            lesson_time TEXT NOT NULL,
            unique_id TEXT PRIMARY KEY, -- Ensure unique_id is the primary key
            UNIQUE(room_number, lesson_date, lesson_time) -- Composite unique constraint
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS student_joins (
            student_username TEXT NOT NULL,
            room_unique_id TEXT NOT NULL,
            join_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (student_username, room_unique_id),
            FOREIGN KEY (student_username) REFERENCES students(username) ON DELETE CASCADE,
            FOREIGN KEY (room_unique_id) REFERENCES rooms(unique_id) ON DELETE CASCADE
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS students (
            username TEXT PRIMARY KEY NOT NULL,
            password TEXT NOT NULL,
            first_name TEXT,
            last_name TEXT,
            user_class TEXT,
            unique_id TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS teachers (
            username TEXT PRIMARY KEY,
            password TEXT,
            first_name TEXT,
            last_name TEXT,
            subjects TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS sessions (
            session_id TEXT PRIMARY KEY,
            username TEXT,
            role TEXT
        )
        ''',
    )
    # Rooms, sessions and joins do not survive a restart.
    reset_statements = (
        "DELETE FROM rooms",
        "DELETE FROM sessions",
        "DELETE FROM student_joins",
    )

    db = get_db_connection()
    cur = db.cursor()
    for statement in schema_statements + reset_statements:
        cur.execute(statement)
    db.commit()
    db.close()
# Models
class RoomCreate(BaseModel):
    """Request body for creating a room (see ``create_room``)."""

    room_number: str
    max_students: int
    # Session of the requesting user; must belong to a teacher.
    session_id: str
    # Parsed by create_room with the "%H:%M" format.
    lesson_time: str
    # Parsed by create_room with the "%Y-%m-%d" format.
    lesson_date: str
class Student(BaseModel):
    """Model carrying only a student's username."""

    username: str
class RoomInfo(BaseModel):
    """Summary of a room: its number, capacity, occupants and teacher."""

    room_number: str
    max_students: int
    # presumably the usernames of the students in the room — TODO confirm
    current_students: List[str]
    teacher_name: str
class User_register(BaseModel):
    """Request body for student registration (see ``student_register``)."""

    username: str
    first_name: str
    last_name: str
    user_class: str
    # Stored exactly as received by student_register.
    password: str
class User(BaseModel):
    """Login credentials (see ``login``)."""

    username: str
    password: str
class Session(BaseModel):
    """Request body that identifies the caller by session id."""

    session_id: str
class Teacher_User(BaseModel):
    """Request body for teacher registration (see ``teacher_register``)."""

    username: str
    password: str
    # Shared secret that teacher_register checks before creating the account.
    teacher_secret_password: str
    first_name: str
    last_name: str
    # Comma-separated subject abbreviations; each must appear in SUBJECTS.
    subjects: str
def current_role(session_id: str) -> Optional[sqlite3.Row]:
    """Look up the role stored for a session.

    Args:
        session_id: Identifier of the session to look up.

    Returns:
        A one-column ``sqlite3.Row`` holding ``role``, or ``None`` when no
        session with that id exists. Callers receive the row itself, not the
        bare string.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # Check if the session ID exists in the sessions table
        cursor.execute("SELECT role FROM sessions WHERE session_id = ?", (session_id,))
        return cursor.fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
# Helper functions
def get_current_user(session_id: str):
    """Return the session row for ``session_id``.

    Args:
        session_id: Identifier of the session to look up.

    Returns:
        The matching ``sqlite3.Row`` from the sessions table
        (columns ``session_id``, ``username``, ``role``).

    Raises:
        HTTPException: 404 when no session with that id exists.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM sessions WHERE session_id = ?", (session_id,))
        session = cursor.fetchone()
    finally:
        # Close before raising so the "not found" path does not leak the connection.
        conn.close()
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")
    return session
def subjects_avaible(liste, worte):
    """Check that every entry of ``worte`` is contained in ``liste``.

    Args:
        liste: Collection of allowed subjects.
        worte: Iterable of subjects to check.

    Returns:
        ``(True, None)`` when all entries are allowed, otherwise
        ``(False, subject)`` with the first entry that was not found.
    """
    # A private sentinel keeps a literal None inside `worte` distinguishable
    # from "nothing missing".
    nothing_missing = object()
    first_missing = next(
        (fach for fach in worte if fach not in liste),
        nothing_missing,
    )
    if first_missing is nothing_missing:
        return True, None
    return False, first_missing
async def check_lesson_time():
    """Close every room whose lesson starts at the configured time today.

    Sets ``is_open`` to False for all rooms whose ``lesson_date`` is today's
    date and whose ``lesson_time`` equals ``CURRENT_TIME_HOUR_MINUTE``.

    Returns:
        Always ``False``: no "event" condition is implemented, so the
        periodic checker that awaits this function keeps running.
    """
    print(f"Checking for the event at {datetime.now()}...")
    # Evaluate the date on every call; a value captured at import time would
    # go stale once the process runs past midnight.
    today = datetime.now().strftime("%Y-%m-%d")
    # NOTE(review): CURRENT_TIME_HOUR_MINUTE is a fixed module constant, so only
    # rooms scheduled for exactly that time are ever closed — confirm whether the
    # live clock should be used here instead.
    try:
        conn = get_db_connection()
        try:
            conn.execute(
                """
                UPDATE rooms
                SET is_open = ?
                WHERE lesson_date = ? AND lesson_time = ?
                """,
                (False, today, CURRENT_TIME_HOUR_MINUTE)
            )
            conn.commit()
        finally:
            conn.close()
    except sqlite3.Error as exc:
        # Best effort: a failed update must not stop the periodic checker.
        print(f"Could not update rooms: {exc}")
    # TODO: return True once a real "event happened" condition exists.
    return False
# Calculate seconds until the next target time
def seconds_until_next_interval(now: Optional[datetime] = None) -> float:
    """Return the seconds remaining until the next full 5-minute mark.

    Args:
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        A value in the range (0, 300]. Exactly on a 5-minute mark the result
        is a full 300 seconds, i.e. the *next* mark is targeted.
    """
    if now is None:
        now = datetime.now()
    # timedelta arithmetic carries over hour, day, month and year boundaries;
    # building the target from individual fields would land in the past
    # between 23:55 and midnight.
    minutes_to_go = 5 - now.minute % 5
    next_time = now.replace(second=0, microsecond=0) + timedelta(minutes=minutes_to_go)
    return (next_time - now).total_seconds()
# Background task to check the event based on real-world time
async def periodic_event_checker():
    """Run ``check_lesson_time`` repeatedly, aligned to 5-minute marks.

    The loop ends as soon as a check reports an event. Any exception raised
    during a cycle is printed and followed by a five-minute pause before the
    next attempt.
    """
    retry_delay = 5 * 60  # seconds to wait after a failed cycle
    keep_running = True
    while keep_running:
        try:
            if await check_lesson_time():
                print("Event detected! Taking action.")
                # An event ends the checker; any follow-up action belongs here.
                keep_running = False
            else:
                pause = seconds_until_next_interval()
                print(f"Sleeping for {pause:.2f} seconds until the next check.")
                await asyncio.sleep(pause)
        except Exception as err:
            print(f"Error in background task: {err}")
            await asyncio.sleep(retry_delay)
# Configure the root logger at DEBUG level so the logging.debug() calls in
# is_open_update are emitted.
import logging
logging.basicConfig(level=logging.DEBUG)
def is_open_update(unique_id: str):
    """Synchronise a room's ``is_open`` flag with its current occupancy.

    The room is closed once the number of joined students reaches
    ``max_students`` and reopened when it drops below that limit.

    Args:
        unique_id: ``unique_id`` of the room to update.

    Returns:
        None. Nothing is changed when the room does not exist.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # Start from the room and LEFT JOIN the joins so that a room without
        # students still yields its real capacity and state, and a missing
        # room yields no row at all.
        cursor.execute("""
            SELECT COUNT(s.username) AS current_students,
                   r.max_students AS max_students,
                   r.is_open AS is_open
            FROM rooms r
            LEFT JOIN student_joins sj ON sj.room_unique_id = r.unique_id
            LEFT JOIN students s ON s.username = sj.student_username
            WHERE r.unique_id = ?
            GROUP BY r.unique_id
        """, (unique_id,))
        result = cursor.fetchone()
        if result is None:
            logging.debug(f"No room found with unique_id: {unique_id}")
            return
        current_students = int(result['current_students'])
        max_students = int(result['max_students'])
        current_is_open = int(result['is_open'])
        logging.debug(f"current_students: {current_students}, max_students: {max_students}, current_is_open: {current_is_open}")
        # NOTE(review): a room that was closed for another reason (e.g. by
        # check_lesson_time) is reopened here as soon as it has free seats —
        # confirm that this is intended.
        target_state = 0 if current_students >= max_students else 1
        if current_is_open != target_state:  # skip the write when nothing changes
            cursor.execute("""
                UPDATE rooms
                SET is_open = ?
                WHERE unique_id = ?
            """, (target_state, unique_id))
            conn.commit()
    finally:
        conn.close()
# Strong references to background tasks. The event loop itself only keeps weak
# references, so a task that nobody else refers to may be garbage-collected
# before it finishes.
_background_tasks = set()


# Prepare the database and start the periodic checker when the application starts
@app.on_event("startup")
def startup_event():
    """Create/clear the tables and launch the periodic lesson-time checker."""
    create_tables()
    task = asyncio.create_task(periodic_event_checker())
    _background_tasks.add(task)
    # Drop the reference again once the task has finished.
    task.add_done_callback(_background_tasks.discard)
@app.post("/teacher/register")
def teacher_register(user: Teacher_User):
    """Register a teacher account and open a session for it.

    Args:
        user: Registration data, including the shared teacher secret and a
            comma-separated list of subject abbreviations.

    Returns:
        On success a dict with ``message``, ``session_id`` and ``username``.
        When a subject is not in ``SUBJECTS`` a dict with ``message`` and the
        offending ``subject`` is returned instead (no account is created).

    Raises:
        HTTPException: 400 when the username is already taken by a teacher or
            a student, 403 when the teacher secret is wrong.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # Check if username exists in either teachers or students
        cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,))
        if cursor.fetchone():
            raise HTTPException(status_code=400, detail="Username already exists as a teacher")
        cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,))
        if cursor.fetchone():
            raise HTTPException(status_code=400, detail="Username already exists as a student")
        # NOTE(review): the shared secret is hard-coded and the password below is
        # stored as received — both should move to configuration / hashing.
        if user.teacher_secret_password != "chrissi":
            raise HTTPException(status_code=403, detail="Incorrect teacher secret password")
        teacher_subjects = [fach.strip() for fach in user.subjects.split(",")]
        result, error_subject = subjects_avaible(SUBJECTS, teacher_subjects)
        if not result:
            return {"message": "Unavaible Subject", "subject": error_subject}
        cursor.execute("INSERT INTO teachers (username, password, first_name, last_name, subjects) VALUES (?, ?, ?, ?, ?)", (user.username, user.password, user.first_name, user.last_name, user.subjects))
        conn.commit()
        # Create session for teacher
        session_id = str(uuid.uuid4())
        cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "teacher"))
        conn.commit()
    finally:
        # Runs on every exit path, including the "Unavaible Subject" return.
        conn.close()
    return {"message": "Teacher registered successfully", "session_id": session_id, "username": user.username}
@app.post("/student/register")
def student_register(user: User_register):
    """Register a student account and open a session for it.

    Args:
        user: Registration data of the new student.

    Returns:
        A dict with ``message``, ``session_id`` and ``username``.

    Raises:
        HTTPException: 400 when the username already exists as a student or
            as a teacher.
    """
    db = get_db_connection()
    cur = db.cursor()

    # The username must be free in both account tables (students first).
    existence_checks = (
        "SELECT * FROM students WHERE username = ?",
        "SELECT * FROM teachers WHERE username = ?",
    )
    for query in existence_checks:
        cur.execute(query, (user.username,))
        if cur.fetchone():
            db.close()
            raise HTTPException(status_code=400, detail="Benutzername bereits vergeben")

    new_student = (user.username, user.password, user.first_name, user.last_name, user.user_class)
    cur.execute("INSERT INTO students (username, password, first_name, last_name, user_class) VALUES (?, ?, ?, ?, ?)", new_student)
    db.commit()

    # Log the new student in right away.
    session_id = str(uuid.uuid4())
    cur.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "student"))
    db.commit()
    db.close()
    return {"message": "Schüler erfolgreich registriert", "session_id": session_id, "username": user.username}
@app.post("/login")
def login(user: User):
    """Authenticate a teacher or student and open a session.

    The teachers table is consulted first; the students table is only
    checked when no teacher has the given username.

    Args:
        user: Username and password.

    Returns:
        A dict with the new ``session_id`` and the ``username``.

    Raises:
        HTTPException: 403 when the password does not match, 404 when the
            username is unknown in both tables.
    """
    db = get_db_connection()
    cur = db.cursor()
    failure_detail = "Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort"
    account_sources = (
        ("SELECT * FROM teachers WHERE username = ?", "teacher"),
        ("SELECT * FROM students WHERE username = ?", "student"),
    )

    role = None
    for query, candidate_role in account_sources:
        cur.execute(query, (user.username,))
        account = cur.fetchone()
        if not account:
            continue
        if account["password"] != user.password:
            db.close()
            raise HTTPException(status_code=403, detail=failure_detail)
        role = candidate_role
        break

    if role is None:
        db.close()
        raise HTTPException(status_code=404, detail=failure_detail)

    session_id = str(uuid.uuid4())
    cur.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, role))
    db.commit()
    db.close()
    return {"session_id": session_id, "username": user.username}
@app.post("/teacher/create_room")
def create_room(room: RoomCreate):
    """Create a room for one lesson slot on behalf of a teacher.

    Args:
        room: Room number, capacity, lesson date (``YYYY-MM-DD``), lesson
            time (``HH:MM``) and the teacher's session id.

    Returns:
        A dict with ``message`` and the ``unique_id`` of the new room.

    Raises:
        HTTPException: 400 for a malformed or past date, a malformed time, or
            an already existing room for that slot; 403 when the session does
            not belong to a teacher; 404 when the session is unknown.
    """
    # Validate both formats up front so malformed input yields a 400 rather
    # than an unhandled ValueError.
    try:
        lesson_date = datetime.strptime(room.lesson_date, "%Y-%m-%d").date()
        datetime.strptime(room.lesson_time, "%H:%M")
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail="lesson_date must be YYYY-MM-DD and lesson_time must be HH:MM",
        ) from None
    if lesson_date < datetime.today().date():
        raise HTTPException(status_code=400, detail="Lesson date cannot be in the past")
    # NOTE(review): lesson times earlier today are still accepted; a check for
    # that existed here but was commented out — confirm whether to enforce it.

    user = get_current_user(room.session_id)
    if user['role'] != "teacher":
        raise HTTPException(status_code=403, detail="Only teachers can create rooms")

    unique_id = str(uuid.uuid4())
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM rooms WHERE room_number = ? AND lesson_date = ? AND lesson_time = ?", (room.room_number, room.lesson_date, room.lesson_time,))
        if cursor.fetchone():
            raise HTTPException(status_code=400, detail="Room already exists")
        try:
            cursor.execute(
                "INSERT INTO rooms (room_number, max_students, teacher_name, is_open, lesson_date, lesson_time, unique_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    room.room_number,
                    room.max_students,
                    user['username'],
                    True,
                    room.lesson_date,
                    room.lesson_time,
                    unique_id,
                )
            )
        except sqlite3.IntegrityError:
            # A concurrent request took the same slot between the check and
            # the insert; the UNIQUE constraint caught it.
            raise HTTPException(status_code=400, detail="Room already exists") from None
        conn.commit()
    finally:
        conn.close()
    return {"message": "Room created successfully", "unique_id": unique_id}
@app.delete("/teacher/delete_room/")
def delete_room(unique_id: str, session: Session):
    """Delete a room owned by the requesting teacher, including its joins.

    Args:
        unique_id: ``unique_id`` of the room to delete.
        session: Session of the requesting user.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 403 when the caller is not a teacher or does not own
            the room; 404 when the session or the room does not exist.
    """
    user = get_current_user(session.session_id)
    if user['role'] != "teacher":
        raise HTTPException(status_code=403, detail="Only teachers can delete rooms")
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,))
        room = cursor.fetchone()
        if room is None:
            raise HTTPException(status_code=404, detail="Room not found")
        if room["teacher_name"] != user['username']:
            raise HTTPException(status_code=403, detail="You do not have permission to delete this room")
        # Remove the join rows explicitly: SQLite only applies ON DELETE CASCADE
        # when foreign-key enforcement is enabled on the connection.
        cursor.execute("DELETE FROM student_joins WHERE room_unique_id = ?", (unique_id,))
        cursor.execute("DELETE FROM rooms WHERE unique_id = ?", (unique_id,))
        conn.commit()
    finally:
        conn.close()
    return {"message": f"Room {unique_id} deleted successfully by {user['username']}"}
@app.post("/teacher/room_students/")
def get_room_students(unique_id: str, session: Session):
    """List the usernames of the students joined to one of the teacher's rooms.

    Args:
        unique_id: ``unique_id`` of the room.
        session: Session of the requesting user.

    Returns:
        A dict with the room's ``unique_id`` and the list of ``students``.

    Raises:
        HTTPException: 403 when the caller is not a teacher; 404 when the
            session is unknown or the room does not exist / is not owned by
            this teacher.
    """
    teacher = get_current_user(session.session_id)
    if teacher['role'] != "teacher":
        raise HTTPException(status_code=403, detail="Only teachers can view students in their room")

    db = get_db_connection()
    cur = db.cursor()

    # Existence and ownership are verified with a single lookup.
    cur.execute("SELECT * FROM rooms WHERE unique_id = ? AND teacher_name = ?", (unique_id, teacher['username']))
    if cur.fetchone() is None:
        db.close()
        raise HTTPException(status_code=404, detail="Room not found or not owned by this teacher")

    # Collect everyone currently joined to the room.
    cur.execute("""
        SELECT s.username
        FROM students s
        JOIN student_joins sj ON s.username = sj.student_username
        WHERE sj.room_unique_id = ?
    """, (unique_id,))
    usernames = []
    for joined in cur.fetchall():
        usernames.append(joined['username'])
    db.close()
    return {"unique_id": unique_id, "students": usernames}
@app.websocket("/ws/student/my_room")
async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)):
    """Stream the rooms the connected student has joined, every 2 seconds.

    Each message is either ``{"rooms": [...]}`` (room row merged with the
    static room details, the teacher's subjects and the current student
    count) or ``{"error": "Student has not joined any rooms"}``.

    Args:
        websocket: The client connection.
        session_id: Session id taken from the request headers.
    """
    await websocket.accept()
    # Validate session ID and get user. The lookup raises HTTPException for an
    # unknown session, which has to be translated into a close frame here.
    try:
        user = get_current_user(session_id)
    except HTTPException:
        await websocket.close(code=4004)  # same code the teacher endpoint uses
        return
    if user['role'] != "student":
        # 1008 = policy violation; HTTP status numbers are not valid close codes.
        await websocket.close(code=1008)
        return
    username = user['username']
    # Store active connection
    active_connections[username] = websocket
    try:
        while True:
            conn = get_db_connection()
            try:
                cursor = conn.cursor()
                # Fetch rooms the student has joined
                cursor.execute(
                    """
                    SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students, r.lesson_date, r.lesson_time,
                           t.first_name, t.last_name
                    FROM student_joins sj
                    JOIN rooms r ON sj.room_unique_id = r.unique_id
                    JOIN teachers t ON r.teacher_name = t.username
                    WHERE sj.student_username = ?
                    """,
                    (username,)
                )
                rooms = cursor.fetchall()
                # Fetch teacher subjects
                cursor.execute("SELECT username, subjects FROM teachers")
                teacher_subjects = {row[0]: row[1] for row in cursor.fetchall()}
                rooms_list = []
                for room in rooms:
                    room_dict = dict(room)
                    room_number = room_dict.get("room_number")
                    teacher_name = room_dict.get("teacher_name")
                    # Add additional room details from room_name_dict
                    if room_number and room_number in room_name_dict:
                        room_dict.update(room_name_dict[room_number])
                    # Add teacher subjects if available
                    if teacher_name and teacher_name in teacher_subjects:
                        room_dict["subjects"] = teacher_subjects[teacher_name]
                    # Fetch current student count for the room
                    cursor.execute(
                        """
                        SELECT COUNT(s.username) as current_students
                        FROM students s
                        JOIN student_joins sj ON s.username = sj.student_username
                        WHERE sj.room_unique_id = ?
                        """,
                        (room_dict["unique_id"],)
                    )
                    room_dict["current_students"] = cursor.fetchone()["current_students"]
                    rooms_list.append(room_dict)
            finally:
                conn.close()
            if rooms_list:
                await websocket.send_json({"rooms": rooms_list})
            else:
                await websocket.send_json({"error": "Student has not joined any rooms"})
            await asyncio.sleep(2)  # Poll every 2 seconds
    except WebSocketDisconnect:
        pass
    finally:
        # Several endpoints share this registry under the same key; only
        # remove the entry if it still points at this connection.
        if active_connections.get(username) is websocket:
            del active_connections[username]
@app.websocket("/ws/open_rooms")
async def websocket_open_rooms(websocket: WebSocket, session_id: Optional[str] = Header(None)):
    """Stream today's rooms (open and closed) to a student, once per second.

    Each message is either ``{"rooms": [...]}`` or
    ``{"message": "No rooms available"}``. Every room carries its occupancy,
    open state, teacher details, static room details and a ``joined`` flag
    telling whether the connected student is in it.

    Args:
        websocket: The client connection.
        session_id: Session id taken from the request headers.
    """
    await websocket.accept()
    # Validate session ID and get user. The lookup raises HTTPException for an
    # unknown session, which has to be translated into a close frame here.
    try:
        user = get_current_user(session_id)
    except HTTPException:
        await websocket.send_json({"error": "Session not found"})
        await websocket.close(code=4004)  # same code the teacher endpoint uses
        return
    if user['role'] != "student":
        # 1008 = policy violation; HTTP status numbers are not valid close codes.
        await websocket.close(code=1008)
        return
    username = user['username']
    # Store active connection
    active_connections[username] = websocket
    try:
        while True:
            # Evaluate "today" on every poll so the feed stays correct when the
            # process runs past midnight.
            today = datetime.now().strftime("%Y-%m-%d")
            conn = get_db_connection()
            try:
                cursor = conn.cursor()
                # Fetch all rooms, both open and closed
                cursor.execute("""
                    SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students,
                           COUNT(s.username) AS current_students,
                           r.is_open,
                           r.lesson_date, r.lesson_time,
                           t.first_name, t.last_name, t.subjects,
                           CASE
                               WHEN EXISTS (
                                   SELECT 1
                                   FROM student_joins sj
                                   JOIN sessions ss ON sj.student_username = ss.username
                                   WHERE sj.room_unique_id = r.unique_id AND ss.session_id = ?
                               ) THEN 1
                               ELSE 0
                           END AS joined
                    FROM rooms r
                    LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id
                    LEFT JOIN students s ON sj.student_username = s.username
                    LEFT JOIN teachers t ON r.teacher_name = t.username
                    WHERE r.lesson_date = ?
                    GROUP BY r.unique_id
                """, (session_id, today))
                rows = cursor.fetchall()
            finally:
                conn.close()
            all_rooms = []
            for row in rows:
                room = dict(row)
                room_number = str(room['room_number'])
                room_info = room_name_dict.get(room_number, {})
                room.update({
                    # Numeric room numbers are sent as int; anything else is
                    # passed through instead of aborting the whole feed.
                    'room_number': int(room_number) if room_number.isdigit() else room_number,
                    'room_name': room_info.get('room_number', ''),
                    'info': room_info.get('info', ''),
                    'location': room_info.get('location', ''),
                    'joined': bool(room['joined']),  # Whether the user joined
                    'is_open': bool(room['is_open']),  # Room open status
                })
                all_rooms.append(room)
            if all_rooms:
                await websocket.send_json({"rooms": all_rooms})
            else:
                await websocket.send_json({"message": "No rooms available"})
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        pass
    finally:
        # Several endpoints share this registry under the same key; only
        # remove the entry if it still points at this connection.
        if active_connections.get(username) is websocket:
            del active_connections[username]
@app.post("/student/join_room/")
def join_room(unique_id: str, session: Session):
    """Add the requesting student to a room.

    Args:
        unique_id: ``unique_id`` of the room to join.
        session: Session of the requesting user.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 403 when the caller is not a student; 404 when the
            session or room is unknown; 400 when the room is closed, full, or
            the student already has a room for the same date and time.
    """
    student = get_current_user(session.session_id)
    if student['role'] != "student":
        raise HTTPException(status_code=403, detail="Nur Schüler können diesem Raum beitreten")

    db = get_db_connection()
    cur = db.cursor()

    def refuse(status_code, detail):
        # Every rejection releases the connection before raising.
        db.close()
        raise HTTPException(status_code=status_code, detail=detail)

    # The room has to exist ...
    cur.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,))
    room = cur.fetchone()
    if room is None:
        refuse(404, "Raum nicht gefunden")
    # ... and be open.
    if not room["is_open"]:
        refuse(400, "Raum ist geschlossen")

    # One room per student and lesson slot (same lesson_date and lesson_time).
    cur.execute("""
        SELECT r.unique_id
        FROM student_joins sj
        JOIN rooms r ON sj.room_unique_id = r.unique_id
        WHERE sj.student_username = ? AND r.lesson_date = ? AND r.lesson_time = ?
    """, (student['username'], room["lesson_date"], room["lesson_time"]))
    if cur.fetchone():
        refuse(400, "Du bist bereits einem Raum für die gleiche Stunde zugewiesen")

    # Capacity check against the current number of joins.
    cur.execute("""
        SELECT COUNT(sj.student_username) as current_students
        FROM student_joins sj
        WHERE sj.room_unique_id = ?
    """, (unique_id,))
    occupancy = cur.fetchone()["current_students"]
    if occupancy >= room["max_students"]:
        refuse(400, "Raum ist voll")

    # Record the join, then let is_open_update adjust the open flag.
    cur.execute("""
        INSERT INTO student_joins (student_username, room_unique_id)
        VALUES (?, ?)
    """, (student['username'], unique_id))
    db.commit()
    db.close()
    is_open_update(unique_id)
    return {"message": f"Joined room {unique_id}"}
@app.post("/student/leave_room/")
def leave_room(unique_id: str, session: Session):
    """Remove the requesting student from a room.

    Args:
        unique_id: ``unique_id`` of the room to leave.
        session: Session of the requesting user.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 403 when the caller is not a student; 404 when the
            session is unknown; 400 when the student is not in that room.
    """
    student = get_current_user(session.session_id)
    if student['role'] != "student":
        raise HTTPException(status_code=403, detail="Only students can leave rooms")

    db = get_db_connection()
    cur = db.cursor()
    membership = (student['username'], unique_id)

    # The student must actually be joined to this room.
    cur.execute("SELECT * FROM student_joins WHERE student_username = ? AND room_unique_id = ?",
                membership)
    if not cur.fetchone():
        db.close()
        raise HTTPException(status_code=400, detail="You are not in the specified room")

    # Drop the join row, then let is_open_update adjust the open flag.
    cur.execute("DELETE FROM student_joins WHERE student_username = ? AND room_unique_id = ?",
                membership)
    db.commit()
    db.close()
    is_open_update(unique_id)
    return {"message": f"You have left room {unique_id}"}
@app.post("/check_session")
def check_session(session: Session):
    """Report whether a session id exists.

    Args:
        session: Body carrying the session id to check.

    Returns:
        ``{"available": True}`` when the session exists, otherwise
        ``{"available": False}``.
    """
    db = get_db_connection()
    cur = db.cursor()
    cur.execute("SELECT * FROM sessions WHERE session_id = ?", (session.session_id,))
    found = cur.fetchone()
    db.close()
    return {"available": True} if found else {"available": False}
@app.post("/check_role")
def check_role(session: Session):
    """Return the role stored for a session.

    Args:
        session: Body carrying the session id to check.

    Returns:
        ``{"role": <result of current_role>}`` when a role was found,
        otherwise ``{"role": None}``.
    """
    # NOTE(review): current_role hands back a database row rather than a plain
    # string, so the serialized "role" value is presumably nested — confirm
    # against the client.
    role_row = current_role(session.session_id)
    if not role_row:
        return {"role": None}
    return {"role": role_row}
@app.websocket("/ws/teacher/open_rooms")
async def websocket_list_open_rooms_teacher(websocket: WebSocket, session_id: Optional[str] = Header(None, alias="session-id")):
    """Stream the connected teacher's open rooms every 2 seconds.

    The connection is closed with code 4004 when the session cannot be
    validated, 1008 when the session does not belong to a teacher, and 1011
    after an unexpected error.

    Args:
        websocket: The client connection.
        session_id: Session id taken from the ``session-id`` header.
    """
    await websocket.accept()
    try:
        # The session row already carries username and role, so one lookup
        # is enough.
        info = dict(get_current_user(session_id))
    except Exception as e:
        print(f"Session validation error: {e}")
        try:
            await websocket.send_json({"error": "Session not found"})
            await websocket.close(code=4004)
        except (WebSocketDisconnect, RuntimeError):
            pass  # Ignore if the client has disconnected
        return
    teacher_username = info.get("username", "")
    # Check if the role is 'teacher'
    if info.get("role") != "teacher":
        try:
            await websocket.send_json({"message": "You are not a Teacher"})
            await websocket.close(code=1008)
        except (WebSocketDisconnect, RuntimeError):
            pass  # Ignore if the client has disconnected
        return
    try:
        # Fetch and send room data for the teacher, then repeat every 2 seconds
        while True:
            await send_room_data(websocket, teacher_username)
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        print("WebSocket disconnected")
    except Exception as e:
        print(f"Unexpected error: {e}")
        try:
            await websocket.send_json({"error": "An internal server error occurred."})
            await websocket.close(code=1011)
        except (WebSocketDisconnect, RuntimeError):
            # The socket may already be closed; there is nothing left to tell
            # the client.
            pass
async def send_room_data(websocket: WebSocket, teacher_username: str):
    """Send one snapshot of a teacher's open rooms over the WebSocket.

    Sends ``{"open_rooms": [...]}`` when the teacher has open rooms,
    ``{"message": "No open rooms available"}`` when there are none, and
    ``{"error": "Failed to fetch room data"}`` when reading them failed.

    Args:
        websocket: Connection to send on.
        teacher_username: Only rooms whose ``teacher_name`` equals this
            username are included.

    Raises:
        WebSocketDisconnect: Propagated from the send so that the polling
            loop of the caller can stop once the client is gone.
    """
    try:
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # Only open rooms owned by this teacher, with their occupancy
            cursor.execute("""
                SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students,
                       COUNT(s.username) AS current_students
                FROM rooms r
                LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id
                LEFT JOIN students s ON sj.student_username = s.username
                WHERE r.is_open = 1 AND r.teacher_name = ?
                GROUP BY r.unique_id
                ORDER BY r.room_number
            """, (teacher_username,))
            rows = cursor.fetchall()
        finally:
            conn.close()
        open_rooms = []
        for row in rows:
            room = dict(row)  # Convert the row to a dictionary
            room_number = str(room['room_number'])
            room_info = room_name_dict.get(room_number, {})
            room.update({
                # Numeric room numbers are sent as int; anything else is
                # passed through unchanged.
                'room_number': int(room_number) if room_number.isdigit() else room_number,
                'room_name': room_info.get('room_number', ''),
                'info': room_info.get('info', ''),
                'location': room_info.get('location', ''),
            })
            open_rooms.append(room)
    except Exception as e:
        # Only the fetch is guarded here; send failures must reach the caller,
        # otherwise its loop would keep polling for a client that has left.
        print(f"Error fetching room data: {e}")
        await websocket.send_json({"error": "Failed to fetch room data"})
        return
    if open_rooms:
        await websocket.send_json({"open_rooms": open_rooms})
    else:
        await websocket.send_json({"message": "No open rooms available"})
@app.get("/teacher/all_room_information")
def all_room_information() -> List[Dict[str, str]]:
    """Return the static room catalogue (``ROOM_NAMES``) unchanged."""
    catalogue = ROOM_NAMES
    return catalogue
@app.get("/test")
def test():
    """Debug endpoint: look up the student with username ``"hi"``.

    Returns:
        ``{"subjects": <student row>}`` when that student exists (despite the
        key name, the value is the whole row), otherwise a dict with an
        ``error`` message.
    """
    db = get_db_connection()
    cur = db.cursor()
    # Fetch the student row for the fixed username "hi".
    cur.execute("SELECT * FROM students WHERE username = ?", ("hi",))
    student_row = cur.fetchone()
    print(student_row)
    db.close()
    if not student_row:
        return {"error": "Benutzer nicht gefunden oder keine Fächer vorhanden"}
    print(student_row)  # print the row that is about to be returned
    return {"subjects": student_row}
# NOTE(review): this handler reuses the function name of the
# "/ws/student/my_room" handler. Both routes stay registered through their
# decorators, but the module-level name now refers to this function only.
@app.websocket("/ws/student/test")
async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)):
    """Debug endpoint: send an incrementing counter once per second.

    Args:
        websocket: The client connection.
        session_id: Session id taken from the request headers.
    """
    await websocket.accept()
    # Validate session ID. The lookup raises HTTPException for an unknown
    # session, which has to be translated into a close frame here.
    try:
        user = get_current_user(session_id)
    except HTTPException:
        await websocket.close(code=4004)  # same code the teacher endpoint uses
        return
    for i in user:
        print(i)
    username = user['username']
    # Store connection
    active_connections[username] = websocket
    counter = 0  # Initialize counter to 0
    try:
        while True:
            # Send incrementing counter value every second
            counter += 1
            await websocket.send_text(f"Counter: {counter}")
            await asyncio.sleep(1)  # Wait 1 second before sending the next message
    except WebSocketDisconnect:
        print(f"Connection closed for user: {username}")
    finally:
        # Several endpoints share this registry under the same key; only
        # remove the entry if it still points at this connection.
        if active_connections.get(username) is websocket:
            del active_connections[username]
def generate_pdf(file_path, names=None, sheet_date="01.01.2025"):
    """Write an attendance sheet PDF to ``file_path``.

    The sheet has a title line with the date and a three-column table
    (name, present, absent) with one empty row per name.

    Args:
        file_path: Destination path of the PDF.
        names: Names to list, one row each. Defaults to the built-in sample
            list.
        sheet_date: Date text shown in the title. Defaults to the fixed
            sample date.
    """
    if names is None:
        names = [
            "Jasper Grevsmühl",
            "Papa Grevsmühl",
            "Clara Müller",
            "Anna Schmidt",
            "Max Mustermann",
        ]

    class PDF(FPDF):
        def header(self):
            # Title line that appears at the top of each page.
            self.set_font("Arial", "B", 16)
            self.cell(0, 10, f"Attendance Sheet - {sheet_date}", ln=True, align="C")
            self.ln(10)

    pdf = PDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    # Column headers
    pdf.cell(80, 10, "Namen", border=1, align="C")
    pdf.cell(50, 10, "Anwesend", border=1, align="C")
    pdf.cell(50, 10, "Abwesend", border=1, align="C")
    pdf.ln()
    # One row per name; the two attendance cells stay empty for ticking by hand.
    for name in names:
        pdf.cell(80, 10, name, border=1)
        pdf.cell(50, 10, "", border=1)
        pdf.cell(50, 10, "", border=1)
        pdf.ln()
    # Save the PDF to the provided file path
    pdf.output(file_path)
# FastAPI route for downloading the PDF
@app.get("/download/{filename}")
def download(filename: str):
    """Generate the attendance sheet and return it as a file download.

    Args:
        filename: Name under which the PDF is created and served.

    Returns:
        A ``FileResponse`` streaming the generated PDF.

    Raises:
        HTTPException: 400 when ``filename`` does not name a plain file.
    """
    import shutil

    from starlette.background import BackgroundTask

    # Keep only the final path component so the file always lands inside the
    # temporary directory.
    safe_name = os.path.basename(filename)
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    # Prepare file path
    temp_dir = tempfile.mkdtemp()
    pdf_file_path = os.path.join(temp_dir, safe_name)
    # Generate the PDF file
    try:
        generate_pdf(pdf_file_path)
    except Exception:
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise
    # Return the generated file; the temporary directory is removed once the
    # response has been sent, so repeated downloads do not pile up on disk.
    return FileResponse(
        pdf_file_path,
        background=BackgroundTask(shutil.rmtree, temp_dir, ignore_errors=True),
    )