Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, jsonify, send_from_directory , send_file | |
| from flask_cors import CORS | |
| import time | |
| import uuid | |
| import threading | |
| from collections import deque | |
| import subprocess | |
| import os | |
# Flask application object; CORS support is attached to it right away.
app = Flask(__name__)
CORS(app)

# Name of the espeak-ng executable (replace with a full path if needed).
espeak_path = "espeak-ng"

# Directory that contains this script, and the "static" folder inside it.
script_path = os.path.dirname(os.path.abspath(__file__))
static_folder = os.path.join(script_path, "static")

# Generated files are written into the static folder, so make sure it exists.
os.makedirs(static_folder, exist_ok=True)
class Component:
    """A form widget that renders itself as an HTML fragment.

    Supported ``component_type`` values are ``"textbox"``, ``"radio"``,
    ``"audio"``, ``"output"`` and ``"better_textbox"``; any other type
    renders as an empty string.

    Args:
        component_type: Which widget to render (see above).
        label: Used as the element's ``name``/``id`` and as visible text.
        options: Choices for a ``"radio"`` component; ignored otherwise.
        value: Initial value (or the pre-selected radio option).
    """

    def __init__(self, component_type, label, options=None, value=None):
        self.component_type = component_type
        self.label = label
        self.options = options
        self.value = value

    def render(self):
        """Return the HTML for this component.

        Label, value and options are HTML-escaped, and a ``None`` value is
        rendered as empty text rather than the literal string ``"None"``.
        """
        from html import escape

        label = escape(str(self.label), quote=True)
        value = "" if self.value is None else escape(str(self.value), quote=True)
        kind = self.component_type

        if kind == "textbox":
            return f'<textarea name="{label}" class="form-control" placeholder="{label}">{value}</textarea>'
        if kind == "radio":
            return self._render_radio(label)
        if kind == "audio":
            return f'<audio controls id="{label}"><source src="" type="audio/wav">Your browser does not support the audio element.</audio>'
        if kind == "output":
            return f'<div id="{label}">Placeholder for {label}</div>'
        if kind == "better_textbox":
            return f'<div contenteditable="true" id="{label}" class="better-textbox" placeholder="{value}">{value}</div>'
        return ""

    def _render_radio(self, label):
        """Render one inline radio button per option; ``label`` is pre-escaped."""
        from html import escape

        pieces = []
        # ``options`` defaults to None, which is treated as "no choices".
        for index, option in enumerate(self.options or ()):
            option_text = escape(str(option), quote=True)
            # The comparison uses the raw option so pre-selection is unaffected
            # by escaping.
            checked = ' checked' if self.value == option else ''
            pieces.append(
                '<div class="form-check form-check-inline">'
                f'<input class="form-check-input" type="radio" name="{label}" '
                f'id="{label}-{index}" value="{option_text}"{checked}>'
                f'<label class="form-check-label" for="{label}-{index}">{option_text}</label>'
                '</div>'
            )
        return ''.join(pieces)
class Row:
    """Container that wraps the HTML of its children in a ``row mb-3`` div."""

    def __init__(self, components):
        # Children only need to provide a ``render()`` method returning a string.
        self.components = components

    def render(self):
        """Return the children's HTML, in order, inside the row wrapper."""
        inner = ''.join(child.render() for child in self.components)
        return '<div class="row mb-3">' + inner + '</div>'
class Column:
    """Container that wraps the HTML of its children in a ``col`` div."""

    def __init__(self, components):
        # Children only need to provide a ``render()`` method returning a string.
        self.components = components

    def render(self):
        """Return the children's HTML, in order, inside the column wrapper."""
        inner = ''.join(child.render() for child in self.components)
        return '<div class="col">' + inner + '</div>'
