from pathlib import Path
import torch
from st_on_hover_tabs import on_hover_tabs
import streamlit as st
# Switch the page to the "wide" layout.
# NOTE(review): this sits directly after the streamlit import; Streamlit expects
# set_page_config to run before other st.* calls - confirm for the pinned version.
st.set_page_config(layout="wide")
# Checkpoint file that batch_generate passes to torch.load() to restore the
# JTPropVAE weights; relative to the working directory the app is started from.
model_path = './model.iter-685000'
import sys, os
import rdkit
import rdkit.Chem as Chem
from rdkit.Chem.Draw import MolToImage
from rdkit.Chem import Descriptors
import sascorer
import networkx as nx
from stqdm import stqdm
import base64, io
import pandas as pd
import streamlit_ext as ste
# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime" abort
# that can occur when several libraries each bundle their own OpenMP - confirm
# it is still required.
os.environ['KMP_DUPLICATE_LIB_OK']='True'
# Add the fast_jtnn directory that sits next to this file to sys.path; the
# `mol_tree` and `jtprop_vae` imports that follow are presumably resolved from it.
sys.path.append('%s/fast_jtnn/' % os.path.dirname(os.path.realpath(__file__)))
from mol_tree import Vocab, MolTree
from jtprop_vae import JTPropVAE
from molbloom import buy
# Page-wide CSS overrides: shrink metric widgets (and their inner div / label)
# and resizable dataframes to their content width and centre them.
#
# Two fixes relative to the previous version:
#   * the `label` rule was never closed, which nested the dataframe rule inside
#     it (so it could never match) and left a stray `}` at the end;
#   * the stylesheet was never sent to the browser because the f-string passed
#     to st.markdown was empty - it is now wrapped in a <style> element.
css='''
[data-testid="metric-container"] {
    width: fit-content;
    margin: auto;
}
[data-testid="metric-container"] > div {
    width: fit-content;
    margin: auto;
}
[data-testid="metric-container"] label {
    width: fit-content;
    margin: auto;
}
[data-testid="stDataFrameResizable"] {
    width: fit-content;
    margin: auto;
}
'''
# unsafe_allow_html is required so Streamlit does not escape the <style> tag.
st.markdown(f'<style>{css}</style>',unsafe_allow_html=True)
# Module-level in-memory text buffer. It is not referenced in the visible part
# of this file - NOTE(review): check for users elsewhere before removing.
s_buff = io.StringIO()
def img_to_bytes(img_path):
    """Read the file at ``img_path`` and return its contents base64-encoded.

    Args:
        img_path: Path (``str`` or ``Path``) of the file to read.

    Returns:
        The base64 representation of the file's bytes, as ``str``.
    """
    raw = Path(img_path).read_bytes()
    # b64encode yields bytes; decode them so the result can be spliced into text.
    return base64.b64encode(raw).decode()
def img_to_html(img_path):
    """Return an HTML ``<img>`` tag that embeds the image at ``img_path`` inline.

    The file is base64-encoded by ``img_to_bytes`` and placed in a ``data:``
    URL, so the tag can be rendered through ``st.markdown(...,
    unsafe_allow_html=True)`` without serving the file separately.

    The template previously had no ``{}`` placeholder, so the encoded image was
    discarded and an empty string was returned.

    Args:
        img_path: Path of the image file to embed.

    Returns:
        The ``<img>`` element as a string.
    """
    # The MIME type is fixed to PNG; the only call visible in this file passes
    # 'about.png'. NOTE(review): revisit if other image formats are ever passed.
    img_html = "<img src='data:image/png;base64,{}'>".format(
        img_to_bytes(img_path)
    )
    return img_html
# Substructure filter tables, loaded once at import time from the working
# directory. mcf.csv is read with its own header row; wehi_pains.csv is read
# with explicitly supplied column names.
_mcf = pd.read_csv('./mcf.csv')
_pains = pd.read_csv('./wehi_pains.csv', names=['smarts', 'names'])

