Source code for Irene.sdp

from .base import base

from numpy import array, zeros, matrix, float64
from time import time


[docs] class sdp(base): r""" This is the class which intends to solve semidefinite programs in primal format: .. math:: \left\lbrace \begin{array}{lll} \min & \sum_{i=1}^m b_i x_i & \\ \textrm{subject to} & & \\ & \sum_{i=1}^m A_{ij}x_i - C_j \succeq 0 & j=1,\dots,k. \end{array}\right. For the argument `solver` following sdp solvers are supported (if they are installed): + `CVXOPT`, + `CSDP`, + `SDPA`, + `DSDP`. """ Solvers = ['CVXOPT', 'SDPA', 'CSDP', 'DSDP'] SolverOptions = {} Info = {} def __init__(self, solver='cvxopt', solver_path=None): solver_upper = solver.upper() if isinstance(solver, str) else None if solver_upper not in self.Solvers: raise ValueError("Currently the following solvers are supported: 'CVXOPT', 'SDPA', 'CSDP', 'DSDP'") super(sdp, self).__init__() if solver_path: self.Path = dict(solver_path) self.solver = solver_upper self.BlockStruct = [] self.b = None self.A = [] self.C = [] self.CvxOpt_Available = False self.ErrorString = "" self.solver_options = {} self.Info = {} self.num_constraints = 0 self.num_blocks = 0 # checks the availability of solver if self.solver not in self.AvailableSDPSolvers(): raise ImportError("The solver '%s' is not available" % solver)
[docs] def SetObjective(self, b): r""" Takes the coefficients of the objective function. """ self.b = b
[docs] def AddConstraintBlock(self, A): r""" This takes a list of square matrices which corresponds to coefficient of :math:`x_i`. Simply, :math:`A_i=[A_{i1},\dots,A_{ik}]`. Note that the :math:`i^{th}` call of ``AddConstraintBlock`` fills the blocks associated with :math:`i^{th}` variable :math:`x_i`. """ BlkStc = [] for blk in A: BlkStc.append(blk.shape[0]) if (self.BlockStruct != []) and (self.BlockStruct == BlkStc): self.A.append(A) elif not self.BlockStruct: self.BlockStruct = BlkStc self.A.append(A) else: raise TypeError("The block structure is inconsistent.")
[docs] def AddConstantBlock(self, C): r""" `C` must be a list of ``numpy`` matrices that represent :math:`C_j` for each `j`. This method sets the value for :math:`C=[C_1,\dots,C_k]`. """ BlkStc = [] for blk in C: BlkStc.append(blk.shape[0]) if (self.BlockStruct != []) and (self.BlockStruct == BlkStc): self.C = C elif not self.BlockStruct: self.BlockStruct = BlkStc self.C = C else: raise TypeError("The block structure is inconsistent.")
[docs] def Option(self, param, val): r""" Sets the `param` option of the solver to `val` if the solver accepts such an option. The following options are supported by solvers: + ``CVXOPT``: + ``show_progress``: ``True`` or ``False``, turns the output to the screen on or off (default: ``True``); + ``maxiters``: maximum number of iterations (default: 100); + ``abstol``: absolute accuracy (default: 1e-7); + ``reltol``: relative accuracy (default: 1e-6); + ``feastol``: tolerance for feasibility conditions (default: 1e-7); + ``refinement``: number of iterative refinement steps when solving KKT equations (default: 0 if the problem has no second-order cone or matrix inequality constraints; 1 otherwise). + ``SDPA``: + ``maxIteration``: Maximum number of iterations. The SDPA stops when the iteration exceeds ``maxIteration``; + ``epsilonStar``, ``epsilonDash``: The accuracy of an approximate optimal solution of the SDP; + ``lambdaStar``: This parameter determines an initial point; + ``omegaStar``: This parameter determines the region in which the SDPA searches an optimal solution; + ``lowerBound``: Lower bound of the minimum objective value of the primal problem; + ``upperBound``: Upper bound of the maximum objective value of the dual problem; + ``betaStar``: Parameter controlling the search direction when current state is feasible; + ``betaBar``: Parameter controlling the search direction when current state is infeasible; + ``gammaStar``: Reduction factor for the primal and dual step lengths; 0.0 < ``gammaStar`` < 1.0. """ self.SolverOptions[param] = val
@staticmethod def _coerce_float(value): try: return float(value) except (TypeError, ValueError) as exc: raise TypeError("SDP data must be numeric") from exc def _objective_values_as_floats(self): return [self._coerce_float(value) for value in self.b]
[docs] def write_sdpa_dat(self, filename): r""" Writes the semidefinite program in the file `filename` with dense SDPA format. """ f = open(filename, 'w') f.write("%d=mDIM\n" % len(self.b)) f.write("%d=nBLOCK\n" % len(self.C)) f.write(str(self.BlockStruct).replace( '[', '{').replace(']', '}') + "=bLOCKsTRUCT\n") objective = ', '.join('{:.16g}'.format(value) for value in self._objective_values_as_floats()) f.write('{' + objective + "}\n") f.write('{\n') for B in self.C: f.write(str(B).replace('[', '{').replace(']', '}') + '\n') f.write('}\n') for B in self.A: f.write('{\n') for Bl in B: f.write(str(Bl).replace('[', '{').replace(']', '}') + '\n') f.write('}\n') f.close()
[docs] def write_sdpa_dat_sparse(self, filename): r""" Writes the semidefinite program in the file `filename` with sparse SDPA format. Uses sparse iteration over non-zero entries to avoid dense nested loops. """ import numpy as np sparse_zero_tol = 1e-12 with open(filename, 'w') as f: f.write("%d = mDIM\n" % len(self.b)) f.write("%d = nBLOCK\n" % len(self.C)) f.write(str(self.BlockStruct).replace('[', '').replace( ']', '').replace(',', ' ') + " = bLOCKsTRUCT\n") objective = ' '.join('{:.16g}'.format(value) for value in self._objective_values_as_floats()) f.write(objective + "\n") mat_no = 0 blk_no = 1 for B in self.C: # Use argwhere to iterate only over non-zero entries nonzero_idx = np.argwhere(np.abs(B) > sparse_zero_tol) for idx in nonzero_idx: i, j = idx[0], idx[1] # Only output upper triangular part (i <= j) if i <= j: val = B[i, j] f.write("%d %d %d %d %f\n" % (mat_no, blk_no, i + 1, j + 1, val)) blk_no += 1 for B in self.A: mat_no += 1 blk_no = 1 for Bl in B: # Use argwhere to iterate only over non-zero entries nonzero_idx = np.argwhere(np.abs(Bl) > sparse_zero_tol) for idx in nonzero_idx: i, j = idx[0], idx[1] # Only output upper triangular part (i <= j) if i <= j: val = Bl[i, j] f.write("%d %d %d %d %f\n" % (mat_no, blk_no, i + 1, j + 1, val)) blk_no += 1
[docs] @staticmethod def parse_solution_matrix(iterator): r""" Parses and returns the matrices and vectors found by `SDPA` solver. This was taken from `ncpol2sdpa` and customized for `Irene`. """ import numpy as np solution_matrix = [] while True: sol_mat = None in_matrix = False i = 0 row = None for row in iterator: stripped = row.strip() if stripped.find('}') < 0: continue if stripped.startswith('}'): if sol_mat is None: return solution_matrix break if stripped.find('{') < 0: raise ValueError("Malformed solution matrix row") if stripped.find('{') != stripped.rfind('{'): in_matrix = True numbers = stripped[ stripped.rfind('{') + 1:stripped.find('}')].strip().split(',') if len(numbers) == 1 and numbers[0] == '': raise ValueError("Empty solution matrix row") if sol_mat is None: sol_mat = np.empty((len(numbers), len(numbers))) elif len(numbers) != sol_mat.shape[1]: raise ValueError("Inconsistent solution matrix row width") if i >= sol_mat.shape[0]: raise ValueError("Too many rows in solution matrix") for j, number in enumerate(numbers): sol_mat[i, j] = float(number) i += 1 if stripped.find('}') != stripped.rfind('}') or not in_matrix: break if sol_mat is None: return solution_matrix if i != sol_mat.shape[0]: raise ValueError("Incomplete solution matrix") solution_matrix.append(sol_mat) if row is not None and row.strip().startswith('}'): break return solution_matrix
[docs] def read_sdpa_out(self, filename): r""" Extracts information from `SDPA`'s output file `filename`. This was taken from `ncpol2sdpa` and customized for `Irene`. """ primal = None dual = None x_mat = None y_mat = None xVec = None status_string = None total_time = None with open(filename, 'r') as file_: for line in file_: if line.find("objValPrimal") > -1: primal = float((line.split())[2]) if line.find("objValDual") > -1: dual = float((line.split())[2]) if line.find("total time") > -1: total_time = float((line.split('='))[1]) if line.find("xMat =") > -1: x_mat = self.parse_solution_matrix(file_) if line.find("yMat =") > -1: y_mat = self.parse_solution_matrix(file_) if line.find("xVec =") > -1: line = next(file_) xVec = array([float(m) for m in line.replace( '{', '').replace('}', '').split(',')]) if line.find("phase.value") > -1: if (line.find("pdOPT") > -1) or line.find("pdFEAS") > -1: status_string = 'Optimal' elif line.find("noINFO") > -1: status_string = 'Optimal' elif line.find("INF") > -1: status_string = 'Infeasible' elif line.find("UNBD") > -1: status_string = 'Unbounded' else: status_string = 'Unknown' for var in [primal, dual, status_string]: if var is None: status_string = 'invalid' break for var in [x_mat, y_mat]: if var is None: status_string = 'invalid' break self.Info['PObj'] = primal self.Info['DObj'] = dual self.Info['X'] = y_mat self.Info['Z'] = x_mat self.Info['y'] = xVec self.Info['Status'] = status_string self.Info['CPU'] = total_time
[docs] def sdpa_param(self): r""" Produces sdpa.param file from ``SolverOptions``. """ f = open("param.sdpa", 'w') if 'maxIteration' in self.SolverOptions: f.write("%d unsigned int maxIteration;\n" % self.SolverOptions['maxIteration']) else: f.write("40 unsigned int maxIteration;\n") if 'epsilonStar' in self.SolverOptions: f.write("%f double 0.0 < epsilonStar;\n" % self.SolverOptions['epsilonStar']) else: f.write("1.0E-7 double 0.0 < epsilonStar;\n") if 'lambdaStar' in self.SolverOptions: f.write("%f double 0.0 < lambdaStar;\n" % self.SolverOptions['lambdaStar']) else: f.write("1.0E2 double 0.0 < lambdaStar;\n") if 'omegaStar' in self.SolverOptions: f.write("%f double 1.0 < omegaStar;\n" % self.SolverOptions['omegaStar']) else: f.write("2.0 double 1.0 < omegaStar;\n") if 'lowerBound' in self.SolverOptions: f.write("%f double lowerBound;\n" % self.SolverOptions['lowerBound']) else: f.write("-1.0E5 double lowerBound;\n") if 'upperBound' in self.SolverOptions: f.write("%f double upperBound;\n" % self.SolverOptions['upperBound']) else: f.write("1.0E5 double upperBound;\n") if 'betaStar' in self.SolverOptions: f.write("%f double 0.0 <= betaStar < 1.0;\n" % self.SolverOptions['betaStar']) else: f.write("0.1 double 0.0 <= betaStar < 1.0;\n") if 'betaBar' in self.SolverOptions: f.write("%f double 0.0 <= betaBar < 1.0, betaStar <= betaBar;\n" % self.SolverOptions['betaBar']) else: f.write("0.2 double 0.0 <= betaBar < 1.0, betaStar <= betaBar;\n") if 'gammaStar' in self.SolverOptions: f.write("%f double 0.0 < gammaStar < 1.0;\n" % self.SolverOptions['gammaStar']) else: f.write("0.9 double 0.0 < gammaStar < 1.0;\n") if 'epsilonDash' in self.SolverOptions: f.write("%f double 0.0 < epsilonDash;\n" % self.SolverOptions['epsilonDash']) else: f.write("1.0E-7 double 0.0 < epsilonDash;\n") f.close()
[docs] def read_csdp_out(self, filename, txt): r""" Takes a file name and a string that are the outputs of `CSDP` as a file and command line outputs of the solver and extracts the required information. """ primal = None dual = None total_time = None Status = 'Unknown' progress = txt.split('\n') for line in progress: if line.find("Success") > -1: Status = 'Optimal' elif line.find("Primal objective value") > -1: primal = float(line.split(':')[1]) elif line.find("Dual objective value") > -1: dual = float(line.split(':')[1]) elif line.find("Total time") > -1: total_time = float(line.split(':')[1]) with open(filename, 'r') as file_: line = file_.readline() x_tokens = line.split() if not x_tokens: raise ValueError("Malformed CSDP solution vector") xVec = array([float(token) for token in x_tokens]) X = [zeros((d, d)) for d in self.BlockStruct] Z = [zeros((d, d)) for d in self.BlockStruct] for line in file_: entity = line.split() if not entity: continue if len(entity) != 5: raise ValueError("Malformed CSDP solution row") mat_type, block_idx, row_idx, col_idx, value = entity block_no = int(block_idx) - 1 row_no = int(row_idx) - 1 col_no = int(col_idx) - 1 if int(mat_type) == 1: Z[block_no][row_no][col_no] = float(value) elif int(mat_type) == 2: X[block_no][row_no][col_no] = float(value) # self.BlockStruct self.Info['PObj'] = primal self.Info['DObj'] = dual self.Info['X'] = X self.Info['Z'] = Z self.Info['y'] = xVec self.Info['Status'] = Status self.Info['CPU'] = total_time
[docs] @staticmethod def VEC(M): """ Converts the matrix M into a column vector acceptable by `CVXOPT`. """ V = [] n, m = M.shape for j in range(m): for i in range(n): V.append(M[i, j]) return V
[docs] def CvxOpt(self): r""" This calls `CVXOPT` and `DSDP` to solve the initiated semidefinite program. """ try: from cvxopt import solvers from cvxopt.base import matrix as Mtx RealNumber = float # Required for CvxOpt Integer = int # Required for CvxOpt self.CvxOpt_Available = True except Exception as e: self.CvxOpt_Available = False self.ErrorString = "CVXOPT is not available." raise Exception(self.ErrorString) self.solver_options = {} self.Info = {} self.num_constraints = len(self.A) self.num_blocks = len(self.C) # Build Ccvxopt: objective matrix blocks Ccvxopt = [-Mtx(M, tc='d') for M in self.C] # Build Acvxopt: constraint matrix blocks Acvxopt = [] for blk_no in range(self.num_blocks): Ablock = [self.VEC(constraint[blk_no]) for constraint in self.A] Acvxopt.append(-Mtx(matrix(Ablock).transpose(), tc='d')) # Build acvxopt: objective vector using direct numpy approach b_coerced = [self._coerce_float(elmnt) for elmnt in self.b] acvxopt = Mtx(array(b_coerced).reshape(-1, 1), tc='d') # CvxOpt options for param in self.SolverOptions: solvers.options[param] = self.SolverOptions[param] start1 = time() try: # if True: sol = solvers.sdp(acvxopt, Gs=Acvxopt, hs=Ccvxopt, solver=self.solver.lower()) elapsed1 = (time() - start1) if sol['status'] != 'optimal': self.Info = {'Status': 'Infeasible'} else: self.Info = {'Status': 'Optimal', 'DObj': sol['dual objective'], 'PObj': sol['primal objective'], 'Wall': elapsed1, 'CPU': None, 'y': array( list(sol['x'])), 'Z': []} for ds in sol['ss']: self.Info['Z'].append( array(list(ds)).reshape(*ds.size)) self.Info['X'] = [] for ds in sol['zs']: self.Info['X'].append( array(list(ds)).reshape(*ds.size)) except Exception as e: self.Info = {'Status': 'Infeasible'} self.Info['solver'] = self.solver
[docs] def sdpa(self): r""" Calls `SDPA` to solve the initiated semidefinite program. """ import subprocess prg_file = "prg.dat" out_file = "out.res" self.sdpa_param() par_file = "param.sdpa" if not self.BlockStruct: self.BlockStruct = [len(B) for B in self.C] self.write_sdpa_dat(prg_file) try: subprocess.run( [self.Path['sdpa'], "-dd", prg_file, "-o", out_file, "-p", par_file], check=True, capture_output=True, text=True, ) except (OSError, subprocess.SubprocessError) as exc: raise RuntimeError("SDPA execution failed") from exc self.read_sdpa_out(out_file)
[docs] def csdp(self): r""" Calls `SDPA` to solve the initiated semidefinite program. """ import subprocess prg_file = "prg.dat-s" out_file = "out.res" if not self.BlockStruct: self.BlockStruct = [len(B) for B in self.C] self.write_sdpa_dat_sparse(prg_file) try: completed = subprocess.run( [self.Path['csdp'], prg_file, out_file], check=True, capture_output=True, text=True, ) except (OSError, subprocess.SubprocessError) as exc: raise RuntimeError("CSDP execution failed") from exc out = completed.stdout self.read_csdp_out(out_file, out)
[docs] def solve(self): r""" Solves the initiated semidefinite program according to the requested solver. """ if self.solver in ['CVXOPT', 'DSDP']: self.CvxOpt() elif self.solver == 'SDPA': self.sdpa() elif self.solver == 'CSDP': self.csdp()
def __str__(self): out_text = "Semidefinite program with\n" out_text += " # variables:" + str(len(self.C)) + "\n" out_text += " # constraints:" + str(len(self.A)) + "\n" out_text += " with solver:" + self.solver return out_text def __latex__(self): return "SDP(%d, %d, %s)" % (len(self.C), len(self.A), self.solver)