1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2015 Stephen Warren
3# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
4
5# Common logic to interact with U-Boot via the console. This class provides
6# the interface that tests use to execute U-Boot shell commands and wait for
7# their results. Sub-classes exist to perform board-type-specific setup
8# operations, such as spawning a sub-process for Sandbox, or attaching to the
9# serial console of real hardware.
10
11import multiplexed_log
12import os
13import pytest
14import re
15import sys
16import u_boot_spawn
17
# Regular expressions for the text U-Boot is expected to print on its console.
#
# Every sign-on message shares the same tail: a "YYYY.MM" version, the rest of
# the line, and a closing parenthesis. The tail is written once here so that
# the SPL, SPL2 and main sign-on patterns stay in step with each other.
_signon_tail = ' \\d{4}\\.\\d{2}[^\r\n]*\\))'

pattern_u_boot_spl_signon = re.compile('(U-Boot SPL' + _signon_tail)
pattern_u_boot_spl2_signon = re.compile('(U-Boot SPL' + _signon_tail)
pattern_u_boot_main_signon = re.compile('(U-Boot' + _signon_tail)
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile("Unknown command '.*' - try 'help'")
pattern_error_notification = re.compile('## Error: ')
pattern_error_please_reset = re.compile(
    '### ERROR ### Please RESET the board ###')

# Positions of the two fields inside each entry of bad_pattern_defs.
PAT_ID = 0  # Short identifier naming the check
PAT_RE = 1  # Compiled regular expression for the check

# Table of (identifier, compiled regex) pairs. Text matching any of these is
# treated as an error while waiting for console output, unless the check named
# by the identifier has been disabled.
bad_pattern_defs = (
    ('spl_signon', pattern_u_boot_spl_signon),
    ('spl2_signon', pattern_u_boot_spl2_signon),
    ('main_signon', pattern_u_boot_main_signon),
    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
    ('unknown_command', pattern_unknown_command),
    ('error_notification', pattern_error_notification),
    ('error_please_reset', pattern_error_please_reset),
)
39
class ConsoleDisableCheck(object):
    """Context manager that switches off one console output error check.

    While the "with" block is active, the disable count for the given check
    type is raised by one; it is lowered again when the block exits. After
    each change the console is asked to recompute its active bad patterns.

    This is useful when deliberately executing a command that is known to
    trigger one of the error checks, in order to test that the error condition
    is actually raised. This class is used internally by
    ConsoleBase::disable_check(); it is not intended for direct usage.
    """

    def __init__(self, console, check_type):
        self.console, self.check_type = console, check_type

    def _adjust(self, delta):
        """Change the disable count by delta and refresh the pattern lists."""
        counts = self.console.disable_check_count
        counts[self.check_type] += delta
        self.console.eval_bad_patterns()

    def __enter__(self):
        self._adjust(1)

    def __exit__(self, extype, value, traceback):
        # Returns None, so an exception raised inside the block propagates.
        self._adjust(-1)
59
class ConsoleSetupTimeout(object):
    """Context manager (for Python's with statement) that temporarily sets up
    timeout for specific command. This is useful when execution time is greater
    than the default timeout.

    The new timeout is applied when the "with" block is entered and the
    previous value is put back when the block exits. Creating the object
    without entering it leaves the timeout untouched.
    """

    def __init__(self, console, timeout):
        """Remember which spawn object to adjust and the timeout to apply.

        Args:
            console: The console object whose spawn object (console.p) will
                have its timeout changed.
            timeout: The timeout value to use inside the "with" block.
        """
        self.p = console.p
        self.timeout = timeout
        # Read-only snapshot so the attribute always exists; it is refreshed
        # each time the context is entered.
        self.orig_timeout = self.p.timeout

    def __enter__(self):
        # Capture the value in force right now, so that re-using or nesting
        # managers restores the correct timeout on exit.
        self.orig_timeout = self.p.timeout
        self.p.timeout = self.timeout
        return self

    def __exit__(self, extype, value, traceback):
        self.p.timeout = self.orig_timeout
