Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import time | |
| import azure.cognitiveservices.speech as speechsdk | |
| import datetime | |
| import zipfile | |
| import io | |
| import copy | |
| import re | |
| from urllib.request import urlopen | |
| from pathlib import Path | |
| from . import azure_batch | |
| from . import utils | |
| from .utils import parseBool | |
# ---- Module configuration ----
# Azure credentials are read from the environment; each is None when the variable is unset.
AZURE_SPEECH_REGION = os.getenv('SPEECH_REGION')
AZURE_SPEECH_KEY = os.getenv('SPEECH_KEY')

# Pause lengths in milliseconds inserted into the SSML at sentence boundaries and commas.
# The synthesis functions also accept the string 'default' here, which omits the tag.
azure_comma_pause = 50
azure_sentence_pause = 80

# When True, intermediate files are kept/written for inspection.
debug_mode = False

# Which TTS backend synthesize_dictionary() dispatches to.
tts_service = 'azure'
# ======================================== Pronunciation Correction Functions ================================================
# Folder (two levels above this file) holding the SSML customization files.
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'

# Full paths of the individual customization files, as plain strings.
interpretAsOverrideFile = str(BASE_DIR / 'interpret-as.csv')
aliasOverrideFile = str(BASE_DIR / 'aliases.csv')
urlListFile = str(BASE_DIR / 'url_list.txt')
phonemeFile = str(BASE_DIR / 'Phoneme_Pronunciation.csv')

# Parsed contents, loaded once at import time and iterated by the add_*_tags functions below.
interpretAsEntries = utils.csv_to_dict(interpretAsOverrideFile)
aliasEntries = utils.csv_to_dict(aliasOverrideFile)
urlList = utils.txt_to_list(urlListFile)
phonemeEntries = utils.csv_to_dict(phonemeFile)
def add_all_pronunciation_overrides(text):
    """Apply every pronunciation override pass to *text* and return the result.

    The passes run in a fixed order: interpret-as tags, then aliases, then
    phoneme tags. Each pass receives the output of the previous one.
    """
    override_passes = (add_interpretas_tags, add_alias_tags, add_phoneme_tags)
    for apply_pass in override_passes:
        text = apply_pass(text)
    return text
def add_interpretas_tags(text):
    """Wrap configured words and URLs in SSML ``<say-as>`` tags.

    Two sources are applied, in this order:

    1. Every row of ``interpretAsEntries``: each whole-word occurrence of the
       row's 'Text' is wrapped in ``<say-as interpret-as="...">`` (with a
       ``format`` attribute when the row provides one).
    2. Every entry of ``urlList``: inside each occurrence of the URL, the
       dots, slashes, colons and the top-level-domain suffix are wrapped in
       ``<say-as interpret-as="characters">``.

    Args:
        text: The text to annotate.

    Returns:
        The text with the say-as tags inserted.
    """
    # NOTE(review): entries are applied one after another on the already-tagged
    # text, so a later entry whose text is e.g. "as" or "characters" can match
    # inside a tag inserted by an earlier entry.
    for entryDict in interpretAsEntries:
        entryText = entryDict['Text']
        if not entryText:
            # An empty pattern would match at every word boundary in the text.
            continue
        entryInterpretAsType = entryDict['interpret-as Type']
        entryFormat = entryDict['Format (Optional)']

        # A blank cell means case-insensitive, same as the alias and phoneme passes.
        caseSensitiveCell = entryDict['Case Sensitive (True/False)']
        isCaseSensitive = False if caseSensitiveCell == "" else parseBool(caseSensitiveCell)

        # Create say-as opening tag
        if entryFormat == "":
            sayAsTagStart = f'<say-as interpret-as="{entryInterpretAsType}">'
        else:
            sayAsTagStart = f'<say-as interpret-as="{entryInterpretAsType}" format="{entryFormat}">'

        # Find the word, with optional punctuation after, and optional quotes before or after.
        # The entry text is escaped so characters such as '.' or '+' are matched literally.
        escapedText = re.escape(entryText)
        findWordRegex = rf'(\b["\']?{escapedText}[.,!?]?["\']?\b)'
        regexFlags = 0 if isCaseSensitive else re.IGNORECASE

        # A function replacement keeps backslashes in the tag from being read as regex escapes.
        text = re.sub(
            findWordRegex,
            lambda match, tagStart=sayAsTagStart: f'{tagStart}{match.group(1)}</say-as>',
            text,
            flags=regexFlags,
        )

    # Matches the top level domain extension (with the character after it), and any
    # run of periods, slashes or colons. Compiled once because it is the same for every URL.
    punctuationRegex = re.compile(r'((?:\.[a-z]{2,6}(?:\/|$|\s))|(?:[\.\/:]+))')
    for url in urlList:
        taggedURL = punctuationRegex.sub(r'<say-as interpret-as="characters">\1</say-as>', url)
        # Replace any instances of the URL with the tagged version
        text = text.replace(url, taggedURL)

    return text
def add_alias_tags(text):
    """Replace configured words in *text* with their aliases.

    For every row of ``aliasEntries``, each whole-word occurrence of the row's
    'Original Text' (including any adjacent quote/bracket/punctuation the
    pattern captures) is replaced by the row's 'Alias'. Matching is
    case-insensitive unless the row's 'Case Sensitive (True/False)' cell
    parses as true; a blank cell means case-insensitive.

    Args:
        text: The text to process.

    Returns:
        The text with the aliases substituted.
    """
    for entryDict in aliasEntries:
        entryText = entryDict['Original Text']
        if not entryText:
            # An empty pattern would match at every word boundary in the text.
            continue
        entryAlias = entryDict['Alias']

        caseSensitiveCell = entryDict['Case Sensitive (True/False)']
        isCaseSensitive = False if caseSensitiveCell == "" else parseBool(caseSensitiveCell)

        # Find the word, with optional punctuation after, and optional quotes before or after.
        # The entry text is escaped so regex metacharacters in it are matched literally.
        escapedText = re.escape(entryText)
        findWordRegex = rf'\b["\'()]?{escapedText}[.,!?()]?["\']?\b'
        regexFlags = 0 if isCaseSensitive else re.IGNORECASE

        # A function replacement inserts the alias verbatim; a string replacement would
        # treat backslashes in the alias as escapes or group references.
        text = re.sub(findWordRegex, lambda _match, alias=entryAlias: alias, text, flags=regexFlags)

    return text
| # Uses the phoneme pronunciation file to add phoneme tags to the text | |
def add_phoneme_tags(text):
    """Wrap configured words in SSML ``<phoneme>`` tags.

    For every row of ``phonemeEntries``, each whole-word occurrence of the
    row's 'Text' is wrapped in
    ``<phoneme alphabet="..." ph="...">word</phoneme>`` using the row's
    'Phonetic Alphabet' and 'Phonetic Pronunciation'. Matching is
    case-insensitive unless the row's 'Case Sensitive (True/False)' cell
    parses as true; a blank cell means case-insensitive.

    Args:
        text: The text to annotate.

    Returns:
        The text with the phoneme tags inserted.
    """
    for entryDict in phonemeEntries:
        entryText = entryDict['Text']
        if not entryText:
            # An empty pattern would match at every word boundary in the text.
            continue
        entryPhoneme = entryDict['Phonetic Pronunciation']
        # The configured alphabet is used for both case-sensitive and case-insensitive
        # rows; a blank cell falls back to 'ipa'.
        entryAlphabet = entryDict['Phonetic Alphabet'] or 'ipa'

        caseSensitiveCell = entryDict['Case Sensitive (True/False)']
        isCaseSensitive = False if caseSensitiveCell == "" else parseBool(caseSensitiveCell)

        # Find the word, with optional punctuation after, and optional quotes before or after.
        # The entry text is escaped so regex metacharacters in it are matched literally.
        escapedText = re.escape(entryText)
        findWordRegex = rf'(\b["\'()]?{escapedText}[.,!?()]?["\']?\b)'
        regexFlags = 0 if isCaseSensitive else re.IGNORECASE

        phonemeTagStart = f'<phoneme alphabet="{entryAlphabet}" ph="{entryPhoneme}">'
        # A function replacement is required: phonetic strings may contain backslashes,
        # which a string replacement template would reject or misinterpret.
        text = re.sub(
            findWordRegex,
            lambda match, tagStart=phonemeTagStart: f'{tagStart}{match.group(1)}</phoneme>',
            text,
            flags=regexFlags,
        )

    return text
| # ================================================== Azure Functions ========================================================= | |
def synthesize_text_azure(text, duration, voiceName, languageCode):
    """Synthesize one piece of text with Azure TTS and return its audio stream.

    The text is first run through the pronunciation overrides, then wrapped in
    SSML that requests the given clip duration, the configured sentence/comma
    pauses, and zero leading/trailing silence.

    Args:
        text: The text to speak.
        duration: Desired clip duration; written into the SSML as ``<duration>ms``.
        voiceName: Azure voice name used for the ``<voice>`` element.
        languageCode: Language code used for ``xml:lang``.

    Returns:
        A ``speechsdk.AudioDataStream`` built from the synthesis result. If the
        synthesis was canceled, the reason is printed and the stream is still
        returned so the caller's flow is unchanged.
    """
    # Create tag for desired duration of clip
    durationTag = f'<mstts:audioduration value="{str(duration)}ms"/>'

    # Pause tags are omitted entirely when the setting is 'default'
    sentencePauseTag = ''
    if azure_sentence_pause != 'default':
        sentencePauseTag = f'<mstts:silence type="Sentenceboundary-exact" value="{str(azure_sentence_pause)}ms"/>'

    commaPauseTag = ''
    if azure_comma_pause != 'default':
        commaPauseTag = f'<mstts:silence type="Comma-exact" value="{str(azure_comma_pause)}ms"/>'

    # Tags to set leading and trailing silence times to zero
    leadSilenceTag = '<mstts:silence type="Leading-exact" value="0ms"/>'
    tailSilenceTag = '<mstts:silence type="Tailing-exact" value="0ms"/>'

    # Process text using pronunciation customization set by user
    text = add_all_pronunciation_overrides(text)

    # Create SSML syntax for Azure TTS
    ssml = f"<speak version='1.0' xml:lang='{languageCode}' xmlns='http://www.w3.org/2001/10/synthesis' " \
        "xmlns:mstts='http://www.w3.org/2001/mstts'>" \
        f"<voice name='{voiceName}'>{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
        f"{text}</voice></speak>"

    speech_config = speechsdk.SpeechConfig(subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION)
    # For Azure voices, see: https://learn.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support?tabs=stt-tts
    speech_config.speech_synthesis_voice_name = voiceName
    # For audio outputs, see: https://learn.microsoft.com/en-us/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechsynthesisoutputformat?view=azure-python
    speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Audio48Khz192KBitRateMonoMp3)

    # audio_config=None keeps the audio in memory instead of playing it on a device
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
    result = synthesizer.speak_ssml_async(ssml).get()

    # Surface failures (bad key/region, invalid SSML, ...) instead of silently
    # handing back a stream with no audio in it.
    if result.reason == speechsdk.ResultReason.Canceled:
        cancellationDetails = result.cancellation_details
        print(f'ERROR: Azure speech synthesis was canceled: {cancellationDetails.reason}')
        if cancellationDetails.error_details:
            print(f'Error details: {cancellationDetails.error_details}')

    return speechsdk.AudioDataStream(result)
