"""Streamlit app that scores the similarity of two Sinhala sentences.

Pipeline: each sentence is embedded with the ``AshenR/AshenBERTo`` transformer
(mean of the last hidden state), the two embeddings are fed to a saved Keras
Siamese network, and the resulting score is shown to the user.  Predictions
and optional user feedback are written to a MongoDB collection.
"""
import json
import os
from datetime import datetime

# Third-party imports are kept in the order the original script used
# (streamlit/torch/transformers before tensorflow).
import streamlit as st
import torch
import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
from transformers import AutoModel, AutoModelForMaskedLM, AutoTokenizer  # for embeddings
import tensorflow as tf
from tensorflow.keras import layers, Model, callbacks
from tensorflow.keras.layers import (
    Dense, Dropout, BatchNormalization, Activation, Input,
    Bidirectional, LSTM, GlobalAveragePooling1D
)
from tensorflow.keras.regularizers import l2
import pymongo


@st.cache_resource
def _load_bert():
    """Load the tokenizer and BERT model once per server process.

    Streamlit re-executes the whole script on every interaction; caching keeps
    the two ``from_pretrained`` calls from running again on each rerun.
    """
    bert_tokenizer = AutoTokenizer.from_pretrained("AshenR/AshenBERTo")
    bert_model = AutoModel.from_pretrained("AshenR/AshenBERTo", output_hidden_states=True)
    return bert_tokenizer, bert_model


tokenizer, modelBert = _load_bert()

# Shape each sentence embedding is reshaped to before it is fed to the Siamese
# network.  Written as a real 1-tuple: the previous ``(768)`` was just the int 768.
ishape = (768,)


