"""Watch evals_upload for new eval result files, validate their structure,
run the agent-monitor analyses on them, and write the enriched results to
evals_processed."""
import os
import json
import shutil  # only needed by the disabled move step in process_single_upload
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import asyncio
import aiofiles
import aiosmtplib
from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient
import traceback
from tqdm import tqdm

async def check_and_process_uploads():
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"
    
    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    
    if not new_uploads:
        print("No new uploads found.")
        return
    
    # Skip uploads whose filename already exists in the live or processed
    # directory. TODO: also compare file contents (e.g. load both JSON files
    # and check equality) so that changed re-uploads get reprocessed.
    unprocessed_uploads = []
    for upload in new_uploads:
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        if os.path.exists(processed_path):
            print(f"Upload {upload} is already in the processed directory.")
        elif os.path.exists(live_path):
            print(f"Upload {upload} is already in the live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    tasks = []
    for upload in tqdm(unprocessed_uploads):
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        # tasks.append(process_single_upload(upload_path, processed_path)) # for async processing
        await process_single_upload(upload_path, processed_path)


    # await asyncio.gather(*tasks) # for async processing


async def process_single_upload(upload_path, processed_path):
    # Validate the upload's structure before spending any LLM calls on it.
    check_result = await check_upload_structure(upload_path)

    if check_result['is_valid']:
        await process_upload(upload_path, processed_path)

        # The original upload could be moved out of the upload directory here:
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
    else:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")


async def check_upload_structure(file_path):
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())
        
        # Check for required keys
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}
        
        # Check for specific structure in raw_logging_results
        if not isinstance(data['raw_logging_results'], list):
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}
        
        for item in data['raw_logging_results']:
            if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}
        
        return {'is_valid': True, 'message': "File structure is valid"}

    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}

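
# For reference, a minimal upload that passes check_upload_structure has the
# shape below (field values are illustrative placeholders, not a full schema):
#
# {
#   "config": {...},
#   "results": {"failed_tasks": [...]},
#   "raw_eval_results": {...},
#   "raw_logging_results": [
#     {"weave_task_id": "...", "inputs": {...}, "outputs": {...}}
#   ]
# }
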
async def process_upload(input_path, output_path):
    print(f"Processing {input_path}...")
    # Read asynchronously so file I/O does not block the event loop.
    async with aiofiles.open(input_path, 'r') as f:
        data = json.loads(await f.read())

    assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"
    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")

    try:
        processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=True)
        failure_report = await analyze_agent_performance(data['raw_logging_results'], data['results']['failed_tasks'], openai_client)

        data['raw_logging_results'] = processed_calls
        data['failure_report'] = failure_report
    except Exception as e:
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    async with aiofiles.open(output_path, 'w') as f:
        await f.write(json.dumps(data, indent=4))

    print(f"Processing of {input_path} successful. Results saved to {output_path}")

async def send_email_notification(filename, check_result, status):
    # Not currently called by the pipeline above; invoke it from
    # process_single_upload to notify uploaders. Replace the placeholder
    # credentials before use, ideally loading them from the environment.
    sender_email = "your_email@example.com"
    receiver_email = "receiver@example.com"
    password = "your_password"

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = f"""
    File: {filename}
    Status: {status}
    Check Result: {check_result['message']}
    """

    message.attach(MIMEText(body, "plain"))

    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password
    )
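

# A minimal entry point so the script can also be run directly; this is an
# assumption, since the original may instead be driven by an external
# scheduler (e.g. cron) or imported from elsewhere.
if __name__ == "__main__":
    asyncio.run(check_and_process_uploads())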