5.. warning ::
7   The multiprocess plugin is not available on Windows.
9The multiprocess plugin enables you to distribute your test run among a set of
10worker processes that run tests in parallel. This can speed up CPU-bound test
11runs (as long as the number of work processeses is around the number of
12processors or cores available), but is mainly useful for IO-bound tests that
13spend most of their time waiting for data to arrive from someplace else.
15.. note ::
17   See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional
18   documentation and examples. Use of this plugin requires the
19   multiprocessing_ module, also available from PyPI.
21.. _multiprocessing :
23How tests are distributed
26The ideal case would be to dispatch each test to a worker process
27separately. This ideal is not attainable in all cases, however, because many
28test suites depend on context (class, module or package) fixtures.
30The plugin can't know (unless you tell it -- see below!) if a context fixture
31can be called many times concurrently (is re-entrant), or if it can be shared
32among tests running in different processes. Therefore, if a context has
33fixtures, the default behavior is to dispatch the entire suite to a worker as
34a unit.
36Controlling distribution
39There are two context-level variables that you can use to control this default
42If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
43in the context, and the plugin will dispatch tests in suites bound to that
44context as if the context had no fixtures. This means that the fixtures will
45execute concurrently and multiple times, typically once per test.
47If a context's fixtures can be shared by tests running in different processes
48-- such as a package-level fixture that starts an external http server or
49initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
50the context. These fixtures will then execute in the primary nose process, and
51tests in those contexts will be individually dispatched to run in parallel.
53How results are collected and reported
56As each test or suite executes in a worker process, results (failures, errors,
57and specially handled exceptions like SkipTest) are collected in that
58process. When the worker process finishes, it returns results to the main
59nose process. There, any progress output is printed (dots!), and the
60results from the test run are combined into a consolidated result
61set. When results have been received for all dispatched tests, or all
62workers have died, the result summary is output as normal.
67Not all test suites will benefit from, or even operate correctly using, this
68plugin. For example, CPU-bound tests will run more slowly if you don't have
69multiple processors. There are also some differences in plugin
70interactions and behaviors due to the way in which tests are dispatched and
71loaded. In general, test loading under this plugin operates as if it were
72always in directed mode instead of discovered mode. For instance, doctests
73in test modules will always be found when using this plugin with the doctest
76But the biggest issue you will face is probably concurrency. Unless you
77have kept your tests as religiously pure unit tests, with no side-effects, no
78ordering issues, and no external dependencies, chances are you will experience
79odd, intermittent and unexplainable failures and errors when using this
80plugin. This doesn't necessarily mean the plugin is broken; it may mean that
81your test suite is not safe for concurrency.
84import logging
85import os
86import sys
87import time
88import traceback
89import unittest
91from nose.core import TextTestRunner
92from nose import failure
93from nose import loader
94from nose.plugins.base import Plugin
95from nose.result import TextTestResult
96from nose.suite import ContextSuite
97from nose.util import test_address
98from Queue import Empty
99from warnings import warn
101    from cStringIO import StringIO
102except ImportError:
103    import StringIO
105log = logging.getLogger(__name__)
107Process = Queue = Pool = Event = None
109def _import_mp():
110    global Process, Queue, Pool, Event
111    if sys.platform == 'win32':
112        warn("multiprocess plugin is not available on windows",
113             RuntimeWarning)
114        return
115    try:
116        from multiprocessing import Process as Process_, \
117            Queue as Queue_, Pool as Pool_, Event as Event_
118        Process, Queue, Pool, Event = Process_, Queue_, Pool_, Event_
119    except ImportError:
120        warn("multiprocessing module is not available, multiprocess plugin "
121             "cannot be used", RuntimeWarning)
124class TestLet:
125    def __init__(self, case):
126        try:
127            self._id =
128        except AttributeError:
129            pass
130        self._short_description = case.shortDescription()
131        self._str = str(case)
133    def id(self):
134        return self._id
136    def shortDescription(self):
137        return self._short_description
139    def __str__(self):
140        return self._str
143class MultiProcess(Plugin):
144    """
145    Run tests in multiple processes. Requires processing module.
146    """
147    score = 1000
148    status = {}
150    def options(self, parser, env):
151        """
152        Register command-line options.
153        """
154        if sys.platform == 'win32':
155            return
156        parser.add_option("--processes", action="store",
157                          default=env.get('NOSE_PROCESSES', 0),
158                          dest="multiprocess_workers",
159                          metavar="NUM",
160                          help="Spread test run among this many processes. "
161                          "Set a number equal to the number of processors "
162                          "or cores in your machine for best results. "
163                          "[NOSE_PROCESSES]")
164        parser.add_option("--process-timeout", action="store",
165                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
166                          dest="multiprocess_timeout",
167                          metavar="SECONDS",
168                          help="Set timeout for return of results from each "
169                          "test runner process. [NOSE_PROCESS_TIMEOUT]")
171    def configure(self, options, config):
172        """
173        Configure plugin.
174        """
175        if sys.platform == 'win32':
176            return
177        try:
178            self.status.pop('active')
179        except KeyError:
180            pass
181        if not hasattr(options, 'multiprocess_workers'):
182            self.enabled = False
183            return
184        self.config = config
185        try:
186            workers = int(options.multiprocess_workers)
187        except (TypeError, ValueError):
188            workers = 0
189        if workers:
190            _import_mp()
191            if Process is None:
192                self.enabled = False
193                return
194            self.enabled = True
195            self.config.multiprocess_workers = workers
196            self.config.multiprocess_timeout = int(options.multiprocess_timeout)
197            self.status['active'] = True
199    def prepareTestLoader(self, loader):
200        """Remember loader class so MultiProcessTestRunner can instantiate
201        the right loader.
202        """
203        self.loaderClass = loader.__class__
205    def prepareTestRunner(self, runner):
206        """Replace test runner with MultiProcessTestRunner.
207        """
208        # replace with our runner class
209        return MultiProcessTestRunner(,
210                                      verbosity=self.config.verbosity,
211                                      config=self.config,
212                                      loaderClass=self.loaderClass)
215class MultiProcessTestRunner(TextTestRunner):
217    def __init__(self, **kw):
218        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
219        super(MultiProcessTestRunner, self).__init__(**kw)
221    def run(self, test):
222        """
223        Execute the test (which may be a test suite). If the test is a suite,
224        distribute it out among as many processes as have been configured, at
225        as fine a level as is possible given the context fixtures defined in the
226        suite or any sub-suites.
228        """
229        log.debug(" (%s)", self, test, os.getpid())
230        wrapper = self.config.plugins.prepareTest(test)
231        if wrapper is not None:
232            test = wrapper
234        # plugins can decorate or capture the output stream
235        wrapped = self.config.plugins.setOutputStream(
236        if wrapped is not None:
237   = wrapped
239        testQueue = Queue()
240        resultQueue = Queue()
241        tasks = {}
242        completed = {}
243        workers = []
244        to_teardown = []
245        shouldStop = Event()
247        result = self._makeResult()
248        start = time.time()
250        # dispatch and collect results
251        # put indexes only on queue because tests aren't picklable
252        for case in self.nextBatch(test):
253            log.debug("Next batch %s (%s)", case, type(case))
254            if (isinstance(case, and
255                isinstance(case.test, failure.Failure)):
256                log.debug("Case is a Failure")
257                case(result) # run here to capture the failure
258                continue
259            # handle shared fixtures
260            if isinstance(case, ContextSuite) and self.sharedFixtures(case):
261                log.debug("%s has shared fixtures", case)
262                try:
263                    case.setUp()
264                except (KeyboardInterrupt, SystemExit):
265                    raise
266                except:
267                    log.debug("%s setup failed", sys.exc_info())
268                    result.addError(case, sys.exc_info())
269                else:
270                    to_teardown.append(case)
271                    for _t in case:
272                        test_addr = self.address(_t)
273                        testQueue.put(test_addr, block=False)
274                        tasks[test_addr] = None
275                        log.debug("Queued shared-fixture test %s (%s) to %s",
276                                  len(tasks), test_addr, testQueue)
278            else:
279                test_addr = self.address(case)
280                testQueue.put(test_addr, block=False)
281                tasks[test_addr] = None
282                log.debug("Queued test %s (%s) to %s",
283                          len(tasks), test_addr, testQueue)
285        log.debug("Starting %s workers", self.config.multiprocess_workers)
286        for i in range(self.config.multiprocess_workers):
287            p = Process(target=runner, args=(i,
288                                             testQueue,
289                                             resultQueue,
290                                             shouldStop,
291                                             self.loaderClass,
292                                             result.__class__,
293                                             self.config))
294            # p.setDaemon(True)
295            p.start()
296            workers.append(p)
297            log.debug("Started worker process %s", i+1)
299        num_tasks = len(tasks)
300        while tasks:
301            log.debug("Waiting for results (%s/%s tasks)",
302                      len(completed), num_tasks)
303            try:
304                addr, batch_result = resultQueue.get(
305                    timeout=self.config.multiprocess_timeout)
306                log.debug('Results received for %s', addr)
307                try:
308                    tasks.pop(addr)
309                except KeyError:
310                    log.debug("Got result for unknown task? %s", addr)
311                else:
312                    completed[addr] = batch_result
313                self.consolidate(result, batch_result)
314                if (self.config.stopOnError
315                    and not result.wasSuccessful()):
316                    # set the stop condition
317                    shouldStop.set()
318                    break
319            except Empty:
320                log.debug("Timed out with %s tasks pending", len(tasks))
321                any_alive = False
322                for w in workers:
323                    if w.is_alive():
324                        any_alive = True
325                        break
326                if not any_alive:
327                    log.debug("All workers dead")
328                    break
329        log.debug("Completed %s/%s tasks (%s remain)",
330                  len(completed), num_tasks, len(tasks))
332        for case in to_teardown:
333            log.debug("Tearing down shared fixtures for %s", case)
334            try:
335                case.tearDown()
336            except (KeyboardInterrupt, SystemExit):
337                raise
338            except:
339                result.addError(case, sys.exc_info())
341        stop = time.time()
343        result.printErrors()
344        result.printSummary(start, stop)
345        self.config.plugins.finalize(result)
347        # Tell all workers to stop
348        for w in workers:
349            if w.is_alive():
350                testQueue.put('STOP', block=False)
352        return result
354    def address(self, case):
355        if hasattr(case, 'address'):
356            file, mod, call = case.address()
357        elif hasattr(case, 'context'):
358            file, mod, call = test_address(case.context)
359        else:
360            raise Exception("Unable to convert %s to address" % case)
361        parts = []
362        if file is None:
363            if mod is None:
364                raise Exception("Unaddressable case %s" % case)
365            else:
366                parts.append(mod)
367        else:
368            parts.append(file)
369        if call is not None:
370            parts.append(call)
371        return ':'.join(map(str, parts))
373    def nextBatch(self, test):
374        # allows tests or suites to mark themselves as not safe
375        # for multiprocess execution
376        if hasattr(test, 'context'):
377            if not getattr(test.context, '_multiprocess_', True):
378                return
380        if ((isinstance(test, ContextSuite)
381             and test.hasFixtures(self.checkCanSplit))
382            or not getattr(test, 'can_split', True)
383            or not isinstance(test, unittest.TestSuite)):
384            # regular test case, or a suite with context fixtures
386            # special case: when run like nosetests path/to/
387            # the top-level suite has only one item, and it shares
388            # the same context as that item. In that case, we want the
389            # item, not the top-level suite
390            if isinstance(test, ContextSuite):
391                contained = list(test)
392                if (len(contained) == 1
393                    and getattr(contained[0], 'context', None) == test.context):
394                    test = contained[0]
395            yield test
396        else:
397            # Suite is without fixtures at this level; but it may have
398            # fixtures at any deeper level, so we need to examine it all
399            # the way down to the case level
400            for case in test:
401                for batch in self.nextBatch(case):
402                    yield batch
404    def checkCanSplit(self, context, fixt):
405        """
406        Callback that we use to check whether the fixtures found in a
407        context or ancestor are ones we care about.
409        Contexts can tell us that their fixtures are reentrant by setting
410        _multiprocess_can_split_. So if we see that, we return False to
411        disregard those fixtures.
412        """
413        if not fixt:
414            return False
415        if getattr(context, '_multiprocess_can_split_', False):
416            return False
417        return True
419    def sharedFixtures(self, case):
420        context = getattr(case, 'context', None)
421        if not context:
422            return False
423        return getattr(context, '_multiprocess_shared_', False)
425    def consolidate(self, result, batch_result):
426        log.debug("batch result is %s" , batch_result)
427        try:
428            output, testsRun, failures, errors, errorClasses = batch_result
429        except ValueError:
430            log.debug("result in unexpected format %s", batch_result)
431            failure.Failure(*sys.exc_info())(result)
432            return
434        result.testsRun += testsRun
435        result.failures.extend(failures)
436        result.errors.extend(errors)
437        for key, (storage, label, isfail) in errorClasses.items():
438            if key not in result.errorClasses:
439                # Ordinarily storage is result attribute
440                # but it's only processed through the errorClasses
441                # dict, so it's ok to fake it here
442                result.errorClasses[key] = ([], label, isfail)
443            mystorage, _junk, _junk = result.errorClasses[key]
444            mystorage.extend(storage)
445        log.debug("Ran %s tests (%s)", testsRun, result.testsRun)
448def runner(ix, testQueue, resultQueue, shouldStop,
449           loaderClass, resultClass, config):
450    log.debug("Worker %s executing", ix)
451    loader = loaderClass(config=config)
452    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite
454    def get():
455        case = testQueue.get(timeout=config.multiprocess_timeout)
456        return case
458    def makeResult():
459        stream = unittest._WritelnDecorator(StringIO())
460        result = resultClass(stream, descriptions=1,
461                             verbosity=config.verbosity,
462                             config=config)
463        plug_result = config.plugins.prepareTestResult(result)
464        if plug_result:
465            return plug_result
466        return result
468    def batch(result):
469        failures = [(TestLet(c), err) for c, err in result.failures]
470        errors = [(TestLet(c), err) for c, err in result.errors]
471        errorClasses = {}
472        for key, (storage, label, isfail) in result.errorClasses.items():
473            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
474                                 label, isfail)
475        return (
476  ,
477            result.testsRun,
478            failures,
479            errors,
480            errorClasses)
481    try:
482        try:
483            for test_addr in iter(get, 'STOP'):
484                if shouldStop.is_set():
485                    break
486                result = makeResult()
487                test = loader.loadTestsFromNames([test_addr])
488                log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
490                try:
491                    test(result)
492                    resultQueue.put((test_addr, batch(result)))
493                except KeyboardInterrupt, SystemExit:
494                    raise
495                except:
496                    log.exception("Error running test or returning results")
497                    failure.Failure(*sys.exc_info())(result)
498                    resultQueue.put((test_addr, batch(result)))
499        except Empty:
500            log.debug("Worker %s timed out waiting for tasks", ix)
501    finally:
502        testQueue.close()
503        resultQueue.close()
504    log.debug("Worker %s ending", ix)
507class NoSharedFixtureContextSuite(ContextSuite):
508    """
509    Context suite that never fires shared fixtures.
511    When a context sets _multiprocess_shared_, fixtures in that context
512    are executed by the main process. Using this suite class prevents them
513    from executing in the runner process as well.
515    """
517    def setupContext(self, context):
518        if getattr(context, '_multiprocess_shared_', False):
519            return
520        super(NoSharedFixtureContextSuite, self).setupContext(context)
522    def teardownContext(self, context):
523        if getattr(context, '_multiprocess_shared_', False):
524            return
525        super(NoSharedFixtureContextSuite, self).teardownContext(context)
