ortools.math_opt.python.hash_model_storage

A minimal pure-Python implementation of model_storage.ModelStorage.

  1# Copyright 2010-2024 Google LLC
  2# Licensed under the Apache License, Version 2.0 (the "License");
  3# you may not use this file except in compliance with the License.
  4# You may obtain a copy of the License at
  5#
  6#     http://www.apache.org/licenses/LICENSE-2.0
  7#
  8# Unless required by applicable law or agreed to in writing, software
  9# distributed under the License is distributed on an "AS IS" BASIS,
 10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 11# See the License for the specific language governing permissions and
 12# limitations under the License.
 13
 14"""A minimal pure python implementation of model_storage.ModelStorage."""
 15
 16from typing import Dict, Iterable, Iterator, Optional, Set, Tuple
 17import weakref
 18
 19from ortools.math_opt import model_pb2
 20from ortools.math_opt import model_update_pb2
 21from ortools.math_opt import sparse_containers_pb2
 22from ortools.math_opt.python import model_storage
 23
 24_QuadraticKey = model_storage.QuadraticTermIdKey
 25
 26
 27class _UpdateTracker(model_storage.StorageUpdateTracker):
 28    """Tracks model updates for HashModelStorage."""
 29
 30    def __init__(self, mod: "HashModelStorage"):
 31        self.retired: bool = False
 32        self.model: "HashModelStorage" = mod
 33        # Changes for variables with id < variables_checkpoint are explicitly
 34        # tracked.
 35        self.variables_checkpoint: int = self.model._next_var_id
 36        # Changes for linear constraints with id < linear_constraints_checkpoint
 37        # are explicitly tracked.
 38        self.linear_constraints_checkpoint: int = self.model._next_lin_con_id
 39
 40        self.objective_direction: bool = False
 41        self.objective_offset: bool = False
 42
 43        self.variable_deletes: Set[int] = set()
 44        self.variable_lbs: Set[int] = set()
 45        self.variable_ubs: Set[int] = set()
 46        self.variable_integers: Set[int] = set()
 47
 48        self.linear_objective_coefficients: Set[int] = set()
 49        self.quadratic_objective_coefficients: Set[_QuadraticKey] = set()
 50
 51        self.linear_constraint_deletes: Set[int] = set()
 52        self.linear_constraint_lbs: Set[int] = set()
 53        self.linear_constraint_ubs: Set[int] = set()
 54
 55        self.linear_constraint_matrix: Set[Tuple[int, int]] = set()
 56
 57    def export_update(self) -> Optional[model_update_pb2.ModelUpdateProto]:
 58        if self.retired:
 59            raise model_storage.UsedUpdateTrackerAfterRemovalError()
 60        if (
 61            self.variables_checkpoint == self.model.next_variable_id()
 62            and (
 63                self.linear_constraints_checkpoint
 64                == self.model.next_linear_constraint_id()
 65            )
 66            and not self.objective_direction
 67            and not self.objective_offset
 68            and not self.variable_deletes
 69            and not self.variable_lbs
 70            and not self.variable_ubs
 71            and not self.variable_integers
 72            and not self.linear_objective_coefficients
 73            and not self.quadratic_objective_coefficients
 74            and not self.linear_constraint_deletes
 75            and not self.linear_constraint_lbs
 76            and not self.linear_constraint_ubs
 77            and not self.linear_constraint_matrix
 78        ):
 79            return None
 80        result = model_update_pb2.ModelUpdateProto()
 81        result.deleted_variable_ids[:] = sorted(self.variable_deletes)
 82        result.deleted_linear_constraint_ids[:] = sorted(self.linear_constraint_deletes)
 83        # Variable updates
 84        _set_sparse_double_vector(
 85            sorted((vid, self.model.get_variable_lb(vid)) for vid in self.variable_lbs),
 86            result.variable_updates.lower_bounds,
 87        )
 88        _set_sparse_double_vector(
 89            sorted((vid, self.model.get_variable_ub(vid)) for vid in self.variable_ubs),
 90            result.variable_updates.upper_bounds,
 91        )
 92        _set_sparse_bool_vector(
 93            sorted(
 94                (vid, self.model.get_variable_is_integer(vid))
 95                for vid in self.variable_integers
 96            ),
 97            result.variable_updates.integers,
 98        )
 99        # Linear constraint updates
100        _set_sparse_double_vector(
101            sorted(
102                (cid, self.model.get_linear_constraint_lb(cid))
103                for cid in self.linear_constraint_lbs
104            ),
105            result.linear_constraint_updates.lower_bounds,
106        )
107        _set_sparse_double_vector(
108            sorted(
109                (cid, self.model.get_linear_constraint_ub(cid))
110                for cid in self.linear_constraint_ubs
111            ),
112            result.linear_constraint_updates.upper_bounds,
113        )
114        # New variables and constraints
115        new_vars = []
116        for vid in range(self.variables_checkpoint, self.model.next_variable_id()):
117            var = self.model.variables.get(vid)
118            if var is not None:
119                new_vars.append((vid, var))
120        _variables_to_proto(new_vars, result.new_variables)
121        new_lin_cons = []
122        for lin_con_id in range(
123            self.linear_constraints_checkpoint,
124            self.model.next_linear_constraint_id(),
125        ):
126            lin_con = self.model.linear_constraints.get(lin_con_id)
127            if lin_con is not None:
128                new_lin_cons.append((lin_con_id, lin_con))
129        _linear_constraints_to_proto(new_lin_cons, result.new_linear_constraints)
130        # Objective update
131        if self.objective_direction:
132            result.objective_updates.direction_update = self.model.get_is_maximize()
133        if self.objective_offset:
134            result.objective_updates.offset_update = self.model.get_objective_offset()
135        _set_sparse_double_vector(
136            sorted(
137                (var, self.model.get_linear_objective_coefficient(var))
138                for var in self.linear_objective_coefficients
139            ),
140            result.objective_updates.linear_coefficients,
141        )
142        for new_var in range(self.variables_checkpoint, self.model.next_variable_id()):
143            # NOTE: the value will be 0.0 if either the coefficient is not set or the
144            # variable has been deleted. Calling
145            # model.get_linear_objective_coefficient() throws an exception if the
146            # variable has been deleted.
147            obj_coef = self.model.linear_objective_coefficient.get(new_var, 0.0)
148            if obj_coef:
149                result.objective_updates.linear_coefficients.ids.append(new_var)
150                result.objective_updates.linear_coefficients.values.append(obj_coef)
151
152        quadratic_objective_updates = [
153            (
154                key.id1,
155                key.id2,
156                self.model.get_quadratic_objective_coefficient(key.id1, key.id2),
157            )
158            for key in self.quadratic_objective_coefficients
159        ]
160        for new_var in range(self.variables_checkpoint, self.model.next_variable_id()):
161            if self.model.variable_exists(new_var):
162                for other_var in self.model.get_quadratic_objective_adjacent_variables(
163                    new_var
164                ):
165                    key = _QuadraticKey(new_var, other_var)
166                    if new_var >= other_var:
167                        key = _QuadraticKey(new_var, other_var)
168                        quadratic_objective_updates.append(
169                            (
170                                key.id1,
171                                key.id2,
172                                self.model.get_quadratic_objective_coefficient(
173                                    key.id1, key.id2
174                                ),
175                            )
176                        )
177        quadratic_objective_updates.sort()
178        if quadratic_objective_updates:
179            first_var_ids, second_var_ids, coefficients = zip(
180                *quadratic_objective_updates
181            )
182            result.objective_updates.quadratic_coefficients.row_ids[:] = first_var_ids
183            result.objective_updates.quadratic_coefficients.column_ids[:] = (
184                second_var_ids
185            )
186            result.objective_updates.quadratic_coefficients.coefficients[:] = (
187                coefficients
188            )
189        # Linear constraint matrix updates
190        matrix_updates = [
191            (l, v, self.model.get_linear_constraint_coefficient(l, v))
192            for (l, v) in self.linear_constraint_matrix
193        ]
194        for new_var in range(self.variables_checkpoint, self.model.next_variable_id()):
195            if self.model.variable_exists(new_var):
196                for lin_con in self.model.get_linear_constraints_with_variable(new_var):
197                    matrix_updates.append(
198                        (
199                            lin_con,
200                            new_var,
201                            self.model.get_linear_constraint_coefficient(
202                                lin_con, new_var
203                            ),
204                        )
205                    )
206        for new_lin_con in range(
207            self.linear_constraints_checkpoint,
208            self.model.next_linear_constraint_id(),
209        ):
210            if self.model.linear_constraint_exists(new_lin_con):
211                for var in self.model.get_variables_for_linear_constraint(new_lin_con):
212                    # We have already gotten the new variables above. Note that we do at
213                    # most twice as much work as we should from this.
214                    if var < self.variables_checkpoint:
215                        matrix_updates.append(
216                            (
217                                new_lin_con,
218                                var,
219                                self.model.get_linear_constraint_coefficient(
220                                    new_lin_con, var
221                                ),
222                            )
223                        )
224        matrix_updates.sort()
225        if matrix_updates:
226            lin_cons, variables, coefs = zip(*matrix_updates)
227            result.linear_constraint_matrix_updates.row_ids[:] = lin_cons
228            result.linear_constraint_matrix_updates.column_ids[:] = variables
229            result.linear_constraint_matrix_updates.coefficients[:] = coefs
230        return result
231
232    def advance_checkpoint(self) -> None:
233        if self.retired:
234            raise model_storage.UsedUpdateTrackerAfterRemovalError()
235        self.objective_direction = False
236        self.objective_offset = False
237        self.variable_deletes = set()
238        self.variable_lbs = set()
239        self.variable_ubs = set()
240        self.variable_integers = set()
241        self.linear_objective_coefficients = set()
242        self.linear_constraint_deletes = set()
243        self.linear_constraint_lbs = set()
244        self.linear_constraint_ubs = set()
245        self.linear_constraint_matrix = set()
246
247        self.variables_checkpoint = self.model.next_variable_id()
248        self.linear_constraints_checkpoint = self.model.next_linear_constraint_id()
249
250
class _VariableStorage:
    """Per-variable record kept by the model storage.

    Attributes:
      lower_bound: The lower bound passed at construction.
      upper_bound: The upper bound passed at construction.
      is_integer: The integrality flag passed at construction.
      name: The name passed at construction.
      linear_constraint_nonzeros: A set of ints, created empty for each
        instance.
    """

    def __init__(self, lb: float, ub: float, is_integer: bool, name: str) -> None:
        # Every instance gets its own (initially empty) set.
        self.linear_constraint_nonzeros: Set[int] = set()
        self.name: str = name
        self.is_integer: bool = is_integer
        self.lower_bound: float = lb
        self.upper_bound: float = ub
