"""
Custom tokenizer for a collaborative predictive supply chain model.

This script defines `SupplyChainTokenizer`, a tokenizer intended for a
Transformer-based forecasting model. It is seeded with a custom,
industry-specific vocabulary loaded from `vocab.json` (SKUs, store IDs,
plant IDs, promotion types, etc.) and uses Byte-Pair Encoding (BPE) to handle
out-of-vocabulary words and variations.

The example at the bottom of the file shows how to create, train, use, save,
and load the tokenizer.
"""

import json
import os
import tempfile
from typing import Dict, List, Optional, Tuple, Union

import pandas as pd
from tokenizers import (
    Regex,
    Tokenizer,
    decoders,
    models,
    normalizers,
    pre_tokenizers,
    processors,
    trainers,
)
from tokenizers.pre_tokenizers import Digits, WhitespaceSplit


class SupplyChainTokenizer:
    """
    Tokenizer for the Enhanced Business Model for Collaborative Predictive
    Supply Chain.

    The tokenizer starts from the vocabulary in a `vocab.json` file and uses
    Byte-Pair Encoding (BPE) for out-of-vocabulary (OOV) words.

    NOTE(review): the normalizer lowercases all input and the pre-tokenizer
    splits digits individually, so upper-case or digit-bearing vocabulary
    entries (e.g. "SKU123") presumably never match as whole tokens — confirm
    whether the vocabulary should be lower-cased or registered differently.

    Args:
        vocab_path (str): Path to the `vocab.json` file.
        max_length (int, optional): Maximum sequence length. Defaults to 512.

    Raises:
        FileNotFoundError: If `vocab_path` does not exist.
        KeyError: If the vocabulary lacks "[CLS]", "[SEP]" or "[PAD]".
    """

    # Order matters: the BPE trainer is given these tokens in this order.
    SPECIAL_TOKENS = ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]

    def __init__(self, vocab_path: str, max_length: int = 512):
        if not os.path.exists(vocab_path):
            raise FileNotFoundError(f"Vocabulary file not found: {vocab_path}")

        self.vocab_path = vocab_path
        self.max_length = max_length

        # Load the custom vocabulary (token -> id).
        with open(self.vocab_path, "r", encoding="utf-8") as f:
            self.vocab: Dict[str, int] = json.load(f)

        # 1. BPE model seeded with the custom vocabulary; merges are learned
        #    later by `train_bpe`.
        self.bpe_model = models.BPE(
            vocab=self.vocab,
            merges=[],
            unk_token="[UNK]",
        )

        # 2. Tokenizer wrapping the model.
        self.tokenizer = Tokenizer(self.bpe_model)

        # 3. Normalization: Unicode decomposition, lowercase, strip accents.
        self.tokenizer.normalizer = normalizers.Sequence(
            [normalizers.NFD(), normalizers.Lowercase(), normalizers.StripAccents()]
        )

        # 4. Pre-tokenization: split on whitespace, then isolate every digit.
        self.tokenizer.pre_tokenizer = pre_tokenizers.Sequence(
            [WhitespaceSplit(), Digits(individual_digits=True)]
        )

        # 5. Decoder (token IDs back to a string).
        self.tokenizer.decoder = decoders.BPEDecoder()

        # 6. Special-token IDs, reverse lookup table and post-processor.
        self._sync_special_tokens()

    def _sync_special_tokens(self) -> None:
        """
        Derive everything that depends on `self.vocab`.

        Sets `cls_token_id`, `sep_token_id`, `pad_token_id`, the cached
        id -> token table, and (re)installs the post-processor so that the
        IDs it inserts always match the current vocabulary.
        """
        missing = [t for t in ("[CLS]", "[SEP]", "[PAD]") if t not in self.vocab]
        if missing:
            raise KeyError(
                f"Vocabulary is missing required special token(s): {missing}"
            )

        self.cls_token_id = self.vocab["[CLS]"]
        self.sep_token_id = self.vocab["[SEP]"]
        self.pad_token_id = self.vocab["[PAD]"]

        # Built once here instead of on every `id_to_token` call.
        self._id_to_token: Dict[int, str] = {
            idx: token for token, idx in self.vocab.items()
        }

        self.tokenizer.post_processor = processors.TemplateProcessing(
            single="[CLS] $A [SEP]",
            pair="[CLS] $A [SEP] $B:1 [SEP]:1",
            special_tokens=[
                ("[CLS]", self.cls_token_id),
                ("[SEP]", self.sep_token_id),
            ],
        )

    def _refresh_from_tokenizer(self) -> None:
        """
        Re-read the vocabulary from the wrapped tokenizer and resync.

        Must be called whenever the wrapped tokenizer's model changes
        (after training or after loading from disk); otherwise `self.vocab`
        and the special-token IDs describe a vocabulary that is no longer
        the one used for encoding.
        """
        self.bpe_model = self.tokenizer.model
        self.vocab = self.tokenizer.get_vocab()
        self._sync_special_tokens()

    def train_bpe(self, files: Union[str, List[str]], vocab_size: int = 30000):
        """
        Trains the BPE model on text files and refreshes the cached vocabulary.

        Args:
            files (Union[str, List[str]]): Path(s) to text file(s) for training.
            vocab_size (int): The desired vocabulary size passed to the
                trainer.
        """
        if isinstance(files, str):
            files = [files]

        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            special_tokens=list(self.SPECIAL_TOKENS),
            initial_alphabet=pre_tokenizers.ByteLevel.alphabet(),
            show_progress=True,
        )

        self.tokenizer.train(files, trainer=trainer)

        # Training changes the model's vocabulary; keep our view of it, the
        # padding ID and the post-processor IDs consistent with the result.
        self._refresh_from_tokenizer()

    def encode(self, text: str, text_pair: Optional[str] = None) -> List[str]:
        """
        Encodes text into a list of tokens.

        Args:
            text (str): The input text.
            text_pair (str, optional): An optional second input string.

        Returns:
            List[str]: A list of tokens (including the special tokens added
            by the post-processor).
        """
        return self.tokenizer.encode(text, text_pair).tokens

    def encode_as_ids(self, text: str, text_pair: Optional[str] = None) -> List[int]:
        """
        Encodes text into a list of token IDs.

        Args:
            text (str): The input text.
            text_pair (str, optional): An optional second input string.

        Returns:
            List[int]: A list of token IDs (including the special tokens
            added by the post-processor).
        """
        return self.tokenizer.encode(text, text_pair).ids

    def decode(self, ids: List[int], skip_special_tokens: bool = True) -> str:
        """
        Decodes a list of token IDs back into a string.

        NOTE(review): the BPE model is created without an end-of-word suffix,
        so the BPE decoder may not restore whitespace between words — verify
        the round-trip output before relying on it.

        Args:
            ids (List[int]): The list of token IDs.
            skip_special_tokens (bool): Whether to skip special tokens in
                decoding.

        Returns:
            str: The decoded string.
        """
        return self.tokenizer.decode(ids, skip_special_tokens=skip_special_tokens)

    def token_to_id(self, token: str) -> int:
        """
        Converts a token to its corresponding ID.

        Args:
            token (str): The token.

        Returns:
            int: The token ID, or the ID of "[UNK]" if the token is not in
            the vocabulary (None only if "[UNK]" itself is absent).
        """
        return self.vocab.get(token, self.vocab.get("[UNK]"))

    def id_to_token(self, id_: int) -> str:
        """
        Converts a token ID to its corresponding token.

        Args:
            id_ (int): The token ID.

        Returns:
            str: The token, or "[UNK]" if the ID is not in the vocabulary.
        """
        return self._id_to_token.get(id_, "[UNK]")

    def get_vocab_size(self) -> int:
        """Gets the vocabulary size."""
        return len(self.vocab)

    def save(self, directory: str, prefix: str = None):
        """
        Saves the tokenizer configuration and vocabulary to a directory.

        Writes `<prefix->tokenizer.json` and `<prefix->vocab.json`.

        Args:
            directory (str): The directory to save to (created if needed).
            prefix (str, optional): An optional prefix for the filenames.
        """
        os.makedirs(directory, exist_ok=True)
        file_prefix = f"{prefix}-" if prefix else ""

        # Full tokenizer configuration (model, normalizer, post-processor...).
        self.tokenizer.save(os.path.join(directory, file_prefix + "tokenizer.json"))

        # A copy of the vocabulary, for easy access.
        vocab_file = os.path.join(directory, file_prefix + "vocab.json")
        with open(vocab_file, "w", encoding="utf-8") as f:
            json.dump(self.vocab, f, ensure_ascii=False, indent=4)

    @classmethod
    def from_pretrained(cls, directory: str, prefix: str = None, max_length: int = 512):
        """
        Loads a pre-trained tokenizer from a directory written by `save`.

        Args:
            directory (str): The directory to load from.
            prefix (str, optional): The optional prefix used when saving.
            max_length (int, optional): Maximum sequence length for the
                loaded tokenizer. Defaults to 512.

        Returns:
            SupplyChainTokenizer: The loaded tokenizer.
        """
        file_prefix = f"{prefix}-" if prefix else ""
        vocab_path = os.path.join(directory, file_prefix + "vocab.json")

        # Build the wrapper from the saved vocabulary, then swap in the fully
        # configured tokenizer that was serialized next to it.
        instance = cls(vocab_path, max_length=max_length)
        instance.tokenizer = Tokenizer.from_file(
            os.path.join(directory, file_prefix + "tokenizer.json")
        )

        # The loaded tokenizer is the source of truth for IDs from here on.
        instance._refresh_from_tokenizer()
        return instance

    @staticmethod
    def _row_to_text(row) -> str:
        """Serialize one DataFrame row into the `name: value` feature string."""
        parts = [
            f"timestamp: {row['timestamp']}",
            f"sku: {row['sku']}",
            f"store_id: {row['store_id']}",
            f"quantity: {row['quantity']}",
            f"price: {row['price']}",
            f"discount: {row['discount']}",
        ]

        # Optional features are emitted only when present and not missing.
        for column in ("promotion_id", "product_category"):
            if column in row and not pd.isna(row[column]):
                parts.append(f"{column}: {row[column]}")

        return " ".join(parts)

    def prepare_for_model(self, data: pd.DataFrame) -> Tuple[List[List[int]], List[List[int]]]:
        """
        Prepares a Pandas DataFrame for the Transformer model.

        Each row is serialized to a feature string, tokenized, and padded or
        truncated to exactly `max_length` IDs.

        Args:
            data (pd.DataFrame): The input DataFrame. Must have the columns
                'timestamp', 'sku', 'store_id', 'quantity', 'price' and
                'discount'; 'promotion_id' and 'product_category' are used
                when present.

        Returns:
            Tuple[List[List[int]], List[List[int]]]: A tuple.
                1. input_ids: List of token ID sequences for the model.
                2. attention_mask: List of attention masks (1 for real
                   tokens, 0 for padding).

        Raises:
            KeyError: If a required column is missing from a row.
        """
        input_ids: List[List[int]] = []
        attention_masks: List[List[int]] = []

        for _, row in data.iterrows():
            # "[CLS]" / "[SEP]" are NOT written into the text: the
            # post-processor already wraps every encoded sequence with them,
            # and adding them here as well would duplicate them.
            encoded = self.tokenizer.encode(self._row_to_text(row))
            token_ids = list(encoded.ids)
            attention_mask = list(encoded.attention_mask)

            padding_length = self.max_length - len(token_ids)
            if padding_length > 0:
                token_ids = token_ids + [self.pad_token_id] * padding_length
                attention_mask = attention_mask + [0] * padding_length
            elif padding_length < 0:
                token_ids = token_ids[: self.max_length]
                attention_mask = attention_mask[: self.max_length]
                # Keep the sequence terminated by "[SEP]" after truncation.
                if token_ids:
                    token_ids[-1] = self.sep_token_id

            input_ids.append(token_ids)
            attention_masks.append(attention_mask)

        return input_ids, attention_masks


