""" | |
The implementation of Invariants Mining model for anomaly detection. | |
Authors: | |
LogPAI Team | |
Reference: | |
[1] Jian-Guang Lou, Qiang Fu, Shengqi Yang, Ye Xu, Jiang Li. Mining Invariants | |
from Console Logs for System Problem Detection. USENIX Annual Technical | |
Conference (ATC), 2010. | |
""" | |
import numpy as np
from itertools import combinations

from ..utils import metrics


class InvariantsMiner(object):

    def __init__(self, percentage=0.98, epsilon=0.5, longest_invarant=None, scale_list=[1, 2, 3]):
        """ The Invariants Mining model for anomaly detection

        Attributes
        ----------
            percentage: float, percentage of samples satisfying the condition that |X_j * V_i| < epsilon
            epsilon: float, the threshold for estimating the invariant space
            longest_invarant: int, the specified maximal length of an invariant, defaults to None.
                Searching stops once candidate invariants would be longer than longest_invarant.
            scale_list: list, the scales used to convert the float theta vector into integers
            invariants_dict: dict, dictionary of invariants where the key is the tuple of selected
                columns and the value is the weight vector of the invariant
        """
        self.percentage = percentage
        self.epsilon = epsilon
        self.longest_invarant = longest_invarant
        self.scale_list = scale_list
        self.invariants_dict = None
    def fit(self, X):
        """ Fit the model by mining invariants from (unlabeled) training data

        Arguments
        ---------
            X: ndarray, the event count matrix of shape num_instances-by-num_events
        """
        print('====== Model summary ======')
        invar_dim = self._estimate_invarant_space(X)
        self._invariants_search(X, invar_dim)
    def predict(self, X):
        """ Predict anomalies with mined invariants

        Arguments
        ---------
            X: the input event count matrix

        Returns
        -------
            y_pred: ndarray, the predicted label vector of shape (num_instances,)
        """
        y_sum = np.zeros(X.shape[0])
        for cols, theta in self.invariants_dict.items():
            y_sum += np.fabs(np.dot(X[:, cols], np.array(theta)))
        y_pred = (y_sum > 1e-6).astype(int)
        return y_pred
    def evaluate(self, X, y_true):
        """ Evaluate the predictions on X against the ground-truth labels y_true """
        print('====== Evaluation summary ======')
        y_pred = self.predict(X)
        precision, recall, f1 = metrics(y_pred, y_true)
        print('Precision: {:.3f}, recall: {:.3f}, F1-measure: {:.3f}\n'.format(precision, recall, f1))
        return precision, recall, f1
    def _estimate_invarant_space(self, X):
        """ Estimate the dimension of the invariant space using SVD

        Arguments
        ---------
            X: ndarray, the event count matrix of shape num_instances-by-num_events

        Returns
        -------
            r: the dimension of the invariant space

        Note: self.percentage and self.epsilon control how strictly a projection must be close
        to zero (|X_j * V_i| < epsilon for at least `percentage` of the samples) for a direction
        to be counted as part of the invariant space.
        """
        covariance_matrix = np.dot(X.T, X)
        U, sigma, V = np.linalg.svd(covariance_matrix)  # SVD of the symmetric covariance matrix
        # Singular values are returned in descending order, so the rightmost columns of U
        # correspond to the smallest singular values; scan them from right to left.
        num_instances, num_events = X.shape
        r = 0
        for i in range(num_events - 1, -1, -1):
            zero_count = sum(abs(np.dot(X, U[:, i])) < self.epsilon)
            if zero_count / float(num_instances) < self.percentage:
                break
            r += 1
        print('Invariant space dimension: {}'.format(r))
        return r
    def _invariants_search(self, X, r):
        """ Mine invariant relationships from X

        Arguments
        ---------
            X: ndarray, the event count matrix of shape num_instances-by-num_events
            r: the dimension of the invariant space
        """
        num_instances, num_events = X.shape
        invariants_dict = dict()  # mined invariants: key is the column tuple, value is the weight vector
        search_space = []  # only invariant candidates in this list are valid
        # Single-column invariants (all-zero columns)
        init_cols = sorted([[item] for item in range(num_events)])
        for col in init_cols:
            search_space.append(col)
        init_col_list = init_cols[:]
        for col in init_cols:
            if np.count_nonzero(X[:, col]) == 0:
                invariants_dict[tuple(col)] = [1]
                search_space.remove(col)
                init_col_list.remove(col)
        item_list = init_col_list
        length = 2
        FLAG_break_loop = False
        # Check invariants over an increasing number of columns
        while len(item_list) != 0:
            if self.longest_invarant and len(item_list[0]) >= self.longest_invarant:
                break
            joined_item_list = self._join_set(item_list, length)  # generate new invariant candidates
            for items in joined_item_list:
                if self._check_candi_valid(items, length, search_space):
                    search_space.append(items)
            item_list = []
            for item in joined_item_list:
                if tuple(item) in invariants_dict:
                    continue
                if item not in search_space:
                    continue
                if not self._check_candi_valid(tuple(item), length, search_space) and length > 2:
                    search_space.remove(item)
                    continue  # an item must be a superset of all its sub-items in search_space, else skip
                validity, scaled_theta = self._check_invar_validity(X, item)
                if validity:
                    self._prune(invariants_dict.keys(), set(item), search_space)
                    invariants_dict[tuple(item)] = scaled_theta.tolist()
                    search_space.remove(item)
                else:
                    item_list.append(item)
                if len(invariants_dict) >= r:
                    FLAG_break_loop = True
                    break
            if FLAG_break_loop:
                break
            length += 1
        print('Mined {} invariants: {}\n'.format(len(invariants_dict), invariants_dict))
        self.invariants_dict = invariants_dict
    def _compute_eigenvector(self, X):
        """ Calculate the smallest eigenvalue and the corresponding eigenvector (theta in the paper)
        for a given sub-matrix

        Arguments
        ---------
            X: the event count matrix (each row is a log sequence vector, each column represents an event)

        Returns
        -------
            min_vec: the eigenvector corresponding to the minimum eigenvalue
            FLAG_contain_zero: whether min_vec contains a zero (i.e., very small) entry
        """
        FLAG_contain_zero = False
        dot_result = np.dot(X.T, X)
        # X.T * X is symmetric, so the columns of U are its eigenvectors and the last column
        # corresponds to the smallest eigenvalue.
        U, S, V = np.linalg.svd(dot_result)
        min_vec = U[:, -1]
        count_zero = sum(np.fabs(min_vec) < 1e-6)
        if count_zero != 0:
            FLAG_contain_zero = True
        return min_vec, FLAG_contain_zero
    def _check_invar_validity(self, X, selected_columns):
        """ Scale the float eigenvector entries to integers, and check whether the scaled vector
        is a valid invariant

        Arguments
        ---------
            X: the event count matrix (each row is a log sequence vector, each column represents an event)
            selected_columns: the columns selected from the full column list

        Returns
        -------
            validity: whether the selected columns form a valid invariant
            scaled_theta: the scaled theta vector
        """
        sub_matrix = X[:, selected_columns]
        inst_num = X.shape[0]
        validity = False
        min_theta, FLAG_contain_zero = self._compute_eigenvector(sub_matrix)
        abs_min_theta = [np.fabs(it) for it in min_theta]
        if FLAG_contain_zero:
            return validity, []
        else:
            for i in self.scale_list:
                min_index = np.argmin(abs_min_theta)
                scale = float(i) / min_theta[min_index]
                scaled_theta = np.array([round(item * scale) for item in min_theta])
                scaled_theta[min_index] = i
                scaled_theta = scaled_theta.T
                if 0 in np.fabs(scaled_theta):
                    continue
                dot_submat_theta = np.dot(sub_matrix, scaled_theta)
                count_zero = 0
                for j in dot_submat_theta:
                    if np.fabs(j) < 1e-8:
                        count_zero += 1
                if count_zero >= self.percentage * inst_num:
                    validity = True
                    # print('A valid invariant is found: ', scaled_theta, selected_columns)
                    break
            return validity, scaled_theta
    def _prune(self, valid_cols, new_item_set, search_space):
        """ Prune from the search space the column combinations made redundant by a newly mined invariant

        Arguments
        ---------
            valid_cols: the column tuples of the existing valid invariants
            new_item_set: the item set to be merged
            search_space: the search space that stores possible candidates
        """
        if len(valid_cols) == 0:
            return
        for se in valid_cols:
            intersection = set(se) & new_item_set
            if len(intersection) == 0:
                continue
            union = set(se) | new_item_set
            for item in list(intersection):
                diff = sorted(list(union - set([item])))
                if diff in search_space:
                    search_space.remove(diff)
    def _join_set(self, item_list, length):
        """ Join the item list with itself and return the candidate itemsets of the given length

        Arguments
        ---------
            item_list: the current list of column combinations
            length: the length of the new itemsets to generate

        Returns
        -------
            return_list: the list of itemsets of the given length
        """
        set_len = len(item_list)
        return_list = []
        for i in range(set_len):
            for j in range(i + 1, set_len):
                i_set = set(item_list[i])
                j_set = set(item_list[j])
                if len(i_set.union(j_set)) == length:
                    joined = sorted(list(i_set.union(j_set)))
                    if joined not in return_list:
                        return_list.append(joined)
        return_list = sorted(return_list)
        return return_list
    def _check_candi_valid(self, item, length, search_space):
        """ Check whether all of an item's (length - 1)-element sub-items are in the search space

        Arguments
        ---------
            item: the item to be checked
            length: the length of the item
            search_space: the search space that stores possible candidates

        Returns
        -------
            True or False
        """
        for sub_item in combinations(item, length - 1):
            if sorted(list(sub_item)) not in search_space:
                return False
        return True
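

# A minimal usage sketch (hypothetical data and assumed package path; kept as a comment because
# the relative import of `metrics` above means this module is meant to be imported as part of
# the loglizer package rather than run directly):
#
#     import numpy as np
#     from loglizer.models import InvariantsMiner  # assumed import path
#
#     # Toy event count matrix: 5 sequences x 3 event types, where events 0 and 1 always
#     # occur the same number of times, yielding the invariant X[:, 0] - X[:, 1] == 0.
#     X_train = np.array([[1, 1, 0],
#                         [2, 2, 1],
#                         [3, 3, 0],
#                         [1, 1, 2],
#                         [2, 2, 2]])
#     X_test = np.array([[1, 1, 0],
#                        [2, 5, 1]])  # the second sequence violates the invariant
#
#     model = InvariantsMiner(percentage=0.98, epsilon=0.5)
#     model.fit(X_train)
#     y_pred = model.predict(X_test)  # expected: array([0, 1]); the violating sequence is flagged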