In [2]:
import os, torch, torch.nn as nn, torch.utils.data as data, torchvision as tv
import lightning as L
import numpy as np, pandas as pd, matplotlib.pyplot as plt
from pytorch_lightning.loggers import WandbLogger

In [None]:
# create the datasets and dataloaders
# Root folder of the dataset. Set the BRAIN2MUSIC_DATA_DIR environment variable to
# run the notebook on another machine; the default keeps the original location.
DATA_ROOT = os.environ.get('BRAIN2MUSIC_DATA_DIR', '/home/ckadirt/brain2music/dataset')

# fMRI responses. Original notes give the array shapes as 65000 x 4800 (training)
# and 65000 x 600 (test), i.e. (voxels, time samples).
# NOTE(review): the models in this notebook use 60784 input features -- confirm the real voxel count.
train_voxels_path = os.path.join(DATA_ROOT, 'preproc', 'sub-001_Resp_Training.npy')
test_voxels_path = os.path.join(DATA_ROOT, 'preproc', 'sub-001_Resp_Test_Mean.npy')

# EnCodec embeddings. Original notes give 480 x 2 x 1125 (training) and 600 x 2 x 1125 (test).
# NOTE(review): 600 test time samples at 10 samples per stimulus is 60 stimuli -- confirm the test shape.
train_embeddings_path = os.path.join(DATA_ROOT, 'Gtanz', 'audios', 'sub-001', 'encodec_embeddings_train.pt')
test_embeddings_path = os.path.join(DATA_ROOT, 'Gtanz', 'audios', 'sub-001', 'encodec_embeddings_test.pt')

class VoxelsDataset(data.Dataset):
    """Pairs the block of fMRI samples recorded for a stimulus with its embedding.

    The voxel array is loaded from a ``.npy`` file and its two axes are swapped so
    that axis 0 indexes time samples. Item ``i`` is the ``samples_per_stimulus``
    consecutive samples starting at ``i * samples_per_stimulus`` together with
    ``embeddings[i]``.

    Args:
        voxels_path: path to the ``.npy`` voxel array (2-D; transposed on load).
        embeddings_path: path to the object saved with ``torch.save``; it is
            indexed by stimulus number.
        samples_per_stimulus: number of consecutive fMRI samples per stimulus.
            The default of 10 follows the original note (15 s per stimulus,
            one sample every 1.5 s).

    Raises:
        ValueError: if ``samples_per_stimulus`` is smaller than 1.
    """

    def __init__(self, voxels_path, embeddings_path, samples_per_stimulus=10):
        if samples_per_stimulus < 1:
            raise ValueError('samples_per_stimulus must be >= 1, got %r' % (samples_per_stimulus,))
        self.samples_per_stimulus = samples_per_stimulus
        # transpose the two dimensions of the voxels data to match the embeddings data
        self.voxels = torch.from_numpy(np.load(voxels_path)).float().transpose(0, 1)
        # map_location keeps tensors that were saved from a GPU loadable on the CPU,
        # which is where a Dataset should hold its data.
        # NOTE: torch.load unpickles the file -- only use it on trusted files.
        self.embeddings = torch.load(embeddings_path, map_location='cpu')
        # trailing samples that do not fill a whole stimulus block are ignored
        self.len = len(self.voxels) // samples_per_stimulus

    def __getitem__(self, index):
        """Return ``(voxels, embeddings)`` for stimulus ``index``.

        Negative indices count from the end. An index outside the dataset raises
        ``IndexError`` instead of silently returning an empty voxel block.
        """
        if index < 0:
            index += self.len
        if not 0 <= index < self.len:
            raise IndexError('stimulus index out of range for dataset of length %d' % self.len)
        start = index * self.samples_per_stimulus
        voxels = self.voxels[start:start + self.samples_per_stimulus]
        embeddings = self.embeddings[index]
        return voxels, embeddings

    def __len__(self):
        """Number of complete stimulus blocks in the voxel array."""
        return self.len
 