260
261
class _LinearConstraintStorage:
    """Per-linear-constraint record kept by the model storage.

    Attributes:
      lower_bound: The lower bound passed at construction.
      upper_bound: The upper bound passed at construction.
      name: The name passed at construction.
      variable_nonzeros: A set of ints, created empty for each instance.
    """

    def __init__(self, lb: float, ub: float, name: str) -> None:
        # Every instance gets its own (initially empty) set.
        self.variable_nonzeros: Set[int] = set()
        self.name: str = name
        self.lower_bound: float = lb
        self.upper_bound: float = ub
270
271
class _QuadraticTermStorage:
    """Stores the quadratic terms that have a non-zero coefficient.

    Two structures are kept in sync: a map from variable-pair key to
    coefficient, and an adjacency map from each variable id to the ids it is
    multiplied with.
    """

    def __init__(self) -> None:
        self._coefficients: Dict[_QuadraticKey, float] = {}
        # A variable i without any non-zero quadratic term is either absent
        # from self._adjacent_variables or mapped to an empty set (e.g.
        # depending on whether it previously appeared in a quadratic term).
        self._adjacent_variables: Dict[int, Set[int]] = {}

    def __bool__(self) -> bool:
        """Returns true if and only if there are any quadratic terms with non-zero coefficients."""
        return len(self._coefficients) > 0

    def get_adjacent_variables(self, variable_id: int) -> Iterator[int]:
        """Yields the variables multiplying a variable in the stored quadratic terms.

        Nothing is yielded when variable_id has no adjacency entry.

        Args:
          variable_id: The variable whose partners in the stored quadratic
            terms are requested.

        Yields:
          The variables multiplying variable_id in the stored quadratic terms.
        """
        neighbors = self._adjacent_variables.get(variable_id)
        if neighbors is not None:
            yield from neighbors

    def keys(self) -> Iterator[_QuadraticKey]:
        """Yields the variable-pair keys associated to the stored quadratic terms."""
        yield from self._coefficients

    def coefficients(self) -> Iterator[model_storage.QuadraticEntry]:
        """Yields the stored quadratic terms as QuadraticEntry."""
        for term_key, term_value in self._coefficients.items():
            yield model_storage.QuadraticEntry(id_key=term_key, coefficient=term_value)

    def delete_variable(self, variable_id: int) -> None:
        """Updates the data structure to consider variable_id as deleted."""
        neighbors = self._adjacent_variables.get(variable_id)
        if neighbors is None:
            return
        for neighbor_id in neighbors:
            # A squared term lists the variable as its own neighbor; that set is
            # the one being iterated and is emptied after the loop instead.
            if neighbor_id != variable_id:
                self._adjacent_variables[neighbor_id].remove(variable_id)
            del self._coefficients[_QuadraticKey(variable_id, neighbor_id)]
        neighbors.clear()

    def clear(self) -> None:
        """Clears the data structure."""
        self._adjacent_variables.clear()
        self._coefficients.clear()

    def set_coefficient(
        self, first_variable_id: int, second_variable_id: int, value: float
    ) -> bool:
        """Sets the coefficient for the quadratic term associated to the product between two variables.

        The ordering of the input variables does not matter.

        Args:
          first_variable_id: The first variable in the product.
          second_variable_id: The second variable in the product.
          value: The value of the coefficient.

        Returns:
          True if the coefficient is updated, False otherwise.
        """
        term_key = _QuadraticKey(first_variable_id, second_variable_id)
        if value == self._coefficients.get(term_key, 0.0):
            return False
        adjacency = self._adjacent_variables
        if value == 0.0:
            # The term is currently stored (its value differs from 0.0), so both
            # the coefficient and the adjacency entries are expected to exist.
            del self._coefficients[term_key]
            adjacency[first_variable_id].remove(second_variable_id)
            if first_variable_id != second_variable_id:
                adjacency[second_variable_id].remove(first_variable_id)
            return True
        adjacency.setdefault(first_variable_id, set()).add(second_variable_id)
        adjacency.setdefault(second_variable_id, set()).add(first_variable_id)
        self._coefficients[term_key] = value
        return True

    def get_coefficient(self, first_variable_id: int, second_variable_id: int) -> float:
        """Gets the objective coefficient for the quadratic term associated to the product between two variables.

        The ordering of the input variables does not matter.

        Args:
          first_variable_id: The first variable in the product.
          second_variable_id: The second variable in the product.

        Returns:
          The value of the coefficient, 0.0 if the term is not stored.
        """
        term_key = _QuadraticKey(first_variable_id, second_variable_id)
        return self._coefficients.get(term_key, 0.0)