class GradioInterface:
    """Run ``fn`` on submitted inputs in background threads and keep the results.

    Each accepted submission gets a UUID task id and its own thread. At most
    ``max_parallel_tasks`` tasks may be in flight at once; further submissions
    are rejected. Finished results are kept in ``self.results`` (they are never
    removed by this class).

    Args:
        fn: Callable invoked as ``fn(input_data)``; its return value is the result.
        input_components: Objects with a ``render()`` method, used for the form.
        output_components: Stored on the instance; not rendered by this class.
        max_parallel_tasks: Upper bound on concurrently running tasks.
    """

    def __init__(self, fn, input_components, output_components, max_parallel_tasks=5):
        self.fn = fn
        self.input_components = input_components
        self.output_components = output_components
        self.max_parallel_tasks = max_parallel_tasks
        self.task_queue = deque()
        self.results = {}
        self.current_tasks = {}
        self.task_threads = {}
        self.stop_flags = {}
        # Guards the bookkeeping containers above, which are touched both by
        # callers and by the worker threads.
        self._lock = threading.Lock()

    def submit(self, input_data):
        """Start a task for ``input_data``.

        Returns:
            The new task id, or ``None`` when ``max_parallel_tasks`` tasks are
            already running.
        """
        task_id = str(uuid.uuid4())
        stop_flag = threading.Event()
        worker = threading.Thread(
            target=self.process_task, args=(task_id, input_data, stop_flag)
        )
        # Capacity check and registration happen atomically so concurrent
        # submissions cannot exceed the limit.
        with self._lock:
            if len(self.current_tasks) >= self.max_parallel_tasks:
                return None
            self.task_queue.append(task_id)
            self.current_tasks[task_id] = input_data
            self.stop_flags[task_id] = stop_flag
            self.task_threads[task_id] = worker
        worker.start()
        return task_id

    def process_task(self, task_id, input_data, stop_flag):
        """Run ``fn`` for one task, store its result and release its slot.

        If ``fn`` raises, the task is still marked as finished: the stored
        result is a dict with ``"error"`` and ``"status"`` keys describing the
        failure, so pollers do not wait forever and the slot is freed.
        ``stop_flag`` is accepted for the thread signature but ``fn`` is not
        passed it, so a running task is not interrupted by it.
        """
        try:
            outcome = self.fn(input_data)
        except Exception as exc:  # boundary of the worker thread
            print(f"Task {task_id} failed: {exc!r}")
            outcome = {"error": str(exc), "status": f"Error: {exc}"}

        with self._lock:
            self.results[task_id] = outcome
            try:
                self.task_queue.remove(task_id)
            except ValueError:
                pass
            self.current_tasks.pop(task_id, None)
            self.task_threads.pop(task_id, None)
            self.stop_flags.pop(task_id, None)

    def get_result(self, task_id):
        """Return ``{"status": "completed", "result": ...}`` or ``{"status": "pending"}``.

        An unknown task id is reported as pending.
        """
        with self._lock:
            if task_id in self.results:
                return {"status": "completed", "result": self.results[task_id]}
        return {"status": "pending"}

    def render_components(self):
        """Return the concatenated HTML of the input components only."""
        return ''.join([component.render() for component in self.input_components])

    def get_queue_position(self):
        """Return the number of tasks currently registered as in flight."""
        with self._lock:
            return len(self.task_queue)

    def stop_task(self, task_id):
        """Set the stop flag of a running task.

        Returns:
            ``True`` if the task was found (its flag is now set), else ``False``.
        """
        with self._lock:
            stop_flag = self.stop_flags.get(task_id)
        if stop_flag is None:
            return False
        stop_flag.set()
        return True
| import os | |
| import shutil | |
| import subprocess | |
def find_espeak_ng_data_folder():
    """Return the data directory that espeak-ng reports, or ``None`` on failure.

    Runs ``espeak_path --version`` and takes the text following
    ``'Data at: '`` in its output. Any error (executable missing, marker not
    present, ...) is printed and turned into a ``None`` return.
    """
    marker = 'Data at: '
    try:
        # https://github.com/espeak-ng/espeak-ng/blob/b006f6d4f997fbfe4016cf29767743e6b397d0fb/src/espeak-ng.c#L309
        version_output = subprocess.check_output([espeak_path, '--version'])
        pieces = version_output.decode().split(marker)
        return pieces[1].strip()
    except Exception as e:
        print(f"Error: {str(e)}")
        return None
def copy_lang_dict(data_folder, destination,lang):
    """Copy the file ``<lang>_dict`` from ``data_folder`` into ``destination``.

    Success or failure is reported with ``print``; errors raised while copying
    are caught and not propagated.
    """
    dict_name = lang + '_dict'
    source_file = os.path.join(data_folder, dict_name)
    try:
        # Copy the dictionary file under the same name into the destination.
        shutil.copyfile(source_file, os.path.join(destination, dict_name))
    except Exception as e:
        print(f"Error copying {dict_name}: {str(e)}")
    else:
        print(f"Successfully copied {dict_name} to {destination}")