# Compile every SMARTS string of each table into an RDKit query molecule so
# mol_passes_filters_custom can reuse the compiled patterns on every call.
_mcf_filters, _pains_filters = (
    [Chem.MolFromSmarts(pattern) for pattern in table['smarts'].values]
    for table in (_mcf, _pains)
)
def mol_passes_filters_custom(mol,
                              allowed=None,
                              isomericSmiles=False):
    """Run the structural filters on ``mol`` and report the first one that fails.

    Args:
        mol: RDKit molecule to check, or ``None``.
        allowed: Collection of permitted element symbols; when empty or
            ``None`` the default set C, N, S, O, F, Cl, Br, H is used.
        isomericSmiles: Forwarded to ``Chem.MolToSmiles`` for the final
            round-trip check.

    Returns:
        One of the following codes, checked in this order:
        ``'NoMol'`` (``mol`` is ``None``), ``'ManyRings'`` (some ring has 8 or
        more atoms), ``'Charged'`` (an atom carries a formal charge),
        ``'AtomNotAllowed'`` (an element outside ``allowed``), ``'MCF'`` /
        ``'PAINS'`` (a pattern from the respective filter list matches),
        ``'Isomeric'`` (the SMILES is empty or cannot be parsed back), or
        ``'YES'`` when every check passes.
    """
    if not allowed:
        allowed = {'C', 'N', 'S', 'O', 'F', 'Cl', 'Br', 'H'}

    if mol is None:
        return 'NoMol'

    # Reject molecules containing any ring of eight or more atoms.
    rings = mol.GetRingInfo()
    if rings.NumRings() != 0:
        for ring_atoms in rings.AtomRings():
            if len(ring_atoms) >= 8:
                return 'ManyRings'

    # Substructure patterns are matched against a copy with explicit hydrogens;
    # the charge and element checks below look at the molecule as given.
    h_mol = Chem.AddHs(mol)

    for atom in mol.GetAtoms():
        if atom.GetFormalCharge() != 0:
            return 'Charged'

    for atom in mol.GetAtoms():
        if atom.GetSymbol() not in allowed:
            return 'AtomNotAllowed'

    def matches_any(patterns):
        # True as soon as one compiled SMARTS pattern is found in h_mol.
        for pattern in patterns:
            if h_mol.HasSubstructMatch(pattern):
                return True
        return False

    if matches_any(_mcf_filters):
        return 'MCF'
    if matches_any(_pains_filters):
        return 'PAINS'

    # Final sanity check: the molecule must survive a SMILES round trip.
    smiles = Chem.MolToSmiles(mol, isomericSmiles=isomericSmiles)
    if not smiles:
        return 'Isomeric'
    if Chem.MolFromSmiles(smiles) is None:
        return 'Isomeric'
    return 'YES'
def penalized_logp_standard(mol):
    """Compute logP, SA and cycle scores for ``mol`` plus their standardized sum.

    Args:
        mol: RDKit molecule to score.

    Returns:
        A 4-tuple ``(log_p, SA, cycle_score, penalized)`` where ``log_p`` is
        ``Descriptors.MolLogP(mol)``, ``SA`` is the negated
        ``sascorer.calculateScore(mol)``, ``cycle_score`` is minus the number
        of atoms by which the largest basis cycle exceeds six (0 when none
        does), and ``penalized`` is the sum of the three values after each is
        shifted by its mean and divided by its standard deviation.
    """
    # Standardization constants.
    # NOTE(review): presumably statistics of a reference dataset - source not shown.
    logP_mean = 2.4399606244103639873799239
    logP_std = 0.9293197802518905481505840
    SA_mean = -2.4485512208785431553792478
    SA_std = 0.4603110476923852334429910
    cycle_mean = -0.0307270378623088931402396
    cycle_std = 0.2163675785228087178335699

    def standardize(value, mean, std):
        return (value - mean) / std

    log_p = Descriptors.MolLogP(mol)
    SA = -sascorer.calculateScore(mol)

    # Cycle score: size of the largest cycle in the bond graph's cycle basis,
    # penalized only for the part above six atoms.
    bond_graph = nx.Graph(Chem.rdmolops.GetAdjacencyMatrix(mol))
    largest_cycle = max((len(cycle) for cycle in nx.cycle_basis(bond_graph)), default=0)
    cycle_score = -max(largest_cycle - 6, 0)

    penalized = (
        standardize(log_p, logP_mean, logP_std)
        + standardize(SA, SA_mean, SA_std)
        + standardize(cycle_score, cycle_mean, cycle_std)
    )
    return log_p, SA, cycle_score, penalized