375
376
377class HashModelStorage(model_storage.ModelStorage):
378    """A simple, pure python implementation of ModelStorage.
379
380    Attributes:
381        _linear_constraint_matrix: A dictionary with (linear_constraint_id,
382          variable_id) keys and numeric values, representing the matrix A for the
383          constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros.
384        linear_objective_coefficient: A dictionary with variable_id keys and
385          numeric values, representing the linear terms in the objective.
386          Invariant: the values have no zeros.
387        _quadratic_objective_coefficients: A data structure containing quadratic
388          terms in the objective.
389    """
390
391    def __init__(self, name: str = "") -> None:
392        super().__init__()
393        self._name: str = name
394        self.variables: Dict[int, _VariableStorage] = {}
395        self.linear_constraints: Dict[int, _LinearConstraintStorage] = {}
396        self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {}  #
397        self._is_maximize: bool = False
398        self._objective_offset: float = 0.0
399        self.linear_objective_coefficient: Dict[int, float] = {}
400        self._quadratic_objective_coefficients: _QuadraticTermStorage = (
401            _QuadraticTermStorage()
402        )
403        self._next_var_id: int = 0
404        self._next_lin_con_id: int = 0
405        self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet()
406
407    @property
408    def name(self) -> str:
409        return self._name
410
411    def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int:
412        var_id = self._next_var_id
413        self._next_var_id += 1
414        self.variables[var_id] = _VariableStorage(lb, ub, is_integer, name)
415        return var_id
416
417    def delete_variable(self, variable_id: int) -> None:
418        self._check_variable_id(variable_id)
419        variable = self.variables[variable_id]
420        # First update the watchers
421        for watcher in self._update_trackers:
422            if variable_id < watcher.variables_checkpoint:
423                watcher.variable_deletes.add(variable_id)
424                watcher.variable_lbs.discard(variable_id)
425                watcher.variable_ubs.discard(variable_id)
426                watcher.variable_integers.discard(variable_id)
427                watcher.linear_objective_coefficients.discard(variable_id)
428                for (
429                    other_variable_id
430                ) in self._quadratic_objective_coefficients.get_adjacent_variables(
431                    variable_id
432                ):
433                    key = _QuadraticKey(variable_id, other_variable_id)
434                    watcher.quadratic_objective_coefficients.discard(key)
435                for lin_con_id in variable.linear_constraint_nonzeros:
436                    if lin_con_id < watcher.linear_constraints_checkpoint:
437                        watcher.linear_constraint_matrix.discard(
438                            (lin_con_id, variable_id)
439                        )
440        # Then update self.
441        for lin_con_id in variable.linear_constraint_nonzeros:
442            self.linear_constraints[lin_con_id].variable_nonzeros.remove(variable_id)
443            del self._linear_constraint_matrix[(lin_con_id, variable_id)]
444        del self.variables[variable_id]
445        self.linear_objective_coefficient.pop(variable_id, None)
446        self._quadratic_objective_coefficients.delete_variable(variable_id)
447
448    def variable_exists(self, variable_id: int) -> bool:
449        return variable_id in self.variables
450
451    def next_variable_id(self) -> int:
452        return self._next_var_id
453
454    def set_variable_lb(self, variable_id: int, lb: float) -> None:
455        self._check_variable_id(variable_id)
456        if lb == self.variables[variable_id].lower_bound:
457            return
458        self.variables[variable_id].lower_bound = lb
459        for watcher in self._update_trackers:
460            if variable_id < watcher.variables_checkpoint:
461                watcher.variable_lbs.add(variable_id)
462
463    def set_variable_ub(self, variable_id: int, ub: float) -> None:
464        self._check_variable_id(variable_id)
465        if ub == self.variables[variable_id].upper_bound:
466            return
467        self.variables[variable_id].upper_bound = ub
468        for watcher in self._update_trackers:
469            if variable_id < watcher.variables_checkpoint:
470                watcher.variable_ubs.add(variable_id)
471
472    def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None:
473        self._check_variable_id(variable_id)
474        if is_integer == self.variables[variable_id].is_integer:
475            return
476        self.variables[variable_id].is_integer = is_integer
477        for watcher in self._update_trackers:
478            if variable_id < watcher.variables_checkpoint:
479                watcher.variable_integers.add(variable_id)
480
481    def get_variable_lb(self, variable_id: int) -> float:
482        self._check_variable_id(variable_id)
483        return self.variables[variable_id].lower_bound
484
485    def get_variable_ub(self, variable_id: int) -> float:
486        self._check_variable_id(variable_id)
487        return self.variables[variable_id].upper_bound
488
489    def get_variable_is_integer(self, variable_id: int) -> bool:
490        self._check_variable_id(variable_id)
491        return self.variables[variable_id].is_integer
492
493    def get_variable_name(self, variable_id: int) -> str:
494        self._check_variable_id(variable_id)
495        return self.variables[variable_id].name
496
497    def get_variables(self) -> Iterator[int]:
498        yield from self.variables.keys()
499
500    def add_linear_constraint(self, lb: float, ub: float, name: str) -> int:
501        lin_con_id = self._next_lin_con_id
502        self._next_lin_con_id += 1
503        self.linear_constraints[lin_con_id] = _LinearConstraintStorage(lb, ub, name)
504        return lin_con_id
505
506    def delete_linear_constraint(self, linear_constraint_id: int) -> None:
507        self._check_linear_constraint_id(linear_constraint_id)
508        con = self.linear_constraints[linear_constraint_id]
509        # First update the watchers
510        for watcher in self._update_trackers:
511            if linear_constraint_id < watcher.linear_constraints_checkpoint:
512                watcher.linear_constraint_deletes.add(linear_constraint_id)
513                watcher.linear_constraint_lbs.discard(linear_constraint_id)
514                watcher.linear_constraint_ubs.discard(linear_constraint_id)
515                for var_id in con.variable_nonzeros:
516                    if var_id < watcher.variables_checkpoint:
517                        watcher.linear_constraint_matrix.discard(
518                            (linear_constraint_id, var_id)
519                        )
520        # Then update self.
521        for var_id in con.variable_nonzeros:
522            self.variables[var_id].linear_constraint_nonzeros.remove(
523                linear_constraint_id
524            )
525            del self._linear_constraint_matrix[(linear_constraint_id, var_id)]
526        del self.linear_constraints[linear_constraint_id]
527
528    def linear_constraint_exists(self, linear_constraint_id: int) -> bool:
529        return linear_constraint_id in self.linear_constraints
530
531    def next_linear_constraint_id(self) -> int:
532        return self._next_lin_con_id
533
534    def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None:
535        self._check_linear_constraint_id(linear_constraint_id)
536        if lb == self.linear_constraints[linear_constraint_id].lower_bound:
537            return
538        self.linear_constraints[linear_constraint_id].lower_bound = lb
539        for watcher in self._update_trackers:
540            if linear_constraint_id < watcher.linear_constraints_checkpoint:
541                watcher.linear_constraint_lbs.add(linear_constraint_id)
542
543    def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None:
544        self._check_linear_constraint_id(linear_constraint_id)
545        if ub == self.linear_constraints[linear_constraint_id].upper_bound:
546            return
547        self.linear_constraints[linear_constraint_id].upper_bound = ub
548        for watcher in self._update_trackers:
549            if linear_constraint_id < watcher.linear_constraints_checkpoint:
550                watcher.linear_constraint_ubs.add(linear_constraint_id)
551
552    def get_linear_constraint_lb(self, linear_constraint_id: int) -> float:
553        self._check_linear_constraint_id(linear_constraint_id)
554        return self.linear_constraints[linear_constraint_id].lower_bound
555
556    def get_linear_constraint_ub(self, linear_constraint_id: int) -> float:
557        self._check_linear_constraint_id(linear_constraint_id)
558        return self.linear_constraints[linear_constraint_id].upper_bound
559
560    def get_linear_constraint_name(self, linear_constraint_id: int) -> str:
561        self._check_linear_constraint_id(linear_constraint_id)
562        return self.linear_constraints[linear_constraint_id].name
563
564    def get_linear_constraints(self) -> Iterator[int]:
565        yield from self.linear_constraints.keys()
566
567    def set_linear_constraint_coefficient(
568        self, linear_constraint_id: int, variable_id: int, value: float
569    ) -> None:
570        self._check_linear_constraint_id(linear_constraint_id)
571        self._check_variable_id(variable_id)
572        if value == self._linear_constraint_matrix.get(
573            (linear_constraint_id, variable_id), 0.0
574        ):
575            return
576        if value == 0.0:
577            self._linear_constraint_matrix.pop(
578                (linear_constraint_id, variable_id), None
579            )
580            self.variables[variable_id].linear_constraint_nonzeros.discard(
581                linear_constraint_id
582            )
583            self.linear_constraints[linear_constraint_id].variable_nonzeros.discard(
584                variable_id
585            )
586        else:
587            self._linear_constraint_matrix[(linear_constraint_id, variable_id)] = value
588            self.variables[variable_id].linear_constraint_nonzeros.add(
589                linear_constraint_id
590            )
591            self.linear_constraints[linear_constraint_id].variable_nonzeros.add(
592                variable_id
593            )
594        for watcher in self._update_trackers:
595            if (
596                variable_id < watcher.variables_checkpoint
597                and linear_constraint_id < watcher.linear_constraints_checkpoint
598            ):
599                watcher.linear_constraint_matrix.add(
600                    (linear_constraint_id, variable_id)
601                )
602
603    def get_linear_constraint_coefficient(
604        self, linear_constraint_id: int, variable_id: int
605    ) -> float:
606        self._check_linear_constraint_id(linear_constraint_id)
607        self._check_variable_id(variable_id)
608        return self._linear_constraint_matrix.get(
609            (linear_constraint_id, variable_id), 0.0
610        )
611
612    def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]:
613        self._check_variable_id(variable_id)
614        yield from self.variables[variable_id].linear_constraint_nonzeros
615
616    def get_variables_for_linear_constraint(
617        self, linear_constraint_id: int
618    ) -> Iterator[int]:
619        self._check_linear_constraint_id(linear_constraint_id)
620        yield from self.linear_constraints[linear_constraint_id].variable_nonzeros
621
622    def get_linear_constraint_matrix_entries(
623        self,
624    ) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]:
625        for (constraint, variable), coef in self._linear_constraint_matrix.items():
626            yield model_storage.LinearConstraintMatrixIdEntry(
627                linear_constraint_id=constraint,
628                variable_id=variable,
629                coefficient=coef,
630            )
631
632    def clear_objective(self) -> None:
633        for variable_id in self.linear_objective_coefficient:
634            for watcher in self._update_trackers:
635                if variable_id < watcher.variables_checkpoint:
636                    watcher.linear_objective_coefficients.add(variable_id)
637        self.linear_objective_coefficient.clear()
638        for key in self._quadratic_objective_coefficients.keys():
639            for watcher in self._update_trackers:
640                if key.id2 < watcher.variables_checkpoint:
641                    watcher.quadratic_objective_coefficients.add(key)
642        self._quadratic_objective_coefficients.clear()
643        self.set_objective_offset(0.0)
644
645    def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None:
646        self._check_variable_id(variable_id)
647        if value == self.linear_objective_coefficient.get(variable_id, 0.0):
648            return
649        if value == 0.0:
650            self.linear_objective_coefficient.pop(variable_id, None)
651        else:
652            self.linear_objective_coefficient[variable_id] = value
653        for watcher in self._update_trackers:
654            if variable_id < watcher.variables_checkpoint:
655                watcher.linear_objective_coefficients.add(variable_id)
656
657    def get_linear_objective_coefficient(self, variable_id: int) -> float:
658        self._check_variable_id(variable_id)
659        return self.linear_objective_coefficient.get(variable_id, 0.0)
660
661    def get_linear_objective_coefficients(
662        self,
663    ) -> Iterator[model_storage.LinearObjectiveEntry]:
664        for var_id, coef in self.linear_objective_coefficient.items():
665            yield model_storage.LinearObjectiveEntry(
666                variable_id=var_id, coefficient=coef
667            )
668
669    def set_quadratic_objective_coefficient(
670        self, first_variable_id: int, second_variable_id: int, value: float
671    ) -> None:
672        self._check_variable_id(first_variable_id)
673        self._check_variable_id(second_variable_id)
674        updated = self._quadratic_objective_coefficients.set_coefficient(
675            first_variable_id, second_variable_id, value
676        )
677        if updated:
678            for watcher in self._update_trackers:
679                if (
680                    max(first_variable_id, second_variable_id)
681                    < watcher.variables_checkpoint
682                ):
683                    watcher.quadratic_objective_coefficients.add(
684                        _QuadraticKey(first_variable_id, second_variable_id)
685                    )
686
687    def get_quadratic_objective_coefficient(
688        self, first_variable_id: int, second_variable_id: int
689    ) -> float:
690        self._check_variable_id(first_variable_id)
691        self._check_variable_id(second_variable_id)
692        return self._quadratic_objective_coefficients.get_coefficient(
693            first_variable_id, second_variable_id
694        )
695
696    def get_quadratic_objective_coefficients(
697        self,
698    ) -> Iterator[model_storage.QuadraticEntry]:
699        yield from self._quadratic_objective_coefficients.coefficients()
700
701    def get_quadratic_objective_adjacent_variables(
702        self, variable_id: int
703    ) -> Iterator[int]:
704        self._check_variable_id(variable_id)
705        yield from self._quadratic_objective_coefficients.get_adjacent_variables(
706            variable_id
707        )
708
709    def set_is_maximize(self, is_maximize: bool) -> None:
710        if self._is_maximize == is_maximize:
711            return
712        self._is_maximize = is_maximize
713        for watcher in self._update_trackers:
714            watcher.objective_direction = True
715
716    def get_is_maximize(self) -> bool:
717        return self._is_maximize
718
719    def set_objective_offset(self, offset: float) -> None:
720        if self._objective_offset == offset:
721            return
722        self._objective_offset = offset
723        for watcher in self._update_trackers:
724            watcher.objective_offset = True
725
726    def get_objective_offset(self) -> float:
727        return self._objective_offset
728
729    def export_model(self) -> model_pb2.ModelProto:
730        m: model_pb2.ModelProto = model_pb2.ModelProto()
731        m.name = self._name
732        _variables_to_proto(self.variables.items(), m.variables)
733        _linear_constraints_to_proto(
734            self.linear_constraints.items(), m.linear_constraints
735        )
736        m.objective.maximize = self._is_maximize
737        m.objective.offset = self._objective_offset
738        if self.linear_objective_coefficient:
739            obj_ids, obj_coefs = zip(*sorted(self.linear_objective_coefficient.items()))
740            m.objective.linear_coefficients.ids.extend(obj_ids)
741            m.objective.linear_coefficients.values.extend(obj_coefs)
742        if self._quadratic_objective_coefficients:
743            first_var_ids, second_var_ids, coefficients = zip(
744                *sorted(
745                    [
746                        (entry.id_key.id1, entry.id_key.id2, entry.coefficient)
747                        for entry in self._quadratic_objective_coefficients.coefficients()
748                    ]
749                )
750            )
751            m.objective.quadratic_coefficients.row_ids.extend(first_var_ids)
752            m.objective.quadratic_coefficients.column_ids.extend(second_var_ids)
753            m.objective.quadratic_coefficients.coefficients.extend(coefficients)
754        if self._linear_constraint_matrix:
755            flat_matrix_items = [
756                (con_id, var_id, coef)
757                for ((con_id, var_id), coef) in self._linear_constraint_matrix.items()
758            ]
759            lin_con_ids, var_ids, lin_con_coefs = zip(*sorted(flat_matrix_items))
760            m.linear_constraint_matrix.row_ids.extend(lin_con_ids)
761            m.linear_constraint_matrix.column_ids.extend(var_ids)
762            m.linear_constraint_matrix.coefficients.extend(lin_con_coefs)
763        return m
764
765    def add_update_tracker(self) -> model_storage.StorageUpdateTracker:
766        tracker = _UpdateTracker(self)
767        self._update_trackers.add(tracker)
768        return tracker
769
770    def remove_update_tracker(
771        self, tracker: model_storage.StorageUpdateTracker
772    ) -> None:
773        self._update_trackers.remove(tracker)
774        tracker.retired = True
775
776    def _check_variable_id(self, variable_id: int) -> None:
777        if variable_id not in self.variables:
778            raise model_storage.BadVariableIdError(variable_id)
779
780    def _check_linear_constraint_id(self, linear_constraint_id: int) -> None:
781        if linear_constraint_id not in self.linear_constraints:
782            raise model_storage.BadLinearConstraintIdError(linear_constraint_id)
783
784
def _set_sparse_double_vector(
    id_value_pairs: Iterable[Tuple[int, float]],
    proto: sparse_containers_pb2.SparseDoubleVectorProto,
) -> None:
    """Fills proto from (id, value) pairs; id_value_pairs must be sorted.

    Args:
      id_value_pairs: The entries to write, already sorted. Any iterable is
        accepted, including one-shot iterators and generators.
      proto: The sparse vector to fill. Its ids and values are replaced. It is
        left untouched when there are no pairs.
    """
    # Materialize first: a generator is always truthy, so testing the raw
    # iterable would let an empty one reach the unpacking below and fail.
    pairs = list(id_value_pairs)
    if not pairs:
        return
    ids, values = zip(*pairs)
    proto.ids[:] = ids
    proto.values[:] = values
