|
from datetime import datetime |
|
import random |
|
|
|
import numpy as np |
|
import torch |
|
from torch import nn |
|
from torch.utils.data import DataLoader, Dataset |
|
from transformers import AutoModel, AutoModelForSequenceClassification, AutoTokenizer |
|
from fastapi import APIRouter |
|
from datasets import load_dataset |
|
from sklearn.metrics import accuracy_score |
|
from skops.io import load |
|
|
|
from .utils.evaluation import TextEvaluationRequest |
|
from .utils.emissions import tracker, clean_emissions_data, get_space_info |
|
from .utils.text_preprocessor import preprocess |
|
from accelerate.test_utils.testing import get_backend |
|
from .custom_classifiers import SentenceBERTClassifier, MoEClassifier |
|
|
|
router = APIRouter() |
|
|
|
DESCRIPTION = "Random Baseline" |
|
ROUTE = "/text" |
|
|
|
MODEL_DESCRIPTIONS = { |
|
"baseline": "random baseline", |
|
"tfidf_xgb": "TF-IDF vectorizer and XGBoost classifier", |
|
"bert_base_pruned": "Pruned BERT base model", |
|
|
|
"sbert_distilroberta": "Fine-tuned sentence transformer DistilRoBERTa", |
|
"embedding_moe": "Mixture of expert classifier with DistilBERT Embeddings" |
|
} |
|
|
|
|
|
def baseline_model(dataset_length: int): |
|
|
|
predictions = [random.randint(0, 7) for _ in range(dataset_length)] |
|
|
|
return predictions |
|
|
|
def tree_classifier(test_dataset: dict, model: str): |
|
print("Starting tree model run") |
|
|
|
texts = test_dataset["quote"] |
|
|
|
texts = preprocess(texts) |
|
|
|
model_path = f"tasks/text_models/{model}.skops" |
|
|
|
model = load(model_path, |
|
trusted=[ |
|
'scipy.sparse._csr.csr_matrix', |
|
'xgboost.core.Booster', |
|
'xgboost.sklearn.XGBClassifier']) |
|
|
|
predictions = model.predict(texts) |
|
print("Finished tree model run") |
|
|
|
|
|
return predictions |
|
|
|
class TextDataset(Dataset): |
|
def __init__(self, texts, tokenizer, max_length=512): |
|
self.texts = texts |
|
self.tokenized_texts = tokenizer( |
|
texts, |
|
truncation=True, |
|
padding=True, |
|
max_length=max_length, |
|
return_tensors="pt", |
|
) |
|
|
|
def __getitem__(self, idx): |
|
item = {key: val[idx] for key, val in self.tokenized_texts.items()} |
|
return item |
|
|
|
def __len__(self) -> int: |
|
return len(self.texts) |
|
|
|
|
|
def bert_classifier(test_dataset: dict, model: str): |
|
print("Starting BERT model run") |
|
texts = test_dataset["quote"] |
|
|
|
model_repo = f"theterryzhang/frugal_ai_{model}" |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(model_repo) |
|
|
|
if model in ["bert_base_pruned"]: |
|
model = AutoModelForSequenceClassification.from_pretrained(model_repo) |
|
elif model in ["sbert_distilroberta"]: |
|
model = SentenceBERTClassifier.from_pretrained(model_repo) |
|
else: |
|
raise(ValueError) |
|
|
|
device, _, _ = get_backend() |
|
|
|
model = model.to(device) |
|
|
|
|
|
dataset = TextDataset(texts, tokenizer=tokenizer) |
|
dataloader = DataLoader(dataset, batch_size=32, shuffle=False) |
|
|
|
model.eval() |
|
with torch.no_grad(): |
|
predictions = np.array([]) |
|
for batch in dataloader: |
|
test_input_ids = batch["input_ids"].to(device) |
|
test_attention_mask = batch["attention_mask"].to(device) |
|
outputs = model(test_input_ids, test_attention_mask) |
|
p = torch.argmax(outputs.logits, dim=1) |
|
predictions = np.append(predictions, p.cpu().numpy()) |
|
|
|
print("Finished BERT model run") |
|
|
|
return predictions |
|
|
|
def moe_classifier(test_dataset: dict, model: str): |
|
print("Starting MoE run") |
|
|
|
device, _, _ = get_backend() |
|
|
|
texts = test_dataset["quote"] |
|
|
|
model_path = f"tasks/text_models/0131_MoE_final.pt" |
|
|
|
embedding_model = AutoModel.from_pretrained("sentence-transformers/all-distilroberta-v1") |
|
embedding_model = embedding_model.to(device) |
|
tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-distilroberta-v1") |
|
|
|
dataset = TextDataset(texts, tokenizer=tokenizer, max_length=512) |
|
dataloader = DataLoader(dataset, batch_size=64, shuffle=False) |
|
|
|
model = MoEClassifier(3, 0.05) |
|
model.load_state_dict(torch.load(model_path)) |
|
model = model.to(device) |
|
|
|
print("Starting MoE Classifier") |
|
|
|
model.eval() |
|
with torch.no_grad(): |
|
predictions = np.array([]) |
|
for batch in dataloader: |
|
input_ids = batch['input_ids'].to(device) |
|
attn_mask = batch['attention_mask'].to(device) |
|
embedding_outputs = embedding_model(input_ids, attn_mask) |
|
embeddings = embedding_outputs.last_hidden_state[:, 0, :] |
|
|
|
outputs = model(embeddings) |
|
p = torch.argmax(outputs, dim=1) |
|
predictions = np.append(predictions, p.cpu().numpy()) |
|
|
|
print("Finished running MoE Classifier") |
|
|
|
return predictions |
|
|
|
@router.post(ROUTE, tags=["Text Task"]) |
|
async def evaluate_text(request: TextEvaluationRequest, |
|
model: str = "embedding_moe"): |
|
""" |
|
Evaluate text classification for climate disinformation detection. |
|
|
|
Current Model: Random Baseline |
|
- Makes random predictions from the label space (0-7) |
|
- Used as a baseline for comparison |
|
""" |
|
|
|
username, space_url = get_space_info() |
|
|
|
|
|
LABEL_MAPPING = { |
|
"0_not_relevant": 0, |
|
"1_not_happening": 1, |
|
"2_not_human": 2, |
|
"3_not_bad": 3, |
|
"4_solutions_harmful_unnecessary": 4, |
|
"5_science_unreliable": 5, |
|
"6_proponents_biased": 6, |
|
"7_fossil_fuels_needed": 7 |
|
} |
|
|
|
|
|
dataset = load_dataset(request.dataset_name) |
|
|
|
|
|
dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]}) |
|
|
|
|
|
train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed) |
|
test_dataset = train_test["test"] |
|
|
|
|
|
tracker.start() |
|
tracker.start_task("inference") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
true_labels = test_dataset["label"] |
|
|
|
if model == "baseline": |
|
predictions = baseline_model(len(true_labels)) |
|
elif model == "tfidf_xgb": |
|
predictions = tree_classifier(test_dataset, model='xgb_pipeline') |
|
elif 'bert' in model: |
|
predictions = bert_classifier(test_dataset, model) |
|
elif 'moe' in model: |
|
predictions = moe_classifier(test_dataset, model) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
emissions_data = tracker.stop_task() |
|
|
|
|
|
accuracy = accuracy_score(true_labels, predictions) |
|
|
|
|
|
results = { |
|
"username": username, |
|
"space_url": space_url, |
|
"submission_timestamp": datetime.now().isoformat(), |
|
"model_description": MODEL_DESCRIPTIONS[model], |
|
"accuracy": float(accuracy), |
|
"energy_consumed_wh": emissions_data.energy_consumed * 1000, |
|
"emissions_gco2eq": emissions_data.emissions * 1000, |
|
"emissions_data": clean_emissions_data(emissions_data), |
|
"api_route": ROUTE, |
|
"dataset_config": { |
|
"dataset_name": request.dataset_name, |
|
"test_size": request.test_size, |
|
"test_seed": request.test_seed |
|
} |
|
} |
|
|
|
return results |