In [8]:
import os
import json
import pickle
from time import time
from tqdm import tqdm
from typing import Union, List, Optional

import math
import torch
from torch import nn
from torch.nn import functional as F
from xgboost import XGBClassifier
import numpy as np
from nltk.tokenize import wordpunct_tokenize
from sklearn.metrics import roc_curve, accuracy_score, f1_score
from sklearn.utils import shuffle

import sys
ROOT = os.path.join("..", "..", "QuasarNix")
sys.path.append(ROOT)
# NOTE: src is from https://github.com/dtrizna/QuasarNix -- temp, will be as pip later
from src.data_utils import commands_to_loader, load_nl2bash
from src.preprocessors import CommandTokenizer, OneHotCustomVectorizer

TOKENIZER = wordpunct_tokenize
SEED = 33
VOCAB_SIZE = 4096
EMBEDDED_DIM = 64
DROPOUT = 0.5
MAX_LEN = 128

def lit_ckpt_to_torch(ckpt: str):
 """
 Convert a lightning checkpoint to a torch state dict
 """
 state_dict = torch.load(ckpt, map_location='cpu')['state_dict']
 
 for k, v in dict(state_dict).items():
 # lightning introduced model. prefix
 if k.startswith('model.'):
 state_dict[k[len('model.'):]] = v
 del state_dict[k]

 return state_dict

def load_torch(path: str, model: nn.Module):
 if path.endswith('.ckpt'):
 state_dict = lit_ckpt_to_torch(path)
 model.load_state_dict(state_dict)
 torch.save(model.state_dict(), path.replace('.ckpt', '.torch'))
 elif path.endswith('.torch'):
 state_dict = torch.load(path, map_location='cpu')
 model.load_state_dict(state_dict)
 else:
 raise ValueError('Unknown model format')
 return model

def load_xgb(path: str):
 if path.endswith('.pickle'):
 with open(path, 'rb') as f:
 model = pickle.load(f)
 model.save_model(path.replace('.pickle', '.xgboost'))
 elif path.endswith('.xgboost'):
 model = XGBClassifier(n_estimators=100, max_depth=10, random_state=SEED)
 model.load_model(path)
 else:
 raise ValueError('Unknown model format')
 return model

In [None]:
X_test_baseline_cmds = load_nl2bash()

# NOTE: malicious data is from https://huggingface.co/datasets/dtrizna/QuasarNix
# orig test
X_test_malicious_cmds_path = os.path.join(ROOT, "data", "X_test_malicious_cmd_orig.json")
with open(X_test_malicious_cmds_path, 'r') as f:
 X_test_malicious_cmds = json.load(f)

X_test_cmds_orig = X_test_malicious_cmds + X_test_baseline_cmds
y_test_orig = np.array([1]*len(X_test_malicious_cmds) + [0]*len(X_test_baseline_cmds))
X_test_cmds_orig, y_test_orig = shuffle(X_test_cmds_orig, y_test_orig, random_state=SEED)

# adversarial test
X_test_malicious_cmds_adv_path = os.path.join(ROOT, "data", "X_test_malicious_cmd_adv.json")
with open(X_test_malicious_cmds_adv_path, 'r') as f:
 X_test_malicious_cmds_adv = json.load(f)

X_test_cmds_adv = X_test_malicious_cmds_adv + X_test_baseline_cmds
y_test_adv = np.array([1]*len(X_test_malicious_cmds_adv) + [0]*len(X_test_baseline_cmds))
X_test_cmds_adv, y_test_adv = shuffle(X_test_cmds_adv, y_test_adv, random_state=SEED)

In [10]:
# TABULAR MODELS

oh_tokenizer_orig = OneHotCustomVectorizer(tokenizer=TOKENIZER, max_features=VOCAB_SIZE)
oh_tokenizer_orig.load_vocab("quasarnix_tokenizer_data_train_onehot_orig.json")

oh_tokenizer_adv = OneHotCustomVectorizer(tokenizer=TOKENIZER, max_features=VOCAB_SIZE)
oh_tokenizer_adv.load_vocab("quasarnix_tokenizer_data_train_onehot_adv.json")

oh_tokenizer_full = OneHotCustomVectorizer(tokenizer=TOKENIZER, max_features=VOCAB_SIZE)
oh_tokenizer_full.load_vocab("quasarnix_tokenizer_data_train_onehot_full.json")