class VoxelsEmbeddinsEncodecDataModule(L.LightningDataModule):
    """Lightning data module serving the train and test ``VoxelsDataset`` splits.

    Only ``train_dataloader`` and ``test_dataloader`` are provided; no validation
    dataloader is defined here.

    Args:
        train_voxels_path / train_embeddings_path: files of the training split.
        test_voxels_path / test_embeddings_path: files of the test split.
        batch_size: batch size used by both dataloaders.
    """

    def __init__(self, train_voxels_path, train_embeddings_path, test_voxels_path, test_embeddings_path, batch_size=32):
        super().__init__()
        self.batch_size = batch_size
        self.train_voxels_path, self.train_embeddings_path = train_voxels_path, train_embeddings_path
        self.test_voxels_path, self.test_embeddings_path = test_voxels_path, test_embeddings_path

    def setup(self, stage=None):
        """Build both datasets; ``stage`` is accepted but not used."""
        train_files = (self.train_voxels_path, self.train_embeddings_path)
        test_files = (self.test_voxels_path, self.test_embeddings_path)
        self.train_dataset = VoxelsDataset(*train_files)
        self.test_dataset = VoxelsDataset(*test_files)

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        loader_options = dict(batch_size=self.batch_size, shuffle=True)
        return data.DataLoader(self.train_dataset, **loader_options)

    def test_dataloader(self):
        """Loader over the test split, in dataset order."""
        loader_options = dict(batch_size=self.batch_size, shuffle=False)
        return data.DataLoader(self.test_dataset, **loader_options)


In [None]:
class MLP(L.LightningModule):
    """MLP with optional summed skip connections, trained with MSE on the embeddings.

    Args:
        sizes: layer widths, e.g. ``[4800, 1000, 1000, 1000, 1000, 1000, 1000, 600]``;
            ``len(sizes) - 1`` linear layers are created.
        residual_conections: one list per layer naming the states that layer
            receives as input. State 0 is the network input and state ``k`` is the
            output of layer ``k - 1``; several states are summed.
            E.g. ``[[0], [1], [2, 1], [3], [4, 3], [5], [6, 5], [7]]``. Entries beyond
            the number of layers are ignored; an empty entry means "previous state".
        dropout: dropout probability per layer, e.g. ``[0.5, 0.5, 0.5]``; only the
            entries of the hidden layers are used.

    ReLU and dropout are applied after every layer except the last one, so the
    regression output is not zeroed or rescaled by dropout.
    """

    def __init__(self, sizes, residual_conections, dropout):
        super().__init__()
        self.sizes = sizes
        self.residual_conections = residual_conections
        self.dropout = dropout
        self.layers = nn.ModuleList(
            nn.Linear(n_in, n_out) for n_in, n_out in zip(sizes[:-1], sizes[1:])
        )
        self.relu = nn.ReLU()
        self.loss = nn.MSELoss()

    def forward(self, x):
        """Run the network on ``x`` of shape ``[batch_size, sizes[0]]``."""
        x_states = [x]
        last = len(self.layers) - 1
        for i, layer in enumerate(self.layers):
            # Sum the requested earlier states to form this layer's input. Adding
            # them to the layer *output* (as before) fails as soon as the layer
            # changes the feature width, e.g. 60784 -> 1000 with connection [0].
            sources = self.residual_conections[i] or [i]
            layer_input = x_states[sources[0]]
            for j in sources[1:]:
                layer_input = layer_input + x_states[j]
            x = layer(layer_input)
            if i < last:
                x = self.relu(x)
                # functional dropout follows self.training, so model.eval() turns it
                # off; a Dropout module built inside forward() is always active.
                x = nn.functional.dropout(x, p=self.dropout[i], training=self.training)
            x_states.append(x)

        return x

    def _shared_step(self, batch):
        """Compute the MSE loss for one batch (used by training and validation)."""
        voxels, embeddings = batch  # notes: [batch_size, 10, n_voxels] and [batch_size, 2, 1125]
        # one target row per stimulus: [batch_size, 2250]; cast so MSE gets float targets
        embeddings = embeddings.flatten(start_dim=1).float()
        # average the samples of each stimulus -> [batch_size, n_voxels]
        voxels = voxels.mean(dim=1).flatten(start_dim=1)
        return self.loss(self(voxels), embeddings)

    def training_step(self, batch, batch_idx):
        loss = self._shared_step(batch)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._shared_step(batch)
        # sync_dist averages the metric over the processes of a multi-device run
        self.log('val_loss', loss, sync_dist=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)
 

