"""The interface to load log datasets. The datasets currently supported include HDFS and BGL."""
import pandas as pd
import os
import numpy as np
import re
from sklearn.utils import shuffle
from collections import OrderedDict
def _split_data(x_data, y_data=None, train_ratio=0, split_type='uniform'):
    """ Split samples (and optionally labels) into a shuffled train part and a test part

    Arguments
    ---------
        x_data: numpy array of samples, indexed along its first axis.
        y_data: numpy array of labels aligned with x_data, or None for unlabeled data.
            Labels greater than 0 are treated as positive samples.
        train_ratio: float, the fraction of samples that goes into the training part.
        split_type: `uniform` takes the first `train_ratio` share of the positive samples and of
            the negative samples separately (requires y_data). `sequential` takes the first
            `train_ratio` share of the data as it is ordered.

    Returns
    -------
        (x_train, y_train): the training data, randomly shuffled; y_train is None when y_data is None
        (x_test, y_test): the testing data in its original order; y_test is None when y_data is None

    Raises
    ------
        ValueError: if split_type is unknown, or `uniform` is requested without labels.
    """
    if split_type == 'uniform' and y_data is not None:
        pos_idx = y_data > 0
        x_pos, y_pos = x_data[pos_idx], y_data[pos_idx]
        x_neg, y_neg = x_data[~pos_idx], y_data[~pos_idx]
        train_pos = int(train_ratio * x_pos.shape[0])
        train_neg = int(train_ratio * x_neg.shape[0])
        # Join along the sample axis so that 2-D feature matrices are stacked
        # row-wise; for 1-D inputs this is the same as a horizontal stack.
        x_train = np.concatenate([x_pos[:train_pos], x_neg[:train_neg]])
        y_train = np.concatenate([y_pos[:train_pos], y_neg[:train_neg]])
        x_test = np.concatenate([x_pos[train_pos:], x_neg[train_neg:]])
        y_test = np.concatenate([y_pos[train_pos:], y_neg[train_neg:]])
    elif split_type == 'sequential':
        num_train = int(train_ratio * x_data.shape[0])
        x_train = x_data[:num_train]
        x_test = x_data[num_train:]
        if y_data is None:
            y_train = None
            y_test = None
        else:
            y_train = y_data[:num_train]
            y_test = y_data[num_train:]
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(
            "Unsupported split: split_type={!r} with {} labels. Use 'sequential', "
            "or 'uniform' together with y_data.".format(
                split_type, 'no' if y_data is None else 'given'))

    # Random shuffle of the training part only (uses NumPy's global random state).
    indexes = np.random.permutation(x_train.shape[0])
    x_train = x_train[indexes]
    if y_train is not None:
        y_train = y_train[indexes]
    return (x_train, y_train), (x_test, y_test)
