|
|
|
|
|
from collections import OrderedDict |
|
from fairseq.data import Dictionary |
|
|
|
from transformers.tokenization_utils import PreTrainedTokenizer |
|
from transformers.dynamic_module_utils import custom_object_save |
|
from transformers.utils import ( |
|
is_tokenizers_available, |
|
logging, |
|
) |
|
|
|
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union |
|
|
|
import copy |
|
import os |
|
import stanza |
|
import youtokentome as yttm |
|
import json |
|
|
|
|
|
# Module-level logger, obtained through the `logging` helper imported from `transformers.utils`.
logger = logging.get_logger(__name__)

# Base names of the JSON files written next to the vocabulary when the tokenizer is saved:
# the special-tokens map and the tokenizer config are written by `save_pretrained`,
# the added-tokens file by `_save_pretrained`.
SPECIAL_TOKENS_MAP_FILE = "special_tokens_map.json"
ADDED_TOKENS_FILE = "added_tokens.json"
TOKENIZER_CONFIG_FILE = "tokenizer_config.json"
|
|
|
if is_tokenizers_available():
    from tokenizers import AddedToken
    from tokenizers import Encoding as EncodingFast
else:
    # The fallback classes below are dataclasses; the import lives in this
    # branch because nothing else in the module needs it.
    from dataclasses import dataclass, field

    @dataclass(frozen=True, eq=True)
    class AddedToken:
        """
        AddedToken represents a token to be added to a Tokenizer. An AddedToken can have special options defining
        the way it should behave.

        This stand-in is only defined when the `tokenizers` library is not available.
        """

        content: str = field(default_factory=str)
        single_word: bool = False
        lstrip: bool = False
        rstrip: bool = False
        normalized: bool = True

        def __getstate__(self):
            """Return the instance attribute dictionary as the serialisable state."""
            return self.__dict__

    @dataclass
    class EncodingFast:
        """This is dummy class because without the `tokenizers` library we don't have these objects anyway"""

        pass
|
|
|
|
|
class BertDictionary(Dictionary):
    """Dictionary for BERT tasks.

    Extends ``Dictionary`` by remembering the CLS, MASK and SEP symbol strings
    and by offering a cached "does this symbol end a word" lookup.
    """

    def __init__(
        self,
        pad='[PAD]',
        unk='[UNK]',
        cls='[CLS]',
        mask='[MASK]',
        sep='[SEP]'
    ):
        """Create the dictionary.

        Args:
            pad: Padding symbol, forwarded to the parent constructor.
            unk: Unknown symbol, forwarded to the parent constructor.
            cls: Classification symbol, stored as ``cls_word``.
            mask: Mask symbol, stored as ``mask_word``.
            sep: Separator symbol, stored as ``sep_word``.
        """
        super().__init__(pad=pad, unk=unk)
        self.cls_word = cls
        self.mask_word = mask
        self.sep_word = sep

        # Lazily built list of per-symbol end-of-word flags (see `is_end_word`).
        self.is_end = None
        # Number of symbols present right after the parent constructor ran.
        self.nspecial = len(self.symbols)

    def mask(self):
        """Helper to get index of mask symbol"""
        return self.index(self.mask_word)

    def is_end_word(self, idx):
        """Return whether the symbol at ``idx`` ends with the ``</w>`` marker.

        The flags are cached in ``self.is_end``. The cache is rebuilt whenever
        its length no longer matches the dictionary size, so symbols added
        after the first call are handled instead of raising ``IndexError``.
        """
        size = len(self)
        if self.is_end is None or len(self.is_end) != size:
            self.is_end = [symbol.endswith("</w>") for symbol in self.symbols[:size]]
        return self.is_end[idx]
