import gradio as gr
import pandas as pd
import numpy as np
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression, RidgeClassifier, SGDClassifier
from sklearn.metrics import accuracy_score
from sklearn.naive_bayes import ComplementNB
from sklearn.neighbors import KNeighborsClassifier, NearestCentroid
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import LinearSVC
from sklearn.utils.extmath import density
from time import time
import matplotlib.pyplot as plt
import matplotlib
from sklearn.metrics import ConfusionMatrixDisplay
import io
import base64
matplotlib.use('Agg') # set the backend to avoid GUI warning


# Newsgroup names offered as selectable categories in the UI.
all_categories = [
    "alt.atheism",
    "comp.graphics",
    "comp.os.ms-windows.misc",
    "comp.sys.ibm.pc.hardware",
    "comp.sys.mac.hardware",
    "comp.windows.x",
    "misc.forsale",
    "rec.autos",
    "rec.motorcycles",
    "rec.sport.baseball",
    "rec.sport.hockey",
    "sci.crypt",
    "sci.electronics",
    "sci.med",
    "sci.space",
    "soc.religion.christian",
    "talk.politics.guns",
    "talk.politics.mideast",
    "talk.politics.misc",
    "talk.religion.misc",
]


def size_mb(docs):
    """Return the total UTF-8 encoded size of the strings in `docs`, in units of 1e6 bytes."""
    total_bytes = 0
    for text in docs:
        total_bytes += len(text.encode("utf-8"))
    return total_bytes / 1e6


def load_dataset(categories, verbose=False, remove=()):
    """Fetch the 20 newsgroups train/test subsets and convert them to TF-IDF features.

    Parameters
    ----------
    categories
        Forwarded as `categories` to `fetch_20newsgroups` for both subsets.
    verbose : bool
        When true, print document counts, data sizes and vectorization
        throughput to stdout.
    remove
        Forwarded as `remove` to `fetch_20newsgroups` for both subsets.

    Returns
    -------
    tuple
        `(X_train, X_test, y_train, y_test, feature_names, target_names)`.
        The vectorizer is fitted on the training documents only and then
        reused to transform the test documents.
    """

    def fetch(subset):
        # Both subsets are fetched with identical settings; only `subset` differs.
        return fetch_20newsgroups(
            subset=subset,
            categories=categories,
            shuffle=True,
            random_state=42,
            remove=remove,
        )

    train_bunch = fetch("train")
    test_bunch = fetch("test")

    # `target_names` is taken from the dataset, so its order may differ from `categories`
    target_names = train_bunch.target_names

    y_train = train_bunch.target
    y_test = test_bunch.target

    # Fit the vectorizer on the training documents (timed, including construction)
    started = time()
    tfidf = TfidfVectorizer(
        sublinear_tf=True, max_df=0.5, min_df=5, stop_words="english"
    )
    X_train = tfidf.fit_transform(train_bunch.data)
    duration_train = time() - started

    # Transform the test documents with the already fitted vectorizer (timed)
    started = time()
    X_test = tfidf.transform(test_bunch.data)
    duration_test = time() - started

    feature_names = tfidf.get_feature_names_out()

    if verbose:
        train_mb = size_mb(train_bunch.data)
        test_mb = size_mb(test_bunch.data)

        report_lines = [
            f"{len(train_bunch.data)} documents - "
            f"{train_mb:.2f}MB (training set)",
            f"{len(test_bunch.data)} documents - {test_mb:.2f}MB (test set)",
            f"{len(target_names)} categories",
            f"vectorize training done in {duration_train:.3f}s "
            f"at {train_mb / duration_train:.3f}MB/s",
            f"n_samples: {X_train.shape[0]}, n_features: {X_train.shape[1]}",
            f"vectorize testing done in {duration_test:.3f}s "
            f"at {test_mb / duration_test:.3f}MB/s",
            f"n_samples: {X_test.shape[0]}, n_features: {X_test.shape[1]}",
        ]
        for line in report_lines:
            print(line)

    return X_train, X_test, y_train, y_test, feature_names, target_names

def benchmark(clf, X_train, X_test, y_train, y_test):
    """Fit `clf`, predict on the test split, and report timings and accuracy.

    Progress is printed to stdout. If the fitted estimator exposes a `coef_`
    attribute, its second dimension and its density are printed as well.

    Returns
    -------
    tuple
        `(class name of clf, accuracy on the test split, fit duration,
        predict duration)`, with durations in seconds as measured by `time()`.
    """
    separator = "_" * 80
    print(separator)
    print("Training: ")
    print(clf)

    fit_started = time()
    clf.fit(X_train, y_train)
    train_time = time() - fit_started
    print(f"train time: {train_time:.3}s")

    predict_started = time()
    predictions = clf.predict(X_test)
    test_time = time() - predict_started
    print(f"test time:  {test_time:.3}s")

    score = accuracy_score(y_test, predictions)
    print(f"accuracy:   {score:.3}")

    # Only some estimators expose coefficients after fitting
    if hasattr(clf, "coef_"):
        coefficients = clf.coef_
        print(f"dimensionality: {coefficients.shape[1]}")
        print(f"density: {density(coefficients)}")
        print()

    print()
    return clf.__class__.__name__, score, train_time, test_time


