llvm.org GIT mirror llvm / 7c17201
Revert "Enable ThreadPool to queue tasks that return values." This is failing to compile when LLVM_ENABLE_THREADS is false, and the fix is not immediately obvious, so reverting while I look into it. git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@334658 91177308-0d34-0410-b5e6-96231b3b80d8 Zachary Turner 1 year, 4 months ago
3 changed file(s) with 26 addition(s) and 65 deletion(s). Raw diff Collapse all Expand all
1313 #ifndef LLVM_SUPPORT_THREAD_POOL_H
1414 #define LLVM_SUPPORT_THREAD_POOL_H
1515
16 #include "llvm/ADT/STLExtras.h"
1716 #include "llvm/Config/llvm-config.h"
1817 #include "llvm/Support/thread.h"
1918
2019 #include
2120
2221 #include
23 #include
2422 #include
2523 #include
2624 #include
3634 /// The pool keeps a vector of threads alive, waiting on a condition variable
3735 /// for some work to become available.
3836 class ThreadPool {
39 struct TaskBase {
40 virtual ~TaskBase() {}
41 virtual void execute() = 0;
42 };
37 public:
38 using TaskTy = std::function;
39 using PackagedTaskTy = std::packaged_task;
4340
44 template struct TypedTask : public TaskBase {
45 explicit TypedTask(std::packaged_task Task)
46 : Task(std::move(Task)) {}
47
48 void execute() override { Task(); }
49
50 std::packaged_task Task;
51 };
52
53 public:
5441 /// Construct a pool with the number of threads found by
5542 /// hardware_concurrency().
5643 ThreadPool();
6451 /// Asynchronous submission of a task to the pool. The returned future can be
6552 /// used to wait for the task to finish and is *non-blocking* on destruction.
6653 template
67 inline std::shared_future::type>
68 async(Function &&F, Args &&... ArgList) {
54 inline std::shared_future async(Function &&F, Args &&... ArgList) {
6955 auto Task =
7056 std::bind(std::forward(F), std::forward(ArgList)...);
7157 return asyncImpl(std::move(Task));
7460 /// Asynchronous submission of a task to the pool. The returned future can be
7561 /// used to wait for the task to finish and is *non-blocking* on destruction.
7662 template
77 inline std::shared_future::type>
78 async(Function &&F) {
63 inline std::shared_future async(Function &&F) {
7964 return asyncImpl(std::forward(F));
8065 }
8166
8671 private:
8772 /// Asynchronous submission of a task to the pool. The returned future can be
8873 /// used to wait for the task to finish and is *non-blocking* on destruction.
89 template
90 std::shared_future::type>
91 asyncImpl(TaskTy &&Task) {
92 typedef decltype(Task()) ResultTy;
93
94 /// Wrap the Task in a packaged_task to return a future object.
95 std::packaged_task PackagedTask(std::move(Task));
96 auto Future = PackagedTask.get_future();
97 std::unique_ptr TB =
98 llvm::make_unique>(std::move(PackagedTask));
99
100 {
101 // Lock the queue and push the new task
102 std::unique_lock LockGuard(QueueLock);
103
104 // Don't allow enqueueing after disabling the pool
105 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
106
107 Tasks.push(std::move(TB));
108 }
109 QueueCondition.notify_one();
110 return Future.share();
111 }
74 std::shared_future asyncImpl(TaskTy F);
11275
11376 /// Threads in flight
11477 std::vector Threads;
11578
11679 /// Tasks waiting for execution in the pool.
117 std::queue<std::unique_ptr> Tasks;
80 std::queue<PackagedTaskTy> Tasks;
11881
11982 /// Locking and signaling for accessing the Tasks queue.
12083 std::mutex QueueLock;
3131 for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
3232 Threads.emplace_back([&] {
3333 while (true) {
34 std::unique_ptr Task;
34 PackagedTaskTy Task;
3535 {
3636 std::unique_lock LockGuard(QueueLock);
3737 // Wait for tasks to be pushed in the queue
5353 Tasks.pop();
5454 }
5555 // Run the task we just grabbed
56 Task->execute();
56 Task();
5757
5858 {
5959 // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
7676 // race.
7777 CompletionCondition.wait(LockGuard,
7878 [&] { return !ActiveThreads && Tasks.empty(); });
79 }
80
81 std::shared_future ThreadPool::asyncImpl(TaskTy Task) {
82 /// Wrap the Task in a packaged_task to return a future object.
83 PackagedTaskTy PackagedTask(std::move(Task));
84 auto Future = PackagedTask.get_future();
85 {
86 // Lock the queue and push the new task
87 std::unique_lock LockGuard(QueueLock);
88
89 // Don't allow enqueueing after disabling the pool
90 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
91
92 Tasks.push(std::move(PackagedTask));
93 }
94 QueueCondition.notify_one();
95 return Future.share();
7996 }
8097
8198 // The destructor joins all threads, waiting for completion.
146146 ASSERT_EQ(2, i.load());
147147 }
148148
149 TEST_F(ThreadPoolTest, TaskWithResult) {
150 CHECK_UNSUPPORTED();
151 // By making only 1 thread in the pool the two tasks are serialized with
152 // respect to each other, which means that the second one must return 2.
153 ThreadPool Pool{1};
154 std::atomic_int i{0};
155 Pool.async([this, &i] {
156 waitForMainThread();
157 ++i;
158 });
159 // Force the future using get()
160 std::shared_future Future = Pool.async([&i] { return ++i; });
161 ASSERT_EQ(0, i.load());
162 setMainThreadReady();
163 int Result = Future.get();
164 ASSERT_EQ(2, i.load());
165 ASSERT_EQ(2, Result);
166 }
167
168149 TEST_F(ThreadPoolTest, PoolDestruction) {
169150 CHECK_UNSUPPORTED();
170151 // Test that we are waiting on destruction