"""
The interface to load log datasets. The datasets currently supported include
HDFS and BGL.
Authors:
LogPAI Team
"""
import pandas as pd
import os
import numpy as np
import re
from sklearn.utils import shuffle
from collections import OrderedDict
def _split_data(x_data, y_data=None, train_ratio=0, split_type='uniform'):
    """ Split data into train and test sets, either class-uniformly (`uniform`) or in order (`sequential`). """
if split_type == 'uniform' and y_data is not None:
pos_idx = y_data > 0
x_pos = x_data[pos_idx]
y_pos = y_data[pos_idx]
x_neg = x_data[~pos_idx]
y_neg = y_data[~pos_idx]
train_pos = int(train_ratio * x_pos.shape[0])
train_neg = int(train_ratio * x_neg.shape[0])
x_train = np.hstack([x_pos[0:train_pos], x_neg[0:train_neg]])
y_train = np.hstack([y_pos[0:train_pos], y_neg[0:train_neg]])
x_test = np.hstack([x_pos[train_pos:], x_neg[train_neg:]])
y_test = np.hstack([y_pos[train_pos:], y_neg[train_neg:]])
elif split_type == 'sequential':
num_train = int(train_ratio * x_data.shape[0])
x_train = x_data[0:num_train]
x_test = x_data[num_train:]
if y_data is None:
y_train = None
y_test = None
else:
y_train = y_data[0:num_train]
y_test = y_data[num_train:]
    # Randomly shuffle the training set (mixing positive and negative samples); the test set keeps its order
indexes = shuffle(np.arange(x_train.shape[0]))
x_train = x_train[indexes]
if y_train is not None:
y_train = y_train[indexes]
return (x_train, y_train), (x_test, y_test)
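

# A minimal sketch (not part of the original loglizer API) illustrating
# _split_data: with split_type='uniform', positive and negative samples are
# split separately by train_ratio, so both classes appear in train and test.
# The toy arrays below are assumptions for illustration only.
def _demo_split_data():
    x = np.array([10, 11, 12, 13, 20, 21])
    y = np.array([1, 1, 1, 1, 0, 0])
    (x_tr, y_tr), (x_te, y_te) = _split_data(x, y, train_ratio=0.5, split_type='uniform')
    # Expect 2 positives + 1 negative in train (shuffled) and the rest in test.
    print('train labels:', y_tr, 'test labels:', y_te)
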
def load_HDFS(log_file, label_file=None, window='session', train_ratio=0.5, split_type='sequential', save_csv=False, window_size=0):
""" Load HDFS structured log into train and test data
Arguments
---------
log_file: str, the file path of structured log.
label_file: str, the file path of anomaly labels, None for unlabeled data
    window: str, the windowing option; only `session` is currently supported.
    train_ratio: float, the ratio of training data for the train/test split.
    split_type: `uniform` or `sequential`, which determines how to split the dataset. `uniform`
        splits positive and negative samples separately by train_ratio (so class proportions are
        preserved) and requires label_file. `sequential` splits the data in order without
        label_file: the first part is used for training, the second part for testing.
    save_csv: bool, whether to save the session instances to `data_instances.csv`.
    window_size: int, if larger than 0, each session is further sliced into windows of this size
        (see slice_hdfs) and per-window next-event labels are returned as well.
    Returns
    -------
    (x_train, y_train): the training data
    (x_test, y_test): the testing data
    When window_size > 0, (x_train, window_y_train, y_train) and (x_test, window_y_test, y_test)
    are returned instead; when label_file is None, data_df is returned as a third element.
    A usage sketch follows this function.
"""
print('====== Input data summary ======')
if log_file.endswith('.npz'):
# Split training and validation set in a class-uniform way
data = np.load(log_file)
x_data = data['x_data']
y_data = data['y_data']
(x_train, y_train), (x_test, y_test) = _split_data(x_data, y_data, train_ratio, split_type)
elif log_file.endswith('.csv'):
assert window == 'session', "Only window=session is supported for HDFS dataset."
print("Loading", log_file)
struct_log = pd.read_csv(log_file, engine='c',
na_filter=False, memory_map=True)
data_dict = OrderedDict()
for idx, row in struct_log.iterrows():
blkId_list = re.findall(r'(blk_-?\d+)', row['Content'])
blkId_set = set(blkId_list)
for blk_Id in blkId_set:
                if blk_Id not in data_dict:
data_dict[blk_Id] = []
data_dict[blk_Id].append(row['EventId'])
data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence'])
if label_file:
# Split training and validation set in a class-uniform way
label_data = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True)
label_data = label_data.set_index('BlockId')
label_dict = label_data['Label'].to_dict()
data_df['Label'] = data_df['BlockId'].apply(lambda x: 1 if label_dict[x] == 'Anomaly' else 0)
# Split train and test data
(x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values,
data_df['Label'].values, train_ratio, split_type)
            print('Train anomalies: {}, test anomalies: {}'.format(y_train.sum(), y_test.sum()))
if save_csv:
data_df.to_csv('data_instances.csv', index=False)
if window_size > 0:
x_train, window_y_train, y_train = slice_hdfs(x_train, y_train, window_size)
x_test, window_y_test, y_test = slice_hdfs(x_test, y_test, window_size)
log = "{} {} windows ({}/{} anomaly), {}/{} normal"
print(log.format("Train:", x_train.shape[0], y_train.sum(), y_train.shape[0], (1-y_train).sum(), y_train.shape[0]))
print(log.format("Test:", x_test.shape[0], y_test.sum(), y_test.shape[0], (1-y_test).sum(), y_test.shape[0]))
return (x_train, window_y_train, y_train), (x_test, window_y_test, y_test)
if label_file is None:
if split_type == 'uniform':
split_type = 'sequential'
                print('Warning: Only split_type=sequential is supported when label_file=None; falling back to sequential.')
# Split training and validation set sequentially
x_data = data_df['EventSequence'].values
(x_train, _), (x_test, _) = _split_data(x_data, train_ratio=train_ratio, split_type=split_type)
print('Total: {} instances, train: {} instances, test: {} instances'.format(
x_data.shape[0], x_train.shape[0], x_test.shape[0]))
return (x_train, None), (x_test, None), data_df
else:
        raise NotImplementedError('load_HDFS() only supports csv and npz files!')
num_train = x_train.shape[0]
num_test = x_test.shape[0]
num_total = num_train + num_test
num_train_pos = sum(y_train)
num_test_pos = sum(y_test)
num_pos = num_train_pos + num_test_pos
print('Total: {} instances, {} anomaly, {} normal' \
.format(num_total, num_pos, num_total - num_pos))
print('Train: {} instances, {} anomaly, {} normal' \
.format(num_train, num_train_pos, num_train - num_train_pos))
print('Test: {} instances, {} anomaly, {} normal\n' \
.format(num_test, num_test_pos, num_test - num_test_pos))
return (x_train, y_train), (x_test, y_test)
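

# Hedged usage sketch for load_HDFS. In typical use it is pointed at a
# parser-produced structured log plus a label file, e.g. (placeholder names):
#   load_HDFS('HDFS.log_structured.csv', label_file='anomaly_label.csv',
#             window='session', train_ratio=0.5, split_type='uniform')
# The toy below is a self-contained, unlabeled variant that only assumes the
# two columns the loader actually reads ('Content' and 'EventId').
def _demo_load_hdfs():
    import tempfile
    toy_csv = os.path.join(tempfile.mkdtemp(), 'toy_structured.csv')
    pd.DataFrame({
        'Content': ['Receiving block blk_1', 'Deleting block blk_1', 'Receiving block blk_2'],
        'EventId': ['E1', 'E2', 'E1'],
    }).to_csv(toy_csv, index=False)
    # Without label_file the loader splits sequentially and also returns data_df.
    (x_train, _), (x_test, _), data_df = load_HDFS(toy_csv, train_ratio=0.5, split_type='sequential')
    print(data_df)
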
def slice_hdfs(x, y, window_size):
results_data = []
print("Slicing {} sessions, with window {}".format(x.shape[0], window_size))
for idx, sequence in enumerate(x):
seqlen = len(sequence)
i = 0
        # Slide a fixed-size window over the session; each window is labelled
        # with the event that immediately follows it.
        while (i + window_size) < seqlen:
            window = sequence[i: i + window_size]
            results_data.append([idx, window, sequence[i + window_size], y[idx]])
            i += 1
        else:
            # Final (possibly shorter) window: pad it with "#Pad" and label it
            # "#Pad", since no following event exists.
            window = sequence[i: i + window_size]
            window += ["#Pad"] * (window_size - len(window))
            results_data.append([idx, window, "#Pad", y[idx]])
results_df = pd.DataFrame(results_data, columns=["SessionId", "EventSequence", "Label", "SessionLabel"])
print("Slicing done, {} windows generated".format(results_df.shape[0]))
return results_df[["SessionId", "EventSequence"]], results_df["Label"], results_df["SessionLabel"]
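

# A minimal sketch of what slice_hdfs produces (illustrative, not part of the
# original API): one toy session of 5 events with window_size=3 yields two full
# windows labelled with the event that follows them, plus a final window
# labelled "#Pad" (and padded with "#Pad" whenever it is shorter than
# window_size).
def _demo_slice_hdfs():
    sessions = np.empty(1, dtype=object)
    sessions[0] = ['e1', 'e2', 'e3', 'e4', 'e5']   # one toy session
    session_labels = np.array([0])                 # 0 = normal session
    windows, next_events, session_y = slice_hdfs(sessions, session_labels, window_size=3)
    # windows:     ['e1','e2','e3'], ['e2','e3','e4'], ['e3','e4','e5']
    # next_events: 'e4',             'e5',             '#Pad'
    print(windows, next_events.tolist(), session_y.tolist())
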
def load_BGL(log_file, label_file=None, window='sliding', time_interval=60, stepping_size=60,
train_ratio=0.8):
""" TODO
"""
def bgl_preprocess_data(para, raw_data, event_mapping_data):
""" split logs into sliding windows, built an event count matrix and get the corresponding label
Args:
--------
para: the parameters dictionary
    raw_data: a numpy array with one (label, timestamp) row per log line
    event_mapping_data: a list with one row per log line, whose first element is the event index of that log
Returns:
--------
event_count_matrix: event count matrix, where each row is an instance (log sequence vector)
labels: a list of labels, 1 represents anomaly
"""
    # create the directory for saving the sliding windows (start_index, end_index), which can be loaded directly in future runs
if not os.path.exists(para['save_path']):
os.mkdir(para['save_path'])
log_size = raw_data.shape[0]
sliding_file_path = para['save_path']+'sliding_'+str(para['window_size'])+'h_'+str(para['step_size'])+'h.csv'
#=============divide into sliding windows=========#
    start_end_index_list = []  # list of (start_index, end_index) tuples, one per sliding time window
label_data, time_data = raw_data[:,0], raw_data[:, 1]
if not os.path.exists(sliding_file_path):
# split into sliding window
start_time = time_data[0]
start_index = 0
end_index = 0
# get the first start, end index, end time
for cur_time in time_data:
if cur_time < start_time + para['window_size']*3600:
end_index += 1
end_time = cur_time
else:
start_end_pair=tuple((start_index,end_index))
start_end_index_list.append(start_end_pair)
break
# move the start and end index until next sliding window
while end_index < log_size:
start_time = start_time + para['step_size']*3600
end_time = end_time + para['step_size']*3600
for i in range(start_index,end_index):
if time_data[i] < start_time:
i+=1
else:
break
for j in range(end_index, log_size):
if time_data[j] < end_time:
j+=1
else:
break
start_index = i
end_index = j
start_end_pair = tuple((start_index, end_index))
start_end_index_list.append(start_end_pair)
inst_number = len(start_end_index_list)
print('there are %d instances (sliding windows) in this dataset\n'%inst_number)
np.savetxt(sliding_file_path,start_end_index_list,delimiter=',',fmt='%d')
else:
print('Loading start_end_index_list from file')
start_end_index_list = pd.read_csv(sliding_file_path, header=None).values
inst_number = len(start_end_index_list)
print('there are %d instances (sliding windows) in this dataset' % inst_number)
# get all the log indexes in each time window by ranging from start_index to end_index
expanded_indexes_list=[]
for t in range(inst_number):
index_list = []
expanded_indexes_list.append(index_list)
for i in range(inst_number):
start_index = start_end_index_list[i][0]
end_index = start_end_index_list[i][1]
for l in range(start_index, end_index):
expanded_indexes_list[i].append(l)
event_mapping_data = [row[0] for row in event_mapping_data]
event_num = len(list(set(event_mapping_data)))
print('There are %d log events'%event_num)
#=============get labels and event count of each sliding window =========#
labels = []
event_count_matrix = np.zeros((inst_number,event_num))
for j in range(inst_number):
        label = 0  # 0 represents success (normal), 1 represents failure (anomaly)
for k in expanded_indexes_list[j]:
event_index = event_mapping_data[k]
event_count_matrix[j, event_index] += 1
if label_data[k]:
label = 1
continue
labels.append(label)
assert inst_number == len(labels)
print("Among all instances, %d are anomalies"%sum(labels))
assert event_count_matrix.shape[0] == len(labels)
return event_count_matrix, labels
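

# A hedged, self-contained sketch of how bgl_preprocess_data expects its inputs
# (the parameter values and toy arrays below are assumptions, not BGL data):
# `para` carries a save directory plus window/step sizes in hours, `raw_data`
# holds one (label, timestamp) row per log line, and `event_mapping_data` maps
# every log line to an event index.
def _demo_bgl_preprocess_data():
    para = {
        'save_path': './bgl_demo_windows/',   # hypothetical output directory
        'window_size': 1,                     # sliding window length, in hours
        'step_size': 1,                       # sliding step, in hours
    }
    # Six toy log lines as (label, timestamp in seconds); 1 marks a failure line.
    raw_data = np.array([[0, 0], [0, 1000], [1, 2000],
                         [0, 4000], [0, 5000], [1, 8000]])
    # Event index of each log line (three distinct events: 0, 1, 2).
    event_mapping_data = [[0], [1], [0], [2], [1], [0]]
    event_count_matrix, labels = bgl_preprocess_data(para, raw_data, event_mapping_data)
    print(event_count_matrix)   # one row of event counts per sliding window
    print(labels)               # 1 where a window contains any failure line


if __name__ == '__main__':
    # Run the illustrative sketches above as a quick smoke test.
    _demo_split_data()
    _demo_load_hdfs()
    _demo_slice_hdfs()
    _demo_bgl_preprocess_data()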