llvm.org GIT mirror llvm / 919d78c
Always use the multiprocess module. This seems to work on freebsd and openbsd these days. git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@303280 91177308-0d34-0410-b5e6-96231b3b80d8 Rafael Espindola 3 years ago
2 changed file(s) with 3 addition(s) and 115 deletion(s). Raw diff Collapse all Expand all
281281 debug_group.add_argument("--show-tests", dest="showTests",
282282 help="Show all discovered tests",
283283 action="store_true", default=False)
284 debug_group.add_argument("--use-process-pool", dest="executionStrategy",
285 help="Run tests in parallel with a process pool",
286 action="store_const", const="PROCESS_POOL")
287 debug_group.add_argument("--use-processes", dest="executionStrategy",
288 help="Run tests in parallel with processes (not threads)",
289 action="store_const", const="PROCESSES")
290 debug_group.add_argument("--use-threads", dest="executionStrategy",
291 help="Run tests in parallel with threads (not processes)",
292 action="store_const", const="THREADS")
293284
294285 opts = parser.parse_args()
295286 args = opts.test_paths
303294
304295 if opts.numThreads is None:
305296 opts.numThreads = lit.util.detectCPUs()
306
307 if opts.executionStrategy is None:
308 opts.executionStrategy = 'PROCESS_POOL'
309297
310298 if opts.maxFailures == 0:
311299 parser.error("Setting --max-failures to 0 does not have any effect.")
489477 startTime = time.time()
490478 display = TestingProgressDisplay(opts, len(run.tests), progressBar)
491479 try:
492 run.execute_tests(display, opts.numThreads, opts.maxTime,
493 opts.executionStrategy)
480 run.execute_tests(display, opts.numThreads, opts.maxTime)
494481 except KeyboardInterrupt:
495482 sys.exit(2)
496483 display.finish()
1212 except ImportError:
1313 win32api = None
1414
15 try:
16 import multiprocessing
17 except ImportError:
18 multiprocessing = None
19
15 import multiprocessing
2016 import lit.Test
2117
2218 def abort_now():
226222 def execute_test(self, test):
227223 return execute_test(test, self.lit_config, self.parallelism_semaphores)
228224
229 def execute_tests(self, display, jobs, max_time=None,
230 execution_strategy=None):
225 def execute_tests(self, display, jobs, max_time=None):
231226 """
232227 execute_tests(display, jobs, [max_time])
233228
248243 computed. Tests which were not actually executed (for any reason) will
249244 be given an UNRESOLVED result.
250245 """
251
252 if execution_strategy == 'PROCESS_POOL':
253 self.execute_tests_with_mp_pool(display, jobs, max_time)
254 return
255 # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
256 # the other two strategies.
257
258 use_processes = execution_strategy == 'PROCESSES'
259
260 # Choose the appropriate parallel execution implementation.
261 consumer = None
262 if jobs != 1 and use_processes and multiprocessing:
263 try:
264 task_impl = multiprocessing.Process
265 queue_impl = multiprocessing.Queue
266 sem_impl = multiprocessing.Semaphore
267 canceled_flag = multiprocessing.Value('i', 0)
268 consumer = MultiprocessResultsConsumer(self, display, jobs)
269 except:
270 # multiprocessing fails to initialize with certain OpenBSD and
271 # FreeBSD Python versions: http://bugs.python.org/issue3770
272 # Unfortunately the error raised also varies by platform.
273 self.lit_config.note('failed to initialize multiprocessing')
274 consumer = None
275 if not consumer:
276 task_impl = threading.Thread
277 queue_impl = queue.Queue
278 sem_impl = threading.Semaphore
279 canceled_flag = LockedValue(0)
280 consumer = ThreadResultsConsumer(display)
281
282 self.parallelism_semaphores = {k: sem_impl(v)
283 for k, v in self.lit_config.parallelism_groups.items()}
284
285 # Create the test provider.
286 provider = TestProvider(queue_impl, canceled_flag)
287 handleFailures(provider, consumer, self.lit_config.maxFailures)
288
289 # Putting tasks into the threading or multiprocessing Queue may block,
290 # so do it in a separate thread.
291 # https://docs.python.org/2/library/multiprocessing.html
292 # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
293 # without taking any out.
294 queuer = task_impl(target=provider.queue_tests, args=(self.tests, jobs))
295 queuer.start()
296
297 # Install a console-control signal handler on Windows.
298 if win32api is not None:
299 def console_ctrl_handler(type):
300 provider.cancel()
301 return True
302 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
303
304 # Install a timeout handler, if requested.
305 if max_time is not None:
306 def timeout_handler():
307 provider.cancel()
308 timeout_timer = threading.Timer(max_time, timeout_handler)
309 timeout_timer.start()
310
311 # If not using multiple tasks, just run the tests directly.
312 if jobs == 1:
313 run_one_tester(self, provider, consumer)
314 else:
315 # Otherwise, execute the tests in parallel
316 self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)
317
318 queuer.join()
319
320 # Cancel the timeout handler.
321 if max_time is not None:
322 timeout_timer.cancel()
323
324 # Update results for any tests which weren't run.
325 for test in self.tests:
326 if test.result is None:
327 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
328
329 def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
330 # Start all of the tasks.
331 tasks = [task_impl(target=run_one_tester,
332 args=(self, provider, consumer))
333 for i in range(jobs)]
334 for t in tasks:
335 t.start()
336
337 # Allow the consumer to handle results, if necessary.
338 consumer.handle_results()
339
340 # Wait for all the tasks to complete.
341 for t in tasks:
342 t.join()
343
344 def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
345246 # Don't do anything if we aren't going to run any tests.
346247 if not self.tests or jobs == 0:
347248 return