#!/usr/bin/env python
# SPDX-License-Identifier: GPL-2.0+
#
# Modified by: Corey Goldberg, 2013
#
# Original code from:
#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
#   Copyright (C) 2005-2011 Canonical Ltd

"""Python testtools extension for running unittest suites concurrently.

The `testtools` project provides a ConcurrentTestSuite class, but does
not provide a `make_tests` implementation needed to use it.

This allows you to parallelize a test run across a configurable number
of worker processes. While this can speed up CPU-bound test runs, it is
mainly useful for IO-bound tests that spend most of their time waiting for
data to arrive from someplace else and can benefit from concurrency.

Unix only.
"""

import os
import sys
import traceback
import unittest
from itertools import cycle
from multiprocessing import cpu_count

from subunit import ProtocolTestCase, TestProtocolClient
from subunit.test_results import AutoTimingTestResultDecorator

from testtools import ConcurrentTestSuite, iterate_tests


# Names exported by `from <module> import *`.
__all__ = [
    'ConcurrentTestSuite',
    'fork_for_tests',
    'partition_tests',
]


# Default number of worker processes: one per CPU reported by the system.
CPU_COUNT = cpu_count()


def _run_child_and_exit(process_suite, c2pread, c2pwrite):
    """Run process_suite in a forked child, then exit without returning.

    Test results are sent to the parent as a subunit stream written to the
    write end of the pipe.

    :param process_suite: TestSuite holding this child's share of the tests.
    :param c2pread: read end of the child-to-parent pipe (closed here, as
        only the parent reads from it).
    :param c2pwrite: write end of the child-to-parent pipe.
    """
    # Bound up front so the error handler can tell whether the stream was
    # ever created.
    stream = None
    try:
        stream = os.fdopen(c2pwrite, 'wb')
        os.close(c2pread)
        # Leave stderr and stdout open so we can see test noise
        # Close stdin so that the child goes away if it decides to
        # read from stdin (otherwise it's a roulette to see what
        # child actually gets keystrokes for pdb etc).
        sys.stdin.close()
        subunit_result = AutoTimingTestResultDecorator(
            TestProtocolClient(stream)
        )
        process_suite.run(subunit_result)
        # os._exit() does not flush stdio buffers, so push any buffered
        # results to the parent before leaving.
        stream.flush()
    except BaseException:
        # Catch everything (including SystemExit and KeyboardInterrupt) so
        # that the child can never fall through into the parent's code.
        # Try and report traceback on stream, but exit with error
        # even if stream couldn't be created or something else
        # goes wrong. The traceback is formatted to a string and
        # written in one go to avoid interleaving lines from
        # multiple failing children.
        try:
            if stream is not None:
                report = traceback.format_exc()
                # The stream is opened in binary mode, so text must be
                # encoded before it can be written.
                if not isinstance(report, bytes):
                    report = report.encode('utf-8', 'replace')
                stream.write(report)
                stream.flush()
        finally:
            os._exit(1)
    os._exit(0)


def fork_for_tests(concurrency_num=CPU_COUNT):
    """Implementation of `make_tests` used to construct `ConcurrentTestSuite`.

    :param concurrency_num: number of processes to use.

    :return: A callable taking a TestSuite and returning the per-process
        test objects (see `do_fork`).
    """
    def do_fork(suite):
        """Take suite and start up multiple runners by forking (Unix only).

        :param suite: TestSuite object.

        :return: An iterable of TestCase-like objects which can each have
            run(result) called on them to feed tests to result.

        :raises ValueError: if concurrency_num is less than 1 (raised by
            `partition_tests` before the suite is modified).
        """
        result = []
        test_blocks = partition_tests(suite, concurrency_num)
        # Clear the tests from the original suite so it doesn't keep them alive
        suite._tests[:] = []
        for process_tests in test_blocks:
            process_suite = unittest.TestSuite(process_tests)
            # Also clear each split list so new suite has only reference
            process_tests[:] = []
            c2pread, c2pwrite = os.pipe()
            pid = os.fork()
            if pid == 0:
                # Child: runs its share of the tests and never returns.
                _run_child_and_exit(process_suite, c2pread, c2pwrite)
            else:
                # Parent: keep only the read end and wrap it so the child's
                # subunit stream can be replayed into a result object.
                os.close(c2pwrite)
                stream = os.fdopen(c2pread, 'rb')
                test = ProtocolTestCase(stream)
                result.append(test)
        return result
    return do_fork


def partition_tests(suite, count):
    """Partition suite into count lists of tests.

    :param suite: TestSuite (or test) to flatten and split.
    :param count: number of partitions to create; must be at least 1.

    :return: A list of `count` lists of tests. Some lists are empty when
        there are fewer tests than partitions.

    :raises ValueError: if count is less than 1, since the tests would
        otherwise be silently dropped.
    """
    if count < 1:
        raise ValueError(
            'count must be at least 1, got {!r}'.format(count))
    # This just assigns tests in a round-robin fashion.  On one hand this
    # splits up blocks of related tests that might run faster if they shared
    # resources, but on the other it avoids assigning blocks of slow tests to
    # just one partition.  So the slowest partition shouldn't be much slower
    # than the fastest.
    partitions = [[] for _ in range(count)]
    tests = iterate_tests(suite)
    for partition, test in zip(cycle(partitions), tests):
        partition.append(test)
    return partitions


if __name__ == '__main__':
    import time

    class SampleTestCase(unittest.TestCase):
        """Dummy tests that sleep for demo."""

        def test_me_1(self):
            time.sleep(0.5)

        def test_me_2(self):
            time.sleep(0.5)

        def test_me_3(self):
            time.sleep(0.5)

        def test_me_4(self):
            time.sleep(0.5)

    # Load tests from SampleTestCase defined above
    suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
    runner = unittest.TextTestRunner()

    # Run tests sequentially
    runner.run(suite)

    # Run same tests across 4 processes
    suite = unittest.TestLoader().loadTestsFromTestCase(SampleTestCase)
    concurrent_suite = ConcurrentTestSuite(suite, fork_for_tests(4))
    runner.run(concurrent_suite)