# create the model
sizes = [60784, 1000, 1000, 2250]
# inputs of each layer: state 0 is the network input, state k the output of layer k-1
residual_conections = [[0], [1], [2,1], [3]]
dropout = [0.5, 0.5, 0.5, 0.5]
model = MLP(sizes, residual_conections, dropout)

# create the data module
data_module = VoxelsEmbeddinsEncodecDataModule(train_voxels_path, train_embeddings_path, test_voxels_path, test_embeddings_path, batch_size=32)

# Use the logger that ships with the `lightning` package, the same package the
# Trainer comes from; the Lightning docs advise against mixing `lightning` and
# `pytorch_lightning` objects.
wandb_logger = L.pytorch.loggers.WandbLogger(project='brain2music', entity='ckadirt')

# define the trainer
trainer = L.Trainer(devices=2, accelerator="gpu", max_epochs=100, logger=wandb_logger, precision='16-mixed')

# train the model (data module passed by keyword, like the other training cells)
trainer.fit(model, datamodule=data_module)


In [None]:
class MLP(L.LightningModule):
    """Plain feed-forward MLP trained with MSE on the flattened embeddings.

    Args:
        sizes: layer widths, e.g. ``[4800, 1000, 1000, 1000, 1000, 1000, 1000, 600]``;
            ``len(sizes) - 1`` linear layers are created.
        residual_conections: stored for interface compatibility; this variant does
            not use it.
        dropout: dropout probability per layer, e.g. ``[0.5, 0.5, 0.5]``; only the
            entries of the hidden layers are used.

    ReLU and dropout follow every linear layer except the last one, so the
    regression output is not zeroed or rescaled by dropout during training.
    """

    def __init__(self, sizes, residual_conections, dropout):
        super().__init__()
        self.sizes = sizes
        self.residual_conections = residual_conections
        self.dropout = dropout
        self.layers = nn.Sequential()
        last = len(sizes) - 2  # index of the output layer
        for i in range(len(sizes) - 1):
            self.layers.add_module('linear' + str(i), nn.Linear(sizes[i], sizes[i + 1]))
            if i < last:
                self.layers.add_module('relu' + str(i), nn.ReLU())
                self.layers.add_module('dropout' + str(i), nn.Dropout(dropout[i]))

        self.loss = nn.MSELoss()

    def forward(self, x):
        """Run the network on ``x`` of shape ``[batch_size, sizes[0]]``."""
        return self.layers(x)

    def _shared_step(self, batch):
        """Compute the MSE loss for one batch (used by training and validation)."""
        voxels, embeddings = batch  # notes: [batch_size, 10, n_voxels] and [batch_size, 2, 1125]
        # one target row per stimulus: [batch_size, 2250]; cast so MSE gets float targets
        embeddings = embeddings.flatten(start_dim=1).float()
        # average the samples of each stimulus -> [batch_size, n_voxels]
        voxels = voxels.mean(dim=1).flatten(start_dim=1)
        return self.loss(self(voxels), embeddings)

    def training_step(self, batch, batch_idx):
        loss = self._shared_step(batch)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._shared_step(batch)
        # sync_dist averages the metric over the processes of a multi-device run
        self.log('val_loss', loss, sync_dist=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-5)
 

# create the model
sizes = [60784, 1000, 1000, 2250]
residual_conections = [[0], [1], [2], [3]]  # not used by this MLP variant
dropout = [0.5, 0.5, 0.5, 0.5]
model = MLP(sizes, residual_conections, dropout)

# create the data module
data_module = VoxelsEmbeddinsEncodecDataModule(train_voxels_path, train_embeddings_path, test_voxels_path, test_embeddings_path, batch_size=32)

