""" The interface to load log datasets. The datasets currently supported include HDFS and BGL. Authors: LogPAI Team """ import pandas as pd import os import numpy as np import re from sklearn.utils import shuffle from collections import OrderedDict def _split_data(x_data, y_data=None, train_ratio=0, split_type='uniform'): if split_type == 'uniform' and y_data is not None: pos_idx = y_data > 0 x_pos = x_data[pos_idx] y_pos = y_data[pos_idx] x_neg = x_data[~pos_idx] y_neg = y_data[~pos_idx] train_pos = int(train_ratio * x_pos.shape[0]) train_neg = int(train_ratio * x_neg.shape[0]) x_train = np.hstack([x_pos[0:train_pos], x_neg[0:train_neg]]) y_train = np.hstack([y_pos[0:train_pos], y_neg[0:train_neg]]) x_test = np.hstack([x_pos[train_pos:], x_neg[train_neg:]]) y_test = np.hstack([y_pos[train_pos:], y_neg[train_neg:]]) elif split_type == 'sequential': num_train = int(train_ratio * x_data.shape[0]) x_train = x_data[0:num_train] x_test = x_data[num_train:] if y_data is None: y_train = None y_test = None else: y_train = y_data[0:num_train] y_test = y_data[num_train:] # Random shuffle indexes = shuffle(np.arange(x_train.shape[0])) x_train = x_train[indexes] if y_train is not None: y_train = y_train[indexes] return (x_train, y_train), (x_test, y_test) def load_HDFS(log_file, label_file=None, window='session', train_ratio=0.5, split_type='sequential', save_csv=False, window_size=0): """ Load HDFS structured log into train and test data Arguments --------- log_file: str, the file path of structured log. label_file: str, the file path of anomaly labels, None for unlabeled data window: str, the window options including `session` (default). train_ratio: float, the ratio of training data for train/test split. split_type: `uniform` or `sequential`, which determines how to split dataset. `uniform` means to split positive samples and negative samples equally when setting label_file. `sequential` means to split the data sequentially without label_file. That is, the first part is for training, while the second part is for testing. Returns ------- (x_train, y_train): the training data (x_test, y_test): the testing data """ print('====== Input data summary ======') if log_file.endswith('.npz'): # Split training and validation set in a class-uniform way data = np.load(log_file) x_data = data['x_data'] y_data = data['y_data'] (x_train, y_train), (x_test, y_test) = _split_data(x_data, y_data, train_ratio, split_type) elif log_file.endswith('.csv'): assert window == 'session', "Only window=session is supported for HDFS dataset." print("Loading", log_file) struct_log = pd.read_csv(log_file, engine='c', na_filter=False, memory_map=True) data_dict = OrderedDict() for idx, row in struct_log.iterrows(): blkId_list = re.findall(r'(blk_-?\d+)', row['Content']) blkId_set = set(blkId_list) for blk_Id in blkId_set: if not blk_Id in data_dict: data_dict[blk_Id] = [] data_dict[blk_Id].append(row['EventId']) data_df = pd.DataFrame(list(data_dict.items()), columns=['BlockId', 'EventSequence']) if label_file: # Split training and validation set in a class-uniform way label_data = pd.read_csv(label_file, engine='c', na_filter=False, memory_map=True) label_data = label_data.set_index('BlockId') label_dict = label_data['Label'].to_dict() data_df['Label'] = data_df['BlockId'].apply(lambda x: 1 if label_dict[x] == 'Anomaly' else 0) # Split train and test data (x_train, y_train), (x_test, y_test) = _split_data(data_df['EventSequence'].values, data_df['Label'].values, train_ratio, split_type) print(y_train.sum(), y_test.sum()) if save_csv: data_df.to_csv('data_instances.csv', index=False) if window_size > 0: x_train, window_y_train, y_train = slice_hdfs(x_train, y_train, window_size) x_test, window_y_test, y_test = slice_hdfs(x_test, y_test, window_size) log = "{} {} windows ({}/{} anomaly), {}/{} normal" print(log.format("Train:", x_train.shape[0], y_train.sum(), y_train.shape[0], (1-y_train).sum(), y_train.shape[0])) print(log.format("Test:", x_test.shape[0], y_test.sum(), y_test.shape[0], (1-y_test).sum(), y_test.shape[0])) return (x_train, window_y_train, y_train), (x_test, window_y_test, y_test) if label_file is None: if split_type == 'uniform': split_type = 'sequential' print('Warning: Only split_type=sequential is supported \ if label_file=None.'.format(split_type)) # Split training and validation set sequentially x_data = data_df['EventSequence'].values (x_train, _), (x_test, _) = _split_data(x_data, train_ratio=train_ratio, split_type=split_type) print('Total: {} instances, train: {} instances, test: {} instances'.format( x_data.shape[0], x_train.shape[0], x_test.shape[0])) return (x_train, None), (x_test, None), data_df else: raise NotImplementedError('load_HDFS() only support csv and npz files!') num_train = x_train.shape[0] num_test = x_test.shape[0] num_total = num_train + num_test num_train_pos = sum(y_train) num_test_pos = sum(y_test) num_pos = num_train_pos + num_test_pos print('Total: {} instances, {} anomaly, {} normal' \ .format(num_total, num_pos, num_total - num_pos)) print('Train: {} instances, {} anomaly, {} normal' \ .format(num_train, num_train_pos, num_train - num_train_pos)) print('Test: {} instances, {} anomaly, {} normal\n' \ .format(num_test, num_test_pos, num_test - num_test_pos)) return (x_train, y_train), (x_test, y_test) def slice_hdfs(x, y, window_size): results_data = [] print("Slicing {} sessions, with window {}".format(x.shape[0], window_size)) for idx, sequence in enumerate(x): seqlen = len(sequence) i = 0 while (i + window_size) < seqlen: slice = sequence[i: i + window_size] results_data.append([idx, slice, sequence[i + window_size], y[idx]]) i += 1 else: slice = sequence[i: i + window_size] slice += ["#Pad"] * (window_size - len(slice)) results_data.append([idx, slice, "#Pad", y[idx]]) results_df = pd.DataFrame(results_data, columns=["SessionId", "EventSequence", "Label", "SessionLabel"]) print("Slicing done, {} windows generated".format(results_df.shape[0])) return results_df[["SessionId", "EventSequence"]], results_df["Label"], results_df["SessionLabel"] def load_BGL(log_file, label_file=None, window='sliding', time_interval=60, stepping_size=60, train_ratio=0.8): """ TODO """ def bgl_preprocess_data(para, raw_data, event_mapping_data): """ split logs into sliding windows, built an event count matrix and get the corresponding label Args: -------- para: the parameters dictionary raw_data: list of (label, time) event_mapping_data: a list of event index, where each row index indicates a corresponding log Returns: -------- event_count_matrix: event count matrix, where each row is an instance (log sequence vector) labels: a list of labels, 1 represents anomaly """ # create the directory for saving the sliding windows (start_index, end_index), which can be directly loaded in future running if not os.path.exists(para['save_path']): os.mkdir(para['save_path']) log_size = raw_data.shape[0] sliding_file_path = para['save_path']+'sliding_'+str(para['window_size'])+'h_'+str(para['step_size'])+'h.csv' #=============divide into sliding windows=========# start_end_index_list = [] # list of tuples, tuple contains two number, which represent the start and end of sliding time window label_data, time_data = raw_data[:,0], raw_data[:, 1] if not os.path.exists(sliding_file_path): # split into sliding window start_time = time_data[0] start_index = 0 end_index = 0 # get the first start, end index, end time for cur_time in time_data: if cur_time < start_time + para['window_size']*3600: end_index += 1 end_time = cur_time else: start_end_pair=tuple((start_index,end_index)) start_end_index_list.append(start_end_pair) break # move the start and end index until next sliding window while end_index < log_size: start_time = start_time + para['step_size']*3600 end_time = end_time + para['step_size']*3600 for i in range(start_index,end_index): if time_data[i] < start_time: i+=1 else: break for j in range(end_index, log_size): if time_data[j] < end_time: j+=1 else: break start_index = i end_index = j start_end_pair = tuple((start_index, end_index)) start_end_index_list.append(start_end_pair) inst_number = len(start_end_index_list) print('there are %d instances (sliding windows) in this dataset\n'%inst_number) np.savetxt(sliding_file_path,start_end_index_list,delimiter=',',fmt='%d') else: print('Loading start_end_index_list from file') start_end_index_list = pd.read_csv(sliding_file_path, header=None).values inst_number = len(start_end_index_list) print('there are %d instances (sliding windows) in this dataset' % inst_number) # get all the log indexes in each time window by ranging from start_index to end_index expanded_indexes_list=[] for t in range(inst_number): index_list = [] expanded_indexes_list.append(index_list) for i in range(inst_number): start_index = start_end_index_list[i][0] end_index = start_end_index_list[i][1] for l in range(start_index, end_index): expanded_indexes_list[i].append(l) event_mapping_data = [row[0] for row in event_mapping_data] event_num = len(list(set(event_mapping_data))) print('There are %d log events'%event_num) #=============get labels and event count of each sliding window =========# labels = [] event_count_matrix = np.zeros((inst_number,event_num)) for j in range(inst_number): label = 0 #0 represent success, 1 represent failure for k in expanded_indexes_list[j]: event_index = event_mapping_data[k] event_count_matrix[j, event_index] += 1 if label_data[k]: label = 1 continue labels.append(label) assert inst_number == len(labels) print("Among all instances, %d are anomalies"%sum(labels)) assert event_count_matrix.shape[0] == len(labels) return event_count_matrix, labels