initial commit

This commit is contained in:
2025-01-19 00:04:08 +01:00
parent 2298827ff3
commit e37de4a7a4
21 changed files with 2926 additions and 0 deletions

Binary file not shown.

35
frontend/main.py Normal file
View File

@@ -0,0 +1,35 @@
import flet as ft
from urllib.parse import urlparse, parse_qs
from pages.main_page import main_page # Import main page function
from pages.login_page import login_page # Import login page function
from pages.register_teacher_page import register_teacher_page # Import register page function
from pages.register_student_page import register_student_page # Import search page function
from pages.create_page import create_page # Import create page function7
from pages.join_page import join_page
# Define your routes
ROUTES = {
"/": main_page,
"/login": login_page,
"/register_teacher": register_teacher_page,
"/register_student": register_student_page,
"/create": create_page,
"/join": join_page
}
def main(page: ft.Page):
def route_change(e: ft.RouteChangeEvent):
page.clean()
parsed_url = urlparse(e.route) # Parse the route
query_params = parse_qs(parsed_url.query) # Extract query parameters
render_function = ROUTES.get(parsed_url.path, main_page)
render_function(page)
page.on_route_change = route_change # Set up route change handling
page.go(page.route) # Navigate to the current route
if __name__ == "__main__":
ft.app(target=main, view=ft.AppView.WEB_BROWSER, port=9000)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,199 @@
import flet as ft
import requests
import threading
import asyncio
import websockets
from websockets import connect # Ensure this is imported
import json
from datetime import datetime
def student_main_page(page: ft.Page):
page.clean()
page.theme_mode = ft.ThemeMode.LIGHT # Set the theme to light
page.title = "Room Information"
page.vertical_alignment = ft.MainAxisAlignment.START # Align all content to the top
page.scroll = "adaptive"
page.padding = 20
# Container for lessons
lessons_container = ft.Column(alignment=ft.MainAxisAlignment.CENTER, spacing=20)
# Buttons
test_button = ft.TextButton(
text="Auch Räume an anderen Tagen anzeigen",
on_click=lambda e: page.go("/all_rooms"),
style=ft.ButtonStyle(padding=20),
visible=False, # Initially hidden
)
join_more_rooms_button = ft.TextButton(
text="Tritt noch mehr Räume bei",
on_click=lambda e: page.go("/join"),
style=ft.ButtonStyle(padding=25),
)
def leave_current_room(e=None, unique_id=None):
if not unique_id:
page.snack_bar = ft.SnackBar(ft.Text("No room selected to leave."))
page.snack_bar.open = True
page.update()
return
url = f"http://awesom-o.org:8000/student/leave_room/?unique_id={unique_id}"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"session_id": page.session.get("access_token")}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
if e:
page.snack_bar = ft.SnackBar(ft.Text(f"Left Room {unique_id}"))
page.snack_bar.open = True
page.update()
except requests.RequestException as error:
# Handle all bad responses
error_detail = "Something went wrong."
if hasattr(error, "response") and error.response:
try:
error_detail = error.response.json().get("detail", "Something went wrong.")
except json.JSONDecodeError:
error_detail = error.response.text or "Something went wrong."
page.snack_bar = ft.SnackBar(ft.Text(f"Error: {error_detail}"))
page.snack_bar.open = True
page.update()
async def get_rooms():
uri = "ws://localhost:8000/ws/student/my_room"
headers = {"session-id": page.session.get("access_token")}
try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
while True:
message = await websocket.recv()
data = json.loads(message)
# Clear previous lessons
lessons_container.controls.clear()
if "error" in data:
# Show "No Rooms for Today" message
lessons_container.controls.append(
ft.Text(
"Für heute hast du noch keine Räume ausgewählt!",
size=50,
weight="bold",
text_align="center",
)
)
else:
rooms = data.get("rooms", [])
# Filter and sort lessons for today
today = datetime.now().date()
today_lessons = [room for room in rooms if room["lesson_date"] == str(today)]
today_lessons.sort(key=lambda x: x["lesson_time"])
# Check if there are rooms on other dates
other_date_rooms = [room for room in rooms if room["lesson_date"] != str(today)]
test_button.visible = bool(other_date_rooms) # Show button if there are rooms on other dates
if today_lessons:
for lesson in today_lessons:
# Create lesson card
room_card = ft.Card(
content=ft.Container(
content=ft.Row(
[
# Room details
ft.Column(
[
# Top line: room_number, info, location
ft.Row(
[
ft.Text(
f"{lesson['room_number']} {lesson['info']}",
size=20,
weight="bold",
),
ft.Text(
f"| {lesson['location']}",
size=20,
weight="bold",
),
],
spacing=5,
),
# Second line: first_name and last_name
ft.Text(
f"Teacher: {lesson['first_name']} {lesson['last_name']}",
size=16,
),
# Third line: lesson_time
ft.Text(
f"Time: {lesson['lesson_time']}",
size=16,
),
],
expand=True,
alignment=ft.MainAxisAlignment.CENTER,
),
# IconButton on the right
ft.IconButton(
icon=ft.icons.EXIT_TO_APP,
on_click=lambda e, lesson_id=lesson['unique_id']: leave_current_room(e, lesson_id),
tooltip="View Room Details",
),
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
),
padding=20, # Add padding inside the card
),
margin=10,
elevation=5, # Add shadow for better visual appeal
)
lessons_container.controls.append(room_card)
else:
# Show "No Rooms for Today" message
lessons_container.controls.append(
ft.Text(
"No Rooms for Today",
size=50,
weight="bold",
text_align="center",
)
)
# Update page
page.update()
except Exception as e:
lessons_container.controls.clear()
lessons_container.controls.append(ft.Text(f"Error: {e}", size=50, weight="bold", text_align="center"))
page.update()
def start_get_rooms():
# Ensure a new event loop in the thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(get_rooms())
# Start the WebSocket client in a thread
import threading
threading.Thread(target=start_get_rooms, daemon=True).start()
# Add components to page
page.add(
ft.Column(
[
# Lessons container (centered)
ft.Column(
[lessons_container],
alignment=ft.MainAxisAlignment.CENTER,
expand=True,
),
# Buttons in one row at the bottom
ft.Row(
[test_button, join_more_rooms_button],
alignment=ft.MainAxisAlignment.CENTER,
spacing=20, # Add spacing between buttons
),
],
expand=True,
)
)

View File

@@ -0,0 +1,288 @@
import flet as ft
import requests
import datetime
def create_page(page: ft.Page):
page.clean()
page.theme_mode = ft.ThemeMode.LIGHT # Set the theme to light
def is_student():
url = 'http://127.0.0.1:8000/check_role'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
session_id = page.session.get("access_token")
if not session_id:
print("Session ID is missing.")
return None
data = {'session_id': session_id}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
role = response.json()
if 'role' in role and 'role' in role['role']:
if role['role']['role'] == "student":
role = "s"
page.go("/login")
return role
elif role['role']['role'] == "teacher":
role = "t"
return role
print("Unexpected response structure:", role)
return None
except requests.exceptions.RequestException as error:
print("Error during role check:", error)
return None
page.title = "Create Room Page"
if is_student == "s":
page.gp("/")
session = page.session.get("access_token")
if not session:
page.go("/login")
# Initialize selected room state (use a simple variable instead of ft.State)
selected_room = ""
selected_date = ""
selected_time = ""
SEARCH_RESULT = ""
date_text = ft.Text(
"", # Initially empty
size=18, # Font size
weight=ft.FontWeight.BOLD, # Make the text bold
color="black", # Text colo
visible=False # Initially hidden
)
# Function to fetch room data
def fetch_rooms():
try:
response = requests.get("http://awesom-o.org:8000/teacher/all_room_information")
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print("Error fetching room data:", e)
return []
# Fetch rooms initially (don't display them immediately)
room_data = fetch_rooms()
# Function to filter room data
def search_rooms(query):
query = query.lower()
return [
room for room in room_data
if query in room["room_number"].lower()
or query in room["info"].lower()
or query in room["location"].lower()
]
# Search results container
search_results = ft.Column(
visible=False, # Initially hidden
scroll=ft.ScrollMode.AUTO,
expand=True
)
# Update search results
def update_results(query):
if query.strip() == "": # When the search field is empty, show all rooms
search_results.controls.clear()
for room in room_data:
search_results.controls.append(
ft.ListTile(
title=ft.Text(f"Room {room['room_number']}: {room['info']}"),
subtitle=ft.Text(room['location']),
on_click=lambda e, r=room: select_room(r),
)
)
search_results.visible = True
else:
search_results.controls.clear()
filtered_rooms = search_rooms(query)
for room in filtered_rooms:
search_results.controls.append(
ft.ListTile(
title=ft.Text(f"Room {room['room_number']}: {room['info']}"),
subtitle=ft.Text(room['location']),
on_click=lambda e, r=room: select_room(r),
)
)
search_results.visible = True
close_button.visible = search_results.visible # Show close button when results are visible
page.update()
# Select a room
def select_room(room):
nonlocal selected_room
nonlocal SEARCH_RESULT
selected_room = f"Room {room['room_number']}: {room['info']} - {room['location']}"
search_bar.value = selected_room
SEARCH_RESULT = room['number']
search_results.visible = False
close_button.visible = False
page.update()
# Close button to hide search results
def close_search_results(e):
search_results.visible = False
close_button.visible = False
page.update()
def handle_change_date_picker(e):
# Once a date is selected, update the Text and make it visible
nonlocal selected_date
selected_date = e.control.value.strftime('%Y-%m-%d')
if selected_time != "":
date_text.value = f"Your Room will be started on: {selected_date} at {selected_time}"
date_text.visible = True # Make the Text visible after date selection
page.update()
else:
date_text.value = f"Your Room will be started on: {selected_date}"
date_text.visible = True # Make the Text visible after date selection
page.update() # Update the page to reflect the changes
def handle_change_dropdown(e):
nonlocal selected_time
selected_time1 = e.control.value
selected_time = selected_time1.replace("Start time: ", "")
if selected_date != "":
date_text.value = f"Your Room will be started on: {selected_date} at {selected_time}"
date_text.visible = True # Make the Text visible after date selection
page.update()
else:
date_text.value = f"Your Room will be started at {selected_time}"
date_text.visible = True # Make the Text visible after date selection
page.update() # Update the page to reflect the changes
def go_to_main(e):
page.go("/") # Replace with the appropriate route for your register page
def create(e):
page.update()
search_results = SEARCH_RESULT.strip()
max_students = max_students_field.value.strip()
selected_date
selected_time
# Validate input
if not search_results or not max_students or not selected_date or not selected_time:
info_label.value = "Please enter Room Number, Max Students, Lesson Date and Lesson Time."
info_label.color = "red"
page.update()
return
try:
max_students = int(max_students)
except ValueError:
info_label.value = "Max Students must be a number."
info_label.color = "red"
page.update()
return
if max_students < 1:
info_label.value = "Please don't enter a negative or zero value for Max Students."
info_label.color = "red"
page.update()
return
try:
url = 'http://127.0.0.1:8000/teacher/create_room'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
'room_number': search_results,
'max_students': max_students,
'session_id': page.session.get("access_token"),
'lesson_time': selected_time,
'lesson_date': selected_date
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
success_message = "The Room was created Successfully!"
info_label.value = success_message
info_label.color = "green"
else:
error_message = "Something went wrong..."
info_label.value = error_message
info_label.color = "red"
except requests.exceptions.RequestException as e:
info_label.value = f"An error occurred: {str(e)}"
info_label.color = "red"
page.update()
# Search bar
search_bar = ft.TextField(
label="Search for a room",
on_focus=lambda e: update_results(""), # Show all rooms when search bar is focused
on_change=lambda e: update_results(e.control.value),
expand=True # Make the search bar take up available space
)
# Close button
close_button = ft.IconButton(
icon=ft.icons.CLOSE,
on_click=close_search_results,
visible=False # Initially hidden
)
max_students_field = ft.TextField(label="Max Students")
info_label = ft.Text("", theme_style=ft.TextThemeStyle.TITLE_SMALL)
# Add components to the page
page.add(
ft.Row([search_bar, close_button]), # Add search bar and close button in a row
search_results,
max_students_field,
ft.Row([
ft.OutlinedButton(
"Pick the date when your lesson begins",
icon=ft.icons.CALENDAR_MONTH,
height=40,
on_click=lambda e: page.open(
ft.DatePicker(
first_date=datetime.datetime(year=2024, month=11, day=20),
on_change=handle_change_date_picker,
)
),
),
date_text, # Add the styled Text below the button
], spacing=20),
ft.Dropdown(
hint_text="Choose the Start of your Lesson",
options=[
ft.dropdown.Option("Start time: 8:00"),
ft.dropdown.Option("Start time: 9:20"),
ft.dropdown.Option("Start time: 10:40"),
ft.dropdown.Option("Start time: 11:50"),
ft.dropdown.Option("Start time: 12:50"),
ft.dropdown.Option("Start time: 13:55"),
ft.dropdown.Option("Start time: 15:00"),
],on_change=handle_change_dropdown,
label_style=ft.TextStyle(size=16), # Correct way to set font size for Dropdown
),
ft.ElevatedButton("Submit", on_click=create),
info_label,
ft.Row([
ft.TextButton("Enough? Click here to go to the Main Page", on_click=go_to_main),
]),
)
# Example usage
if __name__ == "__main__":
ft.app(target=create_page, view=ft.AppView.WEB_BROWSER)

425
frontend/pages/join_page.py Normal file
View File

@@ -0,0 +1,425 @@
import flet as ft
import asyncio
import websockets
import json
import requests
import threading
from datetime import datetime
def get_initials(user_name: str):
if user_name:
return user_name[0].capitalize()
else:
return "E"
def get_avatar_color(user_name: str):
colors_lookup = [
ft.colors.AMBER,
ft.colors.BLUE,
ft.colors.BROWN,
ft.colors.CYAN,
ft.colors.GREEN,
ft.colors.INDIGO,
ft.colors.LIME,
ft.colors.ORANGE,
ft.colors.PINK,
ft.colors.PURPLE,
ft.colors.RED,
ft.colors.TEAL,
]
return colors_lookup[hash(user_name) % len(colors_lookup)]
def join_page(page: ft.Page):
page.clean()
page.theme_mode = ft.ThemeMode.LIGHT # Set the theme to light
page.title = "Join Rooms"
page.vertical_alignment = ft.MainAxisAlignment.START # Align all content to the top
page.scroll = "adaptive"
page.padding = 20
# Check if the user is logged in
session = page.session.get("access_token")
if not session:
page.go("/login") # Redirect to login if no session
def test(e):
print(e)
page.go("/test")
print("test")
# Search field
search_field = ft.TextField(label="Lehrkraft / Raumnummer ", expand=True)
# Dropdown for time filter
time_filter = ft.Dropdown(
label="Stunde wählen",
options=[
ft.dropdown.Option("8:00", "8:00"),
ft.dropdown.Option("9:20", "9:20"),
ft.dropdown.Option("10:40", "10:40"),
ft.dropdown.Option("11:50", "11:50"),
ft.dropdown.Option("12:50", "12:50"),
ft.dropdown.Option("13:55", "13:55"),
ft.dropdown.Option("15:00", "15:00"),
],
width=170, # Shorter width for time filter
)
# Cross icon button to clear time filter
clear_time_filter = ft.IconButton(
icon=ft.icons.CLOSE,
on_click=lambda e: clear_filter("time"),
tooltip="Clear Time Filter",
)
# Container for the room list
room_container = ft.Column(scroll=ft.ScrollMode.AUTO, expand=True)
def ausloggen(e = None):
page.go("/login")
print(page.session.set("access_token", None))
# Function to clear filters
def clear_filter(filter_type):
if filter_type == "time":
time_filter.value = None
if filter_type == "lesson":
lesson_filter.value = None
update_room_list(json.loads(page.session.get("rooms_data", "[]")))
def update_room_list(rooms):
# Ensure rooms is a list
if not isinstance(rooms, list):
rooms = []
# Apply search filter
search_query = search_field.value.lower()
filtered_rooms = [
room
for room in rooms
if (search_query in str(room.get("room_name", "")).lower() # Search by room_name
or search_query in room.get("info", "").lower() # Search by room_info
or search_query in room.get("location", "").lower() # Search by location
or search_query in room.get("subjects", "").lower() # Search by subjects
or search_query in room.get("first_name", "").lower() # Search by teacher_first_name
or search_query in room.get("last_name", "").lower()) # Search by teacher_last_name
]
if time_filter.value: # Only apply the filter if a time is selected
filtered_rooms = [
room
for room in filtered_rooms
if room.get("lesson_time", "") == time_filter.value
]
if lesson_filter.value: # Only apply the filter if a time is selected
filtered_rooms = [
room
for room in filtered_rooms
if lesson_filter.value.lower() in room.get("subjects", "").lower().split(", ")
]
if show_full_rooms.value == True:
filtered_rooms = [
room
for room in filtered_rooms
if room.get("is_open", "") == True
]
# Apply date filter (only if a date is selected)
room_container.controls.clear()
def page_dialog_click(e, unique_id):
dialog_join = ft.AlertDialog(
modal=True,
title=ft.Text("Bitte bestätige"),
content=ft.Text("Möchtest du diesem Raum wirklich beitreten?"),
actions=[
ft.TextButton("Ja", on_click=lambda e: (page.close(dialog_join), room_clicked(e, unique_id))),
ft.TextButton("Nein", on_click=lambda e: page.close(dialog_join)),
],
actions_alignment=ft.MainAxisAlignment.END,
)
page.dialog = dialog_join
dialog_join.open = True
page.update()
def room_clicked(e, unique_id):
try:
url = f"http://awesom-o.org:8000/student/join_room/?unique_id={unique_id}"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"session_id": page.session.get("access_token")}
response = requests.post(url, json=data, headers=headers)
if response.status_code == 200:
try:
page.snack_bar = ft.SnackBar(ft.Text(f"Raum erfolgreich beigetreten: {unique_id}"))
page.snack_bar.open = True
page.update()
page.go("/")
except:
page.snack_bar = ft.SnackBar(ft.Text(f"Ein fehler ist aufgetreten, bitte melde dich bei der Administration"))
page.snack_bar.open = True
page.update()
else:
dialog_join = ft.AlertDialog(
modal=True,
title=ft.Text("Fehler"),
content=ft.Text(response.json().get("detail")),
actions=[
ft.TextButton("Schließen", on_click=lambda e: page.close(dialog_join)),
],
actions_alignment=ft.MainAxisAlignment.END,
)
page.dialog = dialog_join
dialog_join.open = True
page.update()
except requests.exceptions.RequestException as e:
dialog_join = ft.AlertDialog(
modal=True,
title=ft.Text("Fehler"),
content=ft.Text("Ein Fehler ist aufgetreten"),
actions=[
ft.TextButton("Schließen", on_click=lambda e: page.close(dialog_join)),
],
actions_alignment=ft.MainAxisAlignment.END,
)
page.dialog = dialog_join
dialog_join.open = True
page.update()
for room in filtered_rooms:
room_name = room.get("room_name", "Unknown Room")
room_info = room.get("info", "")
location = room.get("location", "")
first_name = room.get("first_name", "")
last_name = room.get("last_name", "")
subjects = room.get("subjects", "")
lesson_time = room.get("lesson_time", "")
lesson_date = room.get("lesson_date", "")
max_students = room.get("max_students", 0)
current_students = room.get("current_students", 0)
unique_id = room.get("unique_id", "")
is_open = room.get("is_open", True)
joined = room.get("joined", False)
if current_students == max_students:
BACKROUND_COLOR = "#f10d0c"
TEXT_COLOR = ft.colors.WHITE
user_interface_button_text = ft.Text("Raum bereits voll")
else:
BACKROUND_COLOR = "#729fcf"
TEXT_COLOR = ft.colors.BLACK
if joined == True:
user_interface_button_text = ft.Text("Raum bereits gebucht")
else:
user_interface_button_text = ft.TextButton(
"Jetzt Platz reservieren",
on_click=lambda e, unique_id=unique_id: page_dialog_click(e, unique_id),
style=ft.ButtonStyle(
bgcolor=ft.colors.WHITE,
color=ft.colors.BLACK,
shape=ft.RoundedRectangleBorder(radius=5),
padding=20,
))
room_card = ft.Card(
content=ft.Container(
content=ft.Row(
[
# Room details
ft.Column(
[
ft.Text(f"Heute, {lesson_time} Uhr", size=16, color=TEXT_COLOR),
ft.Text(f"Lehrkraft: {first_name} {last_name} - {subjects}", size=16, color=TEXT_COLOR),
ft.Text(f"Raum {room_name} {room_info} | {location}", size=16, color=TEXT_COLOR),
ft.Text(f"Belegte Plätze: {current_students}/{max_students}", size=16, color=TEXT_COLOR),
],
expand=True,
alignment=ft.MainAxisAlignment.CENTER,
),
# IconButton on the right
user_interface_button_text,
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
),
padding=20, # Add padding inside the card
),
color=BACKROUND_COLOR,
margin=10,
)
room_container.controls.append(room_card)
if not filtered_rooms:
room_container.controls.append(ft.Text("Keine offenen Räume verfügbar für Heute.", size=18))
page.update()
# WebSocket listener for real-time updates
async def listen_for_updates():
uri = "ws://localhost:8000/ws/open_rooms"
headers = {"session-id": page.session.get("access_token")}
async with websockets.connect(uri, extra_headers=headers) as websocket:
print("Connected to WebSocket server")
try:
while True:
# Receive data from the server
data = await websocket.recv()
rooms = json.loads(data)
# Ensure rooms is a list
if isinstance(rooms, dict):
rooms = rooms.get("rooms", [])
elif not isinstance(rooms, list):
rooms = []
# Store rooms data in session for filtering
page.session.set("rooms_data", json.dumps(rooms))
# Extract unique dates and sort them from nearest to longest
unique_dates = list(set(room.get("lesson_date", "") for room in rooms))
unique_dates.sort(key=lambda x: datetime.strptime(x, "%Y-%m-%d")) # Sort dates
# Update the room list
update_room_list(rooms)
except websockets.ConnectionClosed:
print("WebSocket connection closed")
# Start the WebSocket listener in a separate thread
def start_websocket_listener():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(listen_for_updates())
threading.Thread(target=start_websocket_listener, daemon=True).start()
top_bar = ft.Row(
[
search_field,
ft.Row([time_filter, clear_time_filter], spacing=5),
],
alignment=ft.MainAxisAlignment.START,
height=50,
)
lesson_filter = ft.Dropdown(
label="Fach wählen",
options=[
ft.dropdown.Option("De", "De"),
ft.dropdown.Option("Ma", "Ma"),
ft.dropdown.Option("NW", "NW"),
ft.dropdown.Option("En", "En"),
ft.dropdown.Option("Gs", "Gs"),
ft.dropdown.Option("Re", "Re"),
ft.dropdown.Option("WN", "WN"),
ft.dropdown.Option("Fr", "Fr"),
ft.dropdown.Option("Sp", "Sp"),
ft.dropdown.Option("Sn", "Sn"),
],
width=170,
)
clear_lesson_filter = ft.IconButton(
icon=ft.icons.CLOSE,
on_click=lambda e: clear_filter("lesson"),
tooltip="Clear Lesson Filter",
)
show_full_rooms = ft.Checkbox(label="Volle Räume ausblenden ", value=False, label_style=ft.TextStyle(size=20))
middle_bar = ft.Row(
[
show_full_rooms,
ft.Row(
[
ft.Column(
[
ft.Text(
"Nur Räume anzeigen, in denen\nfolgendes Fach angeboten wird:",
size=20,
),
],
),
lesson_filter,
clear_lesson_filter,
],
spacing=5,
alignment=ft.MainAxisAlignment.END,
),
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
height=50,
)
go_home_button = ft.Container(
content=ft.Text("Home", size=24, color=ft.colors.BLUE_900),
on_click=lambda e: page.go("/"),
alignment=ft.alignment.center,
width=200,
height=60,
bgcolor='transparent',
ink=True,
)
navigation_bar = ft.Row(
[go_home_button, ft.Text("Räume suchen", size=23, weight=ft.FontWeight.BOLD, width=170, no_wrap=True)],
)
page.add(
ft.Column(
[
ft.Row(
[
ft.Text(
"Daltonraum-Buchungssystem der IGS Garbsen", size=30),
ft.Column([
ft.PopupMenuButton(
items=[
ft.PopupMenuItem(content=ft.Text(f"Eingeloggt als: {page.session.get("username")}", weight=ft.FontWeight.BOLD)),
ft.PopupMenuItem(text="Profil anzeigen", on_click=test),
ft.PopupMenuItem(text="Ausloggen", on_click=ausloggen),
],
content=ft.CircleAvatar(
content=ft.Text(get_initials(
page.session.get("username"))),
color=ft.colors.WHITE,
bgcolor=get_avatar_color(
page.session.get("username")),
),
menu_position=ft.PopupMenuPosition.UNDER,
tooltip="",
)
],
# Align column content to the end (optional)
alignment=ft.MainAxisAlignment.END,
),
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN, # Space out the two elements
vertical_alignment=ft.CrossAxisAlignment.CENTER, # Align items vertically
),
navigation_bar,
top_bar,
middle_bar,
ft.Column(
[
room_container,
],
alignment=ft.MainAxisAlignment.CENTER,
expand=True,
),
],
expand=True,
alignment=ft.MainAxisAlignment.CENTER,
)
)

View File

@@ -0,0 +1,79 @@
import flet as ft
import requests
def login_page(page: ft.Page):
page.clean()
page.title = "Login Page"
page.theme_mode = ft.ThemeMode.LIGHT # Set the theme to light
# UI components
username_field = ft.TextField(label="Accountname")
password_field = ft.TextField(label="Passwort", password=True, can_reveal_password=True)
# Create an info text label
info_label = ft.Text("", theme_style=ft.TextThemeStyle.TITLE_SMALL)
# Function to handle login
def login(e):
username = username_field.value.strip() # Strip whitespace
password = password_field.value.strip()
# Validate input
if not username or not password:
info_label.value = "Bitte Benutzername und Passwort eingeben"
info_label.color = "red"
page.update() # Update the page to reflect the changes
return
# Send request over HTTP
try:
url = 'http://awesom-o.org:8000/login'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
'username': username,
'password': password
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
token_data = response.json()
page.session.set("username", token_data['username'])
page.session.set("access_token", token_data['session_id'])
page.go("/")
else:
error_message = response.json().get("detail")
info_label.value = error_message
info_label.color = "red"
except requests.exceptions.RequestException as e:
info_label.value = f"An error occurred: {str(e)}"
info_label.color = "red"
# Update the page to reflect the changes
page.update()
# Function to navigate to the register page
def go_to_register(e):
page.go("/register_student") # Replace with the appropriate route for your register page
# Add components to the page
page.add(
ft.Text("Daltonraum-Buchungssystem der IGS Garbsen", size=30),
ft.Text("Login", size=23, weight=ft.FontWeight.BOLD),
username_field,
password_field,
ft.ElevatedButton("Anmelden", on_click=login),
info_label, # Add the info label to the page
ft.Row([
ft.TextButton("Noch nicht registriert? Registriere dich hier!", on_click=go_to_register),
])
)
# You can run the login page separately or as part of your main app.
if __name__ == "__main__":
ft.app(target=login_page, view=ft.AppView.WEB_BROWSER)

272
frontend/pages/main_page.py Normal file
View File

@@ -0,0 +1,272 @@
import flet as ft
import requests
import threading
import asyncio
import websockets
from websockets import connect # Ensure this is imported
import json
from datetime import datetime
from pages.student_main_page import student_main_page
def main_page(page: ft.Page):
page.clean()
page.theme_mode = ft.ThemeMode.LIGHT
session = page.session.get("access_token")
if not session:
page.go("/login") # Redirect to login if no session
def is_student():
url = 'http://awesom-o.org:8000/check_role'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
session_id = page.session.get("access_token")
if not session_id:
return None
data = {'session_id': session_id}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
role = response.json()
if role and 'role' in role and 'role' in role['role']:
if role['role']['role'] == "student":
role = "s"
return role
elif role['role']['role'] == "teacher":
role = "t"
return role
print("Unexpected response structure:", role)
return None
except requests.exceptions.RequestExceptin as error:
print("Error during role check:", error)
return None
page.title = "Room Information"
page.vertical_alignment = ft.MainAxisAlignment.START # Align all content to the top
if is_student() == "s":
student_main_page(page)
elif is_student() == "t":
async def fetch_room_data(session_id: str, update_room_list):
uri = "ws://localhost:8000/ws/teacher/open_rooms"
headers = {"session-id": session_id} # Add session ID to headers
async with websockets.connect(uri, extra_headers=headers) as websocket:
print("WebSocket connected")
try:
while True:
# Receive data from the WebSocket
data = await websocket.recv()
room_data = json.loads(data)
open_rooms = room_data.get("open_rooms", [])
# Update the room list
update_room_list(open_rooms)
except websockets.ConnectionClosed:
print("WebSocket connection closed")
except Exception as e:
print(f"WebSocket error: {e}")
# Function to fetch students in a room
def fetch_students_in_room(unique_id: str, session_id: str):
url = f"http://awesom-o.org:8000/teacher/room_students/?unique_id={unique_id}"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"session_id": session_id}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching students: {e}")
return None
# Function to delete a room
def delete_room(unique_id: str, session_id: str):
url = f"http://awesom-o.org:8000/teacher/delete_room/?unique_id={unique_id}"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"session_id": session_id}
try:
response = requests.delete(url, json=data, headers=headers)
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"Error deleting room: {e}")
return False
# Main Flet app
page.title = "Room Management"
page.vertical_alignment = ft.MainAxisAlignment.START # Align content to the top
page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
page.padding = 20
page.scroll = "adaptive"
# Check if the user is logged in
session_id = page.session.get("access_token")
if not session_id:
page.go("/login") # Redirect to login if no session
# Search field
search_field = ft.TextField(label="Search", expand=True)
# Icon button to navigate to "/create" page
create_button = ft.IconButton(
icon=ft.icons.ADD,
on_click=lambda _: page.go("/create"),
)
# Container for the room list
room_container = ft.Column(scroll=ft.ScrollMode.AUTO, expand=True)
# Function to update the room list
def update_room_list(rooms):
room_container.controls.clear()
for room in rooms:
room_name = room.get("room_name", "Unknown Room")
room_info = room.get("info", "")
location = room.get("location", "")
teacher_name = room.get("teacher_name", "")
max_students = room.get("max_students", 0)
current_students = room.get("current_students", 0)
unique_id = room.get("unique_id", "")
# Create a clickable room card
room_card = ft.GestureDetector(
content=ft.Card(
content=ft.Container(
content=ft.Column(
[
# Top line: room_name, room_info, location
ft.Row(
[
ft.Text(f"{room_name}", size=20),
ft.Text(f"{room_info}", size=16),
ft.Text(f"| {location}", size=16),
],
spacing=5,
),
# Teacher line
ft.Text(f"Teacher: {teacher_name}", size=16),
# Capacity line
ft.Text(f"Capacity: {current_students}/{max_students}", size=16),
],
spacing=10,
),
padding=20,
),
margin=10,
elevation=5,
),
on_tap=lambda e, unique_id=unique_id: open_room_dialog(unique_id),
)
room_container.controls.append(room_card)
if not rooms:
room_container.controls.append(ft.Text("No open rooms available.", size=18))
page.update()
# Function to open the room dialog
def open_room_dialog(unique_id: str):
# Fetch students in the room
students_data = fetch_students_in_room(unique_id, session_id)
if not students_data:
page.snack_bar = ft.SnackBar(ft.Text("Failed to fetch students."))
page.snack_bar.open = True
page.update()
return
students = students_data.get("students", [])
if students != None:
students_list = ft.Column(
[ft.Text(f"Student: {student}") for student in students],
scroll=ft.ScrollMode.AUTO,
expand=True,
)
else:
students_list = ft.Column(ft.Text(f"There are no students who joined your Room"),
scroll=ft.ScrollMode.AUTO,
expand=True,
)
# Confirmation dialog for deleting the room
def open_delete_confirmation_dialog():
page.close(dialog)
def confirm_delete(e):
if delete_room(unique_id, session_id):
page.snack_bar = ft.SnackBar(ft.Text("Room deleted successfully."))
page.snack_bar.open = True
page.update()
page.close(confirm_dialog)
page.update()
else:
page.snack_bar = ft.SnackBar(ft.Text("Failed to delete room."))
page.snack_bar.open = True
page.update()
confirm_dialog = ft.AlertDialog(
title=ft.Text("Are you sure you want to delete this room?"),
actions=[
ft.TextButton("No", on_click=lambda e: page.close(confirm_dialog)),
ft.TextButton("Yes", on_click=confirm_delete),
],
)
page.dialog = confirm_dialog
confirm_dialog.open = True
page.update()
# Room dialog
dialog = ft.AlertDialog(
title=ft.Text("Room Details"),
content=students_list,
actions=[
ft.TextButton("Close", on_click=lambda e: page.close(dialog)),
ft.TextButton("Delete Room", on_click=lambda e: open_delete_confirmation_dialog()),
],
)
page.dialog = dialog
dialog.open = True
page.update()
# WebSocket listener for real-time updates
async def listen_for_updates():
while True:
try:
await fetch_room_data(session_id, update_room_list)
except Exception as e:
print(f"WebSocket connection error: {e}. Reconnecting in 5 seconds...")
await asyncio.sleep(5) # Wait before reconnecting
# Top bar layout
top_bar = ft.Row(
[
search_field,
create_button,
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
vertical_alignment=ft.CrossAxisAlignment.CENTER,
)
# Add components to the page
page.add(
top_bar,
room_container,
)
# Start the WebSocket listener
page.run_task(listen_for_updates)
else:
page.go("/login")

View File

@@ -0,0 +1,94 @@
import flet as ft
import requests
import json
def register_student_page(page: ft.Page):
page.clean()
page.title = "Register Page"
page.theme_mode = ft.ThemeMode.LIGHT
# UI components
username_field = ft.TextField(label="Accountname (darfst du dir ausdenken)")
first_name_field = ft.TextField(label="Vorname")
last_name_field = ft.TextField(label="Nachname")
user_class_field = ft.TextField(label="Klasse")
password_field0 = ft.TextField(label="Passwort", password=True, can_reveal_password=True)
password_field1 = ft.TextField(label="Passwort bestätigen", password=True, can_reveal_password=True)
# Create an info text label
info_label = ft.Text("", theme_style=ft.TextThemeStyle.TITLE_SMALL)
# Function to handle registration
def register(e):
username = username_field.value.strip()
password0 = password_field0.value.strip()
first_name = first_name_field.value.strip()
last_name = last_name_field.value.strip()
user_class = user_class_field.value.strip()
password1 = password_field1.value.strip()
# Validate input
if not username or not password0 or not password1 or not first_name or not last_name or not user_class :
info_label.value = "Bitte fülle alle lücken aus"
info_label.color = "red"
page.update()
return
if not password0 == password1:
info_label.value = "Passwörter stimmen nicht überein"
info_label.color = "red"
page.update()
return
# Send request over HTTP
try:
url = 'http://awesom-o.org:8000/student/register'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
'username': username,
'first_name': first_name,
'last_name': last_name,
'user_class': user_class,
'password': password0,
}
response = requests.post(url, headers=headers, data=json.dumps(data))
if response.status_code == 200:
session_id = response.json().get("session_id")
username = response.json().get("username")
page.session.set("access_token", session_id)
page.session.set("username", username)
page.go("/")
else:
error_message = response.json().get("detail", "Registrierung fehlgeschlagen")
info_label.value = error_message
info_label.color = "red"
except requests.exceptions.RequestException as e:
info_label.value = f"An error occurred: {str(e)}"
info_label.color = "red"
page.update()
page.add(
ft.Text("Daltonraum-Buchungssystem der IGS Garbsen", size=30),
ft.Text("Registrieren", size=23, weight=ft.FontWeight.BOLD),
username_field,
first_name_field,
last_name_field,
user_class_field,
password_field0,
password_field1,
ft.ElevatedButton("Registrieren", on_click=register),
info_label, # Add the info label to the page
ft.TextButton("Bereits registriert? Zurück zur Login-Seite.", on_click=lambda _: page.go("/login")) # Link to login
)
# You can run the registration page separately or as part of your main app.
if __name__ == "__main__":
ft.app(target=register_student_page, view=ft.AppView.WEB_BROWSER)

