Source code for astropy.utils.console

# -*- coding: utf-8 -*-
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Utilities for console input and output.
"""
from __future__ import division, print_function

import re
import math
import sys
import time

try:
    get_ipython()
except NameError:
    OutStream = None
else:
    try:
        from IPython.zmq.iostream import OutStream
    except ImportError:
        OutStream = None

from ..config import ConfigurationItem


__all__ = [
    'isatty', 'color_print', 'human_time', 'ProgressBar', 'Spinner',
    'print_code_line', 'ProgressBarOrSpinner']


USE_COLOR = ConfigurationItem(
    'use_color', True,
    'When True, use ANSI color escape sequences when writing to the console.')
USE_UNICODE = ConfigurationItem(
    'use_unicode', True,
    'Use Unicode characters when drawing progress bars etc. at the console.')


[docs]def isatty(file): """ Returns `True` if `file` is a tty. Most built-in Python file-like objects have an `isatty` member, but some user-defined types may not, so this assumes those are not ttys. """ if (OutStream is not None and isinstance(file, OutStream) and file.name == 'stdout'): return True elif hasattr(file, 'isatty'): return file.isatty() return False
def _color_text(text, color): """ Returns a string wrapped in ANSI color codes for coloring the text in a terminal:: colored_text = color_text('Here is a message', 'blue') This won't actually effect the text until it is printed to the terminal. Parameters ---------- text : str The string to return, bounded by the color codes. color : str An ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white, or '' (the empty string). """ color_mapping = { 'black': '0;30', 'red': '0;31', 'green': '0;32', 'brown': '0;33', 'blue': '0;34', 'magenta': '0;35', 'cyan': '0;36', 'lightgrey': '0;37', 'default': '0;39', 'darkgrey': '1;30', 'lightred': '1;31', 'lightgreen': '1;32', 'yellow': '1;33', 'lightblue': '1;34', 'lightmagenta': '1;35', 'lightcyan': '1;36', 'white': '1;37'} color_code = color_mapping.get(color, '0;39') return u'\033[{0}m{1}\033[0m'.format(color_code, text)
[docs]def color_print(*args, **kwargs): """ Prints colors and styles to the terminal uses ANSI escape sequences. :: color_print('This is the color ', 'default', 'GREEN', 'green') Parameters ---------- positional args : strings The positional arguments come in pairs (*msg*, *color*), where *msg* is the string to display and *color* is the color to display it in. *color* is an ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white, or '' (the empty string). file : writeable file-like object, optional Where to write to. Defaults to `sys.stdout`. If file is not a tty (as determined by calling its `isatty` member, if one exists), no coloring will be included. end : str, optional The ending of the message. Defaults to ``\\n``. The end will be printed after resetting any color or font state. """ file = kwargs.get('file', sys.stdout) end = kwargs.get('end', u'\n') write = file.write if isatty(file) and USE_COLOR(): for i in xrange(0, len(args), 2): msg = args[i] if i + 1 == len(args): color = '' else: color = args[i + 1] if isinstance(msg, bytes): msg = msg.decode('ascii') if color == u'' or color is None: write(msg) else: write(_color_text(msg, color)) write(end) else: for i in xrange(0, len(args), 2): msg = args[i] if isinstance(msg, bytes): msg = msg.decode('ascii') write(msg) write(end)
def strip_ansi_codes(s): """ Remove ANSI color codes from the string. """ return re.sub('\033\[([0-9]+)(;[0-9]+)*m', '', s)
[docs]def human_time(seconds): """ Returns a human-friendly time string that is always exactly 6 characters long. Depending on the number of seconds given, can be one of:: 1w 3d 2d 4h 1h 5m 1m 4s 15s Will be in color if console coloring is turned on. Parameters ---------- seconds : int The number of seconds to represent Returns ------- time : str A human-friendly representation of the given number of seconds that is always exactly 6 characters. """ units = [ (u'y', 60 * 60 * 24 * 7 * 52), (u'w', 60 * 60 * 24 * 7), (u'd', 60 * 60 * 24), (u'h', 60 * 60), (u'm', 60), (u's', 1), ] seconds = int(seconds) if seconds < 60: return u' {0:02d}s'.format(seconds) for i in xrange(len(units) - 1): unit1, limit1 = units[i] unit2, limit2 = units[i + 1] if seconds >= limit1: return u'{0:02d}{1}{2:02d}{3}'.format( seconds // limit1, unit1, (seconds % limit1) // limit2, unit2) return u' ~inf'
[docs]class ProgressBar: """ A class to display a progress bar in the terminal. It is designed to be used with the `with` statement:: with ProgressBar(len(items)) as bar: for item in enumerate(items): bar.update() """ def __init__(self, total, file=sys.stdout): """ Parameters ---------- total : int The number of increments in the process being tracked. file : writable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. """ if not isatty(file): self.update = self._silent_update self._silent = True else: self._silent = False self._total = total self._file = file self._start_time = time.time() terminal_width = 78 if sys.platform.startswith('linux'): import subprocess p = subprocess.Popen( 'stty size', shell=True, stdout=subprocess.PIPE) stdout, stderr = p.communicate() parts = stdout.split() if len(parts) == 2: rows, cols = parts terminal_width = int(cols) self._bar_length = terminal_width - 36 num_scale = int(math.floor(math.log(self._total) / math.log(1000))) if num_scale > 7: self._suffix = '?' else: suffixes = u' kMGTPEH' self._suffix = suffixes[num_scale] self._num_scale = int(math.pow(1000, num_scale)) self.update(0) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): if not self._silent: if exc_type is None: self.update(self._total) self._file.write('\n') self._file.flush()
[docs] def update(self, value=None): """ Update the progress bar to the given value (out of the total given to the constructor. """ if value is None: value = self._current_value = self._current_value + 1 else: self._current_value = value if self._total == 0: frac = 1.0 else: frac = float(value) / float(self._total) file = self._file write = file.write bar_fill = int(float(self._bar_length) * frac) write(u'\r|') color_print(u'=' * bar_fill, 'blue', file=file, end=u'') if bar_fill < self._bar_length: color_print(u'>', 'green', file=file, end=u'') write(u'-' * (self._bar_length - bar_fill - 1)) write(u'|') if value >= self._total: t = time.time() - self._start_time prefix = u' ' elif value <= 0: t = None prefix = u'' else: t = ((time.time() - self._start_time) * (1.0 - frac)) / frac prefix = u' ETA ' write(u' {0:>3d}/{1:>3d}{2}'.format( value // self._num_scale, self._total // self._num_scale, self._suffix)) write(u' ({0:>6s}%)'.format(u'{0:.2f}'.format(frac * 100.0))) write(prefix) if t is not None: write(human_time(t)) self._file.flush()
def _silent_update(self, value=None): pass @classmethod
[docs] def map(cls, function, items, multiprocess=False, file=sys.stdout): """ Does a `map` operation while displaying a progress bar with percentage complete. :: def work(i): print(i) ProgressBar.map(work, range(50)) Parameters ---------- function : function Function to call for each step items : sequence Sequence where each element is a tuple of arguments to pass to *function*. multiprocess : bool, optional If `True`, use the `multiprocessing` module to distribute each task to a different processor core. file : writeable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. """ with cls(len(items), file=file) as bar: step_size = max(200, bar._bar_length) steps = max(int(float(len(items)) / step_size), 1) if not multiprocess: for i, item in enumerate(items): function(item) if (i % steps) == 0: bar.update(i) else: import multiprocessing p = multiprocessing.Pool() for i, _ in enumerate( p.imap_unordered(function, items, steps)): bar.update(i)
@classmethod
[docs] def iterate(cls, items, file=sys.stdout): """ Iterate over a sequence while indicating progress with a progress bar in the terminal. :: for item in ProgressBar.iterate(items): pass Parameters ---------- items : sequence A sequence of items to iterate over file : writeable file-like object, optional The file to write the progress bar to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. Returns ------- generator : A generator over `items` """ with cls(len(items), file=file) as bar: for item in items: yield item bar.update()
[docs]class Spinner(): """ A class to display a spinner in the terminal. It is designed to be used with the `with` statement:: with Spinner("Reticulating splines", "green") as s: for item in enumerate(items): s.next() """ _default_unicode_chars = u"◓◑◒◐" _default_ascii_chars = u"-/|\\" def __init__(self, msg, color='default', file=sys.stdout, step=1, chars=None): """ Parameters ---------- msg : str The message to print color : str, optional An ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white. file : writeable file-like object, optional The file to write the spinner to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), the scrollbar will be completely silent. step : int, optional Only update the spinner every *step* steps chars : str, optional The character sequence to use for the spinner """ self._msg = msg self._color = color self._file = file self._step = step if chars is None: if USE_UNICODE: chars = self._default_unicode_chars else: chars = self._default_ascii_chars self._chars = chars self._silent = not isatty(file) def _iterator(self): chars = self._chars index = 0 file = self._file write = file.write flush = file.flush while True: write(u'\r') color_print(self._msg, self._color, file=file, end=u'') write(u' ') write(chars[index]) flush() yield for i in xrange(self._step): yield index += 1 if index == len(chars): index = 0 def __enter__(self): if self._silent: return self._silent_iterator() else: return self._iterator() def __exit__(self, exc_type, exc_value, traceback): file = self._file write = file.write flush = file.flush if not self._silent: write(u'\r') color_print(self._msg, self._color, file=file, end=u'') if exc_type is None: color_print(u' [Done]', 'green', file=file) else: color_print(u' [Failed]', 'red', file=file) flush() def _silent_iterator(self): color_print(self._msg, self._color, file=self._file, end=u'') self._file.flush() while True: yield
[docs]class ProgressBarOrSpinner: """ A class that displays either a `ProgressBar` or `Spinner` depending on whether the total size of the operation is known or not. It is designed to be used with the `with` statement:: if file.has_length(): length = file.get_length() else: length = None bytes_read = 0 with ProgressBarOrSpinner(length) as bar: while file.read(blocksize): bytes_read += blocksize bar.update(bytes_read) """ def __init__(self, total, msg, color='default', file=sys.stdout): """ Parameters ---------- total : int or None If an int, the number of increments in the process being tracked and a `ProgressBar` is displayed. If `None`, a `Spinner` is displayed. msg : str The message to display above the `ProgressBar` or alongside the `Spinner`. color : str, optional The color of `msg`, if any. Must be an ANSI terminal color name. Must be one of: black, red, green, brown, blue, magenta, cyan, lightgrey, default, darkgrey, lightred, lightgreen, yellow, lightblue, lightmagenta, lightcyan, white. file : writable file-like object, optional The file to write the to. Defaults to `sys.stdout`. If `file` is not a tty (as determined by calling its `isatty` member, if any), only `msg` will be displayed: the `ProgressBar` or `Spinner` will be silent. """ if total is None or not isatty(file): self._is_spinner = True self._obj = Spinner(msg, color=color, file=file) else: self._is_spinner = False color_print(msg, color, file=file) self._obj = ProgressBar(total, file=file) def __enter__(self): self._iter = self._obj.__enter__() return self def __exit__(self, exc_type, exc_value, traceback): return self._obj.__exit__(exc_type, exc_value, traceback)
[docs] def update(self, value): """ Update the progress bar to the given value (out of the total given to the constructor. """ if self._is_spinner: self._iter.next() else: self._obj.update(value)
class Getch(object): """Get a single character from standard input without screen echo. Returns ------- char : str (one character) """ def __init__(self): try: self.impl = _GetchWindows() except ImportError: try: self.impl = _GetchMacCarbon() except (ImportError, AttributeError): self.impl = _GetchUnix() def __call__(self): return self.impl() class _GetchUnix(object): def __init__(self): import tty import sys import termios # import termios now or else you'll get the Unix # version on the Mac def __call__(self): import sys import tty import termios fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setraw(sys.stdin.fileno()) ch = sys.stdin.read(1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) return ch class _GetchWindows(object): def __init__(self): import msvcrt def __call__(self): import msvcrt return msvcrt.getch() class _GetchMacCarbon: """ A function which returns the current ASCII key that is down; if no ASCII key is down, the null string is returned. The page http://www.mactech.com/macintosh-c/chap02-1.html was very helpful in figuring out how to do this. """ def __init__(self): import Carbon Carbon.Evt # see if it has this (in Unix, it doesn't) def __call__(self): import Carbon if Carbon.Evt.EventAvail(0x0008)[0] == 0: # 0x0008 is the keyDownMask return '' else: # # The event contains the following info: # (what,msg,when,where,mod)=Carbon.Evt.GetNextEvent(0x0008)[1] # # The message (msg) contains the ASCII char which is # extracted with the 0x000000FF charCodeMask; this # number is converted to an ASCII character with chr() and # returned # (what, msg, when, where, mod) = Carbon.Evt.GetNextEvent(0x0008)[1] return chr(msg & 0x000000FF)

Page Contents