From 69b50e1eb2b0344d55d8e8edec70f3697f58de26 Mon Sep 17 00:00:00 2001 From: bgoli Date: Fri, 2 Aug 2019 10:10:11 +0200 Subject: [PATCH] removed comment --- src/CBModel.py | 2 +- src/oldsolver/CBCPLEX.py | 941 --------------------------------- src/oldsolver/CBGLPK.py | 791 ---------------------------- src/oldsolver/CBSolver.py | 1027 ------------------------------------- src/oldsolver/__init__.py | 0 5 files changed, 1 insertion(+), 2760 deletions(-) delete mode 100644 src/oldsolver/CBCPLEX.py delete mode 100644 src/oldsolver/CBGLPK.py delete mode 100644 src/oldsolver/CBSolver.py delete mode 100644 src/oldsolver/__init__.py diff --git a/src/CBModel.py b/src/CBModel.py index d6acc85..ff0d0ff 100644 --- a/src/CBModel.py +++ b/src/CBModel.py @@ -5080,7 +5080,7 @@ def createAssociationAndGeneRefsFromTree(self, gprtree, altlabels=None): else: #print('createAssociationAndGeneRefs', gid, label) if gid is None or gid is 'None': - print(self.getTree(), genes, gid, label) + #print(self.getTree(), genes, gid, label) continue self.__objref__().addGene(Gene(gid, label, active=True)) self.addGeneref(gid) diff --git a/src/oldsolver/CBCPLEX.py b/src/oldsolver/CBCPLEX.py deleted file mode 100644 index 04e4a9f..0000000 --- a/src/oldsolver/CBCPLEX.py +++ /dev/null @@ -1,941 +0,0 @@ -''' -Created on Nov 12, 2014 - -@author: arne -''' - -# preparing for Python 3 port -from __future__ import division, print_function -from __future__ import absolute_import -#from __future__ import unicode_literals - -import os, time, gc, random -# this is a hack that needs to be streamlined a bit -try: - import cStringIO as csio -except ImportError: - import io as csio - -HAVE_SYMPY = False -try: - import sympy - if int(sympy.__version__.split('.')[1]) >= 7 and int(sympy.__version__.split('.')[2]) >= 4: - HAVE_SYMPY = True - else: - del sympy -except ImportError: - HAVE_SYMPY = False -HAVE_SCIPY = False -try: - from scipy.sparse import csr - HAVE_SCIPY = True -except ImportError: - HAVE_SCIPY = False - -import numpy -import cplex -from cplex.exceptions import CplexError -from cbmpy import CBWrite, CBTools -from cbmpy.CBConfig import __CBCONFIG__ as __CBCONFIG__ - -from solver.CBSolver import SolverFactory, Solver - - -__DEBUG__ = __CBCONFIG__['DEBUG'] -__version__ = __CBCONFIG__['VERSION'] - -""" -Sets the noise level of the solver CPLX_RESULT_STREAM can be:: - - - *None* silent i.e. no output - - *file* set solver to silent and output logs to *CPLX_RESULT_STREAM_FILE* cplex_output.log - - *iostream* set solver to silent and output logs to *CPLX_RESULT_STREAM_IO* csio - - *'default'* or anything else noisy with full output closes STREAM_IO and STREAM_FILE (default) -""" -global CPLX_RESULT_STREAM, CPLX_RESULT_STREAM_FILE, CPLX_RESULT_STREAM_IO, CPLX_SILENT_MODE -CPLX_RESULT_STREAM = 'default' -CPLX_RESULT_STREAM_FILE = None -CPLX_RESULT_STREAM_IO = None -CPLX_SILENT_MODE = True - -class CPLEXSolver(Solver): - def __init__(self, lp=None): - if lp==None: - self.lp = cplex.Cplex() - else: - self.lp = lp - - def fixConSense(self, operator): - """ - Fixes the sense of inequality operators, returns corrected sense symbol - - - *operator* the operator to check - - """ - - if operator in ['<=','<','L']: - oPr = 'L' - elif operator in ['>=','>','G']: - oPr = 'G' - elif operator in ['=','E']: - oPr = 'E' - else: - raise RuntimeError('INFO: invalid operator: %s' % operator) - return oPr - - - def solve(self, method='o'): - """ solves the current problem """ - alg = self.lp.parameters.lpmethod.values - - if method == "o": - self.lp.parameters.lpmethod.set(alg.auto) - elif method == "p": - self.lp.parameters.lpmethod.set(alg.primal) - elif method == "d": - self.lp.parameters.lpmethod.set(alg.dual) - elif method == "b": - self.lp.parameters.lpmethod.set(alg.barrier) - self.lp.parameters.barrier.crossover.set(self.lp.parameters.barrier.crossover.values.none) - elif method == "h": - self.lp.parameters.lpmethod.set(alg.barrier) - elif method == "s": - self.lp.parameters.lpmethod.set(alg.sifting) - elif method == "c": - self.lp.parameters.lpmethod.set(alg.concurrent) - else: - print("Unrecognized option, using automatic") - self.lp.parameters.lpmethod.set(alg.auto) - try: - self.lp.solve() - except cplex.exceptions.CplexSolverError as ex: - print("Exception raised during solve:\n\t\"{}\"".format(ex)) - return None - # solution.get_status() returns an integer code - status = self.lp.solution.get_status() - if status == self.lp.solution.status.optimal: - if not CPLX_SILENT_MODE: - print("INFO: Model is optimal:", status) - elif status == self.lp.solution.status.unbounded: - if not CPLX_SILENT_MODE: - print("INFO: Model is unbounded:", status) - return status - elif status == self.lp.solution.status.infeasible: - if not CPLX_SILENT_MODE: - print("INFO: Model is infeasible:", status) - return status - elif status == self.lp.solution.status.infeasible_or_unbounded: - if not CPLX_SILENT_MODE: - print("INFO: Model is infeasible or unbounded:", status) - return status - - if not CPLX_SILENT_MODE: - s_method = self.lp.solution.get_method() - print("Solution status = " , status, ":", end=" ") - # the following line prints the status as a string - print(self.lp.solution.status[status]) - print("Solution method = ", s_method, ":", end=" ") - print(self.lp.solution.method[s_method]) - - s_type = self.lp.solution.get_solution_type() - if s_type == self.lp.solution.type.none: - print("INFO: No solution available") - return None - else: - if not CPLX_SILENT_MODE: - print("Objective value = " , self.lp.solution.get_objective_value()) - return status - - def setMIPGapTolerance(self,tol): - """ - Sets the the relative MIP gap tolerance - """ - self.lp.parameters.mip.tolerances.mipgap.set(tol) - print('New MIP gap tolerance', self.lp.parameters.mip.tolerances.mipgap.get()) - - def setSolverParameters(self,parset): - pass - - def solveMILP(self, auto_mipgap=False): - """ - Solve and MILP - - - *auto_mipgap* auto decrease mipgap until mipgap == absmipgap - - """ - try: - self.lp.solve() - if self.lp.solution.get_status() == self.lp.solution.status.optimal_tolerance: - if auto_mipgap: - while self.lp.parameters.mip.tolerances.mipgap.get() >= self.lp.parameters.mip.tolerances.absmipgap.get(): - self.lp.parameters.mip.tolerances.mipgap.set(self.lp.parameters.mip.tolerances.mipgap.get()/10.0) - self.lp.solve() - if self.lp.solution.get_status() == self.lp.solution.status.MIP_optimal: - break - print('\n\nMILP solution gap tolerance set to: %s\n\n' % self.lp.parameters.mip.tolerances.mipgap.get()) - time.sleep(5) - if self.lp.solution.get_status() == self.lp.solution.status.optimal_tolerance: - print('\n\nMILP solution found with gap tolerance consider adjusting gap tolerance with cplx_setGapTolerance\n\n') - #raise RuntimeWarning - except cplex.exceptions.CplexSolverError as ex: - print("\nException raised during solve\n") - print(ex) - - def getObjectiveValue(self): - """ returns current objective value (typically valid after solve) """ - return self.lp.solution.get_objective_value() - - def getObjectiveId(self): - """ returns the name of the current objective function """ - return self.lp.objective.get_name() - - def getSolutionStatus(self): - """ - Returns one of: - - - *LPS_OPT*: solution is optimal; - - *LPS_FEAS*: solution is feasible; - - *LPS_INFEAS*: solution is infeasible; - - *LPS_NOFEAS*: problem has no feasible solution; - - *LPS_UNBND*: problem has unbounded solution; - - *LPS_UNDEF*: solution is undefined. - - *LPS_NONE*: no solution - - """ - # solution.get_status() returns an integer code - if self.lp.solution.get_solution_type() == self.lp.solution.type.none: - print("\nNo solution available\n") - return 'LPS_NONE' - status = self.lp.solution.get_status() - if status == self.lp.solution.status.optimal: - if not CPLX_SILENT_MODE: - print("Model is optimal") - return 'LPS_OPT' - elif status == self.lp.solution.status.feasible: - print("\nModel is feasible") - return 'LPS_FEAS' - elif status == self.lp.solution.status.unbounded: - print("\nModel is unbounded") - return 'LPS_UNBND' - elif status == self.lp.solution.status.infeasible: - print("\nModel is infeasible") - return 'LPS_INFEAS' - elif status == self.lp.solution.status.infeasible_or_unbounded: - print("\nModel is infeasible or unbounded") - return 'LPS_INFEAS or LPS_UNBND' - elif status == self.lp.solution.status.MIP_optimal: - print('MILP optimal') - return 'MILP_OPT' - elif status == self.lp.solution.status.MIP_optimal: - print('MILP optimal') - return 'MILP_OPT' - elif status == self.lp.solution.status.optimal_tolerance: - print('MILP optimal within gap tolerance') - return 'MILP_OPTTOL' - else: - return 'LPS_NONE' - - def isDualFeasible(self): - """ checks if problem has been solved to dual feasibility """ - pass - - def addLinearConstraint(self, name, coef, rhs, sense): - """ adds an additional linear constraint. - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - ind = [] - val = [] - for colname, colval in coef: - ind.append(colname) - val.append(colval) - lexp = cplex.SparsePair(ind, val) - self.lp.linear_constraints.add(lin_expr=[lexp], senses=sense, rhs=[rhs], names=[name]) - del ind, val, lexp - - def addVariables(self, names, lb=None, ub=None, obj=None): - """ adds additional variables. - - - *names* list of variable names - - *lb* list of lower bounds - - *ub* list of upper bounds - - *obj* list of objective values - - if only one variable should be added, the lists can also be replaced by - plain values - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - pass - - def setLowerBounds(self, bounds): - """ sets lower bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - lbounds = [] - for colName, colBound in bounds.items(): - lbounds.append((colName, colBound)) - self.lp.variables.set_lower_bounds(lbounds) - - def setUpperBounds(self, bounds): - """ sets upper bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - ubounds = [] - for colName, colBound in bounds.items(): - ubounds.append((colName, colBound)) - self.lp.variables.set_upper_bounds(ubounds) - - def setObjective(self, name='obj', coef=None, sense='maximize', reset=True): - """ sets the objective. - - - *name* name of the objective - - *coef* dictionary mapping variable names to the new coefficients - - *sense* objective sense, can be one of 'min' or 'max' - - *reset* if all other objective coefficients should be reset - - Note that there is a major memory leak in - `c.variables.get_names()` whch is used when reset=True. - A workaround is implemented in the old code of cplx_setObjective2 which - takes *names* as an input. - """ - sense = sense.lower() - if sense == 'max': sense = 'maximize' - if sense == 'min': sense = 'minimize' - if sense in ['maximise', 'minimise']: - sense = sense.replace('se','ze') - assert sense in ['maximize', 'minimize'], "\nsense must be ['maximize', 'minimize'] not %s" % sense - self.lp.objective.set_name(name) - if coef != None: - if reset: - variables = self.lp.variables.get_names() - new_obj = [] - for varname in self.lp.variables: - if coef.hasKey(varname): - new_obj.append((varname, coef[varname])) - else: - new_obj.append((varname, 0.0)) - self.lp.objective.set_linear(new_obj) - del new_obj, variables - else: - new_obj = [] - for varname, varcoef in coef.items(): - new_obj.append((varname, varcoef)) - else: - variables = self.lp.variables.get_names() - new_obj = [] - for varname in self.lp.variables: - new_obj.append((varname, 0.0)) - self.lp.objective.set_linear(new_obj) - del new_obj, variables - - if sense == 'minimize': - self.lp.objective.set_sense(self.lp.objective.sense.minimize) - if __DEBUG__: print('Set minimizing') - else: - self.lp.objective.set_sense(self.lp.objective.sense.maximize) - if __DEBUG__: print('Set maximizing') - pass - - - def getObjectiveCoef(self, obj=None): - """ fetches the objective coefficients of the given variables - - - obj: dictionary that has variable names as keys, the coefficients will - be written in the corresponding values. - If obj is None, a new dictionary for all variables is created - """ - - - def setRHS(self, rhs): - """ sets right-hand sides of given constraints - - rhs should be a dictionary mapping constraint names to the new rhs - """ - pass - - def setSense(self, sense): - """ sets the sense of given constraints - - sense should be a dictionary mapping constraint names to the new sense - """ - pass - - def deleteVariables(self, variables): - """ deletes the variables with specified names """ - pass - - def deleteConstraints(self, cons): - """ delete constraints with specified names """ - pass - - def getSolution(self): - """ - extract the primal solution - """ - s_val = [] - s_name = [] - fba_sol = {} - try: - s_val = self.lp.solution.get_values() - s_name = self.lp.variables.get_names() - for n in range(len(s_name)): - fba_sol.update({s_name[n] : s_val[n]}) - except Exception as ex: - print(ex) - print('WARNING: No solution to get') - s_val = [] - s_name = [] - fba_sol = {} - del s_name,s_val - return fba_sol - - def getReducedCosts(self, scaled=False): - """ - Extract ReducedCosts from LP and return as a dictionary 'Rid' : reduced cost - - - *scaled* scale the reduced cost by the optimal flux value - """ - s_name = self.lp.variables.get_names() - r_costs = self.lp.solution.get_reduced_costs() - objf_val = self.lp.solution.get_objective_value() - output = {} - s_val = None - if scaled: - s_val = self.lp.solution.get_values() - for v in range(len(s_name)): - if scaled: - try: - r_val = r_costs[v]*s_val[v]/objf_val - except: - r_val = 0.0 - else: - r_val = r_costs[v] - output.update({s_name[v] : r_val}) - del s_name, r_costs, s_val - return output - - def getShadowPrices(self): - """ - Returns a dictionary of shadow prices containing 'Rid' : (lb, rhs, ub) - """ - c_names = self.lp.linear_constraints.get_names() - rhs_sense = self.lp.solution.sensitivity.rhs() - rhs_val = self.lp.linear_constraints.get_rhs() - output = {} - for s in range(self.lp.linear_constraints.get_num()): - output.update({c_names[s] : (rhs_sense[s][0], rhs_val[s], rhs_sense[s][1])}) - return output - - def getSensitivities(self): - """ - Get the sensitivities of each constraint on the objective function with input - - Output is a tuple of bound and objective sensitivities where the objective - sensitivity is described in the CPLEX reference manual as:: - - ... the objective sensitivity shows each variable, its reduced cost and the range over - which its objective function coefficient can vary without forcing a change - in the optimal basis. The current value of each objective coefficient is - also displayed for reference. - - - *objective coefficient sensitivity* {flux : (reduced_cost, lower_obj_sensitivity, coeff_value, upper_obj_sensitivity)} - - *rhs sensitivity* {constraint : (low, value, high)} - - *bound sensitivity ranges* {flux : (lb_low, lb_high, ub_low, ub_high)} - - """ - SENSE_RHS = {} - SENSE_BND = {} - SENSE_OBJ = {} - c_names = self.lp.linear_constraints.get_names() - rhs_val = self.lp.linear_constraints.get_rhs() - j_names = self.lp.variables.get_names() - - rhs_sense = self.lp.solution.sensitivity.rhs() - bnd_sense = self.lp.solution.sensitivity.bounds() - obj_sense = self.lp.solution.sensitivity.objective() - obj_coeff = self.lp.objective.get_linear() - red_cost = self.lp.solution.get_reduced_costs() - - for r in range(self.lp.variables.get_num()): - SENSE_BND.update({j_names[r] : (bnd_sense[r][0], bnd_sense[r][1], bnd_sense[r][2], bnd_sense[r][3])}) - SENSE_OBJ.update({j_names[r] : (red_cost[r], obj_sense[r][0], obj_coeff[r], obj_sense[r][1])}) - - for s in range(self.lp.linear_constraints.get_num()): - SENSE_RHS.update({c_names[s] : (rhs_sense[s][0], rhs_val[s], rhs_sense[s][1])}) - - return (SENSE_OBJ, SENSE_RHS, SENSE_BND) - - def getDualValues(self): - """ - Get the get the dual values of the solution - - Output is a dictionary of {name : value} pairs - - """ - d_names = self.lp.linear_constraints.get_names() - d_values = self.lp.solution.get_dual_values() - output = {} - for j in range(len(d_names)): - output.update({d_names[j] : d_values[j]}) - return output - - def write(self, filename, title=None, Dir=None): - """ write problem to file""" - if Dir != None: - filename = os.path.join(Dir, filename) - if title != None: - self.lp.set_problem_name(title) - self.lp.write(filename+'.lp', filetype='lp') - print('LP output as {}'.format(filename+'.lp')) - - - def setOutputStreams(self, mode='default'): - """ - Sets the noise level of the solver, mode can be one of: - - - *None* silent i.e. no output - - *'file'* set solver to silent and output logs to *CPLX_RESULT_STREAM_FILE* cplex_output.log - - *'iostream'* set solver to silent and output logs to *CPLX_RESULT_STREAM_IO* csio - - *'default'* or anything else noisy with full output closes STREAM_IO and STREAM_FILE (default) - """ - global CPLX_RESULT_STREAM - global CPLX_SILENT_MODE - global CPLX_RESULT_STREAM_FILE - global CPLX_RESULT_STREAM_IO - CPLX_RESULT_STREAM = mode - - if CPLX_RESULT_STREAM == None: - self.lp.set_log_stream(None) - self.lp.set_results_stream(None) - self.lp.set_warning_stream(None) - CPLX_SILENT_MODE = True - elif CPLX_RESULT_STREAM == 'file': - if CPLX_RESULT_STREAM_FILE == None: - CPLX_RESULT_STREAM_FILE = file('cplex_output.log', 'w') - self.lp.set_log_stream(CPLX_RESULT_STREAM_FILE) - self.lp.set_results_stream(CPLX_RESULT_STREAM_FILE) - CPLX_SILENT_MODE = True - elif CPLX_RESULT_STREAM == 'iostream': - if CPLX_RESULT_STREAM_IO == None: - CPLX_RESULT_STREAM_IO = csio.StringIO() - self.lp.set_log_stream(CPLX_RESULT_STREAM_IO) - self.lp.set_results_stream(CPLX_RESULT_STREAM_IO) - CPLX_SILENT_MODE = True - else: - CPLX_RESULT_STREAM = 'default' - CPLX_SILENT_MODE = False - try: - CPLX_RESULT_STREAM_IO.close() - except AttributeError: - pass - try: - CPLX_RESULT_STREAM_FILE.close() - except AttributeError: - pass - - CPLX_RESULT_STREAM_IO = None - CPLX_RESULT_STREAM_FILE = None - self.lp.set_log_stream(os.sys.stdout) - self.lp.set_results_stream(os.sys.stdout) - -class CPLEXFactory(SolverFactory): - - def createEmpty(self): - """ creates a new empty solver instance """ - solver = CPLEXSolver() - return solver - - def create(self, fba = None, fname=None): - """creates a new solver instance. - - If an fba instance (metabolic network) is given, the solver is setup - to solve the fba instance. - Otherwise an empty problem is returned for manual setup. - - *fname* optional filename if defined writes out the constructed lp - """ - - if fba == None: - solver = CPLEXSolver() - return solver - else: - _Stime = time.time() - # defines - csense = 'E' # constraint sense - - # define model and add variables - solver = CPLEXSolver() - lp = solver.lp - lp.set_problem_name('%s' % (fba.getPid())) - lp.variables.add(names=fba.N.col) - # define objective - osense = fba.getActiveObjective().operation.lower() - if osense in ['minimize', 'minimise', 'min']: - lp.objective.set_sense(lp.objective.sense.minimize) - elif osense in ['maximize', 'maximise', 'max']: - lp.objective.set_sense(lp.objective.sense.maximize) - else: - raise RuntimeError('\n%s - is not a valid objective operation' % osense) - lp.objective.set_name(fba.getActiveObjective().getPid()) - lp.objective.set_linear([(fo.reaction, fo.coefficient) for fo in fba.getActiveObjective().fluxObjectives]) - - ## old fashioned way - #told = time.time() - #lin_expr = [] - #rhs =[] - #names = [] - #senses = [] - #for lc_ in range(fba.N.shape[0]): - #ids = [] - #val = [] - #for c_ in range(len(fba.N.array[lc_])): - #if fba.N.array[lc_, c_] != 0.0: - #ids.append(fba.N.col[c_]) - #val.append(fba.N.array[lc_, c_]) - #lin_expr.append(cplex.SparsePair(ids, val)) - #rhs.append(fba.N.RHS[lc_]) - #senses.append(fba.N.operators[lc_]) - #names.append(fba.N.row[lc_]) - #lp.linear_constraints.add(lin_expr=lin_expr, senses=senses, rhs=rhs, names=names) - #print 'Old style lc:', time.time() - told - - ## the numpy way - #tnew = time.time() - lin_expr = [] - rhs =[] - names = [] - senses = [] - if HAVE_SYMPY and fba.N.__array_type__ == sympy.MutableDenseMatrix: - print('INFO: CPLEX requires floating point, converting N') - Nmat = numpy.array(fba.N.array).astype('float') - RHSmat = numpy.array(fba.N.RHS).astype('float') - if fba.CM != None: - CMmat = numpy.array(fba.CM.array).astype('float') - CMrhs = numpy.array(fba.CM.RHS).astype('float') - else: - Nmat = fba.N.array - RHSmat = fba.N.RHS - if fba.CM != None: - CMmat = fba.CM.array - CMrhs = fba.CM.RHS - - GOSCI = False - if HAVE_SCIPY and fba.N.__array_type__ == csr.csr_matrix: - GOSCI = True - - for n in range(Nmat.shape[0]): - if not GOSCI: - nz = Nmat[n].nonzero()[0] - else: - nz = Nmat[n].nonzero()[1] - lin_expr.append(cplex.SparsePair([fba.N.col[c] for c in nz], [Nmat[n,c] for c in nz])) - rhs.append(RHSmat[n]) - senses.append(solver.fixConSense(fba.N.operators[n])) - names.append(fba.N.row[n]) - #print senses - lp.linear_constraints.add(lin_expr=lin_expr, senses=senses, rhs=rhs, names=names) - #print 'New style lc:', time.time() - tnew - - # add user defined constraints - if fba.CM != None: - ## the numpy way - #t2new = time.time() - lin_expr = [] - rhs =[] - names = [] - senses = [] - for n in range(CMmat.shape[0]): - if not GOSCI: - nz = CMmat[n].nonzero()[0] - else: - nz = CMmat[n].nonzero()[1] - - lin_expr.append(cplex.SparsePair([fba.CM.col[c] for c in nz], [CMmat[n,c] for c in nz])) - rhs.append(CMrhs[n]) - senses.append(solver.fixConSense(fba.CM.operators[n])) - names.append(fba.CM.row[n]) - lp.linear_constraints.add(lin_expr=lin_expr, senses=senses, rhs=rhs, names=names) - #print 'New style lc:', time.time() - t2new - - # add bounds - lb = [] - ub = [] - for b_ in fba.flux_bounds: - btype = b_.getType() - bvalue = b_.getValue() - if bvalue in ['Infinity', 'inf', 'Inf', 'infinity']: - bvalue = cplex.infinity - elif bvalue in ['-Infinity', '-inf', '-Inf', '-infinity']: - bvalue = -cplex.infinity - elif numpy.isinf(bvalue): - if bvalue > 0.0: - bvalue = cplex.infinity - else: - bvalue = -cplex.infinity - if btype == 'lower': - lb.append((b_.reaction, bvalue)) - elif btype == 'upper': - ub.append((b_.reaction, bvalue)) - elif btype == 'equality': - ub.append((b_.reaction, bvalue)) - lb.append((b_.reaction, bvalue)) - if len(lb) > 0: - lp.variables.set_lower_bounds(lb) - if len(ub) > 0: - lp.variables.set_upper_bounds(ub) - - print('\ncplx_constructLPfromFBA time: {}\n'.format(time.time() - _Stime)) - if fname != None: - lp.write(fname+'.lp', filetype='lp') - return lp - - def getModelFromLP(self, lptFile, Dir=None): - """ - Load a LPT (CPLEX format) file and return a CPLX LP model - - - *lptfile* an CPLEX LP format file - - *Dir* an optional directory - - """ - if Dir != None: - assert os.path.exists(Dir), '\nIncorrect path' - lptFile = os.path.join(Dir, lptFile) - lp = cplex.Cplex(lptFile) - return CPLEXSolver(lp) - - def getCPLEXModelFromLP(self, lptFile, Dir=None): - """ - Load a LPT (CPLEX format) file and return a CPLX LP model - - - *lptfile* an CPLEX LP format file - - *Dir* an optional directory - - """ - return self.getModelFromLP(lptFile, Dir) - - - def getModelFromObj(self, fba): - """ - Return a CPLEX object from a FBA model object (via LP file) - """ - _Stime = time.time() - modId = fba.id - ## fba.id = '_cplxtmp_%s_.tmp' % random.randint(0,5) - fba.id = '_cplxtmp_.tmp' - LPF = CBWrite.writeModelLP(fba, work_dir=os.getcwd()) - fba.id = modId - x = self.getModelFromLP(LPF) - print('\ncplx_getModelFromObj time: {}\n'.format(time.time() - _Stime)) - return x - - def minimizeNumActiveFluxes(self, fba, selected_reactions=None, pre_opt=True, tol=None, objF2constr=True, rhs_sense='lower', optPercentage=100.0, work_dir=None, quiet=False, debug=False, objective_coefficients={}, return_lp_obj=False, populate=None, oldlpgen=False): - """ - Minimize the sum of active fluxes, updates the model with the values of the solution and returns the value - of the MILP objective function (not the model objective function which remains unchanged). If population mode is activated - output is as described below: - - Min: sum(Bi) - Bi = 0 -> Ci Ji = 0 - - Such that: - NJi = 0 - Jbio = opt - - where: - Binary Bi - - Arguments: - - - *fba* an FBA model object - - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is diabled and *objF2constr* is True then the vid/value of the current active objective is used - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - Note this does not necessarily mean the upper or lower bound, although practically it will. If in doubt use *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value * (optPercentage/100.0) - - *work_dir* [default=None] the MSAF working directory for temporary files default = cwd+fva - - *debug* [default=False] if True write out all the intermediate MSAF LP's into work_dir - - *quiet* [default=False] if enabled supress CPLEX output - - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If - this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - - *objective_coefficients* [default={}] a dictionary of (reaction_id : float) pairs that provide the are introduced as objective coefficients to the absolute flux value. Note that the default value of the coefficient (non-specified) is +1. - - *return_lp_obj* [default=False] off by default when enabled it returns the CPLEX LP object - - *populate* [default=None] enable search algorithm to find multiple (sub)optimal solutions. Set with a tuple of (RELGAP=0.0, POPULATE_LIMIT=20, TIME_LIMIT=300) suggested values only. - - *RELGAP* [default=0.0] relative gap to optimal solution - - *POPULATE_LIMIT* [default=20] terminate when so many solutions have been found - - *TIME_LIMIT* [default=300] terminate search after so many seconds important with higher values of *POPULATION_LIMIT* - - *with_reduced_costs* [default='uncsaled'] can be 'scaled', 'unscaled' or anything else which is None - - With outputs: - - - *mincnt* the objective function value OR - - *mincnt, cpx* the objective function and cplex model OR - - *populate_data, mincnt* a population data set OR - - *populate_data, mincnt, cpx* both the cps object and population data set - - depending on selected flags. - - """ - if fba.SCALED_REDUCED_COSTS: - with_reduced_costs = 'scaled' - else: - with_reduced_costs = 'unscaled' - if work_dir == None: - work_dir = os.getcwd() - else: - assert os.path.exists(work_dir), '\nWhat did you think would happen now!' - if debug: - debug_dir = os.path.join(work_dir,'DEBUG') - if not os.path.exists(debug_dir): - os.mkdir(debug_dir) - - base_reaction_names = fba.getReactionIds() - - # generate a presolution - cpx = OPTIMAL_PRESOLUTION = None - pre_sol = pre_oid = pre_oval = None - REDUCED_COSTS = {} - cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = self.presolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen, with_reduced_costs=with_reduced_costs) - # if required add the objective function as a constraint - if objF2constr: - cpx.setObjectiveFunctionAsConstraint(rhs_sense, pre_oval, optPercentage) - - STORED_OPT = None - if pre_opt: - STORED_OPT = pre_oval - else: - STORED_OPT = fba.getActiveObjective().getValue() - - # minimize the absolute sum of fluxes - if selected_reactions != None: - rids = fba.getReactionIds() - for r in selected_reactions: - assert r in rids, "\n%s is not a valid reaction name" % r - else: - selected_reactions = fba.getReactionIds() - - # this removes the objective function (redundant I think) - #fba_obj_ids = fba.getActiveObjective().getFluxObjectiveReactions() - #print fba_obj_ids - #for R in fba_obj_ids: - #if R in selected_reactions: - #selected_reactions.pop(selected_reactions.index(R)) - #del R, fba_obj_ids - NUM_FLX = len(selected_reactions) - print('Total number of reactions: {}'.format(NUM_FLX)) - bin_selected_reactions = ['bin_%s' % r for r in selected_reactions] - #print bin_selected_reactions - # bin_J in [0,1] - ## lbs = [0.0]*len(bin_selected_reactions) - ## ubs = [numpy.inf]*len(bin_selected_reactions) - ## cpx.variables.add(lb=lbs, ub=ubs, names=bin_selected_reactions) - #cpx.variables.add(names=bin_selected_reactions) - cpx.lp.variables.add(names=bin_selected_reactions,lb=numpy.zeros(len(bin_selected_reactions)),\ - ub=numpy.ones(len(bin_selected_reactions))) - """ - cpx.variables.add(obj=[], lb=[], ub=[], types='', names=[], columns=[]) - """ - - for bv in bin_selected_reactions: - cpx.lp.variables.set_types(bv, cpx.lp.variables.type.binary) - - obj_func = {} - - for r in range(len(selected_reactions)): - name1 = 'ind_'+selected_reactions[r] - ## print selected_reactions[r] - lin_expr1 = cplex.SparsePair(ind = [selected_reactions[r]], val = [1.0]) - sense1 = 'E' - rhs1 = 0.0 - indvar1 = cpx.lp.variables.get_indices(bin_selected_reactions[r]) - complemented1 = 1 - cpx.lp.indicator_constraints.add(lin_expr=lin_expr1, sense=sense1, rhs=rhs1, indvar=indvar1, complemented=complemented1, name=name1) - #obj_func.append((1, bin_selected_reactions[r])) - - if selected_reactions[r] in objective_coefficients: - obj_func[bin_selected_reactions[r]] = objective_coefficients[selected_reactions[r]] - #obj_func.append((objective_coefficients[selected_reactions[r]], bin_selected_reactions[r])) - else: - obj_func[bin_selected_reactions[r]] = 1 - #obj_func.append((1, bin_selected_reactions[r])) - - #print obj_func - #cpx.write('dump.lp') - cpx.setObjective('MNAF', obj_func, 'min', reset=True) - - if debug: - cpx.writeLPtoLPTfile('MNAF_base_(%s)' % time.time() , title=None, Dir=debug_dir) - ## cplx_writeLPtoLPTfile(cpx, 'MNAV_base_%s' % time.time() , title=None, Dir=debug_dir) - - #cpx.write('dump.lp') - - if populate == None: - cpx.solveMILP() # cpx.solve() - cpx.setFBAsolutionToModel(fba, with_reduced_costs=None) - cpx.setReducedCostsToModel(fba, REDUCED_COSTS) - minCnt = cpx.getObjectiveValue() - fba.getActiveObjective().setValue(STORED_OPT) - if quiet: - cpx.setOutputStreams(mode='default') - print('\nMinimizeNumActiveFluxes objective value: {}'.format(minCnt)) - if not return_lp_obj: - return round(minCnt, 2) - else: - return round(minCnt, 2), cpx - else: - RELGAP = populate[0] #0.0 # relative gap to optimal solution - POPULATE_LIMIT = populate[1] #20 - TIME_LIMIT = populate[2] #300 # seconds - INTENSITY = cpx.lp.parameters.mip.pool.intensity.values.very_aggressive - DIVERSITY = cpx.lp.parameters.mip.pool.replace.values.diversity - #DIVERSITY = cpx.parameters.mip.pool.replace.values.firstin_firstout - #DIVERSITY = cpx.parameters.mip.pool.replace.values.worst_objective - ABSGAP = 0.0 - cpx.lp.parameters.mip.pool.relgap.set(RELGAP) # Gunnar - cpx.lp.parameters.mip.pool.absgap.set(ABSGAP) # check this - cpx.lp.parameters.mip.pool.intensity.set(INTENSITY) # get "all" (sub)optimal solutions - cpx.lp.parameters.mip.limits.populate.set(POPULATE_LIMIT) - cpx.lp.parameters.mip.pool.replace.set(DIVERSITY) - cpx.lp.parameters.timelimit.set(TIME_LIMIT) - #cpx.solve() - cpx.lp.populate_solution_pool() - - population = [] - var_names = cpx.lp.variables.get_names() - var_num = cpx.lp.variables.get_num() - pop_names = cpx.lp.solution.pool.get_names() - pop_num = cpx.lp.solution.pool.get_num() - print('CPLEX solution pool: {}'.format(pop_num)) - - #pop_bin = [] - for p in range(pop_num): - sol = cpx.lp.solution.pool.get_values(p) - binSum = 0 - for j_ in range(len(var_names)-1,-1,-1): - if var_names[j_] not in base_reaction_names: - a = sol.pop(j_) - b = var_names.pop(j_) - binSum += round(a, 2) - #pop_bin.append(binSum) - population.append(sol) - population.insert(0, base_reaction_names) - for j_ in range(len(population[0])): - fba.getReaction(population[0][j_]).setValue(population[1][j_]) - try: - cpx.setReducedCosts(fba, REDUCED_COSTS) - except Exception as ex: - print(ex) - minCnt = cpx.lp.solution.get_objective_value() - print('\nMinimizeNumActiveFluxes objective value: {}\n'.format(round(minCnt, 2))) - fba.getActiveObjective().setValue(STORED_OPT) - if quiet: - cpx.setOutputStreams(mode='default') - if not return_lp_obj: - print('\nINFO: population scan now returns population and objective') - return population, round(minCnt, 2) - else: - return population, round(minCnt, 2), cpx \ No newline at end of file diff --git a/src/oldsolver/CBGLPK.py b/src/oldsolver/CBGLPK.py deleted file mode 100644 index 986f117..0000000 --- a/src/oldsolver/CBGLPK.py +++ /dev/null @@ -1,791 +0,0 @@ -''' -Created on Nov 12, 2014 - -@author: arne -''' - -# preparing for Python 3 port -from __future__ import division, print_function -from __future__ import absolute_import -#from __future__ import unicode_literals - -HAVE_SYMPY = False -try: - import sympy - if int(sympy.__version__.split('.')[1]) >= 7 and int(sympy.__version__.split('.')[2]) >= 4: - HAVE_SYMPY = True - else: - del sympy -except ImportError: - HAVE_SYMPY = False - -import os, time -import numpy -from cbmpy import CBWrite -from cbmpy.CBConfig import __CBCONFIG__ as __CBCONFIG__ -__DEBUG__ = __CBCONFIG__['DEBUG'] -__version__ = __CBCONFIG__['VERSION'] - - -HAVE_GLPK = False -GLPK_SOLUTION_STATUS = None - -try: - import glpk - lp = glpk.LPX() - HAVE_GLPK = True - del lp -except: - raise ImportError - -# configuration options for GLPK -GLPK_CFG = {'simplex' : {'meth' : glpk.LPX.PRIMAL, - #'meth' : glpk.LPX.DUAL, - #'meth' : glpk.LPX.DUALP, - 'tol_bnd' : 1.0e-6, - 'tol_dj' : 1.0e-6, - 'tol_piv' : 1.0e-10 - } - } - -GLPK_STATUS = { - 1 : 'LPS_UNDEF', - 2 : 'LPS_FEAS', - 3 : 'LPS_INFEAS', - 4 : 'LPS_NOFEAS', - 5 : 'LPS_OPT', - 6 : 'LPS_UNBND'} - -GLPK_STATUS2 = { - 'opt' : 'LPS_OPT', - 'undef' : 'LPS_UNDEF', - 'feas' : 'LPS_FEAS', - 'infeas' : 'LPS_INFEAS', - 'nofeas' : 'LPS_NOFEAS', - 'unbnd' : 'LPS_UNBND'} - - -GLPK_SILENT_MODE = True -GLPK_INFINITY = 1.0e9 - - -from .CBSolver import SolverFactory, Solver - -class GLPKSolver(Solver): - - def __init__(self): - """ - Create a GLPK LP in memory. - - *fba* an FBA object - - *fname* optional filename if defined writes out the constructed lp - - """ - # initialize empty lp - self.lp = glpk.LPX() - self.varMap = {} - self.conMap = {} - - def copy(self): - mps_filename = '_{}_.mps'.format(str(time.time()).split('.')[0]) - self.lp.write(mps=mps_filename) - copylp = GLPKSolver() - copylp.lp = glpk.LPX(mps=mps_filename) - copylp.varMap = self.varMap.copy() - copylp.conMap = self.conMap.copy() - os.remove(mps_filename) - return copylp - - def solve(self, method='s'): - """ - Solve the LP and create a status attribute with the solution status - - - *method* [default='s'] 's' = simplex, 'i' = interior, 'e' = exact - - GLPK solver options can be set in the dictionary GLPK_CFG - """ - if method == 'i': - self.lp.interior() - elif method == 'e': - self.lp.exact() - else: - self.lp.simplex(**GLPK_CFG['simplex']) - - - global GLPK_SOLUTION_STATUS - GLPK_SOLUTION_STATUS = self.getSolutionStatus() - - if GLPK_SOLUTION_STATUS in ['LPS_UNDEF', 'LPS_FEAS', 'LPS_INFEAS', 'LPS_NOFEAS', 'LPS_OPT', 'LPS_UNBND']: - if not GLPK_SILENT_MODE: - print('Solution status returned as: {}'.format(GLPK_SOLUTION_STATUS)) - print("Objective value = " , self.lp.obj.value) - return GLPK_SOLUTION_STATUS - else: - print("INFO: No solution available ({})".format(GLPK_SOLUTION_STATUS)) - return None - - def getSolutionStatus(self): - """ - Returns one of: - - - *LPS_OPT*: solution is optimal; - - *LPS_FEAS*: solution is feasible; - - *LPS_INFEAS*: solution is infeasible; - - *LPS_NOFEAS*: problem has no feasible solution; - - *LPS_UNBND*: problem has unbounded solution; - - *LPS_UNDEF*: solution is undefined. - - """ - return GLPK_STATUS2[self.lp.status] - - def isDualFeasible(self): - """ checks if problem has been solved to dual feasibility """ - return self.lp.status_dual == 'feas' - - def getSolution(self): - fba_sol = {} - try: - for n in self.lp.cols: - fba_sol.update({n.name : n.value}) - except Exception as ex: - print(ex) - print('WARNING: No solution to get') - fba_sol = {} - return fba_sol - - def getReducedCosts(self, scaled=False): - """ - Extract ReducedCosts from LP and return as a dictionary 'Rid' : reduced cost - - - *c* a GLPK LP object - - *scaled* scale the reduced cost by the optimal flux value - - """ - s_name = [] - r_costs = [] - s_val = [] - - for c_ in self.lp.cols: - s_name.append(c_.name) - r_costs.append(c_.dual) - s_val.append(c_.value) - objf_val = self.lp.obj.value - output = {} - for v in range(len(s_name)): - if scaled: - try: - r_val = r_costs[v]*s_val[v]/objf_val - except: - r_val = float('nan') - else: - r_val = r_costs[v] - output.update({s_name[v] : r_val}) - del s_name, r_costs, s_val - return output - - def getObjectiveValue(self): - """ returns current objective value (typically valid after solve) """ - return self.lp.obj.value - - def getObjectiveId(self): - """ returns the name of the current objective function """ - return self.lp.obj.name - - def getObjectiveCoef(self, obj=None): - if obj == None: - obj = {} - for cidx_ in range(len(self.lp.cols)): - name = self.lp.cols[cidx_].name - obj[name] = self.lp.obj[cidx_] - else: - for name in obj.keys(): - cidx = self.varMap[name] - obj[name] = self.lp.obj[cidx] - return obj - - def addLinearConstraint(self, name, coef, rhs, sense): - """ adds an additional linear constraint. - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - baseRows = len(self.lp.rows) - self.lp.rows.add(1) - - # name and coefficients - newCon = [] - for colname, val in coef.items(): - cidx = self.varMap[colname] - newCon.append((cidx, val)) -# print(newCon[-1]) - self.lp.rows[baseRows].name = name - self.lp.rows[baseRows].matrix = newCon - self.conMap[name] = baseRows - - # sense and rhs - if sense in ['<=','<','L']: - self.lp.rows[baseRows].bounds = None, rhs - elif sense in ['>=','>','G']: - self.lp.rows[baseRows].bounds = rhs, None - elif sense in ['=','E']: - self.lp.rows[baseRows].bounds = rhs - else: - raise RuntimeError('INFO: invalid operator: %s' % sense) - - def addVariables(self, names, lb=None, ub=None, obj=None): - """ adds additional variables. - - - *names* list of variable names - - *lb* list of lower bounds - - *ub* list of upper bounds - - *obj* list of objective values - - if only one variable should be added, the lists can also be replaced by - plain values - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - - basecols = len(self.lp.cols) - if isinstance(names, list): - self.lp.cols.add(len(names)) - if lb == None: - lb = len(names)*[None] - if ub == None: - ub = len(names)*[None] - for v_ in range(len(names)): - self.lp.cols[basecols+v_].name = names[v_] - self.lp.cols[basecols+v_].bounds = lb[v_], ub[v_] - self.varMap[names[v_]] = basecols+v_ - if obj != None: - self.lp.obj[basecols+v_] = obj[v_] - else: - self.lp.cols.add(1) - self.lp.cols[basecols].name = names - self.lp.cols[basecols].bounds = lb, ub - self.varMap[names] = basecols - if obj != None: - self.lp.obj[basecols] = obj - - - def setBounds(self, bounds): - """ sets lower bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - the new bounds for each variable are given by the pair (lb, ub) - """ - for colName, colBound in bounds.items(): - cidx = self.varMap[colName] - self.lp.cols[cidx].bounds = colBound - - def getBounds(self, bounds=None): - """ fetches the bounds of the given variables - - - bounds: dictionary that has variable names as keys, the bounds will - be written in the corresponding values. - If bounds is None, a new dictionary for all variables is created - """ - if bounds == None: - bounds = {} - for cidx_ in range(len(self.lp.cols)): - name = self.lp.cols[cidx_].name - bounds[name] = self.lp.cols[cidx_].bounds - else: - for name in bounds.keys(): - cidx = self.varMap[name] - bounds[name] = self.lp.cols[cidx].bounds - return bounds - - def setLowerBounds(self, bounds): - """ sets lower bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - for colName, colBound in bounds.items(): - cidx = self.varMap[colName] - oldlb, oldub = self.lp.cols[cidx].bounds - self.lp.cols[cidx].bounds = colBound, oldub - - def setUpperBounds(self, bounds): - """ sets upper bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - for colName, colBound in bounds.items(): - cidx = self.varMap[colName] - oldlb, oldub = self.lp.cols[cidx].bounds - self.lp.cols[cidx].bounds = oldlb, colBound - - def setObjective(self, name='obj', coef=None, sense='maximize', reset=True): - """ sets the objective - - - *name* name of the objective - - *coef* dictionary mapping variable names to the new coefficients - - *sense* objective sense, can be one of 'min' or 'max' - - *reset* if all other objective coefficients should be reset - """ - sense = sense.lower() - if sense == 'max': sense = 'maximize' - if sense == 'min': sense = 'minimize' - if sense in ['maximise', 'minimise']: - sense = sense.replace('se','ze') - assert sense in ['maximize', 'minimize'], "\nsense must be ['maximize', 'minimize'] not %s" % sense - - if coef != None: - if reset: - for cidx_ in range(len(self.lp.cols)): - name = self.lp.cols[cidx_].name - if coef.has_key(name): - self.lp.obj[cidx_] = coef[name] - else: - self.lp.obj[cidx_] = 0.0 - - else: - for name, val in coef.items(): - idx = self.varMap[name] - self.lp.obj[idx] = val - elif reset: - for cidx_ in range(len(self.lp.cols)): - name = self.lp.cols[cidx_].name - self.lp.obj[cidx_] = 0.0 - - self.lp.obj.name = name - - if sense == 'minimize': - self.lp.obj.maximize = False - if __DEBUG__: print('Set minimizing') - else: - self.lp.obj.maximize = True - if __DEBUG__: print('Set maximizing') - - - def setRHS(self, rhs): - """ sets right-hand sides of given constraints - - rhs should be a dictionary mapping constraint names to the new rhs - """ - for rowName, rowRHS in rhs.items(): - ridx = self.conMap[rowName] - oldlhs, oldrhs = self.lp.rows[ridx].bounds - if oldlhs != None: - if oldrhs != None: - self.lp.rows[ridx].bounds = rowRHS - else: - self.lp.rows[ridx].bounds = (rowRHS, None) - else: - self.lp.rows[ridx].bounds = (None, rowRHS) - - def setSense(self, sense): - """ sets the sense of given constraints - - sense should be a dictionary mapping constraint names to the new sense - """ - for rowName, rowSense in sense.items(): - ridx = self.conMap[rowName] - oldlhs, oldrhs = self.lp.rows[ridx].bounds - if oldlhs != None: - if oldrhs != None: - assert oldlhs == oldrhs, 'we can not deal with this feature' - old = oldlhs - else: - old = oldrhs - # sense and rhs - if rowSense in ['<=','<','L']: - self.lp.rows[ridx].bounds = None, old - elif rowSense in ['>=','>','G']: - self.lp.rows[ridx].bounds = old, None - elif rowSense in ['=','E']: - self.lp.rows[ridx].bounds = old - else: - raise RuntimeError('INFO: invalid operator: %s' % sense) - - def deleteVariables(self, variables): - """ deletes the variables with specified names """ - pass - - def deleteConstraints(self, cons): - """ delete constraints with specified names """ - pass - - def write(self, filename, title=None, Dir=None): - """ - Write out a GLPK model as an LP format file - - """ - if Dir != None: - filename = os.path.join(Dir, filename) - #if title != None: - #c.set_problem_name(title) - #c.write(filename+'.lp', filetype='lp') - self.lp.write(cpxlp=filename+'.lp') - print('LP output as {}'.format(filename+'.lp')) - - -class GLPKFactory(SolverFactory): - - def createEmpty(self): - """ creates a new empty solver instance """ - return GLPKSolver() - - def create(self, fba=None, fname=None): - """creates a new solver instance. - - If an fba instance (metabolic network) is given, the solver is setup - to solve the fba instance. - Otherwise an empty problem is returned for manual setup. - - - *fba* optional FBA object - - *fname* optional filename if defined writes out the constructed lp (if fba is given) - """ - if fba == None: - return self.createEmpty() - else: - _Stime = time.time() - - solver = GLPKSolver() - # we want to work directly on the glpk instance, so fetch it! - lp = solver.lp - lp.name = fba.getPid() - lp.cols.add(len(fba.N.col)) - - - if HAVE_SYMPY and fba.N.__array_type__ == sympy.MutableDenseMatrix: - print('INFO: GLPK requires floating point, converting N') - Nmat = numpy.array(fba.N.array).astype('float') - RHSmat = numpy.array(fba.N.RHS).astype('float') - if fba.CM != None: - CMmat = numpy.array(fba.CM.array).astype('float') - CMrhs = numpy.array(fba.CM.RHS).astype('float') - else: - Nmat = fba.N.array - RHSmat = fba.N.RHS - if fba.CM != None: - CMmat = fba.CM.array - CMrhs = fba.CM.RHS - - varMap = {} - for n_ in range(Nmat.shape[1]): - varMap[fba.N.col[n_]] = n_ - lp.cols[n_].name = fba.N.col[n_] - - #print varMap - - # define objective - osense = fba.getActiveObjective().operation.lower() - if osense in ['minimize', 'minimise', 'min']: - lp.obj.maximize = False - elif osense in ['maximize', 'maximise', 'max']: - lp.obj.maximize = True - else: - raise RuntimeError('\n%s - is not a valid objective operation' % osense) - lp.obj.name = fba.getActiveObjective().getPid() - for fo_ in fba.getActiveObjective().fluxObjectives: - lp.obj[varMap[fo_.reaction]] = fo_.coefficient - - # create N constraints - lp.rows.add(Nmat.shape[0]) - - conMap = {} - for n_ in range(Nmat.shape[0]): - conMap[fba.N.row[n_]] = n_ - #lp.rows[n_].name = fba.N.row[n_] - - #tnew = time.time() - for r_ in range(Nmat.shape[0]): - # name and coefficients - newCon = [] - for c_ in range(Nmat.shape[1]): - newCon.append((c_, Nmat[r_,c_])) - lp.rows[r_].name = fba.N.row[r_] - lp.rows[r_].matrix = newCon - - # sense and rhs - rhs = RHSmat[r_] - if fba.N.operators[r_] in ['<=','<','L']: - lp.rows[r_].bounds = None, rhs - elif fba.N.operators[r_] in ['>=','>','G']: - lp.rows[r_].bounds = rhs, None - elif fba.N.operators[r_] in ['=','E']: - lp.rows[r_].bounds = rhs - else: - raise RuntimeError('INFO: invalid operator: %s' % fba.N.operators[r_]) - - # add user defined constraints - if fba.CM != None: - baseRows = len(lp.rows) - lp.rows.add(CMmat.shape[0]) - for r_ in range(CMmat.shape[0]): - # name and coefficients - newCon = [] - for c_ in range(CMmat.shape[1]): - newCon.append((c_, CMmat[r_, c_])) - lp.rows[baseRows+r_].name = fba.CM.row[r_] - lp.rows[baseRows+r_].matrix = newCon - conMap[fba.CM.row[r_]] = Nmat.shape[0] + r_ - - # sense and rhs - rhs = CMrhs[r_] - if fba.CM.operators[r_] in ['<=','<','L']: - lp.rows[baseRows+r_].bounds = None, rhs - elif fba.CM.operators[r_] in ['>=','>','G']: - lp.rows[baseRows+r_].bounds = rhs, None - elif fba.CM.operators[r_] in ['=','E']: - lp.rows[baseRows+r_].bounds = rhs - else: - raise RuntimeError('INFO: invalid operator: %s' % fba.N.operators[r_]) - - # add bounds - for r_ in fba.reactions: - lb = ub = None - lb = fba.getReactionLowerBound(r_.getPid()) - ub = fba.getReactionUpperBound(r_.getPid()) - - if lb in ['Infinity', 'inf', 'Inf', 'infinity']: - lb = GLPK_INFINITY - elif lb in ['-Infinity', '-inf', '-Inf', '-infinity', None]: - lb = -GLPK_INFINITY - elif numpy.isinf(lb): - if lb < 0.0: - lb = -GLPK_INFINITY - else: - lb = GLPK_INFINITY - if ub in ['Infinity', 'inf', 'Inf', 'infinity', None]: - ub = GLPK_INFINITY - elif ub in ['-Infinity', '-inf', '-Inf', '-infinity']: - ub = -GLPK_INFINITY - elif numpy.isinf(ub): - if ub < 0.0: - ub = -GLPK_INFINITY - else: - ub = GLPK_INFINITY - - if ub != GLPK_INFINITY and lb != -GLPK_INFINITY and ub == lb: - lp.cols[varMap[r_.getPid()]].bounds = lb - elif ub != GLPK_INFINITY and lb != -GLPK_INFINITY: - lp.cols[varMap[r_.getPid()]].bounds = lb, ub - elif ub != GLPK_INFINITY: - lp.cols[varMap[r_.getPid()]].bounds = None, ub - elif lb != -GLPK_INFINITY: - lp.cols[varMap[r_.getPid()]].bounds = lb, None - else: - lp.cols[varMap[r_.getPid()]].bounds = None - - solver.varMap = varMap - solver.conMap = conMap - #print('\ngplk_constructLPfromFBA time: {}\n'.format(time.time() - _Stime)) - if fname != None: - lp.write(cpxlp=fname+'.lp') - return solver - - - - def fluxVariabilityAnalysis(self, fba, selected_reactions=None, pre_opt=True, objF2constr=True, rhs_sense='lower', optPercentage=100.0, work_dir=None, quiet=True, debug=False, oldlpgen=False, markupmodel=True, default_on_fail=False, roundoff_span=10): - """ - Perform a flux variability analysis on an fba model: - - - *fba* an FBA model object - - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the vid/value of the current active objective is used - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - - *work_dir* [default=None] the FVA working directory for temporary files default = cwd+fva - - *debug* [default=False] if True write out all the intermediate FVA LP's into work_dir - - *quiet* [default=False] if enabled supress CPLEX output - - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If - this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - - *markupmodel* [default=True] add the values returned by the fva to the reaction.fva_min and reaction.fva_max - - *default_on_fail* [default=False] if *pre_opt* is enabled replace a failed minimum/maximum with the solution value - - *roundoff_span* [default=10] number of digits is round off (not individual min/max values) - - Returns an array with columns Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus and a list containing the row names. - - """ - if work_dir == None: - work_dir = os.getcwd() - else: - assert os.path.exists(work_dir), '\nWhat did you think would happen now!' - if debug: - debug_dir = os.path.join(work_dir,'DEBUG') - if not os.path.exists(debug_dir): - os.mkdir(debug_dir) - s2time = time.time() - # generate a presolution - - - pre_sol = pre_oid = pre_oval = None - REDUCED_COSTS = {} - cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = self.presolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen) - - # if required add the objective function as a constraint - if objF2constr: - cpx.setObjectiveAsConstraint(rhs_sense, pre_oval, optPercentage) - if debug: - cpx.write(cpxlp=os.path.join(debug_dir, 'FVA_base.lp')) - - # do the FVA - NUM_FLX = len(fba.reactions) - print('Total number of reactions: {}'.format(NUM_FLX)) - if selected_reactions != None: - rids = fba.getReactionIds() - for r in selected_reactions: - assert r in rids, "\n%s is not a valid reaction name" % r - else: - selected_reactions = fba.getReactionIds() - NUM_FLX = len(selected_reactions) - print('Number of user selected variables: {}'.format(NUM_FLX)) - try: - OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7), numpy.double) - except AttributeError: - OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7)) - OUTPUT_NAMES = [] - cntr = 0 - tcnt = 0 - # this is a memory hack --> prevents solver going berserk - mcntr = 0 - mcntr_cntrl = 3 - mps_filename = '_{}_.mps'.format(str(time.time()).split('.')[0]) - cpx.lp.write(mps=mps_filename) - for Ridx in range(NUM_FLX): - R = selected_reactions[Ridx] - OUTPUT_NAMES.append(R) - max_error_iter = 1 - GOMIN = True - gomin_cntr = 0 - method = 's' - while GOMIN: - MIN_STAT = 0 - # MIN - # TODO: bgoli: see whether this also works with 'minimize' - cpx.setObjective('min%s' % R, {R:1}, 'min', reset=True) - ## cplx_setBounds(c, id, min=None, max=None) # think about this - MIN_STAT = cpx.solve(method=method) - if MIN_STAT == 'LPS_OPT': - MIN_STAT = 1 - elif MIN_STAT == 'LPS_UNBND': - MIN_STAT = 2 - elif MIN_STAT == 'LPS_NOFEAS': - MIN_STAT = 3 - else: - MIN_STAT = 4 - if MIN_STAT == 1: # solved - min_oval = cpx.lp.obj.value - elif MIN_STAT == 2: # unbound - min_oval = -numpy.Inf - elif MIN_STAT == 3: - #min_oval = pre_sol[R] - min_oval = numpy.NaN - else: # other failure - min_oval = numpy.NaN - if debug: - cpx.write(cpxlp=os.path.join(debug_dir, '%smin.lp' % R)) - if MIN_STAT >= 3: - if gomin_cntr == 0: - cpx.erase() - del cpx - time.sleep(0.1) - cpx = glpk.LPX(mps=mps_filename) - gomin_cntr += 1 - #elif gomin_cntr == 1: - #method = 'i' - #gomin_cntr += 1 - else: - GOMIN = False - else: - GOMIN = False - if gomin_cntr >= max_error_iter: - GOMIN = False - - GOMAX = True - gomax_cntr = 0 - method = 's' - while GOMAX: - MAX_STAT = 0 - # MAX - cpx.setObjective('max%s' % R, coef=None, sense='max', reset=False) - ## cplx_setBounds(c, id, min=None, max=None) # think about this - MAX_STAT = cpx.solve(method=method) - if MAX_STAT == 'LPS_OPT': - MAX_STAT = 1 - elif MAX_STAT == 'LPS_UNBND': - MAX_STAT = 2 - elif MAX_STAT == 'LPS_NOFEAS': - MAX_STAT = 3 - else: - MAX_STAT = 4 - if MAX_STAT == 1: # solved - max_oval = cpx.lp.obj.value - elif MAX_STAT == 2: # unbound - max_oval = numpy.Inf - elif MAX_STAT == 3: # infeasable - #max_oval = pre_sol[R] - max_oval = numpy.NaN - else: # other fail - max_oval = numpy.NaN - - if MAX_STAT >= 3: - if gomax_cntr == 0: - cpx.lp.erase() - del cpx.lp - time.sleep(0.1) - cpx.lp = glpk.LPX(mps=mps_filename) - gomax_cntr += 1 - #elif gomax_cntr == 1: - #method = 'i' - #gomax_cntr += 1 - else: - GOMAX = False - else: - GOMAX = False - if gomax_cntr >= max_error_iter: - GOMAX = False - - - # check for solver going berserk - if MIN_STAT > 1 and MAX_STAT > 1: - print(Ridx) - time.sleep(1) - - # enables using the default value as a solution if the solver fails - if pre_opt and default_on_fail: - if MAX_STAT > 1 and not MIN_STAT > 1: - max_oval = pre_sol[R] - if MIN_STAT > 1 and not MAX_STAT > 1: - min_oval = pre_sol[R] - - if debug: - cpx.write(cpxlp=os.path.join(debug_dir, '%smax.lp' % R)) - - OUTPUT_ARRAY[Ridx,0] = pre_sol[R] - if R in REDUCED_COSTS: - OUTPUT_ARRAY[Ridx,1] = REDUCED_COSTS[R] - OUTPUT_ARRAY[Ridx,2] = min_oval - OUTPUT_ARRAY[Ridx,3] = max_oval - OUTPUT_ARRAY[Ridx,4] = round(abs(max_oval - min_oval), roundoff_span) - OUTPUT_ARRAY[Ridx,5] = MIN_STAT - OUTPUT_ARRAY[Ridx,6] = MAX_STAT - if markupmodel: - REAC = fba.getReaction(R) - REAC.setValue(pre_sol[R]) - REAC.fva_min = min_oval - REAC.fva_max = max_oval - REAC.fva_status = (MIN_STAT, MAX_STAT) - if R in REDUCED_COSTS: - REAC.reduced_costs = REDUCED_COSTS[R] - if not quiet and MAX_STAT > 1 or MIN_STAT > 1: - print('Solver fail for reaction \"{}\" (MIN_STAT: {} MAX_STAT: {})'.format(R, MIN_STAT, MAX_STAT)) - cntr += 1 - if cntr == 200: - tcnt += cntr - print('FVA has processed {} of {} reactions'.format(tcnt, NUM_FLX)) - cntr = 0 - - os.remove(mps_filename) - - #cpx.write(cpxlp='thefinaldebug.lp') - del cpx - print('\nSinglecore FVA took: {} min (1 process)\n'.format((time.time()-s2time)/60.)) - print('Output array has columns:') - print('Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus') - return OUTPUT_ARRAY, OUTPUT_NAMES - \ No newline at end of file diff --git a/src/oldsolver/CBSolver.py b/src/oldsolver/CBSolver.py deleted file mode 100644 index dbc6075..0000000 --- a/src/oldsolver/CBSolver.py +++ /dev/null @@ -1,1027 +0,0 @@ -''' -Created on Nov 12, 2014 - -@author: arne -''' - -# preparing for Python 3 port -from __future__ import division, print_function -from __future__ import absolute_import -#from __future__ import unicode_literals - -import os -import time - -import numpy - -from cbmpy import CBWrite - -class Solver(): - """ abstract solver stub to ease integration of new solvers """ - - def copy(self): - """ creates an independent copy of this solver""" - raise NotImplementedError('abstract method') - - def solve(self): - """ solves the current problem """ - pass - - def getObjectiveValue(self): - """ returns current objective value (typically valid after solve) """ - pass - - def getObjectiveId(self): - """ returns the name of the current objective function """ - pass - - def getSolutionStatus(self): - """ - Returns one of: - - - *LPS_OPT*: solution is optimal; - - *LPS_FEAS*: solution is feasible; - - *LPS_INFEAS*: solution is infeasible; - - *LPS_NOFEAS*: problem has no feasible solution; - - *LPS_UNBND*: problem has unbounded solution; - - *LPS_UNDEF*: solution is undefined. - - """ - pass - - def isFeasible(self): - """ checks if problem has been solved to feasibility """ - status = self.getSolutionStatus(); - return (status == 'LPS_OPT') or (status == 'LPS_UNBD') - - def isOptimal(self): - """ checks if problem has been solved to optimality - - """ - return self.getSolutionStatus() == 'LPS_OPT' - - def isDualFeasible(self): - """ checks if problem has been solved to dual feasibility """ - pass - - def addLinearConstraint(self, name, coef, rhs, sense): - """ adds an additional linear constraint. - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - pass - - def addVariables(self, names, lb=None, ub=None, obj=None): - """ adds additional variables. - - - *names* list of variable names - - *lb* list of lower bounds - - *ub* list of upper bounds - - *obj* list of objective values - - if only one variable should be added, the lists can also be replaced by - plain values - - Warning: Adding constraints manually might be much slower than creating - the whole model from a metabolic network at once - """ - pass - - def setBounds(self, bounds): - """ sets lower bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - the new bounds for each variable are given by the pair (lb, ub) - """ - pass - - def getBounds(self, bounds=None): - """ fetches the bounds of the given variables - - - bounds: dictionary that has variable names as keys, the bounds will - be written in the corresponding values. - If bounds is None, a new dictionary for all variables is created - """ - raise NotImplementedError('subclasses must implement this') - - def setLowerBounds(self, bounds): - """ sets lower bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - pass - - def setUpperBounds(self, bounds): - """ sets upper bounds of given variables - - bounds should be a dictionary mapping variable names to the new bounds - """ - pass - - def setObjective(self, name='obj', coef=None, sense='maximize', reset=True): - """ sets the objective - - - *name* name of the objective - - *coef* dictionary mapping variable names to the new coefficients - - *sense* objective sense, can be one of 'min' or 'max' - - *reset* if all other objective coefficients should be reset - """ - pass - - def getObjectiveCoef(self, obj=None): - """ fetches the objective coefficients of the given variables - - - obj: dictionary that has variable names as keys, the coefficients will - be written in the corresponding values. - If obj is None, a new dictionary for all variables is created - """ - pass - - def setRHS(self, rhs): - """ sets right-hand sides of given constraints - - rhs should be a dictionary mapping constraint names to the new rhs - """ - pass - - def setSense(self, sense): - """ sets the sense of given constraints - - sense should be a dictionary mapping constraint names to the new sense - """ - pass - - def deleteVariables(self, variables): - """ deletes the variables with specified names """ - pass - - def deleteConstraints(self, cons): - """ delete constraints with specified names """ - pass - - def getSolution(self): - """ - extract the primal solution - - This returns a dictionary assigning to each variable its value - """ - pass - - def getReducedCosts(self, scaled=False): - """ - Extract ReducedCosts from LP and return as a dictionary 'Rid' : reduced cost - - - *scaled* scale the reduced cost by the optimal flux value - """ - pass - - def getShadowPrices(self): - """ - Returns a dictionary of shadow prices containing 'Rid' : (lb, rhs, ub) - """ - pass - - def getDualValues(self): - """ - Get the get the dual values of the solution - - Output is a dictionary of {name : value} pairs - - """ - pass - - def setSolutionToModel(self, fba, with_reduced_costs='unscaled'): - """ - Sets the FBA solution from a CPLEX solution to an FBA object - - - *fba* and fba object - - *with_reduced_costs* [default='unscaled'] calculate and add reduced cost information to mode this can be: 'unscaled' or 'scaled' - or anything else which is interpreted as None. Scaled is: s_rcost = (r.reduced_cost*rval)/obj_value - - """ - sol = self.getSolution() - objval = self.getObjectiveValue() - if self.isFeasible(): - fba.objectives[fba.activeObjIdx].solution, fba.objectives[fba.activeObjIdx].value = sol, objval - else: - fba.objectives[fba.activeObjIdx].solution, fba.objectives[fba.activeObjIdx].value = sol, numpy.NaN - for r in fba.reactions: - rid = r.getPid() - if rid in sol: - r.value = sol[rid] - else: - r.value = None - scaled = False - if with_reduced_costs == 'scaled': - scaled = True - with_reduced_costs = True - elif with_reduced_costs == 'unscaled': - scaled = False - with_reduced_costs = True - else: - with_reduced_costs = False - - if objval != None and sol != {} and with_reduced_costs: - RC = self.getReducedCosts(scaled=scaled) - self.setReducedCostsToModel(fba, RC) - else: - self.setReducedCostsToModel(fba, {}) - - def setReducedCostsToModel(self, fba, reduced_costs): - """ - For each reaction/flux, sets the attribute "reduced_cost" from a dictionary of - reduced costs - - - *fba* an fba object - - *reduced_costs* a dictionary of {reaction : value} pairs - - """ - if len(reduced_costs) == 0: - pass - else: - for r in fba.reactions: - if r.getPid() in reduced_costs: - r.reduced_cost = reduced_costs[r.getPid()] - else: - r.reduced_cost = None - - def setObjectiveAsConstraint(self, rhs_sense, oval, optPercentage): - """ - Take the objective function and "optimum" value and add it as a constraint - - *cpx* a cplex object - - *oval* the objective value - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* WEIRD PARAMETER, removed - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - - """ - - # generate new constraint from old objective value (use non-zero coefficients) - con = self.getObjectiveCoef() - -# if rhs_sense == 'equal': -# pass -# elif cpx.obj.maximize and (rhs_sense == 'upper'): -# print('\nWarning: RHS sense error: \"upper\" does not match \"maximize\" changing to \"lower\"') -# rhs_sense = 'lower' -# time.sleep(1) -# elif not cpx.obj.maximize and (rhs_sense == 'lower'): -# print('\nWarning: RHS sense error: \"lower\" does not match \"minimize\" changing to \"upper\"') -# rhs_sense = 'upper' -# time.sleep(1) -# else: -# print('\nRHS sense ok.') - - # set objective constraint - if rhs_sense == 'equal': - self.addLinearConstraint('ObjCstr', con, rhs=oval, sense='E') - elif rhs_sense == 'upper': -# if tol != None: -# ub = numpy.ceil(oval/tol)*tol*optPercentage/100.0 -# else: - ub = oval*(optPercentage/100.0) - self.addLinearConstraint('ObjCstr', con, rhs=ub, sense='L') - elif rhs_sense == 'lower': -# if tol != None: -# lb = numpy.floor(oval/tol)*tol*optPercentage/100.0 -# else: - lb = oval*(optPercentage/100.0) - self.addLinearConstraint(name='ObjCstr', coef=con, rhs=lb, sense='G') - else: - raise RuntimeError("\nInvalid RHS sense: %s" % rhs_sense) - - def write(self, filename, title=None, Dir=None): - """ write problem to file""" - pass - - def writeLPsolution(self, fba_sol, objf_name, objf_val, fname, Dir=None, separator=','): - """ - This function writes the optimal solution, produced wth `cplx_getOptimalSolution` to file - - - *fba_sol* a dictionary of Flux : value pairs - - *objf_name* the objective function flux id - - *objf_val* the optimal value of the objective function - - *fname* the output filename - - *Dir* [default=None] use directory if not None - - *separator* [default=','] the column separator - - """ - if Dir != None: - assert os.path.exists(Dir), '\nPath does not exist' - fname = os.path.join(Dir, fname) - fname += '.csv' - F = file(fname, 'w') - cntr = 0 - F.write('%s%s%s\n' % ('ObjectiveFunction', separator, objf_name)) - for r in fba_sol: - F.write('%s%s%f\n' % (r, separator, fba_sol[r])) - F.flush() - F.close() - print('CSV exported to {}'.format(fname)) - - def setOutputStreams(self, mode='default'): - """ - Sets the noise level of the solver, mode can be one of: - - - *None* silent i.e. no output - - *'file'* set solver to silent and output logs to some log file - - *'iostream'* set solver to silent and output logs to some io stream - - *'default'* or anything else noisy with full output closes STREAM_IO and STREAM_FILE (default) - """ - pass - -class SolverFactory(): - ''' - abstract solver factory stub with default implementations to ease - integration of new solvers. - ''' - - def __init__(self): - ''' - Constructor - ''' - - def createEmpty(self): - """ creates a new empty solver instance """ - pass - - def create(self, fba = None, fname=None): - """creates a new solver instance. - - If an fba instance (metabolic network) is given, the solver is setup - to solve the fba instance. - Otherwise an empty problem is returned for manual setup. - """ - pass - - def analyzeModel(self, fba, lpFname=None, return_lp_obj=False, with_reduced_costs='unscaled', with_sensitivity=False,\ - build_n=True, quiet=False, method=None): - """ - Optimize a model and add the result of the optimization to the model object - (e.g. `reaction.value`, `objectiveFunction.value`). The stoichiometric - matrix is automatically generated. This is a common function available in all - solver interfaces. By default returns the objective function value - - - *fba* an instantiated PySCeSCBM model object - - *lpFname* [default=None] the name of the intermediate LP file saved when this has a string value. - - *return_lp_obj* [default=False] off by default when enabled it returns the PyGLPK LP object - - *with_reduced_costs* [default='unscaled'] calculate and add reduced cost information to mode this can be: 'unscaled' or 'scaled' - or anything else which is interpreted as 'None'. Scaled means s_rcost = (r.reduced_cost*rval)/obj_value - - *with_sensitivity* [default=False] add solution sensitivity information (not yet implemented) - - *del_intermediate* [default=False] delete the intermediary files after updating model object, useful for server applications - - *build_n* [default=True] generate stoichiometry from the reaction network (reactions/reagents/species) - - *quiet* [default=False] suppress solver output - - *method* [default=solverDefault] select the solver method, see the respective solver documentation for details - - """ - - if build_n: - fba.buildStoichMatrix() - #CBTools.addStoichToFBAModel(f) - fid = fba.id - - if with_reduced_costs == 'scaled': - fba.SCALED_REDUCED_COSTS = True - elif with_reduced_costs == 'unscaled': - fba.SCALED_REDUCED_COSTS = False - else: - fba.SCALED_REDUCED_COSTS = None - - if lpFname == None: - flp = self.create(fba, fname=None) - #f.id = '_glpktmp_.tmp' - else: - flp = self.create(fba, fname=lpFname) - fid = lpFname - - _Stime = time.time() - - print('\nanalyzeModel FBA --> LP time: {}\n'.format(time.time() - _Stime)) - fba.id = fid - if method != None: - flp.solve(method) - else: - flp.solve() - - flp.setSolutionToModel(fba, with_reduced_costs=with_reduced_costs) - - fba.SOLUTION_STATUS = flp.getSolutionStatus() - # TODO: need to synchronise all solvers to same integer system - if fba.SOLUTION_STATUS == 'LPS_OPT': - fba.SOLUTION_STATUS_INT = 1 - else: - fba.SOLUTION_STATUS_INT = 999 - -# if oldlpgen and del_intermediate: -# os.remove(LPF) - objv = fba.getActiveObjective().getValue() - print('\nanalyzeModel objective value: {}\n'.format(objv)) - if return_lp_obj: - return flp - else: - del flp - return objv - - def presolve(self, fba, pre_opt, objF2constr, quiet=False, oldlpgen=True, with_reduced_costs='unscaled'): - """ - This is a utility function that does a presolve for FVA, MSAF etc. Generates properly formatted - empty objects if pre_opt == False - - - *pre_opt* a boolean - - *fba* a CBModel object - - *objF2constr* add objective function as constraint - - *quiet* [default=False] supress cplex output - - *with_reduced_costs* [default='unscaled'] can be 'scaled' or 'unscaled' - - Returns: pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS - - """ - - cpx = self.create(fba) - OPTIMAL_PRESOLUTION = None - pre_sol = pre_oid = pre_oval = None - REDUCED_COSTS = {} - if pre_opt: - cpx.solve() - if cpx.isOptimal(): - print('Valid Presolution') - OPTIMAL_PRESOLUTION = True - pre_oval = cpx.getObjectiveValue() - pre_oid = cpx.getObjectiveId() - pre_sol = cpx.getSolution() - fba.objectives[fba.activeObjIdx].solution = pre_sol - fba.objectives[fba.activeObjIdx].value = pre_oval - if with_reduced_costs == 'scaled': - REDUCED_COSTS = cpx.getReducedCosts(scaled=True) - elif with_reduced_costs == 'unscaled': - REDUCED_COSTS = cpx.getReducedCosts(scaled=False) - else: - print('Invalid Presolution') - OPTIMAL_PRESOLUTION = False - pre_sol = {} - for r in fba.reactions: - pre_sol.update({r.getPid : 0.0}) - r.reduced_cost = 0.0 - pre_oval = 0.0 - pre_oid = 'None' - raise RuntimeError('\nPresolve failed to optimize this model and cannot continue!') - else: - pre_sol = {} - for r in fba.reactions: - pre_sol.update({r.getPid() : 0.0}) - if objF2constr: - pre_oval = fba.objectives[fba.activeObjIdx].value - pre_oid = fba.objectives[fba.activeObjIdx].getPid() - else: - pre_oval = 0.0 - pre_oid = 'None' - for r in fba.reactions: - r.value = pre_sol[r.id] - return cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS - - def fluxVariabilityAnalysis(self, fba, selected_reactions=None, pre_opt=True, objF2constr=True, rhs_sense='lower', optPercentage=100.0, work_dir=None, quiet=True, debug=False, oldlpgen=False, markupmodel=True, default_on_fail=False, roundoff_span=10): - """ - Perform a flux variability analysis on an fba model: - - - *fba* an FBA model object - - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the rid/value of the current active objective is used - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - - *work_dir* [default=None] the FVA working directory for temporary files default = cwd+fva - - *debug* [default=False] if True write out all the intermediate FVA LP's into work_dir - - *quiet* [default=False] if enabled, supress CPLEX output - - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If - this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - - *markupmodel* [default=True] add the values returned by the fva to the reaction.fva_min and reaction.fva_max - - *default_on_fail* [default=False] if *pre_opt* is enabled replace a failed minimum/maximum with the solution value - - *roundoff_span* [default=10] number of digits is round off (not individual min/max values) - - Returns an array with columns: Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus and a list containing the row names. - - """ - if work_dir == None: - work_dir = os.getcwd() - else: - assert os.path.exists(work_dir), '\nWhat did you think would happen now!' - if debug: - debug_dir = os.path.join(work_dir,'DEBUG') - if not os.path.exists(debug_dir): - os.mkdir(debug_dir) - s2time = time.time() - # generate a presolution - cpx = OPTIMAL_PRESOLUTION = None - pre_sol = pre_oid = pre_oval = None - REDUCED_COSTS = {} - cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = self.presolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen) - assert isinstance(cpx, Solver) - # if required add the objective function as a constraint - if objF2constr: - cpx.setObjectiveFunctionAsConstraint(cpx, rhs_sense, pre_oval, optPercentage) - if debug: - cpx.writeLPtoLPTfile('FVA_base', title=None, Dir=debug_dir) - - # do the FVA - NUM_FLX = len(fba.reactions) - print('Total number of reactions: {}'.format(NUM_FLX)) - if selected_reactions != None: - rids = fba.getReactionIds() - for r in selected_reactions: - assert r in rids, "\n%s is not a valid reaction name" % r - else: - selected_reactions = fba.getReactionIds() - NUM_FLX = len(selected_reactions) - print('Number of user selected variables: {}'.format(NUM_FLX)) - try: - OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7), numpy.double) - except AttributeError: - OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7)) - OUTPUT_NAMES = [] - cntr = 0 - tcnt = 0 - for Ridx in range(NUM_FLX): - R = selected_reactions[Ridx] - OUTPUT_NAMES.append(R) - MIN_STAT = MAX_STAT = 0 - # MIN - ## cplx_setObjective(cpx, R, expr=None, sense='min', reset=True) - # TODO: bgoli: see whether this also works with 'minimize' - cpx.setObjective('min%s' % R, {R:1}, 'min', reset=True) - ## cplx_setBounds(c, id, min=None, max=None) # think about this - MIN_STAT = cpx.solve() - if MIN_STAT == 1: # solved - min_oval = cpx.getObjectiveValue() - elif MIN_STAT == 2: # unbound - min_oval = -numpy.Inf - elif MIN_STAT == 3: - #min_oval = pre_sol[R] # try this as infeasible means no solution outside optimum - min_oval = numpy.NaN - else: # other failure - min_oval = numpy.NaN - if debug: - cpx.writeLPtoLPTfile('%smin' % R, title='min%s=%s' % (R,min_oval), Dir=debug_dir) - - # MAX - ## cplx_setObjective(cpx, R, expr=None, sense='max', reset=True) - cpx.setObjective('max%s' % R, coef=None, sense='max', reset=False) - ## cplx_setBounds(c, id, min=None, max=None) # think about this - MAX_STAT = cpx.solve() - if MAX_STAT == 1: # solved - max_oval = cpx.getObjectiveValue() - elif MAX_STAT == 2: # unbound - max_oval = numpy.Inf - elif MAX_STAT == 3: # infeasible - #max_oval = pre_sol[R] # try this as infeasible means no solution outside optimum - max_oval = numpy.NaN - else: # other fail - max_oval = numpy.NaN - if debug: - cpx.writeLPtoLPTfile(cpx, '%smax' % R, title='max%s=%s' % (R,max_oval), Dir=debug_dir) - - # enables using the default value as a solution if the solver fails - if pre_opt and default_on_fail: - if MAX_STAT > 1 and not MIN_STAT > 1: - max_oval = pre_sol[R] - if MIN_STAT > 1 and not MAX_STAT > 1: - min_oval = pre_sol[R] - - - OUTPUT_ARRAY[Ridx,0] = pre_sol[R] - if R in REDUCED_COSTS: - OUTPUT_ARRAY[Ridx,1] = REDUCED_COSTS[R] - OUTPUT_ARRAY[Ridx,2] = min_oval - OUTPUT_ARRAY[Ridx,3] = max_oval - OUTPUT_ARRAY[Ridx,4] = round(abs(max_oval - min_oval), roundoff_span) - OUTPUT_ARRAY[Ridx,5] = MIN_STAT - OUTPUT_ARRAY[Ridx,6] = MAX_STAT - if markupmodel: - REAC = fba.getReaction(R) - REAC.setValue(pre_sol[R]) - REAC.fva_min = min_oval - REAC.fva_max = max_oval - REAC.fva_status = (MIN_STAT, MAX_STAT) - #if MAX_STAT == 1: - #REAC.fva_max = max_oval - #else: - #REAC.fva_max = None - #if MIN_STAT == 1: - #REAC.fva_min = min_oval - #else: - #REAC.fva_min = None - if R in REDUCED_COSTS: - REAC.reduced_costs = REDUCED_COSTS[R] - if not quiet and MAX_STAT > 1 or MIN_STAT > 1: - print('Solver fail for reaction \"{}\" (MIN_STAT: {} MAX_STAT: {})'.format(R, MIN_STAT, MAX_STAT)) - cntr += 1 - if cntr == 200: - tcnt += cntr - print('FVA has processed {} of {} reactions'.format(tcnt, NUM_FLX)) - cntr = 0 - if quiet: - cpx.setOutputStreams(mode='default') - del cpx - print('\nSinglecore FVA took: {} min (1 process)\n'.format((time.time()-s2time)/60.)) - print('Output array has columns:') - print('Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus') - return OUTPUT_ARRAY, OUTPUT_NAMES - - def minimizeSumOfAbsFluxes(self, fba, selected_reactions=None, pre_opt=True, objF2constr=True, rhs_sense='lower', optPercentage=100.0, objective_coefficients={}, work_dir=None, quiet=False, debug=False, return_lp_obj=False, oldlpgen=False, with_reduced_costs=None): - """ - Minimize the sum of absolute fluxes sum(abs(J1) + abs(J2) + abs(J3) ... abs(Jn)) by adding two constraints per flux - and a variable representing the absolute value: - - Min: Ci abs_Ji - Ji - abs_Ji <= 0 - Ji + abs_Ji >= 0 - - Such that: - NJi = 0 - Jopt = opt - - returns the value of the flux minimization objective function (not the model objective function which remains unchanged from) - - Arguments: - - - *fba* an FBA model object - - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the vid/value of the current active objective is used - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - - *work_dir* [default=None] the MSAF working directory for temporary files default = cwd+fva - - *debug* [default=False] if True write out all the intermediate MSAF LP's into work_dir - - *quiet* [default=False] if enabled supress CPLEX output - - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If - this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - - *objective_coefficients* [default={}] a dictionary of (reaction_id : float) pairs that provide the are introduced as objective coefficients to the absolute flux value. Note that the default value of the coefficient (non-specified) is +1. - - *return_lp_obj* [default=False] off by default when enabled it returns the CPLEX LP object - - *with_reduced_costs* [default=None] if not None should be 'scaled' or 'unscaled' - - With outputs: - - - *fba* an update instance of a CBModel. Note that the FBA model objective function value is the original value set as a constraint - - """ - - if with_reduced_costs == 'scaled': - fba.SCALED_REDUCED_COSTS = True - elif with_reduced_costs == 'unscaled': - fba.SCALED_REDUCED_COSTS = False - elif fba.SCALED_REDUCED_COSTS == True: - with_reduced_costs = 'scaled' - elif fba.SCALED_REDUCED_COSTS == False: - with_reduced_costs = 'unscaled' - else: - with_reduced_costs = None - - print('\nINFO: using \"{}\" reduced costs.\n'.format(with_reduced_costs)) - - if work_dir == None: - work_dir = os.getcwd() - else: - assert os.path.exists(work_dir), '\nWhat did you think would happen now!' - if debug: - debug_dir = os.path.join(work_dir,'DEBUG') - if not os.path.exists(debug_dir): - os.mkdir(debug_dir) - - # generate a presolution - cpx = OPTIMAL_PRESOLUTION = None - pre_sol = pre_oid = pre_oval = None - REDUCED_COSTS = {} - cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = self.presolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen, with_reduced_costs=with_reduced_costs) - assert isinstance(cpx, Solver) - # if required add the objective function as a constraint - if objF2constr: - cpx.setObjectiveAsConstraint(cpx, rhs_sense, pre_oval, optPercentage) - - STORED_OPT = None - if pre_opt: - STORED_OPT = pre_oval - else: - STORED_OPT = fba.getActiveObjective().getValue() - - # minimize the absolute sum of fluxes - if selected_reactions != None: - rids = fba.getReactionIds() - for r in selected_reactions: - assert r in rids, "\n%s is not a valid reaction name" % r - else: - selected_reactions = fba.getReactionIds() - - # this removes the objective function - ## fba_obj_ids = fba.getActiveObjective().getFluxObjectiveReactions() - ## print fba_obj_ids - ## for R in fba_obj_ids: - ## if R in selected_reactions: - ## selected_reactions.pop(selected_reactions.index(R)) - ## del R, fba_obj_ids - NUM_FLX = len(selected_reactions) - print('Total number of reactions: {}'.format(NUM_FLX)) - abs_selected_reactions = ['abs_%s' % r for r in selected_reactions] - # absJ implicitly assumed to be absJ >= 0 - #C#cpx.variables.add(names=abs_selected_reactions) - #basecols = len(cpx.cols) - lb = NUM_FLX*[0] - obj = NUM_FLX*[1] - cpx.addVariables(names=abs_selected_reactions, lb=lb, obj=obj) - # set objective name - cpx.setObjective(name='MAFS', sense='min', reset=False) - - """ - J - abs_J <= 0 - J + abs_J >= 0 - """ - for r in range(NUM_FLX): - conName = 'abs{}a'.format(selected_reactions[r]) - conCoef = {selected_reactions[r]:1,abs_selected_reactions[r]:-1} - conRHS = 0 - conSense = '<=' - cpx.addLinearConstraint(conName, conCoef, conRHS, conSense) - - conName = 'abs{}b'.format(selected_reactions[r]) - conCoef = {selected_reactions[r]:1,abs_selected_reactions[r]:1} - conRHS = 0 - conSense = '>=' - cpx.addLinearConstraint(conName, conCoef, conRHS, conSense) - - - if debug: - cpx.write(cpxlp=os.path.join(debug_dir, 'MSAF_base_(%s).lp' % time.time())) - #cplx_writeLPtoLPTfile(cpx, , title=None, Dir=debug_dir) - ## cplx_writeLPtoLPTfile(cpx, 'MSAF_base_%s' % time.time() , title=None, Dir=debug_dir) - - cpx.solve() - cpx.setSolutionToModel(fba, with_reduced_costs=with_reduced_costs) - - #cpx.write(cpxlp='test.lp'); return cpx - - minSum = cpx.getObjectiveValue() - fba.setAnnotation('min_flux_sum', minSum) - fba.getActiveObjective().setValue(STORED_OPT) - print('\nMinimizeSumOfAbsFluxes objective value: {}\n'.format(minSum)) - if quiet: - pass - #cplx_setOutputStreams(cpx, mode='default') - if not return_lp_obj: - return minSum - else: - return cpx - - def runInputScan(self, fba, exDict, wDir, input_lb=-10.0, input_ub=0.0, writeHformat=False, rationalLPout=False): - """ - scans all inputs - - """ - debug_dir = os.path.join(wDir, 'lpt') - ine_dir = os.path.join(wDir, 'ine') - rat_dir = os.path.join(wDir, 'eslv') - if not os.path.exists(debug_dir): - os.mkdir(debug_dir) - if writeHformat and not os.path.exists(ine_dir): - os.mkdir(ine_dir) - if rationalLPout and not os.path.exists(rat_dir): - os.mkdir(rat_dir) - - optimal_growth_rates = {} - infeasible_inputs = [] - modname = fba.sourcefile - fbaid0 = fba.id - fba.buildStoichMatrix() - for input in exDict: - fba.id = '%s(%s)' % (modname, input) - ilb = fba.getFluxBoundByReactionID(input, 'lower').value - iub = fba.getFluxBoundByReactionID(input, 'upper').value - fba.setReactionBound(input, input_lb, 'lower') - fba.setReactionBound(input, input_ub, 'upper') - #fbalp = cplx_getModelFromObj(fba) - fbalp = self.create(fba, fname=None) - fbalp.solve() - if fbalp.getSolutionStatus() == 'LPS_OPT': - opt = fbalp.getObjectiveValue() - if __DEBUG__: print('Optimal growth rate %s(%s) = %s' % (modname, input, opt)) - optimal_growth_rates.update({input : opt}) - if writeHformat: - CBWrite.WriteModelHFormatFBA(fba, ine_dir, use_rational=False) - CBWrite.WriteModelHFormatFBA(fba, ine_dir, use_rational=True) - if rationalLPout: - tmp = CBWrite.writeModelLP(fba, work_dir=rat_dir) - else: - print('Solver returned an error code: {}'.format(fbalp.getSolutionStatus())) - infeasible_inputs.append(input) - fba.setReactionBound(input, ilb, 'lower') - fba.setReactionBound(input, iub, 'upper') - del fbalp - fba.id = fbaid0 - return optimal_growth_rates, infeasible_inputs - -# The following method is a bit more complicated to generalize -# def singleGeneScan(self, fba, r_off_low=0.0, r_off_upp=0.0, optrnd=8, altout=False): -# """ -# Perform a single gene deletion scan -# -# - *fba* a model object -# - *r_off_low* the lower bound of a deactivated reaction -# - *r_off_upp* the upper bound of a deactivated reaction -# - *optrnd* [default=8] round off the optimal value -# - *altout* [default=False] by default return a list of gene:opt pairs, alternatively (True) return an extended -# result set including gene groups, optima and effect map -# -# """ -# # cplex optimization -# if fba.__single_gene_effect_map__ == None: -# fba.createSingleGeneEffectMap() -# #lpx = cbm.analyzeModel(fba, return_lp_obj=True) -# lpx = self.create(fba) -# lpx.solve() -# wtOpt = lpx.isOptimal() -# -# Jmap = fba.__single_gene_effect_map__.pop('keyJ') -# Emap = list(fba.__single_gene_effect_map__) -# -# lpx.setOutputStreams(mode=None) -# -# base_names = lpx.variables.get_names() -# base_lower = lpx.variables.get_lower_bounds() -# base_upper = lpx.variables.get_upper_bounds() -# -# base_lower_bounds = [] -# base_upper_bounds = [] -# -# t_cplex = 0.0 -# -# for n_ in range(len(base_names)): -# base_lower_bounds.append((base_names[n_], base_lower[n_])) -# base_upper_bounds.append((base_names[n_], base_upper[n_])) -# -# results = [] -# -# for pr_ in range(len(Emap)): -# ## cplex optimized method -# t_x1 = time.time() -# lpx.variables.set_lower_bounds(base_lower_bounds) -# lpx.variables.set_upper_bounds(base_upper_bounds) -# new_lower = [] -# new_upper = [] -# for r_ in range(len(Emap[pr_])): -# if not Emap[pr_][r_]: -# new_lower.append((Jmap[r_], r_off_low)) -# new_upper.append((Jmap[r_], r_off_upp)) -# if len(new_lower) > 0: -# lpx.variables.set_lower_bounds(new_lower) -# if len(new_upper) > 0: -# lpx.variables.set_upper_bounds(new_upper) -# lpx.solve() -# if lpx.solution.get_status() == lpx.solution.status.optimal: -# results.append({ 'opt' : round(lpx.solution.get_objective_value(), optrnd), -# 'deletions' : fba.__single_gene_effect_map__[Emap[pr_]], -# #'activities' : Emap[pr_] -# }) -# else: -# results.append({ 'opt' : float('nan'), -# 'deletions' : fba.__single_gene_effect_map__[Emap[pr_]], -# #'activities' : Emap[pr_] -# }) -# t_cplex += time.time() - t_x1 -# -# fba.__single_gene_effect_map__['keyJ'] = Jmap -# del lpx, fba -# print('\nSingle gene deletion scan: {} mins'.format(t_cplex/60.0)) -# -# singleRes = [] -# for d in results: -# for g in d['deletions']: -# if g not in singleRes: -# if numpy.isnan(d['opt']): -# lbl = 'no-solution' -# elif d['opt'] == wtOpt: -# lbl = 'silent' -# elif abs(d['opt']) <= 1.0e-11: -# lbl = 'lethal' -# else: -# lbl = 'partial' -# singleRes.append((g, d['opt'], lbl)) -# else: -# print('\nINFO: duplicate gene Id detected and skipped') -# -# if not altout: -# return singleRes -# else: -# return results - - -############################################################################### -# boilerplate for determining default solver -############################################################################### - -from cbmpy.CBConfig import __CBCONFIG__ as __CBCONFIG__ -__DEBUG__ = __CBCONFIG__['DEBUG'] -__version__ = __CBCONFIG__['VERSION'] -SOLVER_PREF = __CBCONFIG__['SOLVER_PREF'] - -__AVAILABLE_SOLVERS__ = [] - -__CBCONFIG__['SOLVER_ACTIVE'] = None -try: - from . import CBGLPK - __CBCONFIG__['SOLVER_ACTIVE'] = 'GLPK' - __AVAILABLE_SOLVERS__.append('GLPK') -except ImportError: - print('GLPK not available') -try: - from . import CBCPLEX - if __CBCONFIG__['SOLVER_ACTIVE'] == None: - __CBCONFIG__['SOLVER_ACTIVE'] = 'CPLEX' - else: - __CBCONFIG__['SOLVER_ACTIVE'] = 'GLPK+CPLEX' - __AVAILABLE_SOLVERS__.append('CPLEX') -except Exception as ex: - print('\n\n') - print(ex) - print('\n\n') - print('CPLEX not available') - -if __CBCONFIG__['SOLVER_ACTIVE'] == None: - print('\n*****\nWARNING: No linear solver present, please install IBM CPLEX with Python bindings or PyGLPK, please see http://cbmpy.sourceforge.net for Windows binary or http://tfinley.net/software/pyglpk for source.\n*****\n') - -if __CBCONFIG__['SOLVER_ACTIVE'] == 'GLPK+CPLEX': - if __CBCONFIG__['SOLVER_PREF'] == None: - __CBCONFIG__['SOLVER_ACTIVE'] = None - while __CBCONFIG__['SOLVER_ACTIVE'] not in ['CPLEX','GLPK']: - __CBCONFIG__['SOLVER_ACTIVE'] = raw_input('\nSolver preference not set. Please select a solver [CPLEX or GLPK]: ') - - elif __CBCONFIG__['SOLVER_PREF'] == 'CPLEX': - __CBCONFIG__['SOLVER_ACTIVE'] = 'CPLEX' - elif __CBCONFIG__['SOLVER_PREF'] == 'GLPK': - __CBCONFIG__['SOLVER_ACTIVE'] = 'GLPK' - - -############################################################################### -# old functions for compatibility -############################################################################### - -if __CBCONFIG__['SOLVER_ACTIVE'] == 'CPLEX': - factory = CBCPLEX.CPLEXFactory() -elif __CBCONFIG__['SOLVER_ACTIVE'] == 'GLPK': - factory = CBGLPK.GLPKFactory() -else: - factory = None - -def getFactory(): - """ fetches default factory """ - return factory - -def createSolver(fba=None): - """ creates a default solver. - - *fba* metabolic network model to initialize the solver to solve FBA. If no - FBA model is given, an empty LP isi initialized. - """ - return factory.create(fba) - -def analyzeModel(fba): - factory.analyzeModel(fba) - -def FluxVariabilityAnalysis(fba, selected_reactions=None, pre_opt=True, objF2constr=True, rhs_sense='lower', optPercentage=100.0): - factory.fluxVariabilityAnalysis(fba, selected_reactions, pre_opt, objF2constr, rhs_sense, optPercentage) - -def MinimizeSumOfAbsFluxes(fba, selected_reactions=None, pre_opt=True, objF2constr=True, rhs_sense='lower', optPercentage=100.0, objective_coefficients={}): - """ - Minimize the sum of absolute fluxes sum(abs(J1) + abs(J2) + abs(J3) ... abs(Jn)) by adding two constraints per flux - and a variable representing the absolute value: - - Min: Ci abs_Ji - Ji - abs_Ji <= 0 - Ji + abs_Ji >= 0 - - Such that: - NJi = 0 - Jopt = opt - - returns the value of the flux minimization objective function (not the model objective function which remains unchanged from) - - Arguments: - - - *fba* an FBA model object - - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the vid/value of the current active objective is used - - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - - *work_dir* [default=None] the MSAF working directory for temporary files default = cwd+fva - - *debug* [default=False] if True write out all the intermediate MSAF LP's into work_dir - - *quiet* [default=False] if enabled supress CPLEX output - - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If - this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - - *objective_coefficients* [default={}] a dictionary of (reaction_id : float) pairs that provide the are introduced as objective coefficients to the absolute flux value. Note that the default value of the coefficient (non-specified) is +1. - - *return_lp_obj* [default=False] off by default when enabled it returns the CPLEX LP object - - *with_reduced_costs* [default=None] if not None should be 'scaled' or 'unscaled' - - With outputs: - - - *fba* an update instance of a CBModel. Note that the FBA model objective function value is the original value set as a constraint - - """ - factory.minimizeSumOfAbsFluxes(fba, selected_reactions, pre_opt, objF2constr, rhs_sense, optPercentage, objective_coefficients) \ No newline at end of file diff --git a/src/oldsolver/__init__.py b/src/oldsolver/__init__.py deleted file mode 100644 index e69de29..0000000