def get_embeddings2(text, token_length, device='cuda'):
    """Return the mean-pooled last-hidden-state embedding of ``text``.

    Args:
        text: Input passed straight to the tokenizer.
        token_length: Length every input is padded/truncated to.
        device: Preferred torch device; used only when CUDA is available,
            otherwise the computation runs on the CPU.

    Returns:
        A NumPy array holding the mean of the last hidden state over the token
        dimension (dim 1).  The mean covers all ``token_length`` positions,
        padding included - the attention mask is not applied to the average.
        NOTE(review): changing the pooling would change the embeddings the
        saved Siamese model receives, so it is left as is.
    """
    # Dynamically check if CUDA is available
    device = torch.device(device if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Tokenize the input text
    tokens = tokenizer(text, max_length=token_length, padding='max_length',
                       truncation=True, return_tensors='pt')

    # Inputs and model must live on the same device
    input_ids = tokens.input_ids.to(device)
    attention_mask = tokens.attention_mask.to(device)
    modelBert.to(device)

    with torch.no_grad():  # Disable gradient calculation for inference
        output = modelBert(input_ids, attention_mask=attention_mask).hidden_states[-1]

    # Compute the mean of the output across the token dimension
    mean_output = torch.mean(output, dim=1)

    # Move the result back to CPU and convert to numpy
    return mean_output.cpu().detach().numpy()


class SiameseNetwork(Model):
    """Two-input Keras model that shares one feature extractor across inputs.

    Args:
        inputShape: Shape of a single input, passed to ``layers.Input``.
        featExtractorConfig: Iterable of unit counts, one Dense block each.
        lstm_units: Units of the bidirectional LSTM (used when ``add_lstm``).
        dropout_rate: Dropout rate for the Dense blocks and the LSTM.
        add_lstm: Whether to append the reshape + BiLSTM + pooling stage.
        distance_metric: ``"concat"``, ``"euclidean"`` or ``"cosine"``.
        regularization: L2 factor applied to the Dense kernels.

    Raises:
        ValueError: If ``distance_metric`` is not one of the supported values.
    """

    def __init__(self, inputShape, featExtractorConfig, lstm_units=128, dropout_rate=0.5,
                 add_lstm=True, distance_metric="concat", regularization=0.01):
        super().__init__()
        self.inputShape = inputShape
        self.featExtractorConfig = featExtractorConfig
        self.add_lstm = add_lstm
        self.lstm_units = lstm_units
        self.dropout_rate = dropout_rate
        self.distance_metric = distance_metric
        self.regularization = regularization

        # Define inputs
        inp_a = layers.Input(shape=inputShape, name="Input_A")
        inp_b = layers.Input(shape=inputShape, name="Input_B")

        # Build shared feature extractor
        self.feature_extractor = self.build_feature_extractor()

        # Extract features
        feats_a = self.feature_extractor(inp_a)
        feats_b = self.feature_extractor(inp_b)

        # Compute similarity
        if distance_metric == "concat":
            distance = layers.Concatenate()([feats_a, feats_b])
        elif distance_metric == "euclidean":
            distance = layers.Lambda(
                lambda tensors: tf.norm(tensors[0] - tensors[1], axis=1, keepdims=True)
            )([feats_a, feats_b])
        elif distance_metric == "cosine":
            # cosine_similarity reduces the feature axis and returns one value
            # per sample; add a trailing axis so the Dense output layer, which
            # needs at least 2-D input, can consume it.
            distance = layers.Lambda(
                lambda tensors: tf.expand_dims(
                    tf.keras.losses.cosine_similarity(tensors[0], tensors[1]), axis=-1
                )
            )([feats_a, feats_b])
        else:
            raise ValueError(f"Unsupported distance metric: {distance_metric}")

        # Output layer
        outputs = layers.Dense(1, activation="sigmoid", name="Output")(distance)

        # Define the model
        self.model = Model(inputs=[inp_a, inp_b], outputs=outputs)

    def build_feature_extractor(self):
        """Build the shared sub-model applied to each of the two inputs."""
        inputs = Input(shape=self.inputShape)
        x = inputs

        # Add configurable dense layers with regularization
        for n_units in self.featExtractorConfig:
            x = Dense(n_units, activation=None, kernel_regularizer=l2(self.regularization))(x)
            x = BatchNormalization()(x)
            x = Activation('relu')(x)
            x = Dropout(self.dropout_rate)(x)

        if self.add_lstm:
            # Add sequence dimension for LSTM
            x = layers.Reshape((-1, self.featExtractorConfig[-1]))(x)
            # Add Bidirectional LSTM
            x = Bidirectional(LSTM(self.lstm_units, return_sequences=True,
                                   dropout=self.dropout_rate))(x)
            x = GlobalAveragePooling1D()(x)  # Efficient pooling for dimensionality reduction

        return Model(inputs, x, name="FeatureExtractor")

    def call(self, inputs):
        """Forward ``inputs`` to the wrapped functional model."""
        return self.model(inputs)

    def save_model(self, filepath):
        """Save the inner model and its constructor arguments under ``filepath``.

        Writes ``siamese_model.keras`` and ``model_config.json`` into the
        directory ``filepath``, creating the directory if needed.
        """
        # Ensure the directory exists
        os.makedirs(filepath, exist_ok=True)

        # Save the entire model (architecture, weights, and compilation details)
        self.model.save(os.path.join(filepath, 'siamese_model.keras'))

        # Save model configuration for reconstruction
        config = {
            'inputShape': self.inputShape,
            'featExtractorConfig': self.featExtractorConfig,
            'lstm_units': self.lstm_units,
            'dropout_rate': self.dropout_rate,
            'add_lstm': self.add_lstm,
            'distance_metric': self.distance_metric,
            'regularization': self.regularization
        }
        with open(os.path.join(filepath, 'model_config.json'), 'w') as f:
            json.dump(config, f)

        print(f"Model saved successfully to {filepath}")

    @classmethod
    def load_model(cls, filepath):
        """Rebuild a ``SiameseNetwork`` from a directory written by ``save_model``."""
        # Load the configuration
        with open(os.path.join(filepath, 'model_config.json'), 'r') as f:
            config = json.load(f)

        # Reconstruct the model using the saved configuration
        siamese_net = cls(**config)

        # Replace the freshly built inner model with the saved one
        siamese_net.model = tf.keras.models.load_model(
            os.path.join(filepath, 'siamese_model.keras')
        )

        print(f"Model loaded successfully from {filepath}")
        return siamese_net


@st.cache_resource
def _load_siamese_model():
    """Load the saved Siamese network once and share it across reruns."""
    return SiameseNetwork.load_model('./saved_siamese_model')


@st.cache_data
def predict(input_1, input_2):
    """Return the display string with the predicted similarity of two sentences.

    The result is cached per ``(input_1, input_2)`` pair; the model itself is
    cached separately so a new pair no longer reloads it from disk.
    """
    siamese_net = _load_siamese_model()

    embedding1 = get_embeddings2(input_1, token_length=100).reshape(ishape)
    embedding2 = get_embeddings2(input_2, token_length=100).reshape(ishape)

    # The network expects a batch axis: one sample per input branch
    x1_test = np.array([embedding1])
    x2_test = np.array([embedding2])

    pred = str(round(siamese_net.predict([x1_test, x2_test])[0][0] * 100, 2)) + " %"
    result = f"Predicted Similarity: {pred}"
    print(input_1, input_2, pred)
    return result


def connect_to_mongodb(connection_string):
    """Open a MongoDB client and verify that the server is reachable.

    Returns:
        A connected ``pymongo.MongoClient``, or ``None`` when the client cannot
        be created or the health check fails.
    """
    client = None
    try:
        client = pymongo.MongoClient(connection_string)
        # Perform a simple operation to verify the connection
        client.admin.command('ping')
        print("Successfully connected to MongoDB Atlas!")
        return client
    except pymongo.errors.PyMongoError as e:
        print(f"Could not connect to MongoDB Atlas: {e}")
        # Do not leak the half-open client when the health check fails
        if client is not None:
            client.close()
        return None


# SECURITY: credentials do not belong in source control.  Set the MONGODB_URI
# environment variable to override this value.  The literal is kept only so
# existing deployments keep working - rotate the password and remove it.
_LEGACY_CONNECTION_STRING = "mongodb+srv://ashen8810:Ashen12345@cluster0.t1w59.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"


def save_feedback(input_1, input_2, prediction, rating, feedback):
    """Insert one prediction/feedback document into ``user_predictions.survey``.

    Returns:
        ``True`` when the document was inserted, ``False`` when the connection
        or the insert failed.
    """
    connection_string = os.environ.get("MONGODB_URI") or _LEGACY_CONNECTION_STRING
    client = connect_to_mongodb(connection_string)
    if not client:
        return False

    try:
        collection = client["user_predictions"]["survey"]
        document = {
            # NOTE(review): naive local time; confirm whether UTC is expected.
            "timestamp": datetime.now(),
            "input_1": str(input_1),
            "input_2": str(input_2),
            "prediction": str(prediction),
            "rating": rating,
            "feedback": str(feedback)
        }
        collection.insert_one(document)
        print("Data Saved to mongoDB")
        return True
    except Exception as e:
        st.error(f"Database Error: {e}")
        return False
    finally:
        client.close()


def _reset_form():
    """Button callback: wipe the session and blank both sentence inputs.

    Runs as an ``on_click`` callback because widget-backed session keys may
    only be assigned before the widgets are instantiated in a script run.
    """
    st.session_state.clear()
    st.session_state.input_1 = ""
    st.session_state.input_2 = ""


# Initialize session state for feedback tracking
for _flag in ("feedback_submitted", "show_feedback"):
    if _flag not in st.session_state:
        st.session_state[_flag] = False

# Page header
st.title("Sinhala Short Sentence Similarity")
st.write("Compare the similarity between two Sinhala sentences.")

# Keys tie the widgets to session state so the Clear button can reset them
input_1 = st.text_input("First Sentence:", key="input_1")
input_2 = st.text_input("Second Sentence:", key="input_2")

# Prediction section
if st.button("Compare Sentences", type="primary"):
    if input_1 and input_2:
        with st.spinner("Calculating similarity..."):
            result = predict(input_1, input_2)
        if result:
            st.success(result)
            st.session_state.show_feedback = True
            # A new comparison gets its own feedback form
            st.session_state.feedback_submitted = False
            # Log every prediction, with placeholder rating/feedback
            save_feedback(input_1, input_2, result, 0, "Null")
    else:
        st.warning("Please enter both sentences to compare.")

# Feedback section
if st.session_state.show_feedback and not st.session_state.feedback_submitted:
    st.subheader("Feedback")
    is_correct = st.radio("Is this similarity assessment correct?", ("Yes", "No"))

    if is_correct == "No":
        rating = st.slider("How accurate was the prediction?", 0.0, 1.0, 0.5, 0.1)
        feedback = st.text_area("Please provide detailed feedback:")
    else:
        # "Yes" means the prediction was judged correct: record full accuracy
        # and no comment, so submitting never hits an unassigned name.
        rating = 1.0
        feedback = ""

    if st.button("Submit Feedback"):
        if save_feedback(input_1, input_2, predict(input_1, input_2), rating, feedback):
            st.success("Thank you for your feedback!")
            st.session_state.feedback_submitted = True
        else:
            st.error("Could not save your feedback. Please try again.")

st.button("Clear", on_click=_reset_form)

# Footer
st.markdown("""
Developed by Ashen | Version 1.0
""", unsafe_allow_html=True)