23#include "Eigen/SparseCore"
24#include "absl/log/check.h"
25#include "absl/synchronization/blocking_counter.h"
26#include "absl/time/time.h"
34using ::Eigen::VectorXd;
38 const std::function<int64_t(int64_t)>& element_mass)
39 : scheduler_(scheduler) {
40 CHECK_GE(num_elements, 0);
41 if (num_elements == 0) {
42 shard_starts_.push_back(0);
45 CHECK_GE(num_shards, 1);
46 shard_starts_.reserve(
47 std::min(
static_cast<int64_t
>(num_shards), num_elements) + 1);
48 shard_masses_.reserve(
49 std::min(
static_cast<int64_t
>(num_shards), num_elements));
50 int64_t overall_mass = 0;
51 for (int64_t elem = 0; elem < num_elements; ++elem) {
52 overall_mass += element_mass(elem);
54 shard_starts_.push_back(0);
55 int64_t this_shard_mass = element_mass(0);
56 for (int64_t elem = 1; elem < num_elements; ++elem) {
57 int64_t this_elem_mass = element_mass(elem);
58 if (this_shard_mass + (this_elem_mass / 2) >= overall_mass / num_shards) {
60 shard_masses_.push_back(this_shard_mass);
61 shard_starts_.push_back(elem);
62 this_shard_mass = this_elem_mass;
64 this_shard_mass += this_elem_mass;
67 shard_starts_.push_back(num_elements);
68 shard_masses_.push_back(this_shard_mass);
69 CHECK_EQ(
NumShards(), shard_masses_.size());
74 : scheduler_(scheduler) {
75 CHECK_GE(num_elements, 0);
76 if (num_elements == 0) {
77 shard_starts_.push_back(0);
80 CHECK_GE(num_shards, 1);
81 shard_starts_.reserve(std::min(int64_t{num_shards}, num_elements) + 1);
82 shard_masses_.reserve(std::min(int64_t{num_shards}, num_elements));
83 if (num_shards >= num_elements) {
84 for (int64_t element = 0; element < num_elements; ++element) {
85 shard_starts_.push_back(
static_cast<int>(element));
86 shard_masses_.push_back(1);
89 for (
int shard = 0; shard < num_shards; ++shard) {
90 const int64_t this_shard_start = ((num_elements * shard) / num_shards);
91 const int64_t next_shard_start =
92 ((num_elements * (shard + 1)) / num_shards);
93 if (next_shard_start - this_shard_start > 0) {
94 shard_starts_.push_back(this_shard_start);
95 shard_masses_.push_back(next_shard_start - this_shard_start);
99 shard_starts_.push_back(num_elements);
100 CHECK_EQ(
NumShards(), shard_masses_.size());
107 other_sharder.scheduler_) {}
110 const std::function<
void(
const Shard&)>& func)
const {
112 absl::BlockingCounter counter(
NumShards());
113 VLOG(2) <<
"Starting ParallelForEachShard()";
114 scheduler_->ParallelFor(0,
NumShards(), [&](
int shard_num) {
119 func(
Shard(shard_num,
this));
122 VLOG(2) <<
"Shard " << shard_num <<
" with " <<
ShardSize(shard_num)
123 <<
" elements and " <<
ShardMass(shard_num)
124 <<
" mass finished with "
131 VLOG(2) <<
"Done ParallelForEachShard()";
133 for (
int shard_num = 0; shard_num <
NumShards(); ++shard_num) {
134 func(
Shard(shard_num,
this));
140 const std::function<
double(
const Shard&)>& func)
const {
143 local_sums[shard.
Index()] = func(shard);
145 return local_sums.sum();
149 const std::function<
bool(
const Shard&)>& func)
const {
151 std::vector<int> local_result(
NumShards());
153 local_result[shard.
Index()] =
static_cast<int>(func(shard));
155 return std::all_of(local_result.begin(), local_result.end(),
156 [](
const int v) { return static_cast<bool>(v); });
160 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
161 const VectorXd& vector,
const Sharder& sharder) {
162 CHECK_EQ(vector.size(), matrix.rows());
163 VectorXd answer(matrix.cols());
169 shard(answer) = shard(matrix).transpose() * vector;
194 const Sharder& sharder, VectorXd& dest) {
196 shard(dest) += scale * shard(increment);
201 dest.resize(vec.size());
213 const Sharder& sharder, VectorXd& dest) {
215 shard(dest) = shard(dest).cwiseProduct(shard(scale));
220 const Sharder& sharder, VectorXd& dest) {
222 shard(dest) = shard(dest).cwiseQuotient(shard(scale));
226double Dot(
const VectorXd& v1,
const VectorXd& v2,
const Sharder& sharder) {
228 [&](
const Sharder::Shard& shard) {
return shard(v1).dot(shard(v2)); });
234 local_max[shard.
Index()] = shard(vector).lpNorm<Eigen::Infinity>();
236 return local_max.lpNorm<Eigen::Infinity>();
241 [&](
const Sharder::Shard& shard) {
return shard(vector).lpNorm<1>(); });
246 [&](
const Sharder::Shard& shard) {
return shard(vector).squaredNorm(); });
256 return (shard(vector1) - shard(vector2)).squaredNorm();
260double Distance(
const VectorXd& vector1,
const VectorXd& vector2,
269 local_max[shard.
Index()] =
270 shard(vector).cwiseProduct(shard(scale)).lpNorm<Eigen::Infinity>();
272 return local_max.lpNorm<Eigen::Infinity>();
278 return shard(vector).cwiseProduct(shard(scale)).squaredNorm();
282double ScaledNorm(
const VectorXd& vector,
const VectorXd& scale,
288 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
289 const VectorXd& row_scaling_vec,
const VectorXd& col_scaling_vec,
291 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
292 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
293 VectorXd answer(matrix.cols());
295 auto matrix_shard = shard(matrix);
296 auto col_scaling_shard = shard(col_scaling_vec);
297 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
299 for (
decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
301 max = std::max(max, std::abs(it.value() * row_scaling_vec[it.row()]));
303 shard(answer)[col_num] = max * std::abs(col_scaling_shard[col_num]);
310 const Eigen::SparseMatrix<double, Eigen::ColMajor, int64_t>& matrix,
311 const VectorXd& row_scaling_vec,
const VectorXd& col_scaling_vec,
313 CHECK_EQ(matrix.cols(), col_scaling_vec.size());
314 CHECK_EQ(matrix.rows(), row_scaling_vec.size());
315 VectorXd answer(matrix.cols());
317 auto matrix_shard = shard(matrix);
318 auto col_scaling_shard = shard(col_scaling_vec);
319 for (int64_t col_num = 0; col_num < shard(matrix).outerSize(); ++col_num) {
320 double sum_of_squares = 0.0;
321 for (
decltype(matrix_shard)::InnerIterator it(matrix_shard, col_num); it;
326 shard(answer)[col_num] =
327 std::sqrt(sum_of_squares) * std::abs(col_scaling_shard[col_num]);
absl::Duration GetDuration() const
void Start()
When Start() is called multiple times, only the most recent is used.
static T Square(const T x)
Returns the square of x.
Thread scheduling interface.
bool ParallelTrueForAllShards(const std::function< bool(const Shard &)> &func) const
double ParallelSumOverShards(const std::function< double(const Shard &)> &func) const
Runs func on each of the shards and sums the results.
int64_t ShardSize(int shard) const
int64_t ShardMass(int shard) const
Sharder(int64_t num_elements, int num_shards, Scheduler *scheduler, const std::function< int64_t(int64_t)> &element_mass)
void ParallelForEachShard(const std::function< void(const Shard &)> &func) const
Runs func on each of the shards.
int64_t NumElements() const
The number of elements that are split into shards.
Validation utilities for solvers.proto.
double SquaredNorm(const VectorXd &vector, const Sharder &sharder)
void SetZero(const Sharder &sharder, VectorXd &dest)
double ScaledNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
double Dot(const VectorXd &v1, const VectorXd &v2, const Sharder &sharder)
double SquaredDistance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
double LInfNorm(const VectorXd &vector, const Sharder &sharder)
double Distance(const VectorXd &vector1, const VectorXd &vector2, const Sharder &sharder)
VectorXd TransposedMatrixVectorProduct(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &vector, const Sharder &sharder)
double ScaledLInfNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
double ScaledSquaredNorm(const VectorXd &vector, const VectorXd &scale, const Sharder &sharder)
VectorXd ScaledColLInfNorm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
void AddScaledVector(const double scale, const VectorXd &increment, const Sharder &sharder, VectorXd &dest)
void CoefficientWiseProductInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
void CoefficientWiseQuotientInPlace(const VectorXd &scale, const Sharder &sharder, VectorXd &dest)
VectorXd ZeroVector(const Sharder &sharder)
Like VectorXd::Zero(sharder.NumElements()).
VectorXd ScaledColL2Norm(const Eigen::SparseMatrix< double, Eigen::ColMajor, int64_t > &matrix, const VectorXd &row_scaling_vec, const VectorXd &col_scaling_vec, const Sharder &sharder)
double L1Norm(const VectorXd &vector, const Sharder &sharder)
VectorXd CloneVector(const VectorXd &vec, const Sharder &sharder)
VectorXd OnesVector(const Sharder &sharder)
Like VectorXd::Ones(sharder.NumElements()).
double Norm(const VectorXd &vector, const Sharder &sharder)
void AssignVector(const VectorXd &vec, const Sharder &sharder, VectorXd &dest)