"""A Gradio app for de-identifying text data using FHE.""" import base64 import os import re import subprocess import time import uuid from typing import Dict, List import gradio as gr import numpy import pandas as pd import requests from fhe_anonymizer import FHEAnonymizer #from openai import OpenAI from utils_demo import * from concrete.ml.deployment import FHEModelClient # Ensure the directory is clean before starting processes or reading files clean_directory() anonymizer = FHEAnonymizer() #client = OpenAI(api_key=os.environ.get("openaikey")) # Start the Uvicorn server hosting the FastAPI app subprocess.Popen(["uvicorn", "server:app"], cwd=CURRENT_DIR) time.sleep(3) # Load data from files required for the application UUID_MAP = read_json(MAPPING_UUID_PATH) ANONYMIZED_DOCUMENT = read_txt(ANONYMIZED_FILE_PATH) MAPPING_ANONYMIZED_SENTENCES = read_pickle(MAPPING_ANONYMIZED_SENTENCES_PATH) MAPPING_ENCRYPTED_SENTENCES = read_pickle(MAPPING_ENCRYPTED_SENTENCES_PATH) ORIGINAL_DOCUMENT = read_txt(ORIGINAL_FILE_PATH).split("\n\n") MAPPING_DOC_EMBEDDING = read_pickle(MAPPING_DOC_EMBEDDING_PATH) print(f"{ORIGINAL_DOCUMENT=}\n") print(f"{MAPPING_DOC_EMBEDDING.keys()=}") # 4. Data Processing and Operations (No specific operations shown here, assuming it's part of anonymizer or client usage) # 5. Utilizing External Services or APIs # (Assuming client initialization and anonymizer setup are parts of using external services or application-specific logic) # Generate a random user ID for this session USER_ID = numpy.random.randint(0, 2**32) def select_static_anonymized_sentences_fn(selected_sentences: List): selected_sentences = [MAPPING_ANONYMIZED_SENTENCES[sentence] for sentence in selected_sentences] anonymized_selected_sentence = sorted(selected_sentences, key=lambda x: x[0]) anonymized_selected_sentence = [sentence for _, sentence in anonymized_selected_sentence] return "\n\n".join(anonymized_selected_sentence) def key_gen_fn() -> Dict: """Generate keys for a given user.""" print("------------ Step 1: Key Generation:") print(f"Your user ID is: {USER_ID}....") client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}") client.load() # Creates the private and evaluation keys on the client side client.generate_private_and_evaluation_keys() # Get the serialized evaluation keys serialized_evaluation_keys = client.get_serialized_evaluation_keys() assert isinstance(serialized_evaluation_keys, bytes) # Save the evaluation key evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key" write_bytes(evaluation_key_path, serialized_evaluation_keys) # anonymizer.generate_key() if not evaluation_key_path.is_file(): error_message = ( f"Error Encountered While generating the evaluation {evaluation_key_path.is_file()=}" ) print(error_message) return {gen_key_btn: gr.update(value=error_message)} else: print("Keys have been generated ✅") return {gen_key_btn: gr.update(value="Keys have been generated ✅")} def encrypt_doc_fn(doc): print(f"\n------------ Step 2.1: Doc encryption: {doc=}") if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file(): return {encrypted_doc_box: gr.update(value="Error ❌: Please generate the key first!", lines=10)} # Retrieve the client API client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}") client.load() encrypted_tokens = [] tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+|\$\d+(?:\.\d+)?|\€\d+(?:\.\d+)?)", ' '.join(doc)) for token in tokens: if token.strip() and re.match(r"\w+", token): emb_x = MAPPING_DOC_EMBEDDING[token] assert emb_x.shape == (1, 1024) encrypted_x = client.quantize_encrypt_serialize(emb_x) assert isinstance(encrypted_x, bytes) encrypted_tokens.append(encrypted_x) print("Doc encrypted ✅ on Client Side") # No need to save it # write_bytes(KEYS_DIR / f"{USER_ID}/encrypted_doc", b"".join(encrypted_tokens)) encrypted_quant_tokens_hex = [token.hex()[500:510] for token in encrypted_tokens] return { encrypted_doc_box: gr.update(value=" ".join(encrypted_quant_tokens_hex), lines=10), anonymized_doc_output: gr.update(visible=True, value=None), } import presidio_analyzer import presidio_anonymizer def anonymization_with_presidio(prompt): analyzer = AnalyzerEngine() anonymizer = AnonymizerEngine() results = analyzer.analyze(text=prompt,language='en') result = anonymizer.anonymize(text=prompt, analyzer_results=results) return result.text def encrypt_query_fn(query): print(f"\n------------ Step 2: Query encryption: {query=}") if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file(): return {output_encrypted_box: gr.update(value="Error ❌: Please generate the key first!", lines=8)} if is_user_query_valid(query): return { query_box: gr.update( value=( "Unable to process ❌: The request exceeds the length limit or falls " "outside the scope of this document. Please refine your query." ) ) } # Retrieve the client API client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}") client.load() encrypted_tokens = [] # Pattern to identify words and non-words (including punctuation, spaces, etc.) tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+)", query) for token in tokens: # 1- Ignore non-words tokens if bool(re.match(r"^\s+$", token)): continue # 2- Directly append non-word tokens or whitespace to processed_tokens # Prediction for each word emb_x = get_batch_text_representation([token], EMBEDDINGS_MODEL, TOKENIZER) encrypted_x = client.quantize_encrypt_serialize(emb_x) assert isinstance(encrypted_x, bytes) encrypted_tokens.append(encrypted_x) print("Data encrypted ✅ on Client Side") assert len({len(token) for token in encrypted_tokens}) == 1 write_bytes(KEYS_DIR / f"{USER_ID}/encrypted_input", b"".join(encrypted_tokens)) write_bytes( KEYS_DIR / f"{USER_ID}/encrypted_input_len", len(encrypted_tokens[0]).to_bytes(10, "big") ) encrypted_quant_tokens_hex = [token.hex()[500:580] for token in encrypted_tokens] return { output_encrypted_box: gr.update(value=" ".join(encrypted_quant_tokens_hex), lines=8), anonymized_query_output: gr.update(visible=True, value=None), identified_words_output_df: gr.update(visible=False, value=None), } def send_input_fn(query) -> Dict: """Send the encrypted data and the evaluation key to the server.""" print("------------ Step 3.1: Send encrypted_data to the Server") evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key" encrypted_input_path = KEYS_DIR / f"{USER_ID}/encrypted_input" encrypted_input_len_path = KEYS_DIR / f"{USER_ID}/encrypted_input_len" if not evaluation_key_path.is_file(): error_message = ( "Error Encountered While Sending Data to the Server: " f"The key has been generated correctly - {evaluation_key_path.is_file()=}" ) return {anonymized_query_output: gr.update(value=error_message)} if not encrypted_input_path.is_file(): error_message = ( "Error Encountered While Sending Data to the Server: The data has not been encrypted " f"correctly on the client side - {encrypted_input_path.is_file()=}" ) return {anonymized_query_output: gr.update(value=error_message)} # Define the data and files to post data = {"user_id": USER_ID, "input": query} files = [ ("files", open(evaluation_key_path, "rb")), ("files", open(encrypted_input_path, "rb")), ("files", open(encrypted_input_len_path, "rb")), ] # Send the encrypted input and evaluation key to the server url = SERVER_URL + "send_input" with requests.post( url=url, data=data, files=files, ) as resp: print("Data sent to the server ✅" if resp.ok else "Error ❌ in sending data to the server") def run_fhe_in_server_fn() -> Dict: """Run in FHE the anonymization of the query""" print("------------ Step 3.2: Run in FHE on the Server Side") evaluation_key_path = KEYS_DIR / f"{USER_ID}/evaluation_key" encrypted_input_path = KEYS_DIR / f"{USER_ID}/encrypted_input" if not evaluation_key_path.is_file(): error_message = ( "Error Encountered While Sending Data to the Server: " f"The key has been generated correctly - {evaluation_key_path.is_file()=}" ) return {anonymized_query_output: gr.update(value=error_message)} if not encrypted_input_path.is_file(): error_message = ( "Error Encountered While Sending Data to the Server: The data has not been encrypted " f"correctly on the client side - {encrypted_input_path.is_file()=}" ) return {anonymized_query_output: gr.update(value=error_message)} data = { "user_id": USER_ID, } url = SERVER_URL + "run_fhe" with requests.post( url=url, data=data, ) as response: if not response.ok: return { anonymized_query_output: gr.update( value=( "⚠️ An error occurred on the Server Side. " "Please check connectivity and data transmission." ), ), } else: time.sleep(1) print(f"The query anonymization was computed in {response.json():.2f} s per token.") def get_output_fn() -> Dict: print("------------ Step 3.3: Get the output from the Server Side") if not (KEYS_DIR / f"{USER_ID}/evaluation_key").is_file(): error_message = ( "Error Encountered While Sending Data to the Server: " "The key has not been generated correctly" ) return {anonymized_query_output: gr.update(value=error_message)} if not (KEYS_DIR / f"{USER_ID}/encrypted_input").is_file(): error_message = ( "Error Encountered While Sending Data to the Server: " "The data has not been encrypted correctly on the client side" ) return {anonymized_query_output: gr.update(value=error_message)} data = { "user_id": USER_ID, } # Retrieve the encrypted output url = SERVER_URL + "get_output" with requests.post( url=url, data=data, ) as response: if response.ok: print("Data received ✅ from the remote Server") response_data = response.json() encrypted_output_base64 = response_data["encrypted_output"] length_encrypted_output_base64 = response_data["length"] # Decode the base64 encoded data encrypted_output = base64.b64decode(encrypted_output_base64) length_encrypted_output = base64.b64decode(length_encrypted_output_base64) # Save the encrypted output to bytes in a file as it is too large to pass through # regular Gradio buttons (see https://github.com/gradio-app/gradio/issues/1877) write_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output", encrypted_output) write_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output_len", length_encrypted_output) else: print("Error ❌ in getting data to the server") def decrypt_fn(text) -> Dict: """Dencrypt the data on the `Client Side`.""" print("------------ Step 4: Dencrypt the data on the `Client Side`") # Get the encrypted output path encrypted_output_path = CLIENT_DIR / f"{USER_ID}_encrypted_output" if not encrypted_output_path.is_file(): error_message = """⚠️ Please ensure that: \n - the connectivity \n - the query has been submitted \n - the evaluation key has been generated \n - the server processed the encrypted data \n - the Client received the data from the Server before decrypting the prediction """ print(error_message) return error_message, None # Retrieve the client API client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{USER_ID}") client.load() # Load the encrypted output as bytes encrypted_output = read_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output") length = int.from_bytes(read_bytes(CLIENT_DIR / f"{USER_ID}_encrypted_output_len"), "big") tokens = re.findall(r"(\b[\w\.\/\-@]+\b|[\s,.!?;:'\"-]+)", text) decrypted_output, identified_words_with_prob = [], [] i = 0 for token in tokens: # Directly append non-word tokens or whitespace to processed_tokens if bool(re.match(r"^\s+$", token)): continue else: encrypted_token = encrypted_output[i : i + length] prediction_proba = client.deserialize_decrypt_dequantize(encrypted_token) probability = prediction_proba[0][1] i += length if probability >= 0.77: identified_words_with_prob.append((token, probability)) # Use the existing UUID if available, otherwise generate a new one tmp_uuid = UUID_MAP.get(token, str(uuid.uuid4())[:8]) decrypted_output.append(tmp_uuid) UUID_MAP[token] = tmp_uuid else: decrypted_output.append(token) # Update the UUID map with query. write_json(MAPPING_UUID_PATH, UUID_MAP) # Removing Spaces Before Punctuation: anonymized_text = re.sub(r"\s([,.!?;:])", r"\1", " ".join(decrypted_output)) # Convert the list of identified words and probabilities into a DataFrame if identified_words_with_prob: identified_df = pd.DataFrame( identified_words_with_prob, columns=["Identified Words", "Probability"] ) else: identified_df = pd.DataFrame(columns=["Identified Words", "Probability"]) print("Decryption done ✅ on Client Side") return anonymized_text, identified_df def anonymization_with_fn(selected_sentences, query): encrypt_query_fn(query) send_input_fn(query) run_fhe_in_server_fn() get_output_fn() anonymized_text, identified_df = decrypt_fn(query) return { anonymized_doc_output: gr.update(value=select_static_anonymized_sentences_fn(selected_sentences)), anonymized_query_output: gr.update(value=anonymized_text), identified_words_output_df: gr.update(value=identified_df, visible=False), } # Define the folder path containing audio files AUDIO_FOLDER_PATH = "./files/" # Function to list available audio files in the folder def get_audio_files(): files = [f for f in os.listdir(AUDIO_FOLDER_PATH) if f.endswith(('.wav', '.mp3'))] return files # Step 1: Load and display audio file def load_audio_file(selected_audio): file_path = os.path.join(AUDIO_FOLDER_PATH, selected_audio) return file_path # Step 1.1: Record and save the audio file def save_recorded_audio(audio): file_path = os.path.join(AUDIO_FOLDER_PATH, "recorded_audio.wav") audio.export(file_path, format="wav") # Save the audio as a .wav file return file_path def click_js(): return """function audioRecord() { var xPathRes = document.evaluate ('//*[@id="audio"]//button', document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null); xPathRes.singleNodeValue.click();}""" def action(btn): """Changes button text on click""" if btn == 'Speak': return 'Stop' else: return 'Speak' def check_btn(btn): """Checks for correct button text before invoking transcribe()""" if btn != 'Speak': raise Exception('Recording...') def transcribe(): return 'Success' demo = gr.Blocks(css=".markdown-body { font-size: 18px; }") with demo: gr.Markdown( """

