from gensim.models import Word2Vec
from collections import defaultdict, Counter
import os
import tempfile
import pandas as pd
def load_all_models():
'''
Load all word2vec models
'''
archaic = ('archaic', load_word2vec_model('models/archaic_cbow.model'))
classical = ('classical', load_word2vec_model('models/classical_cbow.model'))
early_roman = ('early_roman', load_word2vec_model('models/early_roman_cbow.model'))
hellen = ('hellen', load_word2vec_model('models/hellen_cbow.model'))
late_roman = ('late_roman', load_word2vec_model('models/late_roman_cbow.model'))
return [archaic, classical, early_roman, hellen, late_roman]
def load_selected_models(selected_models):
'''
Load the selected word2vec models
selected_models: a list of models that should be loaded
'''
    # Map display names with irregular spellings to their internal model names
    name_map = {"Early Roman": "early_roman", "Late Roman": "late_roman", "Hellenistic": "hellen"}
    models = []
    for model in selected_models:
        model_name = name_map.get(model, model).lower() + "_cbow"
        models.append((model_name, load_word2vec_model(f'models/{model_name}.model')))
    return models
def load_word2vec_model(model_path):
'''
Load a word2vec model from a file
model_path: relative path to model files
'''
return Word2Vec.load(model_path)
def get_word_vector(model, word):
'''
Return the word vector of a word
model: word2vec model object
word: word to extract vector from
'''
return model.wv[word]
def iterate_over_words(model):
'''
Iterate over all words in the vocabulary and print their vectors
model: word2vec model object
'''
    # key_to_index already maps each word to its vocabulary index,
    # so no separate counter is needed
    for word, index in model.wv.key_to_index.items():
        vector = get_word_vector(model, word)
        print(f'{index} Word: {word}, Vector: {vector}')
def model_dictionary(model):
'''
Return the dictionary of the word2vec model
Key is the word and value is the vector of the word
model: word2vec model object
'''
    # defaultdict(list) so that looking up an out-of-vocabulary word
    # yields an empty list instead of raising a KeyError
    vectors = defaultdict(list)
    for word in model.wv.key_to_index:
        vectors[word] = get_word_vector(model, word)
    return vectors
def dot_product(vector_a, vector_b):
'''
Return the dot product of two vectors
vector_a: A list of numbers representing the first vector
vector_b: A list of numbers representing the second vector
Returns:
A single number representing the dot product of the two vectors
'''
return sum(a * b for a, b in zip(vector_a, vector_b))
def magnitude(vector):
'''
Returns the magnitude of a vector
    vector: A list of numbers representing the vector
Returns:
A single number representing the magnitude of the vector.
'''
return sum(x**2 for x in vector) ** 0.5
def cosine_similarity(vector_a, vector_b):
'''
Return the cosine similarity of two vectors
vector_a: A list of numbers representing the first vector
vector_b: A list of numbers representing the second vector
Returns:
A String representing the cosine similarity of the two vectors \
formatted to two decimals.
'''
dot_prod = dot_product(vector_a, vector_b)
mag_a = magnitude(vector_a)
mag_b = magnitude(vector_b)
    # Avoid division by zero; return the same formatted-string type as below
    if mag_a == 0 or mag_b == 0:
        return "0.00"
similarity = dot_prod / (mag_a * mag_b)
return "{:.2f}".format(similarity)
def get_cosine_similarity(word1, time_slice_1, word2, time_slice_2):
'''
Return the cosine similarity of two words
word1: The first word as a string.
time_slice_1: The time slice for the first word as a string.
word2: The second word as a string.
time_slice_2: The time slice for the second word as a string.
Returns:
A string representing the cosine similarity of the two words formatted to two decimal places.
'''
time_slice_1 = convert_time_name_to_model(time_slice_1)
time_slice_2 = convert_time_name_to_model(time_slice_2)
    # Return if either model file does not exist
    if not os.path.exists(f'models/{time_slice_1}.model') or not os.path.exists(f'models/{time_slice_2}.model'):
        return
model_1 = load_word2vec_model(f'models/{time_slice_1}.model')
model_2 = load_word2vec_model(f'models/{time_slice_2}.model')
dict_1 = model_dictionary(model_1)
dict_2 = model_dictionary(model_2)
return cosine_similarity(dict_1[word1], dict_2[word2])
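# Usage sketch (assumes the corresponding .model files exist under ./models;
# the Greek words are placeholders, not guaranteed to be in the vocabulary):
# sim = get_cosine_similarity('λόγος', 'Classical', 'μῦθος', 'Hellenistic')
# print(sim)  # a formatted string such as '0.42', or '0.00' for unknown words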
def get_cosine_similarity_one_word(word, time_slice1, time_slice2):
'''
Return the cosine similarity of one word in two different time slices
word: The word as a string.
time_slice1: The first time slice as a string.
time_slice2: The second time slice as a string.
Returns:
A string representing the cosine similarity of the word in two different time slices formatted to two decimal places.
'''
# Return if path does not exist
if not os.path.exists(f'models/{time_slice1}.model') or not os.path.exists(f'models/{time_slice2}.model'):
return
model1 = load_word2vec_model(f'models/{time_slice1}.model')
model2 = load_word2vec_model(f'models/{time_slice2}.model')
dict1 = model_dictionary(model1)
dict2 = model_dictionary(model2)
return cosine_similarity(dict1[word], dict2[word])
def validate_nearest_neighbours(word, n, models):
'''
Validate the input of the nearest neighbours function
word: The word as a string.
n: The number of nearest neighbours to find as an integer.
models: A list of model names as strings.
Returns:
A boolean value. True if inputs are valid, False otherwise.
'''
if word == '' or n == '' or models == []:
return False
return True
def convert_model_to_time_name(model_name):
'''
Convert the model name to the time slice name
model_name: The model name as a string.
Returns:
A string representing the corresponding time slice name.
'''
if model_name == 'archaic_cbow' or model_name == 'archaic':
return 'Archaic'
elif model_name == 'classical_cbow' or model_name == 'classical':
return 'Classical'
elif model_name == 'early_roman_cbow' or model_name == 'early_roman':
return 'Early Roman'
elif model_name == 'hellen_cbow' or model_name == 'hellen':
return 'Hellenistic'
elif model_name == 'late_roman_cbow' or model_name == 'late_roman':
return 'Late Roman'
def convert_time_name_to_model(time_name):
'''
Convert the time slice name to the model name
time_name -- The time slice name as a string.
Returns:
A string representing the corresponding model name.
'''
if time_name == 'Archaic':
return 'archaic_cbow'
elif time_name == 'Classical':
return 'classical_cbow'
elif time_name == 'Early Roman':
return 'early_roman_cbow'
elif time_name == 'Hellenistic':
return 'hellen_cbow'
elif time_name == 'Late Roman':
return 'late_roman_cbow'
def get_nearest_neighbours(target_word, n=10, models=None):
    """
    Return the nearest neighbours of a word for the given models
    target_word: the word for which the nearest neighbours are calculated
    n: the number of nearest neighbours to return (default: 10)
    models: list of tuples with the name of the time slice and the word2vec model (default: all in ./models)
    Return: { 'model_name': [(word, cosine_similarity), ...], ... }
    """
    # Load the models lazily: a call in the default argument would run once at
    # import time and load every model even when the caller supplies its own list
    if models is None:
        models = load_all_models()
    nearest_neighbours = {}
    # Iterate over models and compute nearest neighbours
    for model_name, model in models:
        model_neighbours = []
        time_name = convert_model_to_time_name(model_name)
        vector_1 = get_word_vector(model, target_word)
        # Iterate over all words of the model
        for word in model.wv.key_to_index:
            vector_2 = get_word_vector(model, word)
            cosine_sim = cosine_similarity(vector_1, vector_2)
            # If the list of nearest neighbours is not full yet, add the current word
            if len(model_neighbours) < n:
                model_neighbours.append((word, cosine_sim))
            else:
                # Otherwise replace the entry with the smallest cosine similarity;
                # compare as floats, since cosine_similarity returns a formatted
                # string and string comparison misorders negative values
                smallest_neighbour = min(model_neighbours, key=lambda x: float(x[1]))
                if float(cosine_sim) > float(smallest_neighbour[1]):
                    model_neighbours.remove(smallest_neighbour)
                    model_neighbours.append((word, cosine_sim))
        # Sort the nearest neighbours by cosine similarity, highest first
        model_neighbours.sort(key=lambda x: float(x[1]), reverse=True)
        # Add the time slice name and the nearest neighbours to the dictionary
        nearest_neighbours[time_name] = model_neighbours
    return nearest_neighbours
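# Usage sketch (the query word is a placeholder; note that the query word
# itself is included in the results with similarity '1.00' when it is in
# a model's vocabulary):
# neighbours = get_nearest_neighbours('λόγος', n=5)
# for time_name, pairs in neighbours.items():
#     print(time_name, pairs)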
def get_nearest_neighbours_vectors(word, time_slice_model, n=15):
'''
Return the vectors of the nearest neighbours of a word
word: the word for which the nearest neighbours are calculated
time_slice_model: the word2vec model of the time slice of the input word
n: the number of nearest neighbours to return (default: 15)
Return: list of tuples with the word, the time slice, the vector, and the cosine similarity
of the nearest neighbours
'''
model_name = convert_model_to_time_name(time_slice_model)
time_slice_model = load_word2vec_model(f'models/{time_slice_model}.model')
vector_1 = get_word_vector(time_slice_model, word)
nearest_neighbours = []
    # 'other_word' is used so the loop variable does not shadow the query word
    for other_word in time_slice_model.wv.key_to_index:
        vector_2 = get_word_vector(time_slice_model, other_word)
        cosine_sim = cosine_similarity(vector_1, vector_2)
        if len(nearest_neighbours) < n:
            nearest_neighbours.append((other_word, model_name, vector_2, cosine_sim))
        else:
            # Compare as floats, since cosine_similarity returns a formatted string
            smallest_neighbour = min(nearest_neighbours, key=lambda x: float(x[3]))
            if float(cosine_sim) > float(smallest_neighbour[3]):
                nearest_neighbours.remove(smallest_neighbour)
                nearest_neighbours.append((other_word, model_name, vector_2, cosine_sim))
    return sorted(nearest_neighbours, key=lambda x: float(x[3]), reverse=True)
def write_to_file(data):
'''
Write the data to a file
data: the data to be written to the file
Return: the path to the temporary file
'''
    # Create a random temporary file in the system temp directory
    # (portable, unlike a hard-coded /tmp)
    temp_file_descriptor, temp_file_path = tempfile.mkstemp(prefix="temp_", suffix=".txt")
    os.close(temp_file_descriptor)
# Write data to the temporary file
with open(temp_file_path, 'w') as temp_file:
temp_file.write(str(data))
return temp_file_path
def store_df_in_temp_file(all_dfs):
'''
Store the dataframes in a temporary file
all_dfs: list of tuples with the name of the time slice and the dataframe
Return: the path to the temporary Excel file
'''
# Define directory for temporary files
temp_dir = "./downloads/nn"
# Create the directory if it doesn't exist
os.makedirs(temp_dir, exist_ok=True)
    # Create a random temporary file name and close the descriptor that
    # mkstemp opens, so the handle is not leaked
    temp_file_descriptor, temp_file_path = tempfile.mkstemp(prefix="temp_", suffix=".xlsx", dir=temp_dir)
    os.close(temp_file_descriptor)
# Create an ExcelWriter object
with pd.ExcelWriter(temp_file_path, engine='xlsxwriter') as writer:
# Create a new sheet
worksheet = writer.book.add_worksheet('Results')
start_row = 0
for model, df in all_dfs:
worksheet.write(start_row, 0, f"Model: {model}")
df.to_excel(writer, sheet_name='Results', index=False, startrow=start_row + 1, startcol=0)
start_row += df.shape[0] + 3 # Add some space between models
return temp_file_path
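# Usage sketch with toy dataframes (column names are illustrative assumptions):
# all_dfs = [('Classical', pd.DataFrame({'word': ['λόγος'], 'cosine_sim': ['0.91']})),
#            ('Hellenistic', pd.DataFrame({'word': ['μῦθος'], 'cosine_sim': ['0.88']}))]
# path = store_df_in_temp_file(all_dfs)  # e.g. ./downloads/nn/temp_ab12cd.xlsx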
def check_word_in_models(word):
'''
Check in which models a word occurs
word: the word to check
Return: list of model names where the word occurs
'''
    all_models = load_all_models()
    eligible_models = []
    for model_name, model in all_models:
        # Map the internal model name (e.g. 'early_roman') to its display name
        time_name = convert_model_to_time_name(model_name)
        if word in model.wv.key_to_index:
            eligible_models.append(time_name)
    return eligible_models
def count_lemmas(directory):
'''
Create a Counter with all words and their occurrences for all models
directory: the directory containing the text files for the models
Return: a dictionary where keys are model names and values are Counters of word occurrences
'''
lemma_count_dict = {}
    for file in os.listdir(directory):
        if not file.endswith(".txt"):
            continue
        # Derive the display name from the file name,
        # e.g. 'early_roman.txt' -> 'Early Roman'
        model_name = ' '.join(part.capitalize() for part in file.split('.')[0].split('_'))
        with open(os.path.join(directory, file), 'r', encoding='utf-8') as f:
            text = f.read()
        words = text.split()
        lemma_count_dict[model_name] = Counter(words)
return lemma_count_dict
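# Usage sketch (assumes a directory of plain-text corpus files named after the
# models, e.g. archaic.txt, early_roman.txt; the directory name is an assumption):
# counts = count_lemmas('lemmas')
# counts['Early Roman'].most_common(5)  # the five most frequent lemmas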