Google OR-Tools v9.11
a fast and portable software suite for combinatorial optimization
sharder.cc
1// Copyright 2010-2024 Google LLC
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#include "ortools/pdlp/sharder.h"
15
16#include <algorithm>
17#include <cmath>
18#include <cstdint>
19#include <functional>
20#include <vector>
21
22#include "Eigen/Core"
23#include "Eigen/SparseCore"
24#include "absl/log/check.h"
25#include "absl/synchronization/blocking_counter.h"
26#include "absl/time/time.h"
27#include "ortools/base/logging.h"
28#include "ortools/base/mathutil.h"
29#include "ortools/base/threadpool.h"
30#include "ortools/base/timer.h"
31
32namespace operations_research::pdlp {
33
34using ::Eigen::VectorXd;
35
36Sharder::Sharder(const int64_t num_elements, const int num_shards,
37 ThreadPool* const thread_pool,
38 const std::function<int64_t(int64_t)>& element_mass)
39 : thread_pool_(thread_pool) {
40 CHECK_GE(num_elements, 0);
41 if (num_elements == 0) {
42 shard_starts_.push_back(0);
43 return;
44 }
45 CHECK_GE(num_shards, 1);
46 shard_starts_.reserve(
47 std::min(static_cast<int64_t>(num_shards), num_elements) + 1);
48 shard_masses_.reserve(
49 std::min(static_cast<int64_t>(num_shards), num_elements));
50 int64_t overall_mass = 0;
51 for (int64_t elem = 0; elem < num_elements; ++elem) {
52 overall_mass += element_mass(elem);
53 }
54 shard_starts_.push_back(0);
55 int64_t this_shard_mass = element_mass(0);
56 for (int64_t elem = 1; elem < num_elements; ++elem) {
57 int64_t this_elem_mass = element_mass(elem);
58 if (this_shard_mass + (this_elem_mass / 2) >= overall_mass / num_shards) {
59 // `elem` starts a new shard.
60 shard_masses_.push_back(this_shard_mass);
61 shard_starts_.push_back(elem);
62 this_shard_mass = this_elem_mass;
63 } else {
64 this_shard_mass += this_elem_mass;
65 }
66 }
67 shard_starts_.push_back(num_elements);
68 shard_masses_.push_back(this_shard_mass);
69 CHECK_EQ(NumShards(), shard_masses_.size());
70}
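// For example, with `num_shards == 2` and element masses {3, 1, 1, 3} (so
// `overall_mass == 8` and `overall_mass / num_shards == 4`), elements 0 and 1
// form the first shard (3 + 1/2 < 4), element 2 starts the second shard
// (4 + 1/2 >= 4), and element 3 joins it, giving `shard_starts_ == {0, 2, 4}`
// and `shard_masses_ == {4, 4}`.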
71
72Sharder::Sharder(const int64_t num_elements, const int num_shards,
73 ThreadPool* const thread_pool)
74 : thread_pool_(thread_pool) {
75 CHECK_GE(num_elements, 0);
76 if (num_elements == 0) {
77 shard_starts_.push_back(0);
78 return;
79 }
80 CHECK_GE(num_shards, 1);
81 shard_starts_.reserve(std::min(int64_t{num_shards}, num_elements) + 1);
82 shard_masses_.reserve(std::min(int64_t{num_shards}, num_elements));
83 if (num_shards >= num_elements) {
84 for (int64_t element = 0; element < num_elements; ++element) {
85 shard_starts_.push_back(static_cast<int>(element));
86 shard_masses_.push_back(1);
87 }
88 } else {
89 for (int shard = 0; shard < num_shards; ++shard) {
90 const int64_t this_shard_start = ((num_elements * shard) / num_shards);
91 const int64_t next_shard_start =
92 ((num_elements * (shard + 1)) / num_shards);
93 if (next_shard_start - this_shard_start > 0) {
94 shard_starts_.push_back(this_shard_start);
95 shard_masses_.push_back(next_shard_start - this_shard_start);
96 }
97 }
98 }
99 shard_starts_.push_back(num_elements);
100 CHECK_EQ(NumShards(), shard_masses_.size());
101}
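// For example, with `num_elements == 10` and `num_shards == 4` the loop above
// produces `shard_starts_ == {0, 2, 5, 7, 10}` and
// `shard_masses_ == {2, 3, 2, 3}`.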
102
103Sharder::Sharder(const Sharder& other_sharder, const int64_t num_elements)
104 // The `std::max()` protects against `other_sharder.NumShards() == 0`, which
105 // will happen if `other_sharder` had `num_elements == 0`.
106 : Sharder(num_elements, std::max(1, other_sharder.NumShards()),
107 other_sharder.thread_pool_) {}
108
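// When `thread_pool_` is null, the shards below are processed one after
// another on the calling thread; with a thread pool they run concurrently, so
// `func` must be safe to call on distinct shards from different threads.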
109void Sharder::ParallelForEachShard(
110 const std::function<void(const Shard&)>& func) const {
111 if (thread_pool_) {
112 absl::BlockingCounter counter(NumShards());
113 VLOG(2) << "Starting ParallelForEachShard()";
114 for (int shard_num = 0; shard_num < NumShards(); ++shard_num) {
115 thread_pool_->Schedule([&, shard_num]() {
116 WallTimer timer;
117 if (VLOG_IS_ON(2)) {
118 timer.Start();
119 }
120 func(Shard(shard_num, this));
121 if (VLOG_IS_ON(2)) {
122 timer.Stop();
123 VLOG(2) << "Shard " << shard_num << " with " << ShardSize(shard_num)
124 << " elements and " << ShardMass(shard_num)
125 << " mass finished with "
126 << ShardMass(shard_num) /
127 std::max(int64_t{1}, absl::ToInt64Microseconds(
128 timer.GetDuration()))
129 << " mass/usec.";
130 }
131 counter.DecrementCount();
132 });
133 }
134 counter.Wait();
135 VLOG(2) << "Done ParallelForEachShard()";
136 } else {
137 for (int shard_num = 0; shard_num < NumShards(); ++shard_num) {
138 func(Shard(shard_num, this));
139 }
140 }
141}
142
143double Sharder::ParallelSumOverShards(
144 const std::function<double(const Shard&)>& func) const {
145 VectorXd local_sums(NumShards());
146 ParallelForEachShard([&](const Sharder::Shard& shard) {
147 local_sums[shard.Index()] = func(shard);
148 });
149 return local_sums.sum();
150}
151
152bool Sharder::ParallelTrueForAllShards(
153 const std::function<bool(const Shard&)>& func) const {
154 // Recall `std::vector<bool>` is not thread-safe.
155 std::vector<int> local_result(NumShards());
156 ParallelForEachShard([&](const Sharder::Shard& shard) {
157 local_result[shard.Index()] = static_cast<int>(func(shard));
158 });
159 return std::all_of(local_result.begin(), local_result.end(),
160 [](const int v) { return static_cast<bool>(v); });
161}
162
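// Computes `matrix.transpose() * vector`, one column shard at a time. The
// `sharder` is expected to shard the columns of `matrix`, i.e. its
// `NumElements()` should equal `matrix.cols()`.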
163VectorXd TransposedMatrixVectorProduct(
164 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
165 const VectorXd& vector, const Sharder& sharder) {
166 CHECK_EQ(vector.size(), matrix.rows());
167 VectorXd answer(matrix.cols());
168 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
169 // NOTE: For very sparse columns, assignment to `shard(answer)` incurs a
170 // measurable overhead compared to using a constructor
171 // (i.e. `VectorXd temp = ...`). It is not clear why this is the case, nor
172 // how to avoid it.
173 shard(answer) = shard(matrix).transpose() * vector;
174 });
175 return answer;
176}
177
178void SetZero(const Sharder& sharder, VectorXd& dest) {
179 dest.resize(sharder.NumElements());
180 sharder.ParallelForEachShard(
181 [&](const Sharder::Shard& shard) { shard(dest).setZero(); });
182}
183
184VectorXd ZeroVector(const Sharder& sharder) {
185 VectorXd result(sharder.NumElements());
186 SetZero(sharder, result);
187 return result;
188}
189
190VectorXd OnesVector(const Sharder& sharder) {
191 VectorXd result(sharder.NumElements());
192 sharder.ParallelForEachShard(
193 [&](const Sharder::Shard& shard) { shard(result).setOnes(); });
194 return result;
195}
196
197void AddScaledVector(const double scale, const VectorXd& increment,
198 const Sharder& sharder, VectorXd& dest) {
199 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
200 shard(dest) += scale * shard(increment);
201 });
202}
203
204void AssignVector(const VectorXd& vec, const Sharder& sharder, VectorXd& dest) {
205 dest.resize(vec.size());
206 sharder.ParallelForEachShard(
207 [&](const Sharder::Shard& shard) { shard(dest) = shard(vec); });
208}
209
210VectorXd CloneVector(const VectorXd& vec, const Sharder& sharder) {
211 VectorXd dest;
212 AssignVector(vec, sharder, dest);
213 return dest;
214}
215
216void CoefficientWiseProductInPlace(const VectorXd& scale,
217 const Sharder& sharder, VectorXd& dest) {
218 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
219 shard(dest) = shard(dest).cwiseProduct(shard(scale));
220 });
221}
222
223void CoefficientWiseQuotientInPlace(const VectorXd& scale,
224 const Sharder& sharder, VectorXd& dest) {
225 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
226 shard(dest) = shard(dest).cwiseQuotient(shard(scale));
227 });
228}
229
230double Dot(const VectorXd& v1, const VectorXd& v2, const Sharder& sharder) {
231 return sharder.ParallelSumOverShards(
232 [&](const Sharder::Shard& shard) { return shard(v1).dot(shard(v2)); });
233}
234
235double LInfNorm(const VectorXd& vector, const Sharder& sharder) {
236 VectorXd local_max(sharder.NumShards());
237 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
238 local_max[shard.Index()] = shard(vector).lpNorm<Eigen::Infinity>();
239 });
240 return local_max.lpNorm<Eigen::Infinity>();
241}
242
243double L1Norm(const VectorXd& vector, const Sharder& sharder) {
244 return sharder.ParallelSumOverShards(
245 [&](const Sharder::Shard& shard) { return shard(vector).lpNorm<1>(); });
246}
247
248double SquaredNorm(const VectorXd& vector, const Sharder& sharder) {
249 return sharder.ParallelSumOverShards(
250 [&](const Sharder::Shard& shard) { return shard(vector).squaredNorm(); });
251}
252
253double Norm(const VectorXd& vector, const Sharder& sharder) {
254 return std::sqrt(SquaredNorm(vector, sharder));
255}
256
257double SquaredDistance(const VectorXd& vector1, const VectorXd& vector2,
258 const Sharder& sharder) {
259 return sharder.ParallelSumOverShards([&](const Sharder::Shard& shard) {
260 return (shard(vector1) - shard(vector2)).squaredNorm();
261 });
262}
263
264double Distance(const VectorXd& vector1, const VectorXd& vector2,
265 const Sharder& sharder) {
266 return std::sqrt(SquaredDistance(vector1, vector2, sharder));
267}
268
269double ScaledLInfNorm(const VectorXd& vector, const VectorXd& scale,
270 const Sharder& sharder) {
271 VectorXd local_max(sharder.NumShards());
272 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
273 local_max[shard.Index()] =
274 shard(vector).cwiseProduct(shard(scale)).lpNorm<Eigen::Infinity>();
275 });
276 return local_max.lpNorm<Eigen::Infinity>();
277}
278
279double ScaledSquaredNorm(const VectorXd& vector, const VectorXd& scale,
280 const Sharder& sharder) {
281 return sharder.ParallelSumOverShards([&](const Sharder::Shard& shard) {
282 return shard(vector).cwiseProduct(shard(scale)).squaredNorm();
283 });
284}
285
286double ScaledNorm(const VectorXd& vector, const VectorXd& scale,
287 const Sharder& sharder) {
288 return std::sqrt(ScaledSquaredNorm(vector, scale, sharder));
289}
290
291VectorXd ScaledColLInfNorm(
292 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
293 const VectorXd& row_scaling_vec, const VectorXd& col_scaling_vec,
294 const Sharder& sharder) {
295 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
296 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
297 VectorXd answer(matrix.cols());
298 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
299 auto matrix_shard = shard(matrix);
300 auto col_scaling_shard = shard(col_scaling_vec);
301 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
302 double max = 0.0;
303 for (decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
304 ++it) {
305 max = std::max(max, std::abs(it.value() * row_scaling_vec[it.row()]));
306 }
307 shard(answer)[col_num] = max * std::abs(col_scaling_shard[col_num]);
308 }
309 });
310 return answer;
311}
312
313VectorXd ScaledColL2Norm(
314 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
315 const VectorXd& row_scaling_vec, const VectorXd& col_scaling_vec,
316 const Sharder& sharder) {
317 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
318 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
319 VectorXd answer(matrix.cols());
320 sharder.ParallelForEachShard([&](const Sharder::Shard& shard) {
321 auto matrix_shard = shard(matrix);
322 auto col_scaling_shard = shard(col_scaling_vec);
323 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
324 double sum_of_squares = 0.0;
325 for (decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
326 ++it) {
327 sum_of_squares +=
328 MathUtil::Square(it.value() * row_scaling_vec[it.row()]);
329 }
330 shard(answer)[col_num] =
331 std::sqrt(sum_of_squares) * std::abs(col_scaling_shard[col_num]);
332 }
333 });
334 return answer;
335}
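// In matrix notation, the two helpers above compute, for each column `j` of
// `diag(row_scaling_vec) * matrix * diag(col_scaling_vec)`:
//   ScaledColLInfNorm: answer[j] = |col_scaling_vec[j]| * max_i |row_scaling_vec[i] * matrix(i, j)|
//   ScaledColL2Norm:   answer[j] = |col_scaling_vec[j]| * sqrt(sum_i (row_scaling_vec[i] * matrix(i, j))^2)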
336
337} // namespace operations_research::pdlp
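For orientation, here is a minimal usage sketch of the API defined in this file. It is not part of sharder.cc: the include path and the function name are illustrative, and passing a null ThreadPool simply makes ParallelForEachShard fall back to its serial loop over the shards.

#include "Eigen/Core"
#include "ortools/pdlp/sharder.h"

namespace pdlp = operations_research::pdlp;

// Computes ||x - y|| using the sharded helpers, shard by shard.
double SketchDistance(const Eigen::VectorXd& x, const Eigen::VectorXd& y) {
  // Four shards, no thread pool: every shard runs inline on this thread.
  pdlp::Sharder sharder(x.size(), /*num_shards=*/4, /*thread_pool=*/nullptr);
  Eigen::VectorXd diff = pdlp::CloneVector(x, sharder);
  pdlp::AddScaledVector(-1.0, y, sharder, diff);  // diff = x - y
  return pdlp::Norm(diff, sharder);               // same value as Distance(x, y, sharder)
}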