xgb_model_path_orig = './quasarnix_model_data_train_xgb_orig.xgboost'
xgb_model_path_adv = './quasarnix_model_data_train_xgb_adv.xgboost'
xgb_model_path_full = './quasarnix_model_data_full_xgb_adv.xgboost'

mlp_model_path_orig = './quasarnix_model_data_train_mlp_orig.torch'
mlp_model_path_adv = './quasarnix_model_data_train_mlp_adv.torch'
mlp_model_path_full = './quasarnix_model_data_full_mlp_adv.torch'

# SEQUENTIAL EMBEDDING MODELS

vocab_path_orig = "./quasarnix_tokenizer_data_train_vocab_orig.json"
tokenizer_orig = CommandTokenizer(tokenizer_fn=TOKENIZER, vocab_size=VOCAB_SIZE, max_len=MAX_LEN)
tokenizer_orig.load_vocab(vocab_path_orig)

vocab_path_adv = "./quasarnix_tokenizer_data_train_vocab_adv.json"
tokenizer_adv = CommandTokenizer(tokenizer_fn=TOKENIZER, vocab_size=VOCAB_SIZE, max_len=MAX_LEN)
tokenizer_adv.load_vocab(vocab_path_adv)

vocab_path_full = "./quasarnix_tokenizer_data_full_vocab_adv.json"
tokenizer_full = CommandTokenizer(tokenizer_fn=TOKENIZER, vocab_size=VOCAB_SIZE, max_len=MAX_LEN)
tokenizer_full.load_vocab(vocab_path_full)

cnn_model_path_orig = './quasarnix_model_data_train_cnn_orig.torch'
cnn_model_path_adv = './quasarnix_model_data_train_cnn_adv.torch'
cnn_model_path_full = './quasarnix_model_data_full_cnn_adv.torch'

transformer_model_path_orig = './quasarnix_model_data_train_transformer_orig.torch'
transformer_model_path_adv = './quasarnix_model_data_train_transformer_adv.torch'
transformer_model_path_full = './quasarnix_model_data_full_transformer_adv.torch'

In [4]:
def get_preds(
 model: Union[XGBClassifier, nn.Module],
 X_cmds: List[str],
 y: np.ndarray,
 tokenizer: Union[OneHotCustomVectorizer, CommandTokenizer],
 threshold: Optional[float] = None
):
 now = time()
 if isinstance(model, XGBClassifier):
 print(f"[*] Working on {len(X_cmds)} samples", end='\r')
 X_encoded = tokenizer.transform(X_cmds)
 y_pred = model.predict_proba(X_encoded)[:, 1]
 elif isinstance(model, nn.Module):
 print(f"[*] Building DataLoader for {len(X_cmds)} samples", end='\r')
 loader = commands_to_loader(X_cmds, tokenizer, y=y, workers=4, batch_size=256)
 model = model.to('cuda')
 model.eval()
 with torch.no_grad():
 y_pred = []
 for (x, _) in tqdm(loader, desc="[*] Predicting", total=math.ceil(len(X_cmds)/256)):
 x = x.to('cuda')
 y_pred.append(model(x).cpu().numpy())
 y_pred = np.concatenate(y_pred).flatten()
 else:
 raise ValueError("Unknown model type")
 
 if threshold is not None:
 y_pred = y_pred > threshold
 acc = accuracy_score(y, y_pred > 0.5)
 f1 = f1_score(y, y_pred > 0.5)
 print(f"[!] Accuracy: {acc*100:.3f}%, F1: {f1*100:.3f}% | Took {time()-now:.2f}s")
 
 return y_pred, y

def score_both_sets(model, tokenizer):
 print("Original Test Set:")
 y_pred_orig, y_true_orig = get_preds(model, X_test_cmds_orig, y_test_orig, tokenizer, threshold=0.5)
 print("\nAdversarial Test Set:")
 y_pred_adv, y_true_adv = get_preds(model, X_test_cmds_adv, y_test_adv, tokenizer, threshold=0.5)
 return y_pred_orig, y_true_orig, y_pred_adv, y_true_adv

def plot_roc(y_true, y_pred, ax):
 fpr, tpr, _ = roc_curve(y_true, y_pred)
 ax.plot(fpr, tpr)
 return ax

## Gradient Boosted Decision Trees (GBDT) with XGBoost

In [5]:
xgb_orig = load_xgb(xgb_model_path_orig)
_ = score_both_sets(xgb_orig, oh_tokenizer_orig)

# see params
# print(xgb_full_adv.get_booster().get_dump()[0])