| import requests | |
def download_files(urls, folder, timeout=30):
    """Download each URL into ``folder``, naming files after the last URL segment.

    The folder is created if missing. A failed download (network error,
    timeout, or a non-200 status) is reported with ``print`` and skipped, so
    one bad URL does not abort the remaining downloads or the caller.

    Args:
        urls: Iterable of URL strings.
        folder: Destination directory.
        timeout: Seconds to wait for each request before giving up.
    """
    os.makedirs(folder, exist_ok=True)  # Create folder if it doesn't exist
    for url in urls:
        filename = os.path.join(folder, url.split('/')[-1])  # Get file name from URL
        try:
            # Without a timeout a stalled connection would block forever.
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            print(f'Failed to download: {url} ❌ ({exc})')
            continue
        if response.status_code == 200:
            with open(filename, 'wb') as f:
                f.write(response.content)
            print(f'Downloaded: {filename} ✅')
        else:
            print(f'Failed to download: {url} ❌')
# The fa_rules, fa_list and fa_extra dictionary sources from the espeak-ng
# master branch, fetched into <static>/dictsource when this module is loaded.
urls = [
    'https://github.com/espeak-ng/espeak-ng/raw/refs/heads/master/dictsource/' + source_name
    for source_name in ('fa_rules', 'fa_list', 'fa_extra')
]
download_files(urls, os.path.join(static_folder, "dictsource"))
def compile_espeak():
    """Run ``espeak-ng --compile=fa`` inside ``<static>/dictsource``.

    The working directory is created if needed. The command's exit status is
    not inspected; "Compilation done!" is printed once the command returns.
    """
    dictsource_dir = os.path.join(static_folder, "dictsource")
    os.makedirs(dictsource_dir, exist_ok=True)
    subprocess.run([espeak_path, "--compile=fa"], cwd=dictsource_dir)
    print("Compilation done!")
| import gzip | |
# Paths of two gzip-compressed text files kept in the static folder.
phonetc_data_file_path, generations_file_path = (
    os.path.join(static_folder, log_name)
    for log_name in ('fa_extra.txt.gz', 'generations.txt.gz')
)
| import time | |
| import datetime | |
def get_iran_time_no_packages():
    """Return the current time at UTC+03:30 as ``"YYYY-MM-DD HH:MM:SS"``.

    Uses only the standard library. The offset is applied directly to UTC via
    a fixed-offset timezone, so the result does not depend on the host's local
    timezone or daylight-saving state.
    """
    iran_offset = datetime.timedelta(hours=3, minutes=30)
    iran_now = datetime.datetime.now(datetime.timezone(iran_offset))
    return iran_now.strftime("%Y-%m-%d %H:%M:%S")
# Append a "//"-prefixed line recording the start time whenever this module
# is loaded.
with gzip.open(phonetc_data_file_path, mode='at', encoding='utf-8') as log_file:  # 'at' = append, text mode
    log_file.write(f'// started at : {get_iran_time_no_packages()}\n')
| import soundfile as sf | |
| from app_utils import tts_interface , models | |
def tts(input_data):
    """Synthesize ``input_data['text']`` with the voice ``input_data['voice']``.

    Steps: append the request to the generations log, obtain the espeak-ng
    phoneme transcription of the text (Persian voice), synthesize audio through
    ``tts_interface`` and write it as a 16-bit PCM WAV file into the static
    folder.

    Returns:
        dict with ``"audio"`` (URL path of the WAV file under ``/static/``) and
        ``"status"`` (model URL plus phoneme transcription).
    """
    import tempfile

    text = input_data.get('text')
    voice = input_data.get('voice')
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    with gzip.open(generations_file_path, 'at', encoding='utf-8') as f:
        f.write(f'{voice}\t{text.strip()}\n')

    # Each call gets its own temporary input file: tasks run in parallel
    # threads, and a shared file name would let one request overwrite the text
    # of another before espeak-ng has read it.
    fd, input_file_path = tempfile.mkstemp(prefix='input_text_', suffix='.txt')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as file:
            file.write(text)

        # Phonemize the text
        phonemes_command = [espeak_path, "-v", 'fa', "-x", "-q", "-f", input_file_path]
        phonemes_result = subprocess.run(phonemes_command, capture_output=True, text=True)
    finally:
        os.remove(input_file_path)
    phonemized_text = phonemes_result.stdout.strip()

    # Generate audio file
    (sample_rate, audio_data), _final_status = tts_interface(voice, text, '')
    sf.write(
        audio_file,
        audio_data,
        samplerate=sample_rate,
        subtype="PCM_16",
    )

    # Entry [2] of a model record is compared against the voice and entry [3]
    # is shown as the model URL; stays empty when no record matches.
    model_url = ''
    for model in models:
        if voice == model[2]:
            model_url = model[3]

    status = f'مدل : {model_url}\nآوانگاشت: {phonemized_text}'
    return {"audio": f"/static/{os.path.basename(audio_file)}",  # Return the static path
            "status": status
            }
