Skip to content

Commit 8f6df02

Browse files
committed
PTP: BlockingDrain for Threadpool and TaskQueue
This completes the NodePlatform rewiring begun in a previous commit. This BlockingDrain will wait on both V8 Tasks and libuv Tasks. It waits on all Tasks in the Threadpool, even though NodePlatform only cares about BlockingDrain'ing the V8 Tasks.
1 parent 3a2bbb6 commit 8f6df02

File tree

3 files changed

+54
-12
lines changed

3 files changed

+54
-12
lines changed

src/node_platform.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,7 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
175175
}
176176

177177
void WorkerThreadsTaskRunner::BlockingDrain() {
178-
// TODO(davisjam): No support for this in threadpool::Threadpool
179-
// at the moment.
180-
// I believe this is the cause of the segfaults at the end of running 'node'.
181-
// pending_worker_tasks_.BlockingDrain();
178+
tp_->BlockingDrain();
182179
}
183180

184181
void WorkerThreadsTaskRunner::Shutdown() {

src/node_threadpool.cc

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "env-inl.h"
55
#include "debug_utils.h"
66
#include "util.h"
7+
78
#include <algorithm>
89

910
// TODO(davisjam): DO NOT MERGE. Only for debugging.
@@ -42,6 +43,8 @@ void Worker::_Run(void* data) {
4243
task->UpdateState(Task::ASSIGNED);
4344
task->Run();
4445
task->UpdateState(Task::COMPLETED);
46+
47+
queue->NotifyOfCompletion();
4548
}
4649
}
4750

@@ -139,7 +142,9 @@ void LibuvTask::Run() {
139142
***************/
140143

141144
TaskQueue::TaskQueue()
142-
: queue_(), stopped_(false), lock_(), tasks_available_() {
145+
: queue_(), outstanding_tasks_(0), stopped_(false)
146+
, lock_()
147+
, task_available_(), tasks_drained_() {
143148
}
144149

145150
bool TaskQueue::Push(std::unique_ptr<Task> task) {
@@ -151,7 +156,8 @@ bool TaskQueue::Push(std::unique_ptr<Task> task) {
151156

152157
task->UpdateState(Task::QUEUED);
153158
queue_.push(std::move(task));
154-
tasks_available_.Signal(scoped_lock);
159+
outstanding_tasks_++;
160+
task_available_.Signal(scoped_lock);
155161

156162
return true;
157163
}
@@ -172,7 +178,7 @@ std::unique_ptr<Task> TaskQueue::BlockingPop(void) {
172178
Mutex::ScopedLock scoped_lock(lock_);
173179

174180
while (queue_.empty() && !stopped_) {
175-
tasks_available_.Wait(scoped_lock);
181+
task_available_.Wait(scoped_lock);
176182
}
177183

178184
if (queue_.empty()) {
@@ -184,10 +190,27 @@ std::unique_ptr<Task> TaskQueue::BlockingPop(void) {
184190
return result;
185191
}
186192

193+
void TaskQueue::NotifyOfCompletion(void) {
194+
Mutex::ScopedLock scoped_lock(lock_);
195+
outstanding_tasks_--;
196+
CHECK_GE(outstanding_tasks_, 0);
197+
if (!outstanding_tasks_) {
198+
tasks_drained_.Broadcast(scoped_lock);
199+
}
200+
}
201+
202+
void TaskQueue::BlockingDrain(void) {
203+
Mutex::ScopedLock scoped_lock(lock_);
204+
while (outstanding_tasks_) {
205+
tasks_drained_.Wait(scoped_lock);
206+
}
207+
LOG("TaskQueue::BlockingDrain: Fully drained\n");
208+
}
209+
187210
void TaskQueue::Stop(void) {
188211
Mutex::ScopedLock scoped_lock(lock_);
189212
stopped_ = true;
190-
tasks_available_.Broadcast(scoped_lock);
213+
task_available_.Broadcast(scoped_lock);
191214
}
192215

193216
int TaskQueue::Length(void) const {
@@ -234,5 +257,13 @@ int Threadpool::QueueLength(void) const {
234257
return queue_.Length();
235258
}
236259

260+
void Threadpool::BlockingDrain(void) {
261+
queue_.BlockingDrain();
262+
}
263+
264+
int Threadpool::NWorkers(void) const {
265+
return workers_.size();
266+
}
267+
237268
} // namespace threadpool
238269
} // namespace node

src/node_threadpool.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,18 @@ class TaskQueue {
146146

147147
// Return true if Push succeeds, else false.
148148
bool Push(std::unique_ptr<Task> task);
149-
std::unique_ptr<Task> Pop(void);
150149

151-
// Returns nullptr when we're done.
150+
// Non-blocking Pop. Returns nullptr if queue is empty.
151+
std::unique_ptr<Task> Pop(void);
152+
// Blocking Pop. Returns nullptr if queue is empty or Stop'd.
152153
std::unique_ptr<Task> BlockingPop(void);
153154

155+
// Workers should call this after completing a Task.
156+
void NotifyOfCompletion(void);
157+
158+
// Block until there are no Tasks pending or scheduled.
159+
void BlockingDrain(void);
160+
154161
// Subsequent Push() will fail.
155162
// Pop calls will return nullptr once queue is drained.
156163
void Stop();
@@ -160,11 +167,15 @@ class TaskQueue {
160167
private:
161168
// Structures.
162169
std::queue<std::unique_ptr<Task>> queue_;
170+
int outstanding_tasks_; // Number of Tasks in non-COMPLETED states.
163171
bool stopped_;
164172

165173
// Synchronization.
166174
Mutex lock_;
167-
ConditionVariable tasks_available_;
175+
// Signal'd when there is at least one task in the queue.
176+
ConditionVariable task_available_;
177+
// Signal'd when all Push'd Tasks are in COMPLETED state.
178+
ConditionVariable tasks_drained_;
168179
};
169180

170181
// A threadpool works on asynchronous Tasks.
@@ -187,7 +198,10 @@ class Threadpool {
187198

188199
void Post(std::unique_ptr<Task> task);
189200
int QueueLength(void) const;
190-
int NWorkers(void) const { return workers_.size(); }
201+
// Block until there are no tasks pending or scheduled in the TP.
202+
void BlockingDrain(void);
203+
204+
int NWorkers(void) const;
191205

192206
private:
193207
TaskQueue queue_;

0 commit comments

Comments
 (0)