795
796
def _set_sparse_bool_vector(
    id_value_pairs: Iterable[Tuple[int, bool]],
    proto: sparse_containers_pb2.SparseBoolVectorProto,
) -> None:
    """Fills proto from (id, value) pairs; id_value_pairs must be sorted.

    Args:
      id_value_pairs: The entries to write, already sorted. Any iterable is
        accepted, including one-shot iterators and generators.
      proto: The sparse vector to fill. Its ids and values are replaced. It is
        left untouched when there are no pairs.
    """
    # Materialize first: a generator is always truthy, so testing the raw
    # iterable would let an empty one reach the unpacking below and fail.
    pairs = list(id_value_pairs)
    if not pairs:
        return
    ids, values = zip(*pairs)
    proto.ids[:] = ids
    proto.values[:] = values
807
808
def _variables_to_proto(
    variables: Iterable[Tuple[int, _VariableStorage]],
    proto: model_pb2.VariablesProto,
) -> None:
    """Exports variables to proto.

    Names are written for every variable if at least one variable has a
    non-empty name, and for none otherwise.

    Args:
      variables: (id, storage) pairs to append, in output order. Any iterable
        is accepted, including one-shot iterators.
      proto: The message whose repeated fields are appended to.
    """
    # The pairs are traversed twice (name scan, then export); materializing
    # keeps a one-shot iterator from being exhausted by the first pass.
    entries = list(variables)
    has_named_var = any(var_storage.name for _, var_storage in entries)
    for var_id, var_storage in entries:
        proto.ids.append(var_id)
        proto.lower_bounds.append(var_storage.lower_bound)
        proto.upper_bounds.append(var_storage.upper_bound)
        proto.integers.append(var_storage.is_integer)
        if has_named_var:
            proto.names.append(var_storage.name)
826
827
def _linear_constraints_to_proto(
    linear_constraints: Iterable[Tuple[int, _LinearConstraintStorage]],
    proto: model_pb2.LinearConstraintsProto,
) -> None:
    """Exports linear constraints to proto.

    Names are written for every constraint if at least one constraint has a
    non-empty name, and for none otherwise.

    Args:
      linear_constraints: (id, storage) pairs to append, in output order. Any
        iterable is accepted, including one-shot iterators.
      proto: The message whose repeated fields are appended to.
    """
    # The pairs are traversed twice (name scan, then export); materializing
    # keeps a one-shot iterator from being exhausted by the first pass.
    entries = list(linear_constraints)
    has_named_lin_con = any(lin_con_storage.name for _, lin_con_storage in entries)
    for lin_con_id, lin_con_storage in entries:
        proto.ids.append(lin_con_id)
        proto.lower_bounds.append(lin_con_storage.lower_bound)
        proto.upper_bounds.append(lin_con_storage.upper_bound)
        if has_named_lin_con:
            proto.names.append(lin_con_storage.name)
