# Provenance: uploaded by LeonardoBerti ("Upload 51 files", commit 69524d0, verified).
import os
from utils.utils_data import z_score_orderbook, normalize_messages, preprocess_data, one_hot_encoding_type
import pandas as pd
import numpy as np
import torch
import constants as cst
from torch.utils import data
def lobster_load(path, all_features, len_smooth, h, seq_size):
    """Load one preprocessed LOBSTER split from a .npy file.

    Parameters
    ----------
    path : str
        Path to the .npy split produced by ``LOBSTERDataBuilder._save``.
    all_features : bool
        If True, return the order/message features concatenated after the
        40 orderbook columns; otherwise return the orderbook columns only.
    len_smooth : int
        Label-smoothing window length used when the labels were created.
    h : int
        Prediction horizon; must be one of 10, 20, 50, 100, 200.
    seq_size : int
        Model sequence length; the first ``seq_size - len_smooth`` labels
        are skipped so every label has a full input window.

    Returns
    -------
    (torch.FloatTensor, torch.LongTensor)
        The input features and the class labels.

    Raises
    ------
    ValueError
        If ``h`` is not a supported horizon (the original if/elif chain
        left ``tmp`` unbound in that case, raising UnboundLocalError).
    """
    # Offset (from the right) of the label column for each horizon: the
    # builder appends one label column per horizon in cst.LOBSTER_HORIZONS
    # order, so horizon 10 is 5th from the end, ..., horizon 200 is last.
    horizon_to_offset = {10: 5, 20: 4, 50: 3, 100: 2, 200: 1}
    if h not in horizon_to_offset:
        raise ValueError(
            "Unsupported horizon h={}; expected one of {}".format(h, sorted(horizon_to_offset))
        )
    tmp = horizon_to_offset[h]

    data = np.load(path)
    labels = data[seq_size - len_smooth:, -tmp]
    # Labels were padded with inf to match the input length; drop the padding.
    labels = labels[np.isfinite(labels)]
    labels = torch.from_numpy(labels).long()

    # The 40 orderbook columns sit right after the first cst.LEN_ORDER
    # order-feature columns.
    features = torch.from_numpy(data[:, cst.LEN_ORDER:cst.LEN_ORDER + 40]).float()
    if all_features:
        orders = torch.from_numpy(data[:, :cst.LEN_ORDER]).float()
        features = torch.cat((features, orders), dim=1)
    return features, labels
