dosage/dosagelib/output.py

153 lines
4.6 KiB
Python
Raw Permalink Normal View History

# SPDX-License-Identifier: MIT
2016-10-28 22:21:41 +00:00
# Copyright (C) 2004-2008 Tristan Seligmann and Jonathan Jacobs
2014-01-05 15:50:57 +00:00
# Copyright (C) 2012-2014 Bastian Kleineidam
# Copyright (C) 2015-2020 Tobias Gruetzmacher
2013-04-29 18:25:05 +00:00
import codecs
import contextlib
import io
import os
import pydoc
import sys
import threading
import time
import traceback
from shutil import get_terminal_size
import colorama
from colorama import Fore, Style
2012-12-09 17:12:41 +00:00
2016-03-20 22:55:32 +00:00
2012-12-09 17:12:41 +00:00
lock = threading.Lock()
2012-06-20 19:58:13 +00:00
2016-03-20 22:55:32 +00:00
2014-01-05 15:17:34 +00:00
def get_threadname():
"""Return name of current thread."""
return threading.current_thread().name
2014-01-05 15:17:34 +00:00
2012-06-20 19:58:13 +00:00
class Output(object):
2012-09-26 14:47:39 +00:00
"""Print output with context, indentation and optional timestamps."""
DEFAULT_WIDTH = 80
2013-04-30 18:26:36 +00:00
def __init__(self, stream=None):
2012-09-26 14:47:39 +00:00
"""Initialize context and indentation."""
2014-01-05 16:37:13 +00:00
self.context = None
2012-06-20 19:58:13 +00:00
self.level = 0
self.timestamps = False
2013-04-30 18:26:36 +00:00
if stream is None:
colorama.init(wrap=False)
2013-04-30 18:26:36 +00:00
if hasattr(sys.stdout, "encoding") and sys.stdout.encoding:
self.encoding = sys.stdout.encoding
2013-04-29 18:25:05 +00:00
else:
self.encoding = 'utf-8'
2016-03-20 22:55:32 +00:00
if hasattr(sys.stdout, 'buffer'):
2013-04-30 18:26:36 +00:00
stream = sys.stdout.buffer
else:
stream = sys.stdout
stream = codecs.getwriter(self.encoding)(stream, 'replace')
if os.name == 'nt':
stream = colorama.AnsiToWin32(stream).stream
self.stream = stream
2020-10-04 20:12:34 +00:00
self.base_stream = stream
2012-06-20 19:58:13 +00:00
2012-12-07 23:45:18 +00:00
def info(self, s, level=0):
2012-12-12 16:41:29 +00:00
"""Write an informational message."""
2012-12-07 23:45:18 +00:00
self.write(s, level=level)
2013-02-07 17:28:40 +00:00
def debug(self, s, level=2):
2012-12-12 16:41:29 +00:00
"""Write a debug message."""
# "white" is the default color for most terminals...
self.write(s, level=level, color=Fore.WHITE)
2012-12-07 23:45:18 +00:00
def warn(self, s, level=0):
2012-12-12 16:41:29 +00:00
"""Write a warning message."""
self.write(u"WARN: %s" % s, level=level, color=Style.BRIGHT +
Fore.YELLOW)
2012-12-07 23:45:18 +00:00
def error(self, s, level=0):
2012-12-12 16:41:29 +00:00
"""Write an error message."""
self.write(u"ERROR: %s" % s, level=level, color=Style.DIM + Fore.RED)
2013-03-25 18:39:37 +00:00
def exception(self, s):
"""Write error message with traceback info."""
self.error(s)
type, value, tb = sys.exc_info()
self.writelines(traceback.format_stack(), 1)
self.writelines(traceback.format_tb(tb)[1:], 1)
self.writelines(traceback.format_exception_only(type, value), 1)
2012-12-07 23:45:18 +00:00
2012-12-12 16:41:29 +00:00
def write(self, s, level=0, color=None):
2012-09-26 14:47:39 +00:00
"""Write message with indentation, context and optional timestamp."""
2012-06-20 19:58:13 +00:00
if level > self.level:
return
2014-01-05 15:17:34 +00:00
if self.timestamps:
timestamp = time.strftime(u'%H:%M:%S ')
2012-06-20 19:58:13 +00:00
else:
timestamp = u''
2012-12-09 17:12:41 +00:00
with lock:
2020-10-04 20:12:34 +00:00
# FIXME: context is some kind of magic tri-state:
# - non-empty string
# - explicit None
# - anything falsy (empty string is used elsewhere)
2014-01-05 15:17:34 +00:00
if self.context:
self.stream.write(u'%s%s> ' % (timestamp, self.context))
2014-01-05 16:37:13 +00:00
elif self.context is None:
2014-01-05 15:17:34 +00:00
self.stream.write(u'%s%s> ' % (timestamp, get_threadname()))
if color and self.is_tty:
s = u'%s%s%s' % (color, s, Style.RESET_ALL)
2020-10-04 20:12:34 +00:00
self.stream.write(str(s) + os.linesep)
2012-12-12 16:41:29 +00:00
self.stream.flush()
2012-06-20 19:58:13 +00:00
def writelines(self, lines, level=0):
2012-09-26 14:47:39 +00:00
"""Write multiple messages."""
2012-06-20 19:58:13 +00:00
for line in lines:
2022-05-28 15:52:42 +00:00
for sline in line.rstrip(u'\n').split(u'\n'):
self.write(sline.rstrip(u'\n'), level=level)
2012-06-20 19:58:13 +00:00
@property
def width(self):
"""Get width of this output."""
if not self.is_tty:
return self.DEFAULT_WIDTH
try:
w = get_terminal_size().columns
if w <= 0:
return self.DEFAULT_WIDTH
return w
except ValueError:
return self.DEFAULT_WIDTH
@property
def is_tty(self):
"""Is this output stream a terminal?"""
2020-10-04 20:12:34 +00:00
return getattr(self.base_stream, "isatty", lambda: False)()
@contextlib.contextmanager
def temporary_context(self, context):
"""Run a block with a temporary output context"""
orig_context = self.context
self.context = context
try:
yield
finally:
self.context = orig_context
@contextlib.contextmanager
def pager(self):
"""Run the output of a block through a pager."""
try:
if self.is_tty:
fd = io.StringIO()
2020-10-04 20:12:34 +00:00
self.stream = fd
with self.temporary_context(u''):
yield
if self.is_tty:
pydoc.pager(fd.getvalue())
finally:
2020-10-04 20:12:34 +00:00
self.stream = self.base_stream
2012-06-20 19:58:13 +00:00
out = Output()