99 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
101 SynchronizeAll(subsolvers);
102 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
103 const int best = NextSubsolverToSchedule(subsolvers);
104 if (best == -1)
break;
105 subsolvers[best]->NotifySelection();
109 subsolvers[best]->GenerateTask(task_id++)();
110 subsolvers[best]->AddTaskDuration(timer.
Get());
131 int num_threads,
int batch_size,
int max_num_batches) {
132 CHECK_GT(num_threads, 0);
133 CHECK_GT(batch_size, 0);
134 if (batch_size == 1) {
139 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
140 std::vector<std::function<void()>> to_run;
141 std::vector<int> indices;
142 std::vector<double> timing;
143 to_run.reserve(batch_size);
145 for (
int batch_index = 0;; ++batch_index) {
146 VLOG(2) <<
"Starting deterministic batch of size " << batch_size;
147 SynchronizeAll(subsolvers);
148 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
152 if (max_num_batches > 0 && batch_index >= max_num_batches)
break;
159 for (
int t = 0; t < batch_size; ++t) {
160 const int best = NextSubsolverToSchedule(subsolvers);
161 if (best == -1)
break;
162 num_in_flight_per_subsolvers[best]++;
163 subsolvers[best]->NotifySelection();
164 to_run.push_back(subsolvers[best]->GenerateTask(task_id++));
165 indices.push_back(best);
167 if (to_run.empty())
break;
170 timing.resize(to_run.size());
171 absl::BlockingCounter blocking_counter(
static_cast<int>(to_run.size()));
172 for (
int i = 0;
i < to_run.size(); ++
i) {
174 [
i, f = std::move(to_run[
i]), &timing, &blocking_counter]() {
178 timing[
i] = timer.
Get();
179 blocking_counter.DecrementCount();
185 blocking_counter.Wait();
188 num_in_flight_per_subsolvers.assign(subsolvers.size(), 0);
189 for (
int i = 0;
i < to_run.size(); ++
i) {
190 subsolvers[indices[
i]]->AddTaskDuration(timing[
i]);
196 const int num_threads,
198 CHECK_GT(num_threads, 0);
199 if (num_threads == 1) {
206 int num_in_flight = 0;
207 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
211 const auto num_in_flight_lt_num_threads = [&num_in_flight, num_threads]() {
212 return num_in_flight < num_threads;
223 bool all_done =
false;
226 const bool condition = mutex.LockWhenWithTimeout(
227 absl::Condition(&num_in_flight_lt_num_threads),
228 absl::Milliseconds(100));
237 SynchronizeAll(subsolvers);
243 if (num_in_flight == 0) all_done =
true;
247 SynchronizeAll(subsolvers);
252 const absl::MutexLock mutex_lock(mutex);
253 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
254 best = NextSubsolverToSchedule(subsolvers,
false);
256 std::vector<std::string> debug;
257 for (
int i = 0;
i < subsolvers.size(); ++
i) {
258 if (subsolvers[
i] !=
nullptr && num_in_flight_per_subsolvers[
i] > 0) {
259 debug.push_back(absl::StrCat(subsolvers[
i]->name(),
":",
260 num_in_flight_per_subsolvers[
i]));
263 if (!debug.empty()) {
264 VLOG_EVERY_N_SEC(1, 1)
265 <<
"Subsolvers still running after time limit: "
266 << absl::StrJoin(debug,
",");
277 absl::SleepFor(absl::Milliseconds(1));
282 subsolvers[best]->NotifySelection();
284 absl::MutexLock mutex_lock(mutex);
286 num_in_flight_per_subsolvers[best]++;
288 std::function<void()> task = subsolvers[best]->GenerateTask(task_id++);
289 const std::string name = subsolvers[best]->name();
290 pool.
Schedule([task = std::move(task), name, best, &subsolvers, &mutex,
291 &num_in_flight, &num_in_flight_per_subsolvers]() {
296 const absl::MutexLock mutex_lock(mutex);
297 DCHECK(subsolvers[best] !=
nullptr);
298 DCHECK_GT(num_in_flight_per_subsolvers[best], 0);
299 num_in_flight_per_subsolvers[best]--;
300 VLOG(1) << name <<
" done in " << timer.
Get() <<
"s.";
301 subsolvers[best]->AddTaskDuration(timer.
Get());