def labeling(X, len, h, stock):
# X is the orderbook
# len is the time window smoothing length
# h is the prediction horizon
[N, D] = X.shape
if h < len:
len = h
# Calculate previous and future mid-prices for all relevant indices
previous_ask_prices = np.lib.stride_tricks.sliding_window_view(X[:, 0], window_shape=len)[:-h]
previous_bid_prices = np.lib.stride_tricks.sliding_window_view(X[:, 2], window_shape=len)[:-h]
future_ask_prices = np.lib.stride_tricks.sliding_window_view(X[:, 0], window_shape=len)[h:]
future_bid_prices = np.lib.stride_tricks.sliding_window_view(X[:, 2], window_shape=len)[h:]
previous_mid_prices = (previous_ask_prices + previous_bid_prices) / 2
future_mid_prices = (future_ask_prices + future_bid_prices) / 2
previous_mid_prices = np.mean(previous_mid_prices, axis=1)
future_mid_prices = np.mean(future_mid_prices, axis=1)
# Compute percentage change
percentage_change = (future_mid_prices - previous_mid_prices) / previous_mid_prices
# alpha is the average percentage change of the stock
alpha = np.abs(percentage_change).mean() / 2
# alpha is the average spread of the stock in percentage of the mid-price
#alpha = (X[:, 0] - X[:, 2]).mean() / ((X[:, 0] + X[:, 2]) / 2).mean()
print(f"Alpha: {alpha}")
labels = np.where(percentage_change < -alpha, 2, np.where(percentage_change > alpha, 0, 1))
print(f"Number of labels: {np.unique(labels, return_counts=True)}")
print(f"Percentage of labels: {np.unique(labels, return_counts=True)[1] / labels.shape[0]}")
return labels
class LOBSTERDataBuilder:
    """End-to-end dataset builder for raw LOBSTER data.

    For every stock it reads the daily (message, orderbook) CSV pairs,
    splits the trading days into train/val/test, converts prices from
    LOBSTER's fixed-point representation to dollars, computes trend labels
    for each horizon in ``cst.LOBSTER_HORIZONS``, z-score normalizes the
    data with the train-set statistics, and saves each split as one .npy
    file.
    """

    def __init__(
        self,
        stocks,
        data_dir,
        date_trading_days,
        split_rates,
        sampling_type,
        sampling_time,
        sampling_quantity,
    ):
        # stocks: list of tickers to process.
        # data_dir: root directory; each stock has its own sub-folder.
        # date_trading_days: (first_day, last_day) pair used to build the
        #   raw-data folder name.
        # split_rates: (train, val, test) fractions of the trading days.
        # sampling_type / sampling_time / sampling_quantity: forwarded
        #   verbatim to utils_data.preprocess_data.
        self.n_lob_levels = cst.N_LOB_LEVELS
        self.data_dir = data_dir
        self.date_trading_days = date_trading_days
        self.stocks = stocks
        self.split_rates = split_rates
        self.sampling_type = sampling_type
        self.sampling_time = sampling_time
        self.sampling_quantity = sampling_quantity

    def prepare_save_datasets(self):
        """Build and save the train/val/test .npy files for every stock."""
        for i in range(len(self.stocks)):
            stock = self.stocks[i]
            # Raw data lives in <data_dir>/<stock>/<stock>_<start>_<end>.
            path = "{}/{}/{}_{}_{}".format(
                self.data_dir,
                stock,
                stock,
                self.date_trading_days[0],
                self.date_trading_days[1],
            )
            self.dataframes = []
            self._prepare_dataframes(path, stock)
            path_where_to_save = "{}/{}".format(
                self.data_dir,
                stock,
            )
            # Each split in self.dataframes is [messages_df, orderbook_df];
            # concatenate them column-wise. NOTE(review): this assumes the
            # two frames have the same length and compatible indices —
            # preprocess_data enforces the length, index alignment is
            # presumed; verify.
            self.train_input = pd.concat(self.dataframes[0], axis=1).values
            self.val_input = pd.concat(self.dataframes[1], axis=1).values
            self.test_input = pd.concat(self.dataframes[2], axis=1).values
            # Append one label column per horizon to the right of the inputs
            # (labels were inf-padded to the input length in
            # _prepare_dataframes).
            self.train_set = pd.concat([pd.DataFrame(self.train_input), pd.DataFrame(self.train_labels_horizons)], axis=1).values
            self.val_set = pd.concat([pd.DataFrame(self.val_input), pd.DataFrame(self.val_labels_horizons)], axis=1).values
            self.test_set = pd.concat([pd.DataFrame(self.test_input), pd.DataFrame(self.test_labels_horizons)], axis=1).values
            self._save(path_where_to_save)

    def _prepare_dataframes(self, path, stock):
        """Read, split, rescale, label and normalize one stock's data.

        Populates ``self.dataframes`` (one [messages, orderbook] pair per
        split) and the three ``*_labels_horizons`` DataFrames.
        """
        COLUMNS_NAMES = {"orderbook": ["sell1", "vsell1", "buy1", "vbuy1",
                                       "sell2", "vsell2", "buy2", "vbuy2",
                                       "sell3", "vsell3", "buy3", "vbuy3",
                                       "sell4", "vsell4", "buy4", "vbuy4",
                                       "sell5", "vsell5", "buy5", "vbuy5",
                                       "sell6", "vsell6", "buy6", "vbuy6",
                                       "sell7", "vsell7", "buy7", "vbuy7",
                                       "sell8", "vsell8", "buy8", "vbuy8",
                                       "sell9", "vsell9", "buy9", "vbuy9",
                                       "sell10", "vsell10", "buy10", "vbuy10"],
                         "message": ["time", "event_type", "order_id", "size", "price", "direction"]}
        # Each trading day contributes two files: one message CSV and one
        # orderbook CSV.
        self.num_trading_days = len(os.listdir(path))//2
        split_days = self._split_days()
        # Convert cumulative day indices into file indices (2 files/day).
        split_days = [i * 2 for i in split_days]
        self._create_dataframes_splitted(path, split_days, COLUMNS_NAMES)
        # divide all the price, both of lob and messages, by 10000, to have dollars as unit
        for i in range(len(self.dataframes)):
            self.dataframes[i][0]["price"] = self.dataframes[i][0]["price"] / 10000
            # Every other orderbook column (the price columns sell*/buy*)
            # is rescaled; the volume columns sit at the odd positions.
            self.dataframes[i][1].loc[:, ::2] /= 10000
        train_input = self.dataframes[0][1].values
        val_input = self.dataframes[1][1].values
        test_input = self.dataframes[2][1].values
        #create a dataframe for the labels
        # Labels are shorter than the inputs (sliding-window + horizon), so
        # they are padded with inf up to the input length; lobster_load
        # later drops the non-finite entries with np.isfinite.
        for i in range(len(cst.LOBSTER_HORIZONS)):
            if i == 0:
                # First horizon: create the three label DataFrames.
                train_labels = labeling(train_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                val_labels = labeling(val_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                test_labels = labeling(test_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                train_labels = np.concatenate([train_labels, np.full(shape=(train_input.shape[0] - train_labels.shape[0]), fill_value=np.inf)])
                val_labels = np.concatenate([val_labels, np.full(shape=(val_input.shape[0] - val_labels.shape[0]), fill_value=np.inf)])
                test_labels = np.concatenate([test_labels, np.full(shape=(test_input.shape[0] - test_labels.shape[0]), fill_value=np.inf)])
                self.train_labels_horizons = pd.DataFrame(train_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
                self.val_labels_horizons = pd.DataFrame(val_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
                self.test_labels_horizons = pd.DataFrame(test_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
            else:
                # Remaining horizons: append one column per horizon.
                # NOTE(review): this branch duplicates the labeling/padding
                # code above — only the assignment differs.
                train_labels = labeling(train_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                val_labels = labeling(val_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                test_labels = labeling(test_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                train_labels = np.concatenate([train_labels, np.full(shape=(train_input.shape[0] - train_labels.shape[0]), fill_value=np.inf)])
                val_labels = np.concatenate([val_labels, np.full(shape=(val_input.shape[0] - val_labels.shape[0]), fill_value=np.inf)])
                test_labels = np.concatenate([test_labels, np.full(shape=(test_input.shape[0] - test_labels.shape[0]), fill_value=np.inf)])
                self.train_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = train_labels
                self.val_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = val_labels
                self.test_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = test_labels
        #self._sparse_representation()
        # to conclude the preprocessing we normalize the dataframes
        self._normalize_dataframes()

    def _sparse_representation(self):
        """Convert the dense orderbook into a tick-level sparse layout.

        Currently dead code: its only call site (in _prepare_dataframes)
        is commented out. NOTE(review): several latent bugs if re-enabled —
        (1) ``range(0, ..., -tick_size)`` passes a float step, which raises
        TypeError; (2) ``(sparse_repr.shape[1]) - 1 / 2`` computes
        ``shape[1] - 0.5`` due to precedence, presumably
        ``(shape[1] - 1) / 2`` was intended; (3) the bid branch writes to
        ``sparse_repr[row][sparse_pos_ask]`` while incrementing
        ``sparse_pos_bid`` — looks like a copy-paste error; (4) the
        computed ``sparse_repr`` is never stored or returned.
        """
        tick_size = 0.01
        for i in range(len(self.dataframes)):
            dense_repr = self.dataframes[i][1].values
            # One extra column to hold the mid-price.
            sparse_repr = np.zeros((dense_repr.shape[0], dense_repr.shape[1] + 1))
            for row in range(dense_repr.shape[0]):
                sparse_pos_ask = 0
                sparse_pos_bid = 0
                # Best ask is column 0, best bid is column 2.
                mid_price = (dense_repr[row][0] + dense_repr[row][2]) / 2
                sparse_repr[row][-1] = mid_price
                # Columns alternate price/volume; col % 4 == 0 are ask
                # prices, col % 4 == 2 are bid prices.
                for col in range(0, dense_repr.shape[1], 2):
                    if col == 0:
                        start_ask = dense_repr[row][col]
                    elif col == 2:
                        start_bid = dense_repr[row][col]
                    elif col % 4 == 0:
                        if sparse_pos_ask < (sparse_repr.shape[1]) - 1 / 2:
                            actual_ask = dense_repr[row][col]
                            for level in range(0, actual_ask-start_ask, -tick_size):
                                if sparse_pos_ask < (sparse_repr.shape[1]) - 1 / 2:
                                    if level == actual_ask - start_ask - tick_size:
                                        sparse_repr[row][sparse_pos_ask] = dense_repr[row][col+1]
                                    else:
                                        sparse_repr[row][sparse_pos_ask] = 0
                                    sparse_pos_ask += 1
                                else:
                                    break
                            start_ask = actual_ask
                        else:
                            continue
                    elif col % 4 == 2:
                        if sparse_pos_bid < (sparse_repr.shape[1]) - 1 / 2:
                            actual_bid = dense_repr[row][col]
                            for level in range(0, start_bid-actual_bid, -tick_size):
                                if sparse_pos_bid < (sparse_repr.shape[1]) - 1 / 2:
                                    if level == start_bid - actual_bid - tick_size:
                                        sparse_repr[row][sparse_pos_ask] = dense_repr[row][col+1]
                                    else:
                                        sparse_repr[row][sparse_pos_ask] = 0
                                    sparse_pos_bid += 1
                                else:
                                    break
                            start_bid = actual_bid
                        else:
                            continue

    def _create_dataframes_splitted(self, path, split_days, COLUMNS_NAMES):
        """Read the daily CSV pairs and build the three split DataFrames.

        Files are iterated in sorted order; even indices are message files,
        odd indices are orderbook files (each day contributes one of each).
        When a split boundary is crossed, the finished split's
        [messages, orderbooks] pair is appended to ``self.dataframes``.
        """
        # iterate over files in the data directory of self.STOCK_NAME
        total_shape = 0
        for i, filename in enumerate(sorted(os.listdir(path))):
            f = os.path.join(path, filename)
            print(f)
            if os.path.isfile(f):
                # then we create the df for the training set
                if i < split_days[0]:
                    if (i % 2) == 0:
                        if i == 0:
                            # First day's messages seed the accumulator.
                            train_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            train_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                    else:
                        if i == 1:
                            train_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += train_orderbooks.shape[0]
                            # Preprocess the day's pair as soon as the
                            # orderbook file arrives.
                            train_orderbooks, train_messages = preprocess_data([train_messages, train_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(train_orderbooks) != len(train_messages)):
                                raise ValueError("train_orderbook length is different than train_messages")
                        else:
                            train_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += train_orderbook.shape[0]
                            train_orderbook, train_message = preprocess_data([train_message, train_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            # Accumulate the preprocessed day into the split.
                            train_messages = pd.concat([train_messages, train_message], axis=0)
                            train_orderbooks = pd.concat([train_orderbooks, train_orderbook], axis=0)
                elif split_days[0] <= i < split_days[1]: # then we are creating the df for the validation set
                    if (i % 2) == 0:
                        if (i == split_days[0]):
                            # Train split is complete; store it.
                            self.dataframes.append([train_messages, train_orderbooks])
                            val_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            val_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                    else:
                        if i == split_days[0] + 1:
                            val_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += val_orderbooks.shape[0]
                            val_orderbooks, val_messages = preprocess_data([val_messages, val_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(val_orderbooks) != len(val_messages)):
                                raise ValueError("val_orderbook length is different than val_messages")
                        else:
                            val_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += val_orderbook.shape[0]
                            val_orderbook, val_message = preprocess_data([val_message, val_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            val_messages = pd.concat([val_messages, val_message], axis=0)
                            val_orderbooks = pd.concat([val_orderbooks, val_orderbook], axis=0)
                else: # then we are creating the df for the test set
                    if (i % 2) == 0:
                        if (i == split_days[1]):
                            # Validation split is complete; store it.
                            self.dataframes.append([val_messages, val_orderbooks])
                            test_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            test_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                    else:
                        if i == split_days[1] + 1:
                            test_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            # NOTE(review): unlike train/val, test orderbook
                            # rows are not added to total_shape — presumably
                            # an oversight; confirm before relying on the
                            # printed total.
                            test_orderbooks, test_messages = preprocess_data([test_messages, test_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(test_orderbooks) != len(test_messages)):
                                raise ValueError("test_orderbook length is different than test_messages")
                        else:
                            test_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            test_orderbook, test_message = preprocess_data([test_message, test_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            test_messages = pd.concat([test_messages, test_message], axis=0)
                            test_orderbooks = pd.concat([test_orderbooks, test_orderbook], axis=0)
            else:
                raise ValueError("File {} is not a file".format(f))
        # The loop never crosses a boundary after the last test file, so the
        # test split is appended here.
        self.dataframes.append([test_messages, test_orderbooks])
        print(f"Total shape of the orderbooks is {total_shape}")

    def _normalize_dataframes(self):
        """Z-score normalize all splits with the TRAIN split's statistics."""
        #apply z score to orderbooks
        for i in range(len(self.dataframes)):
            if (i == 0):
                # Train split: compute and keep the statistics.
                self.dataframes[i][1], mean_size, mean_prices, std_size, std_prices = z_score_orderbook(self.dataframes[i][1])
            else:
                # Val/test splits: reuse the train statistics.
                self.dataframes[i][1], _, _, _, _ = z_score_orderbook(self.dataframes[i][1], mean_size, mean_prices, std_size, std_prices)
        #apply z-score to size and prices of messages with the statistics of the train set
        for i in range(len(self.dataframes)):
            if (i == 0):
                self.dataframes[i][0], mean_size, mean_prices, std_size, std_prices, mean_time, std_time, mean_depth, std_depth = normalize_messages(self.dataframes[i][0])
            else:
                self.dataframes[i][0], _, _, _, _, _, _, _, _ = normalize_messages(self.dataframes[i][0], mean_size, mean_prices, std_size, std_prices, mean_time, std_time, mean_depth, std_depth)

    def _save(self, path_where_to_save):
        """Write the three assembled splits as .npy files."""
        np.save(path_where_to_save + "/train.npy", self.train_set)
        np.save(path_where_to_save + "/val.npy", self.val_set)
        np.save(path_where_to_save + "/test.npy", self.test_set)

    def _split_days(self):
        """Return cumulative day boundaries [train_end, val_end, test_end].

        Fractions are truncated with int(), so a few trailing days may be
        left out of every split when the rates don't divide evenly.
        """
        train = int(self.num_trading_days * self.split_rates[0])
        val = int(self.num_trading_days * self.split_rates[1]) + train
        test = int(self.num_trading_days * self.split_rates[2]) + val
        print(f"There are {train} days for training, {val - train} days for validation and {test - val} days for testing")
        return [train, val, test]