|
|
|
|
|
class FB2Tokenizer(PreTrainedTokenizer):
    """
    Slow tokenizer that segments text with a stanza pipeline and then applies a YouTokenToMe BPE model.

    Peculiarities:

    - Text is first split into words by a French stanza `tokenize` pipeline; the words `(` and `)` are replaced by
      `-LRB-` and `-RRB-` before BPE is applied.
    - `tokenize` adds the BOS and EOS tokens itself when `add_special_tokens` is true (the default).
    - Token <-> id conversion uses the vocabulary file loaded into a `BertDictionary`, while
      `convert_tokens_to_string` decodes through the BPE model.

    ::

        tokenizer.decode(tokenizer.encode("Hello", add_special_tokens=False))

    This tokenizer inherits from :class:`~transformers.PreTrainedTokenizer` which contains most of the methods. Users
    should refer to the superclass for more information regarding methods.

    Args:
        vocab_file (:obj:`str`):
            Path to the vocabulary file.
        bpe_model (:obj:`str`):
            Path to the YouTokenToMe BPE model file.
        unk_token (:obj:`string`, `optional`, defaults to `[UNK]`):
            The unknown token. A token that is not in the vocabulary cannot be converted to an ID and is set to be this
            token instead.
        bos_token (:obj:`string`, `optional`, defaults to `<s>`):
            The beginning of sequence token.
        cls_token (:obj:`string`, `optional`, defaults to `<s>`):
            The classifier token.
        eos_token (:obj:`string`, `optional`, defaults to `</s>`):
            The end of sequence token.
        pad_token (:obj:`string`, `optional`, defaults to `[PAD]`):
            The padding token.
        mask_token (:obj:`string`, `optional`, defaults to `[MASK]`):
            The mask token. It is added to the loaded dictionary.
        sep_token (:obj:`string`, `optional`, defaults to `</s>`):
            The separator token.
        model_max_length (:obj:`int`, `optional`, defaults to 512):
            The maximum length in number of tokens for the inputs to the transformer model.
    """

    vocab_files_names = {"vocab_file": "vocab.txt", "bpe_model": "bpe.model"}

    # Upper bound on the number of stanza tokens kept per sentence by `replace_brackets`.
    _MAX_SENTENCE_TOKENS = 10000

    def __init__(
        self,
        vocab_file,
        bpe_model,
        unk_token="[UNK]",
        bos_token="<s>",
        cls_token="<s>",
        eos_token="</s>",
        pad_token="[PAD]",
        mask_token="[MASK]",
        sep_token="</s>",
        model_max_length=512,
        **kwargs
    ):
        """Load the BPE model, the stanza pipeline and the vocabulary, then initialise the base tokenizer.

        Raises:
            ImportError: If stanza, youtokentome or fairseq cannot be imported.
            OSError: If `bpe_model` is not an existing file.
        """
        try:
            import stanza
            import youtokentome as yttm
            import fairseq  # noqa: F401  (imported only to check availability)
        except ImportError as err:
            raise ImportError(
                "You need to install stanza, youtokentome and fairseq to use this tokenizer"
            ) from err

        vocab_file = str(vocab_file)
        bpe_model = str(bpe_model)
        self.vocab_file = vocab_file
        self.bpe_model_path = bpe_model

        if not os.path.isfile(bpe_model):
            raise OSError("bpe_model should be a path to model file")
        self.bpe = yttm.BPE(bpe_model, n_threads=-1)

        self.nlp = stanza.Pipeline(
            lang='fr',
            processors='tokenize',
            tokenize_no_ssplit=True,
            use_gpu=True,
            tokenize_batch_size=128,
            verbose=False,
        )

        self.cache = {}
        self.dictionary = BertDictionary.load(vocab_file)
        self.dictionary.add_symbol(mask_token)

        # token -> id and id -> token views over the dictionary symbols.
        self.vocab = OrderedDict((symbol, index) for index, symbol in enumerate(self.dictionary.symbols))
        self.encoder = self.vocab
        self.decoder = dict(enumerate(self.dictionary.symbols))

        # The base constructor runs last so that the vocabulary already exists
        # if it queries it (e.g. while registering the special tokens).
        super().__init__(
            bos_token=bos_token,
            eos_token=eos_token,
            unk_token=unk_token,
            pad_token=pad_token,
            cls_token=cls_token,
            sep_token=sep_token,
            mask_token=mask_token,
            model_max_length=model_max_length,
            **kwargs
        )

    @property
    def vocab_size(self) -> int:
        """Number of entries in the loaded vocabulary."""
        return len(self.vocab)

    def get_vocab(self):
        """Return a plain `dict` copy of the token -> id mapping."""
        return dict(self.vocab)

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None) -> Tuple[str]:
        """
        Save only the vocabulary of the tokenizer: the BPE model file and the vocabulary file.

        This method won't save the configuration and special token mappings of the tokenizer. Use
        `save_pretrained` to save the whole state of the tokenizer.

        After saving, `self.bpe_model_path` and `self.vocab_file` point at the saved copies.

        Args:
            save_directory (`str`):
                The directory in which to save the vocabulary.
            filename_prefix (`str`, *optional*):
                An optional prefix to add to the named of the saved files.

        Returns:
            `Tuple(str)`: Paths to the files saved (BPE model, vocabulary).

        Raises:
            NotADirectoryError: If `save_directory` is not an existing directory.
        """
        import shutil  # local import: only this method copies files

        save_directory = str(save_directory)
        if not os.path.isdir(save_directory):
            # Raise instead of calling exit(), which would terminate the whole interpreter.
            raise NotADirectoryError(f"Provided path ({save_directory}) should be a directory")

        prefix = filename_prefix + "-" if filename_prefix else ""
        bpe_save_file = os.path.join(save_directory, prefix + "bpe.model")
        vocab_save_file = os.path.join(save_directory, prefix + "vocab.txt")

        for source, target in ((self.bpe_model_path, bpe_save_file), (self.vocab_file, vocab_save_file)):
            # Saving into the directory the files were loaded from must not copy a file onto itself.
            if os.path.abspath(source) != os.path.abspath(target):
                shutil.copyfile(source, target)

        self.bpe_model_path = bpe_save_file
        self.vocab_file = vocab_save_file

        return bpe_save_file, vocab_save_file

    def replace_brackets(self, sentence):
        """Return the token texts of a stanza sentence with `(` / `)` mapped to `-LRB-` / `-RRB-`.

        At most `_MAX_SENTENCE_TOKENS` tokens are kept; any further tokens are dropped.
        """
        replacements = {"(": "-LRB-", ")": "-RRB-"}
        kept = sentence.tokens[: self._MAX_SENTENCE_TOKENS]
        return [replacements.get(token.text, token.text) for token in kept]

    def _tokenize(self, text: str, **kwargs):
        """Converts a string in a sequence of tokens (string), using the tokenizer.

        The text is split into words by the stanza pipeline, brackets are replaced (see `replace_brackets`), the
        words are joined with single spaces and the result is encoded into BPE sub-words.

        Returns an empty list for empty or whitespace-only text. Words from every sentence stanza returns are
        used, so text containing blank lines is not cut off after its first segment.
        """
        if not text or not text.strip():
            return []

        document = self.nlp([stanza.Document([], text=text)])[0]
        words = [word for sentence in document.sentences for word in self.replace_brackets(sentence)]
        if not words:
            return []

        return self.bpe.encode([' '.join(words)], output_type=yttm.OutputType.SUBWORD)[0]

    def tokenize(self, text: Union[List[str], str], add_special_tokens=True, **kwargs):
        """Tokenize a string, or each string of a list, optionally wrapping the result in BOS/EOS tokens.

        Args:
            text: A single string or a list of strings (a list yields a list of token lists).
            add_special_tokens: When true, prepend `self.bos_token` and append `self.eos_token`.

        Returns:
            A list of tokens, or a list of such lists when `text` is a list.
        """
        if isinstance(text, list):
            return [self.tokenize(item, add_special_tokens=add_special_tokens, **kwargs) for item in text]

        pieces = self._tokenize(text)
        if add_special_tokens:
            pieces = [self.bos_token] + pieces + [self.eos_token]
        return pieces

    def _convert_token_to_id(self, token):
        """ Converts a token (str) in an id using the vocab. Unknown tokens map to the id of `unk_token`. """
        return self.vocab.get(token, self.vocab.get(self.unk_token))

    def _convert_id_to_token(self, index):
        """Converts an index (integer) in a token (str) using the vocab. Returns `None` for an unknown index."""
        return self.decoder.get(index)

    def convert_tokens_to_string(self, tokens: List[str]):
        """Converts a sequence of tokens (string) in a single string.

        A leading BOS token and a trailing EOS token are removed before decoding with the BPE model. An empty
        sequence (or one holding only those special tokens) yields an empty string.
        """
        tokens = list(tokens)
        if tokens and tokens[0] == self.bos_token:
            tokens = tokens[1:]
        if tokens and tokens[-1] == self.eos_token:
            tokens = tokens[:-1]
        if not tokens:
            return ""
        return self.bpe.decode([self.bpe.subword_to_id(token) for token in tokens])[0]

    @staticmethod
    def _convert_added_tokens(obj: Union[AddedToken, Any], add_type_field=True):
        """Recursively turn `AddedToken` objects inside `obj` into JSON-serialisable dicts."""
        if isinstance(obj, AddedToken):
            # Copy the state: `__getstate__` may hand back the instance's own `__dict__`.
            out = dict(obj.__getstate__())
            if add_type_field:
                out["__type"] = "AddedToken"
            return out
        if isinstance(obj, (list, tuple)):
            return [FB2Tokenizer._convert_added_tokens(o, add_type_field=add_type_field) for o in obj]
        if isinstance(obj, dict):
            return {
                k: FB2Tokenizer._convert_added_tokens(v, add_type_field=add_type_field) for k, v in obj.items()
            }
        return obj

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        legacy_format: Optional[bool] = None,
        filename_prefix: Optional[str] = None,
        push_to_hub: bool = False,
        **kwargs,
    ) -> Tuple[str]:
        """
        Save the full tokenizer state.

        This method make sure the full tokenizer can then be re-loaded using the
        [`~tokenization_utils_base.PreTrainedTokenizer.from_pretrained`] class method.

        Warning: This won't save modifications you may have applied to the tokenizer after the instantiation (for
        instance, modifying `tokenizer.do_lower_case` after creation).

        Args:
            save_directory (`str` or `os.PathLike`): The path to a directory where the tokenizer will be saved.
            legacy_format (`bool`, *optional*):
                Only applicable for a fast tokenizer. If unset (default), will save the tokenizer in the unified JSON
                format as well as in legacy format if it exists, i.e. with tokenizer specific vocabulary and a separate
                added_tokens files.

                If `False`, will only save the tokenizer in the unified JSON format. This format is incompatible with
                "slow" tokenizers (not powered by the *tokenizers* library), so the tokenizer will not be able to be
                loaded in the corresponding "slow" tokenizer.

                If `True`, will save the tokenizer in legacy format. If the "slow" tokenizer doesn't exits, a value
                error is raised.
            filename_prefix: (`str`, *optional*):
                A prefix to add to the names of the files saved by the tokenizer.
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Whether or not to push your model to the Hugging Face model hub after saving it. You can specify the
                repository you want to push to with `repo_id` (will default to the name of `save_directory` in your
                namespace).
            kwargs:
                Additional key word arguments passed along to the [`~utils.PushToHubMixin.push_to_hub`] method.

        Returns:
            A tuple of `str`: The files saved. `None` is returned (after logging an error) when `save_directory`
            is an existing file.
        """
        # Normalise once so that the string operations below also work for `os.PathLike` inputs.
        save_directory = str(save_directory)

        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return

        os.makedirs(save_directory, exist_ok=True)

        if push_to_hub:
            commit_message = kwargs.pop("commit_message", None)
            repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
            repo_id, token = self._create_repo(repo_id, **kwargs)
            files_timestamps = self._get_files_timestamps(save_directory)

        prefix = filename_prefix + "-" if filename_prefix else ""
        special_tokens_map_file = os.path.join(save_directory, prefix + SPECIAL_TOKENS_MAP_FILE)
        tokenizer_config_file = os.path.join(save_directory, prefix + TOKENIZER_CONFIG_FILE)

        tokenizer_config = copy.deepcopy(self.init_kwargs)

        # Store the current value of these attributes rather than the one given at construction time.
        target_keys = ["model_max_length"]
        for k in target_keys:
            if hasattr(self, k):
                tokenizer_config[k] = getattr(self, k)

        if len(self.init_inputs) > 0:
            tokenizer_config["init_inputs"] = copy.deepcopy(self.init_inputs)
        # Paths of the vocabulary files are not written to the config.
        for file_id in self.vocab_files_names.keys():
            tokenizer_config.pop(file_id, None)

        tokenizer_config = self._convert_added_tokens(tokenizer_config, add_type_field=True)

        # Record the tokenizer class name, without a trailing "Fast".
        tokenizer_class = self.__class__.__name__
        if tokenizer_class.endswith("Fast") and tokenizer_class != "PreTrainedTokenizerFast":
            tokenizer_class = tokenizer_class[:-4]
        tokenizer_config["tokenizer_class"] = tokenizer_class

        if getattr(self, "_auto_map", None) is not None:
            tokenizer_config["auto_map"] = self._auto_map
        if getattr(self, "_processor_class", None) is not None:
            tokenizer_config["processor_class"] = self._processor_class

        if self._auto_class is not None:
            custom_object_save(self, save_directory, config=tokenizer_config)

        with open(tokenizer_config_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(tokenizer_config, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"tokenizer config file saved in {tokenizer_config_file}")

        write_dict = self._convert_added_tokens(self.special_tokens_map_extended, add_type_field=False)
        with open(special_tokens_map_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(write_dict, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"Special tokens file saved in {special_tokens_map_file}")

        file_names = (tokenizer_config_file, special_tokens_map_file)
        save_files = self._save_pretrained(
            save_directory=save_directory,
            file_names=file_names,
            legacy_format=legacy_format,
            filename_prefix=filename_prefix,
        )

        if push_to_hub:
            self._upload_modified_files(
                save_directory, repo_id, files_timestamps, commit_message=commit_message, token=token
            )

        return save_files

    def _save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        file_names: Tuple[str],
        legacy_format: Optional[bool] = None,
        filename_prefix: Optional[str] = None,
    ) -> Tuple[str]:
        """
        Save a tokenizer using the slow-tokenizer/legacy format: vocabulary + added tokens.

        Fast tokenizers can also be saved in a unique JSON file containing {config + vocab + added-tokens} using the
        specific [`~tokenization_utils_fast.PreTrainedTokenizerFast._save_pretrained`]

        Returns:
            `file_names` followed by the vocabulary files and the added-tokens file path. The added-tokens path is
            included even when there were no added tokens and the file was therefore not written.

        Raises:
            ValueError: If `legacy_format` is `False`.
        """
        if legacy_format is False:
            raise ValueError(
                "Only fast tokenizers (instances of PreTrainedTokenizerFast) can be saved in non legacy format."
            )

        save_directory = str(save_directory)

        added_tokens_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + ADDED_TOKENS_FILE
        )
        added_vocab = self.get_added_vocab()
        if added_vocab:
            with open(added_tokens_file, "w", encoding="utf-8") as f:
                out_str = json.dumps(added_vocab, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
                f.write(out_str)
            logger.info(f"added tokens file saved in {added_tokens_file}")

        vocab_files = self.save_vocabulary(save_directory, filename_prefix=filename_prefix)

        return file_names + vocab_files + (added_tokens_file,)
|
|
|
|
|
|