75
class ConsoleBase(object):
    """The interface through which test functions interact with the U-Boot
    console. This primarily involves executing shell commands, capturing their
    results, and checking for common error conditions. Some common utilities
    are also provided."""

    def __init__(self, log, config, max_fifo_fill):
        """Initialize a U-Boot console connection.

        Can only usefully be called by sub-classes.

        Args:
            log: A multiplexed_log.Logfile object, to which the U-Boot output
                will be logged.
            config: A configuration data structure, as built by conftest.py.
            max_fifo_fill: The maximum number of characters to send to U-Boot
                command-line before waiting for U-Boot to echo the characters
                back. For UART-based HW without HW flow control, this value
                should be set less than the UART RX FIFO size to avoid
                overflow, assuming that U-Boot can't keep up with full-rate
                traffic at the baud rate.

        Returns:
            Nothing.
        """

        self.log = log
        self.config = config
        self.max_fifo_fill = max_fifo_fill

        self.logstream = self.log.get_stream('console', sys.stdout)

        # Array slice removes leading/trailing quotes
        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
        self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
        # No connection yet; ensure_spawned() creates it on demand.
        self.p = None
        # Every check starts enabled (a count of zero means "not disabled").
        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
        self.eval_bad_patterns()

        self.at_prompt = False
        self.at_prompt_logevt = None

    def eval_bad_patterns(self):
        """Recompute the lists of currently enabled bad patterns.

        A check is enabled while its entry in self.disable_check_count is
        zero. self.bad_patterns and self.bad_pattern_ids are rebuilt in the
        same order, so an index into one is valid for the other.

        Args:
            None.

        Returns:
            Nothing.
        """

        enabled = [pat for pat in bad_pattern_defs
                   if self.disable_check_count[pat[PAT_ID]] == 0]
        self.bad_patterns = [pat[PAT_RE] for pat in enabled]
        self.bad_pattern_ids = [pat[PAT_ID] for pat in enabled]

    def _raise_on_bad_pattern(self, index, num_good=1, where='console'):
        """Raise if an expect() result refers to one of the bad patterns.

        Callers pass expect() a list made of their own "good" patterns
        followed by self.bad_patterns, so any returned index at or beyond
        num_good identifies an entry of self.bad_pattern_ids.

        Args:
            index: The index returned by self.p.expect().
            num_good: How many good patterns preceded self.bad_patterns in
                the list that was passed to expect().
            where: Description of the console, used in the error message.

        Returns:
            Nothing. An exception is raised if a bad pattern was matched.
        """

        if index < num_good:
            return
        raise Exception('Bad pattern found on ' + where + ': ' +
                        self.bad_pattern_ids[index - num_good])

    def close(self):
        """Terminate the connection to the U-Boot console.

        This function is only useful once all interaction with U-Boot is
        complete. Once this function is called, data cannot be sent to or
        received from U-Boot.

        Args:
            None.

        Returns:
            Nothing.
        """

        if self.p:
            self.p.close()
        self.logstream.close()

    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
            wait_for_prompt=True):
        """Execute a command via the U-Boot console.

        The command is always sent to U-Boot.

        U-Boot echoes any command back to its output, and this function
        typically waits for that to occur. The wait can be disabled by setting
        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
        interrupt a long-running command such as "ums".

        Command execution is typically triggered by sending a newline
        character. This can be disabled by setting send_nl=False, which is
        also useful when sending CTRL-C.

        This function typically waits for the command to finish executing, and
        returns the console output that it generated. This can be disabled by
        setting wait_for_prompt=False, which is useful when invoking a long-
        running command such as "ums".

        If anything goes wrong (including a bad pattern appearing on the
        console), the error is logged, the connection is shut down via
        cleanup_spawn(), and the exception is re-raised.

        Args:
            cmd: The command to send.
            wait_for_echo: Boolean indicating whether to wait for U-Boot to
                echo the command text back to its output.
            send_nl: Boolean indicating whether to send a newline character
                after the command string.
            wait_for_prompt: Boolean indicating whether to wait for the
                command prompt to be sent by U-Boot. This typically occurs
                immediately after the command has been executed.

        Returns:
            If wait_for_prompt == False:
                Nothing.
            Else:
                The output from U-Boot during command execution. In other
                words, the text U-Boot emitted between the point it echoed the
                command string and emitted the subsequent command prompt.
        """

        # If something else has been logged since the prompt was last seen,
        # write the prompt to the log again so the command has context.
        if self.at_prompt and \
                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
            self.logstream.write(self.prompt, implicit=True)

        try:
            self.at_prompt = False
            if send_nl:
                cmd += '\n'
            while cmd:
                # Limit max outstanding data, so UART FIFOs don't overflow
                chunk = cmd[:self.max_fifo_fill]
                cmd = cmd[self.max_fifo_fill:]
                self.p.send(chunk)
                if not wait_for_echo:
                    continue
                # Match the echo literally, except that the newline we sent
                # may come back as either CR or LF.
                chunk = re.escape(chunk)
                chunk = chunk.replace('\\\n', '[\r\n]')
                m = self.p.expect([chunk] + self.bad_patterns)
                self._raise_on_bad_pattern(m)
            if not wait_for_prompt:
                return
            m = self.p.expect([self.prompt_compiled] + self.bad_patterns)
            self._raise_on_bad_pattern(m)
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
            # Only strip \r\n; space/TAB might be significant if testing
            # indentation.
            return self.p.before.strip('\r\n')
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()

    def run_command_list(self, cmds):
        """Run a list of commands.

        This is a helper function to call run_command() with default arguments
        for each command in a list.

        Args:
            cmds: List of commands (each a string).
        Returns:
            A list of output strings from each command, one element for each
            command.
        """
        return [self.run_command(cmd) for cmd in cmds]

    def ctrlc(self):
        """Send a CTRL-C character to U-Boot.

        This is useful in order to stop execution of long-running synchronous
        commands such as "ums".

        Args:
            None.

        Returns:
            Nothing.
        """

        self.log.action('Sending Ctrl-C')
        self.run_command(chr(3), wait_for_echo=False, send_nl=False)

    def wait_for(self, text):
        """Wait for a pattern to be emitted by U-Boot.

        This is useful when a long-running command such as "dfu" is executing,
        and it periodically emits some text that should show up at a specific
        location in the log file.

        Args:
            text: The text to wait for; either a string (containing raw text,
                not a regular expression) or an re object.

        Returns:
            Nothing. An exception is raised if a bad pattern is seen first.
        """

        # Plain strings (including str subclasses) are matched literally.
        if isinstance(text, str):
            text = re.escape(text)
        m = self.p.expect([text] + self.bad_patterns)
        self._raise_on_bad_pattern(m)

    def drain_console(self):
        """Read from and log the U-Boot console for a short time.

        U-Boot's console output is only logged when the test code actively
        waits for U-Boot to emit specific data. There are cases where tests
        can fail without doing this. For example, if a test asks U-Boot to
        enable USB device mode, then polls until a host-side device node
        exists. In such a case, it is useful to log U-Boot's console output
        in case U-Boot printed clues as to why the host-side event did not
        occur. This function will do that.

        Args:
            None.

        Returns:
            Nothing.
        """

        # If we are already not connected to U-Boot, there's nothing to drain.
        # This should only happen when a previous call to run_command() or
        # wait_for() failed (and hence the output has already been logged), or
        # the system is shutting down.
        if not self.p:
            return

        orig_timeout = self.p.timeout
        try:
            # Drain the log for a relatively short time.
            self.p.timeout = 1000
            # Wait for something U-Boot will likely never send. This will
            # cause the console output to be read and logged.
            self.p.expect(['This should never match U-Boot output'])
        except Exception:
            # We expect a timeout, since U-Boot won't print what we waited
            # for. Squash it when it happens.
            # NOTE(review): assumes the spawn object's timeout error derives
            # from Exception - confirm against u_boot_spawn.
            #
            # Squash any other ordinary exception too. This function is only
            # used to drain (and log) the U-Boot console output after a failed
            # test. The U-Boot process will be restarted, or target board
            # reset, once this function returns. So, we don't care about
            # detecting any additional errors, so they're squashed so that the
            # rest of the post-test-failure cleanup code can continue
            # operation, and correctly terminate any log sections, etc.
            #
            # KeyboardInterrupt and SystemExit are deliberately not caught, so
            # that the user can still abort the test run.
            pass
        finally:
            self.p.timeout = orig_timeout

    def ensure_spawned(self):
        """Ensure a connection to a correctly running U-Boot instance.

        This may require spawning a new Sandbox process or resetting target
        hardware, as defined by the implementation sub-class.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.
        """

        if self.p:
            return
        try:
            self.log.start_section('Starting U-Boot')
            self.at_prompt = False
            self.p = self.get_spawn()
            # Real targets can take a long time to scroll large amounts of
            # text if LCD is enabled. This value may need tweaking in the
            # future, possibly per-test to be optimal. This works for 'help'
            # on board 'seaboard'.
            if not self.config.gdbserver:
                self.p.timeout = 30000
            self.p.logfile_read = self.logstream
            bcfg = self.config.buildconfig
            config_spl = bcfg.get('config_spl', 'n') == 'y'
            config_spl_serial_support = bcfg.get('config_spl_serial_support',
                                                 'n') == 'y'
            env_spl_skipped = self.config.env.get('env__spl_skipped',
                                                  False)
            env_spl2_skipped = self.config.env.get('env__spl2_skipped',
                                                  True)
            # Wait for each expected sign-on message in boot order.
            if config_spl and config_spl_serial_support and not env_spl_skipped:
                m = self.p.expect([pattern_u_boot_spl_signon] +
                                  self.bad_patterns)
                self._raise_on_bad_pattern(m, where='SPL console')
            if not env_spl2_skipped:
                m = self.p.expect([pattern_u_boot_spl2_signon] +
                                  self.bad_patterns)
                self._raise_on_bad_pattern(m, where='SPL2 console')
            m = self.p.expect([pattern_u_boot_main_signon] + self.bad_patterns)
            self._raise_on_bad_pattern(m)
            self.u_boot_version_string = self.p.after
            while True:
                m = self.p.expect([self.prompt_compiled,
                    pattern_stop_autoboot_prompt] + self.bad_patterns)
                # Two good patterns precede the bad ones in this list.
                self._raise_on_bad_pattern(m, num_good=2)
                if m == 0:
                    break
                # Autoboot countdown seen: send a key to interrupt it, then
                # keep waiting for the prompt.
                self.p.send(' ')
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()
            self.log.end_section('Starting U-Boot')

    def cleanup_spawn(self):
        """Shut down all interaction with the U-Boot instance.

        This is used when an error is detected prior to re-establishing a
        connection with a fresh U-Boot instance.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.
        """

        try:
            if self.p:
                self.p.close()
        except Exception:
            # Best effort: a failure to close must not prevent the connection
            # from being forgotten below. KeyboardInterrupt and SystemExit
            # are not caught, so the user can still abort.
            pass
        self.p = None

    def restart_uboot(self):
        """Shut down and restart U-Boot."""
        self.cleanup_spawn()
        self.ensure_spawned()

    def get_spawn_output(self):
        """Return the start-up output from U-Boot

        Returns:
            The output produced by ensure_spawned(), as a string, or None if
            there is currently no connection to U-Boot.
        """
        if self.p:
            return self.p.get_expect_output()
        return None

    def validate_version_string_in_text(self, text):
        """Assert that a command's output includes the U-Boot signon message.

        This is primarily useful for validating the "version" command without
        duplicating the signon text regex in a test function.

        Args:
            text: The command output text to check.

        Returns:
            Nothing. An AssertionError is raised if the validation fails.
        """

        # Raised explicitly rather than via an assert statement, so the check
        # is still performed when Python runs with optimizations enabled.
        if self.u_boot_version_string not in text:
            raise AssertionError('U-Boot version string not found in text')

    def disable_check(self, check_type):
        """Temporarily disable an error check of U-Boot's output.

        Create a new context manager (for use with the "with" statement) which
        temporarily disables a particular console output error check.

        Args:
            check_type: The type of error-check to disable. Valid values are
                the keys of self.disable_check_count.

        Returns:
            A context manager object.
        """

        return ConsoleDisableCheck(self, check_type)

    def temporary_timeout(self, timeout):
        """Temporarily set up different timeout for commands.

        Create a new context manager (for use with the "with" statement) which
        temporarily changes the timeout.

        Args:
            timeout: Time in milliseconds.

        Returns:
            A context manager object.
        """

        return ConsoleSetupTimeout(self, timeout)
479