import gradio as gr
import json
import importlib
import os
import sys
from pathlib import Path
import concurrent.futures
import multiprocessing
import time
import threading
import queue
import uuid
import numpy as np
from datetime import datetime
from tqdm.auto import tqdm
from src.containerized_eval import eval_string_script

# Make sure both the repo root and its src/ directory are importable so that
# `src.containerized_eval` resolves regardless of the working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.join(current_dir, "src")
if current_dir not in sys.path:
    sys.path.append(current_dir)
if src_dir not in sys.path:
    sys.path.append(src_dir)

# Queue of pending evaluation jobs: (task_id, input_data, request_time) tuples.
task_queue = queue.Queue()

# task_id -> status record (queued/processing/completed/error plus timing info).
task_status = {}

# Rolling log of finished tasks, used to calibrate wait-time estimates.
task_history = []

# Guards task_status, task_history and task_type_times across worker threads.
lock = threading.Lock()

# Number of background worker threads processing the queue.
worker_threads = max(1, multiprocessing.cpu_count() // 2)

# Set to False to let the worker loops drain and exit.
running = True

# "<language>_<complexity>" -> recent per-item processing times (seconds).
task_type_times = {}
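
# Processing model: launch_workers() starts `worker_threads` daemon threads,
# each running queue_processor(). A worker pulls one (task_id, input_data,
# request_time) tuple at a time, marks the task as 'processing' in task_status,
# runs evaluate() on it, and records the outcome plus timing so later requests
# can be given a wait estimate. All shared state is mutated under `lock`.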

def queue_processor():
    """Process tasks in the queue"""
    while running:
        try:
            task_id, input_data, request_time = task_queue.get(timeout=3)
            with lock:
                task_status[task_id]['status'] = 'processing'
                task_status[task_id]['start_time'] = time.time()

            if isinstance(input_data, list) and len(input_data) > 0:
                sample_task = input_data[0]
                language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
                task_size = len(input_data)
                task_complexity = _estimate_task_complexity(input_data)

                with lock:
                    task_status[task_id]['estimated_factors'] = {
                        'language': language,
                        'size': task_size,
                        'complexity': task_complexity
                    }

            result = evaluate(input_data)

            end_time = time.time()
            process_time = end_time - task_status[task_id]['start_time']

            with lock:
                task_status[task_id]['status'] = 'completed'
                task_status[task_id]['result'] = result
                task_status[task_id]['end_time'] = end_time
                task_status[task_id]['process_time'] = process_time

                if 'estimated_factors' in task_status[task_id]:
                    factors = task_status[task_id]['estimated_factors']
                    key = f"{factors['language']}_{factors['complexity']}"

                    # Keep only the last 10 per-item times for this task type so
                    # the wait estimate adapts to recent load.
                    if key not in task_type_times:
                        task_type_times[key] = []

                    task_type_times[key].append(process_time / factors['size'])
                    if len(task_type_times[key]) > 10:
                        task_type_times[key] = task_type_times[key][-10:]

                task_history.append({
                    'task_id': task_id,
                    'request_time': request_time,
                    'process_time': process_time,
                    # Carry the original estimate so get_queue_status() can
                    # compute an estimate/actual correction factor.
                    'estimated_time': task_status[task_id].get('estimated_time', 0),
                    'status': 'completed',
                    'factors': task_status[task_id].get('estimated_factors', {})
                })
                while len(task_history) > 200:
                    task_history.pop(0)

            task_queue.task_done()

        except queue.Empty:
            continue
        except Exception as e:
            if 'task_id' in locals():
                with lock:
                    task_status[task_id]['status'] = 'error'
                    task_status[task_id]['error'] = str(e)
                    task_status[task_id]['end_time'] = time.time()
                task_queue.task_done()
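
# The complexity heuristic below is a rough bucketing by average code length
# (prompt + tests + completions): under 1000 characters counts as 'simple',
# under 5000 as 'medium', anything larger as 'complex'. It only feeds the
# wait-time estimate and has no effect on how code is actually evaluated.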

def _estimate_task_complexity(tasks):
    """Estimate task complexity

    Returns: 'simple', 'medium', or 'complex'
    """
    total_code_length = 0
    count = 0

    for task in tasks:
        if isinstance(task, dict):
            prompt = task.get('prompt', '')
            tests = task.get('tests', '')
            completions = task.get('processed_completions', [])

            code_length = len(prompt) + len(tests)
            if completions:
                code_length += sum(len(comp) for comp in completions)

            total_code_length += code_length
            count += 1

    if count == 0:
        return 'medium'

    avg_length = total_code_length / count

    if avg_length < 1000:
        return 'simple'
    elif avg_length < 5000:
        return 'medium'
    else:
        return 'complex'
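
# Illustrative input for evaluate(); the field names below are the ones read by
# evaluate_single_case(), the concrete values are made up:
#
#     [
#         {"language": "python", "code": "print('hello')", "input_data": "demo-001"},
#         {"language": "cpp", "code": "int main() { return 0; }", "input_data": "demo-002"},
#     ]
#
# Each item is evaluated independently and comes back as a dict with 'name',
# 'language', 'code', 'status' and 'stderr'.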

def evaluate(input_data):
    """Main function for code evaluation"""
    try:
        if not isinstance(input_data, list):
            return {"status": "Exception", "error": "Input must be a list"}

        results = []

        # Cap the thread pool so a single large batch cannot saturate the machine.
        max_workers = max(1, min(multiprocessing.cpu_count() // 2, 4))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_item = {executor.submit(evaluate_single_case, item): item for item in input_data}
            for future in concurrent.futures.as_completed(future_to_item):
                item = future_to_item[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    item.update({"status": "Exception", "error": str(e)})
                    results.append(item)
        return results

    except Exception as e:
        return {"status": "Exception", "error": str(e)}

def evaluate_single_case(input_data):
    """Evaluate a single code case"""
    try:
        if not isinstance(input_data, dict):
            return {"status": "Exception", "error": "Input item must be a dictionary"}

        language = input_data.get('language')
        code = input_data.get('code', "")

        if code == "":
            return {"status": "Exception", "error": "No code provided"}

        # Retry transient failures a couple of times before reporting the result.
        max_retries = 2

        for attempt in range(max_retries):
            result = evaluate_code(code, language)

            if result["status"] == "OK":
                break

            time.sleep(0.3)

        status = result["status"]
        stderr = result.get("stderr", result.get("error", ""))

        detail_data = {
            'name': input_data['input_data'],
            'language': input_data['language'],
            'code': code,
            'status': status,
            'stderr': stderr,
        }
        return detail_data

    except Exception as e:
        return {"status": "Exception", "error": str(e)}

def evaluate_code(code, language):
    """Evaluate code in a specific language"""
    try:
        result = eval_string_script(language, code)
        return result

    except Exception as e:
        return {"status": "Exception", "error": str(e)}
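
# synchronous_evaluate() is the blocking entry point used by the hidden
# "evaluate" API registered below: it enqueues the request like any other task,
# then polls task_status until a worker marks it completed or errored, so API
# callers share the same queue (and fairness) as everything else.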

def synchronous_evaluate(input_data):
    """Synchronously evaluate code, compatible with original interface"""
    if isinstance(input_data, list) and len(input_data) > 0:
        sample_task = input_data[0]
        language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
        task_size = len(input_data)
        task_complexity = _estimate_task_complexity(input_data)
    else:
        language = 'unknown'
        task_size = 1
        task_complexity = 'medium'

    estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
    estimated_total_time = estimated_time_per_task * task_size

    task_id = str(uuid.uuid4())
    request_time = time.time()

    with lock:
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': task_queue.qsize() + 1,
            'synchronous': True,
            'estimated_factors': {
                'language': language,
                'size': task_size,
                'complexity': task_complexity
            },
            'estimated_time': estimated_total_time
        }

    task_queue.put((task_id, input_data, request_time))

    # Block until a worker thread finishes this task, then hand back its result.
    while True:
        with lock:
            if task_id in task_status:
                status = task_status[task_id]['status']
                if status == 'completed':
                    result = task_status[task_id]['result']
                    task_status.pop(task_id, None)
                    return result
                elif status == 'error':
                    error = task_status[task_id].get('error', 'Unknown error')
                    task_status.pop(task_id, None)
                    return {"status": "Exception", "error": error}

        time.sleep(0.1)

def _get_estimated_time_for_task(language, complexity):
    """Get estimated processing time for a specific task type"""
    key = f"{language}_{complexity}"

    if key in task_type_times and len(task_type_times[key]) > 0:
        return np.median(task_type_times[key])

    if complexity == 'simple':
        return 1.0
    elif complexity == 'medium':
        return 3.0
    else:
        return 8.0

def enqueue_task(input_data):
    """Add task to queue"""
    if isinstance(input_data, list) and len(input_data) > 0:
        sample_task = input_data[0]
        language = sample_task.get('language', 'unknown') if isinstance(sample_task, dict) else 'unknown'
        task_size = len(input_data)
        task_complexity = _estimate_task_complexity(input_data)
    else:
        language = 'unknown'
        task_size = 1
        task_complexity = 'medium'

    estimated_time_per_task = _get_estimated_time_for_task(language, task_complexity)
    estimated_total_time = estimated_time_per_task * task_size

    task_id = str(uuid.uuid4())
    request_time = time.time()

    with lock:
        task_status[task_id] = {
            'status': 'queued',
            'queued_time': request_time,
            'queue_position': task_queue.qsize() + 1,
            'estimated_factors': {
                'language': language,
                'size': task_size,
                'complexity': task_complexity
            },
            'estimated_time': estimated_total_time
        }

    queue_info = get_queue_status()
    est_wait = queue_info['estimated_wait']

    task_queue.put((task_id, input_data, request_time))

    return {
        'task_id': task_id,
        'status': 'queued',
        'queue_position': task_status[task_id]['queue_position'],
        'estimated_wait': est_wait,
        'estimated_processing': estimated_total_time
    }

def check_status(task_id):
    """Check task status"""
    with lock:
        if task_id not in task_status:
            return {'status': 'not_found'}

        status_info = task_status[task_id].copy()

        # Expire finished tasks an hour after completion to keep the dict small.
        if status_info['status'] in ['completed', 'error'] and time.time() - status_info.get('end_time', 0) > 3600:
            task_status.pop(task_id, None)

    return status_info
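
# Hedged usage sketch for the asynchronous path (these helpers are defined
# above but are not wired into the UI below; the polling loop is purely
# illustrative):
#
#     ticket = enqueue_task([{"language": "python", "code": "print(1)", "input_data": "demo"}])
#     while check_status(ticket["task_id"])["status"] in ("queued", "processing"):
#         time.sleep(0.5)
#     final = check_status(ticket["task_id"])   # 'completed', 'error', or 'not_found'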

def get_queue_status():
    """Get queue status"""
    with lock:
        queued_tasks = [t for t in task_status.values() if t['status'] == 'queued']
        processing_tasks = [t for t in task_status.values() if t['status'] == 'processing']

        queue_size = task_queue.qsize()
        active_tasks = len(processing_tasks)
        waiting_tasks = len(queued_tasks)

        # Time still needed by tasks that are currently being processed.
        remaining_processing_time = 0
        for task in processing_tasks:
            if 'start_time' in task and 'estimated_time' in task:
                elapsed = time.time() - task['start_time']
                remaining = max(0, task['estimated_time'] - elapsed)
                remaining_processing_time += remaining
            else:
                remaining_processing_time += 2

        if active_tasks > 0:
            remaining_processing_time = remaining_processing_time / min(active_tasks, worker_threads)

        # Time needed by tasks still waiting in the queue.
        queued_processing_time = 0
        for task in queued_tasks:
            if 'estimated_time' in task:
                queued_processing_time += task['estimated_time']
            else:
                queued_processing_time += 5

        if worker_threads > 0 and queued_processing_time > 0:
            queued_processing_time = queued_processing_time / worker_threads

        estimated_wait = remaining_processing_time + queued_processing_time

        # Calibrate the estimate with the median actual/estimated ratio of
        # recently finished tasks, clamped to [0.5, 2.0].
        if task_history:
            prediction_ratios = []
            for task in task_history:
                if 'factors' in task and task.get('estimated_time', 0) > 0:
                    prediction_ratios.append(task['process_time'] / task['estimated_time'])

            if prediction_ratios:
                correction_factor = np.median(prediction_ratios)
                correction_factor = max(0.5, min(2.0, correction_factor))
                estimated_wait *= correction_factor

        estimated_wait = max(0.1, estimated_wait)
        if waiting_tasks == 0 and active_tasks == 0:
            estimated_wait = 0

        recent_tasks = task_history[-5:] if task_history else []

        return {
            'queue_size': queue_size,
            'active_tasks': active_tasks,
            'waiting_tasks': waiting_tasks,
            'worker_threads': worker_threads,
            'estimated_wait': estimated_wait,
            'recent_tasks': recent_tasks
        }

def format_time(seconds):
    """Format time into readable format"""
    if seconds < 60:
        return f"{seconds:.1f} seconds"
    elif seconds < 3600:
        minutes = int(seconds / 60)
        seconds = seconds % 60
        return f"{minutes}m {seconds:.1f}s"
    else:
        hours = int(seconds / 3600)
        minutes = int((seconds % 3600) / 60)
        return f"{hours}h {minutes}m"
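
# A few worked examples of the formatting above:
#     format_time(12)    -> "12.0 seconds"
#     format_time(95)    -> "1m 35.0s"
#     format_time(3900)  -> "1h 5m"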

def ui_get_queue_info():
    """Get queue info for UI"""
    queue_info = get_queue_status()

    tasks_html = ""
    for task in reversed(queue_info['recent_tasks']):
        tasks_html += f"""
        <tr>
            <td>{task['task_id'][:8]}...</td>
            <td>{datetime.fromtimestamp(task['request_time']).strftime('%H:%M:%S')}</td>
            <td>{format_time(task['process_time'])}</td>
        </tr>
        """

    if not tasks_html:
        tasks_html = """
        <tr>
            <td colspan="3" style="text-align: center; padding: 20px;">No historical tasks</td>
        </tr>
        """

    return f"""
    <div class="dashboard">
        <div class="queue-info-card main-card">
            <h3 class="card-title">Queue Status Monitor</h3>
            <div class="queue-stats">
                <div class="stat-item">
                    <div class="stat-value">{queue_info['waiting_tasks']}</div>
                    <div class="stat-label">Waiting</div>
                </div>
                <div class="stat-item">
                    <div class="stat-value">{queue_info['active_tasks']}</div>
                    <div class="stat-label">Processing</div>
                </div>
                <div class="stat-item">
                    <div class="stat-value">{queue_info['worker_threads']}</div>
                    <div class="stat-label">Worker Threads</div>
                </div>
            </div>

            <div class="wait-time">
                <p><b>Current Estimated Wait Time:</b> {format_time(queue_info['estimated_wait'])}</p>
                <p class="last-update"><small>Last update: {datetime.now().strftime('%H:%M:%S')}</small></p>
            </div>
        </div>

        <div class="queue-info-card history-card">
            <h3 class="card-title">Recently Processed Tasks</h3>
            <table class="recent-tasks">
                <thead>
                    <tr>
                        <th>Task ID</th>
                        <th>Request Time</th>
                        <th>Processing Time</th>
                    </tr>
                </thead>
                <tbody>
                    {tasks_html}
                </tbody>
            </table>
        </div>
    </div>
    """

def launch_workers():
    """Launch worker threads"""
    global running
    running = True

    for _ in range(worker_threads):
        worker = threading.Thread(target=queue_processor)
        worker.daemon = True
        worker.start()
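
# The workers are daemon threads, so they never block interpreter shutdown;
# setting the module-level `running` flag to False (see the finally block at
# the bottom of this file) lets their loops exit after the next 3-second
# queue.get timeout.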

custom_css = """
.container {
    max-width: 1200px;
    margin: 0 auto;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}

.dashboard {
    display: flex;
    flex-direction: column;
    gap: 20px;
}

.card-title {
    color: #333;
    border-bottom: 2px solid #ddd;
    padding-bottom: 10px;
    margin-top: 0;
}

.status-card, .queue-info-card {
    background: #fff;
    border-radius: 12px;
    padding: 20px;
    margin: 10px 0;
    box-shadow: 0 4px 15px rgba(0,0,0,0.08);
}

.main-card {
    border-top: 5px solid #4285f4;
}

.history-card {
    border-top: 5px solid #34a853;
}

.status-card.success {
    background: #e7f5e7;
    border-left: 5px solid #28a745;
}

.status-card.error {
    background: #f8d7da;
    border-left: 5px solid #dc3545;
}

.error-message {
    color: #dc3545;
    font-weight: bold;
    padding: 10px;
    background: #f8d7da;
    border-radius: 5px;
}

.notice {
    color: #0c5460;
    background-color: #d1ecf1;
    padding: 10px;
    border-radius: 5px;
}

.queue-stats {
    display: flex;
    justify-content: space-around;
    margin: 20px 0;
}

.stat-item {
    text-align: center;
    padding: 15px;
    background: #f8f9fa;
    border-radius: 10px;
    min-width: 120px;
    transition: transform 0.3s ease;
}

.stat-item:hover {
    transform: translateY(-5px);
    box-shadow: 0 5px 15px rgba(0,0,0,0.1);
}

.stat-value {
    font-size: 32px;
    font-weight: bold;
    color: #4285f4;
    margin-bottom: 5px;
}

.stat-label {
    color: #5f6368;
    font-size: 16px;
}

.wait-time {
    text-align: center;
    margin: 20px 0;
    padding: 15px;
    background: #f1f3f4;
    border-radius: 8px;
    font-size: 18px;
}

.last-update {
    color: #80868b;
    margin-top: 10px;
    margin-bottom: 0;
}

.recent-tasks {
    width: 100%;
    border-collapse: collapse;
    margin-top: 15px;
    background: white;
    box-shadow: 0 1px 3px rgba(0,0,0,0.05);
}

.recent-tasks th, .recent-tasks td {
    border: 1px solid #e0e0e0;
    padding: 12px 15px;
    text-align: center;
}

.recent-tasks th {
    background-color: #f1f3f4;
    color: #202124;
    font-weight: 500;
}

.recent-tasks tbody tr:hover {
    background-color: #f8f9fa;
}

.tabs {
    margin-top: 20px;
}

button.primary {
    background-color: #4285f4;
    color: white;
    padding: 10px 20px;
    border: none;
    border-radius: 4px;
    cursor: pointer;
    font-size: 16px;
    font-weight: 500;
    transition: background-color 0.3s;
}

button.primary:hover {
    background-color: #3367d6;
}
"""

# Start the background workers before building the UI.
launch_workers()

with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# Code Evaluation Service")
    gr.Markdown("Code evaluation service supporting multiple programming languages; requests are processed through a task queue.")

    with gr.Row():
        with gr.Column(scale=3):
            queue_info_html = gr.HTML()
            refresh_queue_btn = gr.Button("Refresh Queue Status", variant="primary")

    # Hidden components that back the JSON "evaluate" API endpoint.
    with gr.Row(visible=False):
        api_input = gr.JSON()
        api_output = gr.JSON()

    def update_queue_info():
        return ui_get_queue_info()

    # Refresh the dashboard every 3 seconds and on demand.
    demo.load(update_queue_info, None, queue_info_html, every=3)
    refresh_queue_btn.click(update_queue_info, None, queue_info_html)

    demo.queue()
    evaluate_endpoint = demo.load(fn=synchronous_evaluate, inputs=api_input, outputs=api_output, api_name="evaluate")
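
# Hedged client-side sketch for calling the hidden "evaluate" endpoint with the
# gradio_client package (SPACE_URL is a placeholder, not defined in this repo):
#
#     from gradio_client import Client
#     client = Client("SPACE_URL")
#     results = client.predict(
#         [{"language": "python", "code": "print('ok')", "input_data": "demo"}],
#         api_name="/evaluate",
#     )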

if __name__ == "__main__":
    try:
        demo.launch()
    finally:
        # Signal worker threads to stop once the server exits.
        running = False