1 | """ |
---|
2 | Overview |
---|
3 | ======== |
---|
4 | |
---|
5 | .. warning :: |
---|
6 | |
---|
7 | The multiprocess plugin is not available on Windows. |
---|
8 | |
---|
9 | The multiprocess plugin enables you to distribute your test run among a set of |
---|
10 | worker processes that run tests in parallel. This can speed up CPU-bound test |
---|
11 | runs (as long as the number of work processeses is around the number of |
---|
12 | processors or cores available), but is mainly useful for IO-bound tests that |
---|
13 | spend most of their time waiting for data to arrive from someplace else. |
---|
14 | |
---|
15 | .. note :: |
---|
16 | |
---|
17 | See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional |
---|
18 | documentation and examples. Use of this plugin requires the |
---|
19 | multiprocessing_ module, also available from PyPI. |
---|
20 | |
---|
21 | .. _multiprocessing : http://code.google.com/p/python-multiprocessing/ |
---|
22 | |
---|
23 | How tests are distributed |
---|
24 | ========================= |
---|
25 | |
---|
26 | The ideal case would be to dispatch each test to a worker process |
---|
27 | separately. This ideal is not attainable in all cases, however, because many |
---|
28 | test suites depend on context (class, module or package) fixtures. |
---|
29 | |
---|
30 | The plugin can't know (unless you tell it -- see below!) if a context fixture |
---|
31 | can be called many times concurrently (is re-entrant), or if it can be shared |
---|
32 | among tests running in different processes. Therefore, if a context has |
---|
33 | fixtures, the default behavior is to dispatch the entire suite to a worker as |
---|
34 | a unit. |
---|
35 | |
---|
36 | Controlling distribution |
---|
37 | ^^^^^^^^^^^^^^^^^^^^^^^^ |
---|
38 | |
---|
39 | There are two context-level variables that you can use to control this default |
---|
40 | behavior. |
---|
41 | |
---|
42 | If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` |
---|
43 | in the context, and the plugin will dispatch tests in suites bound to that |
---|
44 | context as if the context had no fixtures. This means that the fixtures will |
---|
45 | execute concurrently and multiple times, typically once per test. |
---|
46 | |
---|
47 | If a context's fixtures can be shared by tests running in different processes |
---|
48 | -- such as a package-level fixture that starts an external http server or |
---|
49 | initializes a shared database -- then set ``_multiprocess_shared_ = True`` in |
---|
50 | the context. These fixtures will then execute in the primary nose process, and |
---|
51 | tests in those contexts will be individually dispatched to run in parallel. |
---|
52 | |
---|
53 | How results are collected and reported |
---|
54 | ====================================== |
---|
55 | |
---|
56 | As each test or suite executes in a worker process, results (failures, errors, |
---|
57 | and specially handled exceptions like SkipTest) are collected in that |
---|
58 | process. When the worker process finishes, it returns results to the main |
---|
59 | nose process. There, any progress output is printed (dots!), and the |
---|
60 | results from the test run are combined into a consolidated result |
---|
61 | set. When results have been received for all dispatched tests, or all |
---|
62 | workers have died, the result summary is output as normal. |
---|
63 | |
---|
64 | Beware! |
---|
65 | ======= |
---|
66 | |
---|
67 | Not all test suites will benefit from, or even operate correctly using, this |
---|
68 | plugin. For example, CPU-bound tests will run more slowly if you don't have |
---|
69 | multiple processors. There are also some differences in plugin |
---|
70 | interactions and behaviors due to the way in which tests are dispatched and |
---|
71 | loaded. In general, test loading under this plugin operates as if it were |
---|
72 | always in directed mode instead of discovered mode. For instance, doctests |
---|
73 | in test modules will always be found when using this plugin with the doctest |
---|
74 | plugin. |
---|
75 | |
---|
76 | But the biggest issue you will face is probably concurrency. Unless you |
---|
77 | have kept your tests as religiously pure unit tests, with no side-effects, no |
---|
78 | ordering issues, and no external dependencies, chances are you will experience |
---|
79 | odd, intermittent and unexplainable failures and errors when using this |
---|
80 | plugin. This doesn't necessarily mean the plugin is broken; it may mean that |
---|
81 | your test suite is not safe for concurrency. |
---|
82 | |
---|
83 | """ |
---|
84 | import logging |
---|
85 | import os |
---|
86 | import sys |
---|
87 | import time |
---|
88 | import traceback |
---|
89 | import unittest |
---|
90 | import nose.case |
---|
91 | from nose.core import TextTestRunner |
---|
92 | from nose import failure |
---|
93 | from nose import loader |
---|
94 | from nose.plugins.base import Plugin |
---|
95 | from nose.result import TextTestResult |
---|
96 | from nose.suite import ContextSuite |
---|
97 | from nose.util import test_address |
---|
98 | from Queue import Empty |
---|
99 | from warnings import warn |
---|
100 | try: |
---|
101 | from cStringIO import StringIO |
---|
102 | except ImportError: |
---|
103 | import StringIO |
---|
104 | |
---|
105 | log = logging.getLogger(__name__) |
---|
106 | |
---|
107 | Process = Queue = Pool = Event = None |
---|
108 | |
---|
109 | def _import_mp(): |
---|
110 | global Process, Queue, Pool, Event |
---|
111 | if sys.platform == 'win32': |
---|
112 | warn("multiprocess plugin is not available on windows", |
---|
113 | RuntimeWarning) |
---|
114 | return |
---|
115 | try: |
---|
116 | from multiprocessing import Process as Process_, \ |
---|
117 | Queue as Queue_, Pool as Pool_, Event as Event_ |
---|
118 | Process, Queue, Pool, Event = Process_, Queue_, Pool_, Event_ |
---|
119 | except ImportError: |
---|
120 | warn("multiprocessing module is not available, multiprocess plugin " |
---|
121 | "cannot be used", RuntimeWarning) |
---|
122 | |
---|
123 | |
---|
124 | class TestLet: |
---|
125 | def __init__(self, case): |
---|
126 | try: |
---|
127 | self._id = case.id() |
---|
128 | except AttributeError: |
---|
129 | pass |
---|
130 | self._short_description = case.shortDescription() |
---|
131 | self._str = str(case) |
---|
132 | |
---|
133 | def id(self): |
---|
134 | return self._id |
---|
135 | |
---|
136 | def shortDescription(self): |
---|
137 | return self._short_description |
---|
138 | |
---|
139 | def __str__(self): |
---|
140 | return self._str |
---|
141 | |
---|
142 | |
---|
143 | class MultiProcess(Plugin): |
---|
144 | """ |
---|
145 | Run tests in multiple processes. Requires processing module. |
---|
146 | """ |
---|
147 | score = 1000 |
---|
148 | status = {} |
---|
149 | |
---|
150 | def options(self, parser, env): |
---|
151 | """ |
---|
152 | Register command-line options. |
---|
153 | """ |
---|
154 | if sys.platform == 'win32': |
---|
155 | return |
---|
156 | parser.add_option("--processes", action="store", |
---|
157 | default=env.get('NOSE_PROCESSES', 0), |
---|
158 | dest="multiprocess_workers", |
---|
159 | metavar="NUM", |
---|
160 | help="Spread test run among this many processes. " |
---|
161 | "Set a number equal to the number of processors " |
---|
162 | "or cores in your machine for best results. " |
---|
163 | "[NOSE_PROCESSES]") |
---|
164 | parser.add_option("--process-timeout", action="store", |
---|
165 | default=env.get('NOSE_PROCESS_TIMEOUT', 10), |
---|
166 | dest="multiprocess_timeout", |
---|
167 | metavar="SECONDS", |
---|
168 | help="Set timeout for return of results from each " |
---|
169 | "test runner process. [NOSE_PROCESS_TIMEOUT]") |
---|
170 | |
---|
171 | def configure(self, options, config): |
---|
172 | """ |
---|
173 | Configure plugin. |
---|
174 | """ |
---|
175 | if sys.platform == 'win32': |
---|
176 | return |
---|
177 | try: |
---|
178 | self.status.pop('active') |
---|
179 | except KeyError: |
---|
180 | pass |
---|
181 | if not hasattr(options, 'multiprocess_workers'): |
---|
182 | self.enabled = False |
---|
183 | return |
---|
184 | self.config = config |
---|
185 | try: |
---|
186 | workers = int(options.multiprocess_workers) |
---|
187 | except (TypeError, ValueError): |
---|
188 | workers = 0 |
---|
189 | if workers: |
---|
190 | _import_mp() |
---|
191 | if Process is None: |
---|
192 | self.enabled = False |
---|
193 | return |
---|
194 | self.enabled = True |
---|
195 | self.config.multiprocess_workers = workers |
---|
196 | self.config.multiprocess_timeout = int(options.multiprocess_timeout) |
---|
197 | self.status['active'] = True |
---|
198 | |
---|
199 | def prepareTestLoader(self, loader): |
---|
200 | """Remember loader class so MultiProcessTestRunner can instantiate |
---|
201 | the right loader. |
---|
202 | """ |
---|
203 | self.loaderClass = loader.__class__ |
---|
204 | |
---|
205 | def prepareTestRunner(self, runner): |
---|
206 | """Replace test runner with MultiProcessTestRunner. |
---|
207 | """ |
---|
208 | # replace with our runner class |
---|
209 | return MultiProcessTestRunner(stream=runner.stream, |
---|
210 | verbosity=self.config.verbosity, |
---|
211 | config=self.config, |
---|
212 | loaderClass=self.loaderClass) |
---|
213 | |
---|
214 | |
---|
215 | class MultiProcessTestRunner(TextTestRunner): |
---|
216 | |
---|
217 | def __init__(self, **kw): |
---|
218 | self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader) |
---|
219 | super(MultiProcessTestRunner, self).__init__(**kw) |
---|
220 | |
---|
221 | def run(self, test): |
---|
222 | """ |
---|
223 | Execute the test (which may be a test suite). If the test is a suite, |
---|
224 | distribute it out among as many processes as have been configured, at |
---|
225 | as fine a level as is possible given the context fixtures defined in the |
---|
226 | suite or any sub-suites. |
---|
227 | |
---|
228 | """ |
---|
229 | log.debug("%s.run(%s) (%s)", self, test, os.getpid()) |
---|
230 | wrapper = self.config.plugins.prepareTest(test) |
---|
231 | if wrapper is not None: |
---|
232 | test = wrapper |
---|
233 | |
---|
234 | # plugins can decorate or capture the output stream |
---|
235 | wrapped = self.config.plugins.setOutputStream(self.stream) |
---|
236 | if wrapped is not None: |
---|
237 | self.stream = wrapped |
---|
238 | |
---|
239 | testQueue = Queue() |
---|
240 | resultQueue = Queue() |
---|
241 | tasks = {} |
---|
242 | completed = {} |
---|
243 | workers = [] |
---|
244 | to_teardown = [] |
---|
245 | shouldStop = Event() |
---|
246 | |
---|
247 | result = self._makeResult() |
---|
248 | start = time.time() |
---|
249 | |
---|
250 | # dispatch and collect results |
---|
251 | # put indexes only on queue because tests aren't picklable |
---|
252 | for case in self.nextBatch(test): |
---|
253 | log.debug("Next batch %s (%s)", case, type(case)) |
---|
254 | if (isinstance(case, nose.case.Test) and |
---|
255 | isinstance(case.test, failure.Failure)): |
---|
256 | log.debug("Case is a Failure") |
---|
257 | case(result) # run here to capture the failure |
---|
258 | continue |
---|
259 | # handle shared fixtures |
---|
260 | if isinstance(case, ContextSuite) and self.sharedFixtures(case): |
---|
261 | log.debug("%s has shared fixtures", case) |
---|
262 | try: |
---|
263 | case.setUp() |
---|
264 | except (KeyboardInterrupt, SystemExit): |
---|
265 | raise |
---|
266 | except: |
---|
267 | log.debug("%s setup failed", sys.exc_info()) |
---|
268 | result.addError(case, sys.exc_info()) |
---|
269 | else: |
---|
270 | to_teardown.append(case) |
---|
271 | for _t in case: |
---|
272 | test_addr = self.address(_t) |
---|
273 | testQueue.put(test_addr, block=False) |
---|
274 | tasks[test_addr] = None |
---|
275 | log.debug("Queued shared-fixture test %s (%s) to %s", |
---|
276 | len(tasks), test_addr, testQueue) |
---|
277 | |
---|
278 | else: |
---|
279 | test_addr = self.address(case) |
---|
280 | testQueue.put(test_addr, block=False) |
---|
281 | tasks[test_addr] = None |
---|
282 | log.debug("Queued test %s (%s) to %s", |
---|
283 | len(tasks), test_addr, testQueue) |
---|
284 | |
---|
285 | log.debug("Starting %s workers", self.config.multiprocess_workers) |
---|
286 | for i in range(self.config.multiprocess_workers): |
---|
287 | p = Process(target=runner, args=(i, |
---|
288 | testQueue, |
---|
289 | resultQueue, |
---|
290 | shouldStop, |
---|
291 | self.loaderClass, |
---|
292 | result.__class__, |
---|
293 | self.config)) |
---|
294 | # p.setDaemon(True) |
---|
295 | p.start() |
---|
296 | workers.append(p) |
---|
297 | log.debug("Started worker process %s", i+1) |
---|
298 | |
---|
299 | num_tasks = len(tasks) |
---|
300 | while tasks: |
---|
301 | log.debug("Waiting for results (%s/%s tasks)", |
---|
302 | len(completed), num_tasks) |
---|
303 | try: |
---|
304 | addr, batch_result = resultQueue.get( |
---|
305 | timeout=self.config.multiprocess_timeout) |
---|
306 | log.debug('Results received for %s', addr) |
---|
307 | try: |
---|
308 | tasks.pop(addr) |
---|
309 | except KeyError: |
---|
310 | log.debug("Got result for unknown task? %s", addr) |
---|
311 | else: |
---|
312 | completed[addr] = batch_result |
---|
313 | self.consolidate(result, batch_result) |
---|
314 | if (self.config.stopOnError |
---|
315 | and not result.wasSuccessful()): |
---|
316 | # set the stop condition |
---|
317 | shouldStop.set() |
---|
318 | break |
---|
319 | except Empty: |
---|
320 | log.debug("Timed out with %s tasks pending", len(tasks)) |
---|
321 | any_alive = False |
---|
322 | for w in workers: |
---|
323 | if w.is_alive(): |
---|
324 | any_alive = True |
---|
325 | break |
---|
326 | if not any_alive: |
---|
327 | log.debug("All workers dead") |
---|
328 | break |
---|
329 | log.debug("Completed %s/%s tasks (%s remain)", |
---|
330 | len(completed), num_tasks, len(tasks)) |
---|
331 | |
---|
332 | for case in to_teardown: |
---|
333 | log.debug("Tearing down shared fixtures for %s", case) |
---|
334 | try: |
---|
335 | case.tearDown() |
---|
336 | except (KeyboardInterrupt, SystemExit): |
---|
337 | raise |
---|
338 | except: |
---|
339 | result.addError(case, sys.exc_info()) |
---|
340 | |
---|
341 | stop = time.time() |
---|
342 | |
---|
343 | result.printErrors() |
---|
344 | result.printSummary(start, stop) |
---|
345 | self.config.plugins.finalize(result) |
---|
346 | |
---|
347 | # Tell all workers to stop |
---|
348 | for w in workers: |
---|
349 | if w.is_alive(): |
---|
350 | testQueue.put('STOP', block=False) |
---|
351 | |
---|
352 | return result |
---|
353 | |
---|
354 | def address(self, case): |
---|
355 | if hasattr(case, 'address'): |
---|
356 | file, mod, call = case.address() |
---|
357 | elif hasattr(case, 'context'): |
---|
358 | file, mod, call = test_address(case.context) |
---|
359 | else: |
---|
360 | raise Exception("Unable to convert %s to address" % case) |
---|
361 | parts = [] |
---|
362 | if file is None: |
---|
363 | if mod is None: |
---|
364 | raise Exception("Unaddressable case %s" % case) |
---|
365 | else: |
---|
366 | parts.append(mod) |
---|
367 | else: |
---|
368 | parts.append(file) |
---|
369 | if call is not None: |
---|
370 | parts.append(call) |
---|
371 | return ':'.join(map(str, parts)) |
---|
372 | |
---|
373 | def nextBatch(self, test): |
---|
374 | # allows tests or suites to mark themselves as not safe |
---|
375 | # for multiprocess execution |
---|
376 | if hasattr(test, 'context'): |
---|
377 | if not getattr(test.context, '_multiprocess_', True): |
---|
378 | return |
---|
379 | |
---|
380 | if ((isinstance(test, ContextSuite) |
---|
381 | and test.hasFixtures(self.checkCanSplit)) |
---|
382 | or not getattr(test, 'can_split', True) |
---|
383 | or not isinstance(test, unittest.TestSuite)): |
---|
384 | # regular test case, or a suite with context fixtures |
---|
385 | |
---|
386 | # special case: when run like nosetests path/to/module.py |
---|
387 | # the top-level suite has only one item, and it shares |
---|
388 | # the same context as that item. In that case, we want the |
---|
389 | # item, not the top-level suite |
---|
390 | if isinstance(test, ContextSuite): |
---|
391 | contained = list(test) |
---|
392 | if (len(contained) == 1 |
---|
393 | and getattr(contained[0], 'context', None) == test.context): |
---|
394 | test = contained[0] |
---|
395 | yield test |
---|
396 | else: |
---|
397 | # Suite is without fixtures at this level; but it may have |
---|
398 | # fixtures at any deeper level, so we need to examine it all |
---|
399 | # the way down to the case level |
---|
400 | for case in test: |
---|
401 | for batch in self.nextBatch(case): |
---|
402 | yield batch |
---|
403 | |
---|
404 | def checkCanSplit(self, context, fixt): |
---|
405 | """ |
---|
406 | Callback that we use to check whether the fixtures found in a |
---|
407 | context or ancestor are ones we care about. |
---|
408 | |
---|
409 | Contexts can tell us that their fixtures are reentrant by setting |
---|
410 | _multiprocess_can_split_. So if we see that, we return False to |
---|
411 | disregard those fixtures. |
---|
412 | """ |
---|
413 | if not fixt: |
---|
414 | return False |
---|
415 | if getattr(context, '_multiprocess_can_split_', False): |
---|
416 | return False |
---|
417 | return True |
---|
418 | |
---|
419 | def sharedFixtures(self, case): |
---|
420 | context = getattr(case, 'context', None) |
---|
421 | if not context: |
---|
422 | return False |
---|
423 | return getattr(context, '_multiprocess_shared_', False) |
---|
424 | |
---|
425 | def consolidate(self, result, batch_result): |
---|
426 | log.debug("batch result is %s" , batch_result) |
---|
427 | try: |
---|
428 | output, testsRun, failures, errors, errorClasses = batch_result |
---|
429 | except ValueError: |
---|
430 | log.debug("result in unexpected format %s", batch_result) |
---|
431 | failure.Failure(*sys.exc_info())(result) |
---|
432 | return |
---|
433 | self.stream.write(output) |
---|
434 | result.testsRun += testsRun |
---|
435 | result.failures.extend(failures) |
---|
436 | result.errors.extend(errors) |
---|
437 | for key, (storage, label, isfail) in errorClasses.items(): |
---|
438 | if key not in result.errorClasses: |
---|
439 | # Ordinarily storage is result attribute |
---|
440 | # but it's only processed through the errorClasses |
---|
441 | # dict, so it's ok to fake it here |
---|
442 | result.errorClasses[key] = ([], label, isfail) |
---|
443 | mystorage, _junk, _junk = result.errorClasses[key] |
---|
444 | mystorage.extend(storage) |
---|
445 | log.debug("Ran %s tests (%s)", testsRun, result.testsRun) |
---|
446 | |
---|
447 | |
---|
448 | def runner(ix, testQueue, resultQueue, shouldStop, |
---|
449 | loaderClass, resultClass, config): |
---|
450 | log.debug("Worker %s executing", ix) |
---|
451 | loader = loaderClass(config=config) |
---|
452 | loader.suiteClass.suiteClass = NoSharedFixtureContextSuite |
---|
453 | |
---|
454 | def get(): |
---|
455 | case = testQueue.get(timeout=config.multiprocess_timeout) |
---|
456 | return case |
---|
457 | |
---|
458 | def makeResult(): |
---|
459 | stream = unittest._WritelnDecorator(StringIO()) |
---|
460 | result = resultClass(stream, descriptions=1, |
---|
461 | verbosity=config.verbosity, |
---|
462 | config=config) |
---|
463 | plug_result = config.plugins.prepareTestResult(result) |
---|
464 | if plug_result: |
---|
465 | return plug_result |
---|
466 | return result |
---|
467 | |
---|
468 | def batch(result): |
---|
469 | failures = [(TestLet(c), err) for c, err in result.failures] |
---|
470 | errors = [(TestLet(c), err) for c, err in result.errors] |
---|
471 | errorClasses = {} |
---|
472 | for key, (storage, label, isfail) in result.errorClasses.items(): |
---|
473 | errorClasses[key] = ([(TestLet(c), err) for c, err in storage], |
---|
474 | label, isfail) |
---|
475 | return ( |
---|
476 | result.stream.getvalue(), |
---|
477 | result.testsRun, |
---|
478 | failures, |
---|
479 | errors, |
---|
480 | errorClasses) |
---|
481 | try: |
---|
482 | try: |
---|
483 | for test_addr in iter(get, 'STOP'): |
---|
484 | if shouldStop.is_set(): |
---|
485 | break |
---|
486 | result = makeResult() |
---|
487 | test = loader.loadTestsFromNames([test_addr]) |
---|
488 | log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) |
---|
489 | |
---|
490 | try: |
---|
491 | test(result) |
---|
492 | resultQueue.put((test_addr, batch(result))) |
---|
493 | except KeyboardInterrupt, SystemExit: |
---|
494 | raise |
---|
495 | except: |
---|
496 | log.exception("Error running test or returning results") |
---|
497 | failure.Failure(*sys.exc_info())(result) |
---|
498 | resultQueue.put((test_addr, batch(result))) |
---|
499 | except Empty: |
---|
500 | log.debug("Worker %s timed out waiting for tasks", ix) |
---|
501 | finally: |
---|
502 | testQueue.close() |
---|
503 | resultQueue.close() |
---|
504 | log.debug("Worker %s ending", ix) |
---|
505 | |
---|
506 | |
---|
507 | class NoSharedFixtureContextSuite(ContextSuite): |
---|
508 | """ |
---|
509 | Context suite that never fires shared fixtures. |
---|
510 | |
---|
511 | When a context sets _multiprocess_shared_, fixtures in that context |
---|
512 | are executed by the main process. Using this suite class prevents them |
---|
513 | from executing in the runner process as well. |
---|
514 | |
---|
515 | """ |
---|
516 | |
---|
517 | def setupContext(self, context): |
---|
518 | if getattr(context, '_multiprocess_shared_', False): |
---|
519 | return |
---|
520 | super(NoSharedFixtureContextSuite, self).setupContext(context) |
---|
521 | |
---|
522 | def teardownContext(self, context): |
---|
523 | if getattr(context, '_multiprocess_shared_', False): |
---|
524 | return |
---|
525 | super(NoSharedFixtureContextSuite, self).teardownContext(context) |
---|