Original Test Set:
[!] Accuracy: 99.968%, F1: 99.968% | Took 25.27s

Adversarial Test Set:
[!] Accuracy: 83.418%, F1: 80.123% | Took 31.22s


In [6]:
xgb_adv = load_xgb(xgb_model_path_adv)
_ = score_both_sets(xgb_adv, oh_tokenizer_adv)

Original Test Set:
[*] Working on 470129 samples

[!] Accuracy: 99.954%, F1: 99.954% | Took 24.01s

Adversarial Test Set:
[!] Accuracy: 99.975%, F1: 99.975% | Took 32.38s


In [7]:
xgb_full_adv = load_xgb(xgb_model_path_full)
_ = score_both_sets(xgb_full_adv, oh_tokenizer_full)

Original Test Set:
[*] Working on 470129 samples

[!] Accuracy: 100.000%, F1: 100.000% | Took 27.10s

Adversarial Test Set:
[!] Accuracy: 100.000%, F1: 100.000% | Took 31.26s


## Tabular Fully Connected Neural Network (aka MLP) with PyTorch

In [8]:
class SimpleMLP(nn.Module):
 def __init__(self, input_dim, output_dim, hidden_dim=[32], dropout=None):
 if isinstance(hidden_dim, int):
 hidden_dim = [hidden_dim]
 
 super().__init__()
 layers = []
 prev_dim = input_dim
 
 # Dynamically create hidden layers based on hidden_dim
 for h_dim in hidden_dim:
 layers.append(nn.Linear(prev_dim, h_dim))
 layers.append(nn.ReLU())
 if dropout:
 layers.append(nn.Dropout(dropout))
 prev_dim = h_dim
 
 layers.append(nn.Linear(prev_dim, output_dim))
 self.model = nn.Sequential(*layers)
 
 def forward(self, x):
 return self.model(x)


mlp_orig = SimpleMLP(
 input_dim=VOCAB_SIZE,
 output_dim=1,
 hidden_dim=[64, 32],
 dropout=DROPOUT
) # 264 K params


mlp_orig = load_torch(mlp_model_path_orig, mlp_orig)
_ = score_both_sets(mlp_orig, oh_tokenizer_orig)

# see params
# mlp_orig.state_dict()


Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:10<00:00, 181.18it/s]


[!] Accuracy: 99.930%, F1: 99.930% | Took 34.68s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:10<00:00, 170.18it/s]


[!] Accuracy: 89.179%, F1: 87.868% | Took 42.86s


In [9]:
mlp_adv = SimpleMLP(
 input_dim=VOCAB_SIZE,
 output_dim=1,
 hidden_dim=[64, 32],
 dropout=DROPOUT
) # 264 K params

mlp_adv = load_torch(mlp_model_path_adv, mlp_adv)
_ = score_both_sets(mlp_adv, oh_tokenizer_adv)

Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:11<00:00, 158.51it/s]


[!] Accuracy: 99.943%, F1: 99.943% | Took 34.79s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:11<00:00, 157.97it/s]


[!] Accuracy: 99.968%, F1: 99.968% | Took 44.49s


In [10]:
mlp_full_adv = SimpleMLP(
 input_dim=VOCAB_SIZE,
 output_dim=1,
 hidden_dim=[64, 32],
 dropout=DROPOUT
) # 264 K params

mlp_full_adv = load_torch(mlp_model_path_full, mlp_full_adv)
_ = score_both_sets(mlp_full_adv, oh_tokenizer_full)

Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:11<00:00, 155.91it/s]


[!] Accuracy: 99.999%, F1: 99.999% | Took 35.86s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:10<00:00, 169.64it/s]


[!] Accuracy: 99.999%, F1: 99.999% | Took 43.86s


## 1D Convolutional Neural Network with PyTorch

In [11]:
class CNN1DGroupedModel(nn.Module):
 def __init__(self, vocab_size, embed_dim, num_channels, kernel_sizes, mlp_hidden_dims, output_dim, dropout=None):
 super().__init__()
 
 self.embedding = nn.Embedding(vocab_size, embed_dim)
 self.grouped_convs = nn.ModuleList([nn.Conv1d(embed_dim, num_channels, kernel) for kernel in kernel_sizes])
 
 mlp_input_dim = num_channels * len(kernel_sizes)
 self.mlp = SimpleMLP(input_dim=mlp_input_dim, output_dim=output_dim, hidden_dim=mlp_hidden_dims, dropout=dropout)

 @staticmethod
 def conv_and_pool(x, conv):
 conv_out = conv(x)
 pooled = F.max_pool1d(conv_out, conv_out.size(2)).squeeze(2)
 return pooled
 
 def forward(self, x):
 x = self.embedding(x).transpose(1, 2)
 conv_outputs = [self.conv_and_pool(x, conv) for conv in self.grouped_convs]

 x = torch.cat(conv_outputs, dim=1)
 return self.mlp(x)