class HashModelStorage(ortools.math_opt.python.model_storage.ModelStorage):
class HashModelStorage(model_storage.ModelStorage):
    """A simple, pure python implementation of ModelStorage.

    Attributes:
        _linear_constraint_matrix: A dictionary with (linear_constraint_id,
          variable_id) keys and numeric values, representing the matrix A for the
          constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros.
        linear_objective_coefficient: A dictionary with variable_id keys and
          numeric values, representing the linear terms in the objective.
          Invariant: the values have no zeros.
        _quadratic_objective_coefficients: A data structure containing quadratic
          terms in the objective.
    """

    def __init__(self, name: str = "") -> None:
        """Creates an empty model storage with the given name."""
        super().__init__()
        self._name: str = name
        # Objective.
        self._is_maximize: bool = False
        self._objective_offset: float = 0.0
        self.linear_objective_coefficient: Dict[int, float] = {}
        self._quadratic_objective_coefficients: _QuadraticTermStorage = (
            _QuadraticTermStorage()
        )
        # Variables, linear constraints and the constraint matrix.
        self.variables: Dict[int, _VariableStorage] = {}
        self.linear_constraints: Dict[int, _LinearConstraintStorage] = {}
        self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {}
        # Ids are handed out sequentially and are never reused.
        self._next_var_id: int = 0
        self._next_lin_con_id: int = 0
        # Trackers are held weakly.
        self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet()

    @property
    def name(self) -> str:
        """The name given to the model at construction."""
        return self._name

    def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int:
        """Adds a variable and returns its newly assigned id."""
        new_id = self._next_var_id
        self._next_var_id = new_id + 1
        self.variables[new_id] = _VariableStorage(lb, ub, is_integer, name)
        return new_id

    def delete_variable(self, variable_id: int) -> None:
        """Deletes a variable and every coefficient that refers to it.

        Trackers whose checkpoint covers the variable record the deletion and
        forget any pending updates that involve the variable.
        """
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        constraint_ids = storage.linear_constraint_nonzeros
        quadratic_terms = self._quadratic_objective_coefficients
        # Trackers go first: the quadratic neighbors are still needed here.
        for tracker in self._update_trackers:
            if variable_id >= tracker.variables_checkpoint:
                continue
            tracker.variable_deletes.add(variable_id)
            tracker.variable_lbs.discard(variable_id)
            tracker.variable_ubs.discard(variable_id)
            tracker.variable_integers.discard(variable_id)
            tracker.linear_objective_coefficients.discard(variable_id)
            for neighbor_id in quadratic_terms.get_adjacent_variables(variable_id):
                tracker.quadratic_objective_coefficients.discard(
                    _QuadraticKey(variable_id, neighbor_id)
                )
            for con_id in constraint_ids:
                if con_id < tracker.linear_constraints_checkpoint:
                    tracker.linear_constraint_matrix.discard((con_id, variable_id))
        # Then the model itself.
        for con_id in constraint_ids:
            self.linear_constraints[con_id].variable_nonzeros.remove(variable_id)
            del self._linear_constraint_matrix[(con_id, variable_id)]
        del self.variables[variable_id]
        self.linear_objective_coefficient.pop(variable_id, None)
        quadratic_terms.delete_variable(variable_id)

    def variable_exists(self, variable_id: int) -> bool:
        """Returns True if variable_id refers to a variable currently stored."""
        live_variables = self.variables
        return variable_id in live_variables

    def next_variable_id(self) -> int:
        """Returns the id the next added variable will receive."""
        upcoming_id = self._next_var_id
        return upcoming_id

    def set_variable_lb(self, variable_id: int, lb: float) -> None:
        """Sets a variable's lower bound, notifying trackers only on a change."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        if storage.lower_bound == lb:
            return
        storage.lower_bound = lb
        for tracker in self._update_trackers:
            if tracker.variables_checkpoint > variable_id:
                tracker.variable_lbs.add(variable_id)

    def set_variable_ub(self, variable_id: int, ub: float) -> None:
        """Sets a variable's upper bound, notifying trackers only on a change."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        if storage.upper_bound == ub:
            return
        storage.upper_bound = ub
        for tracker in self._update_trackers:
            if tracker.variables_checkpoint > variable_id:
                tracker.variable_ubs.add(variable_id)

    def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None:
        """Sets a variable's integrality, notifying trackers only on a change."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        if storage.is_integer == is_integer:
            return
        storage.is_integer = is_integer
        for tracker in self._update_trackers:
            if tracker.variables_checkpoint > variable_id:
                tracker.variable_integers.add(variable_id)

    def get_variable_lb(self, variable_id: int) -> float:
        """Returns the lower bound of a variable."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        return storage.lower_bound

    def get_variable_ub(self, variable_id: int) -> float:
        """Returns the upper bound of a variable."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        return storage.upper_bound

    def get_variable_is_integer(self, variable_id: int) -> bool:
        """Returns whether a variable is integer."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        return storage.is_integer

    def get_variable_name(self, variable_id: int) -> str:
        """Returns the name of a variable."""
        self._check_variable_id(variable_id)
        storage = self.variables[variable_id]
        return storage.name

    def get_variables(self) -> Iterator[int]:
        """Yields the ids of the stored variables in dictionary order."""
        for var_id in self.variables:
            yield var_id

    def add_linear_constraint(self, lb: float, ub: float, name: str) -> int:
        """Adds a linear constraint and returns its newly assigned id."""
        new_id = self._next_lin_con_id
        self._next_lin_con_id = new_id + 1
        self.linear_constraints[new_id] = _LinearConstraintStorage(lb, ub, name)
        return new_id

    def delete_linear_constraint(self, linear_constraint_id: int) -> None:
        """Deletes a linear constraint and its matrix coefficients.

        Trackers whose checkpoint covers the constraint record the deletion
        and forget any pending updates that involve the constraint.
        """
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        variable_ids = storage.variable_nonzeros
        # Trackers go first.
        for tracker in self._update_trackers:
            if linear_constraint_id >= tracker.linear_constraints_checkpoint:
                continue
            tracker.linear_constraint_deletes.add(linear_constraint_id)
            tracker.linear_constraint_lbs.discard(linear_constraint_id)
            tracker.linear_constraint_ubs.discard(linear_constraint_id)
            for var_id in variable_ids:
                if var_id < tracker.variables_checkpoint:
                    tracker.linear_constraint_matrix.discard(
                        (linear_constraint_id, var_id)
                    )
        # Then the model itself.
        for var_id in variable_ids:
            self.variables[var_id].linear_constraint_nonzeros.remove(
                linear_constraint_id
            )
            del self._linear_constraint_matrix[(linear_constraint_id, var_id)]
        del self.linear_constraints[linear_constraint_id]

    def linear_constraint_exists(self, linear_constraint_id: int) -> bool:
        """Returns True if the id refers to a constraint currently stored."""
        live_constraints = self.linear_constraints
        return linear_constraint_id in live_constraints

    def next_linear_constraint_id(self) -> int:
        """Returns the id the next added linear constraint will receive."""
        upcoming_id = self._next_lin_con_id
        return upcoming_id

    def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None:
        """Sets a constraint's lower bound, notifying trackers only on a change."""
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        if storage.lower_bound == lb:
            return
        storage.lower_bound = lb
        for tracker in self._update_trackers:
            if tracker.linear_constraints_checkpoint > linear_constraint_id:
                tracker.linear_constraint_lbs.add(linear_constraint_id)

    def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None:
        """Sets a constraint's upper bound, notifying trackers only on a change."""
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        if storage.upper_bound == ub:
            return
        storage.upper_bound = ub
        for tracker in self._update_trackers:
            if tracker.linear_constraints_checkpoint > linear_constraint_id:
                tracker.linear_constraint_ubs.add(linear_constraint_id)

    def get_linear_constraint_lb(self, linear_constraint_id: int) -> float:
        """Returns the lower bound of a linear constraint."""
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        return storage.lower_bound

    def get_linear_constraint_ub(self, linear_constraint_id: int) -> float:
        """Returns the upper bound of a linear constraint."""
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        return storage.upper_bound

    def get_linear_constraint_name(self, linear_constraint_id: int) -> str:
        """Returns the name of a linear constraint."""
        self._check_linear_constraint_id(linear_constraint_id)
        storage = self.linear_constraints[linear_constraint_id]
        return storage.name

    def get_linear_constraints(self) -> Iterator[int]:
        """Yields the ids of the stored linear constraints in dictionary order."""
        for lin_con_id in self.linear_constraints:
            yield lin_con_id

    def set_linear_constraint_coefficient(
        self, linear_constraint_id: int, variable_id: int, value: float
    ) -> None:
        """Sets one entry of the linear constraint matrix.

        A zero value removes the entry, so the matrix never stores zeros; the
        per-variable and per-constraint nonzero sets are kept in sync. Nothing
        happens when the value is unchanged.
        """
        self._check_linear_constraint_id(linear_constraint_id)
        self._check_variable_id(variable_id)
        entry = (linear_constraint_id, variable_id)
        matrix = self._linear_constraint_matrix
        if matrix.get(entry, 0.0) == value:
            return
        rows_of_variable = self.variables[variable_id].linear_constraint_nonzeros
        columns_of_constraint = self.linear_constraints[
            linear_constraint_id
        ].variable_nonzeros
        if value != 0.0:
            matrix[entry] = value
            rows_of_variable.add(linear_constraint_id)
            columns_of_constraint.add(variable_id)
        else:
            matrix.pop(entry, None)
            rows_of_variable.discard(linear_constraint_id)
            columns_of_constraint.discard(variable_id)
        # A tracker records the entry only if it knew both the variable and
        # the constraint at its checkpoint.
        for tracker in self._update_trackers:
            knows_variable = variable_id < tracker.variables_checkpoint
            if (
                knows_variable
                and linear_constraint_id < tracker.linear_constraints_checkpoint
            ):
                tracker.linear_constraint_matrix.add(entry)

    def get_linear_constraint_coefficient(
        self, linear_constraint_id: int, variable_id: int
    ) -> float:
        """Returns one entry of the linear constraint matrix (0.0 if unset)."""
        self._check_linear_constraint_id(linear_constraint_id)
        self._check_variable_id(variable_id)
        entry = (linear_constraint_id, variable_id)
        return self._linear_constraint_matrix.get(entry, 0.0)

    def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]:
        """Yields the constraints in which the variable has a nonzero coefficient."""
        self._check_variable_id(variable_id)
        for lin_con_id in self.variables[variable_id].linear_constraint_nonzeros:
            yield lin_con_id

    def get_variables_for_linear_constraint(
        self, linear_constraint_id: int
    ) -> Iterator[int]:
        """Yields the variables having a nonzero coefficient in the constraint."""
        self._check_linear_constraint_id(linear_constraint_id)
        for var_id in self.linear_constraints[linear_constraint_id].variable_nonzeros:
            yield var_id

    def get_linear_constraint_matrix_entries(
        self,
    ) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]:
        """Yields one LinearConstraintMatrixIdEntry per stored matrix entry."""
        make_entry = model_storage.LinearConstraintMatrixIdEntry
        for (con_id, var_id), value in self._linear_constraint_matrix.items():
            yield make_entry(
                linear_constraint_id=con_id,
                variable_id=var_id,
                coefficient=value,
            )

    def clear_objective(self) -> None:
        """Removes every linear and quadratic objective term and zeroes the offset.

        The optimization direction is not modified. Each update tracker records
        the removed terms, but only those whose variables have ids below that
        tracker's variables checkpoint.
        """
        linear_terms = self.linear_objective_coefficient
        quadratic_terms = self._quadratic_objective_coefficients
        for tracker in self._update_trackers:
            checkpoint = tracker.variables_checkpoint
            tracker.linear_objective_coefficients.update(
                var_id for var_id in linear_terms if var_id < checkpoint
            )
            # NOTE(review): key.id2 is presumably the larger id of the pair;
            # confirm against QuadraticTermIdKey.
            tracker.quadratic_objective_coefficients.update(
                key for key in quadratic_terms.keys() if key.id2 < checkpoint
            )
        linear_terms.clear()
        quadratic_terms.clear()
        # Delegating to the setter marks the offset as changed on trackers.
        self.set_objective_offset(0.0)

    def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None:
        """Sets the linear objective coefficient of a variable.

        A zero value removes the stored entry, so the dictionary never holds
        zeros. Nothing happens when the value is unchanged.
        """
        self._check_variable_id(variable_id)
        coefficients = self.linear_objective_coefficient
        if coefficients.get(variable_id, 0.0) == value:
            return
        if value != 0.0:
            coefficients[variable_id] = value
        else:
            coefficients.pop(variable_id, None)
        for tracker in self._update_trackers:
            if tracker.variables_checkpoint > variable_id:
                tracker.linear_objective_coefficients.add(variable_id)

    def get_linear_objective_coefficient(self, variable_id: int) -> float:
        """Returns the linear objective coefficient of a variable (0.0 if unset)."""
        self._check_variable_id(variable_id)
        coefficients = self.linear_objective_coefficient
        return coefficients.get(variable_id, 0.0)

    def get_linear_objective_coefficients(
        self,
    ) -> Iterator[model_storage.LinearObjectiveEntry]:
        """Yields one LinearObjectiveEntry per stored linear objective term."""
        make_entry = model_storage.LinearObjectiveEntry
        yield from (
            make_entry(variable_id=variable, coefficient=value)
            for variable, value in self.linear_objective_coefficient.items()
        )

    def set_quadratic_objective_coefficient(
        self, first_variable_id: int, second_variable_id: int, value: float
    ) -> None:
        """Sets the objective coefficient of the product of two variables.

        Trackers are notified only when the underlying storage reports that
        the coefficient actually changed, and only if both variables have ids
        below the tracker's variables checkpoint.
        """
        self._check_variable_id(first_variable_id)
        self._check_variable_id(second_variable_id)
        changed = self._quadratic_objective_coefficients.set_coefficient(
            first_variable_id, second_variable_id, value
        )
        if not changed:
            return
        larger_id = max(first_variable_id, second_variable_id)
        key = _QuadraticKey(first_variable_id, second_variable_id)
        for tracker in self._update_trackers:
            if larger_id < tracker.variables_checkpoint:
                tracker.quadratic_objective_coefficients.add(key)

    def get_quadratic_objective_coefficient(
        self, first_variable_id: int, second_variable_id: int
    ) -> float:
        """Returns the objective coefficient of the product of two variables."""
        for var_id in (first_variable_id, second_variable_id):
            self._check_variable_id(var_id)
        quadratic_terms = self._quadratic_objective_coefficients
        return quadratic_terms.get_coefficient(first_variable_id, second_variable_id)

    def get_quadratic_objective_coefficients(
        self,
    ) -> Iterator[model_storage.QuadraticEntry]:
        """Yields the entries reported by the quadratic objective storage."""
        for entry in self._quadratic_objective_coefficients.coefficients():
            yield entry

    def get_quadratic_objective_adjacent_variables(
        self, variable_id: int
    ) -> Iterator[int]:
        """Yields the variables sharing a quadratic objective term with variable_id."""
        self._check_variable_id(variable_id)
        quadratic_terms = self._quadratic_objective_coefficients
        for neighbor_id in quadratic_terms.get_adjacent_variables(variable_id):
            yield neighbor_id

    def set_is_maximize(self, is_maximize: bool) -> None:
        """Sets the optimization direction, flagging trackers only on a change."""
        if self._is_maximize != is_maximize:
            self._is_maximize = is_maximize
            for tracker in self._update_trackers:
                tracker.objective_direction = True

    def get_is_maximize(self) -> bool:
        """Returns the stored optimization-direction flag."""
        direction = self._is_maximize
        return direction

    def set_objective_offset(self, offset: float) -> None:
        """Sets the objective's constant term, flagging trackers only on a change."""
        if self._objective_offset != offset:
            self._objective_offset = offset
            for tracker in self._update_trackers:
                tracker.objective_offset = True

    def get_objective_offset(self) -> float:
        """Returns the objective's constant term."""
        offset = self._objective_offset
        return offset

    def export_model(self) -> model_pb2.ModelProto:
        """Builds a ModelProto holding the current state of this storage.

        Sparse objective and constraint-matrix entries are written in
        ascending id order (row-major for the two-index containers).
        """
        proto: model_pb2.ModelProto = model_pb2.ModelProto()
        proto.name = self._name
        _variables_to_proto(self.variables.items(), proto.variables)
        _linear_constraints_to_proto(
            self.linear_constraints.items(), proto.linear_constraints
        )

        objective = proto.objective
        objective.maximize = self._is_maximize
        objective.offset = self._objective_offset

        linear_terms = self.linear_objective_coefficient
        if linear_terms:
            linear_proto = objective.linear_coefficients
            for var_id in sorted(linear_terms):
                linear_proto.ids.append(var_id)
                linear_proto.values.append(linear_terms[var_id])

        if self._quadratic_objective_coefficients:
            quadratic_proto = objective.quadratic_coefficients
            quadratic_terms = sorted(
                (entry.id_key.id1, entry.id_key.id2, entry.coefficient)
                for entry in self._quadratic_objective_coefficients.coefficients()
            )
            for first_id, second_id, coefficient in quadratic_terms:
                quadratic_proto.row_ids.append(first_id)
                quadratic_proto.column_ids.append(second_id)
                quadratic_proto.coefficients.append(coefficient)

        if self._linear_constraint_matrix:
            matrix_proto = proto.linear_constraint_matrix
            # Keys are (constraint id, variable id) pairs and are unique, so
            # sorting the items orders entries by row, then column.
            for (con_id, var_id), coefficient in sorted(
                self._linear_constraint_matrix.items()
            ):
                matrix_proto.row_ids.append(con_id)
                matrix_proto.column_ids.append(var_id)
                matrix_proto.coefficients.append(coefficient)
        return proto

    def add_update_tracker(self) -> model_storage.StorageUpdateTracker:
        """Creates a tracker for this storage, registers it, and returns it."""
        new_tracker = _UpdateTracker(self)
        self._update_trackers.add(new_tracker)
        return new_tracker

    def remove_update_tracker(
        self, tracker: model_storage.StorageUpdateTracker
    ) -> None:
        """Unregisters a tracker and marks it as retired.

        Raises:
            KeyError: if the tracker is not registered with this storage.
        """
        registered = self._update_trackers
        registered.remove(tracker)
        tracker.retired = True

    def _check_variable_id(self, variable_id: int) -> None:
        """Raises BadVariableIdError unless variable_id is a live variable."""
        if variable_id in self.variables:
            return
        raise model_storage.BadVariableIdError(variable_id)

    def _check_linear_constraint_id(self, linear_constraint_id: int) -> None:
        """Raises BadLinearConstraintIdError unless the id is a live constraint."""
        if linear_constraint_id in self.linear_constraints:
            return
        raise model_storage.BadLinearConstraintIdError(linear_constraint_id)

