Google OR-Tools v9.14
a fast and portable software suite for combinatorial optimization
Loading...
Searching...
No Matches
sharder.cc
Go to the documentation of this file.
1// Copyright 2010-2025 Google LLC
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
15
16#include <algorithm>
17#include <cmath>
18#include <cstdint>
19#include <functional>
20#include <vector>
21
22#include "Eigen/Core"
23#include "Eigen/SparseCore"
24#include "absl/log/check.h"
25#include "absl/log/log.h"
26#include "absl/log/vlog_is_on.h"
27#include "absl/synchronization/blocking_counter.h"
28#include "absl/time/time.h"
30#include "ortools/base/timer.h"
32
34
35using ::Eigen::VectorXd;
36
37Sharder::Sharder(const int64_t num_elements, const int num_shards,
38 Scheduler* const scheduler,
39 const std::function<int64_t(int64_t)>& element_mass)
40 : scheduler_(scheduler) {
41 CHECK_GE(num_elements, 0);
42 if (num_elements == 0) {
43 shard_starts_.push_back(0);
44 return;
45 }
46 CHECK_GE(num_shards, 1);
47 shard_starts_.reserve(
48 std::min(static_cast<int64_t>(num_shards), num_elements) + 1);
49 shard_masses_.reserve(
50 std::min(static_cast<int64_t>(num_shards), num_elements));
51 int64_t overall_mass = 0;
52 for (int64_t elem = 0; elem < num_elements; ++elem) {
53 overall_mass += element_mass(elem);
54 }
55 shard_starts_.push_back(0);
56 int64_t this_shard_mass = element_mass(0);
57 for (int64_t elem = 1; elem < num_elements; ++elem) {
58 int64_t this_elem_mass = element_mass(elem);
59 if (this_shard_mass + (this_elem_mass / 2) >= overall_mass / num_shards) {
60 // `elem` starts a new shard.
61 shard_masses_.push_back(this_shard_mass);
62 shard_starts_.push_back(elem);
63 this_shard_mass = this_elem_mass;
64 } else {
65 this_shard_mass += this_elem_mass;
66 }
67 }
68 shard_starts_.push_back(num_elements);
69 shard_masses_.push_back(this_shard_mass);
70 CHECK_EQ(NumShards(), shard_masses_.size());
71}
72
73Sharder::Sharder(const int64_t num_elements, const int num_shards,
74 Scheduler* const scheduler)
75 : scheduler_(scheduler) {
76 CHECK_GE(num_elements, 0);
77 if (num_elements == 0) {
78 shard_starts_.push_back(0);
79 return;
80 }
81 CHECK_GE(num_shards, 1);
82 shard_starts_.reserve(std::min(int64_t{num_shards}, num_elements) + 1);
83 shard_masses_.reserve(std::min(int64_t{num_shards}, num_elements));
84 if (num_shards >= num_elements) {
85 for (int64_t element = 0; element < num_elements; ++element) {
86 shard_starts_.push_back(static_cast<int>(element));
87 shard_masses_.push_back(1);
88 }
89 } else {
90 for (int shard = 0; shard < num_shards; ++shard) {
91 const int64_t this_shard_start = ((num_elements * shard) / num_shards);
92 const int64_t next_shard_start =
93 ((num_elements * (shard + 1)) / num_shards);
94 if (next_shard_start - this_shard_start > 0) {
95 shard_starts_.push_back(this_shard_start);
96 shard_masses_.push_back(next_shard_start - this_shard_start);
97 }
98 }
99 }
100 shard_starts_.push_back(num_elements);
101 CHECK_EQ(NumShards(), shard_masses_.size());
102}
103
104Sharder::Sharder(const Sharder& other_sharder, const int64_t num_elements)
105 // The `std::max()` protects against `other_sharder.NumShards() == 0`, which
106 // will happen if `other_sharder` had `num_elements == 0`.
107 : Sharder(num_elements, std::max(1, other_sharder.NumShards()),
108 other_sharder.scheduler_) {}
109
111 const std::function<void(const Shard&)>& func) const {
112 if (scheduler_) {
113 absl::BlockingCounter counter(NumShards());
114 VLOG(2) << "Starting ParallelForEachShard()";
115 scheduler_->ParallelFor(0, NumShards(), [&](int shard_num) {
116 WallTimer timer;
117 if (VLOG_IS_ON(2)) {
118 timer.Start();
119 }
120 func(Shard(shard_num, this));
121 if (VLOG_IS_ON(2)) {
122 timer.Stop();
123 VLOG(2) << "Shard " << shard_num << " with " << ShardSize(shard_num)
124 << " elements and " << ShardMass(shard_num)
125 << " mass finished with "
126 << ShardMass(shard_num) /
127 std::max(int64_t{1},
128 absl::ToInt64Microseconds(timer.GetDuration()))
129 << " mass/usec.";
130 }
131 });
132 VLOG(2) << "Done ParallelForEachShard()";
133 } else {
134 for (int shard_num = 0; shard_num < NumShards(); ++shard_num) {
135 func(Shard(shard_num, this));
136 }
137 }
138}
139
141 const std::function<double(const Shard&)>& func) const {
142 VectorXd local_sums(NumShards());
143 ParallelForEachShard([&](const Sharder::Shard& shard) {
144 local_sums[shard.Index()] = func(shard);
145 });
146 return local_sums.sum();
147}
148
150 const std::function<bool(const Shard&)>& func) const {
151 // Recall `std::vector<bool>` is not thread-safe.
152 std::vector<int> local_result(NumShards());
153 ParallelForEachShard([&](const Sharder::Shard& shard) {
154 local_result[shard.Index()] = static_cast<int>(func(shard));
155 });
156 return std::all_of(local_result.begin(), local_result.end(),
157 [](const int v) { return static_cast<bool>(v); });
158}
159
161 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
162 const VectorXd& vector, const Sharder& sharder) {
163 CHECK_EQ(vector.size(), matrix.rows());
164 VectorXd answer(matrix.cols());
165 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
166 // NOTE: For very sparse columns, assignment to `shard(answer)` incurs a
167 // measurable overhead compared to using a constructor
168 // (i.e. `VectorXd temp = ...`). It is not clear why this is the case, nor
169 // how to avoid it.
170 shard(answer) = shard(matrix).transpose() * vector;
171 });
172 return answer;
173}
174
175void SetZero(const Sharder& sharder, VectorXd& dest) {
176 dest.resize(sharder.NumElements());
177 sharder.ParallelForEachShard(
178 [&](const Sharder::Shard& shard) { shard(dest).setZero(); });
179}
180
181VectorXd ZeroVector(const Sharder& sharder) {
182 VectorXd result(sharder.NumElements());
183 SetZero(sharder, result);
184 return result;
185}
186
187VectorXd OnesVector(const Sharder& sharder) {
188 VectorXd result(sharder.NumElements());
189 sharder.ParallelForEachShard(
190 [&](const Sharder::Shard& shard) { shard(result).setOnes(); });
191 return result;
192}
193
194void AddScaledVector(const double scale, const VectorXd& increment,
195 const Sharder& sharder, VectorXd& dest) {
196 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
197 shard(dest) += scale * shard(increment);
198 });
199}
200
201void AssignVector(const VectorXd& vec, const Sharder& sharder, VectorXd& dest) {
202 dest.resize(vec.size());
203 sharder.ParallelForEachShard(
204 [&](const Sharder::Shard& shard) { shard(dest) = shard(vec); });
205}
206
207VectorXd CloneVector(const VectorXd& vec, const Sharder& sharder) {
208 VectorXd dest;
209 AssignVector(vec, sharder, dest);
210 return dest;
211}
212
213void CoefficientWiseProductInPlace(const VectorXd& scale,
214 const Sharder& sharder, VectorXd& dest) {
215 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
216 shard(dest) = shard(dest).cwiseProduct(shard(scale));
217 });
218}
219
220void CoefficientWiseQuotientInPlace(const VectorXd& scale,
221 const Sharder& sharder, VectorXd& dest) {
222 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
223 shard(dest) = shard(dest).cwiseQuotient(shard(scale));
224 });
225}
226
227double Dot(const VectorXd& v1, const VectorXd& v2, const Sharder& sharder) {
228 return sharder.ParallelSumOverShards(
229 [&](const Sharder::Shard& shard) { return shard(v1).dot(shard(v2)); });
230}
231
232double LInfNorm(const VectorXd& vector, const Sharder& sharder) {
233 VectorXd local_max(sharder.NumShards());
234 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
235 local_max[shard.Index()] = shard(vector).lpNorm<Eigen::Infinity>();
236 });
237 return local_max.lpNorm<Eigen::Infinity>();
238}
239
240double L1Norm(const VectorXd& vector, const Sharder& sharder) {
241 return sharder.ParallelSumOverShards(
242 [&](const Sharder::Shard& shard) { return shard(vector).lpNorm<1>(); });
243}
244
245double SquaredNorm(const VectorXd& vector, const Sharder& sharder) {
246 return sharder.ParallelSumOverShards(
247 [&](const Sharder::Shard& shard) { return shard(vector).squaredNorm(); });
248}
249
250double Norm(const VectorXd& vector, const Sharder& sharder) {
251 return std::sqrt(SquaredNorm(vector, sharder));
252}
253
254double SquaredDistance(const VectorXd& vector1, const VectorXd& vector2,
255 const Sharder& sharder) {
256 return sharder.ParallelSumOverShards([&](const Sharder::Shard& shard) {
257 return (shard(vector1) - shard(vector2)).squaredNorm();
258 });
259}
260
261double Distance(const VectorXd& vector1, const VectorXd& vector2,
262 const Sharder& sharder) {
263 return std::sqrt(SquaredDistance(vector1, vector2, sharder));
264}
265
266double ScaledLInfNorm(const VectorXd& vector, const VectorXd& scale,
267 const Sharder& sharder) {
268 VectorXd local_max(sharder.NumShards());
269 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
270 local_max[shard.Index()] =
271 shard(vector).cwiseProduct(shard(scale)).lpNorm<Eigen::Infinity>();
272 });
273 return local_max.lpNorm<Eigen::Infinity>();
274}
275
276double ScaledSquaredNorm(const VectorXd& vector, const VectorXd& scale,
277 const Sharder& sharder) {
278 return sharder.ParallelSumOverShards([&](const Sharder::Shard& shard) {
279 return shard(vector).cwiseProduct(shard(scale)).squaredNorm();
280 });
281}
282
283double ScaledNorm(const VectorXd& vector, const VectorXd& scale,
284 const Sharder& sharder) {
285 return std::sqrt(ScaledSquaredNorm(vector, scale, sharder));
286}
287
289 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
290 const VectorXd& row_scaling_vec, const VectorXd& col_scaling_vec,
291 const Sharder& sharder) {
292 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
293 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
294 VectorXd answer(matrix.cols());
295 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
296 auto matrix_shard = shard(matrix);
297 auto col_scaling_shard = shard(col_scaling_vec);
298 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
299 double max = 0.0;
300 for (decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
301 ++it) {
302 max = std::max(max, std::abs(it.value() * row_scaling_vec[it.row()]));
303 }
304 shard(answer)[col_num] = max * std::abs(col_scaling_shard[col_num]);
305 }
306 });
307 return answer;
308}
309
311 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
312 const VectorXd& row_scaling_vec, const VectorXd& col_scaling_vec,
313 const Sharder& sharder) {
314 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
315 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
316 VectorXd answer(matrix.cols());
317 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
318 auto matrix_shard = shard(matrix);
319 auto col_scaling_shard = shard(col_scaling_vec);
320 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
321 double sum_of_squares = 0.0;
322 for (decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
323 ++it) {
324 sum_of_squares +=
325 MathUtil::Square(it.value() * row_scaling_vec[it.row()]);
326 }
327 shard(answer)[col_num] =
328 std::sqrt(sum_of_squares) * std::abs(col_scaling_shard[col_num]);
329 }
330 });
331 return answer;
332}
333
334} // namespace operations_research::pdlp
absl::Duration GetDuration() const
Definition timer.h:47
void Start()
When Start() is called multiple times, only the most recent is used.
Definition timer.h:30
void Stop()
Definition timer.h:38
static T Square(const T x)
Returns the square of x.
Definition mathutil.h:101
Thread scheduling interface.
Definition scheduler.h:39
bool ParallelTrueForAllShards(const std::function< bool(const Shard &)> &func) const
Definition sharder.cc:149
double ParallelSumOverShards(const std::function< double(const Shard &)> &func) const
Runs func on each of the shards and sums the results.
Definition sharder.cc:140
int64_t ShardSize(int shard) const
Definition sharder.h:186
int64_t ShardMass(int shard) const
Definition sharder.h:198
Sharder(int64_t num_elements, int num_shards, Scheduler *scheduler, const std::function< int64_t(int64_t)> &element_mass)
Definition sharder.cc:37
void ParallelForEachShard(const std::function< void(const Shard &)> &func) const
Runs func on each of the shards.
Definition sharder.cc:110
int64_t NumElements() const
The number of elements that are split into shards.
Definition sharder.h:184
Validation utilities for solvers.proto.
double SquaredNorm(const VectorXd &vector, const Sharder &sharder)
Definition sharder.cc:245
void SetZero(const Sharder &sharder, VectorXd &dest)
Definition sharder.cc:175
double ScaledNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
Definition sharder.cc:283
double Dot(const VectorXd &v1, const VectorXd &v2, const Sharder &sharder)
Definition sharder.cc:227
double SquaredDistance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
Definition sharder.cc:254
double LInfNorm(const VectorXd &vector, const Sharder &sharder)
Definition sharder.cc:232
double Distance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
Definition sharder.cc:261
VectorXd TransposedMatrixVectorProduct(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &vector, const Sharder &sharder)
Definition sharder.cc:160
double ScaledLInfNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
Definition sharder.cc:266
double ScaledSquaredNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
Definition sharder.cc:276
VectorXd ScaledColLInfNorm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
Definition sharder.cc:288
void AddScaledVector(const double scale, const VectorXd &increment, const Sharder &sharder, VectorXd &dest)
Definition sharder.cc:194
void CoefficientWiseProductInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
Definition sharder.cc:213
void CoefficientWiseQuotientInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
Definition sharder.cc:220
VectorXd ZeroVector(const Sharder &sharder)
Like VectorXd::Zero(sharder.NumElements()).
Definition sharder.cc:181
VectorXd ScaledColL2Norm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
Definition sharder.cc:310
double L1Norm(const VectorXd &vector, const Sharder &sharder)
Definition sharder.cc:240
VectorXd CloneVector(const VectorXd &vec, const Sharder &sharder)
Definition sharder.cc:207
VectorXd OnesVector(const Sharder &sharder)
Like VectorXd::Ones(sharder.NumElements()).
Definition sharder.cc:187
double Norm(const VectorXd &vector, const Sharder &sharder)
Definition sharder.cc:250
void AssignVector(const VectorXd &vec, const Sharder &sharder, VectorXd &dest)
Definition sharder.cc:201
STL namespace.