Class the starts a subprocess and keeps it around to communicate with it
via stdin. For each instruction send over stdin, a response is read and
returned. The response structure is determined by "output_proc"


import logging
import os
import queue
import sys
import warnings
from subprocess import TimeoutExpired
from typing import (
from datetime import datetime
from operator import attrgetter
from weakref import WeakValueDictionary

# start of legacy import block
# to avoid breakage of code written before datalad.runner
from datalad.runner.coreprotocols import (
from datalad.runner.gitrunner import (
from datalad.runner.nonasyncrunner import run_command
from datalad.runner.protocol import WitlessProtocol
from datalad.runner.runner import WitlessRunner
from import CommandError
# end of legacy import block

from datalad.runner.coreprotocols import StdOutErrCapture
from datalad.runner.nonasyncrunner import (
from datalad.runner.protocol import GeneratorMixIn
from datalad.runner.runner import WitlessRunner
from datalad.runner.utils import LineSplitter
from datalad.utils import (

lgr = logging.getLogger('datalad.cmd')

# TODO unused?
# In python3 to split byte stream on newline, it must be bytes
linesep_bytes = os.linesep.encode()

# TODO unused?
_TEMP_std = sys.stdout, sys.stderr

# TODO unused?
# To be used in the temp file name to distinguish the ones we create
# in Runner so we take care about their removal, in contrast to those
# which might be created outside and passed into Runner
_MAGICAL_OUTPUT_MARKER = "_runneroutput_"

[docs]def readline_rstripped(stdout): warnings.warn("the function `readline_rstripped()` is deprecated " "and will be removed in a future release", DeprecationWarning) return _readline_rstripped(stdout)
def _readline_rstripped(stdout): """Internal helper for BatchedCommand""" return stdout.readline().rstrip()
[docs]class BatchedCommandProtocol(GeneratorMixIn, StdOutErrCapture): def __init__(self, batched_command: "BatchedCommand", done_future: Any = None, encoding: Optional[str] = None, output_proc: Callable = None, ): GeneratorMixIn.__init__(self) StdOutErrCapture.__init__(self, done_future, encoding) self.batched_command = batched_command self.output_proc = output_proc self.line_splitter = LineSplitter()
[docs] def pipe_data_received(self, fd: int, data: bytes): if fd == STDERR_FILENO: self.send_result((fd, data)) elif fd == STDOUT_FILENO: for line in self.line_splitter.process(data.decode(self.encoding)): self.send_result((fd, line)) else: raise ValueError(f"unknown file descriptor: {fd}")
[docs] def pipe_connection_lost(self, fd: int, exc: Optional[Exception]): if fd == STDOUT_FILENO: remaining_line = self.line_splitter.finish_processing() if remaining_line is not None: lgr.debug(f"unterminated line: {remaining_line}") self.send_result((fd, remaining_line))
[docs] def timeout(self, fd: Optional[int]) -> bool: timeout_error = self.batched_command.get_timeout_exception(fd) if timeout_error: raise timeout_error self.send_result(("timeout", fd)) return False
[docs]class ReadlineEmulator: """ This class implements readline() on the basis of an instance of BatchedCommand. Its purpose is to emulate stdout's for output_procs, This allows us to provide a BatchedCommand API that is identical to the old version, but with an implementation that is based on the threaded runner. """ def __init__(self, batched_command: "BatchedCommand"): self.batched_command = batched_command
[docs] def readline(self): """ Read from the stdout provider until we have a line or None (which indicates some error). """ return self.batched_command.get_one_line()
[docs]class SafeDelCloseMixin(object): """A helper class to use where __del__ would call .close() which might fail if "too late in GC game" """ def __del__(self): try: self.close() except TypeError: if os.fdopen is None or lgr.debug is None: # if we are late in the game and things already gc'ed in py3, # it is Ok return raise
[docs]@auto_repr class BatchedCommand(SafeDelCloseMixin): """ Container for a running subprocess. Supports communication with the subprocess via stdin and stdout. """ # Collection of active BatchedCommands as a mapping from object IDs to # instances _active_instances = WeakValueDictionary() def __init__(self, cmd: Union[str, Tuple, List], path: Optional[str] = None, output_proc: Callable = None, timeout: Optional[float] = None, exception_on_timeout: bool = False, ): command = cmd self.command = [command] if not isinstance(command, List) else command self.path = path self.output_proc = output_proc self.timeout = timeout self.exception_on_timeout = exception_on_timeout self.stdin_queue = None self.stderr_output = b"" self.runner = None self.generator = None self.encoding = None self.wait_timed_out = None self.return_code = None self._abandon_cache = None self._active = 0 self._active_last = _now() self.clean_inactive() assert id(self) not in self._active_instances self._active_instances[id(self)] = self
[docs] @classmethod def clean_inactive(cls): from . import cfg max_batched = cfg.obtain("datalad.runtime.max-batched") max_inactive_age = cfg.obtain("datalad.runtime.max-inactive-age") if len(cls._active_instances) > max_batched: active_qty = 0 inactive = [] for c in cls._active_instances.values(): if c._active: active_qty += 1 else: inactive.append(c) inactive.sort(key=attrgetter("_active_last")) to_close = len(cls._active_instances) - max_batched if to_close <= 0: return too_young = 0 now = _now() for i, c in enumerate(inactive): if (now - c._active_last).total_seconds() <= max_inactive_age: too_young = len(inactive) - i break elif c._active: active_qty += 1 else: c.close() cls._active_instances.pop(id(c), None) to_close -= 1 if to_close <= 0: break if to_close > 0: lgr.debug( "Too many BatchedCommands remaining after cleanup;" " %d active, %d went inactive recently", active_qty, too_young, )
def _initialize(self): lgr.debug("Starting new runner for %s", repr(self)) lgr.log(5, "Command: %s", self.command) self.stdin_queue = queue.Queue() self.stderr_output = b"" self.wait_timed_out = None self.return_code = None self.runner = WitlessRunner( cwd=self.path, env=GitRunnerBase.get_git_environ_adjusted() ) self.generator = cmd=self.command, protocol=BatchedCommandProtocol, stdin=self.stdin_queue, cwd=self.path, # This mimics the behavior of the old implementation w.r.t # timeouts when waiting for the closing process timeout=self.timeout or 11.0, # Keyword arguments for the protocol batched_command=self, output_proc=self.output_proc, ) self.encoding = self.generator.runner.protocol.encoding self._active_last = _now()
[docs] def process_running(self) -> bool: if self.runner: return self.generator.runner.process.poll() is None return False
def __call__(self, cmds: Union[str, Tuple, List]): """ Send requests to the subprocess and return the responses. We expect one response per request. How the response is structured is determined by output_proc. If output_proc returns not-None, the responses is considered to be a response. If output_proc is not provided, we assume that a single response is a single line. If the subprocess does not exist yet it is started before the first command is sent. Parameters ---------- cmds : str or tuple or list of (str or tuple) request for the subprocess Returns ------- str or list Responses received from process. Either a string, or a list of strings, if cmds was a list. """ self._active += 1 requests = cmds input_multiple = isinstance(requests, list) if not input_multiple: requests = [requests] responses = [] try: # This code assumes that each processing request is # a single line and leads to a response that triggers a # `send_result` in the protocol. for request in requests: while True: try: responses.append(self.process_request(request)) break except StopIteration: # The process finished executing, store the last return # code and restart the process. lgr.debug(f"{self}: command exited") self.return_code = self.generator.return_code self.runner = None except CommandError as command_error: # The command exited with a non-zero return code lgr.error(f"{self}: command error: {command_error}") self.return_code = command_error.code self.runner = None finally: self._active -= 1 return responses if input_multiple else responses[0] if responses else None
[docs] def process_request(self, request: Union[Tuple, str]) -> str: self._active += 1 try: if not self.process_running(): self._initialize() # Send request to subprocess if not isinstance(request, str): request = ' '.join(request) self.stdin_queue.put((request + "\n").encode()) # Get the response from the generator. We only consider # data received on stdout as a response. if self.output_proc: # If we have an output procedure, let the output procedure # read stdout and decide about the nature of the response response = self.output_proc(ReadlineEmulator(self)) else: # If there is no output procedure we assume that a response # is one line. response = self.get_one_line() if response is not None: response = response.rstrip() return response finally: self._active -= 1
[docs] def proc1(self, single_command: str): """ Simulate the old interface. This method is used only once in AnnexRepo.get_metadata() """ self._active += 1 try: assert isinstance(single_command, str) return self(single_command) finally: self._active -= 1
[docs] def get_one_line(self) -> Optional[str]: """ Get a single stdout line from the generator. If timeout was specified, and exception_on_timeout is False, and if a timeout occurs, return None. Otherwise, return the string that was read from the generator. """ # Implementation remarks: # 1. We know that BatchedCommandProtocol only returns complete lines on # stdout, that makes this code simple. # 2. stderr is handled transparently within this method, # by adding all stderr-content to an internal buffer. while True: source, data = self.generator.send(None) if source == STDERR_FILENO: self.stderr_output += data elif source == STDOUT_FILENO: return data elif source == "timeout": # TODO: we should restart the subprocess on timeout, otherwise # we might end up with results from a previous instruction, # when handling multiple instructions at once. Until this is # done properly, communication timeouts are ignored in order # to avoid errors. pass else: raise ValueError(f"{self}: unknown source: {source}")
[docs] def close(self, return_stderr=False): """ Close communication and wait for process to terminate. If the "timeout" parameter to the constructor was not None, and if the configuration setting "datalad.runtime.stalled-external" is set to "abandon", the method will return latest after "timeout" seconds. If the subprocess did not exit within this time, the attribute "wait_timed_out" will be set to "True". Parameters ---------- return_stderr: bool if set to "True", the call will return all collected stderr content as string. In addition, if return_stderr is True and the log level is 5 or lower, and the configuration setting "datalad.log.outputs" evaluates to "True", the content of stderr will be logged. Returns ------- str, optional stderr output if return_stderr is True, None otherwise """ if self.runner: abandon = self._get_abandon() # Close stdin to let the process know that we want to end # communication. We also close stdout and stderr to inform # the generator that we do not care about them anymore. This # will trigger process wait timeouts. self.generator.runner.close_stdin() # Process all remaining messages until the subprocess exits. remaining = [] timeout = False try: for source, data in self.generator: if source == STDERR_FILENO: self.stderr_output += data elif source == STDOUT_FILENO: remaining.append(data) elif source == "timeout": if data is None and abandon is True: timeout = True break else: raise ValueError(f"{self}: unknown source: {source}") self.return_code = self.generator.return_code except CommandError as command_error: lgr.error(f"{self} subprocess failed with {command_error}") self.return_code = command_error.code if remaining: lgr.debug(f"{self}: remaining content: {remaining}") self.wait_timed_out = timeout is True if self.wait_timed_out: lgr.debug( f"{self}: timeout while waiting for subprocess to exit") lgr.warning( f"Batched process ({}) " f"did not finish, abandoning it without killing it") result = self.get_requested_error_output(return_stderr) self.runner = None self.stderr_output = b"" return result
[docs] def get_requested_error_output(self, return_stderr: bool): if not self.runner: return None stderr_content = ensure_unicode(self.stderr_output) if lgr.isEnabledFor(5): from . import cfg if cfg.getbool("datalad.log", "outputs", default=False): stderr_lines = stderr_content.splitlines() lgr.log( 5, "stderr of %s had %d lines:",, len(stderr_lines)) for line in stderr_lines: lgr.log(5, "| " + line) if return_stderr: return stderr_content return None
[docs] def get_timeout_exception(self, fd: Optional[int] ) -> Optional[TimeoutExpired]: """ Get a process timeout exception if timeout exceptions should be generated for a process that continues longer than timeout seconds after self.close() was initiated. """ if self.timeout is None \ or fd is not None \ or self.exception_on_timeout is False\ or self._get_abandon() == "wait": return None return TimeoutExpired( cmd=self.command, timeout=self.timeout or 11.0, stderr=self.stderr_output)
def _get_abandon(self): if self._abandon_cache is None: from . import cfg cfg_var = "datalad.runtime.stalled-external" cfg_val = cfg.obtain(cfg_var) if cfg_val not in ("wait", "abandon"): raise ValueError(f"Unexpected value: {cfg_var}={cfg_val!r}") self._abandon_cache = cfg_val == "abandon" return self._abandon_cache
def _now(): return