import os
import json
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import asyncio
import aiofiles
import aiosmtplib
from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient
import traceback
from tqdm import tqdm

async def check_and_process_uploads():
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    # Check for all new uploads whether they are already in the live or processed directory.
    # Also check whether the files are actually identical.
    unprocessed_uploads = []
    for upload in new_uploads:
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        if not os.path.exists(live_path) and not os.path.exists(processed_path):
            unprocessed_uploads.append(upload)
        elif os.path.exists(processed_path):
            # with open(upload_path, 'r') as f:
            #     new_data = json.load(f)
            # with open(processed_path, 'r') as f:
            #     processed_data = json.load(f)
            # TODO: we could use a better comparison method, e.g. exact content comparison:
            # if new_data != processed_data:
            #     unprocessed_uploads.append(upload)
            print(f"Upload {upload} is already in the processed directory.")
        elif os.path.exists(live_path):
            with open(upload_path, 'r') as f:
                new_data = json.load(f)
            with open(live_path, 'r') as f:
                live_data = json.load(f)
            # if new_data != live_data:
            #     unprocessed_uploads.append(upload)
            print(f"Upload {upload} is already in the live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    tasks = []
    for upload in tqdm(unprocessed_uploads):
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        # tasks.append(process_single_upload(upload_path, processed_path))  # for async processing
        await process_single_upload(upload_path, processed_path)
    # await asyncio.gather(*tasks)  # for async processing
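
# Note: the loop above processes uploads sequentially. The commented-out `tasks`
# list hints at a concurrent variant; a sketch of that variant (an assumption,
# not what the script currently does) would collect the coroutines and await
# them together, e.g.:
#
#     tasks = [
#         process_single_upload(os.path.join(upload_dir, u), os.path.join(processed_dir, u))
#         for u in unprocessed_uploads
#     ]
#     await asyncio.gather(*tasks)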

async def process_single_upload(upload_path, processed_path):
    # Check the structure of the upload
    check_result = await check_upload_structure(upload_path)

    if check_result['is_valid']:
        # Process the file
        await process_upload(upload_path, processed_path)

        # Move the file to the processed directory
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
    else:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")


async def check_upload_structure(file_path):
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())

        # Check for required keys
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}

        # Check for specific structure in raw_logging_results
        if not isinstance(data['raw_logging_results'], list):
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}

        for item in data['raw_logging_results']:
            if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}
    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}

async def process_upload(input_path, output_path):
    print(f"Processing {input_path}...")
    # Load the file
    with open(input_path, 'r') as f:
        data = json.loads(f.read())

    assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")

    try:
        processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=True)
        failure_report = await analyze_agent_performance(data['raw_logging_results'], data['results']['failed_tasks'], openai_client)
        data['raw_logging_results'] = processed_calls
        data['failure_report'] = failure_report
    except Exception as e:
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    with open(output_path, 'w') as f:
        json.dump(data, f, indent=4)

    print(f"Processing of {input_path} successful. Results saved to {output_path}")

async def send_email_notification(filename, check_result, status):
    sender_email = "your_email@example.com"
    receiver_email = "receiver@example.com"
    password = "your_password"

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = f"""
    File: {filename}
    Status: {status}
    Check Result: {check_result['message']}
    """
    message.attach(MIMEText(body, "plain"))

    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password
    )
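
# Note: send_email_notification is not called anywhere in this part of the
# script; a caller could await it once an upload has been handled, e.g.
# (sketch only, assuming real SMTP credentials replace the placeholders above):
#
#     await send_email_notification(upload, check_result, "processed")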