View File

@@ -0,0 +1,180 @@
import flet as ft
import requests
def register_teacher_page(page: ft.Page):
page.clean()
page.title = "Register Page"
page.theme_mode = ft.ThemeMode.LIGHT
subjects_list = []
row = ft.Row()
# UI components
username_field = ft.TextField(label="Benutzername")
first_name_field = ft.TextField(label="Vorname")
last_name_field = ft.TextField(label="Nachname")
password_field0 = ft.TextField(label="Passwort", password=True, can_reveal_password=True)
password_field1 = ft.TextField(label="Passwort bestätigen", password=True, can_reveal_password=True)
secret_teacher_password_field = ft.TextField(label="Secret Teacher Password", password=True, can_reveal_password=True)
info_label = ft.Text("", theme_style=ft.TextThemeStyle.TITLE_SMALL)
def delete_subject(dropdown, delete_button):
if dropdown.value in subjects_list:
subjects_list.remove(dropdown.value)
row.controls.remove(dropdown)
row.controls.remove(delete_button)
page.update()
refresh_dropdowns()
def append_subject(e=None, dropdown=None, plus_button=None):
if not dropdown.value or dropdown.value in subjects_list or dropdown.value == "Wähle deine Fächer aus":
dropdown.value = "" # Reset if empty or duplicate
page.update()
return
subjects_list.append(dropdown.value)
# Disable current dropdown and button after selection
dropdown.disabled = True
plus_button.visible = False
# Add new dropdown to the same row
add_new_dropdown()
page.update()
def on_dropdown_change(e, dropdown, plus_button):
if dropdown.value and dropdown.value not in subjects_list and dropdown.value != "Wähle deine Fächer aus":
subjects_list.append(dropdown.value)
dropdown.disabled = True
plus_button.visible = False
add_new_dropdown()
page.update()
def refresh_dropdowns():
for control in row.controls:
if isinstance(control, ft.Dropdown) and not control.disabled:
available_subjects = get_available_subjects()
control.options = [
ft.dropdown.Option("Wähle deine Fächer aus", disabled=True)
] + [
ft.dropdown.Option(subj) for subj in available_subjects
]
page.update()
def get_available_subjects():
available_subjects = [
"De", "En", "Fr", "Gs", "Ma", "NW", "Re", "Sn", "Sp", "Wn"
]
return [subj for subj in available_subjects if subj not in subjects_list]
def add_new_dropdown():
options = [
ft.dropdown.Option("Wähle deine Fächer aus", disabled=True)
] + [
ft.dropdown.Option(subj) for subj in get_available_subjects()
]
dropdown = ft.Dropdown(
width=250,
options=options,
value="Wähle deine Fächer aus",
on_change=lambda e: on_dropdown_change(e, dropdown, plus_button)
)
plus_button = ft.IconButton(
icon=ft.icons.ADD,
visible=False # Initially hidden
)
delete_button = ft.IconButton(
icon=ft.icons.CLOSE_OUTLINED,
on_click=lambda e: delete_subject(dropdown, delete_button)
)
plus_button.on_click = lambda e: append_subject(e, dropdown, plus_button)
# Add dropdown and buttons to the row
row.controls.append(dropdown)
row.controls.append(plus_button)
row.controls.append(delete_button)
page.update()
def register(e):
print(subjects_list)
username = username_field.value.strip()
password0 = password_field0.value.strip()
password1 = password_field1.value.strip()
secret_teacher_password = secret_teacher_password_field.value.strip()
last_name = last_name_field.value.strip()
first_name = first_name_field.value.strip()
subjects_string = ", ".join(subjects_list)
if not username or not password0 or not secret_teacher_password or not password1 or not last_name or not first_name or not subjects_string:
info_label.value = "Bitte fülle alle Felder aus"
info_label.color = "red"
page.update()
#page.clean()
#register_teacher_page(page=page)
return
if not password0 == password1:
info_label.value = "Passwörter stimmen nicht überein"
info_label.color = "red"
page.update()
return
try:
url = 'http://awesom-o.org:8000/teacher/register'
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
data = {
'username': username,
'password': password0,
'teacher_secret_password': secret_teacher_password,
'first_name': first_name,
'last_name': last_name,
'subjects': subjects_string
}
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
session_id = response.json().get("session_id")
username = response.json().get("username")
page.session.set("access_token", session_id)
page.session.set("username", username)
page.go("/")
else:
error_message = response.json().get("detail", "Registrierung fehlgeschlagen")
info_label.value = error_message
info_label.color = "red"
except requests.exceptions.RequestException as e:
info_label.value = f"An error occurred: {str(e)}"
info_label.color = "red"
page.update()
page.add(
ft.Text("Daltonraum-Buchungssystem der IGS Garbsen", size=30),
ft.Text("Registrieren", size=23, weight=ft.FontWeight.BOLD),
username_field,
ft.Row([
first_name_field,
last_name_field
]),
row,
password_field0,
password_field1,
secret_teacher_password_field,
ft.ElevatedButton("Registrieren", on_click=register),
info_label,
ft.TextButton("Kein Lehrer? Klicke hier!", on_click=lambda _: page.go("/register_student"))
)
add_new_dropdown()