A simple, pure python implementation of ModelStorage.

Attributes:
  • _linear_constraint_matrix: A dictionary with (linear_constraint_id, variable_id) keys and numeric values, representing the matrix A for the constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros.
  • linear_objective_coefficient: A dictionary with variable_id keys and numeric values, representing the linear terms in the objective. Invariant: the values have no zeros.
  • _quadratic_objective_coefficients: A data structure containing quadratic terms in the objective.
HashModelStorage(name: str = '')
392    def __init__(self, name: str = "") -> None:
393        super().__init__()
394        self._name: str = name
395        self.variables: Dict[int, _VariableStorage] = {}
396        self.linear_constraints: Dict[int, _LinearConstraintStorage] = {}
397        self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {}  #
398        self._is_maximize: bool = False
399        self._objective_offset: float = 0.0
400        self.linear_objective_coefficient: Dict[int, float] = {}
401        self._quadratic_objective_coefficients: _QuadraticTermStorage = (
402            _QuadraticTermStorage()
403        )
404        self._next_var_id: int = 0
405        self._next_lin_con_id: int = 0
406        self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet()
variables: Dict[int, ortools.math_opt.python.hash_model_storage._VariableStorage]
linear_constraints: Dict[int, ortools.math_opt.python.hash_model_storage._LinearConstraintStorage]
linear_objective_coefficient: Dict[int, float]
name: str
408    @property
409    def name(self) -> str:
410        return self._name
def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int:
412    def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int:
413        var_id = self._next_var_id
414        self._next_var_id += 1
415        self.variables[var_id] = _VariableStorage(lb, ub, is_integer, name)
416        return var_id
def delete_variable(self, variable_id: int) -> None:
418    def delete_variable(self, variable_id: int) -> None:
419        self._check_variable_id(variable_id)
420        variable = self.variables[variable_id]
421        # First update the watchers
422        for watcher in self._update_trackers:
423            if variable_id < watcher.variables_checkpoint:
424                watcher.variable_deletes.add(variable_id)
425                watcher.variable_lbs.discard(variable_id)
426                watcher.variable_ubs.discard(variable_id)
427                watcher.variable_integers.discard(variable_id)
428                watcher.linear_objective_coefficients.discard(variable_id)
429                for (
430                    other_variable_id
431                ) in self._quadratic_objective_coefficients.get_adjacent_variables(
432                    variable_id
433                ):
434                    key = _QuadraticKey(variable_id, other_variable_id)
435                    watcher.quadratic_objective_coefficients.discard(key)
436                for lin_con_id in variable.linear_constraint_nonzeros:
437                    if lin_con_id < watcher.linear_constraints_checkpoint:
438                        watcher.linear_constraint_matrix.discard(
439                            (lin_con_id, variable_id)
440                        )
441        # Then update self.
442        for lin_con_id in variable.linear_constraint_nonzeros:
443            self.linear_constraints[lin_con_id].variable_nonzeros.remove(variable_id)
444            del self._linear_constraint_matrix[(lin_con_id, variable_id)]
445        del self.variables[variable_id]
446        self.linear_objective_coefficient.pop(variable_id, None)
447        self._quadratic_objective_coefficients.delete_variable(variable_id)
def variable_exists(self, variable_id: int) -> bool:
449    def variable_exists(self, variable_id: int) -> bool:
450        return variable_id in self.variables
def next_variable_id(self) -> int:
452    def next_variable_id(self) -> int:
453        return self._next_var_id
def set_variable_lb(self, variable_id: int, lb: float) -> None:
455    def set_variable_lb(self, variable_id: int, lb: float) -> None:
456        self._check_variable_id(variable_id)
457        if lb == self.variables[variable_id].lower_bound:
458            return
459        self.variables[variable_id].lower_bound = lb
460        for watcher in self._update_trackers:
461            if variable_id < watcher.variables_checkpoint:
462                watcher.variable_lbs.add(variable_id)
def set_variable_ub(self, variable_id: int, ub: float) -> None:
464    def set_variable_ub(self, variable_id: int, ub: float) -> None:
465        self._check_variable_id(variable_id)
466        if ub == self.variables[variable_id].upper_bound:
467            return
468        self.variables[variable_id].upper_bound = ub
469        for watcher in self._update_trackers:
470            if variable_id < watcher.variables_checkpoint:
471                watcher.variable_ubs.add(variable_id)
def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None:
def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None:
    """Stores the integrality flag of a variable and notifies update trackers.

    The id is validated with _check_variable_id first. Nothing happens when
    is_integer equals the stored flag.

    Args:
      variable_id: Id of the variable to modify.
      is_integer: The new integrality flag.
    """
    self._check_variable_id(variable_id)
    var_storage = self.variables[variable_id]
    if is_integer == var_storage.is_integer:
        return
    var_storage.is_integer = is_integer
    # Only trackers whose checkpoint is past this id record the change.
    for tracker in self._update_trackers:
        if variable_id < tracker.variables_checkpoint:
            tracker.variable_integers.add(variable_id)
def get_variable_lb(self, variable_id: int) -> float:
def get_variable_lb(self, variable_id: int) -> float:
    """Returns the stored lower bound of variable_id after validating the id."""
    self._check_variable_id(variable_id)
    var_storage = self.variables[variable_id]
    return var_storage.lower_bound
def get_variable_ub(self, variable_id: int) -> float:
def get_variable_ub(self, variable_id: int) -> float:
    """Returns the stored upper bound of variable_id after validating the id."""
    self._check_variable_id(variable_id)
    var_storage = self.variables[variable_id]
    return var_storage.upper_bound
def get_variable_is_integer(self, variable_id: int) -> bool:
def get_variable_is_integer(self, variable_id: int) -> bool:
    """Returns the stored integrality flag of variable_id after validating the id."""
    self._check_variable_id(variable_id)
    var_storage = self.variables[variable_id]
    return var_storage.is_integer
def get_variable_name(self, variable_id: int) -> str:
def get_variable_name(self, variable_id: int) -> str:
    """Returns the stored name of variable_id after validating the id."""
    self._check_variable_id(variable_id)
    var_storage = self.variables[variable_id]
    return var_storage.name
def get_variables(self) -> Iterator[int]:
def get_variables(self) -> Iterator[int]:
    """Yields every key of self.variables, in the mapping's iteration order."""
    for var_id in self.variables:
        yield var_id

Yields the variable ids in order of creation.

