Interacting With a Long-Running Child Process in Python
There's no single fits-all solution for this task, but here are a bunch of recipes to handle the more commonly occurring situations.
Join the DZone community and get the full member experience.
Join For FreeThe Python subprocess module is a powerful Swiss Army knife for launching and interacting with child processes. It comes with several high-level APIs like call
, check_output
, and (starting with Python 3.5) run
that are focused on child processes that our program runs and waits to complete.
In this post, I want to discuss a variation of this task that is less directly addressed: long-running child processes. Think about testing some server — for example, an HTTP server. We launch it as a child process, then connect clients to it and run some testing sequence. When we're done, we want to shut down the child process in an orderly way. This would be difficult to achieve with APIs that just run a child process to completion synchronously, so we'll have to look at some of the lower-level APIs.
Sure, we could launch a child process with subprocess.run
in one thread and interact with it (via a known port, for example) in another thread. But this would make it tricky to cleanly terminate the child process when we're done with it. If the child process has an orderly termination sequence (such as sending some sort of "quit" command), this is doable. But most servers do not, and will just spin forever until killed. This is the use-case this post addresses.
Launch, Interact, Terminate, and Get All Output When Done
The first, simplest use case will be launching an HTTP server, interacting with it, terminating it cleanly, and getting all the server's stdout
and stderr
when done. Here are the important bits of the code (all full code samples for this post are available here), tested with Python 3.6:
def main():
proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
try:
time.sleep(0.2)
resp = urllib.request.urlopen('http://localhost:8070')
assert b'Directory listing' in resp.read()
finally:
proc.terminate()
try:
outs, _ = proc.communicate(timeout=0.2)
print('== subprocess exited with rc =', proc.returncode)
print(outs.decode('utf-8'))
except subprocess.TimeoutExpired:
print('subprocess did not terminate in time')
The child process is an HTTP server using Python's own http.server
module, serving contents from the directory it was launched in. We use the low-level Popen API to launch the process asynchronously (meaning that Popen returns immediately and the child process runs in the background).
Note the -u
passed to Python on invocation. This is critical to avoid stdout
buffering, and seeing as much of stdout
as possible when the process is killed. Buffering is a serious issue when interacting with child processes, and we'll see more examples of this later on.
The meat of the sample happens in the finally
block. proc.terminate()
sends the child a process SIGTERM
signal. Then, proc.communicate
waits for the child to exit and captures all of its stdout.communicate
and has a very convenient timeout argument starting with Python 3.3, letting us know if the child does not exit for some reason. A more sophisticated technique could be to send the child a SIGKILL
(with proc.kill
) if it didn't exit due to SIGTERM
.
If you run this script, you'll see the output:
$ python3.6 interact-http-server.py
== subprocess exited with rc = -15
Serving HTTP on 0.0.0.0 port 8070 (http://0.0.0.0:8070/) ...
127.0.0.1 - - [05/Jul/2017 05:48:34] "GET / HTTP/1.1" 200 -
The return code of the child is -15 (negative means terminated by a signal; 15 is the numeric code for SIGTERM
). The stdout
was properly captured and printed out.
Launch, Interact, Get Output in Real Time, Terminate
A related use case is getting the stdout
of a child process in "real-time" and not everything together at the end. Here we have to be really careful about buffering because it can easily bite and deadlock the program. Linux processes are usually line-buffered in interactive mode and fully buffered otherwise. Very few processes are fully unbuffered. Therefore, reading stdout
in chunks of less than a line is not recommended, in my opinion. Really, just don't do it. Standard I/O is meant to be used in a line-wise way (think of how all the Unix command-line tools work); if you need sub-line granularity, stdout is not the way to go (use a socket or something).
Anyway, to our example:
def output_reader(proc):
for line in iter(proc.stdout.readline, b''):
print('got line: {0}'.format(line.decode('utf-8')), end='')
def main():
proc = subprocess.Popen(['python3', '-u', '-m', 'http.server', '8070'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
t = threading.Thread(target=output_reader, args=(proc,))
t.start()
try:
time.sleep(0.2)
for i in range(4):
resp = urllib.request.urlopen('http://localhost:8070')
assert b'Directory listing' in resp.read()
time.sleep(0.1)
finally:
proc.terminate()
try:
proc.wait(timeout=0.2)
print('== subprocess exited with rc =', proc.returncode)
except subprocess.TimeoutExpired:
print('subprocess did not terminate in time')
t.join()
The sample is similar except for how stdout is handled; there are no more calls to communicate; instead, proc.wait
just waits for the child to exit (after SIGTERM
has been sent). A thread polls the child's stdout
attribute, looping as long as new lines are available and printing them immediately. If you run this sample, you'll notice that the child's stdout is reported in real-time, rather than as one lump at the end.
The iter(proc.stdout.readline, b'')
snippet is continuously calling proc.stdout.readline()
until this call returns an empty bytestring. This only happens when proc.stdout
is closed, which occurs when the child exits. Thus, while it may seem like the reader thread might never terminate, it always will! As long as the child process is running, the thread will dutifully block on that readline; as soon as the child terminates, the readline call returns b''
and the thread exits.
If we don't want to just print the captured stdout, but rather do something with it (such as look for expected patterns), this is easy to organize with Python's thread-safe queue. The reader thread becomes:
def output_reader(proc, outq):
for line in iter(proc.stdout.readline, b''):
outq.put(line.decode('utf-8'))
And we launch it with:
outq = queue.Queue()
t = threading.Thread(target=output_reader, args=(proc, outq))
t.start()
Then, at any point, we can check if there's stuff in the queue by using its non-blocking mode (the full code sample is here):
try:
line = outq.get(block=False)
print('got line from outq: {0}'.format(line), end='')
except queue.Empty:
print('could not get line from queue')
Direct Interaction With the Child's stdin and stdout
This sample is getting into dangerous waters; the subprocess module documentation warns against doing the things described here due to possible deadlocks, but sometimes there's simply no choice! Some programs like using their stdin
and stdout
for interaction. Alternatively, you may have a program with an interactive (interpreter) mode you'd like to test — like the Python interpreter itself. Sometimes, it's okay to feed this program all its input at once and then check its output; this can and should be done with communicate
, which is the perfect API for this purpose. It properly feeds stdin
, closes it when done (which signals many interactive programs that game's over), etc. But what if we really want to provide additional input based on some previous output of the child process? Here goes:
def main():
proc = subprocess.Popen(['python3', '-i'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# To avoid deadlocks: careful to: add \n to output, flush output, use
# readline() rather than read()
proc.stdin.write(b'2+2\n')
proc.stdin.flush()
print(proc.stdout.readline())
proc.stdin.write(b'len("foobar")\n')
proc.stdin.flush()
print(proc.stdout.readline())
proc.stdin.close()
proc.terminate()
proc.wait(timeout=0.2)
Let me reiterate what the comment in this code sample is saying:
- When sending input to a line interpreter, don't forget to send the actual newline.
- Always flush the stream after placing data into it, since it may be buffered.
- Use readline to get input from the line interpreter.
We have to be very careful to avoid the following situation:
- We send data to the child's
tdin
, but it doesn't get the complete input for some reason (lack of newline, buffering etc.). - We then invoke readline to wait for the reply.
Since the child is still waiting for input to complete (step 1), our step 2 may hang forever. This is a classic deadlock.
In the end of the interaction, we close the child's stdin (this is optional but useful for some kinds of child processes) call terminate and then wait. It would be better to send the child process some sort of "exit" command (quit()
in the case of the Python interpreter); the terminate here is to demonstrate what we have to do if the other options are unavailable. Note that we could also use communicate here instead of wait to capture the stderr output.
Interact using non-blocking reads and stoppable threads
The final sample demonstrates a slightly more advanced scenario. Suppose we're testing a long-lived socket server, and we're interested in orchestrating complex interactions with it, perhaps with multiple concurrent clients. We'll also want a clean shut-down of the whole setup of threads and child processes. The full code sample is here; what follows is a couple of representative snippets. The key ingredient is this socket reading function meant to be run in its own thread:
def socket_reader(sockobj, outq, exit_event):
while not exit_event.is_set():
try:
buf = sockobj.recv(1)
if len(buf) < 1:
break
outq.put(buf)
except socket.timeout:
continue
except OSError as e:
break
Best used with a socket that has a timeout set on it, this function will repeatedly monitor the socket for new data and push everything it receives into outq
, which is a queue.Queue
. The function exits when either the socket is closed (recv
returns an empty bytestring), or when exit_event
(a threading.Event) is set by the caller.
The caller can launch this function in a thread and occasionally try to read new items from the queue in a non-blocking way:
try:
v = outq.get(block=False)
print(v)
except queue.Empty:
break
When all is done, the caller can set the exit event to stop the thread (the thread will stop on its own if the socket it's reading from is closed, but the event lets us control this more directly).
There's no single fits-all solution for the task described in this post; I presented a bunch of recipes to handle the more commonly occurring situations, but it may be the case that specific use cases may not be addressed by them. Please let me know if you run into an interesting use case these recipes helped (or did not help!) resolve. Any other feedback is also welcome, as usual.
Published at DZone with permission of Eli Bendersky. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments