ortools.math_opt.python.hash_model_storage
A minimal pure python implementation of model_storage.ModelStorage.
1# Copyright 2010-2024 Google LLC 2# Licensed under the Apache License, Version 2.0 (the "License"); 3# you may not use this file except in compliance with the License. 4# You may obtain a copy of the License at 5# 6# http://www.apache.org/licenses/LICENSE-2.0 7# 8# Unless required by applicable law or agreed to in writing, software 9# distributed under the License is distributed on an "AS IS" BASIS, 10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11# See the License for the specific language governing permissions and 12# limitations under the License. 13 14"""A minimal pure python implementation of model_storage.ModelStorage.""" 15 16from typing import Dict, Iterable, Iterator, Optional, Set, Tuple 17import weakref 18 19from ortools.math_opt import model_pb2 20from ortools.math_opt import model_update_pb2 21from ortools.math_opt import sparse_containers_pb2 22from ortools.math_opt.python import model_storage 23 24_QuadraticKey = model_storage.QuadraticTermIdKey 25 26 27class _UpdateTracker(model_storage.StorageUpdateTracker): 28 """Tracks model updates for HashModelStorage.""" 29 30 def __init__(self, mod: "HashModelStorage"): 31 self.retired: bool = False 32 self.model: "HashModelStorage" = mod 33 # Changes for variables with id < variables_checkpoint are explicitly 34 # tracked. 35 self.variables_checkpoint: int = self.model._next_var_id 36 # Changes for linear constraints with id < linear_constraints_checkpoint 37 # are explicitly tracked. 38 self.linear_constraints_checkpoint: int = self.model._next_lin_con_id 39 40 self.objective_direction: bool = False 41 self.objective_offset: bool = False 42 43 self.variable_deletes: Set[int] = set() 44 self.variable_lbs: Set[int] = set() 45 self.variable_ubs: Set[int] = set() 46 self.variable_integers: Set[int] = set() 47 48 self.linear_objective_coefficients: Set[int] = set() 49 self.quadratic_objective_coefficients: Set[_QuadraticKey] = set() 50 51 self.linear_constraint_deletes: Set[int] = set() 52 self.linear_constraint_lbs: Set[int] = set() 53 self.linear_constraint_ubs: Set[int] = set() 54 55 self.linear_constraint_matrix: Set[Tuple[int, int]] = set() 56 57 def export_update(self) -> Optional[model_update_pb2.ModelUpdateProto]: 58 if self.retired: 59 raise model_storage.UsedUpdateTrackerAfterRemovalError() 60 if ( 61 self.variables_checkpoint == self.model.next_variable_id() 62 and ( 63 self.linear_constraints_checkpoint 64 == self.model.next_linear_constraint_id() 65 ) 66 and not self.objective_direction 67 and not self.objective_offset 68 and not self.variable_deletes 69 and not self.variable_lbs 70 and not self.variable_ubs 71 and not self.variable_integers 72 and not self.linear_objective_coefficients 73 and not self.quadratic_objective_coefficients 74 and not self.linear_constraint_deletes 75 and not self.linear_constraint_lbs 76 and not self.linear_constraint_ubs 77 and not self.linear_constraint_matrix 78 ): 79 return None 80 result = model_update_pb2.ModelUpdateProto() 81 result.deleted_variable_ids[:] = sorted(self.variable_deletes) 82 result.deleted_linear_constraint_ids[:] = sorted(self.linear_constraint_deletes) 83 # Variable updates 84 _set_sparse_double_vector( 85 sorted((vid, self.model.get_variable_lb(vid)) for vid in self.variable_lbs), 86 result.variable_updates.lower_bounds, 87 ) 88 _set_sparse_double_vector( 89 sorted((vid, self.model.get_variable_ub(vid)) for vid in self.variable_ubs), 90 result.variable_updates.upper_bounds, 91 ) 92 _set_sparse_bool_vector( 93 sorted( 94 (vid, self.model.get_variable_is_integer(vid)) 95 for vid in self.variable_integers 96 ), 97 result.variable_updates.integers, 98 ) 99 # Linear constraint updates 100 _set_sparse_double_vector( 101 sorted( 102 (cid, self.model.get_linear_constraint_lb(cid)) 103 for cid in self.linear_constraint_lbs 104 ), 105 result.linear_constraint_updates.lower_bounds, 106 ) 107 _set_sparse_double_vector( 108 sorted( 109 (cid, self.model.get_linear_constraint_ub(cid)) 110 for cid in self.linear_constraint_ubs 111 ), 112 result.linear_constraint_updates.upper_bounds, 113 ) 114 # New variables and constraints 115 new_vars = [] 116 for vid in range(self.variables_checkpoint, self.model.next_variable_id()): 117 var = self.model.variables.get(vid) 118 if var is not None: 119 new_vars.append((vid, var)) 120 _variables_to_proto(new_vars, result.new_variables) 121 new_lin_cons = [] 122 for lin_con_id in range( 123 self.linear_constraints_checkpoint, 124 self.model.next_linear_constraint_id(), 125 ): 126 lin_con = self.model.linear_constraints.get(lin_con_id) 127 if lin_con is not None: 128 new_lin_cons.append((lin_con_id, lin_con)) 129 _linear_constraints_to_proto(new_lin_cons, result.new_linear_constraints) 130 # Objective update 131 if self.objective_direction: 132 result.objective_updates.direction_update = self.model.get_is_maximize() 133 if self.objective_offset: 134 result.objective_updates.offset_update = self.model.get_objective_offset() 135 _set_sparse_double_vector( 136 sorted( 137 (var, self.model.get_linear_objective_coefficient(var)) 138 for var in self.linear_objective_coefficients 139 ), 140 result.objective_updates.linear_coefficients, 141 ) 142 for new_var in range(self.variables_checkpoint, self.model.next_variable_id()): 143 # NOTE: the value will be 0.0 if either the coefficient is not set or the 144 # variable has been deleted. Calling 145 # model.get_linear_objective_coefficient() throws an exception if the 146 # variable has been deleted. 147 obj_coef = self.model.linear_objective_coefficient.get(new_var, 0.0) 148 if obj_coef: 149 result.objective_updates.linear_coefficients.ids.append(new_var) 150 result.objective_updates.linear_coefficients.values.append(obj_coef) 151 152 quadratic_objective_updates = [ 153 ( 154 key.id1, 155 key.id2, 156 self.model.get_quadratic_objective_coefficient(key.id1, key.id2), 157 ) 158 for key in self.quadratic_objective_coefficients 159 ] 160 for new_var in range(self.variables_checkpoint, self.model.next_variable_id()): 161 if self.model.variable_exists(new_var): 162 for other_var in self.model.get_quadratic_objective_adjacent_variables( 163 new_var 164 ): 165 key = _QuadraticKey(new_var, other_var) 166 if new_var >= other_var: 167 key = _QuadraticKey(new_var, other_var) 168 quadratic_objective_updates.append( 169 ( 170 key.id1, 171 key.id2, 172 self.model.get_quadratic_objective_coefficient( 173 key.id1, key.id2 174 ), 175 ) 176 ) 177 quadratic_objective_updates.sort() 178 if quadratic_objective_updates: 179 first_var_ids, second_var_ids, coefficients = zip( 180 *quadratic_objective_updates 181 ) 182 result.objective_updates.quadratic_coefficients.row_ids[:] = first_var_ids 183 result.objective_updates.quadratic_coefficients.column_ids[:] = ( 184 second_var_ids 185 ) 186 result.objective_updates.quadratic_coefficients.coefficients[:] = ( 187 coefficients 188 ) 189 # Linear constraint matrix updates 190 matrix_updates = [ 191 (l, v, self.model.get_linear_constraint_coefficient(l, v)) 192 for (l, v) in self.linear_constraint_matrix 193 ] 194 for new_var in range(self.variables_checkpoint, self.model.next_variable_id()): 195 if self.model.variable_exists(new_var): 196 for lin_con in self.model.get_linear_constraints_with_variable(new_var): 197 matrix_updates.append( 198 ( 199 lin_con, 200 new_var, 201 self.model.get_linear_constraint_coefficient( 202 lin_con, new_var 203 ), 204 ) 205 ) 206 for new_lin_con in range( 207 self.linear_constraints_checkpoint, 208 self.model.next_linear_constraint_id(), 209 ): 210 if self.model.linear_constraint_exists(new_lin_con): 211 for var in self.model.get_variables_for_linear_constraint(new_lin_con): 212 # We have already gotten the new variables above. Note that we do at 213 # most twice as much work as we should from this. 214 if var < self.variables_checkpoint: 215 matrix_updates.append( 216 ( 217 new_lin_con, 218 var, 219 self.model.get_linear_constraint_coefficient( 220 new_lin_con, var 221 ), 222 ) 223 ) 224 matrix_updates.sort() 225 if matrix_updates: 226 lin_cons, variables, coefs = zip(*matrix_updates) 227 result.linear_constraint_matrix_updates.row_ids[:] = lin_cons 228 result.linear_constraint_matrix_updates.column_ids[:] = variables 229 result.linear_constraint_matrix_updates.coefficients[:] = coefs 230 return result 231 232 def advance_checkpoint(self) -> None: 233 if self.retired: 234 raise model_storage.UsedUpdateTrackerAfterRemovalError() 235 self.objective_direction = False 236 self.objective_offset = False 237 self.variable_deletes = set() 238 self.variable_lbs = set() 239 self.variable_ubs = set() 240 self.variable_integers = set() 241 self.linear_objective_coefficients = set() 242 self.linear_constraint_deletes = set() 243 self.linear_constraint_lbs = set() 244 self.linear_constraint_ubs = set() 245 self.linear_constraint_matrix = set() 246 247 self.variables_checkpoint = self.model.next_variable_id() 248 self.linear_constraints_checkpoint = self.model.next_linear_constraint_id() 249 250 251class _VariableStorage: 252 """Data specific to each decision variable in the optimization problem.""" 253 254 def __init__(self, lb: float, ub: float, is_integer: bool, name: str) -> None: 255 self.lower_bound: float = lb 256 self.upper_bound: float = ub 257 self.is_integer: bool = is_integer 258 self.name: str = name 259 self.linear_constraint_nonzeros: Set[int] = set() 260 261 262class _LinearConstraintStorage: 263 """Data specific to each linear constraint in the optimization problem.""" 264 265 def __init__(self, lb: float, ub: float, name: str) -> None: 266 self.lower_bound: float = lb 267 self.upper_bound: float = ub 268 self.name: str = name 269 self.variable_nonzeros: Set[int] = set() 270 271 272class _QuadraticTermStorage: 273 """Data describing quadratic terms with non-zero coefficients.""" 274 275 def __init__(self) -> None: 276 self._coefficients: Dict[_QuadraticKey, float] = {} 277 # For a variable i that does not appear in a quadratic objective term with 278 # a non-zero coefficient, we may have self._adjacent_variable[i] being an 279 # empty set or i not appearing in self._adjacent_variable.keys() (e.g. 280 # depeding on whether the variable previously appeared in a quadratic term). 281 self._adjacent_variables: Dict[int, Set[int]] = {} 282 283 def __bool__(self) -> bool: 284 """Returns true if and only if there are any quadratic terms with non-zero coefficients.""" 285 return bool(self._coefficients) 286 287 def get_adjacent_variables(self, variable_id: int) -> Iterator[int]: 288 """Yields the variables multiplying a variable in the stored quadratic terms. 289 290 If variable_id is not in the model the function yields the empty set. 291 292 Args: 293 variable_id: Function yields the variables multiplying variable_id in the 294 stored quadratic terms. 295 296 Yields: 297 The variables multiplying variable_id in the stored quadratic terms. 298 """ 299 yield from self._adjacent_variables.get(variable_id, ()) 300 301 def keys(self) -> Iterator[_QuadraticKey]: 302 """Yields the variable-pair keys associated to the stored quadratic terms.""" 303 yield from self._coefficients.keys() 304 305 def coefficients(self) -> Iterator[model_storage.QuadraticEntry]: 306 """Yields the stored quadratic terms as QuadraticEntry.""" 307 for key, coef in self._coefficients.items(): 308 yield model_storage.QuadraticEntry(id_key=key, coefficient=coef) 309 310 def delete_variable(self, variable_id: int) -> None: 311 """Updates the data structure to consider variable_id as deleted.""" 312 if variable_id not in self._adjacent_variables.keys(): 313 return 314 for adjacent_variable_id in self._adjacent_variables[variable_id]: 315 if variable_id != adjacent_variable_id: 316 self._adjacent_variables[adjacent_variable_id].remove(variable_id) 317 del self._coefficients[_QuadraticKey(variable_id, adjacent_variable_id)] 318 self._adjacent_variables[variable_id].clear() 319 320 def clear(self) -> None: 321 """Clears the data structure.""" 322 self._coefficients.clear() 323 self._adjacent_variables.clear() 324 325 def set_coefficient( 326 self, first_variable_id: int, second_variable_id: int, value: float 327 ) -> bool: 328 """Sets the coefficient for the quadratic term associated to the product between two variables. 329 330 The ordering of the input variables does not matter. 331 332 Args: 333 first_variable_id: The first variable in the product. 334 second_variable_id: The second variable in the product. 335 value: The value of the coefficient. 336 337 Returns: 338 True if the coefficient is updated, False otherwise. 339 """ 340 key = _QuadraticKey(first_variable_id, second_variable_id) 341 if value == self._coefficients.get(key, 0.0): 342 return False 343 if value == 0.0: 344 # Assuming self._coefficients/_adjacent_variables are filled according 345 # to get_coefficient(key) != 0.0. 346 del self._coefficients[key] 347 self._adjacent_variables[first_variable_id].remove(second_variable_id) 348 if first_variable_id != second_variable_id: 349 self._adjacent_variables[second_variable_id].remove(first_variable_id) 350 else: 351 if first_variable_id not in self._adjacent_variables.keys(): 352 self._adjacent_variables[first_variable_id] = set() 353 if second_variable_id not in self._adjacent_variables.keys(): 354 self._adjacent_variables[second_variable_id] = set() 355 self._coefficients[key] = value 356 self._adjacent_variables[first_variable_id].add(second_variable_id) 357 self._adjacent_variables[second_variable_id].add(first_variable_id) 358 return True 359 360 def get_coefficient(self, first_variable_id: int, second_variable_id: int) -> float: 361 """Gets the objective coefficient for the quadratic term associated to the product between two variables. 362 363 The ordering of the input variables does not matter. 364 365 Args: 366 first_variable_id: The first variable in the product. 367 second_variable_id: The second variable in the product. 368 369 Returns: 370 The value of the coefficient. 371 """ 372 return self._coefficients.get( 373 _QuadraticKey(first_variable_id, second_variable_id), 0.0 374 ) 375 376 377class HashModelStorage(model_storage.ModelStorage): 378 """A simple, pure python implementation of ModelStorage. 379 380 Attributes: 381 _linear_constraint_matrix: A dictionary with (linear_constraint_id, 382 variable_id) keys and numeric values, representing the matrix A for the 383 constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros. 384 linear_objective_coefficient: A dictionary with variable_id keys and 385 numeric values, representing the linear terms in the objective. 386 Invariant: the values have no zeros. 387 _quadratic_objective_coefficients: A data structure containing quadratic 388 terms in the objective. 389 """ 390 391 def __init__(self, name: str = "") -> None: 392 super().__init__() 393 self._name: str = name 394 self.variables: Dict[int, _VariableStorage] = {} 395 self.linear_constraints: Dict[int, _LinearConstraintStorage] = {} 396 self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {} # 397 self._is_maximize: bool = False 398 self._objective_offset: float = 0.0 399 self.linear_objective_coefficient: Dict[int, float] = {} 400 self._quadratic_objective_coefficients: _QuadraticTermStorage = ( 401 _QuadraticTermStorage() 402 ) 403 self._next_var_id: int = 0 404 self._next_lin_con_id: int = 0 405 self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet() 406 407 @property 408 def name(self) -> str: 409 return self._name 410 411 def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int: 412 var_id = self._next_var_id 413 self._next_var_id += 1 414 self.variables[var_id] = _VariableStorage(lb, ub, is_integer, name) 415 return var_id 416 417 def delete_variable(self, variable_id: int) -> None: 418 self._check_variable_id(variable_id) 419 variable = self.variables[variable_id] 420 # First update the watchers 421 for watcher in self._update_trackers: 422 if variable_id < watcher.variables_checkpoint: 423 watcher.variable_deletes.add(variable_id) 424 watcher.variable_lbs.discard(variable_id) 425 watcher.variable_ubs.discard(variable_id) 426 watcher.variable_integers.discard(variable_id) 427 watcher.linear_objective_coefficients.discard(variable_id) 428 for ( 429 other_variable_id 430 ) in self._quadratic_objective_coefficients.get_adjacent_variables( 431 variable_id 432 ): 433 key = _QuadraticKey(variable_id, other_variable_id) 434 watcher.quadratic_objective_coefficients.discard(key) 435 for lin_con_id in variable.linear_constraint_nonzeros: 436 if lin_con_id < watcher.linear_constraints_checkpoint: 437 watcher.linear_constraint_matrix.discard( 438 (lin_con_id, variable_id) 439 ) 440 # Then update self. 441 for lin_con_id in variable.linear_constraint_nonzeros: 442 self.linear_constraints[lin_con_id].variable_nonzeros.remove(variable_id) 443 del self._linear_constraint_matrix[(lin_con_id, variable_id)] 444 del self.variables[variable_id] 445 self.linear_objective_coefficient.pop(variable_id, None) 446 self._quadratic_objective_coefficients.delete_variable(variable_id) 447 448 def variable_exists(self, variable_id: int) -> bool: 449 return variable_id in self.variables 450 451 def next_variable_id(self) -> int: 452 return self._next_var_id 453 454 def set_variable_lb(self, variable_id: int, lb: float) -> None: 455 self._check_variable_id(variable_id) 456 if lb == self.variables[variable_id].lower_bound: 457 return 458 self.variables[variable_id].lower_bound = lb 459 for watcher in self._update_trackers: 460 if variable_id < watcher.variables_checkpoint: 461 watcher.variable_lbs.add(variable_id) 462 463 def set_variable_ub(self, variable_id: int, ub: float) -> None: 464 self._check_variable_id(variable_id) 465 if ub == self.variables[variable_id].upper_bound: 466 return 467 self.variables[variable_id].upper_bound = ub 468 for watcher in self._update_trackers: 469 if variable_id < watcher.variables_checkpoint: 470 watcher.variable_ubs.add(variable_id) 471 472 def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None: 473 self._check_variable_id(variable_id) 474 if is_integer == self.variables[variable_id].is_integer: 475 return 476 self.variables[variable_id].is_integer = is_integer 477 for watcher in self._update_trackers: 478 if variable_id < watcher.variables_checkpoint: 479 watcher.variable_integers.add(variable_id) 480 481 def get_variable_lb(self, variable_id: int) -> float: 482 self._check_variable_id(variable_id) 483 return self.variables[variable_id].lower_bound 484 485 def get_variable_ub(self, variable_id: int) -> float: 486 self._check_variable_id(variable_id) 487 return self.variables[variable_id].upper_bound 488 489 def get_variable_is_integer(self, variable_id: int) -> bool: 490 self._check_variable_id(variable_id) 491 return self.variables[variable_id].is_integer 492 493 def get_variable_name(self, variable_id: int) -> str: 494 self._check_variable_id(variable_id) 495 return self.variables[variable_id].name 496 497 def get_variables(self) -> Iterator[int]: 498 yield from self.variables.keys() 499 500 def add_linear_constraint(self, lb: float, ub: float, name: str) -> int: 501 lin_con_id = self._next_lin_con_id 502 self._next_lin_con_id += 1 503 self.linear_constraints[lin_con_id] = _LinearConstraintStorage(lb, ub, name) 504 return lin_con_id 505 506 def delete_linear_constraint(self, linear_constraint_id: int) -> None: 507 self._check_linear_constraint_id(linear_constraint_id) 508 con = self.linear_constraints[linear_constraint_id] 509 # First update the watchers 510 for watcher in self._update_trackers: 511 if linear_constraint_id < watcher.linear_constraints_checkpoint: 512 watcher.linear_constraint_deletes.add(linear_constraint_id) 513 watcher.linear_constraint_lbs.discard(linear_constraint_id) 514 watcher.linear_constraint_ubs.discard(linear_constraint_id) 515 for var_id in con.variable_nonzeros: 516 if var_id < watcher.variables_checkpoint: 517 watcher.linear_constraint_matrix.discard( 518 (linear_constraint_id, var_id) 519 ) 520 # Then update self. 521 for var_id in con.variable_nonzeros: 522 self.variables[var_id].linear_constraint_nonzeros.remove( 523 linear_constraint_id 524 ) 525 del self._linear_constraint_matrix[(linear_constraint_id, var_id)] 526 del self.linear_constraints[linear_constraint_id] 527 528 def linear_constraint_exists(self, linear_constraint_id: int) -> bool: 529 return linear_constraint_id in self.linear_constraints 530 531 def next_linear_constraint_id(self) -> int: 532 return self._next_lin_con_id 533 534 def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None: 535 self._check_linear_constraint_id(linear_constraint_id) 536 if lb == self.linear_constraints[linear_constraint_id].lower_bound: 537 return 538 self.linear_constraints[linear_constraint_id].lower_bound = lb 539 for watcher in self._update_trackers: 540 if linear_constraint_id < watcher.linear_constraints_checkpoint: 541 watcher.linear_constraint_lbs.add(linear_constraint_id) 542 543 def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None: 544 self._check_linear_constraint_id(linear_constraint_id) 545 if ub == self.linear_constraints[linear_constraint_id].upper_bound: 546 return 547 self.linear_constraints[linear_constraint_id].upper_bound = ub 548 for watcher in self._update_trackers: 549 if linear_constraint_id < watcher.linear_constraints_checkpoint: 550 watcher.linear_constraint_ubs.add(linear_constraint_id) 551 552 def get_linear_constraint_lb(self, linear_constraint_id: int) -> float: 553 self._check_linear_constraint_id(linear_constraint_id) 554 return self.linear_constraints[linear_constraint_id].lower_bound 555 556 def get_linear_constraint_ub(self, linear_constraint_id: int) -> float: 557 self._check_linear_constraint_id(linear_constraint_id) 558 return self.linear_constraints[linear_constraint_id].upper_bound 559 560 def get_linear_constraint_name(self, linear_constraint_id: int) -> str: 561 self._check_linear_constraint_id(linear_constraint_id) 562 return self.linear_constraints[linear_constraint_id].name 563 564 def get_linear_constraints(self) -> Iterator[int]: 565 yield from self.linear_constraints.keys() 566 567 def set_linear_constraint_coefficient( 568 self, linear_constraint_id: int, variable_id: int, value: float 569 ) -> None: 570 self._check_linear_constraint_id(linear_constraint_id) 571 self._check_variable_id(variable_id) 572 if value == self._linear_constraint_matrix.get( 573 (linear_constraint_id, variable_id), 0.0 574 ): 575 return 576 if value == 0.0: 577 self._linear_constraint_matrix.pop( 578 (linear_constraint_id, variable_id), None 579 ) 580 self.variables[variable_id].linear_constraint_nonzeros.discard( 581 linear_constraint_id 582 ) 583 self.linear_constraints[linear_constraint_id].variable_nonzeros.discard( 584 variable_id 585 ) 586 else: 587 self._linear_constraint_matrix[(linear_constraint_id, variable_id)] = value 588 self.variables[variable_id].linear_constraint_nonzeros.add( 589 linear_constraint_id 590 ) 591 self.linear_constraints[linear_constraint_id].variable_nonzeros.add( 592 variable_id 593 ) 594 for watcher in self._update_trackers: 595 if ( 596 variable_id < watcher.variables_checkpoint 597 and linear_constraint_id < watcher.linear_constraints_checkpoint 598 ): 599 watcher.linear_constraint_matrix.add( 600 (linear_constraint_id, variable_id) 601 ) 602 603 def get_linear_constraint_coefficient( 604 self, linear_constraint_id: int, variable_id: int 605 ) -> float: 606 self._check_linear_constraint_id(linear_constraint_id) 607 self._check_variable_id(variable_id) 608 return self._linear_constraint_matrix.get( 609 (linear_constraint_id, variable_id), 0.0 610 ) 611 612 def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]: 613 self._check_variable_id(variable_id) 614 yield from self.variables[variable_id].linear_constraint_nonzeros 615 616 def get_variables_for_linear_constraint( 617 self, linear_constraint_id: int 618 ) -> Iterator[int]: 619 self._check_linear_constraint_id(linear_constraint_id) 620 yield from self.linear_constraints[linear_constraint_id].variable_nonzeros 621 622 def get_linear_constraint_matrix_entries( 623 self, 624 ) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]: 625 for (constraint, variable), coef in self._linear_constraint_matrix.items(): 626 yield model_storage.LinearConstraintMatrixIdEntry( 627 linear_constraint_id=constraint, 628 variable_id=variable, 629 coefficient=coef, 630 ) 631 632 def clear_objective(self) -> None: 633 for variable_id in self.linear_objective_coefficient: 634 for watcher in self._update_trackers: 635 if variable_id < watcher.variables_checkpoint: 636 watcher.linear_objective_coefficients.add(variable_id) 637 self.linear_objective_coefficient.clear() 638 for key in self._quadratic_objective_coefficients.keys(): 639 for watcher in self._update_trackers: 640 if key.id2 < watcher.variables_checkpoint: 641 watcher.quadratic_objective_coefficients.add(key) 642 self._quadratic_objective_coefficients.clear() 643 self.set_objective_offset(0.0) 644 645 def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None: 646 self._check_variable_id(variable_id) 647 if value == self.linear_objective_coefficient.get(variable_id, 0.0): 648 return 649 if value == 0.0: 650 self.linear_objective_coefficient.pop(variable_id, None) 651 else: 652 self.linear_objective_coefficient[variable_id] = value 653 for watcher in self._update_trackers: 654 if variable_id < watcher.variables_checkpoint: 655 watcher.linear_objective_coefficients.add(variable_id) 656 657 def get_linear_objective_coefficient(self, variable_id: int) -> float: 658 self._check_variable_id(variable_id) 659 return self.linear_objective_coefficient.get(variable_id, 0.0) 660 661 def get_linear_objective_coefficients( 662 self, 663 ) -> Iterator[model_storage.LinearObjectiveEntry]: 664 for var_id, coef in self.linear_objective_coefficient.items(): 665 yield model_storage.LinearObjectiveEntry( 666 variable_id=var_id, coefficient=coef 667 ) 668 669 def set_quadratic_objective_coefficient( 670 self, first_variable_id: int, second_variable_id: int, value: float 671 ) -> None: 672 self._check_variable_id(first_variable_id) 673 self._check_variable_id(second_variable_id) 674 updated = self._quadratic_objective_coefficients.set_coefficient( 675 first_variable_id, second_variable_id, value 676 ) 677 if updated: 678 for watcher in self._update_trackers: 679 if ( 680 max(first_variable_id, second_variable_id) 681 < watcher.variables_checkpoint 682 ): 683 watcher.quadratic_objective_coefficients.add( 684 _QuadraticKey(first_variable_id, second_variable_id) 685 ) 686 687 def get_quadratic_objective_coefficient( 688 self, first_variable_id: int, second_variable_id: int 689 ) -> float: 690 self._check_variable_id(first_variable_id) 691 self._check_variable_id(second_variable_id) 692 return self._quadratic_objective_coefficients.get_coefficient( 693 first_variable_id, second_variable_id 694 ) 695 696 def get_quadratic_objective_coefficients( 697 self, 698 ) -> Iterator[model_storage.QuadraticEntry]: 699 yield from self._quadratic_objective_coefficients.coefficients() 700 701 def get_quadratic_objective_adjacent_variables( 702 self, variable_id: int 703 ) -> Iterator[int]: 704 self._check_variable_id(variable_id) 705 yield from self._quadratic_objective_coefficients.get_adjacent_variables( 706 variable_id 707 ) 708 709 def set_is_maximize(self, is_maximize: bool) -> None: 710 if self._is_maximize == is_maximize: 711 return 712 self._is_maximize = is_maximize 713 for watcher in self._update_trackers: 714 watcher.objective_direction = True 715 716 def get_is_maximize(self) -> bool: 717 return self._is_maximize 718 719 def set_objective_offset(self, offset: float) -> None: 720 if self._objective_offset == offset: 721 return 722 self._objective_offset = offset 723 for watcher in self._update_trackers: 724 watcher.objective_offset = True 725 726 def get_objective_offset(self) -> float: 727 return self._objective_offset 728 729 def export_model(self) -> model_pb2.ModelProto: 730 m: model_pb2.ModelProto = model_pb2.ModelProto() 731 m.name = self._name 732 _variables_to_proto(self.variables.items(), m.variables) 733 _linear_constraints_to_proto( 734 self.linear_constraints.items(), m.linear_constraints 735 ) 736 m.objective.maximize = self._is_maximize 737 m.objective.offset = self._objective_offset 738 if self.linear_objective_coefficient: 739 obj_ids, obj_coefs = zip(*sorted(self.linear_objective_coefficient.items())) 740 m.objective.linear_coefficients.ids.extend(obj_ids) 741 m.objective.linear_coefficients.values.extend(obj_coefs) 742 if self._quadratic_objective_coefficients: 743 first_var_ids, second_var_ids, coefficients = zip( 744 *sorted( 745 [ 746 (entry.id_key.id1, entry.id_key.id2, entry.coefficient) 747 for entry in self._quadratic_objective_coefficients.coefficients() 748 ] 749 ) 750 ) 751 m.objective.quadratic_coefficients.row_ids.extend(first_var_ids) 752 m.objective.quadratic_coefficients.column_ids.extend(second_var_ids) 753 m.objective.quadratic_coefficients.coefficients.extend(coefficients) 754 if self._linear_constraint_matrix: 755 flat_matrix_items = [ 756 (con_id, var_id, coef) 757 for ((con_id, var_id), coef) in self._linear_constraint_matrix.items() 758 ] 759 lin_con_ids, var_ids, lin_con_coefs = zip(*sorted(flat_matrix_items)) 760 m.linear_constraint_matrix.row_ids.extend(lin_con_ids) 761 m.linear_constraint_matrix.column_ids.extend(var_ids) 762 m.linear_constraint_matrix.coefficients.extend(lin_con_coefs) 763 return m 764 765 def add_update_tracker(self) -> model_storage.StorageUpdateTracker: 766 tracker = _UpdateTracker(self) 767 self._update_trackers.add(tracker) 768 return tracker 769 770 def remove_update_tracker( 771 self, tracker: model_storage.StorageUpdateTracker 772 ) -> None: 773 self._update_trackers.remove(tracker) 774 tracker.retired = True 775 776 def _check_variable_id(self, variable_id: int) -> None: 777 if variable_id not in self.variables: 778 raise model_storage.BadVariableIdError(variable_id) 779 780 def _check_linear_constraint_id(self, linear_constraint_id: int) -> None: 781 if linear_constraint_id not in self.linear_constraints: 782 raise model_storage.BadLinearConstraintIdError(linear_constraint_id) 783 784 785def _set_sparse_double_vector( 786 id_value_pairs: Iterable[Tuple[int, float]], 787 proto: sparse_containers_pb2.SparseDoubleVectorProto, 788) -> None: 789 """id_value_pairs must be sorted, proto is filled.""" 790 if not id_value_pairs: 791 return 792 ids, values = zip(*id_value_pairs) 793 proto.ids[:] = ids 794 proto.values[:] = values 795 796 797def _set_sparse_bool_vector( 798 id_value_pairs: Iterable[Tuple[int, bool]], 799 proto: sparse_containers_pb2.SparseBoolVectorProto, 800) -> None: 801 """id_value_pairs must be sorted, proto is filled.""" 802 if not id_value_pairs: 803 return 804 ids, values = zip(*id_value_pairs) 805 proto.ids[:] = ids 806 proto.values[:] = values 807 808 809def _variables_to_proto( 810 variables: Iterable[Tuple[int, _VariableStorage]], 811 proto: model_pb2.VariablesProto, 812) -> None: 813 """Exports variables to proto.""" 814 has_named_var = False 815 for _, var_storage in variables: 816 if var_storage.name: 817 has_named_var = True 818 break 819 for var_id, var_storage in variables: 820 proto.ids.append(var_id) 821 proto.lower_bounds.append(var_storage.lower_bound) 822 proto.upper_bounds.append(var_storage.upper_bound) 823 proto.integers.append(var_storage.is_integer) 824 if has_named_var: 825 proto.names.append(var_storage.name) 826 827 828def _linear_constraints_to_proto( 829 linear_constraints: Iterable[Tuple[int, _LinearConstraintStorage]], 830 proto: model_pb2.LinearConstraintsProto, 831) -> None: 832 """Exports variables to proto.""" 833 has_named_lin_con = False 834 for _, lin_con_storage in linear_constraints: 835 if lin_con_storage.name: 836 has_named_lin_con = True 837 break 838 for lin_con_id, lin_con_storage in linear_constraints: 839 proto.ids.append(lin_con_id) 840 proto.lower_bounds.append(lin_con_storage.lower_bound) 841 proto.upper_bounds.append(lin_con_storage.upper_bound) 842 if has_named_lin_con: 843 proto.names.append(lin_con_storage.name)
378class HashModelStorage(model_storage.ModelStorage): 379 """A simple, pure python implementation of ModelStorage. 380 381 Attributes: 382 _linear_constraint_matrix: A dictionary with (linear_constraint_id, 383 variable_id) keys and numeric values, representing the matrix A for the 384 constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros. 385 linear_objective_coefficient: A dictionary with variable_id keys and 386 numeric values, representing the linear terms in the objective. 387 Invariant: the values have no zeros. 388 _quadratic_objective_coefficients: A data structure containing quadratic 389 terms in the objective. 390 """ 391 392 def __init__(self, name: str = "") -> None: 393 super().__init__() 394 self._name: str = name 395 self.variables: Dict[int, _VariableStorage] = {} 396 self.linear_constraints: Dict[int, _LinearConstraintStorage] = {} 397 self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {} # 398 self._is_maximize: bool = False 399 self._objective_offset: float = 0.0 400 self.linear_objective_coefficient: Dict[int, float] = {} 401 self._quadratic_objective_coefficients: _QuadraticTermStorage = ( 402 _QuadraticTermStorage() 403 ) 404 self._next_var_id: int = 0 405 self._next_lin_con_id: int = 0 406 self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet() 407 408 @property 409 def name(self) -> str: 410 return self._name 411 412 def add_variable(self, lb: float, ub: float, is_integer: bool, name: str) -> int: 413 var_id = self._next_var_id 414 self._next_var_id += 1 415 self.variables[var_id] = _VariableStorage(lb, ub, is_integer, name) 416 return var_id 417 418 def delete_variable(self, variable_id: int) -> None: 419 self._check_variable_id(variable_id) 420 variable = self.variables[variable_id] 421 # First update the watchers 422 for watcher in self._update_trackers: 423 if variable_id < watcher.variables_checkpoint: 424 watcher.variable_deletes.add(variable_id) 425 watcher.variable_lbs.discard(variable_id) 426 watcher.variable_ubs.discard(variable_id) 427 watcher.variable_integers.discard(variable_id) 428 watcher.linear_objective_coefficients.discard(variable_id) 429 for ( 430 other_variable_id 431 ) in self._quadratic_objective_coefficients.get_adjacent_variables( 432 variable_id 433 ): 434 key = _QuadraticKey(variable_id, other_variable_id) 435 watcher.quadratic_objective_coefficients.discard(key) 436 for lin_con_id in variable.linear_constraint_nonzeros: 437 if lin_con_id < watcher.linear_constraints_checkpoint: 438 watcher.linear_constraint_matrix.discard( 439 (lin_con_id, variable_id) 440 ) 441 # Then update self. 442 for lin_con_id in variable.linear_constraint_nonzeros: 443 self.linear_constraints[lin_con_id].variable_nonzeros.remove(variable_id) 444 del self._linear_constraint_matrix[(lin_con_id, variable_id)] 445 del self.variables[variable_id] 446 self.linear_objective_coefficient.pop(variable_id, None) 447 self._quadratic_objective_coefficients.delete_variable(variable_id) 448 449 def variable_exists(self, variable_id: int) -> bool: 450 return variable_id in self.variables 451 452 def next_variable_id(self) -> int: 453 return self._next_var_id 454 455 def set_variable_lb(self, variable_id: int, lb: float) -> None: 456 self._check_variable_id(variable_id) 457 if lb == self.variables[variable_id].lower_bound: 458 return 459 self.variables[variable_id].lower_bound = lb 460 for watcher in self._update_trackers: 461 if variable_id < watcher.variables_checkpoint: 462 watcher.variable_lbs.add(variable_id) 463 464 def set_variable_ub(self, variable_id: int, ub: float) -> None: 465 self._check_variable_id(variable_id) 466 if ub == self.variables[variable_id].upper_bound: 467 return 468 self.variables[variable_id].upper_bound = ub 469 for watcher in self._update_trackers: 470 if variable_id < watcher.variables_checkpoint: 471 watcher.variable_ubs.add(variable_id) 472 473 def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None: 474 self._check_variable_id(variable_id) 475 if is_integer == self.variables[variable_id].is_integer: 476 return 477 self.variables[variable_id].is_integer = is_integer 478 for watcher in self._update_trackers: 479 if variable_id < watcher.variables_checkpoint: 480 watcher.variable_integers.add(variable_id) 481 482 def get_variable_lb(self, variable_id: int) -> float: 483 self._check_variable_id(variable_id) 484 return self.variables[variable_id].lower_bound 485 486 def get_variable_ub(self, variable_id: int) -> float: 487 self._check_variable_id(variable_id) 488 return self.variables[variable_id].upper_bound 489 490 def get_variable_is_integer(self, variable_id: int) -> bool: 491 self._check_variable_id(variable_id) 492 return self.variables[variable_id].is_integer 493 494 def get_variable_name(self, variable_id: int) -> str: 495 self._check_variable_id(variable_id) 496 return self.variables[variable_id].name 497 498 def get_variables(self) -> Iterator[int]: 499 yield from self.variables.keys() 500 501 def add_linear_constraint(self, lb: float, ub: float, name: str) -> int: 502 lin_con_id = self._next_lin_con_id 503 self._next_lin_con_id += 1 504 self.linear_constraints[lin_con_id] = _LinearConstraintStorage(lb, ub, name) 505 return lin_con_id 506 507 def delete_linear_constraint(self, linear_constraint_id: int) -> None: 508 self._check_linear_constraint_id(linear_constraint_id) 509 con = self.linear_constraints[linear_constraint_id] 510 # First update the watchers 511 for watcher in self._update_trackers: 512 if linear_constraint_id < watcher.linear_constraints_checkpoint: 513 watcher.linear_constraint_deletes.add(linear_constraint_id) 514 watcher.linear_constraint_lbs.discard(linear_constraint_id) 515 watcher.linear_constraint_ubs.discard(linear_constraint_id) 516 for var_id in con.variable_nonzeros: 517 if var_id < watcher.variables_checkpoint: 518 watcher.linear_constraint_matrix.discard( 519 (linear_constraint_id, var_id) 520 ) 521 # Then update self. 522 for var_id in con.variable_nonzeros: 523 self.variables[var_id].linear_constraint_nonzeros.remove( 524 linear_constraint_id 525 ) 526 del self._linear_constraint_matrix[(linear_constraint_id, var_id)] 527 del self.linear_constraints[linear_constraint_id] 528 529 def linear_constraint_exists(self, linear_constraint_id: int) -> bool: 530 return linear_constraint_id in self.linear_constraints 531 532 def next_linear_constraint_id(self) -> int: 533 return self._next_lin_con_id 534 535 def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None: 536 self._check_linear_constraint_id(linear_constraint_id) 537 if lb == self.linear_constraints[linear_constraint_id].lower_bound: 538 return 539 self.linear_constraints[linear_constraint_id].lower_bound = lb 540 for watcher in self._update_trackers: 541 if linear_constraint_id < watcher.linear_constraints_checkpoint: 542 watcher.linear_constraint_lbs.add(linear_constraint_id) 543 544 def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None: 545 self._check_linear_constraint_id(linear_constraint_id) 546 if ub == self.linear_constraints[linear_constraint_id].upper_bound: 547 return 548 self.linear_constraints[linear_constraint_id].upper_bound = ub 549 for watcher in self._update_trackers: 550 if linear_constraint_id < watcher.linear_constraints_checkpoint: 551 watcher.linear_constraint_ubs.add(linear_constraint_id) 552 553 def get_linear_constraint_lb(self, linear_constraint_id: int) -> float: 554 self._check_linear_constraint_id(linear_constraint_id) 555 return self.linear_constraints[linear_constraint_id].lower_bound 556 557 def get_linear_constraint_ub(self, linear_constraint_id: int) -> float: 558 self._check_linear_constraint_id(linear_constraint_id) 559 return self.linear_constraints[linear_constraint_id].upper_bound 560 561 def get_linear_constraint_name(self, linear_constraint_id: int) -> str: 562 self._check_linear_constraint_id(linear_constraint_id) 563 return self.linear_constraints[linear_constraint_id].name 564 565 def get_linear_constraints(self) -> Iterator[int]: 566 yield from self.linear_constraints.keys() 567 568 def set_linear_constraint_coefficient( 569 self, linear_constraint_id: int, variable_id: int, value: float 570 ) -> None: 571 self._check_linear_constraint_id(linear_constraint_id) 572 self._check_variable_id(variable_id) 573 if value == self._linear_constraint_matrix.get( 574 (linear_constraint_id, variable_id), 0.0 575 ): 576 return 577 if value == 0.0: 578 self._linear_constraint_matrix.pop( 579 (linear_constraint_id, variable_id), None 580 ) 581 self.variables[variable_id].linear_constraint_nonzeros.discard( 582 linear_constraint_id 583 ) 584 self.linear_constraints[linear_constraint_id].variable_nonzeros.discard( 585 variable_id 586 ) 587 else: 588 self._linear_constraint_matrix[(linear_constraint_id, variable_id)] = value 589 self.variables[variable_id].linear_constraint_nonzeros.add( 590 linear_constraint_id 591 ) 592 self.linear_constraints[linear_constraint_id].variable_nonzeros.add( 593 variable_id 594 ) 595 for watcher in self._update_trackers: 596 if ( 597 variable_id < watcher.variables_checkpoint 598 and linear_constraint_id < watcher.linear_constraints_checkpoint 599 ): 600 watcher.linear_constraint_matrix.add( 601 (linear_constraint_id, variable_id) 602 ) 603 604 def get_linear_constraint_coefficient( 605 self, linear_constraint_id: int, variable_id: int 606 ) -> float: 607 self._check_linear_constraint_id(linear_constraint_id) 608 self._check_variable_id(variable_id) 609 return self._linear_constraint_matrix.get( 610 (linear_constraint_id, variable_id), 0.0 611 ) 612 613 def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]: 614 self._check_variable_id(variable_id) 615 yield from self.variables[variable_id].linear_constraint_nonzeros 616 617 def get_variables_for_linear_constraint( 618 self, linear_constraint_id: int 619 ) -> Iterator[int]: 620 self._check_linear_constraint_id(linear_constraint_id) 621 yield from self.linear_constraints[linear_constraint_id].variable_nonzeros 622 623 def get_linear_constraint_matrix_entries( 624 self, 625 ) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]: 626 for (constraint, variable), coef in self._linear_constraint_matrix.items(): 627 yield model_storage.LinearConstraintMatrixIdEntry( 628 linear_constraint_id=constraint, 629 variable_id=variable, 630 coefficient=coef, 631 ) 632 633 def clear_objective(self) -> None: 634 for variable_id in self.linear_objective_coefficient: 635 for watcher in self._update_trackers: 636 if variable_id < watcher.variables_checkpoint: 637 watcher.linear_objective_coefficients.add(variable_id) 638 self.linear_objective_coefficient.clear() 639 for key in self._quadratic_objective_coefficients.keys(): 640 for watcher in self._update_trackers: 641 if key.id2 < watcher.variables_checkpoint: 642 watcher.quadratic_objective_coefficients.add(key) 643 self._quadratic_objective_coefficients.clear() 644 self.set_objective_offset(0.0) 645 646 def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None: 647 self._check_variable_id(variable_id) 648 if value == self.linear_objective_coefficient.get(variable_id, 0.0): 649 return 650 if value == 0.0: 651 self.linear_objective_coefficient.pop(variable_id, None) 652 else: 653 self.linear_objective_coefficient[variable_id] = value 654 for watcher in self._update_trackers: 655 if variable_id < watcher.variables_checkpoint: 656 watcher.linear_objective_coefficients.add(variable_id) 657 658 def get_linear_objective_coefficient(self, variable_id: int) -> float: 659 self._check_variable_id(variable_id) 660 return self.linear_objective_coefficient.get(variable_id, 0.0) 661 662 def get_linear_objective_coefficients( 663 self, 664 ) -> Iterator[model_storage.LinearObjectiveEntry]: 665 for var_id, coef in self.linear_objective_coefficient.items(): 666 yield model_storage.LinearObjectiveEntry( 667 variable_id=var_id, coefficient=coef 668 ) 669 670 def set_quadratic_objective_coefficient( 671 self, first_variable_id: int, second_variable_id: int, value: float 672 ) -> None: 673 self._check_variable_id(first_variable_id) 674 self._check_variable_id(second_variable_id) 675 updated = self._quadratic_objective_coefficients.set_coefficient( 676 first_variable_id, second_variable_id, value 677 ) 678 if updated: 679 for watcher in self._update_trackers: 680 if ( 681 max(first_variable_id, second_variable_id) 682 < watcher.variables_checkpoint 683 ): 684 watcher.quadratic_objective_coefficients.add( 685 _QuadraticKey(first_variable_id, second_variable_id) 686 ) 687 688 def get_quadratic_objective_coefficient( 689 self, first_variable_id: int, second_variable_id: int 690 ) -> float: 691 self._check_variable_id(first_variable_id) 692 self._check_variable_id(second_variable_id) 693 return self._quadratic_objective_coefficients.get_coefficient( 694 first_variable_id, second_variable_id 695 ) 696 697 def get_quadratic_objective_coefficients( 698 self, 699 ) -> Iterator[model_storage.QuadraticEntry]: 700 yield from self._quadratic_objective_coefficients.coefficients() 701 702 def get_quadratic_objective_adjacent_variables( 703 self, variable_id: int 704 ) -> Iterator[int]: 705 self._check_variable_id(variable_id) 706 yield from self._quadratic_objective_coefficients.get_adjacent_variables( 707 variable_id 708 ) 709 710 def set_is_maximize(self, is_maximize: bool) -> None: 711 if self._is_maximize == is_maximize: 712 return 713 self._is_maximize = is_maximize 714 for watcher in self._update_trackers: 715 watcher.objective_direction = True 716 717 def get_is_maximize(self) -> bool: 718 return self._is_maximize 719 720 def set_objective_offset(self, offset: float) -> None: 721 if self._objective_offset == offset: 722 return 723 self._objective_offset = offset 724 for watcher in self._update_trackers: 725 watcher.objective_offset = True 726 727 def get_objective_offset(self) -> float: 728 return self._objective_offset 729 730 def export_model(self) -> model_pb2.ModelProto: 731 m: model_pb2.ModelProto = model_pb2.ModelProto() 732 m.name = self._name 733 _variables_to_proto(self.variables.items(), m.variables) 734 _linear_constraints_to_proto( 735 self.linear_constraints.items(), m.linear_constraints 736 ) 737 m.objective.maximize = self._is_maximize 738 m.objective.offset = self._objective_offset 739 if self.linear_objective_coefficient: 740 obj_ids, obj_coefs = zip(*sorted(self.linear_objective_coefficient.items())) 741 m.objective.linear_coefficients.ids.extend(obj_ids) 742 m.objective.linear_coefficients.values.extend(obj_coefs) 743 if self._quadratic_objective_coefficients: 744 first_var_ids, second_var_ids, coefficients = zip( 745 *sorted( 746 [ 747 (entry.id_key.id1, entry.id_key.id2, entry.coefficient) 748 for entry in self._quadratic_objective_coefficients.coefficients() 749 ] 750 ) 751 ) 752 m.objective.quadratic_coefficients.row_ids.extend(first_var_ids) 753 m.objective.quadratic_coefficients.column_ids.extend(second_var_ids) 754 m.objective.quadratic_coefficients.coefficients.extend(coefficients) 755 if self._linear_constraint_matrix: 756 flat_matrix_items = [ 757 (con_id, var_id, coef) 758 for ((con_id, var_id), coef) in self._linear_constraint_matrix.items() 759 ] 760 lin_con_ids, var_ids, lin_con_coefs = zip(*sorted(flat_matrix_items)) 761 m.linear_constraint_matrix.row_ids.extend(lin_con_ids) 762 m.linear_constraint_matrix.column_ids.extend(var_ids) 763 m.linear_constraint_matrix.coefficients.extend(lin_con_coefs) 764 return m 765 766 def add_update_tracker(self) -> model_storage.StorageUpdateTracker: 767 tracker = _UpdateTracker(self) 768 self._update_trackers.add(tracker) 769 return tracker 770 771 def remove_update_tracker( 772 self, tracker: model_storage.StorageUpdateTracker 773 ) -> None: 774 self._update_trackers.remove(tracker) 775 tracker.retired = True 776 777 def _check_variable_id(self, variable_id: int) -> None: 778 if variable_id not in self.variables: 779 raise model_storage.BadVariableIdError(variable_id) 780 781 def _check_linear_constraint_id(self, linear_constraint_id: int) -> None: 782 if linear_constraint_id not in self.linear_constraints: 783 raise model_storage.BadLinearConstraintIdError(linear_constraint_id)
A simple, pure python implementation of ModelStorage.
Attributes:
- _linear_constraint_matrix: A dictionary with (linear_constraint_id, variable_id) keys and numeric values, representing the matrix A for the constraints lb_c <= A*x <= ub_c. Invariant: the values have no zeros.
- linear_objective_coefficient: A dictionary with variable_id keys and numeric values, representing the linear terms in the objective. Invariant: the values have no zeros.
- _quadratic_objective_coefficients: A data structure containing quadratic terms in the objective.
392 def __init__(self, name: str = "") -> None: 393 super().__init__() 394 self._name: str = name 395 self.variables: Dict[int, _VariableStorage] = {} 396 self.linear_constraints: Dict[int, _LinearConstraintStorage] = {} 397 self._linear_constraint_matrix: Dict[Tuple[int, int], float] = {} # 398 self._is_maximize: bool = False 399 self._objective_offset: float = 0.0 400 self.linear_objective_coefficient: Dict[int, float] = {} 401 self._quadratic_objective_coefficients: _QuadraticTermStorage = ( 402 _QuadraticTermStorage() 403 ) 404 self._next_var_id: int = 0 405 self._next_lin_con_id: int = 0 406 self._update_trackers: weakref.WeakSet[_UpdateTracker] = weakref.WeakSet()
418 def delete_variable(self, variable_id: int) -> None: 419 self._check_variable_id(variable_id) 420 variable = self.variables[variable_id] 421 # First update the watchers 422 for watcher in self._update_trackers: 423 if variable_id < watcher.variables_checkpoint: 424 watcher.variable_deletes.add(variable_id) 425 watcher.variable_lbs.discard(variable_id) 426 watcher.variable_ubs.discard(variable_id) 427 watcher.variable_integers.discard(variable_id) 428 watcher.linear_objective_coefficients.discard(variable_id) 429 for ( 430 other_variable_id 431 ) in self._quadratic_objective_coefficients.get_adjacent_variables( 432 variable_id 433 ): 434 key = _QuadraticKey(variable_id, other_variable_id) 435 watcher.quadratic_objective_coefficients.discard(key) 436 for lin_con_id in variable.linear_constraint_nonzeros: 437 if lin_con_id < watcher.linear_constraints_checkpoint: 438 watcher.linear_constraint_matrix.discard( 439 (lin_con_id, variable_id) 440 ) 441 # Then update self. 442 for lin_con_id in variable.linear_constraint_nonzeros: 443 self.linear_constraints[lin_con_id].variable_nonzeros.remove(variable_id) 444 del self._linear_constraint_matrix[(lin_con_id, variable_id)] 445 del self.variables[variable_id] 446 self.linear_objective_coefficient.pop(variable_id, None) 447 self._quadratic_objective_coefficients.delete_variable(variable_id)
455 def set_variable_lb(self, variable_id: int, lb: float) -> None: 456 self._check_variable_id(variable_id) 457 if lb == self.variables[variable_id].lower_bound: 458 return 459 self.variables[variable_id].lower_bound = lb 460 for watcher in self._update_trackers: 461 if variable_id < watcher.variables_checkpoint: 462 watcher.variable_lbs.add(variable_id)
464 def set_variable_ub(self, variable_id: int, ub: float) -> None: 465 self._check_variable_id(variable_id) 466 if ub == self.variables[variable_id].upper_bound: 467 return 468 self.variables[variable_id].upper_bound = ub 469 for watcher in self._update_trackers: 470 if variable_id < watcher.variables_checkpoint: 471 watcher.variable_ubs.add(variable_id)
473 def set_variable_is_integer(self, variable_id: int, is_integer: bool) -> None: 474 self._check_variable_id(variable_id) 475 if is_integer == self.variables[variable_id].is_integer: 476 return 477 self.variables[variable_id].is_integer = is_integer 478 for watcher in self._update_trackers: 479 if variable_id < watcher.variables_checkpoint: 480 watcher.variable_integers.add(variable_id)
507 def delete_linear_constraint(self, linear_constraint_id: int) -> None: 508 self._check_linear_constraint_id(linear_constraint_id) 509 con = self.linear_constraints[linear_constraint_id] 510 # First update the watchers 511 for watcher in self._update_trackers: 512 if linear_constraint_id < watcher.linear_constraints_checkpoint: 513 watcher.linear_constraint_deletes.add(linear_constraint_id) 514 watcher.linear_constraint_lbs.discard(linear_constraint_id) 515 watcher.linear_constraint_ubs.discard(linear_constraint_id) 516 for var_id in con.variable_nonzeros: 517 if var_id < watcher.variables_checkpoint: 518 watcher.linear_constraint_matrix.discard( 519 (linear_constraint_id, var_id) 520 ) 521 # Then update self. 522 for var_id in con.variable_nonzeros: 523 self.variables[var_id].linear_constraint_nonzeros.remove( 524 linear_constraint_id 525 ) 526 del self._linear_constraint_matrix[(linear_constraint_id, var_id)] 527 del self.linear_constraints[linear_constraint_id]
535 def set_linear_constraint_lb(self, linear_constraint_id: int, lb: float) -> None: 536 self._check_linear_constraint_id(linear_constraint_id) 537 if lb == self.linear_constraints[linear_constraint_id].lower_bound: 538 return 539 self.linear_constraints[linear_constraint_id].lower_bound = lb 540 for watcher in self._update_trackers: 541 if linear_constraint_id < watcher.linear_constraints_checkpoint: 542 watcher.linear_constraint_lbs.add(linear_constraint_id)
544 def set_linear_constraint_ub(self, linear_constraint_id: int, ub: float) -> None: 545 self._check_linear_constraint_id(linear_constraint_id) 546 if ub == self.linear_constraints[linear_constraint_id].upper_bound: 547 return 548 self.linear_constraints[linear_constraint_id].upper_bound = ub 549 for watcher in self._update_trackers: 550 if linear_constraint_id < watcher.linear_constraints_checkpoint: 551 watcher.linear_constraint_ubs.add(linear_constraint_id)
565 def get_linear_constraints(self) -> Iterator[int]: 566 yield from self.linear_constraints.keys()
Yields the linear constraint ids in order of creation.
568 def set_linear_constraint_coefficient( 569 self, linear_constraint_id: int, variable_id: int, value: float 570 ) -> None: 571 self._check_linear_constraint_id(linear_constraint_id) 572 self._check_variable_id(variable_id) 573 if value == self._linear_constraint_matrix.get( 574 (linear_constraint_id, variable_id), 0.0 575 ): 576 return 577 if value == 0.0: 578 self._linear_constraint_matrix.pop( 579 (linear_constraint_id, variable_id), None 580 ) 581 self.variables[variable_id].linear_constraint_nonzeros.discard( 582 linear_constraint_id 583 ) 584 self.linear_constraints[linear_constraint_id].variable_nonzeros.discard( 585 variable_id 586 ) 587 else: 588 self._linear_constraint_matrix[(linear_constraint_id, variable_id)] = value 589 self.variables[variable_id].linear_constraint_nonzeros.add( 590 linear_constraint_id 591 ) 592 self.linear_constraints[linear_constraint_id].variable_nonzeros.add( 593 variable_id 594 ) 595 for watcher in self._update_trackers: 596 if ( 597 variable_id < watcher.variables_checkpoint 598 and linear_constraint_id < watcher.linear_constraints_checkpoint 599 ): 600 watcher.linear_constraint_matrix.add( 601 (linear_constraint_id, variable_id) 602 )
604 def get_linear_constraint_coefficient( 605 self, linear_constraint_id: int, variable_id: int 606 ) -> float: 607 self._check_linear_constraint_id(linear_constraint_id) 608 self._check_variable_id(variable_id) 609 return self._linear_constraint_matrix.get( 610 (linear_constraint_id, variable_id), 0.0 611 )
613 def get_linear_constraints_with_variable(self, variable_id: int) -> Iterator[int]: 614 self._check_variable_id(variable_id) 615 yield from self.variables[variable_id].linear_constraint_nonzeros
Yields the linear constraints with nonzero coefficient for a variable in undefined order.
617 def get_variables_for_linear_constraint( 618 self, linear_constraint_id: int 619 ) -> Iterator[int]: 620 self._check_linear_constraint_id(linear_constraint_id) 621 yield from self.linear_constraints[linear_constraint_id].variable_nonzeros
Yields the variables with nonzero coefficient in a linear constraint in undefined order.
623 def get_linear_constraint_matrix_entries( 624 self, 625 ) -> Iterator[model_storage.LinearConstraintMatrixIdEntry]: 626 for (constraint, variable), coef in self._linear_constraint_matrix.items(): 627 yield model_storage.LinearConstraintMatrixIdEntry( 628 linear_constraint_id=constraint, 629 variable_id=variable, 630 coefficient=coef, 631 )
Yields the nonzero elements of the linear constraint matrix in undefined order.
633 def clear_objective(self) -> None: 634 for variable_id in self.linear_objective_coefficient: 635 for watcher in self._update_trackers: 636 if variable_id < watcher.variables_checkpoint: 637 watcher.linear_objective_coefficients.add(variable_id) 638 self.linear_objective_coefficient.clear() 639 for key in self._quadratic_objective_coefficients.keys(): 640 for watcher in self._update_trackers: 641 if key.id2 < watcher.variables_checkpoint: 642 watcher.quadratic_objective_coefficients.add(key) 643 self._quadratic_objective_coefficients.clear() 644 self.set_objective_offset(0.0)
Clears objective coefficients and offset. Does not change direction.
646 def set_linear_objective_coefficient(self, variable_id: int, value: float) -> None: 647 self._check_variable_id(variable_id) 648 if value == self.linear_objective_coefficient.get(variable_id, 0.0): 649 return 650 if value == 0.0: 651 self.linear_objective_coefficient.pop(variable_id, None) 652 else: 653 self.linear_objective_coefficient[variable_id] = value 654 for watcher in self._update_trackers: 655 if variable_id < watcher.variables_checkpoint: 656 watcher.linear_objective_coefficients.add(variable_id)
662 def get_linear_objective_coefficients( 663 self, 664 ) -> Iterator[model_storage.LinearObjectiveEntry]: 665 for var_id, coef in self.linear_objective_coefficient.items(): 666 yield model_storage.LinearObjectiveEntry( 667 variable_id=var_id, coefficient=coef 668 )
Yields the nonzero linear objective terms in undefined order.
670 def set_quadratic_objective_coefficient( 671 self, first_variable_id: int, second_variable_id: int, value: float 672 ) -> None: 673 self._check_variable_id(first_variable_id) 674 self._check_variable_id(second_variable_id) 675 updated = self._quadratic_objective_coefficients.set_coefficient( 676 first_variable_id, second_variable_id, value 677 ) 678 if updated: 679 for watcher in self._update_trackers: 680 if ( 681 max(first_variable_id, second_variable_id) 682 < watcher.variables_checkpoint 683 ): 684 watcher.quadratic_objective_coefficients.add( 685 _QuadraticKey(first_variable_id, second_variable_id) 686 )
Sets the objective coefficient for the product of two variables.
The ordering of the input variables does not matter.
Arguments:
- first_variable_id: The first variable in the product.
- second_variable_id: The second variable in the product.
- value: The value of the coefficient.
Raises:
- BadVariableIdError if first_variable_id or second_variable_id are not in
- the model.
688 def get_quadratic_objective_coefficient( 689 self, first_variable_id: int, second_variable_id: int 690 ) -> float: 691 self._check_variable_id(first_variable_id) 692 self._check_variable_id(second_variable_id) 693 return self._quadratic_objective_coefficients.get_coefficient( 694 first_variable_id, second_variable_id 695 )
Gets the objective coefficient for the product of two variables.
The ordering of the input variables does not matter.
Arguments:
- first_variable_id: The first variable in the product.
- second_variable_id: The second variable in the product.
Raises:
- BadVariableIdError if first_variable_id or second_variable_id are not in
- the model.
Returns:
The value of the coefficient.
697 def get_quadratic_objective_coefficients( 698 self, 699 ) -> Iterator[model_storage.QuadraticEntry]: 700 yield from self._quadratic_objective_coefficients.coefficients()
Yields the nonzero quadratic objective terms in undefined order.
702 def get_quadratic_objective_adjacent_variables( 703 self, variable_id: int 704 ) -> Iterator[int]: 705 self._check_variable_id(variable_id) 706 yield from self._quadratic_objective_coefficients.get_adjacent_variables( 707 variable_id 708 )
Yields the variables multiplying a variable in the objective function.
Variables are returned in an unspecified order.
For example, if variables x and y have ids 0 and 1 respectively, and the quadratic portion of the objective is x^2 + 2 x*y, then get_quadratic_objective_adjacent_variables(0) = (0, 1).
Arguments:
- variable_id: Function yields the variables multiplying variable_id in the objective function.
Yields:
The variables multiplying variable_id in the objective function.
Raises:
- BadVariableIdError if variable_id is not in the model.
730 def export_model(self) -> model_pb2.ModelProto: 731 m: model_pb2.ModelProto = model_pb2.ModelProto() 732 m.name = self._name 733 _variables_to_proto(self.variables.items(), m.variables) 734 _linear_constraints_to_proto( 735 self.linear_constraints.items(), m.linear_constraints 736 ) 737 m.objective.maximize = self._is_maximize 738 m.objective.offset = self._objective_offset 739 if self.linear_objective_coefficient: 740 obj_ids, obj_coefs = zip(*sorted(self.linear_objective_coefficient.items())) 741 m.objective.linear_coefficients.ids.extend(obj_ids) 742 m.objective.linear_coefficients.values.extend(obj_coefs) 743 if self._quadratic_objective_coefficients: 744 first_var_ids, second_var_ids, coefficients = zip( 745 *sorted( 746 [ 747 (entry.id_key.id1, entry.id_key.id2, entry.coefficient) 748 for entry in self._quadratic_objective_coefficients.coefficients() 749 ] 750 ) 751 ) 752 m.objective.quadratic_coefficients.row_ids.extend(first_var_ids) 753 m.objective.quadratic_coefficients.column_ids.extend(second_var_ids) 754 m.objective.quadratic_coefficients.coefficients.extend(coefficients) 755 if self._linear_constraint_matrix: 756 flat_matrix_items = [ 757 (con_id, var_id, coef) 758 for ((con_id, var_id), coef) in self._linear_constraint_matrix.items() 759 ] 760 lin_con_ids, var_ids, lin_con_coefs = zip(*sorted(flat_matrix_items)) 761 m.linear_constraint_matrix.row_ids.extend(lin_con_ids) 762 m.linear_constraint_matrix.column_ids.extend(var_ids) 763 m.linear_constraint_matrix.coefficients.extend(lin_con_coefs) 764 return m
766 def add_update_tracker(self) -> model_storage.StorageUpdateTracker: 767 tracker = _UpdateTracker(self) 768 self._update_trackers.add(tracker) 769 return tracker
Creates a StorageUpdateTracker registered with self to view model changes.
771 def remove_update_tracker( 772 self, tracker: model_storage.StorageUpdateTracker 773 ) -> None: 774 self._update_trackers.remove(tracker) 775 tracker.retired = True
Stops tracker from getting updates on model changes in self.
An error will be raised if tracker is not a StorageUpdateTracker created by this Model that has not previously been removed.
Using an UpdateTracker (via checkpoint or export_update) after it has been removed will result in an error.
Arguments:
- tracker: The StorageUpdateTracker to unregister.
Raises:
- KeyError: The tracker was created by another model or was already removed.