[3] | 1 | """ |
---|
| 2 | Overview |
---|
| 3 | ======== |
---|
| 4 | |
---|
| 5 | .. warning :: |
---|
| 6 | |
---|
| 7 | The multiprocess plugin is not available on Windows. |
---|
| 8 | |
---|
| 9 | The multiprocess plugin enables you to distribute your test run among a set of |
---|
| 10 | worker processes that run tests in parallel. This can speed up CPU-bound test |
---|
| 11 | runs (as long as the number of work processeses is around the number of |
---|
| 12 | processors or cores available), but is mainly useful for IO-bound tests that |
---|
| 13 | spend most of their time waiting for data to arrive from someplace else. |
---|
| 14 | |
---|
| 15 | .. note :: |
---|
| 16 | |
---|
| 17 | See :doc:`../doc_tests/test_multiprocess/multiprocess` for additional |
---|
| 18 | documentation and examples. Use of this plugin requires the |
---|
| 19 | multiprocessing_ module, also available from PyPI. |
---|
| 20 | |
---|
| 21 | .. _multiprocessing : http://code.google.com/p/python-multiprocessing/ |
---|
| 22 | |
---|
| 23 | How tests are distributed |
---|
| 24 | ========================= |
---|
| 25 | |
---|
| 26 | The ideal case would be to dispatch each test to a worker process |
---|
| 27 | separately. This ideal is not attainable in all cases, however, because many |
---|
| 28 | test suites depend on context (class, module or package) fixtures. |
---|
| 29 | |
---|
| 30 | The plugin can't know (unless you tell it -- see below!) if a context fixture |
---|
| 31 | can be called many times concurrently (is re-entrant), or if it can be shared |
---|
| 32 | among tests running in different processes. Therefore, if a context has |
---|
| 33 | fixtures, the default behavior is to dispatch the entire suite to a worker as |
---|
| 34 | a unit. |
---|
| 35 | |
---|
| 36 | Controlling distribution |
---|
| 37 | ^^^^^^^^^^^^^^^^^^^^^^^^ |
---|
| 38 | |
---|
| 39 | There are two context-level variables that you can use to control this default |
---|
| 40 | behavior. |
---|
| 41 | |
---|
| 42 | If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True`` |
---|
| 43 | in the context, and the plugin will dispatch tests in suites bound to that |
---|
| 44 | context as if the context had no fixtures. This means that the fixtures will |
---|
| 45 | execute concurrently and multiple times, typically once per test. |
---|
| 46 | |
---|
| 47 | If a context's fixtures can be shared by tests running in different processes |
---|
| 48 | -- such as a package-level fixture that starts an external http server or |
---|
| 49 | initializes a shared database -- then set ``_multiprocess_shared_ = True`` in |
---|
| 50 | the context. These fixtures will then execute in the primary nose process, and |
---|
| 51 | tests in those contexts will be individually dispatched to run in parallel. |
---|
| 52 | |
---|
| 53 | How results are collected and reported |
---|
| 54 | ====================================== |
---|
| 55 | |
---|
| 56 | As each test or suite executes in a worker process, results (failures, errors, |
---|
| 57 | and specially handled exceptions like SkipTest) are collected in that |
---|
| 58 | process. When the worker process finishes, it returns results to the main |
---|
| 59 | nose process. There, any progress output is printed (dots!), and the |
---|
| 60 | results from the test run are combined into a consolidated result |
---|
| 61 | set. When results have been received for all dispatched tests, or all |
---|
| 62 | workers have died, the result summary is output as normal. |
---|
| 63 | |
---|
| 64 | Beware! |
---|
| 65 | ======= |
---|
| 66 | |
---|
| 67 | Not all test suites will benefit from, or even operate correctly using, this |
---|
| 68 | plugin. For example, CPU-bound tests will run more slowly if you don't have |
---|
| 69 | multiple processors. There are also some differences in plugin |
---|
| 70 | interactions and behaviors due to the way in which tests are dispatched and |
---|
| 71 | loaded. In general, test loading under this plugin operates as if it were |
---|
| 72 | always in directed mode instead of discovered mode. For instance, doctests |
---|
| 73 | in test modules will always be found when using this plugin with the doctest |
---|
| 74 | plugin. |
---|
| 75 | |
---|
| 76 | But the biggest issue you will face is probably concurrency. Unless you |
---|
| 77 | have kept your tests as religiously pure unit tests, with no side-effects, no |
---|
| 78 | ordering issues, and no external dependencies, chances are you will experience |
---|
| 79 | odd, intermittent and unexplainable failures and errors when using this |
---|
| 80 | plugin. This doesn't necessarily mean the plugin is broken; it may mean that |
---|
| 81 | your test suite is not safe for concurrency. |
---|
| 82 | |
---|
| 83 | """ |
---|
| 84 | import logging |
---|
| 85 | import os |
---|
| 86 | import sys |
---|
| 87 | import time |
---|
| 88 | import traceback |
---|
| 89 | import unittest |
---|
| 90 | import nose.case |
---|
| 91 | from nose.core import TextTestRunner |
---|
| 92 | from nose import failure |
---|
| 93 | from nose import loader |
---|
| 94 | from nose.plugins.base import Plugin |
---|
| 95 | from nose.result import TextTestResult |
---|
| 96 | from nose.suite import ContextSuite |
---|
| 97 | from nose.util import test_address |
---|
| 98 | from Queue import Empty |
---|
| 99 | from warnings import warn |
---|
| 100 | try: |
---|
| 101 | from cStringIO import StringIO |
---|
| 102 | except ImportError: |
---|
| 103 | import StringIO |
---|
| 104 | |
---|
| 105 | log = logging.getLogger(__name__) |
---|
| 106 | |
---|
| 107 | Process = Queue = Pool = Event = None |
---|
| 108 | |
---|
| 109 | def _import_mp(): |
---|
| 110 | global Process, Queue, Pool, Event |
---|
| 111 | if sys.platform == 'win32': |
---|
| 112 | warn("multiprocess plugin is not available on windows", |
---|
| 113 | RuntimeWarning) |
---|
| 114 | return |
---|
| 115 | try: |
---|
| 116 | from multiprocessing import Process as Process_, \ |
---|
| 117 | Queue as Queue_, Pool as Pool_, Event as Event_ |
---|
| 118 | Process, Queue, Pool, Event = Process_, Queue_, Pool_, Event_ |
---|
| 119 | except ImportError: |
---|
| 120 | warn("multiprocessing module is not available, multiprocess plugin " |
---|
| 121 | "cannot be used", RuntimeWarning) |
---|
| 122 | |
---|
| 123 | |
---|
| 124 | class TestLet: |
---|
| 125 | def __init__(self, case): |
---|
| 126 | try: |
---|
| 127 | self._id = case.id() |
---|
| 128 | except AttributeError: |
---|
| 129 | pass |
---|
| 130 | self._short_description = case.shortDescription() |
---|
| 131 | self._str = str(case) |
---|
| 132 | |
---|
| 133 | def id(self): |
---|
| 134 | return self._id |
---|
| 135 | |
---|
| 136 | def shortDescription(self): |
---|
| 137 | return self._short_description |
---|
| 138 | |
---|
| 139 | def __str__(self): |
---|
| 140 | return self._str |
---|
| 141 | |
---|
| 142 | |
---|
| 143 | class MultiProcess(Plugin): |
---|
| 144 | """ |
---|
| 145 | Run tests in multiple processes. Requires processing module. |
---|
| 146 | """ |
---|
| 147 | score = 1000 |
---|
| 148 | status = {} |
---|
| 149 | |
---|
| 150 | def options(self, parser, env): |
---|
| 151 | """ |
---|
| 152 | Register command-line options. |
---|
| 153 | """ |
---|
| 154 | if sys.platform == 'win32': |
---|
| 155 | return |
---|
| 156 | parser.add_option("--processes", action="store", |
---|
| 157 | default=env.get('NOSE_PROCESSES', 0), |
---|
| 158 | dest="multiprocess_workers", |
---|
| 159 | metavar="NUM", |
---|
| 160 | help="Spread test run among this many processes. " |
---|
| 161 | "Set a number equal to the number of processors " |
---|
| 162 | "or cores in your machine for best results. " |
---|
| 163 | "[NOSE_PROCESSES]") |
---|
| 164 | parser.add_option("--process-timeout", action="store", |
---|
| 165 | default=env.get('NOSE_PROCESS_TIMEOUT', 10), |
---|
| 166 | dest="multiprocess_timeout", |
---|
| 167 | metavar="SECONDS", |
---|
| 168 | help="Set timeout for return of results from each " |
---|
| 169 | "test runner process. [NOSE_PROCESS_TIMEOUT]") |
---|
| 170 | |
---|
| 171 | def configure(self, options, config): |
---|
| 172 | """ |
---|
| 173 | Configure plugin. |
---|
| 174 | """ |
---|
| 175 | if sys.platform == 'win32': |
---|
| 176 | return |
---|
| 177 | try: |
---|
| 178 | self.status.pop('active') |
---|
| 179 | except KeyError: |
---|
| 180 | pass |
---|
| 181 | if not hasattr(options, 'multiprocess_workers'): |
---|
| 182 | self.enabled = False |
---|
| 183 | return |
---|
| 184 | self.config = config |
---|
| 185 | try: |
---|
| 186 | workers = int(options.multiprocess_workers) |
---|
| 187 | except (TypeError, ValueError): |
---|
| 188 | workers = 0 |
---|
| 189 | if workers: |
---|
| 190 | _import_mp() |
---|
| 191 | if Process is None: |
---|
| 192 | self.enabled = False |
---|
| 193 | return |
---|
| 194 | self.enabled = True |
---|
| 195 | self.config.multiprocess_workers = workers |
---|
| 196 | self.config.multiprocess_timeout = int(options.multiprocess_timeout) |
---|
| 197 | self.status['active'] = True |
---|
| 198 | |
---|
| 199 | def prepareTestLoader(self, loader): |
---|
| 200 | """Remember loader class so MultiProcessTestRunner can instantiate |
---|
| 201 | the right loader. |
---|
| 202 | """ |
---|
| 203 | self.loaderClass = loader.__class__ |
---|
| 204 | |
---|
| 205 | def prepareTestRunner(self, runner): |
---|
| 206 | """Replace test runner with MultiProcessTestRunner. |
---|
| 207 | """ |
---|
| 208 | # replace with our runner class |
---|
| 209 | return MultiProcessTestRunner(stream=runner.stream, |
---|
| 210 | verbosity=self.config.verbosity, |
---|
| 211 | config=self.config, |
---|
| 212 | loaderClass=self.loaderClass) |
---|
| 213 | |
---|
| 214 | |
---|
| 215 | class MultiProcessTestRunner(TextTestRunner): |
---|
| 216 | |
---|
| 217 | def __init__(self, **kw): |
---|
| 218 | self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader) |
---|
| 219 | super(MultiProcessTestRunner, self).__init__(**kw) |
---|
| 220 | |
---|
| 221 | def run(self, test): |
---|
| 222 | """ |
---|
| 223 | Execute the test (which may be a test suite). If the test is a suite, |
---|
| 224 | distribute it out among as many processes as have been configured, at |
---|
| 225 | as fine a level as is possible given the context fixtures defined in the |
---|
| 226 | suite or any sub-suites. |
---|
| 227 | |
---|
| 228 | """ |
---|
| 229 | log.debug("%s.run(%s) (%s)", self, test, os.getpid()) |
---|
| 230 | wrapper = self.config.plugins.prepareTest(test) |
---|
| 231 | if wrapper is not None: |
---|
| 232 | test = wrapper |
---|
| 233 | |
---|
| 234 | # plugins can decorate or capture the output stream |
---|
| 235 | wrapped = self.config.plugins.setOutputStream(self.stream) |
---|
| 236 | if wrapped is not None: |
---|
| 237 | self.stream = wrapped |
---|
| 238 | |
---|
| 239 | testQueue = Queue() |
---|
| 240 | resultQueue = Queue() |
---|
| 241 | tasks = {} |
---|
| 242 | completed = {} |
---|
| 243 | workers = [] |
---|
| 244 | to_teardown = [] |
---|
| 245 | shouldStop = Event() |
---|
| 246 | |
---|
| 247 | result = self._makeResult() |
---|
| 248 | start = time.time() |
---|
| 249 | |
---|
| 250 | # dispatch and collect results |
---|
| 251 | # put indexes only on queue because tests aren't picklable |
---|
| 252 | for case in self.nextBatch(test): |
---|
| 253 | log.debug("Next batch %s (%s)", case, type(case)) |
---|
| 254 | if (isinstance(case, nose.case.Test) and |
---|
| 255 | isinstance(case.test, failure.Failure)): |
---|
| 256 | log.debug("Case is a Failure") |
---|
| 257 | case(result) # run here to capture the failure |
---|
| 258 | continue |
---|
| 259 | # handle shared fixtures |
---|
| 260 | if isinstance(case, ContextSuite) and self.sharedFixtures(case): |
---|
| 261 | log.debug("%s has shared fixtures", case) |
---|
| 262 | try: |
---|
| 263 | case.setUp() |
---|
| 264 | except (KeyboardInterrupt, SystemExit): |
---|
| 265 | raise |
---|
| 266 | except: |
---|
| 267 | log.debug("%s setup failed", sys.exc_info()) |
---|
| 268 | result.addError(case, sys.exc_info()) |
---|
| 269 | else: |
---|
| 270 | to_teardown.append(case) |
---|
| 271 | for _t in case: |
---|
| 272 | test_addr = self.address(_t) |
---|
| 273 | testQueue.put(test_addr, block=False) |
---|
| 274 | tasks[test_addr] = None |
---|
| 275 | log.debug("Queued shared-fixture test %s (%s) to %s", |
---|
| 276 | len(tasks), test_addr, testQueue) |
---|
| 277 | |
---|
| 278 | else: |
---|
| 279 | test_addr = self.address(case) |
---|
| 280 | testQueue.put(test_addr, block=False) |
---|
| 281 | tasks[test_addr] = None |
---|
| 282 | log.debug("Queued test %s (%s) to %s", |
---|
| 283 | len(tasks), test_addr, testQueue) |
---|
| 284 | |
---|
| 285 | log.debug("Starting %s workers", self.config.multiprocess_workers) |
---|
| 286 | for i in range(self.config.multiprocess_workers): |
---|
| 287 | p = Process(target=runner, args=(i, |
---|
| 288 | testQueue, |
---|
| 289 | resultQueue, |
---|
| 290 | shouldStop, |
---|
| 291 | self.loaderClass, |
---|
| 292 | result.__class__, |
---|
| 293 | self.config)) |
---|
| 294 | # p.setDaemon(True) |
---|
| 295 | p.start() |
---|
| 296 | workers.append(p) |
---|
| 297 | log.debug("Started worker process %s", i+1) |
---|
| 298 | |
---|
| 299 | num_tasks = len(tasks) |
---|
| 300 | while tasks: |
---|
| 301 | log.debug("Waiting for results (%s/%s tasks)", |
---|
| 302 | len(completed), num_tasks) |
---|
| 303 | try: |
---|
| 304 | addr, batch_result = resultQueue.get( |
---|
| 305 | timeout=self.config.multiprocess_timeout) |
---|
| 306 | log.debug('Results received for %s', addr) |
---|
| 307 | try: |
---|
| 308 | tasks.pop(addr) |
---|
| 309 | except KeyError: |
---|
| 310 | log.debug("Got result for unknown task? %s", addr) |
---|
| 311 | else: |
---|
| 312 | completed[addr] = batch_result |
---|
| 313 | self.consolidate(result, batch_result) |
---|
| 314 | if (self.config.stopOnError |
---|
| 315 | and not result.wasSuccessful()): |
---|
| 316 | # set the stop condition |
---|
| 317 | shouldStop.set() |
---|
| 318 | break |
---|
| 319 | except Empty: |
---|
| 320 | log.debug("Timed out with %s tasks pending", len(tasks)) |
---|
| 321 | any_alive = False |
---|
| 322 | for w in workers: |
---|
| 323 | if w.is_alive(): |
---|
| 324 | any_alive = True |
---|
| 325 | break |
---|
| 326 | if not any_alive: |
---|
| 327 | log.debug("All workers dead") |
---|
| 328 | break |
---|
| 329 | log.debug("Completed %s/%s tasks (%s remain)", |
---|
| 330 | len(completed), num_tasks, len(tasks)) |
---|
| 331 | |
---|
| 332 | for case in to_teardown: |
---|
| 333 | log.debug("Tearing down shared fixtures for %s", case) |
---|
| 334 | try: |
---|
| 335 | case.tearDown() |
---|
| 336 | except (KeyboardInterrupt, SystemExit): |
---|
| 337 | raise |
---|
| 338 | except: |
---|
| 339 | result.addError(case, sys.exc_info()) |
---|
| 340 | |
---|
| 341 | stop = time.time() |
---|
| 342 | |
---|
| 343 | result.printErrors() |
---|
| 344 | result.printSummary(start, stop) |
---|
| 345 | self.config.plugins.finalize(result) |
---|
| 346 | |
---|
| 347 | # Tell all workers to stop |
---|
| 348 | for w in workers: |
---|
| 349 | if w.is_alive(): |
---|
| 350 | testQueue.put('STOP', block=False) |
---|
| 351 | |
---|
| 352 | return result |
---|
| 353 | |
---|
| 354 | def address(self, case): |
---|
| 355 | if hasattr(case, 'address'): |
---|
| 356 | file, mod, call = case.address() |
---|
| 357 | elif hasattr(case, 'context'): |
---|
| 358 | file, mod, call = test_address(case.context) |
---|
| 359 | else: |
---|
| 360 | raise Exception("Unable to convert %s to address" % case) |
---|
| 361 | parts = [] |
---|
| 362 | if file is None: |
---|
| 363 | if mod is None: |
---|
| 364 | raise Exception("Unaddressable case %s" % case) |
---|
| 365 | else: |
---|
| 366 | parts.append(mod) |
---|
| 367 | else: |
---|
| 368 | parts.append(file) |
---|
| 369 | if call is not None: |
---|
| 370 | parts.append(call) |
---|
| 371 | return ':'.join(map(str, parts)) |
---|
| 372 | |
---|
| 373 | def nextBatch(self, test): |
---|
| 374 | # allows tests or suites to mark themselves as not safe |
---|
| 375 | # for multiprocess execution |
---|
| 376 | if hasattr(test, 'context'): |
---|
| 377 | if not getattr(test.context, '_multiprocess_', True): |
---|
| 378 | return |
---|
| 379 | |
---|
| 380 | if ((isinstance(test, ContextSuite) |
---|
| 381 | and test.hasFixtures(self.checkCanSplit)) |
---|
| 382 | or not getattr(test, 'can_split', True) |
---|
| 383 | or not isinstance(test, unittest.TestSuite)): |
---|
| 384 | # regular test case, or a suite with context fixtures |
---|
| 385 | |
---|
| 386 | # special case: when run like nosetests path/to/module.py |
---|
| 387 | # the top-level suite has only one item, and it shares |
---|
| 388 | # the same context as that item. In that case, we want the |
---|
| 389 | # item, not the top-level suite |
---|
| 390 | if isinstance(test, ContextSuite): |
---|
| 391 | contained = list(test) |
---|
| 392 | if (len(contained) == 1 |
---|
| 393 | and getattr(contained[0], 'context', None) == test.context): |
---|
| 394 | test = contained[0] |
---|
| 395 | yield test |
---|
| 396 | else: |
---|
| 397 | # Suite is without fixtures at this level; but it may have |
---|
| 398 | # fixtures at any deeper level, so we need to examine it all |
---|
| 399 | # the way down to the case level |
---|
| 400 | for case in test: |
---|
| 401 | for batch in self.nextBatch(case): |
---|
| 402 | yield batch |
---|
| 403 | |
---|
| 404 | def checkCanSplit(self, context, fixt): |
---|
| 405 | """ |
---|
| 406 | Callback that we use to check whether the fixtures found in a |
---|
| 407 | context or ancestor are ones we care about. |
---|
| 408 | |
---|
| 409 | Contexts can tell us that their fixtures are reentrant by setting |
---|
| 410 | _multiprocess_can_split_. So if we see that, we return False to |
---|
| 411 | disregard those fixtures. |
---|
| 412 | """ |
---|
| 413 | if not fixt: |
---|
| 414 | return False |
---|
| 415 | if getattr(context, '_multiprocess_can_split_', False): |
---|
| 416 | return False |
---|
| 417 | return True |
---|
| 418 | |
---|
| 419 | def sharedFixtures(self, case): |
---|
| 420 | context = getattr(case, 'context', None) |
---|
| 421 | if not context: |
---|
| 422 | return False |
---|
| 423 | return getattr(context, '_multiprocess_shared_', False) |
---|
| 424 | |
---|
| 425 | def consolidate(self, result, batch_result): |
---|
| 426 | log.debug("batch result is %s" , batch_result) |
---|
| 427 | try: |
---|
| 428 | output, testsRun, failures, errors, errorClasses = batch_result |
---|
| 429 | except ValueError: |
---|
| 430 | log.debug("result in unexpected format %s", batch_result) |
---|
| 431 | failure.Failure(*sys.exc_info())(result) |
---|
| 432 | return |
---|
| 433 | self.stream.write(output) |
---|
| 434 | result.testsRun += testsRun |
---|
| 435 | result.failures.extend(failures) |
---|
| 436 | result.errors.extend(errors) |
---|
| 437 | for key, (storage, label, isfail) in errorClasses.items(): |
---|
| 438 | if key not in result.errorClasses: |
---|
| 439 | # Ordinarily storage is result attribute |
---|
| 440 | # but it's only processed through the errorClasses |
---|
| 441 | # dict, so it's ok to fake it here |
---|
| 442 | result.errorClasses[key] = ([], label, isfail) |
---|
| 443 | mystorage, _junk, _junk = result.errorClasses[key] |
---|
| 444 | mystorage.extend(storage) |
---|
| 445 | log.debug("Ran %s tests (%s)", testsRun, result.testsRun) |
---|
| 446 | |
---|
| 447 | |
---|
| 448 | def runner(ix, testQueue, resultQueue, shouldStop, |
---|
| 449 | loaderClass, resultClass, config): |
---|
| 450 | log.debug("Worker %s executing", ix) |
---|
| 451 | loader = loaderClass(config=config) |
---|
| 452 | loader.suiteClass.suiteClass = NoSharedFixtureContextSuite |
---|
| 453 | |
---|
| 454 | def get(): |
---|
| 455 | case = testQueue.get(timeout=config.multiprocess_timeout) |
---|
| 456 | return case |
---|
| 457 | |
---|
| 458 | def makeResult(): |
---|
| 459 | stream = unittest._WritelnDecorator(StringIO()) |
---|
| 460 | result = resultClass(stream, descriptions=1, |
---|
| 461 | verbosity=config.verbosity, |
---|
| 462 | config=config) |
---|
| 463 | plug_result = config.plugins.prepareTestResult(result) |
---|
| 464 | if plug_result: |
---|
| 465 | return plug_result |
---|
| 466 | return result |
---|
| 467 | |
---|
| 468 | def batch(result): |
---|
| 469 | failures = [(TestLet(c), err) for c, err in result.failures] |
---|
| 470 | errors = [(TestLet(c), err) for c, err in result.errors] |
---|
| 471 | errorClasses = {} |
---|
| 472 | for key, (storage, label, isfail) in result.errorClasses.items(): |
---|
| 473 | errorClasses[key] = ([(TestLet(c), err) for c, err in storage], |
---|
| 474 | label, isfail) |
---|
| 475 | return ( |
---|
| 476 | result.stream.getvalue(), |
---|
| 477 | result.testsRun, |
---|
| 478 | failures, |
---|
| 479 | errors, |
---|
| 480 | errorClasses) |
---|
| 481 | try: |
---|
| 482 | try: |
---|
| 483 | for test_addr in iter(get, 'STOP'): |
---|
| 484 | if shouldStop.is_set(): |
---|
| 485 | break |
---|
| 486 | result = makeResult() |
---|
| 487 | test = loader.loadTestsFromNames([test_addr]) |
---|
| 488 | log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) |
---|
| 489 | |
---|
| 490 | try: |
---|
| 491 | test(result) |
---|
| 492 | resultQueue.put((test_addr, batch(result))) |
---|
| 493 | except KeyboardInterrupt, SystemExit: |
---|
| 494 | raise |
---|
| 495 | except: |
---|
| 496 | log.exception("Error running test or returning results") |
---|
| 497 | failure.Failure(*sys.exc_info())(result) |
---|
| 498 | resultQueue.put((test_addr, batch(result))) |
---|
| 499 | except Empty: |
---|
| 500 | log.debug("Worker %s timed out waiting for tasks", ix) |
---|
| 501 | finally: |
---|
| 502 | testQueue.close() |
---|
| 503 | resultQueue.close() |
---|
| 504 | log.debug("Worker %s ending", ix) |
---|
| 505 | |
---|
| 506 | |
---|
| 507 | class NoSharedFixtureContextSuite(ContextSuite): |
---|
| 508 | """ |
---|
| 509 | Context suite that never fires shared fixtures. |
---|
| 510 | |
---|
| 511 | When a context sets _multiprocess_shared_, fixtures in that context |
---|
| 512 | are executed by the main process. Using this suite class prevents them |
---|
| 513 | from executing in the runner process as well. |
---|
| 514 | |
---|
| 515 | """ |
---|
| 516 | |
---|
| 517 | def setupContext(self, context): |
---|
| 518 | if getattr(context, '_multiprocess_shared_', False): |
---|
| 519 | return |
---|
| 520 | super(NoSharedFixtureContextSuite, self).setupContext(context) |
---|
| 521 | |
---|
| 522 | def teardownContext(self, context): |
---|
| 523 | if getattr(context, '_multiprocess_shared_', False): |
---|
| 524 | return |
---|
| 525 | super(NoSharedFixtureContextSuite, self).teardownContext(context) |
---|