# Raise the RDKit logger threshold to CRITICAL so that lower-severity RDKit
# messages are not emitted.
lg = rdkit.RDLogger.logger()
lg.setLevel(rdkit.RDLogger.CRITICAL)
def About():
descrip = '''
We seek to automate the design of molecules based on specific chemical properties. In computational terms, this task involves continuous embedding and generation of molecular graphs. Our primary contribution is the direct realization of molecular graphs, a task previously approached by generating linear SMILES strings instead of graphs. Our junction tree variational autoencoder generates molecular graphs in two phases, by first generating a tree-structured scaffold over chemical substructures, and then combining them into a molecule with a graph message passing network. This approach allows us to incrementally expand molecules while maintaining chemical validity at every step. We evaluate our model on multiple tasks ranging from molecular generation to optimization. Across these tasks, our model outperforms previous state-of-the-art baselines by a significant margin.
[https://arxiv.org/abs/1802.04364](https://arxiv.org/abs/1802.04364)'''
st.markdown(descrip)
st.markdown("
"+ img_to_html('about.png')+ "
", unsafe_allow_html=True) def Optimize_a_molecule(): st.markdown("SMILES is invalid. Please enter a valid SMILES.
",unsafe_allow_html=True) else: canon_smiles = Chem.MolToSmiles(mol) st.markdown("Canonicalized SMILES",unsafe_allow_html=True) st.code(canon_smiles) logp,sa,cycle,pen_p = penalized_logp_standard(mol) moses_passed = mol_passes_filters_custom(mol) if moses_passed=='YES': st.markdown("MOSES filters passed successfully.
",unsafe_allow_html=True) else: st.markdown("MOSES filters passed failed. Please use another molecule.
",unsafe_allow_html=True) # with st.columns(3)[1]: # st.markdown("",unsafe_allow_html=True) imgByteArr = io.BytesIO() MolToImage(mol,size=(400,400)).save(imgByteArr,format='PNG') st.markdown(""+
f""+
"
New SMILES is invalid! Please choose another setting.
",unsafe_allow_html=True) # st.write('New SMILES is invalid.') else: # st.write('New SMILES molecule:') imgByteArr = io.BytesIO() MolToImage(new_mol,size=(400,400)).save(imgByteArr,format='PNG') st.markdown(""+
f""+
"
MOSES filters passed successfully.
",unsafe_allow_html=True) else: st.markdown("MOSES filters passed failed.
",unsafe_allow_html=True) new_logp,new_sa,new_cycle,new_pen_p = penalized_logp_standard(new_mol) # st.write('New penalized logP score: %.5f' % (new_score)) col12, col22, col32, col42 = st.columns(4) col12.metric('LogP', '%.5f' % (new_logp),'%.5f'%(new_logp-logp)) col22.metric('SA score', '%.5f' % (-new_sa),'%.5f'%(-new_sa+sa),delta_color='inverse') col32.metric('Cycle score', '%d' % (-new_cycle),'%d'%(-new_cycle+cycle),delta_color='inverse') col42.metric('Penalized LogP', '%.5f' % (new_pen_p),'%.5f'%(new_pen_p-pen_p)) # st.metric('New penalized logP score','%.5f' % (new_score), '%.5f'%(new_score-score)) st.metric('Similarity','%.5f' % (sim)) # st.write('Caching ZINC20 if necessary...') with st.spinner("Caching ZINC20 if necessary..."): if buy(new_smiles, catalog='zinc20',canonicalize=True): st.write('This molecule exists.') st.markdown("Checked using molbloom
",unsafe_allow_html=True) def Optimize_a_batch(): st.markdown("Please use Optimize a molecule tab.
",unsafe_allow_html=True) else: def check_smiles(): check = [] for smi in stqdm(smiles_list): mol = Chem.MolFromSmiles(smi) if (mol is not None) and (mol_passes_filters_custom(mol) == 'YES'): check.append(Chem.MolToSmiles(mol)) else: check.append('invalid') st.session_state['df'] = pd.concat([df,pd.DataFrame({'checked':check})],axis=1) df = pd.DataFrame({'SMILES':smiles_list}) st.dataframe(df,use_container_width=True) st.button('Check SMILES',key='check',on_click=check_smiles) if 'df' in st.session_state: # st.markdown('Number of SMILES: '+str(len(st.session_state.smiles_list))) df = st.session_state['df'] st.markdown('Number of passed SMILES: '+str(df[df.checked != 'invalid'].shape[0])) st.dataframe(df,use_container_width=True) download_df(df,0) with st.form("Choose score to calculate"): st.caption('Penalized LogP is always calculated.') logp_cal= st.checkbox('LogP',key='logp_cal') sa_cal= st.checkbox('SA score',key='sa_cal') cycle_cal= st.checkbox('Cycle score',key='cycle_cal') calc = st.form_submit_button("Keep passed SMILES and calculate scores") # st.session_state['pen_logp_cal'] = True if calc: smiles_list = list(df[df.checked != 'invalid'].SMILES) # st.write(smiles_list) score_df = pd.DataFrame({'SMILES':smiles_list}) scores =[] # pen_logp_ls = logp_ls = sa_ls = cycle_ls =[] st.session_state.sc_name = ['logp','sa','cycle','pen_logp'] for smi in stqdm(smiles_list): logp,sa,cycle,pen_logp = penalized_logp_standard(Chem.MolFromSmiles(smi)) # st.write(f"{logp}\t{sa}\t{cycle}\t{pen_logp}") scores+=[(logp,sa,cycle,pen_logp)] s_df = pd.DataFrame(scores,columns=st.session_state.sc_name) # st.write(st.session_state.log_ls) # to_concat = [score_df] + [d for d,checked in zip(dfs,[logp_cal,sa_cal,cycle_cal]) if checked] + [pd.DataFrame({'pen_logp':pen_logp_ls})] for n, checked in zip(st.session_state.sc_name,[logp_cal,sa_cal,cycle_cal,True]): if checked: score_df = pd.concat([score_df,s_df[n]],axis=1) # score_df['pen_logp'] = st.session_state.pen_logp_ls 
st.session_state.score_df = score_df st.dataframe(score_df,use_container_width=True) download_df(score_df,1) def batch_generate(smiles_list,df): vocab = [x.strip("\r\n ") for x in open('./vocab.txt')] vocab = Vocab(vocab) container = st.empty() with container.container(): st_lottie(get_ani_json(), height=200, width=300) st.markdown('Please wait...') model = JTPropVAE(vocab, 450, 56, 20, 3) model.load_state_dict(torch.load(model_path,map_location=torch.device('cpu'))) new_scores =[] for canon_smiles in stqdm(smiles_list): new_smiles,sim = model.optimize(canon_smiles, sim_cutoff=st.session_state.sim_cutoff, lr=st.session_state.lr, num_iter=st.session_state.n_iter) if new_smiles is None: new_scores+=[('invalid',-100.0,-100.0,-100.0,-100.0,-100.0)] else: new_mol = Chem.MolFromSmiles(new_smiles) if new_mol is None: new_scores+=[('invalid',-100.0,-100.0,-100.0,-100.0,-100.0)] else: logp,sa,cycle,pen_logp = penalized_logp_standard(new_mol) new_scores+=[(new_smiles,sim,logp,sa,cycle,pen_logp)] del model with container.container(): sc_name = ['new_'+n for n in st.session_state.sc_name] s_df = pd.DataFrame(new_scores,columns=['new_smiles','sim']+sc_name) new_score_df = df # st.write(st.session_state.log_ls) # to_concat = [score_df] + [d for d,checked in zip(dfs,[logp_cal,sa_cal,cycle_cal]) if checked] + [pd.DataFrame({'pen_logp':pen_logp_ls})] for n, checked in zip(['new_smiles','sim']+sc_name,[True, True,st.session_state.logp_cal,st.session_state.sa_cal,st.session_state.cycle_cal,True]): if checked: new_score_df = pd.concat([new_score_df,s_df[n]],axis=1) st.markdown("