|
import os
from threading import Thread

import gensim
import numpy as np
from flask import Flask, request, jsonify, render_template
from flask_mail import Mail, Message
from gensim.models import FastText
|
|
|
# The Flask application object; every route below is registered on it.
app = Flask(import_name=__name__)
|
|
|
class ModelLoader(Thread):
    """Thread that loads a gensim FastText or Word2Vec model from disk.

    After the thread finishes, exactly one of ``model`` / ``error`` is set:
    ``model`` holds the loaded object on success, ``error`` holds the
    exception on failure.  While the load is still running both are ``None``.

    Args:
        model_type: ``'fasttext'`` or ``'word2vec'``.
        model_path: Path passed to the gensim ``load`` call.
    """

    def __init__(self, model_type, model_path):
        super().__init__()
        self.model_type = model_type
        self.model_path = model_path
        # Loaded model object; stays None until run() succeeds.
        self.model = None
        # Exception raised by run(), if any.  Lets callers tell "still
        # loading" apart from "load failed" (both leave ``model`` as None).
        self.error = None

    def run(self):
        """Load the model selected by ``model_type`` into ``self.model``.

        Raises:
            ValueError: If ``model_type`` is neither ``'fasttext'`` nor
                ``'word2vec'``.  Any exception (including this one) is also
                stored on ``self.error`` before being re-raised.
        """
        try:
            if self.model_type == 'fasttext':
                self.model = gensim.models.FastText.load(self.model_path)
            elif self.model_type == 'word2vec':
                self.model = gensim.models.Word2Vec.load(self.model_path)
            else:
                raise ValueError("Unsupported model type")
        except Exception as exc:
            # Keep the failure inspectable from other threads, then re-raise
            # so the default thread exception reporting still happens.
            self.error = exc
            raise
|
|
|
|
|
# Each model is loaded on its own ModelLoader thread, started at import time.
# The route handlers check ``<loader>.model is None`` to detect a model that
# is not available yet.
word2vec_loader = ModelLoader(model_type='word2vec', model_path='w2v_shona2.model')
word2vec_loader.start()