cnn_orig = CNN1DGroupedModel(
 vocab_size=VOCAB_SIZE,
 embed_dim=EMBEDDED_DIM,
 num_channels=32,
 kernel_sizes=[2, 3, 4, 5],
 mlp_hidden_dims=[64, 32],
 output_dim=1,
 dropout=DROPOUT
) # 301 K params

cnn_orig = load_torch(cnn_model_path_orig, cnn_orig)
_ = score_both_sets(cnn_orig, tokenizer_orig)

Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:02<00:00, 644.75it/s] 


[!] Accuracy: 97.619%, F1: 97.561% | Took 12.85s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:02<00:00, 716.00it/s]


[!] Accuracy: 77.520%, F1: 71.001% | Took 15.09s


In [12]:
cnn_adv = CNN1DGroupedModel(
 vocab_size=VOCAB_SIZE,
 embed_dim=EMBEDDED_DIM,
 num_channels=32,
 kernel_sizes=[2, 3, 4, 5],
 mlp_hidden_dims=[64, 32],
 output_dim=1,
 dropout=DROPOUT
) # 301 K params

cnn_adv = load_torch(cnn_model_path_adv, cnn_adv)
_ = score_both_sets(cnn_adv, tokenizer_adv)

Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:03<00:00, 605.99it/s]


[!] Accuracy: 99.992%, F1: 99.992% | Took 13.84s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:02<00:00, 653.34it/s]


[!] Accuracy: 99.995%, F1: 99.995% | Took 14.61s


In [13]:
cnn_full_adv = CNN1DGroupedModel(
 vocab_size=VOCAB_SIZE,
 embed_dim=EMBEDDED_DIM,
 num_channels=32,
 kernel_sizes=[2, 3, 4, 5],
 mlp_hidden_dims=[64, 32],
 output_dim=1,
 dropout=DROPOUT
) # 301 K params

cnn_full_adv = load_torch(cnn_model_path_full, cnn_full_adv)
_ = score_both_sets(cnn_full_adv, tokenizer_full)

Original Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:02<00:00, 736.91it/s]


[!] Accuracy: 99.999%, F1: 99.999% | Took 12.76s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:02<00:00, 625.40it/s]


[!] Accuracy: 99.999%, F1: 99.999% | Took 14.06s


## Transformer Encoder for Classification

In [14]:
class PositionalEncoding(nn.Module):
 def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
 super().__init__()
 self.dropout = nn.Dropout(p=dropout)

 # Initialize pe with shape [1, max_len, d_model] for broadcasting
 pe = torch.zeros(1, max_len, d_model)
 position = torch.arange(max_len).unsqueeze(1)
 div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
 pe[0, :, 0::2] = torch.sin(position * div_term)
 pe[0, :, 1::2] = torch.cos(position * div_term)
 self.register_buffer('pe', pe)

 def forward(self, x):
 """
 Args:
 x: Tensor, shape [batch_size, seq_len, embedding_dim]
 """
 # Use broadcasting to add positional encoding
 x = x + self.pe[:, :x.size(1), :]
 return self.dropout(x)

class BaseTransformerEncoder(nn.Module):
 def __init__(self, vocab_size, d_model, nhead, num_layers, dim_feedforward, max_len, dropout=None):
 super(BaseTransformerEncoder, self).__init__()
 
 assert d_model % nhead == 0, "nheads must divide evenly into d_model"
 self.embedding = nn.Embedding(vocab_size, d_model)
 self.pos_encoder = PositionalEncoding(d_model, dropout, max_len=max_len)
 encoder_layer = nn.TransformerEncoderLayer(
 d_model,
 nhead,
 dim_feedforward,
 dropout,
 norm_first=True,
 batch_first=True
 )
 self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)

 def encode(self, src, src_mask=None, src_key_padding_mask=None):
 src = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
 src = self.pos_encoder(src)
 return self.transformer_encoder(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask)

