Spaces:
Runtime error
Runtime error
| from keras.layers import Input, Dense, Flatten | |
| from keras.models import Model | |
| from Database import Database | |
| import numpy as np, json | |
| import matplotlib.pyplot as plt | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import mean_squared_error | |
| from dotenv import dotenv_values | |
| import pandas as pd | |
| # from tensorflow.python.ops.confusion_matrix import confusion_matrix | |
| from sklearn.metrics import precision_recall_fscore_support | |
| class Autoencoder: | |
| def __get_autoencoder(self, input_dim) -> Model: | |
| input_shape = (input_dim,) | |
| input_layer = Input(shape=input_shape) | |
| # Encoder layers | |
| encoder = Flatten()(input_layer) | |
| encoder = Dense(128, activation='relu')(encoder) | |
| encoder = Dense(64, activation='relu')(encoder) | |
| # encoder = Dense(32, activation='relu')(encoder) | |
| # Decoder layers | |
| # decoder = Dense(64, activation='relu')(encoder) | |
| decoder = Dense(128, activation='relu')(encoder) #decoder | |
| decoder = Dense(input_dim, activation='sigmoid')(decoder) | |
| # Autoencoder model | |
| autoencoder = Model(inputs=input_layer, outputs=decoder) | |
| # autoencoder.compile(optimizer='adam', loss='binary_crossentropy') | |
| autoencoder.compile(optimizer='adam', loss='mse') | |
| return autoencoder | |
| def __print_summary(self, model: Model): | |
| print(model.summary()) | |
| return | |
| def __fit_autoencoder(self,epochs,batch_size,model: Model, train_var,valid_var=None): | |
| history = model.fit(train_var,train_var, | |
| # validation_data=(valid_var,valid_var), | |
| epochs=epochs,batch_size=batch_size) | |
| return history, model | |
| def __split_train_test_val(self, data): | |
| train_array, test_array = train_test_split(data,test_size=0.2,random_state=42) | |
| train_array, valid_array = train_test_split(train_array,test_size=0.1,random_state=42) | |
| return train_array, valid_array, test_array | |
| def __compute_metrics(conf_matrix): | |
| precision = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[0][1]) | |
| if precision==1: | |
| print(conf_matrix) | |
| recall = conf_matrix[1][1] / (conf_matrix[1][1] + conf_matrix[1][0]) | |
| f1 = (2 * precision * recall) / (precision + recall) | |
| # print("precision: " + str(precision) + ", recall: " + str(recall) + ", f1: " + str(f1)) | |
| return precision, recall, f1 | |
| def __find_optimal_modified(self,error_df: pd.DataFrame, steps=50): | |
| min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max() | |
| optimal_threshold = (min_error+max_error)/2 | |
| y_pred = [0 if e > optimal_threshold else 1 for e in error_df.Reconstruction_error.values] | |
| precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro') | |
| return optimal_threshold, precision, recall, f1 | |
| def __find_optimal(self,error_df: pd.DataFrame, steps=50): | |
| min_error, max_error = error_df["Reconstruction_error"].min(), error_df["Reconstruction_error"].max() | |
| optimal_threshold = min_error | |
| max_f1 = 0 | |
| max_pr = 0 | |
| max_re = 0 | |
| # step_value = (max_error-min_error)/(steps - 1) | |
| for threshold in np.arange(min_error, max_error, 0.005): | |
| # print("Threshold: " + str(threshold)) | |
| # y_pred = [1 if e > threshold else 0 for e in error_df.Reconstruction_error.values] | |
| y_pred = [0 if e > threshold else 1 for e in error_df.Reconstruction_error.values] | |
| # conf_matrix = confusion_matrix(error_df.True_class, y_pred) | |
| # precision, recall, f1 = self.__compute_metrics(conf_matrix) | |
| # precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='macro') | |
| # precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='micro') | |
| # precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='weighted') | |
| precision, recall, f1,_=precision_recall_fscore_support(error_df.True_class, y_pred, average='binary') | |
| if f1 > max_f1: | |
| max_f1 = f1 | |
| optimal_threshold = threshold | |
| max_pr = precision | |
| max_re = recall | |
| print(f"Result optimal_threshold={optimal_threshold}, max_precision={max_pr}, max_recall={max_re}, max_f1={max_f1}") | |
| # return optimal_threshold, max_pr.numpy(), max_re.numpy(), max_f1.numpy() | |
| return optimal_threshold, max_pr, max_re, max_f1 | |
| def __split_by_percent(data,percent): | |
| return train_test_split(data,test_size=0.3,random_state=42) | |
| def train_autoencoder(self): | |
| #GraphCodeBERT | |
| autoencoder = self.__get_autoencoder(768) | |
| self.__print_summary(autoencoder) | |
| #Create Dataset df | |
| df = pd.DataFrame(columns=['Embedding','True_class']) | |
| #DB | |
| db = Database(dotenv_values(".env")['COLLECTION_NAME']) | |
| # embeddings_list = [emb["embedding"] for emb in list(db.find_docs({"refactoring_type":"Extract Method"}))] | |
| pos_emb_list, neg_emb_list = [],[] | |
| for doc in list(db.find_docs({"refactoring_type":"Extract Method"})): | |
| pos_emb_list.append(doc['embedding_pos']) | |
| neg_emb_list.append(doc['embedding_neg']) | |
| pos_emb_list_train, pos_emb_list_test = self.__split_by_percent(pos_emb_list,0.3) | |
| _, neg_emb_list_test = self.__split_by_percent(neg_emb_list,0.3) | |
| x_train = np.array(pos_emb_list_train) | |
| x_test = np.array(pos_emb_list_test+neg_emb_list_test) | |
| y_test = np.array([1 for i in range(0,len(pos_emb_list_test))]+[0 for i in range(0,len(neg_emb_list_test))]) | |
| # print(np.array(pos_emb_list_train).shape) | |
| epoch = 25 | |
| history, trained_model = self.__fit_autoencoder(epoch,32,autoencoder,x_train) | |
| trained_model.save('./results/autoencoder_'+str(epoch)+'.hdf5') | |
| #Test | |
| test_predict = trained_model.predict(x_test) | |
| mse = np.mean(np.power(x_test - test_predict, 2), axis=1) | |
| error_df = pd.DataFrame({'Reconstruction_error': mse, | |
| 'True_class': y_test}) | |
| print("Max: ", error_df["Reconstruction_error"].max()) | |
| print("Min: ", error_df["Reconstruction_error"].min()) | |
| # optimal_threshold, precision, recall, f1 = self.__find_optimal(error_df,100) | |
| optimal_threshold, precision, recall, f1 = self.__find_optimal_modified(error_df,100) | |
| print(f"Result optimal_threshold={optimal_threshold}, max_precision={precision}, max_recall={recall}, max_f1={f1}") | |
| metrics = { | |
| "Threshold":optimal_threshold, | |
| "Precision": precision, | |
| "Recall":recall, | |
| "F1":f1 | |
| } | |
| with open('./results/metrics.json','w') as fp: | |
| json.dump(metrics,fp) | |
| plt.plot(history.history['loss']) | |
| plt.savefig("./results/training_graph.png") | |
| if __name__=="__main__": | |
| Autoencoder().train_autoencoder() |