def add_linear_constraint(self, lb: float, ub: float, name: str) -> int:
def add_linear_constraint(self, lb: float, ub: float, name: str) -> int:
    """Creates storage for a new linear constraint and returns its id.

    The id is taken from the _next_lin_con_id counter, which is advanced by one
    before the storage object is built.

    Args:
      lb: Lower bound passed to the new constraint storage.
      ub: Upper bound passed to the new constraint storage.
      name: Name passed to the new constraint storage.

    Returns:
      The id under which the constraint was stored.
    """
    new_id = self._next_lin_con_id
    self._next_lin_con_id = new_id + 1
    constraint_storage = _LinearConstraintStorage(lb, ub, name)
    self.linear_constraints[new_id] = constraint_storage
    return new_id
def delete_linear_constraint(self, linear_constraint_id: int) -> None:
def delete_linear_constraint(self, linear_constraint_id: int) -> None:
    """Removes a linear constraint and all of its matrix entries.

    Update trackers are adjusted before the model itself is changed, because
    the tracker bookkeeping reads the constraint's nonzeros.

    Args:
      linear_constraint_id: Id of the constraint to delete; validated with
        _check_linear_constraint_id.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]

    # Phase 1: bring the trackers up to date.
    for tracker in self._update_trackers:
        if not linear_constraint_id < tracker.linear_constraints_checkpoint:
            continue
        tracker.linear_constraint_deletes.add(linear_constraint_id)
        # Pending bound/coefficient changes are moot once the row is deleted.
        tracker.linear_constraint_lbs.discard(linear_constraint_id)
        tracker.linear_constraint_ubs.discard(linear_constraint_id)
        for variable_id in constraint.variable_nonzeros:
            if variable_id < tracker.variables_checkpoint:
                entry = (linear_constraint_id, variable_id)
                tracker.linear_constraint_matrix.discard(entry)

    # Phase 2: remove the constraint from the model data.
    for variable_id in constraint.variable_nonzeros:
        var_storage = self.variables[variable_id]
        var_storage.linear_constraint_nonzeros.remove(linear_constraint_id)
        del self._linear_constraint_matrix[(linear_constraint_id, variable_id)]
    del self.linear_constraints[linear_constraint_id]
def linear_constraint_exists(self, linear_constraint_id: int) -> bool:
def linear_constraint_exists(self, linear_constraint_id: int) -> bool:
    """Reports whether linear_constraint_id is a key of self.linear_constraints."""
    is_present = linear_constraint_id in self.linear_constraints
    return is_present
def next_linear_constraint_id(self) -> int:
def next_linear_constraint_id(self) -> int:
    """Returns the current value of the constraint id counter, _next_lin_con_id."""
    upcoming_id = self._next_lin_con_id
    return upcoming_id
def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None:
def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None:
    """Stores a new lower bound for a linear constraint and notifies trackers.

    The id is validated with _check_linear_constraint_id first. Nothing happens
    when lb equals the stored lower bound.

    Args:
      linear_constraint_id: Id of the constraint to modify.
      lb: The new lower bound.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    if lb == constraint.lower_bound:
        return
    constraint.lower_bound = lb
    # Only trackers whose checkpoint is past this id record the change.
    for tracker in self._update_trackers:
        if linear_constraint_id < tracker.linear_constraints_checkpoint:
            tracker.linear_constraint_lbs.add(linear_constraint_id)
def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None:
def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None:
    """Stores a new upper bound for a linear constraint and notifies trackers.

    The id is validated with _check_linear_constraint_id first. Nothing happens
    when ub equals the stored upper bound.

    Args:
      linear_constraint_id: Id of the constraint to modify.
      ub: The new upper bound.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    if ub == constraint.upper_bound:
        return
    constraint.upper_bound = ub
    # Only trackers whose checkpoint is past this id record the change.
    for tracker in self._update_trackers:
        if linear_constraint_id < tracker.linear_constraints_checkpoint:
            tracker.linear_constraint_ubs.add(linear_constraint_id)
def get_linear_constraint_lb(self, linear_constraint_id: int) -> float:
def get_linear_constraint_lb(self, linear_constraint_id: int) -> float:
    """Returns the stored lower bound of a linear constraint after validating its id."""
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    return constraint.lower_bound
def get_linear_constraint_ub(self, linear_constraint_id: int) -> float:
def get_linear_constraint_ub(self, linear_constraint_id: int) -> float:
    """Returns the stored upper bound of a linear constraint after validating its id."""
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    return constraint.upper_bound
def get_linear_constraint_name(self, linear_constraint_id: int) -> str:
def get_linear_constraint_name(self, linear_constraint_id: int) -> str:
    """Returns the stored name of a linear constraint after validating its id."""
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    return constraint.name
def get_linear_constraints(self) -> Iterator[int]:
def get_linear_constraints(self) -> Iterator[int]:
    """Yields every key of self.linear_constraints, in the mapping's iteration order."""
    for lin_con_id in self.linear_constraints:
        yield lin_con_id

Yields the linear constraint ids in order of creation.

def set_linear_constraint_coefficient(self, linear_constraint_id: int, variable_id: int, value: float) -> None:
def set_linear_constraint_coefficient(
    self, linear_constraint_id: int, variable_id: int, value: float
) -> None:
    """Sets one entry of the linear constraint matrix and notifies trackers.

    Only nonzero entries are stored: writing 0.0 removes the entry and the
    matching row/column nonzero markers, any other value stores it and adds
    the markers. Nothing happens when value equals the current coefficient
    (a missing entry counts as 0.0).

    Args:
      linear_constraint_id: Row of the entry; validated with
        _check_linear_constraint_id.
      variable_id: Column of the entry; validated with _check_variable_id.
      value: The new coefficient.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    self._check_variable_id(variable_id)
    entry = (linear_constraint_id, variable_id)
    matrix = self._linear_constraint_matrix
    if value == matrix.get(entry, 0.0):
        return

    if value == 0.0:
        matrix.pop(entry, None)
        self.variables[variable_id].linear_constraint_nonzeros.discard(
            linear_constraint_id
        )
        self.linear_constraints[linear_constraint_id].variable_nonzeros.discard(
            variable_id
        )
    else:
        matrix[entry] = value
        self.variables[variable_id].linear_constraint_nonzeros.add(
            linear_constraint_id
        )
        self.linear_constraints[linear_constraint_id].variable_nonzeros.add(
            variable_id
        )

    # A tracker records the entry only if both the row and the column are
    # older than its checkpoints.
    for tracker in self._update_trackers:
        row_is_tracked = linear_constraint_id < tracker.linear_constraints_checkpoint
        if variable_id < tracker.variables_checkpoint and row_is_tracked:
            tracker.linear_constraint_matrix.add(entry)
def get_linear_constraint_coefficient(self, linear_constraint_id: int, variable_id: int) -> float:
def get_linear_constraint_coefficient(
    self, linear_constraint_id: int, variable_id: int
) -> float:
    """Returns one linear constraint matrix coefficient, or 0.0 if none is stored.

    Both ids are validated (constraint first, then variable) before the lookup.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    self._check_variable_id(variable_id)
    entry = (linear_constraint_id, variable_id)
    return self._linear_constraint_matrix.get(entry, 0.0)
def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]:
def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]:
    """Yields the ids in the variable's linear_constraint_nonzeros collection.

    This is a generator, so the id validation runs on first iteration rather
    than at call time.
    """
    self._check_variable_id(variable_id)
    for lin_con_id in self.variables[variable_id].linear_constraint_nonzeros:
        yield lin_con_id

Yields the linear constraints with nonzero coefficient for a variable in undefined order.

def get_variables_for_linear_constraint(self, linear_constraint_id: int) -> Iterator[int]:
def get_variables_for_linear_constraint(
    self, linear_constraint_id: int
) -> Iterator[int]:
    """Yields the ids in the constraint's variable_nonzeros collection.

    This is a generator, so the id validation runs on first iteration rather
    than at call time.
    """
    self._check_linear_constraint_id(linear_constraint_id)
    constraint = self.linear_constraints[linear_constraint_id]
    for variable_id in constraint.variable_nonzeros:
        yield variable_id

Yields the variables with nonzero coefficient in a linear constraint in undefined order.

def get_linear_constraint_matrix_entries( self) -> Iterator[ortools.math_opt.python.model_storage.LinearConstraintMatrixIdEntry]:
def get_linear_constraint_matrix_entries(
    self,
) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]:
    """Yields one LinearConstraintMatrixIdEntry per stored matrix coefficient.

    Entries come out in the iteration order of the underlying
    _linear_constraint_matrix mapping.
    """
    for key, coefficient in self._linear_constraint_matrix.items():
        lin_con_id, var_id = key
        yield model_storage.LinearConstraintMatrixIdEntry(
            linear_constraint_id=lin_con_id,
            variable_id=var_id,
            coefficient=coefficient,
        )

Yields the nonzero elements of the linear constraint matrix in undefined order.

def clear_objective(self) -> None:
def clear_objective(self) -> None:
    """Removes all linear and quadratic objective terms and zeroes the offset.

    Before each group of terms is cleared, every update tracker is told which
    of those terms it was tracking. The offset is reset through
    set_objective_offset. The optimization direction is not touched here.
    """
    # Linear terms: record them on the trackers, then drop them.
    for tracker in self._update_trackers:
        for variable_id in self.linear_objective_coefficient:
            if variable_id < tracker.variables_checkpoint:
                tracker.linear_objective_coefficients.add(variable_id)
    self.linear_objective_coefficient.clear()

    # Quadratic terms: a term is tracked when its id2 is below the checkpoint.
    for tracker in self._update_trackers:
        for term_key in self._quadratic_objective_coefficients.keys():
            if term_key.id2 < tracker.variables_checkpoint:
                tracker.quadratic_objective_coefficients.add(term_key)
    self._quadratic_objective_coefficients.clear()

    self.set_objective_offset(0.0)

Clears objective coefficients and offset. Does not change direction.

