Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # -*- coding: UTF-8 -*- | |
| # Imports | |
| import re, regex | |
| from . import utils | |
| from .azure_translate import azure_translate_text | |
| from operator import itemgetter | |
| import sys | |
| import copy | |
| import os | |
| import html | |
| from pathlib import Path | |
# Character limit handed to combine_subtitles_advanced when merging subtitle entries
combine_subtitles_max_chars = 200
# Translation backend name; 'azure' is the only value this module accepts
translate_service = 'azure'

# -------------------------------- No Translate and Manual Translation Functions -----------------------------------
# Customization folder, located two directory levels above this file
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'

# Load the customization files once, at import time
# Phrases that must be left untranslated
noTranslateOverrideFile = str(BASE_DIR / 'dont_translate_phrases.txt')
dontTranslateList = utils.txt_to_list(noTranslateOverrideFile)

# Manual translation overrides (rows are read with the keys 'Language Code', 'Original Text', 'Translated Text')
manualTranslationOverrideFile = str(BASE_DIR / 'Manual_Translations.csv')
manualTranslationsDict = utils.csv_to_dict(manualTranslationOverrideFile)

# URLs that must be left untranslated
urlListFile = str(BASE_DIR / 'url_list.txt')
urlList = utils.txt_to_list(urlListFile)
# Add span tags around certain words to exclude them from being translated
def add_notranslate_tags_from_notranslate_file(text, phraseList):
    """Wrap every occurrence of the given phrases in a notranslate span.

    A phrase only matches when it is delimited on both sides by a Unicode
    separator character (category Z) or by the start/end of the text. One
    optional leading quote/parenthesis, one optional trailing punctuation mark
    and one optional trailing quote are pulled inside the span with the phrase.
    Matching is case-insensitive.

    Args:
        text: The string to tag.
        phraseList: Iterable of literal phrases (plain text, not regex patterns).

    Returns:
        The text with each matched phrase wrapped in
        '<span class="notranslate">...</span>'.
    """
    for word in phraseList:
        # An empty phrase would match bare punctuation/whitespace and insert empty spans
        if not word:
            continue
        # Escape the phrase so regex metacharacters (the dots and '?' of a URL,
        # the '+' of "C++", ...) are matched literally instead of being
        # interpreted as pattern syntax or raising a compile error
        escapedWord = regex.escape(word)
        findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{escapedWord}[.,!?()]?["\']?)(\p{{Z}}|$)' #\p ensures it works with unicode characters
        findWordRegexCompiled = regex.compile(findWordRegex, flags=regex.IGNORECASE | regex.UNICODE)
        # Find the word, with optional punctuation after, and optional quotes before or after
        text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text
def remove_notranslate_tags(text):
    """Return text with the notranslate span markup removed.

    Deletes every '<span class="notranslate">' opening tag and every '</span>'
    closing tag; the text between the tags is kept.
    """
    for tag in ('<span class="notranslate">', '</span>'):
        text = text.replace(tag, '')
    return text
def add_notranslate_tags_for_manual_translations(text, langcode):
    """Protect manually translated phrases from the translation service.

    For every entry of manualTranslationsDict whose 'Language Code' equals
    langcode, occurrences of its 'Original Text' are wrapped in a notranslate
    span, using the same delimiter/punctuation rules and case-insensitive
    matching as the other notranslate tagging in this module.

    Args:
        text: The string to tag.
        langcode: Language code to select the applicable manual translations.

    Returns:
        The text with the applicable phrases wrapped in
        '<span class="notranslate">...</span>'.
    """
    for manualTranslatedText in manualTranslationsDict:
        # Only tag text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] != langcode:
            continue
        originalText = manualTranslatedText['Original Text']
        # An empty phrase would match bare punctuation/whitespace and insert empty spans
        if not originalText:
            continue
        # Escape so the phrase is matched literally even if it contains regex metacharacters
        escapedText = regex.escape(originalText)
        findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{escapedText}[.,!?()]?["\']?)(\p{{Z}}|$)'
        findWordRegexCompiled = regex.compile(findWordRegex, flags=regex.IGNORECASE | regex.UNICODE)
        text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text
# Replace certain words or phrases with their manual translation
def replace_manual_translations(text, langcode):
    """Substitute manually translated phrases into the text.

    For every entry of manualTranslationsDict whose 'Language Code' equals
    langcode, each delimited, case-insensitive occurrence of 'Original Text'
    is replaced by 'Translated Text'. The surrounding separators are kept; the
    optional quote/punctuation characters that were matched together with the
    phrase are replaced along with it.

    Args:
        text: The string to process.
        langcode: Language code to select the applicable manual translations.

    Returns:
        The text with the applicable phrases replaced.
    """
    for manualTranslatedText in manualTranslationsDict:
        # Only replace text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] != langcode:
            continue
        originalText = manualTranslatedText['Original Text']
        translatedText = manualTranslatedText['Translated Text']
        # An empty phrase would match bare punctuation/whitespace everywhere
        if not originalText:
            continue
        # Escape so the phrase is matched literally even if it contains regex metacharacters
        escapedText = regex.escape(originalText)
        findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{escapedText}[.,!?()]?["\']?)(\p{{Z}}|$)'
        findWordRegexCompiled = regex.compile(findWordRegex, flags=regex.IGNORECASE | regex.UNICODE)

        # Use a callable instead of a replacement template: inside a template, a translation
        # starting with a digit would merge into the '\1' group reference, and any backslash
        # in the translation would be parsed as an escape sequence
        def substituteTranslation(match, replacement=translatedText):
            return match.group(1) + replacement + match.group(3)

        # Substitute the matched word with the translated text
        text = findWordRegexCompiled.sub(substituteTranslation, text)
    return text
| #======================================== Translate Text ================================================ | |
| # Note: This function was almost entirely written by GPT-3 after feeding it my original code and asking it to change it so it | |
| # would break up the text into chunks if it was too long. It appears to work | |
def process_response_text(text, targetLanguage):
    """Post-process one string returned by the translation step.

    Steps, in order: decode HTML entities, strip the notranslate span markup,
    then apply the manual translations defined for targetLanguage.

    Args:
        text: The raw string to clean up.
        targetLanguage: Language code used to select manual translations.

    Returns:
        The cleaned-up string.
    """
    unescapedText = html.unescape(text)
    untaggedText = remove_notranslate_tags(unescapedText)
    return replace_manual_translations(untaggedText, targetLanguage)
def split_transcript_chunks(text, max_length=5000):
    """Split text into chunks of whole sentences that fit a UTF-8 byte budget.

    Sentences are separated at whitespace that follows '.', '!' or '?'. They
    are packed greedily into chunks: each sentence is counted as its UTF-8
    byte length plus one byte for the separating space, and a new chunk is
    started when adding a sentence would push that running total above
    max_length.

    A single sentence longer than max_length is never cut; it is emitted as a
    chunk of its own, so such a chunk can exceed max_length.

    Args:
        text: The transcript to split.
        max_length: Byte budget per chunk (UTF-8 encoded size).

    Returns:
        A list of non-empty chunk strings, stripped of surrounding whitespace.
        Empty or whitespace-only input yields an empty list.
    """
    # Split the transcript into sentences
    sentences = re.split(r'(?<=[.!?])\s+', text)

    chunks = []
    currentSentences = []
    # Running byte count of the current chunk, kept incrementally so the
    # accumulated chunk does not have to be re-encoded for every sentence
    currentSize = 0

    for sentence in sentences:
        # re.split yields an empty piece for empty text or trailing whitespace;
        # skipping it keeps empty chunks out of the result
        if not sentence.strip():
            continue

        sentenceSize = len(sentence.encode("utf-8")) + 1  # +1 accounts for the joining space

        # Close the current chunk if this sentence would overflow it
        if currentSentences and currentSize + sentenceSize > max_length:
            chunks.append(" ".join(currentSentences).strip())
            currentSentences = []
            currentSize = 0

        currentSentences.append(sentence)
        currentSize += sentenceSize

    # Add the last chunk to the list of chunks (if it's not empty)
    if currentSentences:
        chunks.append(" ".join(currentSentences).strip())

    return chunks
def convertChunkListToCompatibleDict(chunkList):
    """Turn a list of text chunks into a dictionary keyed by position.

    Keys are consecutive integers starting at 1; each value is a dictionary
    of the form {'text': chunk}.
    """
    return {number: {'text': chunkText} for number, chunkText in enumerate(chunkList, start=1)}
# Translate the text entries of the dictionary
def translate_dictionary(inputSubsDict, langDict, translatedSrtFileName, skipTranslation=False, ):
    """Translate every subtitle entry, combine short entries, and write an SRT file.

    Each entry's 'text' is tagged so protected phrases, URLs and manually
    translated phrases are not translated, then all texts are sent to the
    translation service in one call. The cleaned-up result is stored under
    'translated_text' on each entry of inputSubsDict (the input is modified in
    place). The entries are then merged by combine_subtitles_advanced.

    Args:
        inputSubsDict: Subtitle entries; each value must contain 'text' plus the
            keys combine_subtitles_advanced requires.
        langDict: Must contain 'targetLanguage', 'sourceLanguage' and
            'translateService'.
        translatedSrtFileName: Path of the SRT file to write (written as
            'utf-8-sig'). Not written when skipTranslation is true.
        skipTranslation: When true, the translation service is not called and
            the original text is post-processed as if it were the translation.

    Returns:
        The combined subtitle dictionary returned by combine_subtitles_advanced.

    Raises:
        SystemExit: With status 1 if translation is requested and
            langDict['translateService'] is not 'azure'.
    """
    targetLanguage = langDict['targetLanguage']
    sourceLanguage = langDict['sourceLanguage']
    translateService = langDict['translateService']

    # Create a container for all the text to be translated
    textToTranslate = []
    for entry in inputSubsDict.values():
        # Add any 'notranslate' tags to the text
        processedText = add_notranslate_tags_from_notranslate_file(entry['text'], dontTranslateList)
        processedText = add_notranslate_tags_from_notranslate_file(processedText, urlList)
        processedText = add_notranslate_tags_for_manual_translations(processedText, targetLanguage)
        # Add the text to the list of text to be translated
        textToTranslate.append(processedText)

    if not skipTranslation:
        if translateService != 'azure':
            print("Error: Invalid translate_service setting. Only 'Azure' is supported.")
            # Non-zero status so callers and shells can tell the run failed
            sys.exit(1)

        print("Translating text using Azure...")
        result = azure_translate_text(textToTranslate, sourceLanguage, targetLanguage)
        # Add the translated texts to the dictionary (results are matched to entries by position)
        for i, key in enumerate(inputSubsDict):
            inputSubsDict[key]['translated_text'] = process_response_text(result[i]["text"], targetLanguage)
            # Print progress, overwrite the same line
            print(f' Translated: {key} of {len(inputSubsDict)}', end='\r')
    else:
        # Skips translating, such as for testing
        for entry in inputSubsDict.values():
            entry['translated_text'] = process_response_text(entry['text'], targetLanguage)

    # NOTE(review): source indentation was lost; this print is assumed to sit at function
    # level, ending the '\r' progress line before any further output - confirm
    print(" ")

    combinedProcessedDict = combine_subtitles_advanced(inputSubsDict, int(combine_subtitles_max_chars))

    if not skipTranslation:
        # Write new srt file with translated text
        with open(translatedSrtFileName, 'w', encoding='utf-8-sig') as f:
            for key, entry in combinedProcessedDict.items():
                f.write(str(key) + '\n')
                f.write(entry['srt_timestamps_line'] + '\n')
                f.write(entry['translated_text'] + '\n')
                f.write('\n')

    return combinedProcessedDict
##### Add additional info to the dictionary for each language #####
def set_translation_info(languageBatchDict):
    """Return a copy of the language batch settings annotated with translation info.

    The input is deep-copied and left unmodified. In the copy, every language
    entry gets 'translate_service' set to 'azure' and 'formality' set to None.

    Args:
        languageBatchDict: Dictionary mapping a language number to that
            language's settings dictionary.

    Returns:
        The annotated deep copy.

    Raises:
        SystemExit: With status 1 if the module-level translate_service is not 'azure'.
    """
    if translate_service != 'azure':
        print("Error: No valid translation service selected. Please choose a valid service or enable 'skip_translation' in config.")
        # Non-zero status so callers and shells can tell the run failed
        sys.exit(1)

    newBatchSettingsDict = copy.deepcopy(languageBatchDict)
    # If using Azure, set all languages to use Azure in dictionary
    for langSettings in newBatchSettingsDict.values():
        langSettings['translate_service'] = 'azure'
        langSettings['formality'] = None
    return newBatchSettingsDict
| #======================================== Combine Subtitle Lines ================================================ | |
def combine_subtitles_advanced(inputDict, maxCharacters=200):
    """Repeatedly merge neighbouring subtitle entries until no merge is possible.

    Every value of inputDict is tagged in place with 'originalIndex'
    (int(key) - 1) before merging starts. combine_single_pass is then run at
    least once and re-run until it reports that nothing more can be combined.

    Args:
        inputDict: Subtitle entries keyed by values convertible with int().
        maxCharacters: Upper bound on the combined translated text length,
            forwarded to combine_single_pass.

    Returns:
        A new dictionary of the remaining entries, keyed 1..n in list order.
    """
    charRateGoal = 20  # Target characters per second
    gapThreshold = 100  # The maximum gap between subtitles to combine

    # Flatten the dictionary values into a list, recording each entry's zero-based position
    entryList = list(inputDict.values())
    for key, entry in zip(inputDict, entryList):
        entry['originalIndex'] = int(key) - 1

    # Always run one pass, then keep going for as long as a pass still merged something
    while True:
        entryList, noMorePossibleCombines = combine_single_pass(entryList, charRateGoal, gapThreshold, maxCharacters)
        if noMorePossibleCombines:
            break

    # Convert the list back to a dictionary then return it
    return {number: entry for number, entry in enumerate(entryList, start=1)}
def _merge_entry_into_previous(entries, index):
    """Fold entries[index] into entries[index - 1] in place, then delete entries[index].

    The surviving entry keeps its own start values and takes over the absorbed
    entry's end values; both texts are joined with a single space.
    """
    kept = entries[index - 1]
    absorbed = entries[index]
    kept['text'] = kept['text'] + ' ' + absorbed['text']
    kept['translated_text'] = kept['translated_text'] + ' ' + absorbed['translated_text']
    kept['end_ms'] = absorbed['end_ms']
    kept['end_ms_buffered'] = absorbed['end_ms_buffered']
    kept['duration_ms'] = int(absorbed['end_ms']) - int(kept['start_ms'])
    kept['duration_ms_buffered'] = int(absorbed['end_ms_buffered']) - int(kept['start_ms_buffered'])
    kept['srt_timestamps_line'] = kept['srt_timestamps_line'].split(' --> ')[0] + ' --> ' + absorbed['srt_timestamps_line'].split(' --> ')[1]
    # The merged entry is now followed by whatever followed the absorbed entry, so its gap
    # must be the absorbed entry's gap; keeping the old value would let later passes merge
    # across a pause longer than the gap threshold
    kept['break_until_next'] = absorbed['break_until_next']
    del entries[index]


def _neighbor_is_combinable(entries, leftIndex, rateDiff, currentIsTooFast, gapThreshold, maxCharacters):
    """Tell whether entries[leftIndex] and entries[leftIndex + 1] may be merged.

    rateDiff is the current entry's char_rate minus the neighbour's char_rate.
    """
    # A too-fast entry needs a slower neighbour (positive diff), a too-slow entry a faster
    # one (negative diff); a neighbour with an identical rate (diff 0) never qualifies
    if currentIsTooFast:
        if not rateDiff or rateDiff < 0:
            return False
    elif not rateDiff or rateDiff > 0:
        return False
    # The pause between the two entries must be below the threshold
    if entries[leftIndex]['break_until_next'] >= gapThreshold:
        return False
    # The merged translated text must not be too long
    combinedLength = len(entries[leftIndex]['translated_text']) + len(entries[leftIndex + 1]['translated_text'])
    return combinedLength <= maxCharacters


def combine_single_pass(entryListLocal, charRateGoal, gapThreshold, maxCharacters):
    """Merge adjacent subtitle entries whose speaking rate is off target.

    Entries are visited in order of how far their char_rate is from
    charRateGoal (furthest first). An entry may be merged with its next or
    previous neighbour when that neighbour's rate lies on the side that pulls
    the entry toward the goal, the pause between the two is below gapThreshold,
    and the combined translated text is at most maxCharacters long. After each
    merge the rates are recalculated and the scan restarts; the function
    returns once a scan reaches its last candidate.

    Args:
        entryListLocal: List of subtitle entry dictionaries. 'originalIndex' is
            rewritten on these entries; merging happens on deep copies.
        charRateGoal: Target characters per second.
        gapThreshold: Entries whose 'break_until_next' is at or above this
            value are not merged with the following entry.
        maxCharacters: Maximum combined length of 'translated_text'.

    Returns:
        A tuple (entries, noMorePossibleCombines). The flag is True when this
        call performed no merge.
    """
    noMorePossibleCombines = True  # Will be set to False if a combination is made

    # With no entries the scan below never runs, so the end-of-list flag would never be
    # set and the while loop would spin forever
    if not entryListLocal:
        return entryListLocal, noMorePossibleCombines

    # Want to restart the loop if a change is made, so use this variable, otherwise break only if the end is reached
    reachedEndOfList = False

    # Use while loop because the list is being modified
    while not reachedEndOfList:
        # Re-number the entries by position (enumerate avoids the quadratic, equality-based list.index lookup)
        for position, entry in enumerate(entryListLocal):
            entry['originalIndex'] = position
        lastIndex = len(entryListLocal) - 1

        # Need to calculate the char_rate for each entry, any time something changes, so put it at the top of this loop
        entryListLocal = calc_list_speaking_rates(entryListLocal, charRateGoal)

        # Sort the list by the difference in speaking speed from charRateGoal
        priorityOrderedList = sorted(entryListLocal, key=itemgetter('char_rate_diff'), reverse=True)

        # Iterates through the list in order of priority, and uses that index to operate on entryListLocal
        # For loop is broken after a combination is made, so that the list can be re-sorted and re-iterated
        for progress, data in enumerate(priorityOrderedList):
            i = data['originalIndex']

            # Check if last entry, and therefore will end loop when done with this iteration
            if progress == len(priorityOrderedList) - 1:
                reachedEndOfList = True

            currentRate = data['char_rate']
            tooFast = currentRate > charRateGoal
            tooSlow = currentRate < charRateGoal
            # Entries exactly on target need no change
            if not (tooFast or tooSlow):
                continue

            # The first entry has no previous neighbour, the last entry has no next neighbour
            considerNext = i < lastIndex
            considerPrev = i > 0

            # If the diff is positive, then the neighbour's rate is lower than the current char_rate
            nextDiff = currentRate - entryListLocal[i + 1]['char_rate'] if considerNext else None
            prevDiff = currentRate - entryListLocal[i - 1]['char_rate'] if considerPrev else None

            if considerNext:
                considerNext = _neighbor_is_combinable(entryListLocal, i, nextDiff, tooFast, gapThreshold, maxCharacters)
            if considerPrev:
                try:
                    considerPrev = _neighbor_is_combinable(entryListLocal, i - 1, prevDiff, tooFast, gapThreshold, maxCharacters)
                except TypeError:
                    considerPrev = False

            # Continue to next loop if neither are considered
            if not considerNext and not considerPrev:
                continue

            # Should only reach this point if two entries are to be combined
            if considerNext and considerPrev:
                # NOTE(review): the original comments said to pick the neighbour with the lower
                # rate (too fast) / higher rate (too slow), but these comparisons pick the
                # neighbour whose rate is closer to the current entry's. Behaviour is kept
                # as it was - confirm which was intended.
                if tooFast:
                    useNext = nextDiff < prevDiff
                else:
                    useNext = nextDiff > prevDiff
            else:
                # If only one is to be considered, then combine with that one
                useNext = considerNext

            if useNext:
                _merge_entry_into_previous(entryListLocal, i + 1)
            else:
                _merge_entry_into_previous(entryListLocal, i)
            noMorePossibleCombines = False
            break

    return entryListLocal, noMorePossibleCombines
| #-- End of combine_single_pass -- | |
| #---------------------------------------------------------------------- | |
# Calculate the number of characters per second for each subtitle entry
def calc_dict_speaking_rates(inputDict, dictKey='translated_text'):
    """Return a deep copy of inputDict with a 'char_rate' added to every entry.

    'char_rate' is the length of entry[dictKey] divided by the entry's
    duration in seconds ('duration_ms' / 1000), rounded to two decimals.
    The input dictionary is not modified.

    Raises:
        ZeroDivisionError: If an entry's 'duration_ms' is 0.
    """
    ratedDict = copy.deepcopy(inputDict)
    for entry in ratedDict.values():
        durationSeconds = int(entry['duration_ms']) / 1000
        entry['char_rate'] = round(len(entry[dictKey]) / durationSeconds, 2)
    return ratedDict
def calc_list_speaking_rates(inputList, charRateGoal, dictKey='translated_text'):
    """Return a deep copy of inputList with speaking-rate fields on every entry.

    Adds two keys to each copied entry:
      'char_rate'      - length of entry[dictKey] per second of 'duration_ms',
                         rounded to two decimals
      'char_rate_diff' - absolute distance of 'char_rate' from charRateGoal,
                         rounded to two decimals
    The input list is not modified.

    Raises:
        ZeroDivisionError: If an entry's 'duration_ms' is 0.
    """
    ratedList = copy.deepcopy(inputList)
    for entry in ratedList:
        # Calculate the number of characters per second based on the duration of the entry
        durationSeconds = int(entry['duration_ms']) / 1000
        charRate = round(len(entry[dictKey]) / durationSeconds, 2)
        entry['char_rate'] = charRate
        # Absolute difference between the current char_rate and the goal char_rate
        entry['char_rate_diff'] = abs(round(charRate - charRateGoal, 2))
    return ratedList