asjad321 committed on
Commit 4df2ab3 · verified · 1 Parent(s): 9dc0e15

Upload 4 files

Files changed (4)
  1. app.py +21 -0
  2. parameters.py +2 -0
  3. requirements.txt +0 -0
  4. utils.py +214 -0
app.py ADDED
@@ -0,0 +1,21 @@
+ import utils
+ import gradio as gr
+ import parameters
+
+ with gr.Blocks() as demo:
+     # gr.Tab(label="Error Metrics Dashboard")
+     gr.Markdown("# Error Metrics Dashboard")
+     with gr.Row():
+         project = gr.Textbox(label="Project Name", value=parameters.project)
+         start_date = gr.Textbox(label="Start Date (YYYY-MM-DD)", value="2025-02-20")
+         end_date = gr.Textbox(label="End Date (YYYY-MM-DD)", value="2025-02-21")
+     submit_btn = gr.Button("Process")
+     output = gr.Textbox(label="Results", lines=10)
+
+     submit_btn.click(
+         fn=utils.process_dates,
+         inputs=[start_date, end_date, project],
+         outputs=output
+     )
+
+ demo.launch(share=True)
parameters.py ADDED
@@ -0,0 +1,2 @@
+ api_key = '9tcxDNK6Wtfmf5C5ogWpqK7tr'
+ project = 'vf-ai-prod'
requirements.txt ADDED
Binary file (3.37 kB).
 
