llvm.org GIT mirror llvm / 4090b47
Enable ThreadPool to support tasks that return values. Previously ThreadPool could only queue async "jobs", i.e. work that was done for its side effects and not for its result. It's useful occasionally to queue async work that returns a value. From an API perspective, this is very intuitive. The previous API just returned a shared_future<void>, so all we need to do is make it return a shared_future<T>, where T is the type of value that the operation returns. Making this work required a little magic, but ultimately it's not too bad. Instead of keeping a shared queue<packaged_task<void()>> we just keep a shared queue<unique_ptr<TaskBase>>, where TaskBase is a class with a pure virtual execute() method, then have a templated derived class that stores a packaged_task<T()>. Everything else works out pretty cleanly. Differential Revision: https://reviews.llvm.org/D48115 git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@334643 91177308-0d34-0410-b5e6-96231b3b80d8 Zachary Turner 1 year, 4 months ago
3 changed file(s) with 64 addition(s) and 26 deletion(s). Raw diff Collapse all Expand all
1919 #include
2020
2121 #include
22 #include
2223 #include
2324 #include
2425 #include
3435 /// The pool keeps a vector of threads alive, waiting on a condition variable
3536 /// for some work to become available.
3637 class ThreadPool {
/// Type-erased interface for a queued unit of work. The pool stores
/// heterogeneous tasks (each returning a different type) behind this
/// common base so they can share one queue.
struct TaskBase {
  virtual ~TaskBase() {}
  /// Run the wrapped callable. Any return value is delivered through the
  /// packaged_task's future, not through this call.
  virtual void execute() = 0;
};

/// Concrete task wrapper for a callable returning \p ReturnType.
/// Owns the std::packaged_task so the associated future stays valid
/// until the task has run.
template <typename ReturnType> struct TypedTask : public TaskBase {
  explicit TypedTask(std::packaged_task<ReturnType()> Task)
      : Task(std::move(Task)) {}

  void execute() override { Task(); }

  std::packaged_task<ReturnType()> Task;
};
51
3752 public:
38 using TaskTy = std::function;
39 using PackagedTaskTy = std::packaged_task;
40
4153 /// Construct a pool with the number of threads found by
4254 /// hardware_concurrency().
4355 ThreadPool();
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
/// The future's value type matches the callable's result type, so callers can
/// retrieve results, not just synchronize.
template <typename Function, typename... Args>
inline std::shared_future<typename std::result_of<Function(Args...)>::type>
async(Function &&F, Args &&... ArgList) {
  // Bind the arguments up front so the queued task is a nullary callable.
  auto Task =
      std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
  return asyncImpl(std::move(Task));
}
/// Asynchronous submission of a task to the pool. The returned future can be
/// used to wait for the task to finish and is *non-blocking* on destruction.
/// Overload for callables taking no arguments; forwards straight to
/// asyncImpl without an intermediate std::bind.
template <typename Function>
inline std::shared_future<typename std::result_of<Function()>::type>
async(Function &&F) {
  return asyncImpl(std::forward<Function>(F));
}
6680
7185 private:
7286 /// Asynchronous submission of a task to the pool. The returned future can be
7387 /// used to wait for the task to finish and is *non-blocking* on destruction.
74 std::shared_future asyncImpl(TaskTy F);
88 template
89 std::shared_future::type>
90 asyncImpl(TaskTy &&Task) {
91 typedef decltype(Task()) ResultTy;
92
93 /// Wrap the Task in a packaged_task to return a future object.
94 std::packaged_task PackagedTask(std::move(Task));
95 auto Future = PackagedTask.get_future();
96 std::unique_ptr TB =
97 llvm::make_unique>(std::move(PackagedTask));
98
99 {
100 // Lock the queue and push the new task
101 std::unique_lock LockGuard(QueueLock);
102
103 // Don't allow enqueueing after disabling the pool
104 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
105
106 Tasks.push(std::move(TB));
107 }
108 QueueCondition.notify_one();
109 return Future.share();
110 }
75111
76112 /// Threads in flight
77113 std::vector Threads;
78114
79115 /// Tasks waiting for execution in the pool.
80 std::queue<PackagedTaskTy> Tasks;
116 std::queue<std::unique_ptr> Tasks;
81117
82118 /// Locking and signaling for accessing the Tasks queue.
83119 std::mutex QueueLock;
3131 for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
3232 Threads.emplace_back([&] {
3333 while (true) {
34 PackagedTaskTy Task;
34 std::unique_ptr Task;
3535 {
3636 std::unique_lock LockGuard(QueueLock);
3737 // Wait for tasks to be pushed in the queue
5353 Tasks.pop();
5454 }
5555 // Run the task we just grabbed
56 Task();
56 Task->execute();
5757
5858 {
5959 // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
7676 // race.
7777 CompletionCondition.wait(LockGuard,
7878 [&] { return !ActiveThreads && Tasks.empty(); });
79 }
80
81 std::shared_future ThreadPool::asyncImpl(TaskTy Task) {
82 /// Wrap the Task in a packaged_task to return a future object.
83 PackagedTaskTy PackagedTask(std::move(Task));
84 auto Future = PackagedTask.get_future();
85 {
86 // Lock the queue and push the new task
87 std::unique_lock LockGuard(QueueLock);
88
89 // Don't allow enqueueing after disabling the pool
90 assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
91
92 Tasks.push(std::move(PackagedTask));
93 }
94 QueueCondition.notify_one();
95 return Future.share();
9679 }
9780
9881 // The destructor joins all threads, waiting for completion.
146146 ASSERT_EQ(2, i.load());
147147 }
148148
149 TEST_F(ThreadPoolTest, TaskWithResult) {
150 CHECK_UNSUPPORTED();
151 // By making only 1 thread in the pool the two tasks are serialized with
152 // respect to each other, which means that the second one must return 2.
153 ThreadPool Pool{1};
154 std::atomic_int i{0};
155 Pool.async([this, &i] {
156 waitForMainThread();
157 ++i;
158 });
159 // Force the future using get()
160 std::shared_future Future = Pool.async([&i] { return ++i; });
161 ASSERT_EQ(0, i.load());
162 setMainThreadReady();
163 int Result = Future.get();
164 ASSERT_EQ(2, i.load());
165 ASSERT_EQ(2, Result);
166 }
167
149168 TEST_F(ThreadPoolTest, PoolDestruction) {
150169 CHECK_UNSUPPORTED();
151170 // Test that we are waiting on destruction