Spaces:
Runtime error
Runtime error
| # Welcome to Team Tonic's MultiMed | |
| import os | |
| import numpy as np | |
| import base64 | |
| import torch | |
| import torchaudio | |
| import gradio as gr | |
| import requests | |
| import json | |
| import dotenv | |
| from transformers import AutoProcessor, SeamlessM4TModel | |
| import torchaudio | |
| dotenv.load_dotenv() | |
| AUDIO_SAMPLE_RATE = 16000.0 | |
| MAX_INPUT_AUDIO_LENGTH = 60 # in seconds | |
| DEFAULT_TARGET_LANGUAGE = "English" | |
| device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| processor = AutoProcessor.from_pretrained("ylacombe/hf-seamless-m4t-large") | |
| model = SeamlessM4TModel.from_pretrained("ylacombe/hf-seamless-m4t-large").to(device) | |
| from lang_list import ( | |
| LANGUAGE_NAME_TO_CODE, | |
| S2ST_TARGET_LANGUAGE_NAMES, | |
| S2TT_TARGET_LANGUAGE_NAMES, | |
| T2TT_TARGET_LANGUAGE_NAMES, | |
| TEXT_SOURCE_LANGUAGE_NAMES, | |
| LANG_TO_SPKR_ID, | |
| ) | |
| def process_speech(sound): | |
| """ | |
| processing sound using seamless_m4t | |
| """ | |
| # task_name = "T2TT" | |
| arr, org_sr = torchaudio.load(sound) | |
| target_language_code = LANGUAGE_NAME_TO_CODE[DEFAULT_TARGET_LANGUAGE] | |
| new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=AUDIO_SAMPLE_RATE) | |
| max_length = int(MAX_INPUT_AUDIO_LENGTH * AUDIO_SAMPLE_RATE) | |
| if new_arr.shape[1] > max_length: | |
| new_arr = new_arr[:, :max_length] | |
| gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.") | |
| input_data = processor(audios = new_arr, sampling_rate=AUDIO_SAMPLE_RATE, return_tensors="pt").to(device) | |
| tokens_ids = model.generate(**input_data, generate_speech=False, tgt_lang=target_language_code, num_beams=5, do_sample=True)[0].cpu().squeeze().detach().tolist() | |
| text_out = processor.decode(tokens_ids, skip_special_tokens=True) | |
| return text_out | |
| def convert_image_to_required_format(image): | |
| """ | |
| convert image from numpy to base64 | |
| """ | |
| if type(image) == type(np.array([])): | |
| return base64.b64encode(image).decode('utf-8') | |
| def process_image_with_openai(image): | |
| image_data = convert_image_to_required_format(image) | |
| openai_api_key = os.getenv('OPENAI_API_KEY') | |
| if openai_api_key is None: | |
| raise Exception("OPENAI_API_KEY not found in environment variables") | |
| data_payload = { | |
| "model": "gpt-4-vision-preview", | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": image_data | |
| } | |
| ], | |
| "max_tokens": 300 | |
| } | |
| response = requests.post( | |
| "https://api.openai.com/v1/chat/completions", | |
| headers={ | |
| "Content-Type": "application/json", | |
| "Authorization": f"Bearer {openai_api_key}" | |
| }, | |
| json=data_payload | |
| ) | |
| if response.status_code == 200: | |
| return response.json()['choices'][0]['message']['content'] | |
| else: | |
| raise Exception(f"OpenAI Error: {response.status_code}") | |
| def query_vectara(text): | |
| user_message = text | |
| # Read authentication parameters from the .env file | |
| CUSTOMER_ID = os.getenv('CUSTOMER_ID') | |
| CORPUS_ID = os.getenv('CORPUS_ID') | |
| API_KEY = os.getenv('API_KEY') | |
| # Define the headers | |
| api_key_header = { | |
| "customer-id": CUSTOMER_ID, | |
| "x-api-key": API_KEY | |
| } | |
| # Define the request body in the structure provided in the example | |
| request_body = { | |
| "query": [ | |
| { | |
| "query": user_message, | |
| "queryContext": "", | |
| "start": 1, | |
| "numResults": 50, | |
| "contextConfig": { | |
| "charsBefore": 0, | |
| "charsAfter": 0, | |
| "sentencesBefore": 2, | |
| "sentencesAfter": 2, | |
| "startTag": "%START_SNIPPET%", | |
| "endTag": "%END_SNIPPET%", | |
| }, | |
| "rerankingConfig": { | |
| "rerankerId": 272725718, | |
| "mmrConfig": { | |
| "diversityBias": 0.35 | |
| } | |
| }, | |
| "corpusKey": [ | |
| { | |
| "customerId": CUSTOMER_ID, | |
| "corpusId": CORPUS_ID, | |
| "semantics": 0, | |
| "metadataFilter": "", | |
| "lexicalInterpolationConfig": { | |
| "lambda": 0 | |
| }, | |
| "dim": [] | |
| } | |
| ], | |
| "summary": [ | |
| { | |
| "maxSummarizedResults": 5, | |
| "responseLang": "auto", | |
| "summarizerPromptName": "vectara-summary-ext-v1.2.0" | |
| } | |
| ] | |
| } | |
| ] | |
| } | |
| # Make the API request using Gradio | |
| response = requests.post( | |
| "https://api.vectara.io/v1/query", | |
| json=request_body, # Use json to automatically serialize the request body | |
| verify=True, | |
| headers=api_key_header | |
| ) | |
| if response.status_code == 200: | |
| query_data = response.json() | |
| if query_data: | |
| sources_info = [] | |
| # Extract the summary. | |
| summary = query_data['responseSet'][0]['summary'][0]['text'] | |
| # Iterate over all response sets | |
| for response_set in query_data.get('responseSet', []): | |
| # Extract sources | |
| for source in response_set.get('response', [])[:5]: # Limit to top 5 sources. | |
| source_metadata = source.get('metadata', []) | |
| source_info = {} | |
| for metadata in source_metadata: | |
| metadata_name = metadata.get('name', '') | |
| metadata_value = metadata.get('value', '') | |
| if metadata_name == 'title': | |
| source_info['title'] = metadata_value | |
| elif metadata_name == 'author': | |
| source_info['author'] = metadata_value | |
| elif metadata_name == 'pageNumber': | |
| source_info['page number'] = metadata_value | |
| if source_info: | |
| sources_info.append(source_info) | |
| result = {"summary": summary, "sources": sources_info} | |
| return f"{json.dumps(result, indent=2)}" | |
| else: | |
| return "No data found in the response." | |
| else: | |
| return f"Error: {response.status_code}" | |
| def convert_to_markdown(vectara_response_json): | |
| vectara_response = json.loads(vectara_response_json) | |
| if vectara_response: | |
| summary = vectara_response.get('summary', 'No summary available') | |
| sources_info = vectara_response.get('sources', []) | |
| # Format the summary as Markdown | |
| markdown_summary = f'**Summary:** {summary}\n\n' | |
| # Format the sources as a numbered list | |
| markdown_sources = "" | |
| for i, source_info in enumerate(sources_info): | |
| author = source_info.get('author', 'Unknown author') | |
| title = source_info.get('title', 'Unknown title') | |
| page_number = source_info.get('page number', 'Unknown page number') | |
| markdown_sources += f"{i+1}. {title} by {author}, Page {page_number}\n" | |
| return f"{markdown_summary}**Sources:**\n{markdown_sources}" | |
| else: | |
| return "No data found in the response." | |
| # Main function to handle the Gradio interface logic | |
| def process_and_query(text, image,audio): | |
| try: | |
| # If an image is provided, process it with OpenAI and use the response as the text query for Vectara | |
| if image is not None: | |
| text = process_image_with_openai(image) | |
| # Now, use the text (either provided by the user or obtained from OpenAI) to query Vectara | |
| vectara_response_json = query_vectara(text) | |
| markdown_output = convert_to_markdown(vectara_response_json) | |
| return markdown_output | |
| except Exception as e: | |
| return str(e) | |
| # Define the Gradio interface | |
| iface = gr.Interface( | |
| fn=process_and_query, | |
| inputs=[ | |
| gr.Textbox(label="Input Text"), | |
| gr.Image(label="Upload Image"), | |
| gr.Audio(sources="microphone"), | |
| ], | |
| outputs=[gr.Markdown(label="Output Text")], | |
| title="👋🏻Welcome to ⚕🗣️😷MultiMed - Access Chat ⚕🗣️😷", | |
| description = ''' | |
| ### How To Use ⚕🗣️😷MultiMed⚕: | |
| #### 🗣️📝Interact with ⚕🗣️😷MultiMed⚕ in any language using audio or text! | |
| #### 🗣️📝 This is an educational and accessible conversational tool to improve wellness and sanitation in support of public health. | |
| #### 📚🌟💼 The knowledge base is composed of publicly available medical and health sources in multiple languages. We also used [Kelvalya/MedAware](https://huggingface.co/datasets/keivalya/MedQuad-MedicalQnADataset) that we processed and converted to HTML. The quality of the answers depends on the quality of the dataset, so if you want to see some data represented here, do [get in touch](https://discord.gg/GWpVpekp). You can also use 😷MultiMed⚕️ on your own data & in your own way by cloning this space. 🧬🔬🔍 Simply click here: <a style="display:inline-block" href="https://huggingface.co/spaces/TeamTonic/MultiMed?duplicate=true"><img src="https://img.shields.io/badge/-Duplicate%20Space-blue?labelColor=white&style=flat&logo=&logoWidth=14" alt="Duplicate Space"></a></h3> | |
| #### Join us : 🌟TeamTonic🌟 is always making cool demos! Join our active builder's🛠️community on 👻Discord: [Discord](https://discord.gg/GWpVpekp) On 🤗Huggingface: [TeamTonic](https://huggingface.co/TeamTonic) & [MultiTransformer](https://huggingface.co/MultiTransformer) On 🌐Github: [Polytonic](https://github.com/tonic-ai) & contribute to 🌟 [PolyGPT](https://github.com/tonic-ai/polygpt-alpha)" | |
| ''', | |
| theme='ParityError/Anime', | |
| examples=[ | |
| ["What is the proper treatment for buccal herpes?"], | |
| ["Male, 40 presenting with swollen glands and a rash"], | |
| ["How does cellular metabolism work TCA cycle"], | |
| ["What special care must be provided to children with chicken pox?"], | |
| ["When and how often should I wash my hands ?"], | |
| ["بکل ہرپس کا صحیح علاج کیا ہے؟"], | |
| ["구강 헤르페스의 적절한 치료법은 무엇입니까?"], | |
| ["Je, ni matibabu gani sahihi kwa herpes ya buccal?"], | |
| ], | |
| ) | |
| iface.launch() |