from __future__ import annotations

import bz2
import json
import re
from functools import lru_cache
from typing import TYPE_CHECKING, Literal, Sequence

import emoji
import pandas as pd
import spacy
from joblib import Parallel, delayed
from tqdm import tqdm

from app.constants import (
    AMAZONREVIEWS_PATH,
    AMAZONREVIEWS_URL,
    IMDB50K_PATH,
    IMDB50K_URL,
    SENTIMENT140_PATH,
    SENTIMENT140_URL,
    SLANGMAP_PATH,
    SLANGMAP_URL,
    TEST_DATASET_PATH,
    TEST_DATASET_URL,
)

if TYPE_CHECKING:
    from re import Pattern

    from spacy.tokens import Doc

__all__ = ["load_data", "tokenize"]

try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    print("Downloading spaCy model...")
    from spacy.cli import download as spacy_download

    spacy_download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Cache the result so the mapping file is read and the pattern compiled only once
@lru_cache(maxsize=1)
def slang() -> tuple[Pattern, dict[str, str]]:
    """Compile a regex pattern for slang terms.

    Returns:
        Slang pattern and mapping

    Raises:
        FileNotFoundError: If the slang mapping file is not found
    """
    if not SLANGMAP_PATH.exists():
        msg = (
            f"Slang mapping file not found at: '{SLANGMAP_PATH}'\n"
            "Please download the file from:\n"
            f"{SLANGMAP_URL}"
        )  # fmt: off
        raise FileNotFoundError(msg)

    with SLANGMAP_PATH.open() as f:
        mapping = json.load(f)

    return re.compile(r"\b(" + "|".join(map(re.escape, mapping.keys())) + r")\b"), mapping

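# NOTE: illustrative assumption, not taken from the actual file: the slang mapping is
# expected to be a flat JSON object of lowercase term -> replacement, e.g.
#   {"u": "you", "gr8": "great", "imo": "in my opinion"}
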
def _clean(text: str) -> str:
    """Perform basic text cleaning.

    Args:
        text: Text to clean

    Returns:
        Cleaned text
    """
    # Make text lowercase
    text = text.lower()

    # Remove HTML tags
    text = re.sub(r"<[^>]*>", "", text)

    # Map slang terms
    slang_pattern, slang_mapping = slang()
    text = slang_pattern.sub(lambda x: slang_mapping[x.group()], text)

    # Remove acronyms and abbreviations
    text = re.sub(r"\b(?:[a-z]\.?)(?:[a-z]\.)\b", "", text)

    # Remove honorifics
    text = re.sub(r"\b(?:mr|mrs|ms|dr|prof|sr|jr)\.?\b", "", text)

    # Remove year abbreviations
    text = re.sub(r"\b(?:\d{3}0|\d0)s?\b", "", text)

    # Remove hashtags
    text = re.sub(r"#[^\s]+", "", text)

    # Replace mentions with a generic tag
    text = re.sub(r"@[^\s]+", "user", text)

    # Replace X/Y with X or Y
    text = re.sub(r"\b([a-z]+)/([a-z]+)\b", r"\1 or \2", text)

    # Convert emojis to text
    text = emoji.demojize(text, delimiters=("emoji_", ""))

    # Remove special characters
    text = re.sub(r"[^a-z0-9\s]", "", text)

    # EXTRA: imdb50k-specific cleaning
    text = re.sub(r"mst3k", "", text)  # Very common acronym for Mystery Science Theater 3000

    return text.strip()

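# Illustrative sketch (made-up sample string; the exact output also depends on the
# contents of the slang mapping file):
#   _clean("Great movie!!! <br /> @someuser #mustwatch")
# lowercases the text, strips the HTML tag, replaces the mention with "user",
# drops the hashtag, and removes the remaining punctuation.
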
def _lemmatize(doc: Doc, threshold: int = 3) -> Sequence[str]:
    """Lemmatize the provided text using spaCy.

    Args:
        doc: spaCy document
        threshold: Minimum character length of tokens

    Returns:
        Sequence of lemmatized tokens
    """
    return [
        tok
        for token in doc
        if not token.is_stop  # Ignore stop words
        and not token.is_punct  # Ignore punctuation
        and not token.like_email  # Ignore email addresses
        and not token.like_url  # Ignore URLs
        and not token.like_num  # Ignore numbers
        and token.is_alpha  # Ignore non-alphabetic tokens
        and (len(tok := token.lemma_.lower().strip()) >= threshold)  # Ignore short tokens
    ]

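# Illustrative sketch (made-up sentence): _lemmatize(nlp("The movies were great!"))
# would keep roughly ["movie", "great"]: "the" and "were" are stop words, "!" is
# punctuation, and the surviving tokens are lemmatized, lowercased, and length-filtered.
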
def tokenize(
    text_data: Sequence[str],
    batch_size: int = 512,
    n_jobs: int = 4,
    character_threshold: int = 3,
    show_progress: bool = True,
) -> Sequence[Sequence[str]]:
    """Tokenize the provided text using spaCy.

    Args:
        text_data: Text data to tokenize
        batch_size: Batch size for tokenization
        n_jobs: Number of parallel jobs
        character_threshold: Minimum character length of tokens
        show_progress: Whether to show a progress bar

    Returns:
        Tokenized text data
    """
    text_data = Parallel(n_jobs=n_jobs)(
        delayed(_clean)(text)
        for text in tqdm(
            text_data,
            desc="Cleaning",
            unit="doc",
            disable=not show_progress,
        )
    )
    return pd.Series(
        [
            _lemmatize(doc, character_threshold)
            for doc in tqdm(
                nlp.pipe(text_data, batch_size=batch_size, n_process=n_jobs, disable=["parser", "ner"]),
                total=len(text_data),
                desc="Lemmatization",
                unit="doc",
                disable=not show_progress,
            )
        ],
    )

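# Illustrative usage sketch (made-up strings, single process to keep it cheap):
#   tokens = tokenize(["I love this movie", "Worst film ever"], n_jobs=1, show_progress=False)
# returns a pandas Series with one list of lemmatized tokens per input document.
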
def load_sentiment140(include_neutral: bool = False) -> tuple[list[str], list[int]]:
    """Load the sentiment140 dataset and make it suitable for use.

    Args:
        include_neutral: Whether to include neutral sentiment

    Returns:
        Text and label data

    Raises:
        FileNotFoundError: If the dataset is not found
    """
    # Check if the dataset exists
    if not SENTIMENT140_PATH.exists():
        msg = (
            f"Sentiment140 dataset not found at: '{SENTIMENT140_PATH}'\n"
            "Please download the dataset from:\n"
            f"{SENTIMENT140_URL}"
        )
        raise FileNotFoundError(msg)

    # Load the dataset
    data = pd.read_csv(
        SENTIMENT140_PATH,
        encoding="ISO-8859-1",
        names=[
            "target",  # 0 = negative, 2 = neutral, 4 = positive
            "id",  # The id of the tweet
            "date",  # The date of the tweet
            "flag",  # The query, NO_QUERY if not present
            "user",  # The user that tweeted
            "text",  # The text of the tweet
        ],
    )

    # Ignore rows with neutral sentiment
    if not include_neutral:
        data = data[data["target"] != 2]

    # Map sentiment values
    data["sentiment"] = data["target"].map(
        {
            0: 0,  # Negative
            4: 1,  # Positive
            2: 2,  # Neutral
        },
    )

    # Return as lists
    return data["text"].tolist(), data["sentiment"].tolist()

def load_amazonreviews() -> tuple[list[str], list[int]]:
    """Load the amazonreviews dataset and make it suitable for use.

    Returns:
        Text and label data

    Raises:
        FileNotFoundError: If the dataset is not found
    """
    # Check if the dataset exists
    if not AMAZONREVIEWS_PATH.exists():
        msg = (
            f"Amazonreviews dataset not found at: '{AMAZONREVIEWS_PATH}'\n"
            "Please download the dataset from:\n"
            f"{AMAZONREVIEWS_URL}"
        )
        raise FileNotFoundError(msg)

    # Load the dataset
    with bz2.BZ2File(AMAZONREVIEWS_PATH) as f:
        dataset = [line.decode("utf-8") for line in f]

    # Split the data into labels and text
    labels, texts = zip(*(line.split(" ", 1) for line in dataset))

    # Map sentiment values ("__label__1" -> 0, "__label__2" -> 1)
    sentiments = [int(label.split("__label__")[1]) - 1 for label in labels]

    # Return as lists
    return list(texts), sentiments

def load_imdb50k() -> tuple[list[str], list[int]]:
    """Load the imdb50k dataset and make it suitable for use.

    Returns:
        Text and label data

    Raises:
        FileNotFoundError: If the dataset is not found
    """
    # Check if the dataset exists
    if not IMDB50K_PATH.exists():
        msg = (
            f"IMDB50K dataset not found at: '{IMDB50K_PATH}'\n"
            "Please download the dataset from:\n"
            f"{IMDB50K_URL}"
        )  # fmt: off
        raise FileNotFoundError(msg)

    # Load the dataset
    data = pd.read_csv(IMDB50K_PATH)

    # Map sentiment values
    data["sentiment"] = data["sentiment"].map(
        {
            "positive": 1,
            "negative": 0,
        },
    )

    # Return as lists
    return data["review"].tolist(), data["sentiment"].tolist()

def load_test() -> tuple[list[str], list[int]]:
    """Load the test dataset and make it suitable for use.

    Returns:
        Text and label data

    Raises:
        FileNotFoundError: If the dataset is not found
    """
    # Check if the dataset exists
    if not TEST_DATASET_PATH.exists():
        msg = (
            f"Test dataset not found at: '{TEST_DATASET_PATH}'\n"
            "Please download the dataset from:\n"
            f"{TEST_DATASET_URL}"
        )
        raise FileNotFoundError(msg)

    # Load the dataset
    data = pd.read_csv(TEST_DATASET_PATH)

    # Return as lists
    return data["text"].tolist(), data["sentiment"].tolist()

def load_data(dataset: Literal["sentiment140", "amazonreviews", "imdb50k", "test"]) -> tuple[list[str], list[int]]:
    """Load and preprocess the specified dataset.

    Args:
        dataset: Dataset to load

    Returns:
        Text and label data

    Raises:
        ValueError: If the dataset is not recognized
    """
    match dataset:
        case "sentiment140":
            return load_sentiment140(include_neutral=False)
        case "amazonreviews":
            return load_amazonreviews()
        case "imdb50k":
            return load_imdb50k()
        case "test":
            return load_test()
        case _:
            msg = f"Unknown dataset: {dataset}"
            raise ValueError(msg)

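if __name__ == "__main__":
    # Minimal end-to-end sketch, assuming the test dataset referenced in app.constants
    # has already been downloaded to TEST_DATASET_PATH.
    texts, labels = load_data("test")
    print(f"Loaded {len(texts)} documents")
    tokens = tokenize(texts[:100], n_jobs=1)
    print(tokens.head())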