import lightning as L
import omegaconf
import torch
from lightning.pytorch.loggers import WandbLogger
import wandb
from torch.utils.data import DataLoader
from lightning.pytorch.callbacks import TQDMProgressBar
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from config.config import Config
from models.engine import Engine
from preprocessing.fi_2010 import fi_2010_load
from preprocessing.lobster import lobster_load
from preprocessing.dataset import Dataset, DataModule
import constants as cst


def _model_run_name(config: Config) -> str:
    """Build the run-name prefix from the model section of the config.

    Each fixed hyperparameter contributes ``<first-2-chars-of-key>_<value>_``;
    the ``hyperparameters_sweep`` section is skipped.  This logic was
    previously duplicated inline in both ``run`` and ``run_wandb``.
    """
    run_name = ""
    for param in config.model.keys():
        if param == "hyperparameters_sweep":
            continue
        value = config.model[param]
        # Nested DictConfig sections (e.g. hyperparameters_fixed) are flattened
        # key-by-key; scalar entries are assumed to be enum-like (have .value).
        if isinstance(value, omegaconf.dictconfig.DictConfig):
            for key in value.keys():
                run_name += str(key[:2]) + "_" + str(value[key]) + "_"
        else:
            run_name += str(param[:2]) + "_" + str(value.value) + "_"
    return run_name


def _set_filename_ckpt(config: Config, run_name) -> str:
    """Derive the checkpoint filename, store it on the config, and return it.

    LOBSTER runs embed the training-stock list in the name; other datasets
    (FI-2010) do not.  NOTE(review): ``run_name`` may be ``None`` when called
    from a sweep, which stringifies to ``"None"`` — same as the original code.
    """
    seq_size = config.model.hyperparameters_fixed["seq_size"]
    horizon = config.experiment.horizon
    dataset = config.experiment.dataset_type.value
    training_stocks = config.experiment.training_stocks
    if dataset == "LOBSTER":
        config.experiment.filename_ckpt = f"{dataset}_{training_stocks}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
    else:
        config.experiment.filename_ckpt = f"{dataset}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
    return config.experiment.filename_ckpt


def run(config: Config, accelerator, model=None):
    """Configure a Lightning Trainer (no wandb logger) and launch training.

    Args:
        config: experiment + model configuration.
        accelerator: Lightning accelerator string (e.g. "gpu", "cpu").
        model: unused; kept for interface compatibility with existing callers.
    """
    run_name = _model_run_name(config) + f"seed_{config.experiment.seed}"
    run_name = _set_filename_ckpt(config, run_name)
    trainer = L.Trainer(
        accelerator=accelerator,
        precision=cst.PRECISION,
        max_epochs=config.experiment.max_epochs,
        callbacks=[
            EarlyStopping(monitor="val_loss", mode="min", patience=2, verbose=True, min_delta=0.002),
            TQDMProgressBar(refresh_rate=100),
        ],
        num_sanity_val_steps=0,
        detect_anomaly=False,
        profiler=None,
        check_val_every_n_epoch=1,
    )
    train(config, trainer)