# Close the W&B run left open by a previous training cell (if any); otherwise
# the new WandbLogger reuses that run and the experiments get mixed together.
previous_logger = globals().get('wandb_logger')
if previous_logger is not None:
    previous_logger.experiment.finish()

# Logger from the `lightning` package, the same package the Trainer comes from.
wandb_logger = L.pytorch.loggers.WandbLogger(project='brain2music', entity='ckadirt')

# define the trainer
trainer = L.Trainer(devices=2, accelerator="gpu", max_epochs=400, logger=wandb_logger, precision='16-mixed', log_every_n_steps=10)

# train the model
trainer.fit(model, datamodule=data_module)


In [None]:
class MLP(L.LightningModule):
    """Feed-forward MLP that classifies the first tokens of each embedding.

    The last layer emits ``num_tokens * codebook_size`` values that are viewed as
    ``[batch_size, codebook_size, num_tokens]`` logits and trained with
    cross-entropy against the first ``num_tokens`` entries of the flattened
    embeddings. The voxel input is the mean over the samples of each stimulus.

    Args:
        sizes: layer widths; ``sizes[-1]`` must equal ``num_tokens * codebook_size``.
        residual_conections: stored for interface compatibility; not used here.
        dropout: dropout probability per layer; only the entries of the hidden
            layers are used.
        num_tokens: how many target tokens are predicted (default 200).
        codebook_size: number of classes per token (default 1024).

    Raises:
        ValueError: if ``sizes[-1]`` does not match ``num_tokens * codebook_size``.

    ReLU and dropout follow every linear layer except the last one:
    CrossEntropyLoss expects raw logits, and clamping or randomly zeroing them
    distorts the predicted distribution.
    """

    def __init__(self, sizes, residual_conections, dropout, num_tokens=200, codebook_size=1024):
        super().__init__()
        if sizes[-1] != num_tokens * codebook_size:
            raise ValueError('sizes[-1] must be num_tokens * codebook_size = %d, got %d'
                             % (num_tokens * codebook_size, sizes[-1]))
        self.sizes = sizes
        self.residual_conections = residual_conections
        self.dropout = dropout
        self.num_tokens = num_tokens
        self.codebook_size = codebook_size
        self.layers = nn.Sequential()
        last = len(sizes) - 2  # index of the output layer
        for i in range(len(sizes) - 1):
            self.layers.add_module('linear' + str(i), nn.Linear(sizes[i], sizes[i + 1]))
            if i < last:
                self.layers.add_module('relu' + str(i), nn.ReLU())
                self.layers.add_module('dropout' + str(i), nn.Dropout(dropout[i]))

        self.loss = nn.CrossEntropyLoss()

    def forward(self, x):
        """Run the network on ``x`` of shape ``[batch_size, sizes[0]]``."""
        return self.layers(x)

    def _shared_step(self, batch):
        """Return ``(loss, accuracy)`` for one batch (training and validation)."""
        voxels, embeddings = batch  # notes: [batch_size, 10, n_voxels] and [batch_size, 2, 1125]
        # class-index targets: keep just the first num_tokens of the flattened embeddings
        embeddings = embeddings.flatten(start_dim=1).long()[:, :self.num_tokens]
        # average the samples of each stimulus -> [batch_size, n_voxels]
        voxels = voxels.mean(dim=1).flatten(start_dim=1)
        # view the flat output as [batch_size, codebook_size, num_tokens]: the class
        # dimension must be dim 1 for CrossEntropyLoss
        outputs = self(voxels).reshape(-1, self.codebook_size, self.num_tokens)
        return self.loss(outputs, embeddings), self.tokens_accuracy(outputs, embeddings)

    def training_step(self, batch, batch_idx):
        loss, accuracy = self._shared_step(batch)
        self.log('train_loss', loss)
        self.log('train_accuracy', accuracy)
        return loss

    def tokens_accuracy(self, outputs, embeddings):
        """Fraction of tokens whose arg-max class equals the target.

        ``outputs`` is ``[batch_size, codebook_size, num_tokens]`` and
        ``embeddings`` is ``[batch_size, num_tokens]``.
        """
        predicted = outputs.argmax(dim=1)
        return (predicted == embeddings).float().mean()

    def validation_step(self, batch, batch_idx):
        loss, accuracy = self._shared_step(batch)
        # sync_dist averages the metrics over the processes of a multi-device run
        self.log('val_loss', loss, sync_dist=True)
        self.log('val_accuracy', accuracy, sync_dist=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-5)
 