fasttext_loader = ModelLoader(model_type='fasttext', model_path='ft_model1.model')
fasttext_loader.start()
|
|
|
@app.route('/')
def home():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
|
|
|
|
|
|
|
@app.route('/get_similar_words', methods=['POST'])
def get_similar_words():
    """Return the words most similar to a query word.

    Expects a JSON object with:
        word: The query word (non-empty string).
        model_type: ``'word2vec'`` or ``'fasttext'``.
        top_n: Optional number of neighbours to return (default 10).

    Responses:
        200: ``{'similar_words': [...]}`` as returned by ``most_similar``.
        400: Malformed body, missing/blank ``word``, non-integer ``top_n`` or
            unknown ``model_type``.
        404: The model raised ``KeyError`` for the word.
        503: The selected model has not been loaded.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so every malformed request gets the same JSON error shape.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    word = payload.get('word')
    if not isinstance(word, str) or not word.strip():
        return jsonify({'error': "'word' must be a non-empty string"}), 400

    try:
        top_n = int(payload.get('top_n', 10))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': "'top_n' must be an integer"}), 400

    model_type = payload.get('model_type')
    if model_type == 'word2vec':
        label, loader = 'Word2Vec', word2vec_loader
    elif model_type == 'fasttext':
        label, loader = 'FastText', fasttext_loader
    else:
        return jsonify({'error': 'Invalid model type'}), 400

    if loader.model is None:
        return jsonify({'error': f'{label} model is still loading'}), 503

    try:
        similar_words = loader.model.wv.most_similar(word, topn=top_n)
    except KeyError:
        return jsonify({'error': f'Word not found in {label} model'}), 404
    return jsonify({'similar_words': similar_words})
|
|
|
|
|
|
|
|
|
def _parse_expression(expression):
    """Split a word-arithmetic expression into positive and negative terms.

    ``"king - man + woman"`` yields ``(['king', 'woman'], ['man'])``.  Terms
    that are empty after stripping whitespace (for example from a trailing or
    doubled operator) are dropped instead of being passed on as ``''``.
    """
    positive, negative = [], []
    # Rewriting '-' as '+-' lets a single split on '+' produce every term
    # with its sign still attached.  NOTE: a hyphen inside a word is therefore
    # also treated as a minus operator.
    for chunk in expression.replace('-', '+-').split('+'):
        term = chunk.strip()
        if term.startswith('-'):
            term = term[1:].strip()
            if term:
                negative.append(term)
        elif term:
            positive.append(term)
    return positive, negative


@app.route('/compute', methods=['POST'])
def compute():
    """Evaluate a word-vector arithmetic expression.

    Expects a JSON object with:
        expression: String such as ``"king - man + woman"``.
        model: ``'word2vec'`` or ``'fasttext'``.
        top_n: Optional number of results (default 5).

    Responses:
        200: A list of ``{'word': ..., 'similarity': ...}`` objects, or an
            ``{'error': ...}`` object when the model lookup itself fails.
        400: Malformed body, non-string ``expression``, non-integer ``top_n``
            or unknown ``model``.
        503: The selected model has not been loaded.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    expression = data.get('expression')
    if not isinstance(expression, str):
        return jsonify({'error': "'expression' must be a string"}), 400

    try:
        top_n = int(data.get('top_n', 5))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': "'top_n' must be an integer"}), 400

    model_type = data.get('model')
    positive, negative = _parse_expression(expression)

    if model_type == 'word2vec':
        if word2vec_loader.model is None:
            return jsonify({'error': 'Word2Vec model is still loading'}), 503
        try:
            result = word2vec_loader.model.wv.most_similar(
                positive=positive, negative=negative, topn=top_n
            )
            response = [{'word': word, 'similarity': similarity} for word, similarity in result]
        except Exception as e:
            # Lookup failures are reported in the body with a 200 status.
            response = {'error': str(e)}
        return jsonify(response)

    if model_type == 'fasttext':
        if fasttext_loader.model is None:
            # Same status and wording style as the Word2Vec branch.
            return jsonify({'error': 'FastText model is still loading'}), 503
        try:
            if not hasattr(fasttext_loader.model, 'wv'):
                raise TypeError("The loaded model does not have the 'wv' attribute.")
            if not positive:
                raise ValueError("Positive word list is empty.")

            wv = fasttext_loader.model.wv

            # The query vector is built by hand: start from a copy of the
            # first positive term, add the remaining positives, subtract the
            # negatives.
            # NOTE(review): unlike the other terms, the first positive term
            # is not checked against the vocabulary — confirm this is intended.
            result_vector = wv.get_vector(positive[0]).copy()

            for word in positive[1:]:
                if word not in wv:
                    raise KeyError(f"Word '{word}' not in vocabulary")
                result_vector += wv.get_vector(word)

            for word in negative:
                if word not in wv:
                    raise KeyError(f"Word '{word}' not in vocabulary")
                result_vector -= wv.get_vector(word)

            result = wv.most_similar(positive=[result_vector], topn=top_n)
            response = [{'word': word, 'similarity': similarity} for word, similarity in result]
        except KeyError as e:
            response = {'error': f"KeyError: {str(e)}"}
        except ValueError as e:
            response = {'error': f"ValueError: {str(e)}"}
        except TypeError as e:
            response = {'error': f"TypeError: {str(e)}"}
        except Exception as e:
            response = {'error': f"An unexpected error occurred: {str(e)}"}
        return jsonify(response)

    return jsonify({'error': 'Invalid model type'}), 400
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Flask-Mail settings.  The account name and password can be supplied through
# the MAIL_USERNAME / MAIL_PASSWORD environment variables.
#
# SECURITY: the fallback values below are the credentials that were committed
# to this file; they are kept only so existing deployments keep working.
# TODO: rotate the password, set it via the environment, and delete the
# in-source fallbacks.
# NOTE(review): the fallback username looks like a redaction placeholder —
# confirm the real account address.
app.config.update(
    MAIL_SERVER='smtp.gmail.com',
    MAIL_PORT=587,
    MAIL_USERNAME=os.environ.get('MAIL_USERNAME', '[email protected]'),
    MAIL_PASSWORD=os.environ.get('MAIL_PASSWORD', 'jphs dnyg buph srjf'),
    MAIL_USE_TLS=True,
    MAIL_USE_SSL=False,
)

mail = Mail(app)
|
|
|
@app.route('/submit', methods=['POST'])
def submit():
    """Forward a contact-form submission to the configured mailbox.

    Reads the form fields ``name``, ``email`` and ``message``; the presence of
    a ``subscribe`` field is reported as a boolean in the mail body.

    Responses (all JSON with ``status`` and ``message`` keys):
        200 / ``status: success``: the mail was sent.
        200 / ``status: error``: sending raised an exception.
        400 / ``status: error``: ``email`` or ``message`` is missing or blank.
    """
    name = request.form.get('name')
    email = request.form.get('email')
    message = request.form.get('message')
    subscribe = 'subscribe' in request.form

    if not (email and email.strip()) or not (message and message.strip()):
        return jsonify({
            'status': 'error',
            'message': 'Both email and message are required.'
        }), 400

    try:
        msg = Message(
            subject='Contact Form Submission',
            # Send from the authenticated account rather than the
            # visitor-supplied address; the visitor goes in Reply-To so that
            # replies still reach them.
            sender=app.config['MAIL_USERNAME'],
            recipients=[app.config['MAIL_USERNAME']],
            reply_to=email.strip(),
            body=f"Name: {name}\nEmail: {email}\nMessage: {message}\nSubscribe: {subscribe}"
        )
        mail.send(msg)
    except Exception as e:
        # Record the traceback server-side; the client still gets the same
        # JSON error shape as before.
        app.logger.exception('Failed to send contact form email')
        return jsonify({
            'status': 'error',
            'message': f'An error occurred while sending the email: {str(e)}'
        })

    return jsonify({
        'status': 'success',
        'message': 'Your message has been received and the email has been sent!'
    })
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Debug mode stays on by default for local runs, as before, but can now
    # be switched off without editing the file (e.g. FLASK_DEBUG=0).  The
    # interactive debugger must never be exposed on a public host.
    _debug_flag = os.environ.get('FLASK_DEBUG', '1').strip().lower()
    app.run(debug=_debug_flag in ('1', 'true', 'yes', 'on'))
|
|