1# Copyright (c) 2012 The Chromium OS Authors.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6# Licensed to PSF under a Contributor Agreement.
7# See http://www.python.org/2.4/license for licensing details.
8
9"""Subprocess execution
10
11This module holds a subclass of subprocess.Popen with our own required
12features, mainly that we get access to the subprocess output while it
13is running rather than just at the end. This makes it easier to show
14progress information and filter output in real time.
15"""
16
17import errno
18import os
19import pty
20import select
21import subprocess
22import sys
23import unittest
24
25
# Import these here so the caller does not need to import subprocess also.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT

# Sentinel for Popen's stdout/stderr arguments: pipe output through a pty.
# It must be negative so that it can never be mistaken for a real file
# descriptor, and it must differ from every special value that subprocess
# itself defines (PIPE is -1, STDOUT is -2 and, on Python 3, DEVNULL is -3),
# otherwise e.g. stdout=subprocess.DEVNULL would be treated as a pty request.
PIPE_PTY = -100

# Polled by Popen.CommunicateFilter() on each pass of its select loop. Set this
# to False (e.g. from a signal handler or another thread) to have the
# subprocess being monitored sent terminate().
stay_alive = True
31
32
class Popen(subprocess.Popen):
    """Like subprocess.Popen with ptys and incremental output

    This class deals with running a child process and filtering its output on
    both stdout and stderr while it is running. We do this so we can monitor
    progress, and possibly relay the output to the user if requested.

    The class is similar to subprocess.Popen, the equivalent is something like:

        Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    But this class has many fewer features, and two enhancements:

    1. Rather than getting the output data only at the end, this class sends it
         to a provided operation as it arrives.
    2. We use pseudo terminals so that the child will hopefully flush its output
         to us as soon as it is produced, rather than waiting for the end of a
         line.

    Use CommunicateFilter() to handle output from the subprocess.

    """

    def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
                 shell=False, cwd=None, env=None, **kwargs):
        """Cut-down constructor

        Args:
            args: Program and arguments for subprocess to execute.
            stdin: See subprocess.Popen()
            stdout: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            stderr: See subprocess.Popen(), except that we support the sentinel
                    value of cros_subprocess.PIPE_PTY.
            shell: See subprocess.Popen()
            cwd: Working directory to change to for subprocess, or None if none.
            env: Environment to use for this subprocess, or None to inherit parent.
            kwargs: No other arguments are supported at the moment. Passing other
                    arguments will cause a ValueError to be raised.

        Raises:
            ValueError: if any unsupported keyword argument is passed.
        """
        # Insist that unit tests exist for other arguments we don't support.
        # Checking this first means a rejected call never starts a child
        # process or opens a pty that nobody would clean up.
        if kwargs:
            raise ValueError("Unit tests do not test extra args - please add tests")

        stdout_pty = None
        stderr_pty = None
        try:
            # pty.openpty() returns (master_fd, slave_fd). The child gets the
            # slave end; we keep the master end for reading.
            if stdout == PIPE_PTY:
                stdout_pty = pty.openpty()
                stdout = stdout_pty[1]
            if stderr == PIPE_PTY:
                stderr_pty = pty.openpty()
                stderr = stderr_pty[1]

            super(Popen, self).__init__(args, stdin=stdin,
                    stdout=stdout, stderr=stderr, shell=shell, cwd=cwd,
                    env=env)
        except BaseException:
            # The child was not started, so nothing will ever read from the
            # master ends. Close them here rather than leaking them.
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is not None:
                    os.close(pty_pair[0])
            raise
        finally:
            # The child (if any) has its own copy of the slave ends. Our copy
            # must be closed, otherwise reads on the master would never see
            # end-of-file once the child exits.
            for pty_pair in (stdout_pty, stderr_pty):
                if pty_pair is not None:
                    os.close(pty_pair[1])

        # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
        # We want to use the master half on our end from now on. Setting this here
        # does make some assumptions about the implementation of subprocess, but
        # those assumptions are pretty minor.

        # Note that if stderr is STDOUT, then self.stderr will be set to None by
        # this constructor.
        if stdout_pty is not None:
            self.stdout = os.fdopen(stdout_pty[0])
        if stderr_pty is not None:
            self.stderr = os.fdopen(stderr_pty[0])

    def ConvertData(self, data):
        """Convert stdout/stderr data to the correct format for output

        Args:
            data: Data to convert, or None for ''

        Returns:
            Converted data, as bytes
        """
        if data is None:
            return b''
        return data

    @staticmethod
    def _ReadAvailable(fileobj):
        """Read whatever is currently available from a stream

        Args:
            fileobj: File object to read from; only its file descriptor is used.

        Returns:
            Up to 1024 bytes of data, or b'' if the stream has ended. Reading
            from a pty whose other end has been closed raises OSError rather
            than returning nothing, so that is reported as end-of-stream too.
        """
        try:
            return os.read(fileobj.fileno(), 1024)
        except OSError:
            return b''

    def CommunicateFilter(self, output, input_buf=None):
        """Interact with process: Read data from stdout and stderr.

        This method runs until end-of-file is reached, then waits for the
        subprocess to terminate.

        The output function is sent all output from the subprocess and must be
        defined like this:

            def Output([self,] stream, data)
            Args:
                stream: the stream the output was received on, which will be
                        sys.stdout or sys.stderr.
                data: the data received, as bytes

        Note: The data read is buffered in memory, so do not use this
        method if the data size is large or unlimited.

        While waiting, the module-level stay_alive flag is checked at least
        every 0.2 seconds; if it is False the subprocess is sent terminate().

        Args:
            output: Function to call with each fragment of output, or None to
                    just collect the output.
            input_buf: Optional bytes to send to the subprocess on its stdin.
                    This is only used if the process was created with
                    stdin=PIPE. If it is empty or None, stdin is closed
                    straight away.

        Returns:
            A tuple (stdout, stderr, combined) which is the data received on
            stdout, stderr and the combined data (interleaved stdout and stderr).

            Note that the interleaved output will only be sensible if you have
            set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
            the timing of the output in the subprocess. If a subprocess flips
            between stdout and stderr quickly in succession, by the time we come to
            read the output from each we may see several lines in each, and will read
            all the stdout lines, then all the stderr lines. So the interleaving
            may not be correct. In this case you might want to pass
            stderr=cros_subprocess.STDOUT to the constructor.

            This feature is still useful for subprocesses where stderr is
            rarely used and indicates an error.

            Note also that if you set stderr to STDOUT, then stderr will be empty
            and the combined output will just be the same as stdout.
        """
        read_set = []
        write_set = []

        # Output is collected as lists of fragments and joined at the end. A
        # value of None means that the stream is not being captured at all.
        stdout_chunks = None
        stderr_chunks = None
        combined_chunks = []

        if self.stdin:
            # Flush stdio buffer. This might block, if the user has
            # been writing to .stdin in an uncontrolled fashion.
            self.stdin.flush()
            if input_buf:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout_chunks = []
        if self.stderr and self.stderr != self.stdout:
            read_set.append(self.stderr)
            stderr_chunks = []

        input_offset = 0
        while read_set or write_set:
            try:
                # The timeout ensures that stay_alive is polled regularly even
                # when the subprocess is silent.
                rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
            except select.error as e:
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if not stay_alive:
                self.terminate()

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking. POSIX defines PIPE_BUF >= 512
                chunk = input_buf[input_offset:input_offset + 512]
                try:
                    input_offset += os.write(self.stdin.fileno(), chunk)
                except OSError as e:
                    if e.errno != errno.EPIPE:
                        raise
                    # The subprocess closed its stdin before reading all of
                    # the input; there is no point in sending the rest.
                    input_offset = len(input_buf)
                if input_offset >= len(input_buf):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            for fileobj, chunks, stream in (
                    (self.stdout, stdout_chunks, sys.stdout),
                    (self.stderr, stderr_chunks, sys.stderr)):
                if chunks is None or fileobj not in rlist:
                    continue
                data = self._ReadAvailable(fileobj)
                if not data:
                    # End of this stream: stop watching it
                    fileobj.close()
                    read_set.remove(fileobj)
                    continue
                chunks.append(data)
                combined_chunks.append(data)
                if output:
                    output(stream, data)

        # All data exchanged. Translate lists into bytes.
        stdout = self.ConvertData(
            None if stdout_chunks is None else b''.join(stdout_chunks))
        stderr = self.ConvertData(
            None if stderr_chunks is None else b''.join(stderr_chunks))
        combined = self.ConvertData(b''.join(combined_chunks))

        # Translate newlines, if requested. We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering). The constructor rejects the universal_newlines keyword
        # at present, so this only takes effect if that is ever supported.
        if self.universal_newlines:
            if stdout:
                stdout = stdout.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            if stderr:
                stderr = stderr.replace(b'\r\n', b'\n').replace(b'\r', b'\n')

        self.wait()
        return (stdout, stderr, combined)
