19#include "absl/algorithm/container.h"
20#include "absl/base/nullability.h"
21#include "absl/base/optimization.h"
22#include "absl/base/thread_annotations.h"
23#include "absl/functional/any_invocable.h"
24#include "absl/log/check.h"
25#include "absl/strings/string_view.h"
26#include "absl/synchronization/mutex.h"
34 : max_threads_(num_threads == 0 ? 1 : num_threads) {
35 CHECK_GT(max_threads_, 0u);
37 absl::MutexLock lock(mutex_);
48 absl::MutexLock l(mutex_);
50 for (Waiter* absl_nonnull waiter : waiters_) {
55 auto queue_empty = [
this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
56 return queue_.empty();
58 mutex_.Await(absl::Condition(&queue_empty));
62 for (
auto& worker : threads_) {
67void ThreadPool::SpawnThread() {
68 CHECK_LE(threads_.size(), max_threads_);
69 threads_.emplace_back([
this] { RunWorker(); });
72void ThreadPool::RunWorker() {
74 absl::MutexLock lock(mutex_);
78 std::optional<absl::AnyInvocable<void() &&>> item = DequeueWork();
79 if (!item.has_value()) {
87void ThreadPool::SignalWaiter() {
88 DCHECK(!queue_.empty());
89 if (waiters_.empty()) {
91 if (running_threads_ == threads_.size() && threads_.size() < max_threads_) {
98 waiters_.back()->cv.Signal();
102std::optional<absl::AnyInvocable<void() &&>> ThreadPool::DequeueWork() {
104 absl::MutexLock m(mutex_);
105 while (queue_.empty() && !stopping_) {
107 waiters_.push_back(&self);
108 self.cv.Wait(&mutex_);
109 waiters_.erase(absl::c_find(waiters_, &self));
111 if (queue_.empty()) {
115 absl::AnyInvocable<void() &&> result = std::move(queue_.front());
117 if (!queue_.empty()) {
120 return std::move(result);
125 absl::MutexLock m(mutex_);
126 DCHECK(!stopping_) <<
"Callback added after destructor started";
127 if (ABSL_PREDICT_FALSE(stopping_))
return;
128 queue_.push_back(std::move(callback));
ThreadPool(int num_threads)
void Schedule(absl::AnyInvocable< void() && > callback)