def main() -> None:
    """Illustrative end-to-end example: create, train, encode, save, load."""
    # All example artifacts live in a temporary directory that is removed
    # on exit, even if one of the steps below raises.
    with tempfile.TemporaryDirectory() as workdir:
        vocab_file = os.path.join(workdir, "vocab.json")
        training_file = os.path.join(workdir, "training_data.txt")
        save_dir = os.path.join(workdir, "my_tokenizer")

        # --- Create a dummy vocab.json ---
        vocab = {
            "[UNK]": 0,
            "[CLS]": 1,
            "[SEP]": 2,
            "[PAD]": 3,
            "[MASK]": 4,
            "timestamp:": 5,
            "sku:": 6,
            "store_id:": 7,
            "quantity:": 8,
            "price:": 9,
            "discount:": 10,
            "promotion_id:": 11,
            "product_category:": 12,
            "SKU123": 13,  # Example SKU
            "SKU123-RED": 14,  # Example SKU variant
            "SKU123-BLUE": 15,
            "STORE456": 16,  # Example store ID
            "PLANT789": 17,  # Example plant ID
            "WHOLESALER001": 18,  # Example Wholesaler
            "RETAILER002": 19,  # Example Retailer
            "BOGO": 20,
            "DISCOUNT": 21,
        }
        with open(vocab_file, "w", encoding="utf-8") as f:
            json.dump(vocab, f, indent=4)

        # --- Create the tokenizer ---
        tokenizer = SupplyChainTokenizer(vocab_path=vocab_file)

        # --- Example training (on a dummy text file) ---
        with open(training_file, "w", encoding="utf-8") as f:
            f.write("This is some example text for training the BPE model.\n")
            f.write("SKU123 is a product. STORE456 is another. plant789 is, too.\n")
            f.write("This file contains words not in the initial vocabulary.\n")
        tokenizer.train_bpe(training_file, vocab_size=50)  # Small vocab for the example

        # --- Example encoding ---
        text = (
            "timestamp: 2024-07-03 sku: SKU123 store_id: STORE456 "
            "quantity: 2 price: 10.99 discount: 0.0"
        )
        encoded_tokens = tokenizer.encode(text)
        encoded_ids = tokenizer.encode_as_ids(text)
        print(f"Encoded tokens: {encoded_tokens}")
        print(f"Encoded IDs: {encoded_ids}")
        decoded_text = tokenizer.decode(encoded_ids)
        print(f"Decoded text: {decoded_text}")

        # --- Example with DataFrame ---
        data = {
            'timestamp': ['2024-07-03 10:00:00', '2024-07-03 11:00:00'],
            'sku': ['SKU123', 'SKU123-RED'],
            'store_id': ['STORE456', 'STORE456'],
            'quantity': [2, 1],
            'price': [10.99, 12.99],
            'discount': [0.0, 1.0],
            'promotion_id': ['BOGO', None],
            'product_category': ['Electronics', 'Electronics'],
        }
        df = pd.DataFrame(data)
        input_ids, attention_masks = tokenizer.prepare_for_model(df)
        print(f"Input IDs (for model): {input_ids}")
        print(f"Attention Masks: {attention_masks}")

        # --- Save and load ---
        tokenizer.save(save_dir)
        loaded_tokenizer = SupplyChainTokenizer.from_pretrained(save_dir)
        print(f"Loaded tokenizer vocab size: {loaded_tokenizer.get_vocab_size()}")


# Example Usage (Illustrative)
if __name__ == "__main__":
    main()