class CLSTransformerEncoder(BaseTransformerEncoder):
 def __init__(self, mlp_hidden_dims, output_dim, *args, **kwargs):
 kwargs["max_len"] += 1 # to account for CLS token
 super(CLSTransformerEncoder, self).__init__(*args, **kwargs)
 self.cls_token = nn.Parameter(torch.randn(1, 1, self.embedding.embedding_dim))
 self.decoder = SimpleMLP(input_dim=self.embedding.embedding_dim, output_dim=output_dim, hidden_dim=mlp_hidden_dims, dropout=kwargs.get("dropout"))

 def forward(self, src, src_mask=None, src_key_padding_mask=None):
 # Embed the src token indices
 src = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
 
 # Repeat the cls_token for every item in the batch and concatenate it to src
 cls_tokens = self.cls_token.repeat(src.size(0), 1, 1)
 src = torch.cat([cls_tokens, src], dim=1)
 
 # Add positional encoding
 src = self.pos_encoder(src)
 
 # Pass through transformer encoder
 output = self.transformer_encoder(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask)
 
 # Extract the encoding corresponding to the cls_token
 output = output[:, 0, :] # [B, E]
 
 return self.decoder(output)


class MeanTransformerEncoder(BaseTransformerEncoder):
 def __init__(self, mlp_hidden_dims, output_dim, *args, **kwargs):
 super(MeanTransformerEncoder, self).__init__(*args, **kwargs)
 self.decoder = SimpleMLP(input_dim=self.embedding.embedding_dim, output_dim=output_dim, hidden_dim=mlp_hidden_dims, dropout=kwargs.get("dropout"))

 def forward(self, src, src_mask=None, src_key_padding_mask=None):
 output = self.encode(src, src_mask, src_key_padding_mask)
 output = output.mean(dim=1)
 return self.decoder(output)

In [15]:
transformer_orig = CLSTransformerEncoder(
 vocab_size=VOCAB_SIZE,
 d_model=EMBEDDED_DIM,
 nhead=4,
 num_layers=2,
 dim_feedforward=128,
 max_len=128,
 dropout=DROPOUT,
 mlp_hidden_dims=[64, 32],
 output_dim=1
) # 335 K params

transformer_orig = load_torch(transformer_model_path_orig, transformer_orig)
_ = score_both_sets(transformer_orig, tokenizer_orig)

Original Test Set:
[*] Building DataLoader for 470129 samples



[*] Predicting: 100%|██████████| 1837/1837 [00:04<00:00, 387.08it/s]


[!] Accuracy: 99.696%, F1: 99.696% | Took 15.01s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:07<00:00, 234.15it/s]


[!] Accuracy: 86.388%, F1: 84.245% | Took 20.70s


In [16]:
transformer_adv = CLSTransformerEncoder(
 vocab_size=VOCAB_SIZE,
 d_model=EMBEDDED_DIM,
 nhead=4,
 num_layers=2,
 dim_feedforward=128,
 max_len=128,
 dropout=DROPOUT,
 mlp_hidden_dims=[64, 32],
 output_dim=1
) # 335 K params

transformer_adv = load_torch(transformer_model_path_adv, transformer_adv)
_ = score_both_sets(transformer_adv, tokenizer_adv)

Original Test Set:
[*] Building DataLoader for 470129 samples



[*] Predicting: 100%|██████████| 1837/1837 [00:05<00:00, 307.70it/s]


[!] Accuracy: 99.749%, F1: 99.750% | Took 17.17s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:05<00:00, 330.66it/s]


[!] Accuracy: 99.749%, F1: 99.750% | Took 18.48s


In [17]:
transformer_full_adv = CLSTransformerEncoder(
 vocab_size=VOCAB_SIZE,
 d_model=EMBEDDED_DIM,
 nhead=4,
 num_layers=2,
 dim_feedforward=128,
 max_len=128,
 dropout=DROPOUT,
 mlp_hidden_dims=[64, 32],
 output_dim=1
) # 335 K params

transformer_full_adv = load_torch(transformer_model_path_full, transformer_full_adv)
_ = score_both_sets(transformer_full_adv, tokenizer_full)

Original Test Set:
[*] Building DataLoader for 470129 samples



[*] Predicting: 100%|██████████| 1837/1837 [00:05<00:00, 334.43it/s]


[!] Accuracy: 99.997%, F1: 99.997% | Took 15.61s

Adversarial Test Set:
[*] Building DataLoader for 470129 samples

[*] Predicting: 100%|██████████| 1837/1837 [00:05<00:00, 334.75it/s]


[!] Accuracy: 99.997%, F1: 99.997% | Took 18.08s
