88 std::vector<int64_t> num_generated_tasks(subsolvers.size(), 0);
89 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
91 SynchronizeAll(subsolvers);
92 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
93 const int best = NextSubsolverToSchedule(subsolvers, num_generated_tasks);
94 if (best == -1)
break;
95 num_generated_tasks[best]++;
99 subsolvers[best]->GenerateTask(task_id++)();
100 subsolvers[best]->AddTaskDuration(timer.
Get());
121 int num_threads,
int batch_size,
int max_num_batches) {
// NOTE(review): this is an excerpt. The first line of the signature, several
// interior lines and all closing braces are not visible here, so the comments
// below describe only what the visible statements do.
//
// Both the thread count and the batch size must be strictly positive.
122 CHECK_GT(num_threads, 0);
123 CHECK_GT(batch_size, 0);
// Special case for a batch size of one; the body of this branch is not
// visible in this excerpt.
124 if (batch_size == 1) {
// Per-subsolver counters, one zero-initialized entry per subsolver: how many
// tasks were generated so far, and how many are counted as in flight.
129 std::vector<int64_t> num_generated_tasks(subsolvers.size(), 0);
130 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Parallel arrays filled together below: to_run[k] is a generated task,
// indices[k] is the index of the subsolver that generated it, and timing[k]
// receives the duration recorded for it.
131 std::vector<std::function<void()>> to_run;
132 std::vector<int> indices;
133 std::vector<double> timing;
// The generation loop below pushes at most batch_size tasks per pass.
134 to_run.reserve(batch_size);
135 ThreadPool pool(
"DeterministicLoop", num_threads);
// The loop header has no exit condition; the loop is left through the
// `break` statements below.
137 for (
int batch_index = 0;; ++batch_index) {
138 VLOG(2) <<
"Starting deterministic batch of size " << batch_size;
// Synchronization and cleanup run before the batch-limit check, so they also
// happen on the pass that ends the loop.
139 SynchronizeAll(subsolvers);
140 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
// A positive max_num_batches caps the number of batches; a value <= 0
// disables the cap.
144 if (max_num_batches > 0 && batch_index >= max_num_batches)
break;
// NOTE(review): to_run and indices are only appended to in the visible
// lines; any clearing between batches must be on lines not shown here --
// confirm.
//
// Generate up to batch_size tasks without running any of them yet.
151 for (
int t = 0; t < batch_size; ++t) {
// A result of -1 stops generation for this batch (presumably "nothing left to
// schedule" -- confirm against NextSubsolverToSchedule).
152 const int best = NextSubsolverToSchedule(subsolvers, num_generated_tasks);
153 if (best == -1)
break;
154 num_in_flight_per_subsolvers[best]++;
155 num_generated_tasks[best]++;
// task_id is declared on a line not visible here; it is incremented once per
// generated task.
156 to_run.push_back(subsolvers[best]->GenerateTask(task_id++));
157 indices.push_back(best);
// Nothing was generated: leave the enclosing loop.
159 if (to_run.empty())
break;
// One timing slot per task, and a counter initialized to the number of tasks.
162 timing.resize(to_run.size());
163 absl::BlockingCounter blocking_counter(
static_cast<int>(to_run.size()));
// Build one closure per task. Each closure owns its task (moved out of
// to_run[i]) and its index i by value, and refers to `timing` and
// `blocking_counter` by reference. The call that receives the closure and
// the declaration of `timer` are on lines not visible here.
// NOTE(review): `i` is an int compared against an unsigned size().
164 for (
int i = 0;
i < to_run.size(); ++
i) {
166 [
i, f = std::move(to_run[
i]), &timing, &blocking_counter]() {
// Each closure writes only its own slot timing[i].
170 timing[
i] = timer.
Get();
// Signal completion of this task.
171 blocking_counter.DecrementCount();
// Wait on the counter that every closure decrements once.
177 blocking_counter.Wait();
// After the wait, reset every in-flight count to zero.
180 num_in_flight_per_subsolvers.assign(subsolvers.size(), 0);
// Report each recorded duration to the subsolver that generated the task.
// Only to_run.size() is used here; the elements themselves were moved from.
181 for (
int i = 0;
i < to_run.size(); ++
i) {
182 subsolvers[indices[
i]]->AddTaskDuration(timing[
i]);
188 const int num_threads) {
189 CHECK_GT(num_threads, 0);
190 if (num_threads == 1) {
197 int num_in_flight = 0;
198 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
202 const auto num_in_flight_lt_num_threads = [&num_in_flight, num_threads]() {
203 return num_in_flight < num_threads;
206 ThreadPool pool(
"NonDeterministicLoop", num_threads);
213 std::vector<int64_t> num_generated_tasks(subsolvers.size(), 0);
216 bool all_done =
false;
219 const bool condition = mutex.LockWhenWithTimeout(
220 absl::Condition(&num_in_flight_lt_num_threads),
221 absl::Milliseconds(100));
230 SynchronizeAll(subsolvers);
236 if (num_in_flight == 0) all_done =
true;
240 SynchronizeAll(subsolvers);
244 const absl::MutexLock mutex_lock(&mutex);
245 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
247 const int best = NextSubsolverToSchedule(subsolvers, num_generated_tasks);
255 absl::SleepFor(absl::Milliseconds(1));
260 num_generated_tasks[best]++;
262 absl::MutexLock mutex_lock(&mutex);
264 num_in_flight_per_subsolvers[best]++;
266 std::function<void()> task = subsolvers[best]->GenerateTask(task_id++);
267 const std::string
name = subsolvers[best]->name();
268 pool.
Schedule([task = std::move(task),
name, best, &subsolvers, &mutex,
269 &num_in_flight, &num_in_flight_per_subsolvers]() {
274 const absl::MutexLock mutex_lock(&mutex);
275 DCHECK(subsolvers[best] !=
nullptr);
276 DCHECK_GT(num_in_flight_per_subsolvers[best], 0);
277 num_in_flight_per_subsolvers[best]--;
278 VLOG(1) <<
name <<
" done in " << timer.
Get() <<
"s.";
279 subsolvers[best]->AddTaskDuration(timer.
Get());