# app.py
import gradio as gr
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import spacy
import textstat
from nltk.tokenize import word_tokenize
import nltk
import re
import joblib
from transformers import BertTokenizerFast, BertForSequenceClassification
from sentence_transformers import SentenceTransformer

# --- 1. SETUP: Constants and Model Loading ---
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
FINETUNE_MODEL_NAME = 'bert-base-uncased'
MAX_LEN_BERT = 128
print(f"Using device: {DEVICE}")

# --- (Re)Define the PyTorch MLP Model Class ---
class AdvancedMLP(nn.Module):
    """Two-hidden-layer MLP (512 -> 128 units) with batch norm and dropout, producing two-class logits."""
    def __init__(self, input_dim, num_classes=2):
        super(AdvancedMLP, self).__init__()
        self.layer_1 = nn.Linear(input_dim, 512)
        self.relu1 = nn.ReLU()
        self.batchnorm1 = nn.BatchNorm1d(512)
        self.dropout1 = nn.Dropout(0.3)
        self.layer_2 = nn.Linear(512, 128)
        self.relu2 = nn.ReLU()
        self.batchnorm2 = nn.BatchNorm1d(128)
        self.dropout2 = nn.Dropout(0.3)
        self.output_layer = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.layer_1(x); x = self.relu1(x); x = self.batchnorm1(x); x = self.dropout1(x)
        x = self.layer_2(x); x = self.relu2(x); x = self.batchnorm2(x); x = self.dropout2(x)
        x = self.output_layer(x)
        return x
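# Quick shape check (a minimal sketch, not executed by the app):
#   mlp = AdvancedMLP(input_dim=787)    # 787 = 768 BERT dims + 19 discrete features
#   logits = mlp(torch.randn(2, 787))   # -> shape (2, 2), one logit per class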

# --- Load All Models and Artifacts ---
print("Loading models and artifacts...")
try:
    nltk.download('punkt', quiet=True)
    nltk.download('punkt_tab', quiet=True)

    TOKENIZER = BertTokenizerFast.from_pretrained(FINETUNE_MODEL_NAME)
    
    bert_for_seq_clf = BertForSequenceClassification.from_pretrained(FINETUNE_MODEL_NAME, num_labels=2)
    # Load the fine-tuned weights (best checkpoint, from cross-validation fold 4).
    bert_for_seq_clf.load_state_dict(torch.load("best_bert_finetuned_fold_4.bin", map_location=DEVICE))
    BERT_EMBEDDING_MODEL = bert_for_seq_clf.bert.to(DEVICE).eval()

    INPUT_DIM_MLP = 768 + 19  # 768-dim BERT [CLS] embedding + 19 discrete linguistic features
    MLP_MODEL = AdvancedMLP(input_dim=INPUT_DIM_MLP).to(DEVICE)
    MLP_MODEL.load_state_dict(torch.load("best_mlp_combined_features_ZuCo.bin", map_location=DEVICE))
    MLP_MODEL.eval()

    NLP = spacy.load('en_core_web_sm', disable=['ner'])

    # NOTE: Ensure this filename matches the scaler you saved.
    SCALER = joblib.load('scaler_mlp_discrete.joblib')
    
    print("All models and artifacts loaded successfully.")

except FileNotFoundError as e:
    print(f"ERROR: A required file was not found: {e.name}")
    print("Please ensure 'best_bert_finetuned_fold_4.bin', 'best_mlp_combined_features_ZuCo.bin', and 'scaler_mlp_discrete.joblib' are in the same directory.")
    exit()
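
# Optional sanity check (a minimal sketch; `n_features_in_` is recorded by scalers
# fitted with scikit-learn >= 0.24):
if hasattr(SCALER, "n_features_in_") and SCALER.n_features_in_ != 19:
    raise ValueError(f"Scaler was fitted on {SCALER.n_features_in_} features; this app produces 19.")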

# --- 2. PREPROCESSING & FEATURE ENGINEERING FUNCTIONS ---
def clean_text(text):
    """Lowercase the text and collapse whitespace runs to single spaces."""
    text = str(text).lower()
    return re.sub(r'\s+', ' ', text).strip()
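# e.g. clean_text("  Hello\n  WORLD ") -> "hello world"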

# The spaCy pipeline is passed in as an argument so the function has no hidden global dependency.
def get_discrete_features(sentence, nlp_model):
    """Calculates all 19 discrete linguistic features for a single sentence."""
    features = {}
    
    # Surface and readability features.
    features['char_count'] = len(sentence)
    words = sentence.split()
    features['word_count'] = len(words)
    features['avg_word_length'] = features['char_count'] / features['word_count'] if features['word_count'] > 0 else 0
    features['flesch_ease'] = textstat.flesch_reading_ease(sentence)
    features['flesch_grade'] = textstat.flesch_kincaid_grade(sentence)
    features['gunning_fog'] = textstat.gunning_fog(sentence)
    tokens = word_tokenize(sentence)
    features['ttr'] = len(set(tokens)) / len(tokens) if tokens else 0
    features['lex_density_proxy'] = sum(1 for w in tokens if len(w) > 6) / len(tokens) if tokens else 0
    
    # Syntactic features from the spaCy dependency parse.
    doc = nlp_model(sentence)
    dep_distances = [abs(token.i - token.head.i) for token in doc if token.head is not token]
    pos_counts = doc.count_by(spacy.attrs.POS)
    
    features['num_subord_clauses'] = sum(1 for token in doc if token.dep_ == 'mark')
    features['num_conj_clauses'] = sum(1 for token in doc if token.dep_ == 'cc' and token.head.pos_ == 'VERB')
    features['avg_dep_dist'] = np.mean(dep_distances) if dep_distances else 0
    features['max_dep_dist'] = np.max(dep_distances) if dep_distances else 0
    features['num_verbs'] = pos_counts.get(spacy.parts_of_speech.VERB, 0)
    features['num_nouns'] = pos_counts.get(spacy.parts_of_speech.NOUN, 0) + pos_counts.get(spacy.parts_of_speech.PROPN, 0)
    features['num_adjectives'] = pos_counts.get(spacy.parts_of_speech.ADJ, 0)
    features['num_adverbs'] = pos_counts.get(spacy.parts_of_speech.ADV, 0)
    features['num_prepositions'] = pos_counts.get(spacy.parts_of_speech.ADP, 0)
    features['num_conjunctions'] = pos_counts.get(spacy.parts_of_speech.CCONJ, 0) + pos_counts.get(spacy.parts_of_speech.SCONJ, 0)
    
    # The 'ollama_llm_rating' feature comes from an LLM during training-data preparation;
    # it is not computed at inference time, so a constant mid-scale placeholder is used.
    features['ollama_llm_rating'] = 3.0
    feature_order = [
        'char_count', 'word_count', 'avg_word_length', 'ttr', 'lex_density_proxy',
        'flesch_ease', 'flesch_grade', 'gunning_fog', 'num_subord_clauses',
        'num_conj_clauses', 'avg_dep_dist', 'max_dep_dist', 'num_verbs',
        'num_nouns', 'num_adjectives', 'num_adverbs', 'num_prepositions', 'num_conjunctions',
        'ollama_llm_rating'
    ]
    return np.array([features[k] for k in feature_order]).reshape(1, -1)
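# Illustrative usage: get_discrete_features("The cat sat on the mat.", NLP) returns a
# (1, 19) NumPy array whose columns follow `feature_order` above.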

def get_bert_embedding(sentence):
    """Return the BERT [CLS] token embedding for a sentence as a (1, 768) NumPy array."""
    encoded = TOKENIZER.encode_plus(
        sentence,
        add_special_tokens=True,
        max_length=MAX_LEN_BERT,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    input_ids, attention_mask = encoded['input_ids'].to(DEVICE), encoded['attention_mask'].to(DEVICE)
    with torch.no_grad():
        outputs = BERT_EMBEDDING_MODEL(input_ids, attention_mask=attention_mask)
        embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()
    return embedding

# --- 3. THE PREDICTION FUNCTION ---
def predict_cognitive_state(sentence):
    if not sentence.strip():
        return {"Normal Reading (NR)": 0, "Task-Specific Reading (TSR)": 0}

    cleaned = clean_text(sentence)

    # Compute the 19 discrete linguistic features, passing in the shared spaCy pipeline.
    discrete_features = get_discrete_features(cleaned, NLP)
    
    scaled_discrete_features = SCALER.transform(discrete_features)
    bert_embedding = get_bert_embedding(cleaned)
    combined_features = np.concatenate((bert_embedding, scaled_discrete_features), axis=1)
    
    features_tensor = torch.tensor(combined_features, dtype=torch.float32).to(DEVICE)
    with torch.no_grad():
        logits = MLP_MODEL(features_tensor)
        probabilities = torch.softmax(logits, dim=1).cpu().numpy()[0]

    labels = ["Normal Reading (NR)", "Task-Specific Reading (TSR)"]
    confidences = {label: float(prob) for label, prob in zip(labels, probabilities)}
    
    return confidences
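# Illustrative usage (actual probabilities depend on the trained weights):
#   predict_cognitive_state("What does the abbreviation Ph.D. stand for?")
#   -> {"Normal Reading (NR)": <float>, "Task-Specific Reading (TSR)": <float>}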

# --- 4. GRADIO INTERFACE ---
title = "🧠 Cognitive State Analysis from Text"
description = (
    "Enter a sentence to predict its cognitive state. This demo uses a fine-tuned BERT model for semantic "
    "embeddings combined with 19 discrete linguistic features. These features are fed into a Multi-Layer Perceptron (MLP) "
    "to classify the input as either:\n\n"
    "- **Normal Reading (NR):** Casual reading without a specific goal—like reading a story or browsing news.\n"
    "- **Task-Specific Reading (TSR):** Purpose-driven reading—such as searching for an answer or following instructions.\n\n"
    "The model is trained on text data from the ZuCo dataset, using only linguistic features—no EEG or eye-tracking signals are used."
)
example_list = [
    ["Through his son Timothy Bush, Jr., who was also a blacksmith, descended two American Presidents -George H. W. Bush and George W. Bush."],
    ["He received his bachelor's degree in 1965 and master's degree in political science in 1966 both from the University of Wyoming."],
    ["What does the abbreviation Ph.D. stand for?"],
    ["What is the name of the director of the 2003 American film 'The Haunted Mansion'?"],
]

demo = gr.Interface(
    fn=predict_cognitive_state,
    inputs=gr.Textbox(lines=3, label="Input Sentence", placeholder="Type a sentence here..."),
    outputs=gr.Label(num_top_classes=2, label="Prediction"),
    title=title,
    description=description,
    examples=example_list,
    allow_flagging="never"
)

if __name__ == "__main__":
    demo.launch(debug=True)
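
# Run locally with `python app.py`; Gradio prints a local URL. Passing share=True to
# launch() additionally creates a temporary public link.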