Joash committed on
Commit
395e49f
·
1 Parent(s): 6d59d74

Update complete app implementation for Spaces

Browse files
Files changed (1) hide show
  1. app.py +387 -2
app.py CHANGED
@@ -1,6 +1,6 @@
1
  import gradio as gr
2
- from transformers import AutoTokenizer, AutoModelForCausalLM
3
  import torch
 
4
  from huggingface_hub import login
5
  import os
6
  import logging
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
19
 
20
  # Environment variables
21
  HF_TOKEN = os.getenv("HUGGING_FACE_TOKEN")
22
- MODEL_NAME = os.getenv("MODEL_NAME", "google/gemma-2-2b-it") # Fixed model name
23
 
24
  # Login to Hugging Face with git credential
25
  if HF_TOKEN:
@@ -35,3 +35,388 @@ os.makedirs(DATA_DIR, exist_ok=True)
35
 
36
  # History file
37
  HISTORY_FILE = os.path.join(DATA_DIR, "review_history.json")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import gradio as gr
 
2
  import torch
3
+ from transformers import AutoTokenizer, AutoModelForCausalLM
4
  from huggingface_hub import login
5
  import os
6
  import logging
 
19
 
20
  # Environment variables
21
  HF_TOKEN = os.getenv("HUGGING_FACE_TOKEN")
22
+ MODEL_NAME = os.getenv("MODEL_NAME", "google/gemma-2-2b-it")
23
 
24
  # Login to Hugging Face with git credential
25
  if HF_TOKEN:
 
35
 
36
  # History file
37
  HISTORY_FILE = os.path.join(DATA_DIR, "review_history.json")