def phonemize(input_data):
    """Transcribe and speak a word with espeak-ng's Persian voice.

    Reads ``word``, ``task`` and optional ``phonetic`` from ``input_data``.
    When a phonetic string is supplied and the task is not ``'phonemize'``,
    espeak-ng is fed ``[[phonetic]]`` instead of the word itself. When the task
    is ``'send'`` and a phonetic string was supplied, the ``word<TAB>phonetic``
    pair is appended to the phonetic data log.

    Returns:
        dict with ``"audio"`` (URL path of the WAV under ``/static/``),
        ``"text"``, ``"phonemes"`` and ``"status"``.
    """
    import tempfile

    text = input_data.get('word')
    voice = 'fa'
    task = input_data.get('task')
    raw_phonetic = input_data.get('phonetic', "")
    phonetic = raw_phonetic.strip()
    audio_file = os.path.join(static_folder, f'output_{uuid.uuid4()}.wav')  # Path for the output audio file

    if phonetic == '' or task == 'phonemize':
        espeak_input = text
    else:
        espeak_input = f'[[{phonetic}]]'

    # Each call gets its own temporary input file: tasks run in parallel
    # threads, and a shared file name would let requests overwrite each other.
    fd, input_file_path = tempfile.mkstemp(prefix='input_text_', suffix='.txt')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as file:
            file.write(espeak_input)

        # Phonemize the text
        phonemes_command = [espeak_path, "-v", voice, "-x", "-q", "-f", input_file_path]
        phonemes_result = subprocess.run(phonemes_command, capture_output=True, text=True)

        # Generate audio file (argument list, so no shell quoting is involved)
        audio_command = [espeak_path, "-v", voice, "-w", audio_file, "-f", input_file_path]
        subprocess.run(audio_command)
    finally:
        os.remove(input_file_path)
    phonemized_text = phonemes_result.stdout.strip()

    status = f'متن : {text}\nآوانگاشت: {phonemized_text}'

    if task == 'send' and raw_phonetic:
        with gzip.open(phonetc_data_file_path, 'at', encoding='utf-8') as f:  # 'at' for text mode
            f.write(f'{text.strip()}\t{phonetic}\n')

    return {"audio": f"/static/{os.path.basename(audio_file)}",
            "text": text,
            "phonemes": phonemized_text,
            "status": status
            }
def listWords(input_data):
    """Return the word/phonetic pairs stored in the phonetic data file.

    Lines starting with ``//`` are skipped, as is any line that does not split
    into exactly two tab-separated fields. ``input_data`` is not used.

    Returns:
        list of ``{'word': ..., 'phonetic': ...}`` dicts in file order.
    """
    entries = []
    with gzip.open(phonetc_data_file_path, 'rt', encoding='utf-8') as log_file:  # 'rt' for text mode
        for raw_line in log_file:
            if raw_line.startswith('//'):  # Skip commented lines
                continue
            fields = raw_line.strip().split('\t')
            if len(fields) != 2:
                continue
            word, phonetic = fields
            entries.append({'word': word.strip(), 'phonetic': phonetic.strip()})
    return entries
def example_function(input_data):
    """Return ``input_data`` unchanged (the very same object)."""
    echoed = input_data
    return echoed
# Voice names offered in the form: entry [2] of every model record.
input_options = [model[2] for model in models]