utils.py ADDED
@@ -0,0 +1,214 @@
+ import datetime
+ import json
+ import os
+ from opik import Opik
+ import parameters
+ from collections import defaultdict
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+
+ class DateTimeEncoder(json.JSONEncoder):
+     """JSON encoder that serializes datetime objects as ISO-8601 strings."""
+     def default(self, obj):
+         if isinstance(obj, datetime.datetime):
+             return obj.isoformat()
+         return super().default(obj)
+
+ def get_trace_content(opik, trace_id):
+     """Fetch the full content of a single trace; returns None on failure."""
+     try:
+         trace_content = opik.get_trace_content(trace_id)
+         return trace_content.dict()
+     except Exception as e:
+         print(f"Error getting trace content {trace_id}: {e}")
+         return None
+
+ def get_span_content(opik, trace_id, span):
+     """Fetch the full content of a single span; returns None on failure."""
+     try:
+         content = opik.get_span_content(span.id)
+         return {"trace_id": trace_id, "span_id": span.id, "content": content.dict()}
+     except Exception as e:
+         print(f"Error getting span content {span.id}: {e}")
+         return None
+
+ def get_traces_on_date(start_date_str, end_date_str, project_name, api_key, max_workers=10):
+     """Fetch all traces and spans in the given date range and save them to JSON files."""
+     try:
+         print("Step 1: Converting date strings")
+         date = datetime.date.fromisoformat(start_date_str)
+         start_date_str = date.isoformat() + "T00:00:00Z"
+
+         # Default to a one-day window when no end date is given
+         if not end_date_str:
+             end_date = date + datetime.timedelta(days=1)
+             end_date_str = end_date.isoformat() + "T00:00:00Z"
+         else:
+             end_date = datetime.date.fromisoformat(end_date_str)
+             end_date_str = end_date.isoformat() + "T00:00:00Z"
+
+         print(f"Start: {start_date_str} and end: {end_date_str}")
+         filter_string = f'start_time >= "{start_date_str}" and end_time <= "{end_date_str}"'
+         print("Filter string: ", filter_string)
+
+         print("Step 2: Initializing Opik client")
+         try:
+             opik = Opik(api_key=api_key, project_name=project_name, workspace='verba-tech-ninja')
+             print("Opik client initialized successfully")
+         except Exception as e:
+             print(f"Error initializing Opik client: {e}")
+             return [], []
+
+         print("Step 3: Searching traces")
+         try:
+             traces = opik.search_traces(filter_string=filter_string, project_name=project_name)
+             print("Total traces found: ", len(traces))
+         except Exception as e:
+             print(f"Error searching traces: {e}")
+             return [], []
+
+         print("Step 4: Processing traces in parallel")
+         all_traces_content = []
+         try:
+             with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                 future_to_trace = {executor.submit(get_trace_content, opik, trace.id): trace for trace in traces}
+                 for future in as_completed(future_to_trace):
+                     result = future.result()
+                     if result:
+                         all_traces_content.append(result)
+             print(f"Completed processing {len(all_traces_content)} traces")
+         except Exception as e:
+             print(f"Error processing traces in parallel: {e}")
+
+         print("Step 5: Processing spans in parallel")
+         all_spans_content = []
+         try:
+             with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                 future_to_span = {}
+                 for i, trace in enumerate(traces):
+                     try:
+                         print(f"Searching spans for trace_id: {trace.id}: {i+1}/{len(traces)}")
+                         spans = opik.search_spans(project_name=project_name, trace_id=trace.id)
+                         print(f"Found {len(spans)} spans for trace_id: {trace.id}")
+                         for span in spans:
+                             future_to_span[executor.submit(get_span_content, opik, trace.id, span)] = span
+                     except Exception as e:
+                         print(f"Error searching spans for trace {trace.id}: {e}")
+
+                 for future in as_completed(future_to_span):
+                     result = future.result()
+                     if result:
+                         all_spans_content.append(result)
+             print(f"Completed processing {len(all_spans_content)} spans")
+         except Exception as e:
+             print(f"Error processing spans in parallel: {e}")
+
+         print("Step 6: Saving to JSON files")
+         traces_file = 'all_traces_content.json'
+         spans_file = 'all_spans_content.json'
+         try:
+             if os.path.exists(traces_file):
+                 os.remove(traces_file)
+                 print(f"Removed existing {traces_file}")
+             if os.path.exists(spans_file):
+                 os.remove(spans_file)
+                 print(f"Removed existing {spans_file}")
+
+             print(f"Writing {len(all_traces_content)} traces to {traces_file}")
+             with open(traces_file, 'w') as f:
+                 json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
+             print(f"Saved traces to {traces_file}")
+
+             print(f"Writing {len(all_spans_content)} spans to {spans_file}")
+             with open(spans_file, 'w') as f:
+                 json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
+             print(f"Saved spans to {spans_file}")
+         except Exception as e:
+             print(f"Error saving to JSON files: {e}")
+             with open('partial_traces_content.json', 'w') as f:
+                 json.dump(all_traces_content, f, indent=2, cls=DateTimeEncoder)
+             with open('partial_spans_content.json', 'w') as f:
+                 json.dump(all_spans_content, f, indent=2, cls=DateTimeEncoder)
+             print("Saved partial data to partial_traces_content.json and partial_spans_content.json")
+
+         print("Step 7: Returning results")
+         return all_traces_content, all_spans_content
+
+     except Exception as e:
+         print(f"Main function error: {e}")
+         return [], []
+
+ def find_errors_and_metrics(traces, spans):
+     """Flag spans whose output is empty or null and aggregate error counts per trace."""
+     try:
+         print("Step 8: Analyzing outputs for errors")
+         error_spans = []
+         error_metrics = defaultdict(list)
+
+         for span in spans:
+             content = span['content']
+             output = content.get("output")
+             # Unwrap nested {"output": ...} payloads when present
+             if isinstance(output, dict) and 'output' in output:
+                 output_value = output.get("output")
+             else:
+                 output_value = output
+             if output_value is None or (isinstance(output_value, list) and len(output_value) == 0):
+                 error_spans.append({
+                     "trace_id": span["trace_id"],
+                     "span_id": span["span_id"],
+                     "error_content": output
+                 })
+                 error_metrics["empty_output"].append(span["trace_id"])
+
+         print(f"Found {len(error_spans)} outputs with errors (empty/null)")
+
+         print("Step 9: Saving error spans")
+         error_file = 'error_spans.json'
+         try:
+             if os.path.exists(error_file):
+                 os.remove(error_file)
+                 print(f"Removed existing {error_file}")
+             print(f"Writing {len(error_spans)} error outputs to {error_file}")
+             with open(error_file, 'w') as f:
+                 json.dump(error_spans, f, indent=2, cls=DateTimeEncoder)
+             print(f"Saved error outputs to {error_file}")
+         except Exception as e:
+             print(f"Error saving error spans: {e}")
+
+         print("Step 10: Calculating metrics")
+         metrics = {
+             "total_errors": len(error_spans),
+             "unique_error_types": {
+                 error_type: {
+                     "count": len(set(trace_ids)),
+                     "trace_ids": list(set(trace_ids))
+                 }
+                 for error_type, trace_ids in error_metrics.items()
+             }
+         }
+         print(f"Metrics calculated: {len(metrics['unique_error_types'])} unique error types")
+         return metrics
+
+     except Exception as e:
+         print(f"Error in find_errors_and_metrics: {e}")
+         return {}
+
+ def process_dates(start_date, end_date, project):
+     """Gradio callback: fetch traces/spans for the date range and format an error summary."""
+     try:
+         print("Pipeline Start: Processing dates")
+         traces, spans = get_traces_on_date(start_date, end_date, project, parameters.api_key)
+         metrics = find_errors_and_metrics(traces, spans)
+
+         output = f"Total Empty/Null Outputs Found: {metrics.get('total_errors', 0)}\n\n"
+         output += f"Total Traces Found: {len(traces)}\n"
+         output += f"Total Spans Processed: {len(spans)}\n\n"
+         output += "Error Metrics:\n"
+         if metrics.get('unique_error_types', {}):
+             for error_type, data in metrics.get('unique_error_types', {}).items():
+                 output += f"{error_type.replace('_', ' ').title()}: {data['count']} unique occurrences\n"
+                 output += f"Trace IDs: {', '.join(data['trace_ids'][:5])}{'...' if len(data['trace_ids']) > 5 else ''}\n\n"
+         else:
+             output += "No empty or null outputs detected.\n"
+
+         print("Pipeline End: Results formatted")
+         return output
+     except Exception as e:
+         print(f"Error processing dates: {e}")
+         return f"Error processing dates: {e}"