View File

@@ -0,0 +1,343 @@
import flet as ft
import requests
import threading
import asyncio
import websockets
from websockets import connect # Ensure this is imported
import json
from datetime import datetime
def get_initials(user_name: str):
if user_name:
return user_name[0].capitalize()
else:
return "E" # or any default value you prefer
def get_avatar_color(user_name: str):
colors_lookup = [
ft.colors.AMBER,
ft.colors.BLUE,
ft.colors.BROWN,
ft.colors.CYAN,
ft.colors.GREEN,
ft.colors.INDIGO,
ft.colors.LIME,
ft.colors.ORANGE,
ft.colors.PINK,
ft.colors.PURPLE,
ft.colors.RED,
ft.colors.TEAL,
]
return colors_lookup[hash(user_name) % len(colors_lookup)]
def student_main_page(page: ft.Page):
page.clean()
page.title = "Room Information"
page.theme_mode = ft.ThemeMode.LIGHT
page.vertical_alignment = ft.MainAxisAlignment.START # Align all content to the top
page.scroll = "adaptive"
page.padding = 20
# Container for lessons
lessons_container = ft.Column(
alignment=ft.MainAxisAlignment.CENTER, spacing=20)
def test(e):
print(e)
page.go("/test")
print("test")
def ausloggen(e=None):
page.go("/login")
print(page.session.get("access_token"))
def page_dialog_click(e, lesson_id):
dialog_leave = ft.AlertDialog(
modal=True,
title=ft.Text("Bitte bestätige"),
content=ft.Text("Möchtest du diesem Raum wirklich verlassen?"),
actions=[
ft.TextButton("Ja", on_click=lambda e: (page.close(
dialog_leave), leave_current_room(e, lesson_id))),
ft.TextButton(
"Nein", on_click=lambda e: page.close(dialog_leave)),
],
actions_alignment=ft.MainAxisAlignment.END,
)
page.dialog = dialog_leave
dialog_leave.open = True
page.update()
def leave_current_room(e=None, unique_id=None):
if not unique_id:
page.snack_bar = ft.SnackBar(
ft.Text("Kein Raum zum verlassen ausgewählt"))
page.snack_bar.open = True
page.update()
return
url = f"http://awesom-o.org:8000/student/leave_room/?unique_id={
unique_id}"
headers = {"accept": "application/json",
"Content-Type": "application/json"}
data = {"session_id": page.session.get("access_token")}
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
if response:
get_rooms()
page.snack_bar = ft.SnackBar(
ft.Text("Raum erfolgreich verlassen"))
page.snack_bar.open = True
page.update()
get_rooms()
except requests.RequestException as error:
# Handle all bad responses
error_detail = "Something went wrong."
if hasattr(error, "response") and error.response:
try:
error_detail = error.response.json().get("detail", "Something went wrong.")
except json.JSONDecodeError:
error_detail = error.response.text or "Etwas ist schiefgelaufen"
page.snack_bar = ft.SnackBar(ft.Text(f"Error: {error_detail}"))
page.snack_bar.open = True
page.update()
async def get_rooms():
uri = "ws://localhost:8000/ws/student/my_room"
headers = {"session-id": page.session.get("access_token")}
try:
async with websockets.connect(uri, extra_headers=headers) as websocket:
while True:
message = await websocket.recv()
data = json.loads(message)
# Clear previous lessons
lessons_container.controls.clear()
if "error" in data:
# Show "No Rooms for Today" message
lessons_container.controls.append(
ft.Container(
content=ft.Text(
"Für heute wurden noch keine Räume ausgewählt.",
size=25,
text_align="center",
),
border=ft.Border(
top=ft.BorderSide(1, ft.colors.BLACK),
bottom=ft.BorderSide(1, ft.colors.BLACK),
left=ft.BorderSide(1, ft.colors.BLACK),
right=ft.BorderSide(1, ft.colors.BLACK)
),
border_radius=20,
padding=10,
height=150,
bgcolor=ft.colors.WHITE,
alignment=ft.alignment.center,
)
)
else:
rooms = data.get("rooms", [])
# Filter and sort lessons for today
today = datetime.now().date()
today_lessons = [
room for room in rooms if room["lesson_date"] == str(today)]
today_lessons.sort(key=lambda x: x["lesson_time"])
# Check if there are rooms on other dates
other_date_rooms = [
room for room in rooms if room["lesson_date"] != str(today)]
if today_lessons:
for lesson in today_lessons:
if lesson['current_students'] == lesson['max_students']:
BACKROUND_COLOR = "#f10d0c"
TEXT_COLOR = ft.colors.WHITE
else:
BACKROUND_COLOR = "#729fcf"
TEXT_COLOR = ft.colors.BLACK
# Create lesson card
room_card = ft.Card(
content=ft.Container(
content=ft.Row(
[
# Room details
ft.Column(
[
# Top line:
# room_number, info, location
ft.Text(
f"Heute, {
lesson['lesson_time']} Uhr",
size=16,
color=TEXT_COLOR
),
# Second line: first_name and last_name
ft.Text(
f"Lehrkraft: {lesson['first_name']} {
lesson['last_name']} - {lesson['subjects']}",
size=16,
color=TEXT_COLOR
),
# Third line: lesson_time
ft.Text(
f"Raum {lesson['room_number']} {
lesson['info']} | {lesson['location']}",
size=16,
color=TEXT_COLOR
),
ft.Text(
f"Belegte Plätze: {
lesson['current_students']}/{lesson['max_students']}",
size=16,
color=TEXT_COLOR
),
],
expand=True,
alignment=ft.MainAxisAlignment.CENTER,
),
# IconButton on the right
ft.Column(
[
ft.TextButton("Aus Raum abmelden", on_click=lambda e, lesson_id=lesson['unique_id']: page_dialog_click(e, lesson_id),
style=ft.ButtonStyle(
bgcolor=ft.colors.WHITE,
color=ft.colors.BLACK,
shape=ft.RoundedRectangleBorder(
radius=5),
padding=20,
)
),
ft.Text("ACHTUNG: Hierdurch wird", size=12), ft.Text(
"dein reservierter Platz für", size=12), ft.Text("jemand anderes freigegeben.", size=12),
]
)
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
),
padding=20, # Add padding inside the card
),
color=BACKROUND_COLOR,
margin=10,
)
lessons_container.controls.append(room_card)
else:
# Show "No Rooms for Today" message
lessons_container.controls.append(
ft.Container(
content=ft.Text(
"Für heute wurden noch keine Räume ausgewählt.",
size=25,
text_align="center",
),
border=ft.Border(
top=ft.BorderSide(1, ft.colors.BLACK),
bottom=ft.BorderSide(
1, ft.colors.BLACK),
left=ft.BorderSide(1, ft.colors.BLACK),
right=ft.BorderSide(1, ft.colors.BLACK)
),
border_radius=20,
padding=10,
height=150,
bgcolor=ft.colors.WHITE,
alignment=ft.alignment.center,
)
)
# Update page
page.update()
except Exception as e:
lessons_container.controls.clear()
lessons_container.controls.append(
ft.Text(f"Error: {e}", size=50, weight="bold", text_align="center"))
page.update()
def start_get_rooms():
# Ensure a new event loop in the thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(get_rooms())
# Start the WebSocket client in a thread
import threading
threading.Thread(target=start_get_rooms, daemon=True).start()
join_more_rooms_button = ft.Container(
content=ft.Text("Räume suchen", size=24, color=ft.colors.BLUE_900),
on_click=lambda e: page.go("/join"),
alignment=ft.alignment.center,
width=200,
height=60,
bgcolor='transparent',
ink=True,
)
navigation_bar = ft.Row(
[ft.Text("Home", size=23, weight=ft.FontWeight.BOLD,
width=100), join_more_rooms_button],
)
# Add components to page
page.add(
ft.Column(
[
ft.Row(
[
ft.Text(
"Daltonraum-Buchungssystem der IGS Garbsen", size=30),
ft.Column([
ft.PopupMenuButton(
items=[
ft.PopupMenuItem(content=ft.Text(f"Eingeloggt als: {page.session.get("username")}", weight=ft.FontWeight.BOLD)),
ft.PopupMenuItem(text="Profil anzeigen", on_click=test),
ft.PopupMenuItem(text="Ausloggen", on_click=ausloggen),
],
content=ft.CircleAvatar(
content=ft.Text(get_initials(
page.session.get("username"))),
color=ft.colors.WHITE,
bgcolor=get_avatar_color(
page.session.get("username")),
),
menu_position=ft.PopupMenuPosition.UNDER,
tooltip="",
)
],
# Align column content to the end (optional)
alignment=ft.MainAxisAlignment.END,
),
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN, # Space out the two elements
vertical_alignment=ft.CrossAxisAlignment.CENTER, # Align items vertically
),
navigation_bar,
ft.Row( # Wrap the Text in a Row
[
ft.Text(
"Du hast dich heute in folgende Daltonräume eingetragen:", size=25)
],
alignment=ft.MainAxisAlignment.CENTER # Center the Text horizontally
),
ft.Column(
[
lessons_container,
],
alignment=ft.MainAxisAlignment.CENTER,
expand=True, # Make it take available space vertically
),
],
expand=True, # This makes the outer column take available space as well
alignment=ft.MainAxisAlignment.CENTER, # Center the whole column vertically
)
)

Binary file not shown.

15
frontend/pages/test.py Normal file
View File

@@ -0,0 +1,15 @@
import flet as ft
def main(page: ft.Page):
# Button click handler to launch the download URL
def on_download_click(e):
# Launch the URL to trigger file download
page.launch_url("http://awesom-o.org:8000/download/tedt.pdf")
# Create an ElevatedButton that will trigger the download
download_button = ft.ElevatedButton("Download myfile", on_click=on_download_click)
# Add the button to the page
page.add(download_button)
ft.app(target=main)

996
main.py Normal file
View File

@@ -0,0 +1,996 @@
import sqlite3
from datetime import datetime, timedelta, time
from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from fastapi.responses import FileResponse
from typing import List, Dict, Optional
import uuid
import os
import json
import asyncio
from starlette.websockets import WebSocketState
from fpdf import FPDF
import tempfile
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# SQLite database setup
DATABASE_NAME = "school.db"
SUBJECTS = ["De", "Ma", "NW", "En", "Gs", "Re", "WN", "Fr", "Sp", "Sn"]
ROOM_NAMES = [
{"number": "1", "room_number": "0201", "info": "(Hörsaal)", "location": "Osttrakt"},
{"number": "2", "room_number": "0202", "info": "(NW)", "location": "Osttrakt"},
{"number": "3", "room_number": "0203", "info": "(NW)", "location": "Osttrakt"},
{"number": "4", "room_number": "0204", "info": "(NW)", "location": "Osttrakt"},
{"number": "5", "room_number": "0205", "info": "(NW)", "location": "Osttrakt"},
{"number": "6", "room_number": "0214", "info": "(NW)", "location": "Lichthof"},
{"number": "7", "room_number": "0215", "info": "(NW)", "location": "Lichthof"},
{"number": "8", "room_number": "0216", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "9", "room_number": "0217", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "10", "room_number": "0265", "info": "(NW)", "location": "Lichthof"},
{"number": "11", "room_number": "0266", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "12", "room_number": "0267", "info": "(NW)", "location": "Flur zum Lichthof"},
{"number": "13", "room_number": "1860", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "14", "room_number": "1861", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "15", "room_number": "1862", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "16", "room_number": "1863", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "17", "room_number": "1864", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "18", "room_number": "1865", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "19", "room_number": "1866", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "20", "room_number": "1867", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "21", "room_number": "1868", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "22", "room_number": "1869", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "23", "room_number": "1870", "info": "Jahrgang Südwest", "location": "1. OG"},
{"number": "24", "room_number": "2860", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "25", "room_number": "2861", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "26", "room_number": "2862", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "27", "room_number": "2863", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "28", "room_number": "2864", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "29", "room_number": "2865", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "30", "room_number": "2866", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "31", "room_number": "2867", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "32", "room_number": "2868", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "33", "room_number": "2869", "info": "Jahrgang Südwest", "location": "2. OG"},
{"number": "34", "room_number": "2870", "info": "Jahrgang Südwest", "location": "2. OG"}
]
room_name_dict = {room['number']: room for room in ROOM_NAMES}
active_connections: Dict[str, WebSocket] = {}
#CURRENT_TIME_HOUR_MINUTE = datetime.now().strftime("%H:%M")
CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")
CURRENT_TIME_HOUR_MINUTE = "9:10"
# Helper function to connect to the database
def get_db_connection():
conn = sqlite3.connect(DATABASE_NAME)
conn.row_factory = sqlite3.Row
return conn
# Database tables creation on startup
def create_tables():
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS rooms (
room_number TEXT NOT NULL,
max_students INTEGER NOT NULL,
teacher_name TEXT NOT NULL,
is_open BOOLEAN NOT NULL,
lesson_date TEXT NOT NULL, -- Can also be DATETIME if you need both date and time
lesson_time TEXT NOT NULL,
unique_id TEXT PRIMARY KEY, -- Ensure unique_id is the primary key
UNIQUE(room_number, lesson_date, lesson_time) -- Composite unique constraint
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS student_joins (
student_username TEXT NOT NULL,
room_unique_id TEXT NOT NULL,
join_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (student_username, room_unique_id),
FOREIGN KEY (student_username) REFERENCES students(username) ON DELETE CASCADE,
FOREIGN KEY (room_unique_id) REFERENCES rooms(unique_id) ON DELETE CASCADE
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS students (
username TEXT PRIMARY KEY NOT NULL,
password TEXT NOT NULL,
first_name TEXT,
last_name TEXT,
user_class TEXT,
unique_id TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS teachers (
username TEXT PRIMARY KEY,
password TEXT,
first_name TEXT,
last_name TEXT,
subjects TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
username TEXT,
role TEXT
)
''')
# Clear all rooms and sessions on startup
cursor.execute("DELETE FROM rooms")
cursor.execute("DELETE FROM sessions")
cursor.execute("DELETE FROM student_joins")
conn.commit()
conn.close()
# Models
class RoomCreate(BaseModel):
room_number: str
max_students: int
session_id: str
lesson_time: str
lesson_date: str
class Student(BaseModel):
username: str
class RoomInfo(BaseModel):
room_number: str
max_students: int
current_students: List[str]
teacher_name: str
class User_register(BaseModel):
username: str
first_name: str
last_name: str
user_class: str
password: str
class User(BaseModel):
username: str
password: str
class Session(BaseModel):
session_id: str
class Teacher_User(BaseModel):
username: str
password: str
teacher_secret_password: str
first_name: str
last_name: str
subjects: str
def current_role(session_id: str):
conn = get_db_connection()
cursor = conn.cursor()
# Check if the session ID exists in the sessions table
cursor.execute("SELECT role FROM sessions WHERE session_id = ?", (session_id,))
role: list[sqlite3.Row] = cursor.fetchone()
conn.close()
return role
# Helper functions
def get_current_user(session_id: str):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM sessions WHERE session_id = ?", (session_id,))
session = cursor.fetchone()
if session is None:
raise HTTPException(status_code=404, detail="Session not found")
conn.close()
return session
def subjects_avaible(liste, worte):
for fach in worte:
if fach not in liste:
return False, fach # Gibt False und das nicht gefundene Fach zurück
return True, None
async def check_lesson_time():
print(f"Checking for the event at {datetime.now()}...")
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(
"""
UPDATE rooms
SET is_open = ?
WHERE lesson_date = ? AND lesson_time = ?
""",
(False, CURRENT_DATE, CURRENT_TIME_HOUR_MINUTE)
)
conn.commit()
conn.close()
event_happened = False # Replace with your actual condition
if event_happened:
print("Event occurred!")
return True
return False
except:
print("no rooms")
# Calculate seconds until the next target time
def seconds_until_next_interval():
now = datetime.now()
next_minute = (now.minute // 5 + 1) * 5 # Next multiple of 5 minutes
if next_minute >= 60: # Handle hour overflow
next_hour = now.hour + 1 if now.hour < 23 else 0
next_time = datetime(now.year, now.month, now.day, next_hour, 0, 0)
else:
next_time = datetime(now.year, now.month, now.day, now.hour, next_minute, 0)
return (next_time - now).total_seconds()
# Background task to check the event based on real-world time
async def periodic_event_checker():
while True:
try:
event_detected = await check_lesson_time()
if event_detected:
print("Event detected! Taking action.")
# Add any action you want to take when the event occurs
# You can also exit the loop if needed
break
sleep_time = seconds_until_next_interval()
print(f"Sleeping for {sleep_time:.2f} seconds until the next check.")
await asyncio.sleep(sleep_time)
except Exception as e:
print(f"Error in background task: {e}")
await asyncio.sleep(5 * 60) # Retry after 5 minutes in case of an error
import logging
logging.basicConfig(level=logging.DEBUG)
def is_open_update(unique_id: str):
conn = get_db_connection()
cursor = conn.cursor()
# Check the current number of students in the room
cursor.execute("""
SELECT COUNT(s.username) AS current_students,
COALESCE(r.max_students, 100) AS max_students,
COALESCE(r.is_open, 0) AS is_open
FROM students s
JOIN student_joins sj ON s.username = sj.student_username
JOIN rooms r ON r.unique_id = sj.room_unique_id
WHERE sj.room_unique_id = ?
""", (unique_id,))
result = cursor.fetchone()
if result is None:
logging.debug(f"No room found with unique_id: {unique_id}")
conn.close()
return
current_students = int(result['current_students'])
max_students = int(result['max_students'])
current_is_open = int(result['is_open'])
logging.debug(f"current_students: {current_students}, max_students: {max_students}, current_is_open: {current_is_open}")
# If the number of students exceeds or equals the max students, close the room
if current_students >= max_students:
if current_is_open != 0: # Check if the room is not already closed
cursor.execute("""
UPDATE rooms
SET is_open = 0
WHERE unique_id = ?
""", (unique_id,))
conn.commit()
else:
if current_is_open != 1: # Check if the room is not already
cursor.execute("""
UPDATE rooms
SET is_open = 1
WHERE unique_id = ?
""", (unique_id,))
conn.commit()
conn.close()
# Clear all data when the application starts
@app.on_event("startup")
def startup_event():
create_tables()
asyncio.create_task(periodic_event_checker())
@app.post("/teacher/register")
def teacher_register(user: Teacher_User):
conn = get_db_connection()
cursor = conn.cursor()
# Check if username exists in either teachers or students
cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=400, detail="Username already exists as a teacher")
cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=400, detail="Username already exists as a student")
if user.teacher_secret_password != "chrissi":
conn.close()
raise HTTPException(status_code=403, detail="Incorrect teacher secret password")
teacher_subjects = [fach.strip() for fach in user.subjects.split(",")]
result, error_subject = subjects_avaible(SUBJECTS, teacher_subjects)
if result == False:
return{"message": "Unavaible Subject", "subject": error_subject}
cursor.execute("INSERT INTO teachers (username, password, first_name, last_name, subjects) VALUES (?, ?, ?, ?, ?)", (user.username, user.password, user.first_name, user.last_name, user.subjects))
conn.commit()
# Create session for teacher
session_id = str(uuid.uuid4())
cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "teacher"))
conn.commit()
conn.close()
return {"message": "Teacher registered successfully", "session_id": session_id, "username": user.username}
@app.post("/student/register")
def student_register(user: User_register):
conn = get_db_connection()
cursor = conn.cursor()
# Check if username exists in either students or teachers
cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=400, detail="Benutzername bereits vergeben")
cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=400, detail="Benutzername bereits vergeben")
cursor.execute("INSERT INTO students (username, password, first_name, last_name, user_class) VALUES (?, ?, ?, ?, ?)", (user.username, user.password, user.first_name, user.last_name, user.user_class))
conn.commit()
# Create session for student
session_id = str(uuid.uuid4())
cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, "student"))
conn.commit()
conn.close()
return {"message": "Schüler erfolgreich registriert", "session_id": session_id, "username": user.username}
@app.post("/login")
def login(user: User):
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM teachers WHERE username = ?", (user.username,))
teacher = cursor.fetchone()
if teacher:
if teacher["password"] != user.password:
conn.close()
raise HTTPException(status_code=403, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort")
role = "teacher"
else:
cursor.execute("SELECT * FROM students WHERE username = ?", (user.username,))
student = cursor.fetchone()
if student:
if student["password"] != user.password:
conn.close()
raise HTTPException(status_code=403, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort")
role = "student"
else:
conn.close()
raise HTTPException(status_code=404, detail="Anmeldung Fehlgeschlagen! Falscher Benutzername oder falsches Passwort")
session_id = str(uuid.uuid4())
cursor.execute("INSERT INTO sessions (session_id, username, role) VALUES (?, ?, ?)", (session_id, user.username, role))
conn.commit()
conn.close()
return {"session_id": session_id, "username": user.username}
@app.post("/teacher/create_room")
def create_room(room: RoomCreate):
lesson_date = datetime.strptime(room.lesson_date, "%Y-%m-%d")
today = datetime.today().date()
if lesson_date.date() < today:
raise HTTPException(status_code=400, detail="Lesson date cannot be in the past")
# Check if the lesson date is today, but the lesson time has already passed
if lesson_date.date() == today:
lesson_time = datetime.strptime(room.lesson_time, "%H:%M").time()
current_time = datetime.now().time()
#if lesson_time < current_time:
# raise HTTPException(status_code=400, detail="Lesson time has already passed today")
user = get_current_user(room.session_id)
if user['role'] != "teacher":
raise HTTPException(status_code=403, detail="Only teachers can create rooms")
unique_id = str(uuid.uuid4())
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM rooms WHERE room_number = ? AND lesson_date = ? AND lesson_time = ?", (room.room_number, room.lesson_date, room.lesson_time,))
if cursor.fetchone():
conn.close()
raise HTTPException(status_code=400, detail="Room already exists")
cursor.execute(
"INSERT INTO rooms (room_number, max_students, teacher_name, is_open, lesson_date, lesson_time, unique_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
room.room_number,
room.max_students,
user['username'],
True,
room.lesson_date,
room.lesson_time,
unique_id,
)
)
conn.commit()
conn.close()
return {"message": "Room created successfully", "unique_id": unique_id}
@app.delete("/teacher/delete_room/")
def delete_room(unique_id: str, session: Session):
user = get_current_user(session.session_id)
if user['role'] != "teacher":
raise HTTPException(status_code=403, detail="Only teachers can delete rooms")
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,))
room = cursor.fetchone()
if room is None:
conn.close()
raise HTTPException(status_code=404, detail="Room not found")
if room["teacher_name"] != user['username']:
conn.close()
raise HTTPException(status_code=403, detail="You do not have permission to delete this room")
cursor.execute("DELETE FROM rooms WHERE unique_id = ?", (unique_id,))
conn.commit()
conn.close()
return {"message": f"Room {unique_id} deleted successfully by {user['username']}"}
@app.post("/teacher/room_students/")
def get_room_students(unique_id: str, session: Session):
user = get_current_user(session.session_id)
if user['role'] != "teacher":
raise HTTPException(status_code=403, detail="Only teachers can view students in their room")
conn = get_db_connection()
cursor = conn.cursor()
# Check if the room exists and if it's owned by the current teacher
cursor.execute("SELECT * FROM rooms WHERE unique_id = ? AND teacher_name = ?", (unique_id, user['username']))
room = cursor.fetchone()
if room is None:
conn.close()
raise HTTPException(status_code=404, detail="Room not found or not owned by this teacher")
# Fetch the students who are currently joined to the room
cursor.execute("""
SELECT s.username
FROM students s
JOIN student_joins sj ON s.username = sj.student_username
WHERE sj.room_unique_id = ?
""", (unique_id,))
students = [row['username'] for row in cursor.fetchall()]
conn.close()
return {"unique_id": unique_id, "students": students}
@app.websocket("/ws/student/my_room")
async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)):
await websocket.accept()
# Validate session ID and get user
user = get_current_user(session_id)
if not user or user['role'] != "student":
await websocket.close(code=403)
return
# Store active connection
active_connections[user['username']] = websocket
try:
while True:
# Simulate periodic room and teacher subjects fetch
conn = get_db_connection()
cursor = conn.cursor()
# Fetch rooms the student has joined
cursor.execute(
"""
SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students, r.lesson_date, r.lesson_time,
t.first_name, t.last_name
FROM student_joins sj
JOIN rooms r ON sj.room_unique_id = r.unique_id
JOIN teachers t ON r.teacher_name = t.username
WHERE sj.student_username = ?
""",
(user['username'],)
)
rooms = cursor.fetchall()
# Fetch teacher subjects
cursor.execute("SELECT username, subjects FROM teachers")
teacher_subjects_raw = cursor.fetchall()
teacher_subjects = {row[0]: row[1] for row in teacher_subjects_raw}
rooms_list = []
for room in rooms:
room_dict = dict(room)
room_number = room_dict.get("room_number")
teacher_name = room_dict.get("teacher_name")
# Add additional room details from room_name_dict
if room_number and room_number in room_name_dict:
room_dict.update(room_name_dict[room_number])
# Add teacher subjects if available
if teacher_name and teacher_name in teacher_subjects:
room_dict["subjects"] = teacher_subjects[teacher_name]
# Fetch current student count for the room
cursor.execute(
"""
SELECT COUNT(s.username) as current_students
FROM students s
JOIN student_joins sj ON s.username = sj.student_username
WHERE sj.room_unique_id = ?
""",
(room_dict["unique_id"],)
)
current_students = cursor.fetchone()["current_students"]
room_dict["current_students"] = current_students
rooms_list.append(room_dict)
conn.close()
if rooms_list:
await websocket.send_json({"rooms": rooms_list})
else:
await websocket.send_json({"error": "Student has not joined any rooms"})
await asyncio.sleep(2) # Poll every 2 seconds
except WebSocketDisconnect:
del active_connections[user['username']]
@app.websocket("/ws/open_rooms")
async def websocket_open_rooms(websocket: WebSocket, session_id: Optional[str] = Header(None)):
await websocket.accept()
# Validate session ID and get user
user = get_current_user(session_id)
if not user or user['role'] != "student":
await websocket.close(code=403)
return
# Store active connection
active_connections[user['username']] = websocket
try:
while True:
# Simulate periodic room fetch
conn = sqlite3.connect('school.db')
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Fetch all rooms, both open and closed
cursor.execute("""
SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students,
COUNT(s.username) AS current_students,
r.is_open,
r.lesson_date, r.lesson_time,
t.first_name, t.last_name, t.subjects,
CASE
WHEN EXISTS (
SELECT 1
FROM student_joins sj
JOIN sessions ss ON sj.student_username = ss.username
WHERE sj.room_unique_id = r.unique_id AND ss.session_id = ?
) THEN 1
ELSE 0
END AS joined
FROM rooms r
LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id
LEFT JOIN students s ON sj.student_username = s.username
LEFT JOIN teachers t ON r.teacher_name = t.username
WHERE r.lesson_date = ?
GROUP BY r.unique_id
""", (session_id, CURRENT_DATE,))
all_rooms = []
for row in cursor.fetchall():
room = dict(row)
room_number = str(room['room_number'])
room_info = room_name_dict.get(room_number, {})
room.update({
'room_number': int(room_number),
'room_name': room_info.get('room_number', ''),
'info': room_info.get('info', ''),
'location': room_info.get('location', ''),
'joined': bool(room['joined']), # Whether the user joined
'max_students': room['max_students'], # Maximum capacity
'current_students': room['current_students'], # Current count
'is_open': bool(room['is_open']), # Room open status
})
all_rooms.append(room)
conn.close()
if all_rooms:
await websocket.send_json({"rooms": all_rooms})
else:
await websocket.send_json({"message": "No rooms available"})
await asyncio.sleep(1)
except HTTPException:
await websocket.send_json({"error": "Session not found"})
await websocket.close(code=404)
return
except WebSocketDisconnect:
del active_connections[user['username']]
@app.post("/student/join_room/")
def join_room(unique_id: str, session: Session):
user = get_current_user(session.session_id)
if user['role'] != "student":
raise HTTPException(status_code=403, detail="Nur Schüler können diesem Raum beitreten")
conn = get_db_connection()
cursor = conn.cursor()
# Check if the room exists
cursor.execute("SELECT * FROM rooms WHERE unique_id = ?", (unique_id,))
room = cursor.fetchone()
if room is None:
conn.close()
raise HTTPException(status_code=404, detail="Raum nicht gefunden")
# Check if the room is open
if not room["is_open"]:
conn.close()
raise HTTPException(status_code=400, detail="Raum ist geschlossen")
# Check if the student is already in a room at the same time (same lesson_date and lesson_time)
cursor.execute("""
SELECT r.unique_id
FROM student_joins sj
JOIN rooms r ON sj.room_unique_id = r.unique_id
WHERE sj.student_username = ? AND r.lesson_date = ? AND r.lesson_time = ?
""", (user['username'], room["lesson_date"], room["lesson_time"]))
conflicting_room = cursor.fetchone()
if conflicting_room:
conn.close()
raise HTTPException(status_code=400, detail="Du bist bereits einem Raum für die gleiche Stunde zugewiesen")
# Check the current number of students in the room
cursor.execute("""
SELECT COUNT(sj.student_username) as current_students
FROM student_joins sj
WHERE sj.room_unique_id = ?
""", (unique_id,))
current_students = cursor.fetchone()["current_students"]
if current_students >= room["max_students"]:
conn.close()
raise HTTPException(status_code=400, detail="Raum ist voll")
# Add the student to the room by inserting into student_joins table
cursor.execute("""
INSERT INTO student_joins (student_username, room_unique_id)
VALUES (?, ?)
""", (user['username'], unique_id))
conn.commit()
conn.close()
is_open_update(unique_id)
return {"message": f"Joined room {unique_id}"}
@app.post("/student/leave_room/")
def leave_room(unique_id: str, session: Session):
user = get_current_user(session.session_id)
if user['role'] != "student":
raise HTTPException(status_code=403, detail="Only students can leave rooms")
conn = get_db_connection()
cursor = conn.cursor()
# Check if the student is in the specified room by checking the student_joins table
cursor.execute("SELECT * FROM student_joins WHERE student_username = ? AND room_unique_id = ?",
(user['username'], unique_id))
student_join = cursor.fetchone()
if not student_join:
conn.close()
raise HTTPException(status_code=400, detail="You are not in the specified room")
# Remove the student from the room by deleting the entry from student_joins
cursor.execute("DELETE FROM student_joins WHERE student_username = ? AND room_unique_id = ?",
(user['username'], unique_id))
conn.commit()
conn.close()
is_open_update(unique_id)
return {"message": f"You have left room {unique_id}"}
@app.post("/check_session")
def check_session(session: Session):
conn = get_db_connection()
cursor = conn.cursor()
# Check if the session ID exists in the sessions table
cursor.execute("SELECT * FROM sessions WHERE session_id = ?", (session.session_id,))
session_data = cursor.fetchone()
conn.close()
if session_data:
return {"available": True}
else:
return {"available": False}
@app.post("/check_role")
def check_role(session: Session):
session_data = current_role(session.session_id)
if session_data:
return {"role": session_data}
else:
return {"role": None}
@app.websocket("/ws/teacher/open_rooms")
async def websocket_list_open_rooms_teacher(websocket: WebSocket, session_id: Optional[str] = Header(None, alias="session-id")):
await websocket.accept()
try:
# Get the current user and their role based on session_id
info = get_current_user(session_id)
# Convert info to a dictionary if it's a sqlite3.Row
if isinstance(info, sqlite3.Row):
info = dict(info) # Convert sqlite3.Row to dict
role = current_role(session_id)
teacher_username = info.get("username", "") # Access username from the info dictionary
except Exception as e:
print(f"Session validation error: {e}")
try:
await websocket.send_json({"error": "Session not found"})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
await websocket.close(code=4004)
return
role_final = ''.join([elt for elt in role])
# Check if the role is 'teacher'
if role_final != "teacher":
try:
await websocket.send_json({"message": "You are not a Teacher"})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
await websocket.close(code=1008)
return
try:
# Fetch and send room data for the teacher
await send_room_data(websocket, teacher_username)
while True:
await asyncio.sleep(2)
await send_room_data(websocket, teacher_username)
except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f"Unexpected error: {e}")
try:
await websocket.send_json({"error": "An internal server error occurred."})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
await websocket.close(code=1011)
async def send_room_data(websocket: WebSocket, teacher_username: str):
try:
conn = get_db_connection()
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Update the query to filter rooms by the teacher's username
cursor.execute("""
SELECT r.unique_id, r.room_number, r.teacher_name, r.max_students,
COUNT(s.username) AS current_students
FROM rooms r
LEFT JOIN student_joins sj ON r.unique_id = sj.room_unique_id
LEFT JOIN students s ON sj.student_username = s.username
WHERE r.is_open = 1 AND r.teacher_name = ?
GROUP BY r.unique_id
ORDER BY r.room_number
""", (teacher_username,))
open_rooms = []
for row in cursor.fetchall():
room = dict(row) # Convert the row to a dictionary
room_number = str(room['room_number'])
room_info = room_name_dict.get(room_number, {})
room.update({
'room_number': int(room_number),
'room_name': room_info.get('room_number', ''),
'info': room_info.get('info', ''),
'location': room_info.get('location', ''),
})
open_rooms.append(room)
conn.close()
if open_rooms:
try:
await websocket.send_json({"open_rooms": open_rooms})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
else:
try:
await websocket.send_json({"message": "No open rooms available"})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
except Exception as e:
print(f"Error fetching or sending room data: {e}")
try:
await websocket.send_json({"error": "Failed to fetch room data"})
except WebSocketDisconnect:
pass # Ignore if the client has disconnected
@app.get("/teacher/all_room_information")
def all_room_information():
return ROOM_NAMES
@app.get("/test")
def test():
conn = get_db_connection()
cursor = conn.cursor()
# Check if the session ID exists in the sessions table
cursor.execute("SELECT * FROM students WHERE username = ?", ("hi",))
session_data = cursor.fetchone()
print(session_data)
conn.close()
if session_data:
print(session_data) # Liste der Fächer ausgeben
return {"subjects": session_data} # Liste der Fächer zurückgeben
else:
return {"error": "Benutzer nicht gefunden oder keine Fächer vorhanden"}
@app.websocket("/ws/student/test")
async def websocket_get_my_room(websocket: WebSocket, session_id: Optional[str] = Header(None)):
await websocket.accept()
# Validate session ID
user = get_current_user(session_id)
for i in user:
print(i)
if not user:
await websocket.close(code=403)
return
# Store connection
active_connections[user['username']] = websocket
counter = 0 # Initialize counter to 0
try:
while True:
# Send incrementing counter value every second
counter += 1
await websocket.send_text(f"Counter: {counter}")
await asyncio.sleep(1) # Wait 1 second before sending the next message
except WebSocketDisconnect:
del active_connections[user['username']]
print(f"Connection closed for user: {user['username']}")
def generate_pdf(file_path):
class PDF(FPDF):
def header(self):
self.set_font("Arial", "B", 16)
today = "01.01.2025"
self.cell(0, 10, f"Attendance Sheet - {today}", ln=True, align="C")
self.ln(10)
pdf = PDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
# Column headers
pdf.cell(80, 10, "Namen", border=1, align="C")
pdf.cell(50, 10, "Anwesend", border=1, align="C")
pdf.cell(50, 10, "Abwesend", border=1, align="C")
pdf.ln()
# Names list
names = [
"Jasper Grevsmühl",
"Papa Grevsmühl",
"Clara Müller",
"Anna Schmidt",
"Max Mustermann",
]
for name in names:
pdf.cell(80, 10, name, border=1)
pdf.cell(50, 10, "", border=1)
pdf.cell(50, 10, "", border=1)
pdf.ln()
# Save the PDF to the provided file path
pdf.output(file_path)
# FastAPI route for downloading the PDF
@app.get("/download/{filename}")
def download(filename: str):
# Prepare file path
temp_dir = tempfile.mkdtemp()
pdf_file_path = os.path.join(temp_dir, filename)
# Generate the PDF file
generate_pdf(pdf_file_path)
# Return the generated file as a response
return FileResponse(pdf_file_path)