Runtime error
Runtime error
File size: 11,065 Bytes
9c323ee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 |
The implementation of Invariants Mining model for anomaly detection.
LogPAI Team
[1] Jian-Guang Lou, Qiang Fu, Shengqi Yang, Ye Xu, Jiang Li. Mining Invariants
from Console Logs for System Problem Detection. USENIX Annual Technical
Conference (ATC), 2010.
import numpy as np
from itertools import combinations
from ..utils import metrics
class InvariantsMiner(object):
def __init__(self, percentage=0.98, epsilon=0.5, longest_invarant=None, scale_list=[1,2,3]):
""" The Invariants Mining model for anomaly detection
percentage: float, percentage of samples satisfying the condition that |X_j * V_i| < epsilon
epsilon: float, the threshold for estimating the invariant space
longest_invarant: int, the specified maximal length of invariant, default to None. Stop
searching when the invariant length is larger than longest_invarant.
scale_list: list, the list used to scale the theta of float into integer
invariants_dict: dict, dictionary of invariants where key is the selected columns
and value is the weights the of invariant
self.percentage = percentage
self.epsilon = epsilon
self.longest_invarant = longest_invarant
self.scale_list = scale_list
self.invariants_dict = None
def fit(self, X):
X: ndarray, the event count matrix of shape num_instances-by-num_events
print('====== Model summary ======')
invar_dim = self._estimate_invarant_space(X)
self._invariants_search(X, invar_dim)
def predict(self, X):
""" Predict anomalies with mined invariants
X: the input event count matrix
y_pred: ndarray, the predicted label vector of shape (num_instances,)
y_sum = np.zeros(X.shape[0])
for cols, theta in self.invariants_dict.items():
y_sum += np.fabs([:, cols], np.array(theta)))
y_pred = (y_sum > 1e-6).astype(int)
return y_pred
def evaluate(self, X, y_true):
print('====== Evaluation summary ======')
y_pred = self.predict(X)
precision, recall, f1 = metrics(y_pred, y_true)
print('Precision: {:.3f}, recall: {:.3f}, F1-measure: {:.3f}\n'.format(precision, recall, f1))
return precision, recall, f1
def _estimate_invarant_space(self, X):
""" Estimate the dimension of invariant space using SVD decomposition
X: ndarray, the event count matrix of shape num_instances-by-num_events
percentage: float, percentage of samples satisfying the condition that |X_j * V_i| < epsilon
epsilon: float, the threshold for estimating the invariant space
r: the dimension of invariant space
covariance_matrix =, X)
U, sigma, V = np.linalg.svd(covariance_matrix) # SVD decomposition
# Start from the right most column of matrix V, sigular values are in ascending order
num_instances, num_events = X.shape
r = 0
for i in range(num_events - 1, -1, -1):
zero_count = sum(abs(, U[:, i])) < self.epsilon)
if zero_count / float(num_instances) < self.percentage:
r += 1
print('Invariant space dimension: {}'.format(r))
return r
def _invariants_search(self, X, r):
""" Mine invariant relationships from X
X: ndarray, the event count matrix of shape num_instances-by-num_events
r: the dimension of invariant space
num_instances, num_events = X.shape
invariants_dict = dict() # save the mined Invariants(value) and its corresponding columns(key)
search_space = [] # only invariant candidates in this list are valid
# invariant of only one column (all zero columns)
init_cols = sorted([[item] for item in range(num_events)])
for col in init_cols:
init_col_list = init_cols[:]
for col in init_cols:
if np.count_nonzero(X[:, col]) == 0:
invariants_dict[tuple(col)] = [1]
item_list = init_col_list
length = 2
FLAG_break_loop = False
# check invariant of more columns
while len(item_list) != 0:
if self.longest_invarant and len(item_list[0]) >= self.longest_invarant:
joined_item_list = self._join_set(item_list, length) # generate new invariant candidates
for items in joined_item_list:
if self._check_candi_valid(items, length, search_space):
item_list = []
for item in joined_item_list:
if tuple(item) in invariants_dict:
if item not in search_space:
if not self._check_candi_valid(tuple(item), length, search_space) and length > 2:
continue # an item must be superset of all other subitems in searchSpace, else skip
validity, scaled_theta = self._check_invar_validity(X, item)
if validity:
self._prune(invariants_dict.keys(), set(item), search_space)
invariants_dict[tuple(item)] = scaled_theta.tolist()
if len(invariants_dict) >= r:
FLAG_break_loop = True
if FLAG_break_loop:
length += 1
print('Mined {} invariants: {}\n'.format(len(invariants_dict), invariants_dict))
self.invariants_dict = invariants_dict
def _compute_eigenvector(self, X):
""" calculate the smallest eigenvalue and corresponding eigenvector (theta in the paper)
for a given sub_matrix
X: the event count matrix (each row is a log sequence vector, each column represents an event)
min_vec: the eigenvector of corresponding minimum eigen value
FLAG_contain_zero: whether the min_vec contains zero (very small value)
FLAG_contain_zero = False
count_zero = 0
dot_result =, X)
U, S, V = np.linalg.svd(dot_result)
min_vec = U[:, -1]
count_zero = sum(np.fabs(min_vec) < 1e-6)
if count_zero != 0:
FLAG_contain_zero = True
return min_vec, FLAG_contain_zero
def _check_invar_validity(self, X, selected_columns):
""" scale the eigenvector of float number into integer, and check whether the scaled number is valid
X: the event count matrix (each row is a log sequence vector, each column represents an event)
selected_columns: select columns from all column list
validity: whether the selected columns is valid
scaled_theta: the scaled theta vector
sub_matrix = X[:, selected_columns]
inst_num = X.shape[0]
validity = False
min_theta, FLAG_contain_zero = self._compute_eigenvector(sub_matrix)
abs_min_theta = [np.fabs(it) for it in min_theta]
if FLAG_contain_zero:
return validity, []
for i in self.scale_list:
min_index = np.argmin(abs_min_theta)
scale = float(i) / min_theta[min_index]
scaled_theta = np.array([round(item * scale) for item in min_theta])
scaled_theta[min_index] = i
scaled_theta = scaled_theta.T
if 0 in np.fabs(scaled_theta):
dot_submat_theta =, scaled_theta)
count_zero = 0
for j in dot_submat_theta:
if np.fabs(j) < 1e-8:
count_zero += 1
if count_zero >= self.percentage * inst_num:
validity = True
# print('A valid invariant is found: ',scaled_theta, selected_columns)
return validity, scaled_theta
def _prune(self, valid_cols, new_item_set, search_space):
""" prune invalid combination of columns
valid_cols: existing valid column list
new_item_set: item set to be merged
search_space: the search space that stores possible candidates
if len(valid_cols) == 0:
for se in valid_cols:
intersection = set(se) & new_item_set
if len(intersection) == 0:
union = set(se) | new_item_set
for item in list(intersection):
diff = sorted(list(union - set([item])))
if diff in search_space:
def _join_set(self, item_list, length):
""" Join a set with itself and returns the n-element (length) itemsets
item_list: current list of columns
length: generate new items of length
return_list: list of items of length-element
set_len = len(item_list)
return_list = []
for i in range(set_len):
for j in range(i + 1, set_len):
i_set = set(item_list[i])
j_set = set(item_list[j])
if len(i_set.union(j_set)) == length:
joined = sorted(list(i_set.union(j_set)))
if joined not in return_list:
return_list = sorted(return_list)
return return_list
def _check_candi_valid(self, item, length, search_space):
""" check whether an item's subitems are in searchspace
item: item to be checked
length: the length of item
search_space: the search space that stores possible candidates
True or False
for subItem in combinations(item, length - 1):
if sorted(list(subItem)) not in search_space:
return False
return True