def train(config: Config, trainer: L.Trainer, run=None):
    """Load data, build (or restore) the model, fit it, and evaluate on test sets.

    Args:
        config: experiment + model configuration.
        trainer: a pre-configured Lightning Trainer.
        run: optional wandb run; when given, test F1 scores are logged to it.
    """
    print_setup(config)
    dataset_type = config.experiment.dataset_type.value
    seq_size = config.model.hyperparameters_fixed["seq_size"]
    horizon = config.experiment.horizon
    model_type = config.model.type
    training_stocks = config.experiment.training_stocks
    testing_stocks = config.experiment.testing_stocks
    all_features = config.model.hyperparameters_fixed["all_features"]

    if dataset_type == "FI-2010":
        path = cst.DATA_DIR + "/FI_2010"
        train_input, train_labels, val_input, val_labels, test_input, test_labels = fi_2010_load(
            path, seq_size, horizon, all_features)
        data_module = DataModule(
            train_set=Dataset(train_input, train_labels, seq_size),
            val_set=Dataset(val_input, val_labels, seq_size),
            test_set=Dataset(test_input, test_labels, seq_size),
            batch_size=config.experiment.batch_size,
            test_batch_size=config.experiment.batch_size * 4,
            num_workers=4,
        )
        test_loaders = [data_module.test_dataloader()]
    else:
        # LOBSTER: concatenate every training stock's train/val splits.
        # Between stocks, seq_size+horizon-1 zero labels are appended —
        # presumably so that sliding windows never straddle a stock boundary
        # (lobster_load appears to return fewer labels than inputs);
        # TODO(review): confirm against Dataset's windowing logic.
        for i in range(len(training_stocks)):
            train_path = cst.DATA_DIR + "/" + training_stocks[i] + "/train.npy"
            val_path = cst.DATA_DIR + "/" + training_stocks[i] + "/val.npy"
            if i == 0:
                train_input, train_labels = lobster_load(train_path, all_features, cst.LEN_SMOOTH, horizon, seq_size)
                val_input, val_labels = lobster_load(val_path, all_features, cst.LEN_SMOOTH, horizon, seq_size)
            else:
                train_labels = torch.cat((train_labels, torch.zeros(seq_size + horizon - 1, dtype=torch.long)), 0)
                train_input_tmp, train_labels_tmp = lobster_load(train_path, all_features, cst.LEN_SMOOTH, horizon, seq_size)
                train_input = torch.cat((train_input, train_input_tmp), 0)
                train_labels = torch.cat((train_labels, train_labels_tmp), 0)
                val_labels = torch.cat((val_labels, torch.zeros(seq_size + horizon - 1, dtype=torch.long)), 0)
                val_input_tmp, val_labels_tmp = lobster_load(val_path, all_features, cst.LEN_SMOOTH, horizon, seq_size)
                val_input = torch.cat((val_input, val_input_tmp), 0)
                val_labels = torch.cat((val_labels, val_labels_tmp), 0)

        # One test DataLoader per testing stock (evaluated separately below).
        test_loaders = []
        for stock in testing_stocks:
            path = cst.DATA_DIR + "/" + stock + "/test.npy"
            test_input, test_labels = lobster_load(path, all_features, cst.LEN_SMOOTH, horizon, seq_size)
            test_set = Dataset(test_input, test_labels, seq_size)
            test_loaders.append(DataLoader(
                dataset=test_set,
                batch_size=config.experiment.batch_size * 4,
                shuffle=False,
                pin_memory=True,
                drop_last=False,
                num_workers=4,
                persistent_workers=True,
            ))

        train_set = Dataset(train_input, train_labels, seq_size)
        val_set = Dataset(val_input, val_labels, seq_size)
        counts_train = torch.unique(train_labels, return_counts=True)
        counts_val = torch.unique(val_labels, return_counts=True)
        print("Train set shape: ", train_input.shape)
        print("Val set shape: ", val_input.shape)
        print("Classes counts in train set: ", counts_train[1])
        print("Classes counts in val set: ", counts_val[1])
        # NOTE(review): assumes label ids 0/1/2 map to up/stationary/down — confirm.
        print(f"Classes distribution in train set: up {counts_train[1][0]/train_labels.shape[0]} stat {counts_train[1][1]/train_labels.shape[0]} down {counts_train[1][2]/train_labels.shape[0]} ", )
        print(f"Classes distribution in val set: up {counts_val[1][0]/val_labels.shape[0]} stat {counts_val[1][1]/val_labels.shape[0]} down {counts_val[1][2]/val_labels.shape[0]} ", )
        data_module = DataModule(
            train_set=train_set,
            val_set=val_set,
            batch_size=config.experiment.batch_size,
            test_batch_size=config.experiment.batch_size * 4,
            num_workers=4,
        )

    experiment_type = config.experiment.type
    if "FINETUNING" in experiment_type or "EVALUATION" in experiment_type:
        # Restore hyperparameters from the checkpoint, then rebuild the Engine
        # with the model-type-specific extras (exact argument sets preserved:
        # MLPLOB checkpoints do not receive len_test_dataloader).
        checkpoint = torch.load(config.experiment.checkpoint_reference, map_location=cst.DEVICE)
        print("Loading model from checkpoint: ", config.experiment.checkpoint_reference)
        hp = checkpoint["hyper_parameters"]
        model_type = hp["model_type"]  # stored as a plain string in the checkpoint
        common = dict(
            seq_size=hp["seq_size"],
            horizon=hp["horizon"],
            max_epochs=hp["max_epochs"],
            model_type=model_type,
            is_wandb=config.experiment.is_wandb,
            experiment_type=experiment_type,
            lr=hp["lr"],
            optimizer=hp["optimizer"],
            filename_ckpt=hp["filename_ckpt"],
            num_features=train_input.shape[1],
            dataset_type=dataset_type,
            map_location=cst.DEVICE,
        )
        extra = None
        if model_type == "MLPLOB":
            extra = dict(hidden_dim=hp["hidden_dim"], num_layers=hp["num_layers"])
        elif model_type == "TLOB":
            extra = dict(hidden_dim=hp["hidden_dim"], num_layers=hp["num_layers"],
                         num_heads=hp["num_heads"], is_sin_emb=hp["is_sin_emb"],
                         len_test_dataloader=len(test_loaders[0]))
        elif model_type == "BINCTABL":
            extra = dict(len_test_dataloader=len(test_loaders[0]))
        elif model_type == "DEEPLOB":
            extra = dict(len_test_dataloader=len(test_loaders[0]))
        if extra is not None:
            model = Engine.load_from_checkpoint(config.experiment.checkpoint_reference, **common, **extra)
    else:
        # Fresh model from the config; all types receive len_test_dataloader here.
        hpf = config.model.hyperparameters_fixed
        common = dict(
            seq_size=seq_size,
            horizon=horizon,
            max_epochs=config.experiment.max_epochs,
            model_type=config.model.type.value,
            is_wandb=config.experiment.is_wandb,
            experiment_type=experiment_type,
            lr=hpf["lr"],
            optimizer=config.experiment.optimizer,
            filename_ckpt=config.experiment.filename_ckpt,
            num_features=train_input.shape[1],
            dataset_type=dataset_type,
            len_test_dataloader=len(test_loaders[0]),
        )
        if model_type == cst.ModelType.MLPLOB:
            model = Engine(**common, hidden_dim=hpf["hidden_dim"], num_layers=hpf["num_layers"])
        elif model_type == cst.ModelType.TLOB:
            model = Engine(**common, hidden_dim=hpf["hidden_dim"], num_layers=hpf["num_layers"],
                           num_heads=hpf["num_heads"], is_sin_emb=hpf["is_sin_emb"])
        elif model_type == cst.ModelType.BINCTABL:
            model = Engine(**common)
        elif model_type == cst.ModelType.DEEPLOB:
            model = Engine(**common)

    print("total number of parameters: ", sum(p.numel() for p in model.parameters()))
    train_dataloader, val_dataloader = data_module.train_dataloader(), data_module.val_dataloader()

    if "TRAINING" in experiment_type or "FINETUNING" in experiment_type:
        trainer.fit(model, train_dataloader, val_dataloader)
        best_model_path = model.last_path_ckpt
        print("Best model path: ", best_model_path)
        try:
            best_model = Engine.load_from_checkpoint(best_model_path, map_location=cst.DEVICE)
        except Exception:
            # Was a bare `except:`; narrowed so Ctrl-C / SystemExit still propagate.
            # Best-effort fallback: evaluate the in-memory model instead.
            print("no checkpoints has been saved, selecting the last model")
            best_model = model
        best_model.experiment_type = "EVALUATION"
        eval_model = best_model
    else:
        eval_model = model

    # Evaluate on every test loader; the two original (identical) loops merged.
    for i, test_dataloader in enumerate(test_loaders):
        output = trainer.test(eval_model, test_dataloader)
        if run is not None and dataset_type == "LOBSTER":
            run.log({f"f1 {testing_stocks[i]} best": output[0]["f1_score"]}, commit=False)
        elif run is not None and dataset_type == "FI-2010":
            run.log({f"f1 FI-2010 ": output[0]["f1_score"]}, commit=False)