def load_HDFS(log_file, label_file=None, window='session', train_ratio=0.5, split_type='sequential', save_csv=False, window_size=0):
    """ Load HDFS structured log into train and test data

    Arguments
    ---------
        log_file: str, the file path of structured log. A `.csv` file is grouped into one event
            sequence per block id; a `.npz` file must hold ready-made `x_data` and `y_data` arrays.
        label_file: str, the file path of anomaly labels, None for unlabeled data
        window: str, the window options including `session` (default).
        train_ratio: float, the ratio of training data for train/test split.
        split_type: `uniform` or `sequential`, which determines how to split dataset. `uniform` means
            to split positive samples and negative samples equally when setting label_file. `sequential`
            means to split the data sequentially without label_file. That is, the first part is for training,
            while the second part is for testing.
        save_csv: bool, if True the per-block table is written to `data_instances.csv`
            in the current working directory (csv input only).
        window_size: int, if greater than 0 every session is cut into windows of this many
            events via slice_hdfs (csv input with label_file only).

    Returns
    -------
        npz input, or csv input with labels and window_size == 0:
            (x_train, y_train): the training data
            (x_test, y_test): the testing data
        csv input with labels and window_size > 0:
            (x_train, window_y_train, y_train), (x_test, window_y_test, y_test)
        csv input without labels:
            (x_train, None), (x_test, None), data_df

    Raises
    ------
        NotImplementedError: if log_file is neither a `.csv` nor a `.npz` file.
        ValueError: if window_size > 0 is requested without label_file.
    """
    print('====== Input data summary ======')

    if log_file.endswith('.npz'):
        # Split training and validation set in a class-uniform way
        data = np.load(log_file)
        x_data = data['x_data']
        y_data = data['y_data']
        (x_train, y_train), (x_test, y_test) = _split_data(x_data, y_data, train_ratio, split_type)

    elif log_file.endswith('.csv'):
        assert window == 'session', "Only window=session is supported for HDFS dataset."
        if window_size > 0 and not label_file:
            # Windowing needs the session labels; without them the code below
            # would otherwise fail on variables that were never assigned.
            raise ValueError('window_size > 0 requires label_file to be set.')

        print("Loading", log_file)
        struct_log = pd.read_csv(log_file, engine='c',
                                 na_filter=False, memory_map=True)

        # Group the events of every log line under each block id the line mentions.
        # NOTE(review): the event identifier is read from the `EventId` column;
        # confirm this matches the structured-log schema in use.
        block_pattern = re.compile(r'(blk_-?\d+)')
        data_dict = OrderedDict()
        for content, event_id in zip(struct_log['Content'], struct_log['EventId']):
            # Order-preserving de-duplication keeps the block order (and thus
            # the row order of data_df) identical from run to run.
            for blk_id in OrderedDict.fromkeys(block_pattern.findall(content)):
                if blk_id not in data_dict:
                    data_dict[blk_id] = []
                data_dict[blk_id].append(event_id)
        data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])

        if label_file:
            # Split training and validation set in a class-uniform way
            label_data = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True)
            label_data = label_data.set_index('BlockId')
            label_dict = label_data['Label'].to_dict()
            data_df['Label'] = data_df['BlockId'].apply(lambda x: 1 if label_dict[x] == 'Anomaly' else 0)

            # Split train and test data
            (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values,
                                                               data_df['Label'].values, train_ratio, split_type)
            print(y_train.sum(), y_test.sum())

        if save_csv:
            data_df.to_csv('data_instances.csv', index=False)

        if window_size > 0:
            x_train, window_y_train, y_train = slice_hdfs(x_train, y_train, window_size)
            x_test, window_y_test, y_test = slice_hdfs(x_test, y_test, window_size)
            log = "{} {} windows ({}/{} anomaly), {}/{} normal"
            print(log.format("Train:", x_train.shape[0], y_train.sum(), y_train.shape[0], (1-y_train).sum(), y_train.shape[0]))
            print(log.format("Test:", x_test.shape[0], y_test.sum(), y_test.shape[0], (1-y_test).sum(), y_test.shape[0]))
            return (x_train, window_y_train, y_train), (x_test, window_y_test, y_test)

        if not label_file:
            if split_type == 'uniform':
                split_type = 'sequential'
                print('Warning: Only split_type=sequential is supported if label_file=None.')
            # Split training and validation set sequentially
            x_data = data_df['EventSequence'].values
            (x_train, _), (x_test, _) = _split_data(x_data, train_ratio=train_ratio, split_type=split_type)
            print('Total: {} instances, train: {} instances, test: {} instances'.format(
                x_data.shape[0], x_train.shape[0], x_test.shape[0]))
            return (x_train, None), (x_test, None), data_df
    else:
        raise NotImplementedError('load_HDFS() only support csv and npz files!')

    # Summary for the labeled, non-windowed result.
    num_train = x_train.shape[0]
    num_test = x_test.shape[0]
    num_total = num_train + num_test
    num_train_pos = sum(y_train)
    num_test_pos = sum(y_test)
    num_pos = num_train_pos + num_test_pos

    print('Total: {} instances, {} anomaly, {} normal'
          .format(num_total, num_pos, num_total - num_pos))
    print('Train: {} instances, {} anomaly, {} normal'
          .format(num_train, num_train_pos, num_train - num_train_pos))
    print('Test: {} instances, {} anomaly, {} normal\n'
          .format(num_test, num_test_pos, num_test - num_test_pos))

    return (x_train, y_train), (x_test, y_test)
def slice_hdfs(x, y, window_size):
    """ Cut every session into fixed-size event windows

    Arguments
    ---------
        x: array of sessions (needs `.shape[0]`), each session being a list of events.
        y: per-session labels, indexed by the position of the session in x.
        window_size: int, the number of events per window.

    Returns
    -------
        a DataFrame with the columns `SessionId` and `EventSequence` (one row per window)
        the `Label` series: the event that follows each window, `#Pad` for the last window of a session
        the `SessionLabel` series: the label of the session each window was cut from
    """
    print("Slicing {} sessions, with window {}".format(x.shape[0], window_size))
    columns = ["SessionId", "EventSequence", "Label", "SessionLabel"]
    rows = []
    for session_id, events in enumerate(x):
        # Number of windows that still have a real "next event" behind them.
        num_full = max(len(events) - window_size, 0)
        for start in range(num_full):
            stop = start + window_size
            rows.append([session_id, events[start:stop], events[stop], y[session_id]])
        # The last window of a session has no successor: pad it up to
        # window_size where it is too short and label it with the pad token.
        tail = events[num_full: num_full + window_size]
        tail += ["#Pad"] * (window_size - len(tail))
        rows.append([session_id, tail, "#Pad", y[session_id]])
    windows_df = pd.DataFrame(rows, columns=columns)
    print("Slicing done, {} windows generated".format(windows_df.shape[0]))
    return windows_df[["SessionId", "EventSequence"]], windows_df["Label"], windows_df["SessionLabel"]
