Google OR-Tools v9.11
a fast and portable software suite for combinatorial optimization
Loading...
Searching...
No Matches
threadpool.cc
Go to the documentation of this file.
1// Copyright 2010-2024 Google LLC
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
15
16#include <functional>
17#include <mutex>
18
19#include "absl/log/check.h"
20#include "absl/strings/string_view.h"
21
22namespace operations_research {
23void RunWorker(void* data) {
24 ThreadPool* const thread_pool = reinterpret_cast<ThreadPool*>(data);
25 std::function<void()> work = thread_pool->GetNextTask();
26 while (work != nullptr) {
27 work();
28 work = thread_pool->GetNextTask();
29 }
30}
31
32ThreadPool::ThreadPool(int num_threads) : num_workers_(num_threads) {}
33
34ThreadPool::ThreadPool(absl::string_view /*prefix*/, int num_threads)
35 : num_workers_(num_threads) {}
36
38 if (started_) {
39 std::unique_lock<std::mutex> mutex_lock(mutex_);
40 waiting_to_finish_ = true;
41 mutex_lock.unlock();
42 condition_.notify_all();
43 for (int i = 0; i < num_workers_; ++i) {
44 all_workers_[i].join();
45 }
46 }
47}
48
49void ThreadPool::SetQueueCapacity(int capacity) {
50 CHECK_GT(capacity, num_workers_);
51 CHECK(!started_);
52 queue_capacity_ = capacity;
53}
54
56 started_ = true;
57 for (int i = 0; i < num_workers_; ++i) {
58 all_workers_.push_back(std::thread(&RunWorker, this));
59 }
60}
61
62std::function<void()> ThreadPool::GetNextTask() {
63 std::unique_lock<std::mutex> lock(mutex_);
64 for (;;) {
65 if (!tasks_.empty()) {
66 std::function<void()> task = tasks_.front();
67 tasks_.pop_front();
68 if (tasks_.size() < queue_capacity_ && waiting_for_capacity_) {
69 waiting_for_capacity_ = false;
70 capacity_condition_.notify_all();
71 }
72 return task;
73 }
74 if (waiting_to_finish_) {
75 return nullptr;
76 } else {
77 condition_.wait(lock);
78 }
79 }
80 return nullptr;
81}
82
83void ThreadPool::Schedule(std::function<void()> closure) {
84 std::unique_lock<std::mutex> lock(mutex_);
85 while (tasks_.size() >= queue_capacity_) {
86 waiting_for_capacity_ = true;
87 capacity_condition_.wait(lock);
88 }
89 tasks_.push_back(closure);
90 if (started_) {
91 lock.unlock();
92 condition_.notify_all();
93 }
94}
95
96} // namespace operations_research
void Schedule(std::function< void()> closure)
Definition threadpool.cc:83
std::function< void()> GetNextTask()
Definition threadpool.cc:62
void SetQueueCapacity(int capacity)
Definition threadpool.cc:49
In SWIG mode, we don't want anything besides these top-level includes.
void RunWorker(void *data)
Definition threadpool.cc:23