def run_wandb(config: Config, accelerator):
    """Return a zero-argument callback (for wandb sweeps/agents) that runs training.

    The callback initializes a wandb run, overrides the fixed hyperparameters
    with the sweep's values when sweeping, logs the experiment setup, and
    delegates to ``train``.
    """
    def wandb_sweep_callback():
        wandb_logger = WandbLogger(project=cst.PROJECT_NAME, log_model=False, save_dir=cst.DIR_SAVED_MODEL)
        run_name = None
        if not config.experiment.is_sweep:
            run_name = _model_run_name(config)
        run = wandb.init(project=cst.PROJECT_NAME, name=run_name, entity="")  # set entity to your wandb username
        if config.experiment.is_sweep:
            model_params = run.config
        else:
            model_params = config.model.hyperparameters_fixed
        # Copy the effective hyperparameters back into the config and build a
        # human-readable run name from them.
        wandb_instance_name = ""
        for param in config.model.hyperparameters_fixed.keys():
            if param in model_params:
                config.model.hyperparameters_fixed[param] = model_params[param]
                wandb_instance_name += str(param) + "_" + str(model_params[param]) + "_"
        run.name = wandb_instance_name
        wandb_instance_name = _set_filename_ckpt(config, run_name)
        trainer = L.Trainer(
            accelerator=accelerator,
            precision=cst.PRECISION,
            max_epochs=config.experiment.max_epochs,
            callbacks=[
                EarlyStopping(monitor="val_loss", mode="min", patience=2, verbose=True, min_delta=0.002),
                TQDMProgressBar(refresh_rate=1000),
            ],
            num_sanity_val_steps=0,
            logger=wandb_logger,
            detect_anomaly=False,
            check_val_every_n_epoch=1,
        )
        # log simulation details in WANDB console
        run.log({"model": config.model.type.value}, commit=False)
        run.log({"dataset": config.experiment.dataset_type.value}, commit=False)
        run.log({"seed": config.experiment.seed}, commit=False)
        run.log({"all_features": config.model.hyperparameters_fixed["all_features"]}, commit=False)
        if config.experiment.dataset_type == cst.Dataset.LOBSTER:
            for i in range(len(config.experiment.training_stocks)):
                run.log({f"training stock{i}": config.experiment.training_stocks[i]}, commit=False)
            for i in range(len(config.experiment.testing_stocks)):
                run.log({f"testing stock{i}": config.experiment.testing_stocks[i]}, commit=False)
            run.log({"sampling_type": config.experiment.sampling_type}, commit=False)
            if config.experiment.sampling_type == "time":
                run.log({"sampling_time": config.experiment.sampling_time}, commit=False)
            else:
                run.log({"sampling_quantity": config.experiment.sampling_quantity}, commit=False)
        train(config, trainer, run)
        run.finish()
    return wandb_sweep_callback