# NOTE(review): stub for a BGL loader. Only the first line of the signature is
# visible here and the docstring is a bare TODO, so the loader is presumably
# unimplemented -- confirm against the full file before relying on it.
def load_BGL(log_file, label_file=None, window='sliding', time_interval=60, stepping_size=60,
""" TODO
def _bgl_sliding_windows(time_data, window_seconds, step_seconds):
    """ Return the (start_index, end_index) pair of every sliding time window over time_data """
    if window_seconds <= 0 or step_seconds <= 0:
        raise ValueError("para['window_size'] and para['step_size'] must be positive.")
    log_size = len(time_data)
    if log_size == 0:
        return []

    start_time = time_data[0]
    start_index = 0
    end_index = 0
    end_time = start_time
    # get the first start, end index, end time
    for cur_time in time_data:
        if cur_time < start_time + window_seconds:
            end_index += 1
            end_time = cur_time
        else:
            break
    # Record the first window even when every log fits into it.
    windows = [(start_index, end_index)]

    # move the start and end index until next sliding window
    while end_index < log_size:
        start_time = start_time + step_seconds
        end_time = end_time + step_seconds
        # The new start never moves past the previous end of the window.
        i = start_index
        while i < end_index and time_data[i] < start_time:
            i += 1
        j = end_index
        while j < log_size and time_data[j] < end_time:
            j += 1
        start_index = i
        end_index = j
        windows.append((start_index, end_index))
    return windows


def bgl_preprocess_data(para, raw_data, event_mapping_data):
    """ split logs into sliding windows, built an event count matrix and get the corresponding label

    Arguments
    ---------
        para: the parameters dictionary; reads `save_path` (directory prefix of the window cache,
            concatenated as-is with the file name), `window_size` and `step_size` (multiplied
            by 3600 before being compared with the log times, i.e. presumably hours).
        raw_data: 2-D array whose column 0 is the label and column 1 the time of each log
        event_mapping_data: a list of event index, where each row index indicates a corresponding log.
            The event indexes are used directly as column positions, so they must lie in
            [0, number of distinct events).

    Returns
    -------
        event_count_matrix: event count matrix, where each row is an instance (log sequence vector)
        labels: a list of labels, 1 represents anomaly

    Raises
    ------
        ValueError: if the windows have to be computed and `window_size` or `step_size` is not positive.
    """
    # create the directory for saving the sliding windows (start_index, end_index), which can be directly loaded in future running
    os.makedirs(para['save_path'], exist_ok=True)
    sliding_file_path = para['save_path']+'sliding_'+str(para['window_size'])+'h_'+str(para['step_size'])+'h.csv'

    #=============divide into sliding windows=========#
    label_data, time_data = raw_data[:, 0], raw_data[:, 1]
    if not os.path.exists(sliding_file_path):
        # split into sliding window
        start_end_index_list = _bgl_sliding_windows(
            time_data, para['window_size']*3600, para['step_size']*3600)
        inst_number = len(start_end_index_list)
        print('there are %d instances (sliding windows) in this dataset\n'%inst_number)
        if start_end_index_list:
            # An empty cache file could not be read back, so only save real windows.
            np.savetxt(sliding_file_path, start_end_index_list, delimiter=',', fmt='%d')
    else:
        print('Loading start_end_index_list from file')
        start_end_index_list = pd.read_csv(sliding_file_path, header=None).values
        inst_number = len(start_end_index_list)
        print('there are %d instances (sliding windows) in this dataset' % inst_number)

    event_indexes = [row[0] for row in event_mapping_data]
    event_num = len(set(event_indexes))
    print('There are %d log events'%event_num)

    #=============get labels and event count of each sliding window =========#
    labels = []
    event_count_matrix = np.zeros((inst_number, event_num))
    for inst, (start_index, end_index) in enumerate(start_end_index_list):
        label = 0  # 0 represent success, 1 represent failure
        for k in range(start_index, end_index):
            event_count_matrix[inst, event_indexes[k]] += 1
            if label_data[k]:
                label = 1
        labels.append(label)
    print("Among all instances, %d are anomalies"%sum(labels))
    return event_count_matrix, labels