import torch import torch.nn as nn import torch.nn.functional as F from mol_tree import Vocab, MolTree from nnutils import create_var, flatten_tensor, avg_pool from jtnn_enc import JTNNEncoder from jtnn_dec import JTNNDecoder from mpn import MPN from jtmpn import JTMPN from datautils import tensorize from chemutils import enum_assemble, set_atommap, copy_edit_mol, attach_mols import rdkit import rdkit.Chem as Chem from rdkit import DataStructs from rdkit.Chem import AllChem import copy, math class JTPropVAE(nn.Module): def __init__(self, vocab, hidden_size, latent_size, depthT, depthG): super(JTPropVAE, self).__init__() self.vocab = vocab self.hidden_size = hidden_size self.latent_size = latent_size = int(latent_size / 2) #Tree and Mol has two vectors self.jtnn = JTNNEncoder(hidden_size, depthT, nn.Embedding(vocab.size(), hidden_size)) self.decoder = JTNNDecoder(vocab, hidden_size, latent_size, nn.Embedding(vocab.size(), hidden_size)) self.jtmpn = JTMPN(hidden_size, depthG) self.mpn = MPN(hidden_size, depthG) self.A_assm = nn.Linear(latent_size, hidden_size, bias=False) # self.assm_loss = nn.CrossEntropyLoss(size_average=False) self.assm_loss = nn.CrossEntropyLoss(reduction='sum') self.T_mean = nn.Linear(hidden_size, latent_size) self.T_var = nn.Linear(hidden_size, latent_size) self.G_mean = nn.Linear(hidden_size, latent_size) self.G_var = nn.Linear(hidden_size, latent_size) # Prop self.propNN = nn.Sequential( nn.Linear(self.latent_size*2, self.hidden_size), nn.Tanh(), nn.Linear(self.hidden_size, 1) ) self.prop_loss = nn.MSELoss() def encode(self, jtenc_holder, mpn_holder): tree_vecs, tree_mess = self.jtnn(*jtenc_holder) mol_vecs = self.mpn(*mpn_holder) return tree_vecs, tree_mess, mol_vecs def encode_from_smiles(self, smiles_list): tree_batch = [MolTree(s) for s in smiles_list] _, jtenc_holder, mpn_holder = tensorize(tree_batch, self.vocab, assm=False) tree_vecs, _, mol_vecs = self.encode(jtenc_holder, mpn_holder) return torch.cat([tree_vecs, mol_vecs], dim=-1) def encode_latent(self, jtenc_holder, mpn_holder): tree_vecs, _ = self.jtnn(*jtenc_holder) mol_vecs = self.mpn(*mpn_holder) tree_mean = self.T_mean(tree_vecs) mol_mean = self.G_mean(mol_vecs) tree_var = -torch.abs(self.T_var(tree_vecs)) mol_var = -torch.abs(self.G_var(mol_vecs)) return torch.cat([tree_mean, mol_mean], dim=1), torch.cat([tree_var, mol_var], dim=1) def rsample(self, z_vecs, W_mean, W_var): batch_size = z_vecs.size(0) z_mean = W_mean(z_vecs) z_log_var = -torch.abs(W_var(z_vecs)) #Following Mueller et al. kl_loss = -0.5 * torch.sum(1.0 + z_log_var - z_mean * z_mean - torch.exp(z_log_var)) / batch_size epsilon = create_var(torch.randn_like(z_mean)) z_vecs = z_mean + torch.exp(z_log_var / 2) * epsilon return z_vecs, kl_loss def sample_prior(self, prob_decode=False): z_tree = torch.randn(1, self.latent_size).cuda() z_mol = torch.randn(1, self.latent_size).cuda() return self.decode(z_tree, z_mol, prob_decode) def forward(self, x_batch, beta): x_batch, x_jtenc_holder, x_mpn_holder, x_jtmpn_holder, prop_batch = x_batch x_tree_vecs, x_tree_mess, x_mol_vecs = self.encode(x_jtenc_holder, x_mpn_holder) z_tree_vecs,tree_kl = self.rsample(x_tree_vecs, self.T_mean, self.T_var) z_mol_vecs,mol_kl = self.rsample(x_mol_vecs, self.G_mean, self.G_var) kl_div = tree_kl + mol_kl word_loss, topo_loss, word_acc, topo_acc = self.decoder(x_batch, z_tree_vecs) assm_loss, assm_acc = self.assm(x_batch, x_jtmpn_holder, z_mol_vecs, x_tree_mess) all_vec = torch.cat([z_tree_vecs, z_mol_vecs], dim=1) # prop_label = create_var(torch.Tensor(prop_batch)) prop_label = create_var(torch.Tensor(prop_batch)) prop_loss = self.prop_loss(self.propNN(all_vec).squeeze(), prop_label) return word_loss + topo_loss + assm_loss + beta * kl_div + prop_loss, kl_div.item(), word_acc, topo_acc, assm_acc, prop_loss.item() def assm(self, mol_batch, jtmpn_holder, x_mol_vecs, x_tree_mess): jtmpn_holder,batch_idx = jtmpn_holder fatoms,fbonds,agraph,bgraph,scope = jtmpn_holder batch_idx = create_var(batch_idx) cand_vecs = self.jtmpn(fatoms, fbonds, agraph, bgraph, scope, x_tree_mess) x_mol_vecs = x_mol_vecs.index_select(0, batch_idx) x_mol_vecs = self.A_assm(x_mol_vecs) #bilinear scores = torch.bmm( x_mol_vecs.unsqueeze(1), cand_vecs.unsqueeze(-1) ).squeeze() cnt,tot,acc = 0,0,0 all_loss = [] for i,mol_tree in enumerate(mol_batch): comp_nodes = [node for node in mol_tree.nodes if len(node.cands) > 1 and not node.is_leaf] cnt += len(comp_nodes) for node in comp_nodes: label = node.cands.index(node.label) ncand = len(node.cands) cur_score = scores.narrow(0, tot, ncand) tot += ncand if cur_score.data[label] >= cur_score.max().item(): acc += 1 label = create_var(torch.LongTensor([label])) all_loss.append( self.assm_loss(cur_score.view(1,-1), label) ) all_loss = sum(all_loss) / len(mol_batch) return all_loss, acc * 1.0 / cnt def decode(self, x_tree_vecs, x_mol_vecs, prob_decode): #currently do not support batch decoding assert x_tree_vecs.size(0) == 1 and x_mol_vecs.size(0) == 1 pred_root,pred_nodes = self.decoder.decode(x_tree_vecs, prob_decode) if len(pred_nodes) == 0: return None elif len(pred_nodes) == 1: return pred_root.smiles #Mark nid & is_leaf & atommap for i,node in enumerate(pred_nodes): node.nid = i + 1 node.is_leaf = (len(node.neighbors) == 1) if len(node.neighbors) > 1: set_atommap(node.mol, node.nid) scope = [(0, len(pred_nodes))] jtenc_holder,mess_dict = JTNNEncoder.tensorize_nodes(pred_nodes, scope) _,tree_mess = self.jtnn(*jtenc_holder) tree_mess = (tree_mess, mess_dict) #Important: tree_mess is a matrix, mess_dict is a python dict x_mol_vecs = self.A_assm(x_mol_vecs).squeeze() #bilinear cur_mol = copy_edit_mol(pred_root.mol) global_amap = [{}] + [{} for node in pred_nodes] global_amap[1] = {atom.GetIdx():atom.GetIdx() for atom in cur_mol.GetAtoms()} cur_mol,_ = self.dfs_assemble(tree_mess, x_mol_vecs, pred_nodes, cur_mol, global_amap, [], pred_root, None, prob_decode, check_aroma=True) if cur_mol is None: cur_mol = copy_edit_mol(pred_root.mol) global_amap = [{}] + [{} for node in pred_nodes] global_amap[1] = {atom.GetIdx():atom.GetIdx() for atom in cur_mol.GetAtoms()} cur_mol,pre_mol = self.dfs_assemble(tree_mess, x_mol_vecs, pred_nodes, cur_mol, global_amap, [], pred_root, None, prob_decode, check_aroma=False) if cur_mol is None: cur_mol = pre_mol if cur_mol is None: return None cur_mol = cur_mol.GetMol() set_atommap(cur_mol) cur_mol = Chem.MolFromSmiles(Chem.MolToSmiles(cur_mol)) return Chem.MolToSmiles(cur_mol) if cur_mol is not None else None def dfs_assemble(self, y_tree_mess, x_mol_vecs, all_nodes, cur_mol, global_amap, fa_amap, cur_node, fa_node, prob_decode, check_aroma): fa_nid = fa_node.nid if fa_node is not None else -1 prev_nodes = [fa_node] if fa_node is not None else [] children = [nei for nei in cur_node.neighbors if nei.nid != fa_nid] neighbors = [nei for nei in children if nei.mol.GetNumAtoms() > 1] neighbors = sorted(neighbors, key=lambda x:x.mol.GetNumAtoms(), reverse=True) singletons = [nei for nei in children if nei.mol.GetNumAtoms() == 1] neighbors = singletons + neighbors cur_amap = [(fa_nid,a2,a1) for nid,a1,a2 in fa_amap if nid == cur_node.nid] cands,aroma_score = enum_assemble(cur_node, neighbors, prev_nodes, cur_amap) if len(cands) == 0 or (sum(aroma_score) < 0 and check_aroma): return None, cur_mol cand_smiles,cand_amap = zip(*cands) if torch.cuda.is_available(): aroma_score = torch.Tensor(aroma_score).cuda() else: aroma_score = torch.Tensor(aroma_score) cands = [(smiles, all_nodes, cur_node) for smiles in cand_smiles] if len(cands) > 1: jtmpn_holder = JTMPN.tensorize(cands, y_tree_mess[1]) fatoms,fbonds,agraph,bgraph,scope = jtmpn_holder cand_vecs = self.jtmpn(fatoms, fbonds, agraph, bgraph, scope, y_tree_mess[0]) scores = torch.mv(cand_vecs, x_mol_vecs) + aroma_score else: scores = torch.Tensor([1.0]) if prob_decode: probs = F.softmax(scores.view(1,-1), dim=1).squeeze() + 1e-7 #prevent prob = 0 cand_idx = torch.multinomial(probs, probs.numel()) else: _,cand_idx = torch.sort(scores, descending=True) backup_mol = Chem.RWMol(cur_mol) pre_mol = cur_mol for i in range(cand_idx.numel()): cur_mol = Chem.RWMol(backup_mol) pred_amap = cand_amap[cand_idx[i].item()] new_global_amap = copy.deepcopy(global_amap) for nei_id,ctr_atom,nei_atom in pred_amap: if nei_id == fa_nid: continue new_global_amap[nei_id][nei_atom] = new_global_amap[cur_node.nid][ctr_atom] cur_mol = attach_mols(cur_mol, children, [], new_global_amap) #father is already attached new_mol = cur_mol.GetMol() new_mol = Chem.MolFromSmiles(Chem.MolToSmiles(new_mol)) if new_mol is None: continue has_error = False for nei_node in children: if nei_node.is_leaf: continue tmp_mol, tmp_mol2 = self.dfs_assemble(y_tree_mess, x_mol_vecs, all_nodes, cur_mol, new_global_amap, pred_amap, nei_node, cur_node, prob_decode, check_aroma) if tmp_mol is None: has_error = True if i == 0: pre_mol = tmp_mol2 break cur_mol = tmp_mol if not has_error: return cur_mol, cur_mol return None, pre_mol def optimize(self, smiles, sim_cutoff, lr=2.0, num_iter=20): # mol_tree = MolTree(smiles) # mol_tree.recover() tree_batch = [MolTree(smiles)] _, jtenc_holder, mpn_holder = tensorize(tree_batch, self.vocab, assm=False) tree_vec, _, mol_vec = self.encode(jtenc_holder, mpn_holder) mol = Chem.MolFromSmiles(smiles) fp1 = AllChem.GetMorganFingerprint(mol, 2) tree_mean = self.T_mean(tree_vec) tree_log_var = -torch.abs(self.T_var(tree_vec)) #Following Mueller et al. mol_mean = self.G_mean(mol_vec) mol_log_var = -torch.abs(self.G_var(mol_vec)) #Following Mueller et al. mean = torch.cat([tree_mean, mol_mean], dim=1) log_var = torch.cat([tree_log_var, mol_log_var], dim=1) cur_vec = create_var(mean.data, True) visited = [] for step in range(num_iter): prop_val = self.propNN(cur_vec).squeeze() grad = torch.autograd.grad(prop_val, cur_vec)[0] cur_vec = cur_vec.data + lr * grad.data cur_vec = create_var(cur_vec, True) visited.append(cur_vec) l,r = 0, num_iter - 1 while l < r - 1: mid = (l + r) // 2 new_vec = visited[mid] tree_vec,mol_vec = torch.chunk(new_vec, 2, dim=1) new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False) if new_smiles is None: r = mid - 1 continue new_mol = Chem.MolFromSmiles(new_smiles) fp2 = AllChem.GetMorganFingerprint(new_mol, 2) sim = DataStructs.TanimotoSimilarity(fp1, fp2) if sim < sim_cutoff: r = mid - 1 else: l = mid """ best_vec = visited[0] for new_vec in visited: tree_vec,mol_vec = torch.chunk(new_vec, 2, dim=1) new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False) if new_smiles is None: continue new_mol = Chem.MolFromSmiles(new_smiles) fp2 = AllChem.GetMorganFingerprint(new_mol, 2) sim = DataStructs.TanimotoSimilarity(fp1, fp2) if sim >= sim_cutoff: best_vec = new_vec """ tree_vec,mol_vec = torch.chunk(visited[l], 2, dim=1) #tree_vec,mol_vec = torch.chunk(best_vec, 2, dim=1) new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False) if new_smiles is None: return None, None new_mol = Chem.MolFromSmiles(new_smiles) fp2 = AllChem.GetMorganFingerprint(new_mol, 2) sim = DataStructs.TanimotoSimilarity(fp1, fp2) if sim >= sim_cutoff: return new_smiles, sim else: return None, None