def format_percentage_change(speedFactor):
    """Convert a speed factor into an Azure TTS rate string.

    Args:
        speedFactor: Multiplier relative to normal speed (1.0 means unchanged).

    Returns:
        'default' when the factor is exactly 1.0; otherwise the relative change
        as a percentage string such as '+50.0%' or '-20.0%', rounded to five
        decimal places.
    """
    if speedFactor == 1.0:
        return 'default'

    # Relative change in percent, e.g. 1.5 -> 50.0 and 0.8 -> -20.0
    relativePercent = round((speedFactor - 1.0) * 100, 5)

    # Increases need an explicit plus sign; decreases already carry their minus sign
    signPrefix = '+' if speedFactor >= 1.0 else ''
    return signPrefix + str(relativePercent) + '%'
def synthesize_text_azure_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Synthesize every subtitle in *subsDict* through Azure batch synthesis.

    The subtitles are converted to SSML and packed into as few request payloads
    as the size limits allow. Each payload is submitted, polled every five
    seconds until it succeeds or fails, and its result zip is downloaded. The
    audio files of each zip are extracted into 'workingFolder' as
    '<key>.mp3' and the path is stored under 'TTS_FilePath' in the matching
    *subsDict* entry.

    Args:
        subsDict: Mapping of subtitle key to a dict holding at least
            'translated_text' and 'duration_ms_buffered'. Updated in place.
        langDict: Dict holding 'languageCode' and 'voiceName'.
        skipSynthesize: Accepted for signature compatibility; not used here.
        secondPass: Only affects the name of the debug copy of the zip file.

    Returns:
        *subsDict*, with 'TTS_FilePath' set on every entry whose audio was
        downloaded.

    Raises:
        ValueError: If a single subtitle is too large to fit in a payload.
    """

    def create_request_payload(remainingEntriesDict):
        """Build one payload from as many leading entries as fit.

        Entries that were added to the payload are removed from
        *remainingEntriesDict*. Returns (payload, remainingEntriesDict).
        """
        language = langDict['languageCode']
        voice = langDict['voiceName']

        # These tags are identical for every subtitle, so build them once.
        # Pause tags are omitted entirely when the setting is 'default'.
        sentencePauseTag = ''
        if azure_sentence_pause != 'default':
            sentencePauseTag = f'<mstts:silence type="Sentenceboundary-exact" value="{str(azure_sentence_pause)}ms"/>'
        commaPauseTag = ''
        if azure_comma_pause != 'default':
            commaPauseTag = f'<mstts:silence type="Comma-exact" value="{str(azure_comma_pause)}ms"/>'
        # Tags to set leading and trailing silence times to zero
        leadSilenceTag = '<mstts:silence type="Leading-exact" value="0ms"/>'
        tailSilenceTag = '<mstts:silence type="Tailing-exact" value="0ms"/>'

        ssmlJson = []
        payload = None  # Last payload known to be within the limits

        # Iterate over a snapshot because entries are popped from the dict inside the loop
        for key, entry in list(remainingEntriesDict.items()):
            text = entry['translated_text']
            duration = entry['duration_ms_buffered']

            # Create tag for desired duration of clip
            durationTag = f'<mstts:audioduration value="{str(duration)}ms"/>'

            # Process text using pronunciation customization set by user
            text = add_all_pronunciation_overrides(text)

            # Create the SSML for each subtitle
            ssml = f"<speak version='1.0' xml:lang='{language}' xmlns='http://www.w3.org/2001/10/synthesis' " \
                "xmlns:mstts='http://www.w3.org/2001/mstts'>" \
                f"<voice name='{voice}'>{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
                f"{text}</voice></speak>"
            ssmlJson.append({"text": ssml})

            # Reconstruct the payload on every loop with the new SSML so that the payload size is accurate
            now = datetime.datetime.now()
            pendingPayload = {
                'displayName': langDict['languageCode'] + '-' + now.strftime("%Y-%m-%d %H:%M:%S"),
                'description': 'Batch synthesis of ' + langDict['languageCode'] + ' subtitles',
                "textType": "SSML",
                # To use custom voice, see original example code script linked from azure_batch.py
                "inputs": ssmlJson,
                "properties": {
                    "outputFormat": "audio-48khz-192kbitrate-mono-mp3",
                    "wordBoundaryEnabled": False,
                    "sentenceBoundaryEnabled": False,
                    "concatenateResult": False,
                    "decompressOutputFiles": False
                },
            }

            # Azure TTS batch requests must be under 500 kilobytes. Not sure if they actually mean
            # kibibytes, so assume the worst case. The payload is sent as JSON, so measure the
            # serialized form.
            payloadSizeInBytes = len(json.dumps(pendingPayload).encode('utf-8'))

            # Leave some room for anything unexpected. Also number of inputs must be below 1000
            if payloadSizeInBytes > 495000 or len(ssmlJson) > 995:
                if payload is None:
                    # Not even the first entry fits, so no amount of splitting can help.
                    raise ValueError(
                        f'Subtitle entry {key!r} is too large for an Azure batch synthesis request '
                        f'({payloadSizeInBytes} bytes).'
                    )
                # Payload would be too large: drop the last entry and return what fit before it
                return payload, remainingEntriesDict

            # Must make a deep copy, otherwise later appends to ssmlJson would also change this payload
            payload = copy.deepcopy(pendingPayload)
            # Remove entry from remainingEntriesDict now that it is part of the payload
            remainingEntriesDict.pop(key)

        # All the remaining entries fit
        return payload, remainingEntriesDict
    # ------------------------- End create_request_payload() -----------------------------------

    # Create payloads, split into multiple if necessary. Each payload is stored together with
    # the subtitle keys it contains so downloaded files can be matched to the right entries
    # even when another payload's job fails.
    payloadList = []
    remainingPayloadEntriesDict = dict(subsDict)  # Entries are removed as they are added to payloads
    while len(remainingPayloadEntriesDict) > 0:
        keysBefore = list(remainingPayloadEntriesDict)
        payloadToAppend, remainingPayloadEntriesDict = create_request_payload(remainingPayloadEntriesDict)
        payloadKeys = [key for key in keysBefore if key not in remainingPayloadEntriesDict]
        payloadList.append((payloadToAppend, payloadKeys))

    # Tell user if request will be broken up into multiple payloads
    if len(payloadList) > 1:
        print(f'Payload will be broken up into {len(payloadList)} requests (due to Azure size limitations).')

    # Clear out workingFolder (kept as-is in debug mode)
    if not debug_mode:
        for filename in os.listdir('workingFolder'):
            os.remove(os.path.join('workingFolder', filename))

    # Loop through payloads and submit to Azure
    for payload, payloadKeys in payloadList:
        # Send request to Azure
        job_id = azure_batch.submit_synthesis(payload)
        if job_id is None:
            continue

        # Wait for job to finish
        resultDownloadLink = None
        while True:  # Must use break to exit loop
            response = azure_batch.get_synthesis(job_id)
            jobInfo = response.json()
            status = jobInfo['status']
            if status == 'Succeeded':
                print('Batch synthesis job succeeded')
                resultDownloadLink = jobInfo['outputs']['result']
                break
            elif status == 'Failed':
                print('ERROR: Batch synthesis job failed!')
                print("Reason:" + response.reason)
                break
            else:
                print(f'Waiting for Azure batch synthesis job to finish. Status: [{status}]')
                time.sleep(5)

        if resultDownloadLink is None:
            continue

        # Download the resulting zip file once and keep it in memory
        with urlopen(resultDownloadLink) as urlResponse:
            zipBytes = urlResponse.read()

        # If debug mode, also save the zip file to disk
        if debug_mode:
            zipName = 'azureBatchPass2.zip' if secondPass else 'azureBatch.zip'
            with open(os.path.join('workingFolder', zipName), 'wb') as f:
                f.write(zipBytes)

        # Process zip file
        with zipfile.ZipFile(io.BytesIO(zipBytes)) as zipdata:
            # Skip summary.json and any other json files; order the audio files by name so
            # they line up with the order of the payload's inputs
            audioInfos = sorted(
                (info for info in zipdata.infolist() if "json" not in info.filename),
                key=lambda info: info.filename,
            )

            if len(audioInfos) != len(payloadKeys):
                print(f'WARNING: Expected {len(payloadKeys)} audio files from Azure but received {len(audioInfos)}.')

            # Rename each file to its subtitle key while extracting
            for info, currentFileNum in zip(audioInfos, payloadKeys):
                info.filename = str(currentFileNum) + '.mp3'
                # Add file path to subsDict
                subsDict[currentFileNum]['TTS_FilePath'] = os.path.join('workingFolder', str(currentFileNum)) + '.mp3'
                # Extract file
                zipdata.extract(info, 'workingFolder')

    return subsDict
def synthesize_dictionary_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Batch-synthesize all subtitles unless synthesis is skipped.

    Args:
        subsDict: Subtitle dictionary to synthesize.
        langDict: Language settings passed through to the batch synthesizer.
        skipSynthesize: When true, *subsDict* is returned untouched.
        secondPass: Passed through to the batch synthesizer.

    Returns:
        The subtitle dictionary, as returned by the batch synthesizer when
        synthesis runs, or the unchanged input otherwise.
    """
    if skipSynthesize:
        return subsDict
    return synthesize_text_azure_batch(subsDict, langDict, skipSynthesize, secondPass)
