Spaces:
Runtime error
Runtime error
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file. | |
import re | |
import sys | |
import os | |
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL | |
from .winterm import enable_vt_processing, WinTerm, WinColor, WinStyle | |
from .win32 import windll, winapi_test | |
winterm = None | |
if windll is not None: | |
winterm = WinTerm() | |
class StreamWrapper(object): | |
''' | |
Wraps a stream (such as stdout), acting as a transparent proxy for all | |
attribute access apart from method 'write()', which is delegated to our | |
Converter instance. | |
''' | |
def __init__(self, wrapped, converter): | |
# double-underscore everything to prevent clashes with names of | |
# attributes on the wrapped stream object. | |
self.__wrapped = wrapped | |
self.__convertor = converter | |
def __getattr__(self, name): | |
return getattr(self.__wrapped, name) | |
def __enter__(self, *args, **kwargs): | |
# special method lookup bypasses __getattr__/__getattribute__, see | |
# https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit | |
# thus, contextlib magic methods are not proxied via __getattr__ | |
return self.__wrapped.__enter__(*args, **kwargs) | |
def __exit__(self, *args, **kwargs): | |
return self.__wrapped.__exit__(*args, **kwargs) | |
def __setstate__(self, state): | |
self.__dict__ = state | |
def __getstate__(self): | |
return self.__dict__ | |
def write(self, text): | |
self.__convertor.write(text) | |
def isatty(self): | |
stream = self.__wrapped | |
if 'PYCHARM_HOSTED' in os.environ: | |
if stream is not None and (stream is sys.__stdout__ or stream is sys.__stderr__): | |
return True | |
try: | |
stream_isatty = stream.isatty | |
except AttributeError: | |
return False | |
else: | |
return stream_isatty() | |
def closed(self): | |
stream = self.__wrapped | |
try: | |
return stream.closed | |
# AttributeError in the case that the stream doesn't support being closed | |
# ValueError for the case that the stream has already been detached when atexit runs | |
except (AttributeError, ValueError): | |
return True | |
class AnsiToWin32(object): | |
''' | |
Implements a 'write()' method which, on Windows, will strip ANSI character | |
sequences from the text, and if outputting to a tty, will convert them into | |
win32 function calls. | |
''' | |
ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?') # Control Sequence Introducer | |
ANSI_OSC_RE = re.compile('\001?\033\\]([^\a]*)(\a)\002?') # Operating System Command | |
def __init__(self, wrapped, convert=None, strip=None, autoreset=False): | |
# The wrapped stream (normally sys.stdout or sys.stderr) | |
self.wrapped = wrapped | |
# should we reset colors to defaults after every .write() | |
self.autoreset = autoreset | |
# create the proxy wrapping our output stream | |
self.stream = StreamWrapper(wrapped, self) | |
on_windows = os.name == 'nt' | |
# We test if the WinAPI works, because even if we are on Windows | |
# we may be using a terminal that doesn't support the WinAPI | |
# (e.g. Cygwin Terminal). In this case it's up to the terminal | |
# to support the ANSI codes. | |
conversion_supported = on_windows and winapi_test() | |
try: | |
fd = wrapped.fileno() | |
except Exception: | |
fd = -1 | |
system_has_native_ansi = not on_windows or enable_vt_processing(fd) | |
have_tty = not self.stream.closed and self.stream.isatty() | |
need_conversion = conversion_supported and not system_has_native_ansi | |
# should we strip ANSI sequences from our output? | |
if strip is None: | |
strip = need_conversion or not have_tty | |
self.strip = strip | |
# should we should convert ANSI sequences into win32 calls? | |
if convert is None: | |
convert = need_conversion and have_tty | |
self.convert = convert | |
# dict of ansi codes to win32 functions and parameters | |
self.win32_calls = self.get_win32_calls() | |
# are we wrapping stderr? | |
self.on_stderr = self.wrapped is sys.stderr | |
def should_wrap(self): | |
''' | |
True if this class is actually needed. If false, then the output | |
stream will not be affected, nor will win32 calls be issued, so | |
wrapping stdout is not actually required. This will generally be | |
False on non-Windows platforms, unless optional functionality like | |
autoreset has been requested using kwargs to init() | |
''' | |
return self.convert or self.strip or self.autoreset | |
def get_win32_calls(self): | |
if self.convert and winterm: | |
return { | |
AnsiStyle.RESET_ALL: (winterm.reset_all, ), | |
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT), | |
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL), | |
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL), | |
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK), | |
AnsiFore.RED: (winterm.fore, WinColor.RED), | |
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN), | |
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW), | |
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE), | |
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA), | |
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN), | |
AnsiFore.WHITE: (winterm.fore, WinColor.GREY), | |
AnsiFore.RESET: (winterm.fore, ), | |
AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True), | |
AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True), | |
AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True), | |
AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True), | |
AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True), | |
AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True), | |
AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True), | |
AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True), | |
AnsiBack.BLACK: (winterm.back, WinColor.BLACK), | |
AnsiBack.RED: (winterm.back, WinColor.RED), | |
AnsiBack.GREEN: (winterm.back, WinColor.GREEN), | |
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW), | |
AnsiBack.BLUE: (winterm.back, WinColor.BLUE), | |
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA), | |
AnsiBack.CYAN: (winterm.back, WinColor.CYAN), | |
AnsiBack.WHITE: (winterm.back, WinColor.GREY), | |
AnsiBack.RESET: (winterm.back, ), | |
AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True), | |
AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True), | |
AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True), | |
AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True), | |
AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True), | |
AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True), | |
AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True), | |
AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True), | |
} | |
return dict() | |
def write(self, text): | |
if self.strip or self.convert: | |
self.write_and_convert(text) | |
else: | |
self.wrapped.write(text) | |
self.wrapped.flush() | |
if self.autoreset: | |
self.reset_all() | |
def reset_all(self): | |
if self.convert: | |
self.call_win32('m', (0,)) | |
elif not self.strip and not self.stream.closed: | |
self.wrapped.write(Style.RESET_ALL) | |
def write_and_convert(self, text): | |
''' | |
Write the given text to our wrapped stream, stripping any ANSI | |
sequences from the text, and optionally converting them into win32 | |
calls. | |
''' | |
cursor = 0 | |
text = self.convert_osc(text) | |
for match in self.ANSI_CSI_RE.finditer(text): | |
start, end = match.span() | |
self.write_plain_text(text, cursor, start) | |
self.convert_ansi(*match.groups()) | |
cursor = end | |
self.write_plain_text(text, cursor, len(text)) | |
def write_plain_text(self, text, start, end): | |
if start < end: | |
self.wrapped.write(text[start:end]) | |
self.wrapped.flush() | |
def convert_ansi(self, paramstring, command): | |
if self.convert: | |
params = self.extract_params(command, paramstring) | |
self.call_win32(command, params) | |
def extract_params(self, command, paramstring): | |
if command in 'Hf': | |
params = tuple(int(p) if len(p) != 0 else 1 for p in paramstring.split(';')) | |
while len(params) < 2: | |
# defaults: | |
params = params + (1,) | |
else: | |
params = tuple(int(p) for p in paramstring.split(';') if len(p) != 0) | |
if len(params) == 0: | |
# defaults: | |
if command in 'JKm': | |
params = (0,) | |
elif command in 'ABCD': | |
params = (1,) | |
return params | |
def call_win32(self, command, params): | |
if command == 'm': | |
for param in params: | |
if param in self.win32_calls: | |
func_args = self.win32_calls[param] | |
func = func_args[0] | |
args = func_args[1:] | |
kwargs = dict(on_stderr=self.on_stderr) | |
func(*args, **kwargs) | |
elif command in 'J': | |
winterm.erase_screen(params[0], on_stderr=self.on_stderr) | |
elif command in 'K': | |
winterm.erase_line(params[0], on_stderr=self.on_stderr) | |
elif command in 'Hf': # cursor position - absolute | |
winterm.set_cursor_position(params, on_stderr=self.on_stderr) | |
elif command in 'ABCD': # cursor position - relative | |
n = params[0] | |
# A - up, B - down, C - forward, D - back | |
x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command] | |
winterm.cursor_adjust(x, y, on_stderr=self.on_stderr) | |
def convert_osc(self, text): | |
for match in self.ANSI_OSC_RE.finditer(text): | |
start, end = match.span() | |
text = text[:start] + text[end:] | |
paramstring, command = match.groups() | |
if command == BEL: | |
if paramstring.count(";") == 1: | |
params = paramstring.split(";") | |
# 0 - change title and icon (we will only change title) | |
# 1 - change icon (we don't support this) | |
# 2 - change title | |
if params[0] in '02': | |
winterm.set_title(params[1]) | |
return text | |
def flush(self): | |
self.wrapped.flush() | |