File size: 4,316 Bytes
45b3261
eb1168d
4d6e8c2
 
 
 
 
 
 
45b3261
 
 
4d6e8c2
 
45b3261
1c33274
70f5f26
eb1168d
4d6e8c2
 
70f5f26
4d6e8c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1358711
 
eb1168d
 
1358711
eb1168d
 
1358711
 
 
4d6e8c2
 
 
70f5f26
45b3261
 
 
 
 
a065296
eb1168d
45b3261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4d6e8c2
45b3261
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score

from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

import numpy as np
import torch

router = APIRouter()

DESCRIPTION = "FrugalDisinfoHunter Model"
ROUTE = "/text"

@router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
    """
    Evaluate text classification for climate disinformation detection.
    """
    # Get space info
    username, space_url = get_space_info()

    # Define the label mapping
    LABEL_MAPPING = {
        "0_not_relevant": 0,
        "1_not_happening": 1,
        "2_not_human": 2,
        "3_not_bad": 3,
        "4_solutions_harmful_unnecessary": 4,
        "5_science_unreliable": 5,
        "6_proponents_biased": 6,
        "7_fossil_fuels_needed": 7
    }

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name)

    # Convert string labels to integers
    dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})

    # Split dataset
    train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
    test_dataset = train_test["test"]
    
    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    try:
        # Model configuration
        model_name = "Zen0/FrugalDisinfoHunter"  # Model path
        tokenizer_name = "google/mobilebert-uncased"  # Base MobileBERT tokenizer
        BATCH_SIZE = 32  # Batch size for efficient processing
        MAX_LENGTH = 512  # Maximum sequence length

        # Initialize model and tokenizer
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=8,
            output_hidden_states=True,
            problem_type="single_label_classification"
        )
        tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

        # Move model to appropriate device
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        model.eval()  # Set model to evaluation mode

        # Get test texts
        test_texts = test_dataset["quote"]
        predictions = []

        # Process in batches
        for i in range(0, len(test_texts), BATCH_SIZE):
            batch_texts = test_texts[i:i + BATCH_SIZE]
            
            # Tokenize batch
            inputs = tokenizer(
                batch_texts,
                padding=True,
                truncation=True,
                return_tensors="pt",
                max_length=MAX_LENGTH
            )
            
            # Move inputs to device
            inputs = {key: val.to(device) for key, val in inputs.items()}

            # Run inference
            with torch.no_grad():
                outputs = model(**inputs)
                batch_preds = torch.argmax(outputs.logits, dim=1)
                predictions.extend(batch_preds.cpu().numpy())

        # Get true labels
        true_labels = test_dataset['label']

        # Stop tracking emissions
        emissions_data = tracker.stop_task()
        
        # Calculate accuracy
        accuracy = accuracy_score(true_labels, predictions)
        
        # Prepare results dictionary
        results = {
            "username": username,
            "space_url": space_url,
            "submission_timestamp": datetime.now().isoformat(),
            "model_description": DESCRIPTION,
            "accuracy": float(accuracy),
            "energy_consumed_wh": emissions_data.energy_consumed * 1000,
            "emissions_gco2eq": emissions_data.emissions * 1000,
            "emissions_data": clean_emissions_data(emissions_data),
            "api_route": ROUTE,
            "dataset_config": {
                "dataset_name": request.dataset_name,
                "test_size": request.test_size,
                "test_seed": request.test_seed
            }
        }
        
        return results
        
    except Exception as e:
        # Stop tracking in case of error
        tracker.stop_task()
        raise e