38
+
39
class Review:
    """A single code-review record: the submitted code, its language, and
    the model's suggestions, plus timing metadata for metrics."""

    def __init__(self, code: str, language: str, suggestions: str):
        self.code = code
        self.language = language
        self.suggestions = suggestions
        # Creation time in ISO-8601; used to compute "reviews today".
        self.timestamp = datetime.now().isoformat()
        # Seconds spent generating; the caller fills this in after generation.
        self.response_time = 0.0

    def to_dict(self):
        """Serialize to a JSON-compatible dict (inverse of from_dict)."""
        return {
            'timestamp': self.timestamp,
            'language': self.language,
            'code': self.code,
            'suggestions': self.suggestions,
            'response_time': self.response_time
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a Review from a dict produced by to_dict.

        Tolerates entries missing 'response_time' (e.g. older history
        files) by defaulting to 0.0 instead of raising KeyError.
        """
        review = cls(data['code'], data['language'], data['suggestions'])
        review.timestamp = data['timestamp']
        review.response_time = data.get('response_time', 0.0)
        return review
62
+
63
class CodeReviewer:
    """Lazily-initialized LLM code reviewer with JSON-persisted history and metrics."""

    def __init__(self):
        # Model artifacts are loaded on first use (see ensure_initialized).
        self.model = None
        self.tokenizer = None
        self.device = None
        self.review_history: List[Review] = []
        self.metrics = {
            'total_reviews': 0,
            'avg_response_time': 0.0,
            'reviews_today': 0
        }
        self._initialized = False
        self.load_history()

    def load_history(self):
        """Load review history and metrics from HISTORY_FILE, resetting on error."""
        try:
            if os.path.exists(HISTORY_FILE):
                with open(HISTORY_FILE, 'r') as f:
                    data = json.load(f)
                self.review_history = [Review.from_dict(r) for r in data['history']]
                self.metrics = data['metrics']
                logger.info(f"Loaded {len(self.review_history)} reviews from history")
        except Exception as e:
            logger.error(f"Error loading history: {e}")
            # Initialize empty history if file doesn't exist or is corrupted
            self.review_history = []
            self.metrics = {
                'total_reviews': 0,
                'avg_response_time': 0.0,
                'reviews_today': 0
            }

    def save_history(self):
        """Persist review history and metrics to HISTORY_FILE (best effort)."""
        try:
            data = {
                'history': [r.to_dict() for r in self.review_history],
                'metrics': self.metrics
            }
            # Ensure the directory exists
            os.makedirs(os.path.dirname(HISTORY_FILE), exist_ok=True)
            with open(HISTORY_FILE, 'w') as f:
                json.dump(data, f)
            logger.info("Saved review history")
        except Exception as e:
            logger.error(f"Error saving history: {e}")

    @spaces.GPU
    def ensure_initialized(self):
        """Load the model on first use; no-op on subsequent calls."""
        if not self._initialized:
            self.initialize_model()
            self._initialized = True

    def initialize_model(self):
        """Load tokenizer and model. Returns True on success, False on failure."""
        try:
            logger.info("Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                MODEL_NAME,
                token=HF_TOKEN,
                trust_remote_code=True
            )
            # NOTE(review): forcing these special tokens may be unnecessary or
            # wrong for some checkpoints (e.g. Gemma has its own pad/bos/eos)
            # -- confirm against the tokenizer shipped with MODEL_NAME.
            special_tokens = {
                'pad_token': '[PAD]',
                'eos_token': '</s>',
                'bos_token': '<s>'
            }
            num_added = self.tokenizer.add_special_tokens(special_tokens)
            logger.info(f"Added {num_added} special tokens")
            logger.info("Tokenizer loaded successfully")

            logger.info("Loading model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                MODEL_NAME,
                device_map="auto",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                low_cpu_mem_usage=True,
                token=HF_TOKEN
            )
            # Embeddings must grow to cover any tokens added above.
            if num_added > 0:
                logger.info("Resizing model embeddings for special tokens")
                self.model.resize_token_embeddings(len(self.tokenizer))

            self.device = next(self.model.parameters()).device
            logger.info(f"Model loaded successfully on {self.device}")
            self._initialized = True
            return True
        except Exception as e:
            logger.error(f"Error initializing model: {e}")
            self._initialized = False
            return False

    def create_review_prompt(self, code: str, language: str) -> str:
        """Create a structured prompt for code review."""
        return f"""Review this {language} code. List specific points in these sections:
Issues:
Improvements:
Best Practices:
Security:

Code:
```{language}
{code}
```"""

    @spaces.GPU
    def review_code(self, code: str, language: str) -> str:
        """Run a full review pass: tokenize, generate, record, and persist.

        Returns the model's suggestions on success, or a human-readable
        error message string on any failure (never raises to the caller).
        """
        try:
            if not self._initialized and not self.initialize_model():
                return "Error: Model initialization failed. Please try again later."

            start_time = datetime.now()
            prompt = self.create_review_prompt(code, language)

            try:
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512,
                    padding=True
                )
                if inputs is None:
                    raise ValueError("Failed to tokenize input")
                inputs = inputs.to(self.device)
            except Exception as token_error:
                logger.error(f"Tokenization error: {token_error}")
                return "Error: Failed to process input code. Please try again."

            try:
                with torch.no_grad():
                    # early_stopping was dropped: it only applies to beam
                    # search and is a warning-generating no-op with num_beams=1.
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=512,
                        do_sample=True,
                        temperature=0.7,
                        top_p=0.95,
                        num_beams=1,
                        pad_token_id=self.tokenizer.pad_token_id,
                        eos_token_id=self.tokenizer.eos_token_id
                    )
            except Exception as gen_error:
                logger.error(f"Generation error: {gen_error}")
                return "Error: Failed to generate review. Please try again."

            try:
                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                # Strip the echoed prompt by character length; assumes decode
                # reproduces the prompt text verbatim -- TODO confirm for MODEL_NAME.
                suggestions = response[len(prompt):].strip()
            except Exception as decode_error:
                logger.error(f"Decoding error: {decode_error}")
                return "Error: Failed to decode model output. Please try again."

            # Create and record the review.
            end_time = datetime.now()
            review = Review(code, language, suggestions)
            review.response_time = (end_time - start_time).total_seconds()

            # Append first so update_metrics counts it in "reviews today",
            # then delegate all metric math to update_metrics (single source
            # of truth -- previously this logic was duplicated inline here).
            self.review_history.append(review)
            self.update_metrics(review)

            # Save to file
            self.save_history()

            # Free GPU tensors between requests.
            if self.device and self.device.type == "cuda":
                del inputs, outputs
                torch.cuda.empty_cache()

            return suggestions

        except Exception as e:
            logger.error(f"Error during code review: {e}")
            return f"Error performing code review: {str(e)}"

    def update_metrics(self, review: Review):
        """Fold *review* into aggregate metrics.

        Assumes the review has already been appended to review_history
        (reviews_today is recomputed over the full history).
        """
        self.metrics['total_reviews'] += 1

        # Incremental running average of response time.
        total_time = self.metrics['avg_response_time'] * (self.metrics['total_reviews'] - 1)
        total_time += review.response_time
        self.metrics['avg_response_time'] = total_time / self.metrics['total_reviews']

        today = datetime.now().date()
        self.metrics['reviews_today'] = sum(
            1 for r in self.review_history
            if datetime.fromisoformat(r.timestamp).date() == today
        )

    def get_history(self) -> List[Dict]:
        """Return the last 10 reviews, newest first, formatted for display."""
        return [
            {
                'timestamp': r.timestamp,
                'language': r.language,
                'code': r.code,
                'suggestions': r.suggestions,
                'response_time': f"{r.response_time:.2f}s"
            }
            for r in reversed(self.review_history[-10:])
        ]

    def get_metrics(self) -> Dict:
        """Return display-formatted current metrics."""
        return {
            'Total Reviews': self.metrics['total_reviews'],
            'Average Response Time': f"{self.metrics['avg_response_time']:.2f}s",
            'Reviews Today': self.metrics['reviews_today'],
            'Device': str(self.device) if self.device else "Not initialized"
        }
290
+
291
+ # Initialize reviewer
292
# Initialize reviewer
reviewer = CodeReviewer()

# Build the Gradio UI: three tabs (review, history, metrics) wired to the
# reviewer instance above.
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown("# Code Review Assistant")
    gr.Markdown("An automated code review system powered by Gemma-2b")

    with gr.Tabs():
        with gr.Tab("Review Code"):
            with gr.Row():
                with gr.Column():
                    code_input = gr.Textbox(
                        lines=10,
                        placeholder="Enter your code here...",
                        label="Code"
                    )
                    language_input = gr.Dropdown(
                        choices=["python", "javascript", "java", "cpp", "typescript", "go", "rust"],
                        value="python",
                        label="Language"
                    )
                    submit_btn = gr.Button("Submit for Review", variant="primary")
                with gr.Column():
                    output = gr.Textbox(
                        label="Review Results",
                        lines=10
                    )

        with gr.Tab("History"):
            with gr.Row():
                refresh_history = gr.Button("Refresh History", variant="secondary")
                history_output = gr.Textbox(
                    label="Review History",
                    lines=20,
                    value="Click 'Refresh History' to view review history"
                )

        with gr.Tab("Metrics"):
            with gr.Row():
                refresh_metrics = gr.Button("Refresh Metrics", variant="secondary")
                metrics_output = gr.JSON(
                    label="Performance Metrics"
                )

    @spaces.GPU
    def review_code_interface(code: str, language: str) -> str:
        """Validate input, then run a review; never raises to the UI."""
        if not code.strip():
            return "Please enter some code to review."
        try:
            reviewer.ensure_initialized()
            return reviewer.review_code(code, language)
        except Exception as e:
            logger.error(f"Interface error: {e}")
            return f"Error: {str(e)}"

    def get_history_interface() -> str:
        """Render the recent review history as one display string."""
        try:
            history = reviewer.get_history()
            if not history:
                return "No reviews yet."
            chunks = []
            for entry in history:
                chunks.append(f"Time: {entry['timestamp']}\n")
                chunks.append(f"Language: {entry['language']}\n")
                chunks.append(f"Response Time: {entry['response_time']}\n")
                chunks.append("Code:\n```\n" + entry['code'] + "\n```\n")
                chunks.append("Suggestions:\n" + entry['suggestions'] + "\n")
                chunks.append("-" * 80 + "\n\n")
            return "".join(chunks)
        except Exception as e:
            logger.error(f"History error: {e}")
            return "Error retrieving history"

    def get_metrics_interface() -> Dict:
        """Fetch current metrics, with a zeroed fallback if none exist."""
        try:
            metrics = reviewer.get_metrics()
            if metrics:
                return metrics
            return {
                'Total Reviews': 0,
                'Average Response Time': '0.00s',
                'Reviews Today': 0,
                'Device': str(reviewer.device) if reviewer.device else "Not initialized"
            }
        except Exception as e:
            logger.error(f"Metrics error: {e}")
            return {"error": str(e)}

    def update_all_outputs(code: str, language: str) -> tuple:
        """Run a review and refresh every output pane in one click."""
        return (
            review_code_interface(code, language),
            get_history_interface(),
            get_metrics_interface(),
        )

    # Wire buttons to handlers.
    submit_btn.click(
        update_all_outputs,
        inputs=[code_input, language_input],
        outputs=[output, history_output, metrics_output]
    )

    refresh_history.click(
        get_history_interface,
        outputs=history_output
    )

    refresh_metrics.click(
        get_metrics_interface,
        outputs=metrics_output
    )

    # Clickable example inputs for the review tab.
    gr.Examples(
        examples=[
            ["""def add_numbers(a, b):
    return a + b""", "python"],
            ["""function calculateSum(numbers) {
    let sum = 0;
    for(let i = 0; i < numbers.length; i++) {
        sum += numbers[i];
    }
    return sum;
}""", "javascript"]
        ],
        inputs=[code_input, language_input]
    )

if __name__ == "__main__":
    iface.launch()