datalad.runner.nonasyncrunner

Thread-based subprocess execution with stdout and stderr passed to protocol objects

class datalad.runner.nonasyncrunner.ThreadedRunner(cmd, protocol_class, stdin, protocol_kwargs=None, timeout=None, exception_on_error=True, **popen_kwargs)[source]

Bases: object

A class that contains a naive implementation of concurrent subprocess execution. It uses subprocess.Popen and threads to read from stdout and stderr of the subprocess, and to write to stdin of the subprocess.

All read data and timeouts are passed to a protocol instance, which can create the final result.

Parameters:
  • cmd (str | list)

  • protocol_class (type[WitlessProtocol])

  • stdin (int | IO | bytes | Queue[Optional[bytes]] | None)

  • protocol_kwargs (Optional[dict])

  • timeout (Optional[float])

  • exception_on_error (bool)
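The thread-and-queue pattern described above can be sketched with the standard library alone. This is a minimal illustration, not datalad's implementation: CollectingProtocol, run_with_threads, _read_stream and prepare_result are hypothetical names, and only pipe_data_received mirrors a method name that appears in this documentation. One reader thread per stream pushes chunks into a queue; the main loop hands each chunk to the protocol and stops once both streams have reported end-of-file.

```python
import os
import subprocess
import sys
import threading
from queue import Queue


class CollectingProtocol:
    """Hypothetical protocol: collects data per file number (1=stdout, 2=stderr)."""

    def __init__(self):
        self.chunks = {1: [], 2: []}

    def pipe_data_received(self, fd, data):
        self.chunks[fd].append(data)

    def prepare_result(self, return_code):
        # Build the final result from everything that was received.
        return {
            "code": return_code,
            "stdout": b"".join(self.chunks[1]).decode(),
            "stderr": b"".join(self.chunks[2]).decode(),
        }


def _read_stream(fd, stream, events):
    # Reader thread: forward each chunk to the queue; None marks end of stream.
    while True:
        chunk = os.read(stream.fileno(), 1024)
        if not chunk:
            break
        events.put((fd, chunk))
    events.put((fd, None))


def run_with_threads(cmd, protocol):
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    events = Queue()
    threads = [
        threading.Thread(target=_read_stream, args=(1, process.stdout, events), daemon=True),
        threading.Thread(target=_read_stream, args=(2, process.stderr, events), daemon=True),
    ]
    for thread in threads:
        thread.start()

    active = {1, 2}  # file numbers that have not reported end-of-file yet
    while active:
        fd, data = events.get()
        if data is None:
            active.discard(fd)
        else:
            protocol.pipe_data_received(fd, data)

    for thread in threads:
        thread.join()
    process.stdout.close()
    process.stderr.close()
    return protocol.prepare_result(process.wait())
```

Calling run_with_threads([sys.executable, "-c", "print('out')"], CollectingProtocol()) returns a dict built by the protocol, which is the role the protocol instance plays for ThreadedRunner.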

check_for_stall()[source]
Return type:

bool

close_stdin()[source]
ensure_stdin_stdout_stderr_closed()[source]
ensure_stdout_stderr_closed()[source]
is_stalled()[source]
Return type:

bool

process_loop()[source]
Return type:

dict

process_queue()[source]

Get a single event from the queue or handle a timeout. This method might modify the set of active file numbers if a file-closed event is read from the output queue, or if a timeout callback returns True.

process_timeouts()[source]

Check for timeouts

This method checks whether a timeout occurred since it was called last. If a timeout occurred, the timeout handler is called.

Return type:

bool

Returns:

True if at least one timeout occurred, False if no timeout occurred.
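One way to picture "a timeout occurred since the last call" is a per-file-number record of the last activity time. The sketch below is an assumption about how such bookkeeping could look, not the code used by ThreadedRunner: TimeoutTracker, touch and the injectable clock argument are hypothetical. The return value follows the description above: True if at least one handler was called.

```python
import time


class TimeoutTracker:
    """Hypothetical helper: tracks the last activity time per file number."""

    def __init__(self, timeout, file_numbers, handler, clock=time.monotonic):
        self.timeout = timeout
        self.handler = handler
        self.clock = clock  # injectable so the logic can be tested without sleeping
        now = clock()
        self.last_activity = {fn: now for fn in file_numbers}

    def touch(self, file_number):
        # Record that data arrived on this file number.
        if file_number in self.last_activity:
            self.last_activity[file_number] = self.clock()

    def remove_file_number(self, file_number):
        # A closed stream can no longer time out.
        self.last_activity.pop(file_number, None)

    def process_timeouts(self):
        now = self.clock()
        timed_out = False
        for file_number, last in list(self.last_activity.items()):
            if now - last >= self.timeout:
                timed_out = True
                # Restart the interval so the same timeout is not reported twice.
                self.last_activity[file_number] = now
                self.handler(file_number)
        return timed_out
```

Passing a fake clock, as in clock=lambda: fake_time[0], makes the behaviour deterministic in tests.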

remove_file_number(file_number)[source]

Remove a file number from the active set and from the timeout set.

Parameters:

file_number (int)

remove_process()[source]
run()[source]

Run the command as specified in __init__.

This method is not re-entrant. Furthermore, if the protocol is a subclass of GeneratorMixIn, and the generator has not been exhausted, i.e. it has not raised StopIteration, this method should not be called again. If it is called again before the generator is exhausted, a RuntimeError is raised. In the non-generator case, a second caller will be suspended until the first caller has returned.

Return type:

dict | _ResultGenerator

Returns:

  • Any – If the protocol is not a subclass of GeneratorMixIn, the result of protocol._prepare_result will be returned.

  • Generator – If the protocol is a subclass of GeneratorMixIn, a Generator will be returned. This allows the method to be used in constructs like:

    for protocol_output in runner.run():
        ...
    

    Here, the iterator yields whatever protocol.pipe_data_received sends into the generator. Once all output has been yielded and the process has terminated, the generator raises StopIteration(return_code), where return_code is the return code of the process. The return code of the process is also stored in the return_code attribute of the runner. So you could write:

    gen = runner.run()
    for file_descriptor, data in gen:
        ...
    
    # get the return code of the process
    result = gen.return_code
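The mechanism behind this is plain Python: a generator's return value travels in StopIteration.value, which a for loop silently discards. The sketch below shows one way a wrapper could capture it in a return_code attribute. ResultGenerator and fake_process_output are hypothetical stand-ins for illustration, not datalad's _ResultGenerator.

```python
class ResultGenerator:
    """Hypothetical wrapper that remembers the wrapped generator's return value."""

    def __init__(self, generator):
        self._generator = generator
        self.return_code = None

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._generator)
        except StopIteration as stop:
            # The generator's return value is carried in StopIteration.value.
            self.return_code = stop.value
            raise


def fake_process_output():
    # Stand-in for process output: (file_descriptor, data) pairs, then an exit code.
    yield 1, b"line on stdout\n"
    yield 2, b"line on stderr\n"
    return 0


gen = ResultGenerator(fake_process_output())
received = [(file_descriptor, data) for file_descriptor, data in gen]
exit_code = gen.return_code  # available once the loop has exhausted the generator
```

The attribute is only meaningful after the generator is exhausted. Before that it still holds its initial value, None in this sketch.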
    

should_continue()[source]
Return type:

bool

timeout_resolution = 0.2
wait_for_threads()[source]
datalad.runner.nonasyncrunner.run_command(cmd, protocol, stdin, protocol_kwargs=None, timeout=None, exception_on_error=True, **popen_kwargs)[source]

Run a command in a subprocess

This function delegates the execution to an instance of ThreadedRunner. See ThreadedRunner.__init__() for documentation of the parameters, and ThreadedRunner.run() for documentation of the return values.

Parameters:
  • cmd (str | list)

  • protocol (type[WitlessProtocol])

  • stdin (int | IO | bytes | Queue[Optional[bytes]] | None)

  • protocol_kwargs (Optional[dict])

  • timeout (Optional[float])

  • exception_on_error (bool)

Return type:

dict | _ResultGenerator
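The stdin type Queue[Optional[bytes]] suggests feeding input incrementally from a queue. The following standard-library sketch shows that idea under a stated assumption: a None item is treated as the signal to close stdin, which this page does not spell out. feed_stdin and run_with_queue_stdin are hypothetical names, not part of datalad.

```python
import subprocess
import sys
import threading
from queue import Queue


def feed_stdin(process, chunks):
    # Writer thread: copy queue items to stdin until the None sentinel (assumed).
    while True:
        chunk = chunks.get()
        if chunk is None:
            break
        process.stdin.write(chunk)
        process.stdin.flush()
    process.stdin.close()


def run_with_queue_stdin(cmd, chunks):
    process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    writer = threading.Thread(target=feed_stdin, args=(process, chunks), daemon=True)
    writer.start()
    output = process.stdout.read()  # returns once the child closes its stdout
    writer.join()
    process.stdout.close()
    return process.wait(), output


stdin_queue = Queue()
stdin_queue.put(b"hello\n")
stdin_queue.put(b"world\n")
stdin_queue.put(None)  # assumed end-of-input marker

exit_code, output = run_with_queue_stdin(
    [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"],
    stdin_queue,
)
```

Writing from a separate thread keeps the caller free to read output while input is still being produced, which avoids the deadlock that can occur when both pipes fill up.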