Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import pulp | |
| import pdb | |
| import os | |
| import json | |
| from rdkit import Chem | |
| # pulp_solver = pulp.solvers.CPLEX_CMD(path=None, keepFiles=0, mip=1, msg=1, | |
| # options=['mip tolerances mipgap 0', 'mip tolerances absmipgap 0', | |
| # 'mip tolerances integrality 0', 'simplex tolerances optimality 1E-9', | |
| # 'simplex tolerances feasibility 1E-9',], timelimit=1200) | |
| def count_substructures(radius,molecule): | |
| """Helper function for get the information of molecular signature of a | |
| metabolite. The relaxed signature requires the number of each substructure | |
| to construct a matrix for each molecule. | |
| Parameters | |
| ---------- | |
| radius : int | |
| the radius is bond-distance that defines how many neighbor atoms should | |
| be considered in a reaction center. | |
| molecule : Molecule | |
| a molecule object create by RDkit (e.g. Chem.MolFromInchi(inchi_code) | |
| or Chem.MolToSmiles(smiles_code)) | |
| Returns | |
| ------- | |
| dict | |
| dictionary of molecular signature for a molecule, | |
| {smiles: molecular_signature} | |
| """ | |
| m = molecule | |
| smi_count = dict() | |
| atomList = [atom for atom in m.GetAtoms()] | |
| for i in range(len(atomList)): | |
| env = Chem.FindAtomEnvironmentOfRadiusN(m,radius,i) | |
| atoms=set() | |
| for bidx in env: | |
| atoms.add(m.GetBondWithIdx(bidx).GetBeginAtomIdx()) | |
| atoms.add(m.GetBondWithIdx(bidx).GetEndAtomIdx()) | |
| # only one atom is in this environment, such as O in H2O | |
| if len(atoms) == 0: | |
| atoms = {i} | |
| smi = Chem.MolFragmentToSmiles(m,atomsToUse=list(atoms), | |
| bondsToUse=env,canonical=True) | |
| if smi in smi_count: | |
| smi_count[smi] = smi_count[smi] + 1 | |
| else: | |
| smi_count[smi] = 1 | |
| return smi_count | |
| def novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction): | |
| """apply reaction rules generated from a more relaxed manner to search for | |
| reaction rules that are able to fill the gap between the source and sink | |
| metabolites. | |
| - rePrime procedure is more similar to a morgan fingerprints | |
| - the relaxed rule is generated from substructures without considering the | |
| bond that connect the atoms at the edge of the substructure to the rest | |
| of the molecules | |
| Parameters | |
| ---------- | |
| exchange_mets : dict | |
| overall stoichiometry of source and sink metabolites, {met: stoic,...} | |
| This is a important input for novoStoic to run correctly because the | |
| method requires that overall moieties are balanced. | |
| novel_mets : list | |
| list of novel metabolites that are not in the database (novoStoic/data/ | |
| metanetx_universal_model_kegg_metacyc_rhea_seed_reactome.json) | |
| filtered_rules : list | |
| list of rules that are filtered by the user (based on expert knowldedge) | |
| to reduce the running time of the novoStoic search process | |
| project : string | |
| a path to store the tmp information of result from running novoStoic | |
| iterations : int | |
| the number of iterations of searching for alternative solutions | |
| data_dir : type | |
| Description of parameter `data_dir`. | |
| Returns | |
| ------- | |
| None | |
| all the outputs are saved in the project folder. | |
| """ | |
| if not os.path.exists(project): | |
| os.makedirs(project) | |
| # the maximum flux of a reaction | |
| M = 2 | |
| data_dir = './data' | |
| # read csv files with molecular signatures and reaction rules | |
| molecular_signature = json.load(open( | |
| os.path.join(data_dir, 'decompose_vector_ac.json'))) | |
| molsigs = pd.DataFrame.from_dict(molecular_signature).fillna(0) | |
| rules = pd.read_csv( | |
| os.path.join(data_dir, "relaxed_rule_noduplic.csv"), index_col=0 | |
| ) | |
| ###### sets ############ | |
| moiety_index = rules.index.tolist() # moiety sets | |
| rules_index = rules.columns.values.tolist() | |
| print("Number of rules used in this search:",len(rules_index)) | |
| exchange_index = exchange_mets.keys() | |
| ###### parameters ###### | |
| # T(m,r) contains atom stoichiometry for each rule | |
| T = rules.to_dict(orient="index") | |
| # C(m,i) contains moiety cardinality for each metabolite | |
| C = molsigs.to_dict(orient="index") | |
| for m in moiety_index: | |
| C[m]["C00080"] = 0 | |
| C[m]["C00282"] = 0 | |
| # add metabolites that are not present in current database | |
| for met in novel_mets: | |
| # molsigs_product = pd.read_csv( | |
| # project + "/relaxed_molsig_" + met + "_1.csv", index_col=0 | |
| # ) | |
| # molsigs_product_dict = molsigs_product.to_dict(orient="index") | |
| smiles = novel_mets[met] | |
| mol = Chem.MolFromSmiles(smiles) | |
| mol = Chem.RemoveHs(mol) | |
| molsigs_product_dict = count_substructures(1,mol) | |
| for m in moiety_index: | |
| if m in molsigs_product_dict.keys(): | |
| C[m][met] = molsigs_product_dict[m] | |
| else: | |
| C[m][met] = 0 | |
| ###### variables ###### | |
| v_rule = pulp.LpVariable.dicts( | |
| "v_rule", rules_index, lowBound=-M, upBound=M, cat="Integer" | |
| ) | |
| v_rule_obj = pulp.LpVariable.dicts( | |
| "v_rule_obj", rules_index, lowBound=0, upBound=M, cat="Continuous" | |
| ) | |
| v_EX = pulp.LpVariable.dicts( | |
| "v_EX", exchange_index, lowBound=-M, upBound=M, cat="Continuous" | |
| ) | |
| y_rule = pulp.LpVariable.dicts( | |
| "y", rules_index, lowBound=0, upBound=1, cat="Binary" | |
| ) | |
| # create MILP problem | |
| lp_prob = pulp.LpProblem("novoStoic", pulp.LpMinimize) | |
| ####### objective function #### | |
| lp_prob += pulp.lpSum([v_rule_obj[j] for j in rules_index]) | |
| ####### constraints #### | |
| # constraint 1: moiety change balance | |
| for m in moiety_index: | |
| lp_prob += ( | |
| pulp.lpSum([T[m][r] * v_rule[r] for r in rules_index if T[m][r] !=0]) | |
| == pulp.lpSum([C[m][i] * v_EX[i] for i in exchange_index if C[m][i] != 0]), | |
| "moiety_balance_" + str(moiety_index.index(m)), | |
| ) | |
| # constraint 2: constraint for exchange reactions | |
| for i, stoic in exchange_mets.items(): | |
| lp_prob += v_EX[i] == stoic, "exchange" + i | |
| # constraint 3: control the number of rules | |
| direction_df = pd.read_csv( | |
| os.path.join(data_dir, "direction.csv"), index_col=0 | |
| ) | |
| direction_df.index = direction_df['reaction'] | |
| # direction: 0-reversible, 1-backward, 2-forward | |
| direction = direction_df['direction'].to_dict() | |
| if use_direction: | |
| soln_file = os.path.join(project, "solution_use_direction.txt") | |
| for j in rules_index: | |
| if direction[j] == 0: | |
| lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j | |
| lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j | |
| if direction[j] == 1: | |
| lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j | |
| lp_prob += v_rule[j] <= 0, "cons2_%s" % j | |
| if direction[j] == 2: | |
| lp_prob += v_rule[j] >= 0, "cons1_%s" % j | |
| lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j | |
| else: | |
| soln_file = os.path.join(project, "solution_no_direction.txt") | |
| for j in rules_index: | |
| lp_prob += v_rule[j] >= y_rule[j] * -M, "cons1_%s" % j | |
| lp_prob += v_rule[j] <= y_rule[j] * M, "cons2_%s" % j | |
| for j in rules_index: | |
| lp_prob += v_rule_obj[j] >= v_rule[j] | |
| lp_prob += v_rule_obj[j] >= -v_rule[j] | |
| # constraint 5: customized constraints | |
| # the number of steps of the pathway | |
| lp_prob += pulp.lpSum([v_rule_obj[j] for j in rules_index]) == 2 | |
| ### solve | |
| integer_cuts(lp_prob,pulp_solver,iterations,rules_index,y_rule,v_rule,soln_file,direction) | |
| def integer_cuts(lp_prob,pulp_solver,iterations,rules_index,y_rule,v_rule,soln_file,direction): | |
| """add integer cut constraints to a mixed-integer linear programming problem | |
| (MILP). The aim of such constraints is to find alternative solutions by | |
| adding constraints to exclude the already explored solutions. | |
| Reference: Optimization Methods in Metabolic Networks By Costas D. Maranas, | |
| Ali R. Zomorrodi, Chapter 4.2.2 Finding alternative optimal integer | |
| solutions | |
| Returns | |
| ------- | |
| type | |
| Description of returned object. | |
| """ | |
| for sol_num in range(1, iterations + 1): | |
| integer_cut_rules = [] | |
| # optinal output: lp file for debug | |
| lp_prob.writeLP('./test.lp') | |
| # if pulp_solver = "SCIP": | |
| # status, values = pulp_solver.solve(lp_prob) | |
| lp_prob.solve(pulp_solver) | |
| # pulp_solver.solve(lp_prob) | |
| print("Status:", pulp.LpStatus[lp_prob.status]) | |
| if pulp.LpStatus[lp_prob.status] != 'Optimal': | |
| break | |
| print('-----------rules--------------') | |
| with open(soln_file,'a') as f: | |
| f.write('iteration,' + str(sol_num)) | |
| f.write('\n') | |
| for r in rules_index: | |
| if (v_rule[r].varValue >= 0.1 or v_rule[r].varValue <=-0.1): | |
| dG_info = '' | |
| if (v_rule[r].varValue > 0 and direction[r] == 1) or (v_rule[r].varValue < 0 and direction[r] == 2): | |
| # print("##### Found ####: " + str(r)) | |
| # with open(soln_file,'a') as f: | |
| # f.write('##### Found ####: ' + str(r)) | |
| # f.write('\n') | |
| dG_info = ' * Thermodynamically infeasible' | |
| print("##### Found ####: " + str(r) + dG_info) | |
| integer_cut_rules.append(r) | |
| print(r,v_rule[r].varValue) | |
| with open(soln_file,'a') as f: | |
| f.write(r + ',' + str(v_rule[r].varValue) + dG_info) | |
| f.write('\n') | |
| length = len(integer_cut_rules) - 1 | |
| lp_prob += ( | |
| pulp.lpSum([y_rule[r] for r in integer_cut_rules]) <= length, | |
| "integer_cut_" + str(sol_num), | |
| ) | |
| def test_bdo(): | |
| exchange_mets = { | |
| 'C00091': -1, # Succinyl-CoA | |
| 'C00004': -4, # NADH | |
| 'C00003': 4, # NAD+ | |
| 'C00010': 1, # coa | |
| 'C00001':1, # h2O | |
| '14bdo': 1, | |
| } | |
| novel_mets = { | |
| '14bdo': 'OCCCCO' | |
| } | |
| iterations = 50 | |
| project = './novoStoic_result' | |
| # path_to_cplex = '/Users/linuswang/Applications/IBM/ILOG/CPLEX_Studio1261/cplex/bin/x86-64_osx/cplex' | |
| # pulp_solver = pulp.CPLEX_CMD(path=path_to_cplex,keepFiles=0, mip=1, msg=1) | |
| pulp_solver = pulp.CPLEX_CMD(path=None,keepFiles=0, mip=1, msg=1) | |
| # pulp_solver = pulp.solvers.GUROBI_CMD() | |
| # pulp_solver = pulp.solvers.GLPK_CMD() | |
| use_direction=True | |
| novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) | |
| use_direction=False | |
| novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) | |
| def test_isovalarate(): | |
| exchange_mets = { | |
| 'C00141': -1, # 2-keto isovalarate | |
| 'C00004': -1, # NADH | |
| 'C00003': 1, # NAD+ | |
| "C14710": 1, # isobutanol C4H10O | |
| 'C00011': 1, # co2 | |
| } | |
| novel_mets = {} | |
| iterations = 50 | |
| project = './novoStoic_isovalarate' | |
| # path_to_cplex = '/Users/linuswang/Applications/IBM/ILOG/CPLEX_Studio1261/cplex/bin/x86-64_osx/cplex' | |
| # pulp_solver = pulp.CPLEX_CMD(path=path_to_cplex,keepFiles=0, mip=1, msg=1) | |
| pulp_solver = pulp.CPLEX_CMD(path=None,keepFiles=0, mip=1, msg=1) | |
| # pulp_solver = pulp.solvers.GUROBI_CMD() | |
| # pulp_solver = pulp.GLPK_CMD() | |
| # use_direction=True | |
| # novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) | |
| use_direction=False | |
| novoStoic_minFlux_relaxedRule(exchange_mets, novel_mets,project,iterations,pulp_solver,use_direction) | |
| if __name__ == '__main__': | |
| test_isovalarate() | |