def synthesize_dictionary(subsDict, langDict, outputFolder, skipSynthesize=False, secondPass=False):
    """Synthesize each subtitle one at a time and record its audio file path.

    For every entry, the target path '<outputFolder>/workingFolder/<key>.mp3'
    is stored under 'TTS_FilePath'. Unless *skipSynthesize* is set, the entry's
    'translated_text' is synthesized (Azure only) and saved to that path first.

    Args:
        subsDict: Mapping of subtitle key to a dict holding at least
            'translated_text' and 'duration_ms_buffered'. Updated in place.
        langDict: Dict holding 'voiceName' and 'languageCode'.
        outputFolder: Folder under which 'workingFolder' is created.
        skipSynthesize: When true, only the file paths are recorded.
        secondPass: Changes the progress text and the debug file suffix.

    Returns:
        *subsDict* with 'TTS_FilePath' set on every entry.
    """
    workingFolder = os.path.join(outputFolder, 'workingFolder')
    totalLines = len(subsDict)
    progressLabel = "Synthesizing TTS Line (2nd Pass)" if secondPass else "Synthesizing TTS Line"

    for lineNumber, (key, value) in enumerate(subsDict.items(), start=1):
        # TTS each subtitle text, write to file, write filename into dictionary
        filePath = os.path.join(workingFolder, f'{str(key)}.mp3')
        filePathStem = os.path.join(workingFolder, f'{str(key)}')

        if not skipSynthesize:
            # The clip length is controlled through the duration passed to Azure
            duration = value['duration_ms_buffered']

            # Prepare output location. If folder doesn't exist, create it
            try:
                os.makedirs(workingFolder, exist_ok=True)
            except OSError:
                print("Error creating directory")

            # If Azure TTS, use Azure API
            if tts_service == "azure":
                # Audio variable is an AudioDataStream object
                audio = synthesize_text_azure(value['translated_text'], duration, langDict['voiceName'], langDict['languageCode'])
                # Save to file using save_to_wav_file method of audio object
                audio.save_to_wav_file(filePath)

                # If debug mode, also write a per-pass copy of the Azure TTS output
                if debug_mode:
                    passSuffix = "_p2.mp3" if secondPass else "_p1.mp3"
                    audio.save_to_wav_file(filePathStem + passSuffix)

        subsDict[key]['TTS_FilePath'] = filePath

        # Print progress and overwrite line next time
        print(f" {progressLabel}: {lineNumber} of {totalLines}", end="\r")

    # Overwrite the progress line with blanks so it does not linger on screen
    print(" " * 80)
    return subsDict