|
import os
|
|
from utils.utils_data import z_score_orderbook, normalize_messages, preprocess_data, one_hot_encoding_type
|
|
import pandas as pd
|
|
import numpy as np
|
|
import torch
|
|
import constants as cst
|
|
from torch.utils import data
|
|
|
|
|
|
def lobster_load(path, all_features, len_smooth, h, seq_size):
    """Load one preprocessed LOBSTER split saved by LOBSTERDataBuilder.

    Args:
        path: path to a .npy file whose columns are
            [order/message features | 40 order-book columns | one label
            column per horizon].
        all_features: if True, return order-book columns concatenated with
            the first ``cst.LEN_ORDER`` order/message columns; otherwise
            only the 40 order-book columns.
        len_smooth: smoothing-window length used when the labels were built.
        h: prediction horizon; must be one of 10, 20, 50, 100, 200.
        seq_size: model sequence length; the first ``seq_size - len_smooth``
            rows have no complete input window and are skipped for labels.

    Returns:
        (features, labels): a float tensor of inputs and a long tensor of
        finite labels (the builder pads label columns with inf for rows
        without a label; those rows are dropped here).

    Raises:
        ValueError: if ``h`` is not a supported horizon.
    """
    dataset = np.load(path)

    # Label columns for horizons (10, 20, 50, 100, 200) occupy the last
    # five columns of the array, in that order, so horizon h maps to a
    # negative column offset. The original detached if/elif chain left the
    # offset unbound for unsupported horizons; fail loudly instead.
    horizon_to_offset = {10: 5, 20: 4, 50: 3, 100: 2, 200: 1}
    if h not in horizon_to_offset:
        raise ValueError(
            "Unsupported horizon h={}; expected one of {}".format(h, sorted(horizon_to_offset))
        )
    offset = horizon_to_offset[h]

    # Skip warm-up rows, then keep only finite labels (inf marks padding).
    labels = dataset[seq_size - len_smooth:, -offset]
    labels = labels[np.isfinite(labels)]
    labels = torch.from_numpy(labels).long()

    # The 40 columns after the order features are the order-book snapshot.
    features = torch.from_numpy(dataset[:, cst.LEN_ORDER:cst.LEN_ORDER + 40]).float()
    if all_features:
        orders = torch.from_numpy(dataset[:, :cst.LEN_ORDER]).float()
        features = torch.cat((features, orders), dim=1)

    return features, labels
|
|
|
|
|
|
def labeling(X, len, h, stock):
    """Assign 3-class trend labels to each row of an order-book array.

    The mid-price is smoothed over a sliding window, then the percentage
    change between the smoothed mid-price h rows ahead and h rows behind is
    compared against a data-driven threshold alpha.

    Args:
        X: 2-D array; column 0 is the best ask price and column 2 the best
            bid price (LOBSTER level-1 layout).
        len: smoothing-window length in rows; capped at ``h``. (The name
            shadows the builtin but is kept for caller compatibility.)
        h: prediction horizon in rows.
        stock: stock ticker; currently unused, kept for interface stability.

    Returns:
        1-D integer array of labels: 0 = up, 1 = stationary, 2 = down,
        of length N - window - h + 1.
    """
    # Rebind the shadowing parameter to a safe local name; the window can
    # never exceed the horizon.
    window = min(len, h)

    # Sliding windows over best ask/bid; [:-h] are the "past" windows and
    # [h:] the "future" windows for every alignable row.
    ask_windows = np.lib.stride_tricks.sliding_window_view(X[:, 0], window_shape=window)
    bid_windows = np.lib.stride_tricks.sliding_window_view(X[:, 2], window_shape=window)

    # Smoothed (window-averaged) mid-prices before and after each point.
    previous_mid_prices = ((ask_windows[:-h] + bid_windows[:-h]) / 2).mean(axis=1)
    future_mid_prices = ((ask_windows[h:] + bid_windows[h:]) / 2).mean(axis=1)

    percentage_change = (future_mid_prices - previous_mid_prices) / previous_mid_prices

    # Threshold at half the mean absolute move so the classes stay
    # reasonably balanced for this horizon.
    alpha = np.abs(percentage_change).mean() / 2

    print(f"Alpha: {alpha}")

    labels = np.where(percentage_change < -alpha, 2, np.where(percentage_change > alpha, 0, 1))

    # Compute the class histogram once (the original called np.unique twice).
    uniques, counts = np.unique(labels, return_counts=True)
    print(f"Number of labels: {(uniques, counts)}")
    print(f"Percentage of labels: {counts / labels.shape[0]}")

    return labels
