// NOTE(review): this listing is non-contiguous (the leading numbers skip
// values), so the enclosing function signature, the loop header and the
// declarations of task_id and timer are not visible here.
//
// One counter per subsolver, all starting at zero. In the visible lines it is
// only handed to ClearSubsolversThatAreDone(); nothing increments it.
97 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Synchronize every subsolver, then let ClearSubsolversThatAreDone() process
// the list before choosing what to run next.
99 SynchronizeAll(subsolvers);
100 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
// A result of -1 ends the loop (presumably: nothing is left to schedule).
101 const int best = NextSubsolverToSchedule(subsolvers);
102 if (best == -1)
break;
103 subsolvers[best]->NotifySelection();
// GenerateTask() returns a callable; the trailing () invokes it right here,
// so the task has finished before the next statement runs.
107 subsolvers[best]->GenerateTask(task_id++)();
// Hand the timer reading to the subsolver as the duration of that task.
// NOTE(review): presumably timer is started just before the task runs, in a
// line that is not shown -- confirm.
108 subsolvers[best]->AddTaskDuration(timer.
Get());
// NOTE(review): partial listing. Only the tail of the parameter list is
// visible; the function name, earlier parameters, the body of the
// batch_size == 1 branch, the call that dispatches the closures and several
// closing braces are missing from this view.
//
// Visible behavior: tasks are generated in batches of at most batch_size,
// every task of a batch is waited for, and only then are the measured
// durations reported back to the subsolvers.
129 int num_threads,
int batch_size,
int max_num_batches) {
// Both values must be strictly positive; CHECK_GT aborts otherwise.
130 CHECK_GT(num_threads, 0);
131 CHECK_GT(batch_size, 0);
// Special case for a batch of one task (its body is not visible here).
132 if (batch_size == 1) {
137 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Per-batch scratch, used as parallel arrays indexed by position in the
// batch: the task closure, the index of the subsolver that produced it, and
// the duration measured for it.
138 std::vector<std::function<void()>> to_run;
139 std::vector<int> indices;
140 std::vector<double> timing;
141 to_run.reserve(batch_size);
// The loop header has no exit condition; the loop ends through the break
// statements below.
144 for (
int batch_index = 0;; ++batch_index) {
145 VLOG(2) <<
"Starting deterministic batch of size " << batch_size;
146 SynchronizeAll(subsolvers);
147 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
// The batch cap only applies when max_num_batches is positive; zero or a
// negative value means no cap.
151 if (max_num_batches > 0 && batch_index >= max_num_batches)
break;
// Fill the batch with up to batch_size tasks, stopping early as soon as
// NextSubsolverToSchedule() returns -1.
// NOTE(review): to_run and indices are not cleared anywhere in the visible
// lines -- presumably they are reset before this loop in a line not shown;
// confirm.
158 for (
int t = 0; t < batch_size; ++t) {
159 const int best = NextSubsolverToSchedule(subsolvers);
160 if (best == -1)
break;
161 num_in_flight_per_subsolvers[best]++;
162 subsolvers[best]->NotifySelection();
163 to_run.push_back(subsolvers[best]->GenerateTask(task_id++));
164 indices.push_back(best);
// No task could be generated for this batch: leave the batch loop.
166 if (to_run.empty())
break;
// One timing slot and one unit of the blocking counter per task.
169 timing.resize(to_run.size());
170 absl::BlockingCounter blocking_counter(
static_cast<int>(to_run.size()));
171 for (
int i = 0;
i < to_run.size(); ++
i) {
// The closure takes ownership of task i (moved out of to_run), copies the
// index, and refers to timing and blocking_counter by reference.
// NOTE(review): the call receiving this closure is not visible -- presumably
// a thread pool; confirm.
173 [
i, f = std::move(to_run[
i]), &timing, &blocking_counter]() {
// Each closure writes only its own slot of timing, then signals completion.
177 timing[
i] = timer.
Get();
178 blocking_counter.DecrementCount();
// Block until every closure of the batch has called DecrementCount().
184 blocking_counter.Wait();
// The whole batch is finished, so every in-flight counter goes back to zero.
187 num_in_flight_per_subsolvers.assign(subsolvers.size(), 0);
// Report each measured duration to the subsolver that generated the task.
// to_run still has its original size here even though its elements were
// moved from above.
188 for (
int i = 0;
i < to_run.size(); ++
i) {
189 subsolvers[indices[
i]]->AddTaskDuration(timing[
i]);
// NOTE(review): partial listing. The function name, the other parameters, the
// body of the num_threads == 1 branch, the declarations of mutex, pool, best,
// task_id, time_limit and timer, the enclosing loop and most closing braces
// are missing from this view.
//
// Visible behavior: tasks are handed to pool one at a time; shared counters
// are read and written while holding mutex; a task is only picked once the
// wait on "fewer than num_threads tasks in flight" returns.
195 const int num_threads,
// num_threads must be strictly positive; a single thread is special-cased
// (the body of that branch is not visible here).
197 CHECK_GT(num_threads, 0);
198 if (num_threads == 1) {
// Total number of tasks in flight, and the same count split per subsolver.
// NOTE(review): the statements that update num_in_flight are not in the
// visible lines.
205 int num_in_flight = 0;
206 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Predicate handed to absl::Condition below. num_in_flight is captured by
// reference so each evaluation sees the current value; num_threads is
// captured by value.
210 const auto num_in_flight_lt_num_threads = [&num_in_flight, num_threads]() {
211 return num_in_flight < num_threads;
223 bool all_done =
false;
// Wait for the predicate with a 100 ms timeout; the result of
// LockWhenWithTimeout() is kept in condition.
226 const bool condition = mutex.LockWhenWithTimeout(
227 absl::Condition(&num_in_flight_lt_num_threads),
228 absl::Milliseconds(100));
237 SynchronizeAll(subsolvers);
// With nothing in flight, flag all_done (the enclosing condition is not
// visible in this listing).
243 if (num_in_flight == 0) all_done =
true;
247 SynchronizeAll(subsolvers);
// Pick the next subsolver while holding mutex.
// NOTE(review): the meaning of the second argument (false) is defined by
// NextSubsolverToSchedule(), which is not visible here.
252 const absl::MutexLock mutex_lock(&mutex);
253 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
254 best = NextSubsolverToSchedule(subsolvers,
false);
// Diagnostics only: when verbose level 1 is on and the time limit has been
// reached, collect a name:count entry for every non-null subsolver that still
// has tasks in flight.
255 if (VLOG_IS_ON(1) &&
time_limit->LimitReached()) {
256 std::vector<std::string> debug;
257 for (
int i = 0;
i < subsolvers.size(); ++
i) {
258 if (subsolvers[
i] !=
nullptr && num_in_flight_per_subsolvers[
i] > 0) {
259 debug.push_back(absl::StrCat(subsolvers[
i]->name(),
":",
260 num_in_flight_per_subsolvers[
i]));
// Rate-limited to one message per second; entries are joined with commas.
263 if (!debug.empty()) {
264 VLOG_EVERY_N_SEC(1, 1)
265 <<
"Subsolvers still running after time limit: "
266 << absl::StrJoin(debug,
",");
// Short 1 ms pause (the condition guarding it is not visible here).
277 absl::SleepFor(absl::Milliseconds(1));
282 subsolvers[best]->NotifySelection();
// The per-subsolver counter is incremented while holding mutex, before the
// task is handed to the pool.
284 absl::MutexLock mutex_lock(&mutex);
286 num_in_flight_per_subsolvers[best]++;
// The name is copied now so the closure logs from its own copy.
288 std::function<void()> task = subsolvers[best]->GenerateTask(task_id++);
289 const std::string name = subsolvers[best]->name();
// The closure owns task and name, copies best, and refers to the shared
// state (subsolvers, mutex and both counters) by reference.
290 pool.
Schedule([task = std::move(task), name, best, &subsolvers, &mutex,
291 &num_in_flight, &num_in_flight_per_subsolvers]() {
// Completion bookkeeping runs under mutex. The DCHECKs state that the
// subsolver slot is still non-null and that its counter is positive before
// the decrement.
296 const absl::MutexLock mutex_lock(&mutex);
297 DCHECK(subsolvers[best] !=
nullptr);
298 DCHECK_GT(num_in_flight_per_subsolvers[best], 0);
299 num_in_flight_per_subsolvers[best]--;
// timer.Get() is read twice: once for the log line, once for the duration
// reported to the subsolver.
300 VLOG(1) << name <<
" done in " << timer.
Get() <<
"s.";
301 subsolvers[best]->AddTaskDuration(timer.
Get());