249
250
# Just being a unittest.TestCase gives us 14 public methods, all of which count
# towards pylint's too-many-public-methods check (R0904). Unless we disable
# that check, we can only have 6 tests in a TestCase. That's not enough.
#
254# pylint: disable=R0904
255
class TestSubprocess(unittest.TestCase):
    """Our simple unit test for this module

    Popen reads its output with os.read(), so everything it returns and
    everything it passes to the output function is bytes. The expected values
    in these tests are therefore bytes too.
    """

    class MyOperation:
        """Provides a operation that we can pass to Popen"""
        def __init__(self, input_to_send=None):
            """Constructor to set up the operation and possible input.

            Args:
                input_to_send: a text string to send when we first get output
                    from the subprocess. We will add \\r\\n to the string.
            """
            self.stdout_data = b''
            self.stderr_data = b''
            self.combined_data = b''
            # Retained for compatibility; not used by the tests here.
            self.stdin_pipe = None
            # Read end (a file descriptor) to hand to Popen as stdin, and the
            # write end (a text file) that Output() sends the input through.
            self.stdin_read_pipe = None
            self._stdin_write_pipe = None
            self._input_to_send = input_to_send
            if input_to_send:
                pipe = os.pipe()
                self.stdin_read_pipe = pipe[0]
                self._stdin_write_pipe = os.fdopen(pipe[1], 'w')

        def Output(self, stream, data):
            """Output handler for Popen. Stores the data for later comparison

            Args:
                stream: Stream the data arrived on (sys.stdout or sys.stderr)
                data: Data received, as bytes
            """
            if stream == sys.stdout:
                self.stdout_data += data
            if stream == sys.stderr:
                self.stderr_data += data
            self.combined_data += data

            # Output the input string if we have one. Only do this for the
            # first output we see, so the subprocess gets it exactly once.
            if self._input_to_send:
                self._stdin_write_pipe.write(self._input_to_send + '\r\n')
                self._stdin_write_pipe.flush()
                self._input_to_send = None

        def Close(self):
            """Close both ends of the stdin pipe, if one was created"""
            if self._stdin_write_pipe is not None:
                self._stdin_write_pipe.close()
                self._stdin_write_pipe = None
            if self.stdin_read_pipe is not None:
                os.close(self.stdin_read_pipe)
                self.stdin_read_pipe = None

    def _BasicCheck(self, plist, oper):
        """Basic checks that the output looks sane.

        Args:
            plist: Tuple (stdout, stderr, combined) from CommunicateFilter()
            oper: MyOperation whose Output() method received the same data
        """
        self.assertEqual(plist[0], oper.stdout_data)
        self.assertEqual(plist[1], oper.stderr_data)
        self.assertEqual(plist[2], oper.combined_data)

        # The total length of stdout and stderr should equal the combined length
        self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))

    def test_simple(self):
        """Simple redirection: Get process list"""
        oper = TestSubprocess.MyOperation()
        plist = Popen(['ps']).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)

    def test_stderr(self):
        """Check stdout and stderr"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo fred >/dev/stderr && false || echo bad'
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'bad\r\n')
        self.assertEqual(plist[1], b'fred\r\n')

    def test_shell(self):
        """Check with and without shell works"""
        oper = TestSubprocess.MyOperation()
        cmd = 'echo test >/dev/stderr'
        # Without a shell the whole string is taken as the program name
        self.assertRaises(OSError, Popen, [cmd], shell=False)
        plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[0]), 0)
        self.assertEqual(plist[1], b'test\r\n')

    def test_list_args(self):
        """Check with and without shell works using list arguments"""
        oper = TestSubprocess.MyOperation()
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        # With no shell the redirection is just another argument to echo
        expected = ' '.join(cmd[1:]).encode('utf-8') + b'\r\n'
        self.assertEqual(plist[0], expected)
        self.assertEqual(len(plist[1]), 0)

        oper = TestSubprocess.MyOperation()

        # this should be interpreted as 'echo' with the other args dropped
        cmd = ['echo', 'test', '>/dev/stderr']
        plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'\r\n')

    def test_cwd(self):
        """Check we can change directory"""
        for shell in (False, True):
            oper = TestSubprocess.MyOperation()
            plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'/tmp\r\n')

    def test_env(self):
        """Check we can change environment"""
        for add in (False, True):
            oper = TestSubprocess.MyOperation()
            # Work on a copy so that the environment of this process (and so
            # of every later test) is left alone, and make sure the variable
            # starts out unset whatever the caller's environment contains.
            env = dict(os.environ)
            env.pop('FRED', None)
            if add:
                env['FRED'] = 'fred'
            cmd = 'echo $FRED'
            plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
            self._BasicCheck(plist, oper)
            self.assertEqual(plist[0], b'fred\r\n' if add else b'\r\n')

    def test_extra_args(self):
        """Check we can't add extra arguments"""
        self.assertRaises(ValueError, Popen, 'true', close_fds=False)

    def test_basic_input(self):
        """Check that incremental input works

        We set up a subprocess which will prompt for name. When we see this prompt
        we send the name as input to the process. It should then print the name
        properly to stdout.
        """
        oper = TestSubprocess.MyOperation('Flash')
        self.addCleanup(oper.Close)
        prompt = 'What is your name?: '
        cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
        plist = Popen([cmd], stdin=oper.stdin_read_pipe,
                shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(len(plist[1]), 0)
        # The \r we sent is read as part of the name, hence the double \r
        self.assertEqual(plist[0],
                         prompt.encode('utf-8') + b'Hello Flash\r\r\n')

    def test_isatty(self):
        """Check that ptys appear as terminals to the subprocess"""
        oper = TestSubprocess.MyOperation()
        cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
                'else echo "not %d" >&%d; fi;')
        both_cmds = ''
        for fd in (1, 2):
            both_cmds += cmd % (fd, fd, fd, fd, fd)
        plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'terminal 1\r\n')
        self.assertEqual(plist[1], b'terminal 2\r\n')

        # Now try with PIPE and make sure it is not a terminal
        oper = TestSubprocess.MyOperation()
        plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                shell=True).CommunicateFilter(oper.Output)
        self._BasicCheck(plist, oper)
        self.assertEqual(plist[0], b'not 1\n')
        self.assertEqual(plist[1], b'not 2\n')
402
403if __name__ == '__main__':
404    unittest.main()
405