Spaces:
Sleeping
Sleeping
import uuid | |
import json | |
import gradio as gr | |
from datetime import datetime | |
import urllib.parse | |
import pytz | |
class SubmissionService:
    """Persist and look up student assignment submissions as JSON in GCS.

    Object layout inside ``self.bucket_name``:

    * ``submissions/<submission_id>.json`` - one full record per submission.
    * ``assignments/<assignment_id>.json`` - assignment document; this class
      appends to its ``submission_ids`` list.
    * ``users/<url-quoted student_id>/submissions.json`` - per-student index
      mapping submission id -> short summary.

    ``gcs_service`` is an injected storage wrapper. The methods used here are
    ``check_file_exists``, ``upload_json_string``, ``download_as_string`` and
    ``list_files``.
    """

    def __init__(self, gcs_service):
        self.gcs_service = gcs_service
        self.bucket_name = "ai_assignment_submission"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _user_submissions_path(student_id):
        """Return the object path of the per-student submission index."""
        # safe='' also quotes "/" so a student id can never add path segments.
        encoded_student_id = urllib.parse.quote(student_id, safe='')
        return f"users/{encoded_student_id}/submissions.json"

    def _read_json(self, file_name, default):
        """Download and parse a JSON object; return ``default`` on any failure."""
        try:
            raw = self.gcs_service.download_as_string(self.bucket_name, file_name)
            return json.loads(raw)
        except Exception as e:  # best-effort read: a missing file is expected
            print(f"Error: {e}")
            return default

    def _write_json(self, file_name, data):
        """Serialize ``data`` to JSON and upload it to ``file_name``."""
        self.gcs_service.upload_json_string(
            self.bucket_name, file_name, json.dumps(data)
        )

    # ------------------------------------------------------------------
    # Creating submissions
    # ------------------------------------------------------------------
    def create_submission_id(self):
        """Return a fresh UUID4 string that is not yet used under ``submissions/``."""
        while True:
            submission_id = str(uuid.uuid4())
            file_name = f"submissions/{submission_id}.json"
            if not self.gcs_service.check_file_exists(self.bucket_name, file_name):
                return submission_id

    def submit_assignment(self, assignment_id, student_id, student_name, student_email, submission_content, file_name=None, bucket_name=None):
        """Record a new submission and update both indexes.

        Writes the submission record, adds a summary to the student's index,
        and appends the new id to the assignment's ``submission_ids``.

        Returns:
            A confirmation message containing ``student_id``.
        """
        submission_id = self.create_submission_id()
        # Timestamps are stored as Asia/Taipei local time, second resolution.
        timestamp_now = (
            datetime.now(pytz.utc)
            .astimezone(pytz.timezone('Asia/Taipei'))
            .strftime("%Y-%m-%d %H:%M:%S")
        )
        payload = {
            "file_name": file_name,
            "bucket_name": bucket_name,
            "content": submission_content,
        }
        record = {
            "submission_id": submission_id,
            "assignment_id": assignment_id,
            "student_id": student_id,
            "student_email": student_email,
            "student_name": student_name,
            "submission_data": payload,
            "timestamp": timestamp_now,
        }
        self.save_submission_to_gcs(record)
        self.save_user_submissions_to_gcs(student_id, record)
        self.update_assignment_submission_ids(assignment_id, submission_id)
        return f"作業已收到,學生ID:{student_id}"

    def update_assignment_submission_ids(self, assignment_id, submission_id):
        """Append ``submission_id`` to the assignment's ``submission_ids`` list.

        Best-effort: if the assignment cannot be loaded nothing is written,
        and any error is printed rather than raised.
        """
        try:
            assignment_data = self.get_assignment_data(assignment_id)
            if assignment_data:
                assignment_data.setdefault("submission_ids", []).append(submission_id)
                self._write_json(f"assignments/{assignment_id}.json", assignment_data)
        except Exception as e:
            print(f"更新作業提交ID時出錯:{e}")

    def get_assignment_data(self, assignment_id):
        """Return the parsed assignment document, or ``None`` if it cannot be read."""
        return self._read_json(f"assignments/{assignment_id}.json", None)

    def save_submission_to_gcs(self, submission_data):
        """Write a full submission record to ``submissions/<submission_id>.json``."""
        self._write_json(
            f"submissions/{submission_data['submission_id']}.json", submission_data
        )

    def save_user_submissions_to_gcs(self, student_id, submission_data):
        """Add a summary of ``submission_data`` to the student's index file.

        If the assignment document is missing or lacks the expected fields,
        the name and type are stored as ``None`` instead of raising, so an
        unreadable assignment cannot abort the submission halfway through.
        """
        index_path = self._user_submissions_path(student_id)
        # NOTE(review): any read failure (not only "file not found") starts a
        # fresh index, which then overwrites the stored one - confirm this is
        # acceptable for transient download errors.
        user_submissions = self._read_json(index_path, {})

        assignment_id = submission_data['assignment_id']
        assignment_data = self.get_assignment_data(assignment_id) or {}
        metadata = assignment_data.get("metadata") or {}

        user_submissions[submission_data['submission_id']] = {
            "assignment_id": assignment_id,
            "assignment_name": metadata.get("topic"),
            "assignment_type": assignment_data.get("assignment_type"),
            "timestamp": submission_data['timestamp'],
        }
        self._write_json(index_path, user_submissions)

    # ------------------------------------------------------------------
    # Reading submissions
    # ------------------------------------------------------------------
    def get_submission_from_gcs(self, submission_id):
        """Return the parsed submission record, or ``None`` if it cannot be read."""
        return self._read_json(f"submissions/{submission_id}.json", None)

    def list_submissions(self):
        """List the ids of all stored submissions.

        Returns:
            Submission ids derived from object names under ``submissions/``;
            an empty list if the listing fails.
        """
        suffix = ".json"
        try:
            blobs = self.gcs_service.list_files(self.bucket_name, "submissions/")
            submission_ids = []
            for blob in blobs:
                base_name = blob.name.split("/")[-1]
                # Strip the extension only when it is a real suffix.
                if base_name.endswith(suffix):
                    base_name = base_name[:-len(suffix)]
                # An object named exactly "submissions/" has an empty base name.
                if base_name:
                    submission_ids.append(base_name)
            return submission_ids
        except Exception as e:
            print(f"Error: {e}")
            return []

    def get_student_submissions_list(self, student_id):
        """Return the student's submission index, or ``{}`` if it cannot be read."""
        try:
            index_path = self._user_submissions_path(student_id)
        except Exception as e:  # e.g. a student_id that cannot be URL-quoted
            print(f"Error: {e}")
            return {}
        return self._read_json(index_path, {})

    def update_submission_list(self, student_id):
        """Build a Gradio update whose choices are ``(assignment name, submission id)``."""
        user_submissions = self.get_student_submissions_list(student_id)
        choices = [
            # Fall back to the placeholder when the name is missing or None.
            (summary.get("assignment_name") or "未命名作业", submission_id)
            for submission_id, summary in user_submissions.items()
        ]
        return gr.update(choices=choices)

    def get_student_submissions(self, student_id):
        """Return the payload of the first stored submission by ``student_id``.

        Scans every object under ``submissions/``. Records written by
        ``submit_assignment`` keep their payload under ``"submission_data"``;
        a ``"submission"`` key is honoured first if a record has one.

        Returns:
            The payload, or a "not found" message string when no record matches.
        """
        for submission_id in self.list_submissions():
            record = self.get_submission_from_gcs(submission_id)
            if not record or record.get("student_id") != student_id:
                continue
            for key in ("submission", "submission_data"):
                if key in record:
                    return record[key]
        return "未找到學生提交的作業"