# create the model
sizes = [60784, 1000, 1000, 200*1024]
residual_conections = [[0], [1], [2], [3]]  # not used by this MLP variant
dropout = [0.5, 0.5, 0.5, 0.5]
model = MLP(sizes, residual_conections, dropout)

# create the data module
data_module = VoxelsEmbeddinsEncodecDataModule(train_voxels_path, train_embeddings_path, test_voxels_path, test_embeddings_path, batch_size=2)

# Close the W&B run left open by a previous training cell (if any) so this
# experiment is logged as its own run. `wandb` is not imported in the visible
# cells, so `wandb.finish()` would raise NameError; finishing the previous
# logger's run handle has the same effect.
previous_logger = globals().get('wandb_logger')
if previous_logger is not None:
    previous_logger.experiment.finish()

# Logger from the `lightning` package, the same package the Trainer comes from.
wandb_logger = L.pytorch.loggers.WandbLogger(project='brain2music', entity='ckadirt')

# define the trainer
trainer = L.Trainer(devices=2, accelerator="gpu", max_epochs=400, logger=wandb_logger, precision='16-mixed', log_every_n_steps=10)

# train the model
trainer.fit(model, datamodule=data_module)


In [None]:
class MLP(L.LightningModule):
    """Feed-forward MLP that classifies the first tokens of each embedding.

    The last layer emits ``num_tokens * codebook_size`` values that are viewed as
    ``[batch_size, codebook_size, num_tokens]`` logits and trained with
    cross-entropy against the first ``num_tokens`` entries of the flattened
    embeddings. Unlike the mean-pooled variant, the voxel input is the single
    fMRI sample at index 1 of each stimulus block.

    Args:
        sizes: layer widths; ``sizes[-1]`` must equal ``num_tokens * codebook_size``.
        residual_conections: stored for interface compatibility; not used here.
        dropout: dropout probability per layer; only the entries of the hidden
            layers are used.
        num_tokens: how many target tokens are predicted (default 200).
        codebook_size: number of classes per token (default 1024).

    Raises:
        ValueError: if ``sizes[-1]`` does not match ``num_tokens * codebook_size``.

    ReLU and dropout follow every linear layer except the last one:
    CrossEntropyLoss expects raw logits, and clamping or randomly zeroing them
    distorts the predicted distribution.
    """

    def __init__(self, sizes, residual_conections, dropout, num_tokens=200, codebook_size=1024):
        super().__init__()
        if sizes[-1] != num_tokens * codebook_size:
            raise ValueError('sizes[-1] must be num_tokens * codebook_size = %d, got %d'
                             % (num_tokens * codebook_size, sizes[-1]))
        self.sizes = sizes
        self.residual_conections = residual_conections
        self.dropout = dropout
        self.num_tokens = num_tokens
        self.codebook_size = codebook_size
        self.layers = nn.Sequential()
        last = len(sizes) - 2  # index of the output layer
        for i in range(len(sizes) - 1):
            self.layers.add_module('linear' + str(i), nn.Linear(sizes[i], sizes[i + 1]))
            if i < last:
                self.layers.add_module('relu' + str(i), nn.ReLU())
                self.layers.add_module('dropout' + str(i), nn.Dropout(dropout[i]))

        self.loss = nn.CrossEntropyLoss()

    def forward(self, x):
        """Run the network on ``x`` of shape ``[batch_size, sizes[0]]``."""
        return self.layers(x)

    def _shared_step(self, batch):
        """Return ``(loss, accuracy)`` for one batch (training and validation)."""
        voxels, embeddings = batch  # notes: [batch_size, 10, n_voxels] and [batch_size, 2, 1125]
        # class-index targets: keep just the first num_tokens of the flattened embeddings
        embeddings = embeddings.flatten(start_dim=1).long()[:, :self.num_tokens]
        # use only the sample at index 1 of each stimulus block (no averaging)
        voxels = voxels[:, 1, :].flatten(start_dim=1)
        # view the flat output as [batch_size, codebook_size, num_tokens]: the class
        # dimension must be dim 1 for CrossEntropyLoss
        outputs = self(voxels).reshape(-1, self.codebook_size, self.num_tokens)
        return self.loss(outputs, embeddings), self.tokens_accuracy(outputs, embeddings)

    def training_step(self, batch, batch_idx):
        loss, accuracy = self._shared_step(batch)
        self.log('train_loss', loss)
        self.log('train_accuracy', accuracy)
        return loss

    def tokens_accuracy(self, outputs, embeddings):
        """Fraction of tokens whose arg-max class equals the target.

        ``outputs`` is ``[batch_size, codebook_size, num_tokens]`` and
        ``embeddings`` is ``[batch_size, num_tokens]``.
        """
        predicted = outputs.argmax(dim=1)
        return (predicted == embeddings).float().mean()

    def validation_step(self, batch, batch_idx):
        loss, accuracy = self._shared_step(batch)
        # sync_dist averages the metrics over the processes of a multi-device run
        self.log('val_loss', loss, sync_dist=True)
        self.log('val_accuracy', accuracy, sync_dist=True)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-6)
 