def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None:
def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None:
    """Sets the linear objective coefficient of a variable and notifies trackers.

    Only nonzero coefficients are stored: writing 0.0 removes the entry.
    Nothing happens when value equals the current coefficient (a missing
    entry counts as 0.0).

    Args:
      variable_id: Id of the variable; validated with _check_variable_id.
      value: The new coefficient.
    """
    self._check_variable_id(variable_id)
    coefficients = self.linear_objective_coefficient
    if value == coefficients.get(variable_id, 0.0):
        return
    if value == 0.0:
        coefficients.pop(variable_id, None)
    else:
        coefficients[variable_id] = value
    # Only trackers whose checkpoint is past this id record the change.
    for tracker in self._update_trackers:
        if variable_id < tracker.variables_checkpoint:
            tracker.linear_objective_coefficients.add(variable_id)
def get_linear_objective_coefficient(self, variable_id: int) -> float:
def get_linear_objective_coefficient(self, variable_id: int) -> float:
    """Returns the linear objective coefficient of a variable, or 0.0 if none is stored."""
    self._check_variable_id(variable_id)
    coefficients = self.linear_objective_coefficient
    return coefficients.get(variable_id, 0.0)
def get_linear_objective_coefficients( self) -> Iterator[ortools.math_opt.python.model_storage.LinearObjectiveEntry]:
def get_linear_objective_coefficients(
    self,
) -> Iterator[model_storage.LinearObjectiveEntry]:
    """Yields one LinearObjectiveEntry per stored linear objective coefficient.

    Entries come out in the iteration order of the underlying
    linear_objective_coefficient mapping.
    """
    for variable_id, coefficient in self.linear_objective_coefficient.items():
        entry = model_storage.LinearObjectiveEntry(
            variable_id=variable_id, coefficient=coefficient
        )
        yield entry

Yields the nonzero linear objective terms in undefined order.

def set_quadratic_objective_coefficient( self, first_variable_id: int, second_variable_id: int, value: float) -> None:
def set_quadratic_objective_coefficient(
    self, first_variable_id: int, second_variable_id: int, value: float
) -> None:
    """Sets the objective coefficient of a product of two variables.

    The write is delegated to the quadratic term storage. Trackers are only
    notified when that storage reports that the coefficient changed.

    Args:
      first_variable_id: The first variable in the product.
      second_variable_id: The second variable in the product.
      value: The value of the coefficient.
    """
    self._check_variable_id(first_variable_id)
    self._check_variable_id(second_variable_id)
    changed = self._quadratic_objective_coefficients.set_coefficient(
        first_variable_id, second_variable_id, value
    )
    if not changed:
        return
    # A tracker records the term only if both variables are older than its
    # checkpoint, i.e. the larger of the two ids is below it.
    larger_id = max(first_variable_id, second_variable_id)
    for tracker in self._update_trackers:
        if larger_id < tracker.variables_checkpoint:
            tracker.quadratic_objective_coefficients.add(
                _QuadraticKey(first_variable_id, second_variable_id)
            )

Sets the objective coefficient for the product of two variables.

The ordering of the input variables does not matter.

Arguments:
  • first_variable_id: The first variable in the product.
  • second_variable_id: The second variable in the product.
  • value: The value of the coefficient.
Raises:
  • BadVariableIdError if first_variable_id or second_variable_id are not in the model.
def get_quadratic_objective_coefficient(self, first_variable_id: int, second_variable_id: int) -> float:
def get_quadratic_objective_coefficient(
    self, first_variable_id: int, second_variable_id: int
) -> float:
    """Returns the objective coefficient of a product of two variables.

    Both ids are validated with _check_variable_id, then the lookup is
    delegated to the quadratic term storage.

    Args:
      first_variable_id: The first variable in the product.
      second_variable_id: The second variable in the product.

    Returns:
      Whatever the storage's get_coefficient reports for the pair.
    """
    self._check_variable_id(first_variable_id)
    self._check_variable_id(second_variable_id)
    quadratic_terms = self._quadratic_objective_coefficients
    return quadratic_terms.get_coefficient(first_variable_id, second_variable_id)

Gets the objective coefficient for the product of two variables.

The ordering of the input variables does not matter.

Arguments:
  • first_variable_id: The first variable in the product.
  • second_variable_id: The second variable in the product.
Raises:
  • BadVariableIdError if first_variable_id or second_variable_id are not in the model.
Returns:

The value of the coefficient.

def get_quadratic_objective_coefficients(self) -> Iterator[ortools.math_opt.python.model_storage.QuadraticEntry]:
def get_quadratic_objective_coefficients(
    self,
) -> Iterator[model_storage.QuadraticEntry]:
    """Yields every entry produced by the quadratic term storage's coefficients()."""
    quadratic_terms = self._quadratic_objective_coefficients
    for entry in quadratic_terms.coefficients():
        yield entry

Yields the nonzero quadratic objective terms in undefined order.

def get_quadratic_objective_adjacent_variables(self, variable_id: int) -> Iterator[int]:
def get_quadratic_objective_adjacent_variables(
    self, variable_id: int
) -> Iterator[int]:
    """Yields the variables multiplying variable_id in the quadratic objective.

    The ids come from the quadratic term storage's get_adjacent_variables.
    This is a generator, so the id validation runs on first iteration rather
    than at call time.

    Args:
      variable_id: The variable whose neighbours are requested.
    """
    self._check_variable_id(variable_id)
    quadratic_terms = self._quadratic_objective_coefficients
    for neighbour_id in quadratic_terms.get_adjacent_variables(variable_id):
        yield neighbour_id

Yields the variables multiplying a variable in the objective function.

Variables are returned in an unspecified order.

For example, if variables x and y have ids 0 and 1 respectively, and the quadratic portion of the objective is x^2 + 2 x*y, then get_quadratic_objective_adjacent_variables(0) = (0, 1).

Arguments:
  • variable_id: Function yields the variables multiplying variable_id in the objective function.
Yields:

The variables multiplying variable_id in the objective function.

Raises:
  • BadVariableIdError if variable_id is not in the model.
def set_is_maximize(self, is_maximize: bool) -> None:
def set_is_maximize(self, is_maximize: bool) -> None:
    """Stores the objective direction and flags the change on every tracker.

    Nothing happens when is_maximize equals the stored direction.
    """
    if self._is_maximize != is_maximize:
        self._is_maximize = is_maximize
        for tracker in self._update_trackers:
            tracker.objective_direction = True
def get_is_maximize(self) -> bool:
def get_is_maximize(self) -> bool:
    """Returns the stored objective direction flag, _is_maximize."""
    direction = self._is_maximize
    return direction
def set_objective_offset(self, offset: float) -> None:
def set_objective_offset(self, offset: float) -> None:
    """Stores the objective offset and flags the change on every tracker.

    Nothing happens when offset equals the stored offset.
    """
    if self._objective_offset != offset:
        self._objective_offset = offset
        for tracker in self._update_trackers:
            tracker.objective_offset = True
def get_objective_offset(self) -> float:
def get_objective_offset(self) -> float:
    """Returns the stored objective offset, _objective_offset."""
    offset = self._objective_offset
    return offset
def export_model(self) -> ortools.math_opt.model_pb2.ModelProto:
def export_model(self) -> model_pb2.ModelProto:
    """Builds a ModelProto holding the current state of the model.

    Variables and linear constraints are written by the module helpers
    _variables_to_proto and _linear_constraints_to_proto. Objective and matrix
    terms are sorted by id before being appended to the proto.

    Returns:
      A newly created ModelProto.
    """
    proto: model_pb2.ModelProto = model_pb2.ModelProto()
    proto.name = self._name
    _variables_to_proto(self.variables.items(), proto.variables)
    _linear_constraints_to_proto(
        self.linear_constraints.items(), proto.linear_constraints
    )

    objective = proto.objective
    objective.maximize = self._is_maximize
    objective.offset = self._objective_offset

    # Each section is skipped when empty, since zip(*...) of nothing cannot
    # be unpacked into the id/value columns.
    if self.linear_objective_coefficient:
        sorted_linear = sorted(self.linear_objective_coefficient.items())
        linear_ids, linear_values = zip(*sorted_linear)
        objective.linear_coefficients.ids.extend(linear_ids)
        objective.linear_coefficients.values.extend(linear_values)

    if self._quadratic_objective_coefficients:
        sorted_quadratic = sorted(
            (term.id_key.id1, term.id_key.id2, term.coefficient)
            for term in self._quadratic_objective_coefficients.coefficients()
        )
        quad_rows, quad_columns, quad_values = zip(*sorted_quadratic)
        quadratic = objective.quadratic_coefficients
        quadratic.row_ids.extend(quad_rows)
        quadratic.column_ids.extend(quad_columns)
        quadratic.coefficients.extend(quad_values)

    if self._linear_constraint_matrix:
        sorted_matrix = sorted(
            (row_id, column_id, coefficient)
            for (row_id, column_id), coefficient in (
                self._linear_constraint_matrix.items()
            )
        )
        matrix_rows, matrix_columns, matrix_values = zip(*sorted_matrix)
        matrix = proto.linear_constraint_matrix
        matrix.row_ids.extend(matrix_rows)
        matrix.column_ids.extend(matrix_columns)
        matrix.coefficients.extend(matrix_values)

    return proto
def add_update_tracker(self) -> ortools.math_opt.python.model_storage.StorageUpdateTracker:
def add_update_tracker(self) -> model_storage.StorageUpdateTracker:
    """Creates an _UpdateTracker for this storage, registers it and returns it."""
    new_tracker = _UpdateTracker(self)
    self._update_trackers.add(new_tracker)
    return new_tracker

Creates a StorageUpdateTracker registered with self to view model changes.

def remove_update_tracker( self, tracker: ortools.math_opt.python.model_storage.StorageUpdateTracker) -> None:
def remove_update_tracker(
    self, tracker: model_storage.StorageUpdateTracker
) -> None:
    """Unregisters a tracker from this storage and marks it as retired.

    The tracker is removed from _update_trackers first; if that removal
    raises, the tracker's retired flag is left unchanged.

    Args:
      tracker: The StorageUpdateTracker to unregister.
    """
    registered = self._update_trackers
    registered.remove(tracker)
    tracker.retired = True

Stops tracker from getting updates on model changes in self.

An error will be raised if tracker is not a StorageUpdateTracker created by this Model that has not previously been removed.

Using an UpdateTracker (via checkpoint or export_update) after it has been removed will result in an error.

Arguments:
  • tracker: The StorageUpdateTracker to unregister.
Raises:
  • KeyError: The tracker was created by another model or was already removed.