|
|
|
|
|
|
class LOBSTERDataBuilder:
    """Builds train/val/test .npy datasets from raw LOBSTER CSV files.

    For each stock it reads the daily (message, orderbook) CSV pairs,
    splits them by trading day, samples/preprocesses them, rescales prices,
    computes trend labels for every horizon in ``cst.LOBSTER_HORIZONS``,
    z-score normalizes features with training-split statistics, and saves
    each split as one dense numpy array laid out as
    [message columns | orderbook columns | one label column per horizon].
    """

    def __init__(
        self,
        stocks,
        data_dir,
        date_trading_days,
        split_rates,
        sampling_type,
        sampling_time,
        sampling_quantity,
    ):
        # Number of order-book levels kept by preprocess_data.
        self.n_lob_levels = cst.N_LOB_LEVELS
        self.data_dir = data_dir
        # (first_day, last_day) pair used to build each stock's folder name.
        self.date_trading_days = date_trading_days
        self.stocks = stocks
        # (train, val, test) fractions of the available trading days.
        self.split_rates = split_rates

        # Sampling configuration forwarded verbatim to preprocess_data.
        self.sampling_type = sampling_type
        self.sampling_time = sampling_time
        self.sampling_quantity = sampling_quantity

    def prepare_save_datasets(self):
        """Build and save the train/val/test arrays for every stock."""
        for i in range(len(self.stocks)):
            stock = self.stocks[i]
            # Raw data is expected under <data_dir>/<stock>/<stock>_<start>_<end>.
            path = "{}/{}/{}_{}_{}".format(
                self.data_dir,
                stock,
                stock,
                self.date_trading_days[0],
                self.date_trading_days[1],
            )
            self.dataframes = []
            # Fills self.dataframes with a [messages, orderbooks] pair per
            # split and sets self.{train,val,test}_labels_horizons.
            self._prepare_dataframes(path, stock)

            path_where_to_save = "{}/{}".format(
                self.data_dir,
                stock,
            )

            # Feature matrix per split: message columns first, then orderbook.
            self.train_input = pd.concat(self.dataframes[0], axis=1).values
            self.val_input = pd.concat(self.dataframes[1], axis=1).values
            self.test_input = pd.concat(self.dataframes[2], axis=1).values
            # Final layout: [features | label columns], as consumed by lobster_load.
            self.train_set = pd.concat([pd.DataFrame(self.train_input), pd.DataFrame(self.train_labels_horizons)], axis=1).values
            self.val_set = pd.concat([pd.DataFrame(self.val_input), pd.DataFrame(self.val_labels_horizons)], axis=1).values
            self.test_set = pd.concat([pd.DataFrame(self.test_input), pd.DataFrame(self.test_labels_horizons)], axis=1).values
            self._save(path_where_to_save)

    def _prepare_dataframes(self, path, stock):
        """Load, split, rescale and label one stock's raw data.

        Populates self.dataframes (one [messages, orderbooks] pair per
        split) and self.{train,val,test}_labels_horizons (one column per
        horizon in cst.LOBSTER_HORIZONS), then normalizes the features.
        """
        COLUMNS_NAMES = {"orderbook": ["sell1", "vsell1", "buy1", "vbuy1",
                                       "sell2", "vsell2", "buy2", "vbuy2",
                                       "sell3", "vsell3", "buy3", "vbuy3",
                                       "sell4", "vsell4", "buy4", "vbuy4",
                                       "sell5", "vsell5", "buy5", "vbuy5",
                                       "sell6", "vsell6", "buy6", "vbuy6",
                                       "sell7", "vsell7", "buy7", "vbuy7",
                                       "sell8", "vsell8", "buy8", "vbuy8",
                                       "sell9", "vsell9", "buy9", "vbuy9",
                                       "sell10", "vsell10", "buy10", "vbuy10"],
                         "message": ["time", "event_type", "order_id", "size", "price", "direction"]}
        # Each trading day contributes two files (message + orderbook).
        self.num_trading_days = len(os.listdir(path))//2
        split_days = self._split_days()
        # Convert day indices into file indices (two files per day).
        split_days = [i * 2 for i in split_days]
        self._create_dataframes_splitted(path, split_days, COLUMNS_NAMES)

        for i in range(len(self.dataframes)):
            # Rescale prices by 1e4 — presumably LOBSTER's integer
            # price-in-1e-4-dollars convention; confirm with the data source.
            self.dataframes[i][0]["price"] = self.dataframes[i][0]["price"] / 10000
            # Every other orderbook column (sell1, buy1, ...) is a price;
            # the interleaved v* columns are volumes and stay untouched.
            self.dataframes[i][1].loc[:, ::2] /= 10000
        # Labels are computed from orderbook features only (index [1]),
        # before normalization.
        train_input = self.dataframes[0][1].values
        val_input = self.dataframes[1][1].values
        test_input = self.dataframes[2][1].values

        for i in range(len(cst.LOBSTER_HORIZONS)):
            if i == 0:
                # First horizon creates the per-split label DataFrames.
                train_labels = labeling(train_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                val_labels = labeling(val_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                test_labels = labeling(test_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                # Pad with inf so every label column matches the input
                # length; lobster_load later drops non-finite labels.
                train_labels = np.concatenate([train_labels, np.full(shape=(train_input.shape[0] - train_labels.shape[0]), fill_value=np.inf)])
                val_labels = np.concatenate([val_labels, np.full(shape=(val_input.shape[0] - val_labels.shape[0]), fill_value=np.inf)])
                test_labels = np.concatenate([test_labels, np.full(shape=(test_input.shape[0] - test_labels.shape[0]), fill_value=np.inf)])
                self.train_labels_horizons = pd.DataFrame(train_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
                self.val_labels_horizons = pd.DataFrame(val_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
                self.test_labels_horizons = pd.DataFrame(test_labels, columns=["label_h{}".format(cst.LOBSTER_HORIZONS[i])])
            else:
                # Subsequent horizons append one column each to the existing
                # DataFrames; logic otherwise mirrors the i == 0 branch.
                train_labels = labeling(train_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                val_labels = labeling(val_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                test_labels = labeling(test_input, cst.LEN_SMOOTH, cst.LOBSTER_HORIZONS[i], stock)
                train_labels = np.concatenate([train_labels, np.full(shape=(train_input.shape[0] - train_labels.shape[0]), fill_value=np.inf)])
                val_labels = np.concatenate([val_labels, np.full(shape=(val_input.shape[0] - val_labels.shape[0]), fill_value=np.inf)])
                test_labels = np.concatenate([test_labels, np.full(shape=(test_input.shape[0] - test_labels.shape[0]), fill_value=np.inf)])
                self.train_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = train_labels
                self.val_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = val_labels
                self.test_labels_horizons["label_h{}".format(cst.LOBSTER_HORIZONS[i])] = test_labels

        self._normalize_dataframes()

    def _sparse_representation(self):
        # Converts the dense per-level book into a tick-grid representation
        # with the mid-price in the last column.
        # NOTE(review): this method appears unused and is broken as written:
        #   * range(0, ..., -tick_size) uses a float step -> TypeError when
        #     executed;
        #   * (sparse_repr.shape[1]) - 1 / 2 evaluates to shape[1] - 0.5 due
        #     to operator precedence; presumably (shape[1] - 1) / 2 was meant;
        #   * the bid branch writes through sparse_pos_ask instead of
        #     sparse_pos_bid;
        #   * sparse_repr is never stored or returned, so the work is lost.
        # Left byte-identical pending confirmation of the intended behavior.
        tick_size = 0.01
        for i in range(len(self.dataframes)):
            dense_repr = self.dataframes[i][1].values
            # One extra column reserved for the mid-price.
            sparse_repr = np.zeros((dense_repr.shape[0], dense_repr.shape[1] + 1))
            for row in range(dense_repr.shape[0]):
                sparse_pos_ask = 0
                sparse_pos_bid = 0
                mid_price = (dense_repr[row][0] + dense_repr[row][2]) / 2
                sparse_repr[row][-1] = mid_price
                # Price columns only (even indices): col 0 = best ask,
                # col 2 = best bid, then deeper ask/bid levels alternate.
                for col in range(0, dense_repr.shape[1], 2):
                    if col == 0:
                        start_ask = dense_repr[row][col]
                    elif col == 2:
                        start_bid = dense_repr[row][col]
                    elif col % 4 == 0:
                        if sparse_pos_ask < (sparse_repr.shape[1]) - 1 / 2:
                            actual_ask = dense_repr[row][col]
                            for level in range(0, actual_ask-start_ask, -tick_size):
                                if sparse_pos_ask < (sparse_repr.shape[1]) - 1 / 2:
                                    if level == actual_ask - start_ask - tick_size:
                                        sparse_repr[row][sparse_pos_ask] = dense_repr[row][col+1]
                                    else:
                                        sparse_repr[row][sparse_pos_ask] = 0
                                    sparse_pos_ask += 1
                                else:
                                    break
                            start_ask = actual_ask
                        else:
                            continue
                    elif col % 4 == 2:
                        if sparse_pos_bid < (sparse_repr.shape[1]) - 1 / 2:
                            actual_bid = dense_repr[row][col]
                            for level in range(0, start_bid-actual_bid, -tick_size):
                                if sparse_pos_bid < (sparse_repr.shape[1]) - 1 / 2:
                                    if level == start_bid - actual_bid - tick_size:
                                        sparse_repr[row][sparse_pos_ask] = dense_repr[row][col+1]
                                    else:
                                        sparse_repr[row][sparse_pos_ask] = 0
                                    sparse_pos_bid += 1
                                else:
                                    break
                            start_bid = actual_bid
                        else:
                            continue

    def _create_dataframes_splitted(self, path, split_days, COLUMNS_NAMES):
        """Read the sorted daily CSV pairs and split them into train/val/test.

        Files are expected in sorted pairs per day: even index -> message
        file, odd index -> orderbook file. ``split_days`` holds the file
        indices where the val and test splits begin. Appends the resulting
        [messages, orderbooks] pairs to self.dataframes in split order.
        Assumes at least one full day per split (e.g. train_messages would
        be unbound if split_days[0] == 0 — confirm upstream guarantees).
        """
        # Running total of raw orderbook rows read (before sampling).
        # NOTE(review): the test branch never adds to total_shape, so the
        # printed total covers only train and val.
        total_shape = 0
        for i, filename in enumerate(sorted(os.listdir(path))):
            f = os.path.join(path, filename)
            print(f)
            if os.path.isfile(f):

                # --- training days ---
                if i < split_days[0]:
                    if (i % 2) == 0:
                        if i == 0:
                            # First day's messages seed the accumulator.
                            train_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            train_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])

                    else:
                        if i == 1:
                            # First day's orderbook: preprocess and seed.
                            train_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += train_orderbooks.shape[0]
                            train_orderbooks, train_messages = preprocess_data([train_messages, train_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(train_orderbooks) != len(train_messages)):
                                raise ValueError("train_orderbook length is different than train_messages")
                        else:
                            # Later days: preprocess the pair, then append.
                            train_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += train_orderbook.shape[0]
                            train_orderbook, train_message = preprocess_data([train_message, train_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            train_messages = pd.concat([train_messages, train_message], axis=0)
                            train_orderbooks = pd.concat([train_orderbooks, train_orderbook], axis=0)

                # --- validation days ---
                elif split_days[0] <= i < split_days[1]:
                    if (i % 2) == 0:
                        if (i == split_days[0]):
                            # Entering the val split: flush the train pair.
                            self.dataframes.append([train_messages, train_orderbooks])
                            val_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            val_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                    else:
                        if i == split_days[0] + 1:
                            val_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += val_orderbooks.shape[0]
                            val_orderbooks, val_messages = preprocess_data([val_messages, val_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(val_orderbooks) != len(val_messages)):
                                raise ValueError("val_orderbook length is different than val_messages")
                        else:
                            val_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            total_shape += val_orderbook.shape[0]
                            val_orderbook, val_message = preprocess_data([val_message, val_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            val_messages = pd.concat([val_messages, val_message], axis=0)
                            val_orderbooks = pd.concat([val_orderbooks, val_orderbook], axis=0)

                # --- test days ---
                else:

                    if (i % 2) == 0:
                        if (i == split_days[1]):
                            # Entering the test split: flush the val pair.
                            self.dataframes.append([val_messages, val_orderbooks])
                            test_messages = pd.read_csv(f, names=COLUMNS_NAMES["message"])
                        else:
                            test_message = pd.read_csv(f, names=COLUMNS_NAMES["message"])

                    else:
                        if i == split_days[1] + 1:
                            test_orderbooks = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            test_orderbooks, test_messages = preprocess_data([test_messages, test_orderbooks], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            if (len(test_orderbooks) != len(test_messages)):
                                raise ValueError("test_orderbook length is different than test_messages")
                        else:
                            test_orderbook = pd.read_csv(f, names=COLUMNS_NAMES["orderbook"])
                            test_orderbook, test_message = preprocess_data([test_message, test_orderbook], self.n_lob_levels, self.sampling_type, self.sampling_time, self.sampling_quantity)
                            test_messages = pd.concat([test_messages, test_message], axis=0)
                            test_orderbooks = pd.concat([test_orderbooks, test_orderbook], axis=0)
            else:
                raise ValueError("File {} is not a file".format(f))
        # The test pair is only flushed after the directory is exhausted.
        self.dataframes.append([test_messages, test_orderbooks])
        print(f"Total shape of the orderbooks is {total_shape}")

    def _normalize_dataframes(self):
        """Z-score all splits using statistics from the training split."""
        # Order books: split 0 (train) produces the statistics; val and
        # test reuse them to avoid leakage.
        for i in range(len(self.dataframes)):
            if (i == 0):
                self.dataframes[i][1], mean_size, mean_prices, std_size, std_prices = z_score_orderbook(self.dataframes[i][1])
            else:
                self.dataframes[i][1], _, _, _, _ = z_score_orderbook(self.dataframes[i][1], mean_size, mean_prices, std_size, std_prices)

        # Messages: same scheme, with additional time/depth statistics.
        for i in range(len(self.dataframes)):
            if (i == 0):
                self.dataframes[i][0], mean_size, mean_prices, std_size, std_prices, mean_time, std_time, mean_depth, std_depth = normalize_messages(self.dataframes[i][0])
            else:
                self.dataframes[i][0], _, _, _, _, _, _, _, _ = normalize_messages(self.dataframes[i][0], mean_size, mean_prices, std_size, std_prices, mean_time, std_time, mean_depth, std_depth)

    def _save(self, path_where_to_save):
        """Write the three prepared splits as .npy files under the stock dir."""
        np.save(path_where_to_save + "/train.npy", self.train_set)
        np.save(path_where_to_save + "/val.npy", self.val_set)
        np.save(path_where_to_save + "/test.npy", self.test_set)

    def _split_days(self):
        """Return cumulative day boundaries [train_end, val_end, test_end]."""
        train = int(self.num_trading_days * self.split_rates[0])
        val = int(self.num_trading_days * self.split_rates[1]) + train
        test = int(self.num_trading_days * self.split_rates[2]) + val
        print(f"There are {train} days for training, {val - train} days for validation and {test - val} days for testing")
        return [train, val, test]