# The form pre-selects the voice at index 7. Fall back to the first voice (or
# no pre-selection) so a shorter model list does not raise IndexError while
# the module is loading.
_DEFAULT_VOICE_INDEX = 7
if len(input_options) > _DEFAULT_VOICE_INDEX:
    _default_voice = input_options[_DEFAULT_VOICE_INDEX]
elif input_options:
    _default_voice = input_options[0]
else:
    _default_voice = None

input_components = [
    Component("radio", "voice", options=input_options, value=_default_voice)
]
output_components = [
    Component("output", "phonemized_output"),
    Component("audio", "audio_output")
]

# One task runner per tab; all share the same components and allow up to five
# parallel tasks each.
ifaces = {
    tab: GradioInterface(
        fn=handler,
        input_components=input_components,
        output_components=output_components,
        max_parallel_tasks=5
    )
    for tab, handler in (
        ('tts', tts),
        ('phonemize', phonemize),
        ('words', listWords),
    )
}
def index():
    """Render ``index.html`` with the input form of the ``'tts'`` interface.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    form_html = ifaces['tts'].render_components()
    return render_template("index.html", components=form_html)
def submit():
    """Queue the posted form as a task on the interface named by its ``tab`` field.

    Responses:
        200 with ``task_id`` and ``position`` when the task was accepted;
        400 when ``tab`` is missing or names no known interface;
        429 when that interface is already running its maximum number of tasks.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    tab = request.form.get("tab")
    iface = ifaces.get(tab)
    if iface is None:
        # Previously an unknown tab surfaced as a KeyError (HTTP 500).
        return jsonify({"error": f"Unknown tab: {tab}"}), 400

    task_id = iface.submit(request.form)
    if not task_id:
        return jsonify({"error": "Task queue is full"}), 429
    return jsonify({"task_id": task_id, "position": iface.get_queue_position()})
def result(task_id,tab):
    """Report the state of ``task_id`` on the interface named ``tab``.

    Returns the full result payload once the task has completed, otherwise only
    its status. An unknown ``tab`` yields a 404 ``"not found"`` response instead
    of an unhandled KeyError.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    iface = ifaces.get(tab)
    if iface is None:
        return jsonify({"status": "not found"}), 404

    result_data = iface.get_result(task_id)
    if result_data["status"] == "completed":
        return jsonify(result_data)
    return jsonify({"status": result_data["status"]})
def queue():
    """Return the ids of all in-flight tasks across every interface.

    The original body referenced ``iface``, a name that is not defined in this
    file; the ids are now collected from every entry of ``ifaces``.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    pending_ids = [
        task_id
        for iface in ifaces.values()
        for task_id in list(iface.task_queue)
    ]
    return jsonify({"queue": pending_ids})
def stop(task_id):
    """Set the stop flag of ``task_id`` on whichever interface is running it.

    The original body referenced ``iface``, a name that is not defined in this
    file; every entry of ``ifaces`` is now asked in turn.

    Returns ``{"status": "stopped"}``, or a 404 ``"not found"`` response when no
    interface knows the task.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    stopped = any(iface.stop_task(task_id) for iface in ifaces.values())
    if stopped:
        return jsonify({"status": "stopped"})
    return jsonify({"status": "not found"}), 404
def send_static(filename):
    """Serve ``filename`` from the static folder.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    static_response = send_from_directory(static_folder, filename)
    return static_response
def download():
    """Send the phonetic data file as an attachment.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    attachment = send_file(phonetc_data_file_path, as_attachment=True)
    return attachment
def download1():
    """Recompile the Persian dictionary and send ``fa_dict`` as an attachment.

    The file is read from the data folder reported by espeak-ng. When that
    folder cannot be determined, a 500 JSON error is returned instead of
    failing on ``os.path.join(None, ...)``.

    NOTE(review): no route decorator is visible on this handler in the file as
    reviewed — confirm it is registered with the Flask app.
    """
    dict_file = 'fa_dict'
    compile_espeak()
    data_folder = find_espeak_ng_data_folder()
    if data_folder is None:
        return jsonify({"error": "espeak-ng data folder not found"}), 500
    return send_file(os.path.join(data_folder, dict_file), as_attachment=True)
if __name__ == "__main__":
    # When run as a script, start the Flask server on 0.0.0.0:7860.
    app.run(port=7860, host='0.0.0.0')