Google OR-Tools v9.15
a fast and portable software suite for combinatorial optimization
Loading...
Searching...
No Matches
threadpool.cc
Go to the documentation of this file.
1// Copyright 2010-2025 Google LLC
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
15
16#include <optional>
17#include <utility>
18
19#include "absl/algorithm/container.h"
20#include "absl/base/nullability.h"
21#include "absl/base/optimization.h"
22#include "absl/base/thread_annotations.h"
23#include "absl/functional/any_invocable.h"
24#include "absl/log/check.h"
25#include "absl/strings/string_view.h"
26#include "absl/synchronization/mutex.h"
27
28namespace operations_research {
29
30// It is a common error to call ThreadPool(workitems.size()), which
31// crashes when workitems is empty. Prevent those crashes by creating at
32// least one thread.
33ThreadPool::ThreadPool(int num_threads)
34 : max_threads_(num_threads == 0 ? 1 : num_threads) {
35 CHECK_GT(max_threads_, 0u);
36 // Spawn a single thread to handle work by default.
37 absl::MutexLock lock(mutex_);
38 SpawnThread();
39}
40
41ThreadPool::ThreadPool(absl::string_view prefix, int num_threads)
42 : ThreadPool(num_threads) {}
43
45 // Make threads finish up by setting stopping_. Ensure all threads waiting see
46 // this change by signalling their condvar.
47 {
48 absl::MutexLock l(mutex_);
49 stopping_ = true;
50 for (Waiter* absl_nonnull waiter : waiters_) {
51 waiter->cv.Signal();
52 }
53 // Wait until the queue is empty. This implies no new threads will be
54 // spawned, and all existing threads are exiting.
55 auto queue_empty = [this]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) {
56 return queue_.empty();
57 };
58 mutex_.Await(absl::Condition(&queue_empty));
59 }
60 // Join and delete all threads. Because the queue is empty, we know no new
61 // threads will be added to threads_.
62 for (auto& worker : threads_) {
63 worker.join();
64 }
65}
66
67void ThreadPool::SpawnThread() {
68 CHECK_LE(threads_.size(), max_threads_);
69 threads_.emplace_back([this] { RunWorker(); });
70}
71
72void ThreadPool::RunWorker() {
73 {
74 absl::MutexLock lock(mutex_);
75 ++running_threads_;
76 }
77 while (true) {
78 std::optional<absl::AnyInvocable<void() &&>> item = DequeueWork();
79 if (!item.has_value()) { // Requesting to stop the worker thread.
80 break;
81 }
82 DCHECK(item);
83 std::move (*item)();
84 }
85}
86
87void ThreadPool::SignalWaiter() {
88 DCHECK(!queue_.empty());
89 if (waiters_.empty()) {
90 // If there are no waiters, try spawning a new thread to pick up work.
91 if (running_threads_ == threads_.size() && threads_.size() < max_threads_) {
92 SpawnThread();
93 }
94 } else {
95 // If there are waiters we wake the last inserted waiter. Note that we can
96 // signal this waiter multiple times. This is not only ok but it is crucial
97 // to reduce spurious wakeups.
98 waiters_.back()->cv.Signal();
99 }
100}
101
102std::optional<absl::AnyInvocable<void() &&>> ThreadPool::DequeueWork() {
103 // Wait for queue to be not-empty
104 absl::MutexLock m(mutex_);
105 while (queue_.empty() && !stopping_) {
106 Waiter self;
107 waiters_.push_back(&self);
108 self.cv.Wait(&mutex_);
109 waiters_.erase(absl::c_find(waiters_, &self));
110 }
111 if (queue_.empty()) {
112 DCHECK(stopping_);
113 return std::nullopt;
114 }
115 absl::AnyInvocable<void() &&> result = std::move(queue_.front());
116 queue_.pop_front();
117 if (!queue_.empty()) {
118 SignalWaiter();
119 }
120 return std::move(result);
121}
122
123void ThreadPool::Schedule(absl::AnyInvocable<void() &&> callback) {
124 // Wait for queue to be not-full
125 absl::MutexLock m(mutex_);
126 DCHECK(!stopping_) << "Callback added after destructor started";
127 if (ABSL_PREDICT_FALSE(stopping_)) return;
128 queue_.push_back(std::move(callback));
129 SignalWaiter();
130}
131
132} // namespace operations_research
void Schedule(absl::AnyInvocable< void() && > callback)
OR-Tools root namespace.