jtvae-demo / fast_jtnn /jtprop_vae.py
Trương Gia Bảo
Initial commit
a3ea5d3
raw
history blame
13.6 kB
import torch
import torch.nn as nn
import torch.nn.functional as F
from mol_tree import Vocab, MolTree
from nnutils import create_var, flatten_tensor, avg_pool
from jtnn_enc import JTNNEncoder
from jtnn_dec import JTNNDecoder
from mpn import MPN
from jtmpn import JTMPN
from datautils import tensorize
from chemutils import enum_assemble, set_atommap, copy_edit_mol, attach_mols
import rdkit
import rdkit.Chem as Chem
from rdkit import DataStructs
from rdkit.Chem import AllChem
import copy, math
class JTPropVAE(nn.Module):
def __init__(self, vocab, hidden_size, latent_size, depthT, depthG):
super(JTPropVAE, self).__init__()
self.vocab = vocab
self.hidden_size = hidden_size
self.latent_size = latent_size = int(latent_size / 2) #Tree and Mol has two vectors
self.jtnn = JTNNEncoder(hidden_size, depthT, nn.Embedding(vocab.size(), hidden_size))
self.decoder = JTNNDecoder(vocab, hidden_size, latent_size, nn.Embedding(vocab.size(), hidden_size))
self.jtmpn = JTMPN(hidden_size, depthG)
self.mpn = MPN(hidden_size, depthG)
self.A_assm = nn.Linear(latent_size, hidden_size, bias=False)
# self.assm_loss = nn.CrossEntropyLoss(size_average=False)
self.assm_loss = nn.CrossEntropyLoss(reduction='sum')
self.T_mean = nn.Linear(hidden_size, latent_size)
self.T_var = nn.Linear(hidden_size, latent_size)
self.G_mean = nn.Linear(hidden_size, latent_size)
self.G_var = nn.Linear(hidden_size, latent_size)
# Prop
self.propNN = nn.Sequential(
nn.Linear(self.latent_size*2, self.hidden_size),
nn.Tanh(),
nn.Linear(self.hidden_size, 1)
)
self.prop_loss = nn.MSELoss()
def encode(self, jtenc_holder, mpn_holder):
tree_vecs, tree_mess = self.jtnn(*jtenc_holder)
mol_vecs = self.mpn(*mpn_holder)
return tree_vecs, tree_mess, mol_vecs
def encode_from_smiles(self, smiles_list):
tree_batch = [MolTree(s) for s in smiles_list]
_, jtenc_holder, mpn_holder = tensorize(tree_batch, self.vocab, assm=False)
tree_vecs, _, mol_vecs = self.encode(jtenc_holder, mpn_holder)
return torch.cat([tree_vecs, mol_vecs], dim=-1)
def encode_latent(self, jtenc_holder, mpn_holder):
tree_vecs, _ = self.jtnn(*jtenc_holder)
mol_vecs = self.mpn(*mpn_holder)
tree_mean = self.T_mean(tree_vecs)
mol_mean = self.G_mean(mol_vecs)
tree_var = -torch.abs(self.T_var(tree_vecs))
mol_var = -torch.abs(self.G_var(mol_vecs))
return torch.cat([tree_mean, mol_mean], dim=1), torch.cat([tree_var, mol_var], dim=1)
def rsample(self, z_vecs, W_mean, W_var):
batch_size = z_vecs.size(0)
z_mean = W_mean(z_vecs)
z_log_var = -torch.abs(W_var(z_vecs)) #Following Mueller et al.
kl_loss = -0.5 * torch.sum(1.0 + z_log_var - z_mean * z_mean - torch.exp(z_log_var)) / batch_size
epsilon = create_var(torch.randn_like(z_mean))
z_vecs = z_mean + torch.exp(z_log_var / 2) * epsilon
return z_vecs, kl_loss
def sample_prior(self, prob_decode=False):
z_tree = torch.randn(1, self.latent_size).cuda()
z_mol = torch.randn(1, self.latent_size).cuda()
return self.decode(z_tree, z_mol, prob_decode)
def forward(self, x_batch, beta):
x_batch, x_jtenc_holder, x_mpn_holder, x_jtmpn_holder, prop_batch = x_batch
x_tree_vecs, x_tree_mess, x_mol_vecs = self.encode(x_jtenc_holder, x_mpn_holder)
z_tree_vecs,tree_kl = self.rsample(x_tree_vecs, self.T_mean, self.T_var)
z_mol_vecs,mol_kl = self.rsample(x_mol_vecs, self.G_mean, self.G_var)
kl_div = tree_kl + mol_kl
word_loss, topo_loss, word_acc, topo_acc = self.decoder(x_batch, z_tree_vecs)
assm_loss, assm_acc = self.assm(x_batch, x_jtmpn_holder, z_mol_vecs, x_tree_mess)
all_vec = torch.cat([z_tree_vecs, z_mol_vecs], dim=1)
# prop_label = create_var(torch.Tensor(prop_batch))
prop_label = create_var(torch.Tensor(prop_batch))
prop_loss = self.prop_loss(self.propNN(all_vec).squeeze(), prop_label)
return word_loss + topo_loss + assm_loss + beta * kl_div + prop_loss, kl_div.item(), word_acc, topo_acc, assm_acc, prop_loss.item()
def assm(self, mol_batch, jtmpn_holder, x_mol_vecs, x_tree_mess):
jtmpn_holder,batch_idx = jtmpn_holder
fatoms,fbonds,agraph,bgraph,scope = jtmpn_holder
batch_idx = create_var(batch_idx)
cand_vecs = self.jtmpn(fatoms, fbonds, agraph, bgraph, scope, x_tree_mess)
x_mol_vecs = x_mol_vecs.index_select(0, batch_idx)
x_mol_vecs = self.A_assm(x_mol_vecs) #bilinear
scores = torch.bmm(
x_mol_vecs.unsqueeze(1),
cand_vecs.unsqueeze(-1)
).squeeze()
cnt,tot,acc = 0,0,0
all_loss = []
for i,mol_tree in enumerate(mol_batch):
comp_nodes = [node for node in mol_tree.nodes if len(node.cands) > 1 and not node.is_leaf]
cnt += len(comp_nodes)
for node in comp_nodes:
label = node.cands.index(node.label)
ncand = len(node.cands)
cur_score = scores.narrow(0, tot, ncand)
tot += ncand
if cur_score.data[label] >= cur_score.max().item():
acc += 1
label = create_var(torch.LongTensor([label]))
all_loss.append( self.assm_loss(cur_score.view(1,-1), label) )
all_loss = sum(all_loss) / len(mol_batch)
return all_loss, acc * 1.0 / cnt
def decode(self, x_tree_vecs, x_mol_vecs, prob_decode):
#currently do not support batch decoding
assert x_tree_vecs.size(0) == 1 and x_mol_vecs.size(0) == 1
pred_root,pred_nodes = self.decoder.decode(x_tree_vecs, prob_decode)
if len(pred_nodes) == 0: return None
elif len(pred_nodes) == 1: return pred_root.smiles
#Mark nid & is_leaf & atommap
for i,node in enumerate(pred_nodes):
node.nid = i + 1
node.is_leaf = (len(node.neighbors) == 1)
if len(node.neighbors) > 1:
set_atommap(node.mol, node.nid)
scope = [(0, len(pred_nodes))]
jtenc_holder,mess_dict = JTNNEncoder.tensorize_nodes(pred_nodes, scope)
_,tree_mess = self.jtnn(*jtenc_holder)
tree_mess = (tree_mess, mess_dict) #Important: tree_mess is a matrix, mess_dict is a python dict
x_mol_vecs = self.A_assm(x_mol_vecs).squeeze() #bilinear
cur_mol = copy_edit_mol(pred_root.mol)
global_amap = [{}] + [{} for node in pred_nodes]
global_amap[1] = {atom.GetIdx():atom.GetIdx() for atom in cur_mol.GetAtoms()}
cur_mol,_ = self.dfs_assemble(tree_mess, x_mol_vecs, pred_nodes, cur_mol, global_amap, [], pred_root, None, prob_decode, check_aroma=True)
if cur_mol is None:
cur_mol = copy_edit_mol(pred_root.mol)
global_amap = [{}] + [{} for node in pred_nodes]
global_amap[1] = {atom.GetIdx():atom.GetIdx() for atom in cur_mol.GetAtoms()}
cur_mol,pre_mol = self.dfs_assemble(tree_mess, x_mol_vecs, pred_nodes, cur_mol, global_amap, [], pred_root, None, prob_decode, check_aroma=False)
if cur_mol is None: cur_mol = pre_mol
if cur_mol is None:
return None
cur_mol = cur_mol.GetMol()
set_atommap(cur_mol)
cur_mol = Chem.MolFromSmiles(Chem.MolToSmiles(cur_mol))
return Chem.MolToSmiles(cur_mol) if cur_mol is not None else None
def dfs_assemble(self, y_tree_mess, x_mol_vecs, all_nodes, cur_mol, global_amap, fa_amap, cur_node, fa_node, prob_decode, check_aroma):
fa_nid = fa_node.nid if fa_node is not None else -1
prev_nodes = [fa_node] if fa_node is not None else []
children = [nei for nei in cur_node.neighbors if nei.nid != fa_nid]
neighbors = [nei for nei in children if nei.mol.GetNumAtoms() > 1]
neighbors = sorted(neighbors, key=lambda x:x.mol.GetNumAtoms(), reverse=True)
singletons = [nei for nei in children if nei.mol.GetNumAtoms() == 1]
neighbors = singletons + neighbors
cur_amap = [(fa_nid,a2,a1) for nid,a1,a2 in fa_amap if nid == cur_node.nid]
cands,aroma_score = enum_assemble(cur_node, neighbors, prev_nodes, cur_amap)
if len(cands) == 0 or (sum(aroma_score) < 0 and check_aroma):
return None, cur_mol
cand_smiles,cand_amap = zip(*cands)
if torch.cuda.is_available():
aroma_score = torch.Tensor(aroma_score).cuda()
else:
aroma_score = torch.Tensor(aroma_score)
cands = [(smiles, all_nodes, cur_node) for smiles in cand_smiles]
if len(cands) > 1:
jtmpn_holder = JTMPN.tensorize(cands, y_tree_mess[1])
fatoms,fbonds,agraph,bgraph,scope = jtmpn_holder
cand_vecs = self.jtmpn(fatoms, fbonds, agraph, bgraph, scope, y_tree_mess[0])
scores = torch.mv(cand_vecs, x_mol_vecs) + aroma_score
else:
scores = torch.Tensor([1.0])
if prob_decode:
probs = F.softmax(scores.view(1,-1), dim=1).squeeze() + 1e-7 #prevent prob = 0
cand_idx = torch.multinomial(probs, probs.numel())
else:
_,cand_idx = torch.sort(scores, descending=True)
backup_mol = Chem.RWMol(cur_mol)
pre_mol = cur_mol
for i in range(cand_idx.numel()):
cur_mol = Chem.RWMol(backup_mol)
pred_amap = cand_amap[cand_idx[i].item()]
new_global_amap = copy.deepcopy(global_amap)
for nei_id,ctr_atom,nei_atom in pred_amap:
if nei_id == fa_nid:
continue
new_global_amap[nei_id][nei_atom] = new_global_amap[cur_node.nid][ctr_atom]
cur_mol = attach_mols(cur_mol, children, [], new_global_amap) #father is already attached
new_mol = cur_mol.GetMol()
new_mol = Chem.MolFromSmiles(Chem.MolToSmiles(new_mol))
if new_mol is None: continue
has_error = False
for nei_node in children:
if nei_node.is_leaf: continue
tmp_mol, tmp_mol2 = self.dfs_assemble(y_tree_mess, x_mol_vecs, all_nodes, cur_mol, new_global_amap, pred_amap, nei_node, cur_node, prob_decode, check_aroma)
if tmp_mol is None:
has_error = True
if i == 0: pre_mol = tmp_mol2
break
cur_mol = tmp_mol
if not has_error: return cur_mol, cur_mol
return None, pre_mol
def optimize(self, smiles, sim_cutoff, lr=2.0, num_iter=20):
# mol_tree = MolTree(smiles)
# mol_tree.recover()
tree_batch = [MolTree(smiles)]
_, jtenc_holder, mpn_holder = tensorize(tree_batch, self.vocab, assm=False)
tree_vec, _, mol_vec = self.encode(jtenc_holder, mpn_holder)
mol = Chem.MolFromSmiles(smiles)
fp1 = AllChem.GetMorganFingerprint(mol, 2)
tree_mean = self.T_mean(tree_vec)
tree_log_var = -torch.abs(self.T_var(tree_vec)) #Following Mueller et al.
mol_mean = self.G_mean(mol_vec)
mol_log_var = -torch.abs(self.G_var(mol_vec)) #Following Mueller et al.
mean = torch.cat([tree_mean, mol_mean], dim=1)
log_var = torch.cat([tree_log_var, mol_log_var], dim=1)
cur_vec = create_var(mean.data, True)
visited = []
for step in range(num_iter):
prop_val = self.propNN(cur_vec).squeeze()
grad = torch.autograd.grad(prop_val, cur_vec)[0]
cur_vec = cur_vec.data + lr * grad.data
cur_vec = create_var(cur_vec, True)
visited.append(cur_vec)
l,r = 0, num_iter - 1
while l < r - 1:
mid = (l + r) // 2
new_vec = visited[mid]
tree_vec,mol_vec = torch.chunk(new_vec, 2, dim=1)
new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False)
if new_smiles is None:
r = mid - 1
continue
new_mol = Chem.MolFromSmiles(new_smiles)
fp2 = AllChem.GetMorganFingerprint(new_mol, 2)
sim = DataStructs.TanimotoSimilarity(fp1, fp2)
if sim < sim_cutoff:
r = mid - 1
else:
l = mid
"""
best_vec = visited[0]
for new_vec in visited:
tree_vec,mol_vec = torch.chunk(new_vec, 2, dim=1)
new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False)
if new_smiles is None: continue
new_mol = Chem.MolFromSmiles(new_smiles)
fp2 = AllChem.GetMorganFingerprint(new_mol, 2)
sim = DataStructs.TanimotoSimilarity(fp1, fp2)
if sim >= sim_cutoff:
best_vec = new_vec
"""
tree_vec,mol_vec = torch.chunk(visited[l], 2, dim=1)
#tree_vec,mol_vec = torch.chunk(best_vec, 2, dim=1)
new_smiles = self.decode(tree_vec, mol_vec, prob_decode=False)
if new_smiles is None:
return None, None
new_mol = Chem.MolFromSmiles(new_smiles)
fp2 = AllChem.GetMorganFingerprint(new_mol, 2)
sim = DataStructs.TanimotoSimilarity(fp1, fp2)
if sim >= sim_cutoff:
return new_smiles, sim
else:
return None, None