def sweep_init(config: Config):
    """Build and return the wandb sweep configuration (grid search on val_loss)."""
    # put your wandb key here
    wandb.login()
    parameters = {}
    for key in config.model.hyperparameters_sweep.keys():
        parameters[key] = {'values': list(config.model.hyperparameters_sweep[key])}
    sweep_config = {
        'method': 'grid',
        'metric': {
            'goal': 'minimize',
            'name': 'val_loss'
        },
        'early_terminate': {
            'type': 'hyperband',
            'min_iter': 3,
            'eta': 1.5
        },
        'run_cap': 100,
        'parameters': {**parameters}
    }
    return sweep_config


def print_setup(config: Config):
    """Print a human-readable summary of the experiment configuration."""
    print("Model type: ", config.model.type)
    print("Dataset: ", config.experiment.dataset_type)
    print("Seed: ", config.experiment.seed)
    print("Sequence size: ", config.model.hyperparameters_fixed["seq_size"])
    print("Horizon: ", config.experiment.horizon)
    print("All features: ", config.model.hyperparameters_fixed["all_features"])
    print("Is data preprocessed: ", config.experiment.is_data_preprocessed)
    print("Is wandb: ", config.experiment.is_wandb)
    print("Is sweep: ", config.experiment.is_sweep)
    print(config.experiment.type)
    print("Is debug: ", config.experiment.is_debug)
    if config.experiment.dataset_type == cst.Dataset.LOBSTER:
        print("Training stocks: ", config.experiment.training_stocks)
        print("Testing stocks: ", config.experiment.testing_stocks)