""" The implementation of Invariants Mining model for anomaly detection. Authors: LogPAI Team Reference: [1] Jian-Guang Lou, Qiang Fu, Shengqi Yang, Ye Xu, Jiang Li. Mining Invariants from Console Logs for System Problem Detection. USENIX Annual Technical Conference (ATC), 2010. """ import numpy as np from itertools import combinations from ..utils import metrics class InvariantsMiner(object): def __init__(self, percentage=0.98, epsilon=0.5, longest_invarant=None, scale_list=[1,2,3]): """ The Invariants Mining model for anomaly detection Attributes ---------- percentage: float, percentage of samples satisfying the condition that |X_j * V_i| < epsilon epsilon: float, the threshold for estimating the invariant space longest_invarant: int, the specified maximal length of invariant, default to None. Stop searching when the invariant length is larger than longest_invarant. scale_list: list, the list used to scale the theta of float into integer invariants_dict: dict, dictionary of invariants where key is the selected columns and value is the weights the of invariant """ self.percentage = percentage self.epsilon = epsilon self.longest_invarant = longest_invarant self.scale_list = scale_list self.invariants_dict = None def fit(self, X): """ Arguments --------- X: ndarray, the event count matrix of shape num_instances-by-num_events """ print('====== Model summary ======') invar_dim = self._estimate_invarant_space(X) self._invariants_search(X, invar_dim) def predict(self, X): """ Predict anomalies with mined invariants Arguments --------- X: the input event count matrix Returns ------- y_pred: ndarray, the predicted label vector of shape (num_instances,) """ y_sum = np.zeros(X.shape[0]) for cols, theta in self.invariants_dict.items(): y_sum += np.fabs(np.dot(X[:, cols], np.array(theta))) y_pred = (y_sum > 1e-6).astype(int) return y_pred def evaluate(self, X, y_true): print('====== Evaluation summary ======') y_pred = self.predict(X) precision, recall, f1 = metrics(y_pred, y_true) print('Precision: {:.3f}, recall: {:.3f}, F1-measure: {:.3f}\n'.format(precision, recall, f1)) return precision, recall, f1 def _estimate_invarant_space(self, X): """ Estimate the dimension of invariant space using SVD decomposition Arguments --------- X: ndarray, the event count matrix of shape num_instances-by-num_events percentage: float, percentage of samples satisfying the condition that |X_j * V_i| < epsilon epsilon: float, the threshold for estimating the invariant space Returns ------- r: the dimension of invariant space """ covariance_matrix = np.dot(X.T, X) U, sigma, V = np.linalg.svd(covariance_matrix) # SVD decomposition # Start from the right most column of matrix V, sigular values are in ascending order num_instances, num_events = X.shape r = 0 for i in range(num_events - 1, -1, -1): zero_count = sum(abs(np.dot(X, U[:, i])) < self.epsilon) if zero_count / float(num_instances) < self.percentage: break r += 1 print('Invariant space dimension: {}'.format(r)) return r def _invariants_search(self, X, r): """ Mine invariant relationships from X Arguments --------- X: ndarray, the event count matrix of shape num_instances-by-num_events r: the dimension of invariant space """ num_instances, num_events = X.shape invariants_dict = dict() # save the mined Invariants(value) and its corresponding columns(key) search_space = [] # only invariant candidates in this list are valid # invariant of only one column (all zero columns) init_cols = sorted([[item] for item in range(num_events)]) for col in init_cols: search_space.append(col) init_col_list = init_cols[:] for col in init_cols: if np.count_nonzero(X[:, col]) == 0: invariants_dict[tuple(col)] = [1] search_space.remove(col) init_col_list.remove(col) item_list = init_col_list length = 2 FLAG_break_loop = False # check invariant of more columns while len(item_list) != 0: if self.longest_invarant and len(item_list[0]) >= self.longest_invarant: break joined_item_list = self._join_set(item_list, length) # generate new invariant candidates for items in joined_item_list: if self._check_candi_valid(items, length, search_space): search_space.append(items) item_list = [] for item in joined_item_list: if tuple(item) in invariants_dict: continue if item not in search_space: continue if not self._check_candi_valid(tuple(item), length, search_space) and length > 2: search_space.remove(item) continue # an item must be superset of all other subitems in searchSpace, else skip validity, scaled_theta = self._check_invar_validity(X, item) if validity: self._prune(invariants_dict.keys(), set(item), search_space) invariants_dict[tuple(item)] = scaled_theta.tolist() search_space.remove(item) else: item_list.append(item) if len(invariants_dict) >= r: FLAG_break_loop = True break if FLAG_break_loop: break length += 1 print('Mined {} invariants: {}\n'.format(len(invariants_dict), invariants_dict)) self.invariants_dict = invariants_dict def _compute_eigenvector(self, X): """ calculate the smallest eigenvalue and corresponding eigenvector (theta in the paper) for a given sub_matrix Arguments --------- X: the event count matrix (each row is a log sequence vector, each column represents an event) Returns ------- min_vec: the eigenvector of corresponding minimum eigen value FLAG_contain_zero: whether the min_vec contains zero (very small value) """ FLAG_contain_zero = False count_zero = 0 dot_result = np.dot(X.T, X) U, S, V = np.linalg.svd(dot_result) min_vec = U[:, -1] count_zero = sum(np.fabs(min_vec) < 1e-6) if count_zero != 0: FLAG_contain_zero = True return min_vec, FLAG_contain_zero def _check_invar_validity(self, X, selected_columns): """ scale the eigenvector of float number into integer, and check whether the scaled number is valid Arguments --------- X: the event count matrix (each row is a log sequence vector, each column represents an event) selected_columns: select columns from all column list Returns ------- validity: whether the selected columns is valid scaled_theta: the scaled theta vector """ sub_matrix = X[:, selected_columns] inst_num = X.shape[0] validity = False min_theta, FLAG_contain_zero = self._compute_eigenvector(sub_matrix) abs_min_theta = [np.fabs(it) for it in min_theta] if FLAG_contain_zero: return validity, [] else: for i in self.scale_list: min_index = np.argmin(abs_min_theta) scale = float(i) / min_theta[min_index] scaled_theta = np.array([round(item * scale) for item in min_theta]) scaled_theta[min_index] = i scaled_theta = scaled_theta.T if 0 in np.fabs(scaled_theta): continue dot_submat_theta = np.dot(sub_matrix, scaled_theta) count_zero = 0 for j in dot_submat_theta: if np.fabs(j) < 1e-8: count_zero += 1 if count_zero >= self.percentage * inst_num: validity = True # print('A valid invariant is found: ',scaled_theta, selected_columns) break return validity, scaled_theta def _prune(self, valid_cols, new_item_set, search_space): """ prune invalid combination of columns Arguments --------- valid_cols: existing valid column list new_item_set: item set to be merged search_space: the search space that stores possible candidates """ if len(valid_cols) == 0: return for se in valid_cols: intersection = set(se) & new_item_set if len(intersection) == 0: continue union = set(se) | new_item_set for item in list(intersection): diff = sorted(list(union - set([item]))) if diff in search_space: search_space.remove(diff) def _join_set(self, item_list, length): """ Join a set with itself and returns the n-element (length) itemsets Arguments --------- item_list: current list of columns length: generate new items of length Returns ------- return_list: list of items of length-element """ set_len = len(item_list) return_list = [] for i in range(set_len): for j in range(i + 1, set_len): i_set = set(item_list[i]) j_set = set(item_list[j]) if len(i_set.union(j_set)) == length: joined = sorted(list(i_set.union(j_set))) if joined not in return_list: return_list.append(joined) return_list = sorted(return_list) return return_list def _check_candi_valid(self, item, length, search_space): """ check whether an item's subitems are in searchspace Arguments --------- item: item to be checked length: the length of item search_space: the search space that stores possible candidates Returns ------- True or False """ for subItem in combinations(item, length - 1): if sorted(list(subItem)) not in search_space: return False return True