llvm.org GIT mirror llvm / 0631be2
[lit][NFC] Cleanup lit worker process handling Move code that is executed on worker process to separate file. This makes the use of the pickled arguments stored in global variables in the worker a bit clearer. (Still not pretty though.) Extract handling of parallelism groups to it's own function. Use BoundedSemaphore instead of Semaphore. BoundedSemaphore raises for unmatched release() calls. Cleanup imports. Differential Revision: https://reviews.llvm.org/D58196 git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@354187 91177308-0d34-0410-b5e6-96231b3b80d8 Julian Lettner 6 months ago
3 changed file(s) with 109 addition(s) and 106 deletion(s). Raw diff Collapse all Expand all
None import os
1 import sys
2 import threading
0 import multiprocessing
31 import time
4 import traceback
5 try:
6 import Queue as queue
7 except ImportError:
8 import queue
92
10 try:
11 import win32api
12 except ImportError:
13 win32api = None
14
15 import multiprocessing
163 import lit.Test
17
18 def abort_now():
19 """Abort the current process without doing any exception teardown"""
20 sys.stdout.flush()
21 if win32api:
22 win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
23 else:
24 os.kill(0, 9)
4 import lit.util
5 import lit.worker
256
267 class _Display(object):
278 def __init__(self, display, provider, maxFailures):
4728 # For example, some ASan tests require lots of virtual memory and run
4829 # faster with less parallelism on OS X.
4930 self.parallelism_semaphores = \
50 {k: multiprocessing.Semaphore(v) for k, v in
31 {k: multiprocessing.BoundedSemaphore(v) for k, v in
5132 self.lit_config.parallelism_groups.items()}
5233
5334 def execute_test(self, test):
54 return _execute_test_impl(test, self.lit_config,
55 self.parallelism_semaphores)
35 return lit.worker._execute_test(test, self.lit_config)
5636
5737 def execute_tests_in_pool(self, jobs, max_time):
5838 # We need to issue many wait calls, so compute the final deadline and
6646 # interrupts the workers before we make it into our task callback, they
6747 # will each raise a KeyboardInterrupt exception and print to stderr at
6848 # the same time.
69 pool = multiprocessing.Pool(jobs, worker_initializer,
49 pool = multiprocessing.Pool(jobs, lit.worker.initializer,
7050 (self.lit_config,
7151 self.parallelism_semaphores))
7252
7353 # Install a console-control signal handler on Windows.
74 if win32api is not None:
54 if lit.util.win32api is not None:
7555 def console_ctrl_handler(type):
7656 print('\nCtrl-C detected, terminating.')
7757 pool.terminate()
7858 pool.join()
79 abort_now()
59 lit.util.abort_now()
8060 return True
81 win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
61 lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
8262
8363 try:
84 async_results = [pool.apply_async(worker_run_one_test,
64 async_results = [pool.apply_async(lit.worker.run_one_test,
8565 args=(test_index, test),
8666 callback=self.consume_test_result)
8767 for test_index, test in enumerate(self.tests)]
142122 self.failure_count = 0
143123 self.hit_max_failures = False
144124 if jobs == 1:
145 global child_lit_config
146 child_lit_config = self.lit_config
147125 for test_index, test in enumerate(self.tests):
148 result = worker_run_one_test(test_index, test)
149 self.consume_test_result(result)
126 lit.worker._execute_test(test, self.lit_config)
127 self.consume_test_result((test_index, test))
150128 if self.hit_max_failures:
151129 break
152130 else:
158136 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
159137
160138 def consume_test_result(self, pool_result):
161 """Test completion callback for worker_run_one_test
139 """Test completion callback for lit.worker.run_one_test
162140
163141 Updates the test result status in the parent process. Each task in the
164142 pool returns the test index and the result, and we use the index to look
185163 if self.lit_config.maxFailures and \
186164 self.failure_count == self.lit_config.maxFailures:
187165 self.hit_max_failures = True
188
189 def _execute_test_impl(test, lit_config, parallelism_semaphores):
190 """Execute one test"""
191 pg = test.config.parallelism_group
192 if callable(pg):
193 pg = pg(test)
194
195 result = None
196 semaphore = None
197 try:
198 if pg:
199 semaphore = parallelism_semaphores[pg]
200 if semaphore:
201 semaphore.acquire()
202 start_time = time.time()
203 result = test.config.test_format.execute(test, lit_config)
204 # Support deprecated result from execute() which returned the result
205 # code and additional output as a tuple.
206 if isinstance(result, tuple):
207 code, output = result
208 result = lit.Test.Result(code, output)
209 elif not isinstance(result, lit.Test.Result):
210 raise ValueError("unexpected result from test execution")
211 result.elapsed = time.time() - start_time
212 except KeyboardInterrupt:
213 raise
214 except:
215 if lit_config.debug:
216 raise
217 output = 'Exception during script execution:\n'
218 output += traceback.format_exc()
219 output += '\n'
220 result = lit.Test.Result(lit.Test.UNRESOLVED, output)
221 finally:
222 if semaphore:
223 semaphore.release()
224
225 test.setResult(result)
226
227 child_lit_config = None
228 child_parallelism_semaphores = None
229
230 def worker_initializer(lit_config, parallelism_semaphores):
231 """Copy expensive repeated data into worker processes"""
232 global child_lit_config
233 child_lit_config = lit_config
234 global child_parallelism_semaphores
235 child_parallelism_semaphores = parallelism_semaphores
236
237 def worker_run_one_test(test_index, test):
238 """Run one test in a multiprocessing.Pool
239
240 Side effects in this function and functions it calls are not visible in the
241 main lit process.
242
243 Arguments and results of this function are pickled, so they should be cheap
244 to copy. For efficiency, we copy all data needed to execute all tests into
245 each worker and store it in the child_* global variables. This reduces the
246 cost of each task.
247
248 Returns an index and a Result, which the parent process uses to update
249 the display.
250 """
251 try:
252 _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
253 return (test_index, test)
254 except KeyboardInterrupt as e:
255 # If a worker process gets an interrupt, abort it immediately.
256 abort_now()
257 except:
258 traceback.print_exc()
423423 psutilProc.kill()
424424 except psutil.NoSuchProcess:
425425 pass
426
427
428 try:
429 import win32api
430 except ImportError:
431 win32api = None
432
433 def abort_now():
434 """Abort the current process without doing any exception teardown"""
435 sys.stdout.flush()
436 if win32api:
437 win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
438 else:
439 os.kill(0, 9)
0 # The functions in this module are meant to run on a separate worker process.
1 # Exception: in single process mode _execute_test is called directly.
2 import time
3 import traceback
4
5 import lit.Test
6 import lit.util
7
8 _lit_config = None
9 _parallelism_semaphores = None
10
11 def initializer(lit_config, parallelism_semaphores):
12 """Copy expensive repeated data into worker processes"""
13 global _lit_config
14 global _parallelism_semaphores
15 _lit_config = lit_config
16 _parallelism_semaphores = parallelism_semaphores
17
18 def run_one_test(test_index, test):
19 """Run one test in a multiprocessing.Pool
20
21 Side effects in this function and functions it calls are not visible in the
22 main lit process.
23
24 Arguments and results of this function are pickled, so they should be cheap
25 to copy. For efficiency, we copy all data needed to execute all tests into
26 each worker and store it in the worker_* global variables. This reduces the
27 cost of each task.
28
29 Returns an index and a Result, which the parent process uses to update
30 the display.
31 """
32 try:
33 _execute_test_in_parallelism_group(test, _lit_config,
34 _parallelism_semaphores)
35 return (test_index, test)
36 except KeyboardInterrupt:
37 # If a worker process gets an interrupt, abort it immediately.
38 lit.util.abort_now()
39 except:
40 traceback.print_exc()
41
42 def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
43 """Execute one test inside the appropriate parallelism group"""
44 pg = test.config.parallelism_group
45 if callable(pg):
46 pg = pg(test)
47
48 if pg:
49 semaphore = parallelism_semaphores[pg]
50 try:
51 semaphore.acquire()
52 _execute_test(test, lit_config)
53 finally:
54 semaphore.release()
55 else:
56 _execute_test(test, lit_config)
57
58 def _execute_test(test, lit_config):
59 """Execute one test"""
60 try:
61 start_time = time.time()
62 result = test.config.test_format.execute(test, lit_config)
63 # Support deprecated result from execute() which returned the result
64 # code and additional output as a tuple.
65 if isinstance(result, tuple):
66 code, output = result
67 result = lit.Test.Result(code, output)
68 elif not isinstance(result, lit.Test.Result):
69 raise ValueError("unexpected result from test execution")
70 result.elapsed = time.time() - start_time
71 except KeyboardInterrupt:
72 raise
73 except:
74 if lit_config.debug:
75 raise
76 output = 'Exception during script execution:\n'
77 output += traceback.format_exc()
78 output += '\n'
79 result = lit.Test.Result(lit.Test.UNRESOLVED, output)
80
81 test.setResult(result)