Spaces:
Running
Running
# app.py | |
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification | |
from sklearn.ensemble import RandomForestClassifier | |
import joblib | |
import os | |
# Step 1: Load the Hugging Face model used for anomaly detection.
# The tokenizer and the sequence-classification model are both loaded from the
# same checkpoint, then wrapped in a "text-classification" pipeline.
# NOTE(review): the checkpoint name indicates IMDB fine-tuning rather than
# device logs — confirm its labels are a meaningful anomaly signal.
_CHECKPOINT = "huggingface-course/distilbert-base-uncased-finetuned-imdb"
tokenizer = AutoTokenizer.from_pretrained(_CHECKPOINT)
model = AutoModelForSequenceClassification.from_pretrained(_CHECKPOINT)
anomaly_detection = pipeline(
    "text-classification",
    model=model,
    tokenizer=tokenizer,
)
# Step 2: Obtain the Random Forest model for failure prediction: reuse the
# saved copy when one exists, otherwise train on sample data and save it.
if os.path.exists('failure_prediction_model.pkl'):
    # A previously saved model is present; load it instead of retraining.
    # NOTE(review): joblib files are pickle-based — only load a file that
    # comes from a trusted source.
    failure_prediction_model = joblib.load('failure_prediction_model.pkl')
else:
    # Sample data (replace this with real Cisco device metrics data)
    data = pd.DataFrame({
        'cpu_usage': [10, 20, 15, 35, 55],
        'memory_usage': [30, 60, 45, 50, 80],
        'error_rate': [0, 1, 0, 2, 5],
        'failure': [0, 1, 0, 1, 1],  # 0 = no failure, 1 = failure
    })

    # Separate the three metric columns (features) from the label (target).
    X = data[['cpu_usage', 'memory_usage', 'error_rate']]
    y = data['failure']

    # Fit a 100-tree forest; the fixed random_state keeps training repeatable.
    failure_prediction_model = RandomForestClassifier(
        n_estimators=100,
        random_state=42,
    ).fit(X, y)

    # Persist the fitted model so later runs take the load branch above.
    joblib.dump(failure_prediction_model, 'failure_prediction_model.pkl')
# Step 3: Define function to preprocess logs for anomaly detection
def preprocess_logs(logs):
    """Return a normalized copy of a log table ready for anomaly detection.

    Args:
        logs: A pandas DataFrame with a ``timestamp`` column (any values
            ``pd.to_datetime`` accepts) and a string ``log_message`` column.

    Returns:
        A new DataFrame in which ``timestamp`` has been parsed with
        ``pd.to_datetime`` and ``log_message`` has been lowercased. The
        frame passed in by the caller is left unmodified.

    Raises:
        KeyError: If ``timestamp`` or ``log_message`` is missing.
    """
    missing = [col for col in ('timestamp', 'log_message') if col not in logs]
    if missing:
        raise KeyError(f"logs is missing required column(s): {missing}")

    # Work on a copy so normalizing does not mutate the caller's DataFrame.
    cleaned = logs.copy()
    cleaned['timestamp'] = pd.to_datetime(cleaned['timestamp'])
    # Lowercase the messages so the classifier sees uniform text.
    cleaned['log_message'] = cleaned['log_message'].str.lower()
    return cleaned
# Step 4: Function to detect anomalies in logs
def detect_anomaly(logs):
    """Classify every log message and return the predicted labels.

    Args:
        logs: Log table accepted by ``preprocess_logs`` (needs ``timestamp``
            and ``log_message`` columns).

    Returns:
        A list with one label string per ``log_message`` row, in row order.
        Each entry is the ``label`` field of the first result the
        ``anomaly_detection`` pipeline returns for that message.
    """
    preprocessed_logs = preprocess_logs(logs)
    # truncation=True is forwarded to the tokenizer so a message longer than
    # the model's maximum input length is cut instead of failing inference.
    return [
        anomaly_detection(message, truncation=True)[0]['label']
        for message in preprocessed_logs['log_message']
    ]
# Step 5: Function to predict failures based on device metrics
def predict_failure(device_metrics):
    """Predict failure for one device from its current metrics.

    Args:
        device_metrics: Mapping with ``cpu_usage``, ``memory_usage`` and
            ``error_rate`` entries.

    Returns:
        The array returned by ``failure_prediction_model.predict`` for the
        single input row (one element; the training data in this file labels
        0 as no failure and 1 as failure).

    Raises:
        KeyError: If one of the three metrics is missing from the mapping.
    """
    # Same column names, in the same order, as the training features. Passing
    # a named one-row DataFrame (rather than a bare array) keeps prediction
    # input aligned with how the model is fitted in this file and avoids
    # scikit-learn's feature-name mismatch warning.
    feature_names = ['cpu_usage', 'memory_usage', 'error_rate']
    features = pd.DataFrame(
        [[device_metrics[name] for name in feature_names]],
        columns=feature_names,
    )
    return failure_prediction_model.predict(features)
# Step 6: Function to process logs and predict both anomalies and failures
def process_logs_and_predict(log_file, metrics):
    """Run anomaly detection and failure prediction and summarize both.

    Args:
        log_file: Uploaded JSON log source, handed directly to
            ``pd.read_json``.
        metrics: Device metrics passed unchanged to ``predict_failure``.

    Returns:
        A single text line containing the per-message anomaly labels and the
        failure prediction.
    """
    # Parse the upload into a table and label each message in one step.
    anomalies = detect_anomaly(pd.read_json(log_file))
    # The failure prediction uses only the metrics, not the logs.
    failure_pred = predict_failure(metrics)
    return f"Anomalies Detected: {anomalies}, Failure Prediction: {failure_pred}"
# Step 7: Build the Gradio interface. The two inputs map positionally onto
# process_logs_and_predict(log_file, metrics): a file upload for the logs and
# a JSON value for the device metrics; the summary comes back as text.
iface = gr.Interface(
    fn=process_logs_and_predict,
    inputs=["file", "json"],
    outputs="text",
    title="Cisco Device Monitoring",
    description="Upload log files to detect anomalies and predict potential device failures.",
)

# Start serving the interface.
iface.launch()