# create the model
sizes = [60784, 1000, 1000, 200*1024]
residual_conections = [[0], [1], [2], [3]]  # not used by this MLP variant
dropout = [0.2, 0.2, 0.2, 0.2]
model = MLP(sizes, residual_conections, dropout)

# create the data module
data_module = VoxelsEmbeddinsEncodecDataModule(train_voxels_path, train_embeddings_path, test_voxels_path, test_embeddings_path, batch_size=4)

# Close the W&B run left open by a previous training cell (if any) so this
# experiment is logged as its own run. `wandb` is not imported in the visible
# cells, so `wandb.finish()` would raise NameError; finishing the previous
# logger's run handle has the same effect.
previous_logger = globals().get('wandb_logger')
if previous_logger is not None:
    previous_logger.experiment.finish()

# Logger from the `lightning` package, the same package the Trainer comes from.
wandb_logger = L.pytorch.loggers.WandbLogger(project='brain2music', entity='ckadirt')

# define the trainer
trainer = L.Trainer(devices=2, accelerator="gpu", max_epochs=400, logger=wandb_logger, precision='16-mixed', log_every_n_steps=10)

# train the model
trainer.fit(model, datamodule=data_module)


In [None]:
# Predict the token ids of every test stimulus and save them to disk.
# NOTE(review): `model3` is not defined in the visible cells -- it must be a trained
# token-classification MLP (single-sample input variant); confirm where it comes from.
model3.eval()
test_dataset = VoxelsDataset(test_voxels_path, test_embeddings_path)
dataloader = data.DataLoader(test_dataset, batch_size=2)
# run the inputs on whatever device the model's weights live on
device = next(model3.parameters()).device

# Collect one [batch, 200] block of token ids per batch. A buffer created with
# torch.Tensor((480, 200)) would be the 1-D tensor [480., 200.], not a 480 x 200
# array, and the number of test stimuli is not 480 in any case.
predicted_tokens = []
with torch.no_grad():
    for voxels, _ in dataloader:
        # same input as in training: the sample at index 1 of each stimulus block
        voxels = voxels[:, 1, :].to(device)
        logits = model3(voxels).reshape(-1, 1024, 200)
        # dim 1 is the class dimension (1024 classes per token): take the most likely class
        predicted_tokens.append(logits.argmax(dim=1).cpu())

if predicted_tokens:
    outputs = torch.cat(predicted_tokens)
else:
    outputs = torch.empty((0, 200), dtype=torch.long)

# save the predicted outputs on the current directory
torch.save(outputs, 'outputs.pt')