Spaces:
Paused
Paused
| """ | |
| class Ruler | |
| Helper class, used by [[MarkdownIt#core]], [[MarkdownIt#block]] and | |
| [[MarkdownIt#inline]] to manage sequences of functions (rules): | |
| - keep rules in defined order | |
| - assign the name to each rule | |
| - enable/disable rules | |
| - add/replace rules | |
| - allow assigning rules to additional named chains (within the same ruler) | |
| - caching lists of active rules | |
| You will not need to use this class directly unless you write plugins. For simple | |
| rule control, use [[MarkdownIt.disable]], [[MarkdownIt.enable]] and | |
| [[MarkdownIt.use]]. | |
| """ | |
| from __future__ import annotations | |
| from collections.abc import Iterable | |
| from dataclasses import dataclass, field | |
| from typing import TYPE_CHECKING, Generic, TypedDict, TypeVar | |
| import warnings | |
| from markdown_it._compat import DATACLASS_KWARGS | |
| from .utils import EnvType | |
| if TYPE_CHECKING: | |
| from markdown_it import MarkdownIt | |
class StateBase:
    """Base state object holding the source text, parser and environment.

    ``src`` is a property: assigning to it stores the text in ``_src`` and
    discards the cached character-code tuple, so ``srcCharCode`` is always
    computed from the current text.
    """

    def __init__(self, src: str, md: MarkdownIt, env: EnvType):
        """Store the source text, the parser instance and the environment.

        :param src: source text; assigned through the ``src`` setter.
        :param md: the parser instance this state belongs to.
        :param env: environment object kept on the state as ``env``.
        """
        # Goes through the ``src`` setter, which also initialises the
        # ``_srcCharCode`` cache to ``None``.
        self.src = src
        self.env = env
        self.md = md

    @property
    def src(self) -> str:
        """Return the current source text."""
        return self._src

    @src.setter
    def src(self, value: str) -> None:
        """Replace the source text and invalidate the char-code cache."""
        self._src = value
        self._srcCharCode: tuple[int, ...] | None = None

    # NOTE(review): exposed as a property on the assumption that callers read
    # ``state.srcCharCode`` attribute-style, like ``state.src`` — confirm.
    @property
    def srcCharCode(self) -> tuple[int, ...]:
        """Return the code points of ``src`` as a tuple (deprecated).

        Emits a ``DeprecationWarning`` on every access. The tuple is built on
        first access and reused until ``src`` is reassigned.
        """
        warnings.warn(
            "StateBase.srcCharCode is deprecated. Use StateBase.src instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if self._srcCharCode is None:
            self._srcCharCode = tuple(ord(c) for c in self._src)
        return self._srcCharCode
class RuleOptionsType(TypedDict, total=False):
    """Optional settings that may accompany a rule; every key may be omitted."""

    # Names of the additional chains the rule should also be listed in.
    alt: list[str]
# Type variable standing in for the callable type of a rule.
RuleFuncTv = TypeVar("RuleFuncTv")
"""A rule function, whose signature is dependent on the state type."""
# NOTE(review): the decorator is restored with DATACLASS_KWARGS because that
# name is imported at the top of the file and not used anywhere else in view —
# confirm against the original module.
@dataclass(**DATACLASS_KWARGS)
class Rule(Generic[RuleFuncTv]):
    """One named rule: its function, enabled flag and extra chain names.

    Instances are created positionally as ``Rule(name, enabled, fn, alt)``,
    so the field order below is part of the interface.
    """

    # Unique name used to look the rule up.
    name: str
    # Disabled rules are left out of the compiled chains.
    enabled: bool
    # The rule function itself; hidden from ``repr`` to keep output readable.
    fn: RuleFuncTv = field(repr=False)
    # Names of the additional chains this rule belongs to.
    alt: list[str]
class Ruler(Generic[RuleFuncTv]):
    """Ordered, named collection of rule functions.

    Rules are kept in the order they were added or inserted. Every enabled
    rule is part of the default chain (``""``) and of each chain named in its
    ``alt`` list. ``getRules`` serves the enabled functions of a chain from a
    cache that is rebuilt lazily after any modification.
    """

    def __init__(self) -> None:
        # List of added rules, in order.
        self.__rules__: list[Rule[RuleFuncTv]] = []
        # Cached rule chains: chain name ('' for the default chain) mapped to
        # the enabled rule functions of that chain. ``None`` marks the cache
        # as stale; it is rebuilt on the next ``getRules`` call.
        self.__cache__: dict[str, list[RuleFuncTv]] | None = None

    def __find__(self, name: str) -> int:
        """Find rule index by name.

        :param name: rule name to look for.
        :return: index of the first rule with that name, or -1 if absent.
        """
        for i, rule in enumerate(self.__rules__):
            if rule.name == name:
                return i
        return -1

    def __compile__(self) -> None:
        """Build the rules lookup cache from the currently enabled rules."""
        active = [rule for rule in self.__rules__ if rule.enabled]

        # Collect unique chain names; the default chain always exists.
        chains = {""}
        for rule in active:
            chains.update(rule.alt)

        # The default chain ('') takes every enabled rule; a named chain takes
        # only the rules that list it in ``alt``.
        self.__cache__ = {
            chain: [rule.fn for rule in active if not chain or chain in rule.alt]
            for chain in chains
        }

    def _set_enabled(
        self, names: str | Iterable[str], ignoreInvalid: bool, enabled: bool
    ) -> list[str]:
        """Set the ``enabled`` flag of the named rules; shared by enable/disable.

        :param names: name or iterable of rule names to update.
        :param ignoreInvalid: skip unknown names instead of raising.
        :param enabled: value assigned to each found rule's ``enabled`` flag.
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        if isinstance(names, str):
            names = [names]
        # Drop the cache *before* touching any rule: if an unknown name raises
        # part-way through, the rules already updated must not be hidden
        # behind a stale cache.
        self.__cache__ = None
        found: list[str] = []
        for name in names:
            idx = self.__find__(name)
            if idx < 0:
                if ignoreInvalid:
                    continue
                raise KeyError(f"Rules manager: invalid rule name {name}")
            self.__rules__[idx].enabled = enabled
            found.append(name)
        return found

    def at(
        self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None
    ) -> None:
        """Replace rule by name with new function & options.

        The rule's ``alt`` list is replaced as well; when ``options`` has no
        ``"alt"`` key it is reset to an empty list.

        :param ruleName: rule name to replace.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(ruleName)
        options = options or {}
        if index == -1:
            raise KeyError(f"Parser rule not found: {ruleName}")
        self.__rules__[index].fn = fn
        self.__rules__[index].alt = options.get("alt", [])
        self.__cache__ = None

    def before(
        self,
        beforeName: str,
        ruleName: str,
        fn: RuleFuncTv,
        options: RuleOptionsType | None = None,
    ) -> None:
        """Add new rule to chain before one with given name.

        :param beforeName: new rule will be added before this one.
        :param ruleName: name of the rule being added.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(beforeName)
        options = options or {}
        if index == -1:
            raise KeyError(f"Parser rule not found: {beforeName}")
        self.__rules__.insert(
            index, Rule[RuleFuncTv](ruleName, True, fn, options.get("alt", []))
        )
        self.__cache__ = None

    def after(
        self,
        afterName: str,
        ruleName: str,
        fn: RuleFuncTv,
        options: RuleOptionsType | None = None,
    ) -> None:
        """Add new rule to chain after one with given name.

        :param afterName: new rule will be added after this one.
        :param ruleName: name of the rule being added.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        :raises: KeyError if name not found
        """
        index = self.__find__(afterName)
        options = options or {}
        if index == -1:
            raise KeyError(f"Parser rule not found: {afterName}")
        self.__rules__.insert(
            index + 1, Rule[RuleFuncTv](ruleName, True, fn, options.get("alt", []))
        )
        self.__cache__ = None

    def push(
        self, ruleName: str, fn: RuleFuncTv, options: RuleOptionsType | None = None
    ) -> None:
        """Push new rule to the end of chain.

        :param ruleName: name of the rule being added.
        :param fn: new rule function.
        :param options: new rule options (not mandatory).
        """
        self.__rules__.append(
            Rule[RuleFuncTv](ruleName, True, fn, (options or {}).get("alt", []))
        )
        self.__cache__ = None

    def enable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, ignoreInvalid, True)

    def enableOnly(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Enable rules with given names, and disable everything else.

        :param names: name or list of rule names to enable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        if isinstance(names, str):
            names = [names]
        for rule in self.__rules__:
            rule.enabled = False
        # Every rule was just switched off, so the cache is stale even if the
        # enable step below raises on an unknown name.
        self.__cache__ = None
        return self.enable(names, ignoreInvalid)

    def disable(
        self, names: str | Iterable[str], ignoreInvalid: bool = False
    ) -> list[str]:
        """Disable rules with given names.

        :param names: name or list of rule names to disable.
        :param ignoreInvalid: ignore errors when rule not found
        :raises: KeyError if name not found and not ignoreInvalid
        :return: list of found rule names
        """
        return self._set_enabled(names, ignoreInvalid, False)

    def getRules(self, chainName: str = "") -> list[RuleFuncTv]:
        """Return array of active functions (rules) for given chain name.

        It analyzes the rules configuration, compiles the cache if it does
        not exist and returns the result. The default chain name is ``''``
        (empty string). An unknown chain name yields an empty list.
        """
        if self.__cache__ is None:
            self.__compile__()
            assert self.__cache__ is not None
        # Chain can be empty, if rules disabled. But we still have to return a list.
        return self.__cache__.get(chainName, []) or []

    def get_all_rules(self) -> list[str]:
        """Return all available rule names."""
        return [r.name for r in self.__rules__]

    def get_active_rules(self) -> list[str]:
        """Return the active rule names."""
        return [r.name for r in self.__rules__ if r.enabled]