""") gr.Markdown( """

Encrypted de-identification Using Fully Homomorphic Encryption

Concrete-ML Documentation Community @zama_fhe

""" ) gr.Markdown( """

Anonymization makes it impossible to identify a person from a data set and thus allows their privacy to be respected (CNIL)

Concealing some of PII information is a helpful technique for de-identification. However, this can't be considered as anonymization.

De-identification uses Fully Homomorphic Encryption (FHE) to conceal personally identifiable information (PII) within encrypted documents, enabling computations to be performed on the encrypted data.

""" ) # Step 1: Add an audio file gr.Markdown("## Step 1: Add an Audio File") audio_files = get_audio_files() with gr.Row(): audio_file_dropdown = gr.Dropdown(audio_files, label="Select an Audio File", interactive=True) audio_output = gr.Audio(label="Selected Audio", type="filepath") # When an audio file is selected, it will display the file path audio_file_dropdown.change(fn=load_audio_file, inputs=[audio_file_dropdown], outputs=[audio_output]) ########################## Key Gen Part ########################## gr.Markdown( "## Step 1.2: Generate the keys\n\n" """In Fully Homomorphic Encryption (FHE) methods, two types of keys are created. The first type, called secret keys, are used to encrypt and decrypt the user's data. The second type, called evaluation keys, enables a server to work on the encrypted data without seeing the actual data. """ ) gen_key_btn = gr.Button("Generate the secret and evaluation keys") gen_key_btn.click( key_gen_fn, inputs=[], outputs=[gen_key_btn], ) ########################## Step 1.1: Record Audio ########################## gr.Markdown("## Step 1.1: Record an Audio File") """ with gr.Row(): audio_recorder = gr.Audio(source="microphone", type="file", label="Record Audio") record_output = gr.Audio(label="Recorded Audio", type="filepath") # When the user records an audio, save it audio_recorder.change(fn=save_recorded_audio, inputs=[audio_recorder], outputs=[record_output]) gen_key_btn = gr.Button("Generate the secret and evaluation keys") gen_key_btn.click( key_gen_fn, inputs=[], outputs=[gen_key_btn], ) """ msg = gr.Textbox() # Removed the `source` argument as it's no longer supported audio_box = gr.Audio(label="Audio", type="filepath", elem_id='audio') with gr.Row(): audio_btn = gr.Button('Speak') clear = gr.Button("Clear") audio_btn.click(fn=action, inputs=audio_btn, outputs=audio_btn) \ .then(fn=lambda: None, _js=click_js()) \ .then(fn=check_btn, inputs=audio_btn) \ .success(fn=transcribe, outputs=msg) clear.click(lambda: None, None, msg, queue=False) ########################## Main document Part ########################## gr.Markdown("
") gr.Markdown("## Step 2.1: Select the document you want to encrypt\n\n" """To make it simple, we pre-compiled the following document, but you are free to choose on which part you want to run this example. """ ) with gr.Row(): with gr.Column(scale=5): original_sentences_box = gr.CheckboxGroup( ORIGINAL_DOCUMENT, value=ORIGINAL_DOCUMENT, label="Contract:", show_label=True, ) with gr.Column(scale=1, min_width=6): gr.HTML("
") encrypt_doc_btn = gr.Button("Encrypt the document") with gr.Column(scale=5): encrypted_doc_box = gr.Textbox( label="Encrypted document:", show_label=True, interactive=False, lines=10 ) ########################## User Query Part ########################## gr.Markdown("
") gr.Markdown("## Step 2.2: Select the prompt you want to encrypt\n\n" """Please choose from the predefined options in “Prompt examples” or craft a custom question in the “Customized prompt” text box. Remain concise and relevant to the context. Any off-topic query will not be processed.""") with gr.Row(): with gr.Column(scale=5): with gr.Column(scale=5): default_query_box = gr.Dropdown( list(DEFAULT_QUERIES.values()), label="PROMPT EXAMPLES:" ) gr.Markdown("Or") query_box = gr.Textbox( value="What is Kate international bank account number?", label="CUSTOMIZED PROMPT:", interactive=True ) default_query_box.change( fn=lambda default_query_box: default_query_box, inputs=[default_query_box], outputs=[query_box], ) with gr.Column(scale=1, min_width=6): gr.HTML("
") encrypt_query_btn = gr.Button("Encrypt the prompt") # gr.HTML("
") with gr.Column(scale=5): output_encrypted_box = gr.Textbox( label="Encrypted de-identified query that will be sent to the de-identification server:", lines=8, ) ########################## FHE processing Part ########################## gr.Markdown("
") gr.Markdown("## Step 3: De-identify the document and the prompt using FHE") gr.Markdown( """Once the client encrypts the document and the prompt locally, it will be sent to a remote server to perform the de-identification on encrypted data. When the computation is done, the server will return the result to the client for decryption. """ ) run_fhe_btn = gr.Button("De-identify using FHE") with gr.Row(): with gr.Column(scale=5): anonymized_doc_output = gr.Textbox( label="Decrypted and de-idenntified document", lines=10, interactive=True ) with gr.Column(scale=5): anonymized_query_output = gr.Textbox( label="Decrypted and de-identified prompt", lines=10, interactive=True ) identified_words_output_df = gr.Dataframe(label="Identified words:", visible=False) encrypt_doc_btn.click( fn=encrypt_doc_fn, inputs=[original_sentences_box], outputs=[encrypted_doc_box, anonymized_doc_output], ) encrypt_query_btn.click( fn=encrypt_query_fn, inputs=[query_box], outputs=[ query_box, output_encrypted_box, anonymized_query_output, identified_words_output_df, ], ) run_fhe_btn.click( anonymization_with_fn, inputs=[original_sentences_box, query_box], outputs=[anonymized_doc_output, anonymized_query_output, identified_words_output_df], ) ########################## Presidio ########################## gr.Markdown("
") gr.Markdown("## Step 3: De-identify the document and the prompt") gr.Markdown( """This step will demonstrate de-identification using both FHE and Presidio methods. The same prompt will be used for both to allow for direct comparison.""" ) with gr.Row(): with gr.Column(scale=1, min_width=6): run_fhe_btn = gr.Button("De-identify using FHE") with gr.Column(scale=1, min_width=6): run_presidio_btn = gr.Button("De-identify using Presidio") with gr.Column(scale=1, min_width=6): run_both_btn = gr.Button("Run Both De-identification Methods") with gr.Row(): with gr.Column(scale=5): anonymized_doc_output = gr.Textbox( label="FHE: Decrypted and de-identified document", lines=10, interactive=True ) with gr.Column(scale=5): anonymized_query_output = gr.Textbox( label="FHE: Decrypted and de-identified prompt", lines=10, interactive=True ) with gr.Row(): presidio_output = gr.Textbox( label="Presidio: De-identified prompt", lines=10, interactive=True ) identified_words_output_df = gr.Dataframe(label="FHE: Identified words:", visible=False) encrypt_doc_btn.click( fn=encrypt_doc_fn, inputs=[original_sentences_box], outputs=[encrypted_doc_box, anonymized_doc_output], ) encrypt_query_btn.click( fn=encrypt_query_fn, inputs=[query_box], outputs=[ query_box, output_encrypted_box, anonymized_query_output, identified_words_output_df, ], ) run_fhe_btn.click( anonymization_with_fn, inputs=[original_sentences_box, query_box], outputs=[anonymized_doc_output, anonymized_query_output, identified_words_output_df], ) run_presidio_btn.click( anonymization_with_presidio, inputs=[query_box], outputs=[presidio_output], ) def run_both_deidentification(selected_sentences, query): # Run FHE de-identification fhe_results = anonymization_with_fn(selected_sentences, query) # Run Presidio de-identification presidio_result = anonymization_with_presidio(query) # Combine results return { **fhe_results, 'presidio_output': gr.update(value=presidio_result) } run_both_btn.click( run_both_deidentification, inputs=[original_sentences_box, query_box], outputs=[anonymized_doc_output, anonymized_query_output, identified_words_output_df, presidio_output], ) # Launch the app demo.launch(share=False)