def run_experiment(categories, models):
    """Benchmark every classifier in `models` on the selected newsgroups.

    Parameters
    ----------
    categories
        Forwarded to `load_dataset` (and from there to `fetch_20newsgroups`).
    models : iterable of (estimator, name) pairs
        Each estimator is fitted and evaluated with `benchmark`; `name` is
        only printed as a heading.

    Returns
    -------
    pandas.DataFrame
        One row per model with the columns "Model", "Test Accuracy",
        "Training Time" and "Test Time". Empty (but with those columns) when
        `models` yields nothing.
    """
    columns = ["Model", "Test Accuracy", "Training Time", "Test Time"]

    X_train, X_test, y_train, y_test, feature_names, target_names = load_dataset(
        categories, verbose=True
    )
    results = []
    for clf, name in models:
        print("=" * 80)
        print(name)
        results.append(benchmark(clf, X_train, X_test, y_train, y_test))
        # NOTE(review): `plot_feature_effects` is not defined in the visible
        # part of this file; confirm it exists, otherwise this call raises
        # NameError on the first model.
        plot_feature_effects(clf, target_names, feature_names, X_train)

    if not results:
        # zip(*[]) yields nothing, so the four-way unpacking below would raise
        # ValueError; return an empty frame with the expected columns instead.
        return pd.DataFrame(columns=columns)

    clf_names, score, training_time, test_time = [list(x) for x in zip(*results)]

    training_time = np.array(training_time)
    test_time = np.array(test_time)

    # NOTE(review): neither figure is returned, saved or closed here, so both
    # stay registered with pyplot after the call.
    _, ax1 = plt.subplots(figsize=(10, 8))
    ax1.scatter(score, training_time, s=60)
    ax1.set(
        title="Score-training time trade-off",
        yscale="log",
        xlabel="test accuracy",
        ylabel="training time (s)",
    )

    _, ax2 = plt.subplots(figsize=(10, 8))
    ax2.scatter(score, test_time, s=60)
    ax2.set(
        title="Score-test time trade-off",
        yscale="log",
        xlabel="test accuracy",
        ylabel="test time (s)",
    )

    # Label every point with the classifier's class name
    for txt, acc, fit_s, predict_s in zip(clf_names, score, training_time, test_time):
        ax1.annotate(txt, (acc, fit_s))
        ax2.annotate(txt, (acc, predict_s))

    return pd.DataFrame(
        {
            "Model": clf_names,
            "Test Accuracy": score,
            "Training Time": training_time,
            "Test Time": test_time,
        },
        columns=columns,
    )

def run_experiment_gradio():
    """Build the Gradio interface for the 20 newsgroups benchmark.

    The interface offers two checkbox groups (models and categories) and
    shows one result row per selected model in a dataframe.

    Returns
    -------
    gr.Interface
        The configured, not yet launched, interface.
    """
    models = [
        (LogisticRegression(C=5, max_iter=1000), "Logistic Regression"),
        (RidgeClassifier(alpha=1.0, solver="sparse_cg"), "Ridge Classifier"),
        (KNeighborsClassifier(n_neighbors=100), "kNN"),
        (RandomForestClassifier(), "Random Forest"),
        (LinearSVC(C=0.1, dual=False, max_iter=1000), "Linear SVC"),
        (
            SGDClassifier(
                loss="log_loss", alpha=1e-4, n_iter_no_change=3, early_stopping=True
            ),
            "log-loss SGD",
        ),
        (NearestCentroid(), "NearestCentroid"),
        (ComplementNB(alpha=0.1), "Complement naive Bayes"),
    ]
    # The checkbox choices are the display names; looking estimators up by
    # name avoids relying on how estimator objects get stringified on their
    # way to and from the browser.
    models_by_name = {name: clf for clf, name in models}
    result_columns = ["Model", "Test Accuracy", "Training Time", "Test Time"]

    def run_model(model_names, categories):
        """Benchmark the selected models on the selected categories.

        Unknown model names are skipped. Returns a DataFrame with the columns
        in `result_columns`; it is empty when no known model or no category
        is selected.
        """
        print(model_names)
        selected = [
            models_by_name[name]
            for name in (model_names or [])
            if name in models_by_name
        ]
        if not selected or not categories:
            # Nothing to evaluate; also avoids passing an empty category list
            # to the dataset loader.
            return pd.DataFrame(columns=result_columns)

        # The data depends only on `categories`, so load and vectorize it once
        # instead of once per model.
        X_train, X_test, y_train, y_test, _, _ = load_dataset(
            categories, verbose=True
        )

        results = []
        for clf in selected:
            clf_descr, score, train_time, test_time = benchmark(
                clf, X_train, X_test, y_train, y_test
            )
            results.append(
                {
                    "Model": clf_descr,
                    "Test Accuracy": score,
                    "Training Time": train_time,
                    "Test Time": test_time,
                }
            )
        return pd.DataFrame(results, columns=result_columns)

    # `gr.CheckboxGroup` with `value=` replaces the deprecated
    # `gr.inputs.CheckboxGroup` with `default=`.
    category_options = list(all_categories)
    category_group = gr.CheckboxGroup(
        choices=category_options,
        value=category_options[:5],
        label="Categories",
    )

    model_group = gr.CheckboxGroup(
        choices=list(models_by_name),
        label="Models",
    )

    interface = gr.Interface(
        fn=run_model,
        inputs=[model_group, category_group],
        outputs="dataframe",
        title="20 Newsgroups Text Classification Experiment",
        description="Select one or more categories and one or more models, then click 'Run Experiment' to evaluate them on the 20 newsgroups text classification task.",
        # String mode instead of the deprecated boolean form
        allow_flagging="never",
        analytics_enabled=False,
    )

    return interface

# Build the interface, then start serving it.
demo = run_experiment_gradio()
demo.launch(quiet=False)