// NOTE(review): fragment of a single-threaded scheduling loop. The enclosing
// function header, the loop header, and the declarations of `timer` and
// `task_id` are not visible in this excerpt.
// One in-flight counter per subsolver, all starting at zero.
99 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Each round: synchronize every subsolver, let ClearSubsolversThatAreDone()
// process the list (presumably dropping finished ones -- confirm against its
// definition), then ask which subsolver to run next.
101 SynchronizeAll(subsolvers);
102 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
103 const int best = NextSubsolverToSchedule(subsolvers);
// Leave the loop when NextSubsolverToSchedule() returns -1 (presumably:
// nothing is left to schedule).
104 if (best == -1)
break;
105 subsolvers[best]->NotifySelection();
// The trailing `()` invokes the generated task right here, on the calling
// thread, before the next statement runs.
109 subsolvers[best]->GenerateTask(task_id++)();
// Hand the current `timer` reading to the subsolver as this task's duration
// (assumes `timer` was started just before the task ran -- not visible here).
110 subsolvers[best]->AddTaskDuration(timer.
Get());
// NOTE(review): tail of a function signature followed by its body. The
// function name, its earlier parameters (including `subsolvers`), and several
// body lines are not visible in this excerpt. The log message below refers to
// this as the "deterministic batch" loop.
131 int num_threads,
int batch_size,
int max_num_batches) {
// Both num_threads and batch_size must be strictly positive; CHECK_GT aborts
// otherwise.
132 CHECK_GT(num_threads, 0);
133 CHECK_GT(batch_size, 0);
// Special case for a batch size of one; its body is not visible here.
134 if (batch_size == 1) {
// Per-subsolver in-flight counters, plus the per-batch buffers: the tasks to
// run, the subsolver index that produced each task, and one recorded time per
// task.
139 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
140 std::vector<std::function<void()>> to_run;
141 std::vector<int> indices;
142 std::vector<double> timing;
143 to_run.reserve(batch_size);
// Batch loop: the `for` has no condition, so it only ends through one of the
// `break` statements below.
146 for (
int batch_index = 0;; ++batch_index) {
147 VLOG(2) <<
"Starting deterministic batch of size " << batch_size;
148 SynchronizeAll(subsolvers);
149 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
// The batch cap only applies when max_num_batches is positive; zero or a
// negative value means "no cap".
153 if (max_num_batches > 0 && batch_index >= max_num_batches)
break;
// Collect up to batch_size tasks. For each pick, bump that subsolver's
// in-flight counter, notify it, and remember which subsolver produced the
// task. Stops early when NextSubsolverToSchedule() returns -1.
160 for (
int t = 0; t < batch_size; ++t) {
161 const int best = NextSubsolverToSchedule(subsolvers);
162 if (best == -1)
break;
163 num_in_flight_per_subsolvers[best]++;
164 subsolvers[best]->NotifySelection();
165 to_run.push_back(subsolvers[best]->GenerateTask(task_id++));
166 indices.push_back(best);
// Nothing could be scheduled for this batch: end the batch loop.
168 if (to_run.empty())
break;
// One timing slot per task, and a counter initialised to the number of tasks.
171 timing.resize(to_run.size());
172 absl::BlockingCounter blocking_counter(
static_cast<int>(to_run.size()));
// Build one closure per task. Each closure captures its index by value, takes
// ownership of its task by move, writes a `timer` reading into its own
// timing slot, and then decrements the counter.
// NOTE(review): the call that submits the closure (and the place where `f` is
// invoked) is not visible here. `timer` is not in the capture list, so it is
// presumably declared inside the closure on a line not shown.
// NOTE(review): `i < to_run.size()` compares a signed int with an unsigned
// size.
173 for (
int i = 0;
i < to_run.size(); ++
i) {
175 [
i, f = std::move(to_run[
i]), &timing, &blocking_counter]() {
179 timing[
i] = timer.
Get();
180 blocking_counter.DecrementCount();
// Block until every closure of this batch has called DecrementCount().
186 blocking_counter.Wait();
// The batch is over: reset every in-flight counter to zero.
189 num_in_flight_per_subsolvers.assign(subsolvers.size(), 0);
// Credit each recorded time to the subsolver that generated the task, in the
// order the tasks were scheduled.
// NOTE(review): where `to_run` and `indices` are cleared between batches is
// not visible in this excerpt.
190 for (
int i = 0;
i < to_run.size(); ++
i) {
191 subsolvers[indices[
i]]->AddTaskDuration(timing[
i]);
// NOTE(review): fragment of a thread-pool based scheduling loop. The function
// name, its other parameters (`subsolvers`, `time_limit`), the declarations
// of `mutex`, `pool`, `best`, `task_id` and `timer`, and many body lines
// (including closing braces) are not visible in this excerpt.
197 const int num_threads,
// num_threads must be strictly positive; CHECK_GT aborts otherwise.
199 CHECK_GT(num_threads, 0);
// Special case for a single thread; its body is not visible here.
200 if (num_threads == 1) {
// Total number of tasks in flight, and the same count per subsolver.
207 int num_in_flight = 0;
208 std::vector<int> num_in_flight_per_subsolvers(subsolvers.size(), 0);
// Predicate used as the mutex wait condition below: true while fewer tasks
// are in flight than there are threads.
212 const auto num_in_flight_lt_num_threads = [&num_in_flight, num_threads]() {
213 return num_in_flight < num_threads;
225 bool all_done =
false;
// Acquire `mutex` as soon as the predicate holds, or after 100 ms at the
// latest. `condition` records whether the predicate was true on return.
// NOTE(review): the matching unlock is not visible in this excerpt.
228 const bool condition = mutex.LockWhenWithTimeout(
229 absl::Condition(&num_in_flight_lt_num_threads),
230 absl::Milliseconds(100));
239 SynchronizeAll(subsolvers);
// With nothing in flight, flag the loop as finished.
// NOTE(review): the enclosing condition for this statement is not visible.
245 if (num_in_flight == 0) all_done =
true;
249 SynchronizeAll(subsolvers);
// Hold `mutex` (released when `mutex_lock` goes out of scope) while the
// subsolver list is cleaned up and the next subsolver is chosen.
254 const absl::MutexLock mutex_lock(&mutex);
255 ClearSubsolversThatAreDone(num_in_flight_per_subsolvers, subsolvers);
// NOTE(review): the meaning of the second argument (`false`) is defined by
// NextSubsolverToSchedule(), which is not visible here.
256 best = NextSubsolverToSchedule(subsolvers,
false);
// Diagnostics only: when verbose level 1 is enabled and the time limit has
// been reached, collect "name:count" for every non-null subsolver that still
// has tasks in flight and log the list, at most once per second.
// NOTE(review): `i < subsolvers.size()` compares a signed int with an
// unsigned size.
257 if (VLOG_IS_ON(1) &&
time_limit->LimitReached()) {
258 std::vector<std::string> debug;
259 for (
int i = 0;
i < subsolvers.size(); ++
i) {
260 if (subsolvers[
i] !=
nullptr && num_in_flight_per_subsolvers[
i] > 0) {
261 debug.push_back(absl::StrCat(subsolvers[
i]->name(),
":",
262 num_in_flight_per_subsolvers[
i]));
265 if (!debug.empty()) {
266 VLOG_EVERY_N_SEC(1, 1)
267 <<
"Subsolvers still running after time limit: "
268 << absl::StrJoin(debug,
",");
// Short 1 ms pause.
// NOTE(review): the condition under which this sleep runs is not visible
// (presumably when no subsolver could be selected -- confirm).
279 absl::SleepFor(absl::Milliseconds(1));
284 subsolvers[best]->NotifySelection();
// Under the mutex, count one more task in flight for the chosen subsolver.
// NOTE(review): the matching update of `num_in_flight` is not visible here.
286 absl::MutexLock mutex_lock(&mutex);
288 num_in_flight_per_subsolvers[best]++;
// Generate the task and take a copy of the subsolver's name; both are moved
// or copied into the closure scheduled below.
290 std::function<void()> task = subsolvers[best]->GenerateTask(task_id++);
291 const std::string name = subsolvers[best]->name();
// Hand the closure to the pool. `subsolvers`, `mutex` and both counters are
// captured by reference, so they must stay alive until the closure has run.
292 pool.
Schedule([task = std::move(task), name, best, &subsolvers, &mutex,
293 &num_in_flight, &num_in_flight_per_subsolvers]() {
// Completion bookkeeping, done under the mutex: check the subsolver still
// exists and has a positive in-flight count, decrement that count, log the
// elapsed time, and record it on the subsolver.
// NOTE(review): `timer` is not in the capture list, so it is presumably
// declared inside the closure on a line not shown. `timer.Get()` is read
// twice, so the logged and the recorded durations may differ slightly.
298 const absl::MutexLock mutex_lock(&mutex);
299 DCHECK(subsolvers[best] !=
nullptr);
300 DCHECK_GT(num_in_flight_per_subsolvers[best], 0);
301 num_in_flight_per_subsolvers[best]--;
302 VLOG(1) << name <<
" done in " << timer.
Get() <<
"s.";
303 subsolvers[best]->AddTaskDuration(timer.
Get());