Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -1,18 +1,123 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
import os
|
2 |
-
import requests
|
3 |
|
4 |
-
github_pat = os.environ['github_pat']
|
5 |
|
6 |
-
|
7 |
|
8 |
-
response = requests.get(raw_url)
|
9 |
|
10 |
-
if response.status_code == 200:
|
11 |
-
exec(response.text)
|
12 |
-
else:
|
13 |
-
raise Exception(f"Failed to fetch the Python file from the repository. Status code: {response.status_code}")
|
14 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
15 |
|
16 |
anonymizer = Anonimiseren()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
17 |
|
18 |
-
|
|
|
1 |
+
import gradio as gr
|
2 |
+
from transformers import AutoTokenizer, AutoModel
|
3 |
+
import torch
|
4 |
+
import numpy as np
|
5 |
+
import faiss
|
6 |
+
import random
|
7 |
+
from tqdm import tqdm
|
8 |
import os
|
|
|
9 |
|
|
|
10 |
|
11 |
+
runclass = os.environ['anonclass']
|
12 |
|
|
|
13 |
|
|
|
|
|
|
|
|
|
14 |
|
15 |
+
"""
|
16 |
+
class Anonimiseren:
|
17 |
+
def __init__(self, model_name="CLTL/MedRoBERTa.nl"):
|
18 |
+
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
|
19 |
+
self.model = AutoModel.from_pretrained(model_name)
|
20 |
+
self._prepare_vocab_index()
|
21 |
+
|
22 |
+
def _prepare_vocab_index(self):
|
23 |
+
vocab_embeddings = self.model.get_input_embeddings().weight.data.numpy()
|
24 |
+
self.vocab_index = faiss.IndexFlatL2(vocab_embeddings.shape[1])
|
25 |
+
self.vocab_index.add(vocab_embeddings)
|
26 |
+
|
27 |
+
def get_closest_token_id(self, embedding, original_token_id, desired_length=3):
|
28 |
+
D, I = self.vocab_index.search(np.array([embedding]), desired_length + 10)
|
29 |
+
|
30 |
+
exclude_token_ids = [self.tokenizer.cls_token_id, self.tokenizer.sep_token_id, self.tokenizer.pad_token_id, self.tokenizer.mask_token_id]
|
31 |
+
closest_token_ids = [token_id for token_id in I[0] if token_id != original_token_id and token_id not in exclude_token_ids]
|
32 |
+
|
33 |
+
idx = 0
|
34 |
+
while len(closest_token_ids) < desired_length and idx < len(I[0]):
|
35 |
+
token_id = I[0][idx]
|
36 |
+
if token_id not in closest_token_ids and token_id != original_token_id and token_id not in exclude_token_ids:
|
37 |
+
closest_token_ids.append(token_id)
|
38 |
+
idx += 1
|
39 |
+
|
40 |
+
return random.choice(closest_token_ids[:desired_length])
|
41 |
+
|
42 |
+
def process_sentence(self, text, desired_length=1):
|
43 |
+
encoded_input = self.tokenizer.encode_plus(text, add_special_tokens=True, max_length=256, padding='max_length', return_tensors='pt', truncation=True)
|
44 |
+
attention_masks = encoded_input['attention_mask'].squeeze().tolist()
|
45 |
+
|
46 |
+
with torch.no_grad():
|
47 |
+
model_output = self.model(**encoded_input)
|
48 |
+
|
49 |
+
last_layer_embeddings = model_output.last_hidden_state.squeeze(0).numpy()
|
50 |
+
original_token_ids = encoded_input['input_ids'].squeeze().tolist()
|
51 |
+
|
52 |
+
new_token_ids = []
|
53 |
+
for i, embedding in enumerate(last_layer_embeddings):
|
54 |
+
token_id = original_token_ids[i]
|
55 |
+
if token_id in [self.tokenizer.cls_token_id, self.tokenizer.sep_token_id, self.tokenizer.pad_token_id, self.tokenizer.mask_token_id] or attention_masks[i] == 0:
|
56 |
+
new_token_ids.append(token_id)
|
57 |
+
else:
|
58 |
+
closest_token_id = self.get_closest_token_id(embedding, token_id, desired_length)
|
59 |
+
new_token_ids.append(closest_token_id)
|
60 |
+
|
61 |
+
new_sentence = self.tokenizer.decode(new_token_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)
|
62 |
+
return new_sentence.strip(), original_token_ids, new_token_ids, attention_masks
|
63 |
|
64 |
anonymizer = Anonimiseren()
|
65 |
+
"""
|
66 |
+
|
67 |
+
exec(runclass)
|
68 |
+
|
69 |
+
batch_size = 4
|
70 |
+
|
71 |
+
def process_batch(sentences, anonymizer, desired_length):
|
72 |
+
batch_results = []
|
73 |
+
batch_original_token_ids = []
|
74 |
+
batch_new_token_ids = []
|
75 |
+
batch_attention_masks = []
|
76 |
+
for sentence in sentences:
|
77 |
+
new_sentence, original_token_ids, new_token_ids, attention_masks = anonymizer.process_sentence(sentence, desired_length)
|
78 |
+
batch_results.append(new_sentence)
|
79 |
+
batch_original_token_ids.append(original_token_ids)
|
80 |
+
batch_new_token_ids.append(new_token_ids)
|
81 |
+
batch_attention_masks.append(attention_masks)
|
82 |
+
return batch_results, batch_original_token_ids, batch_new_token_ids, batch_attention_masks
|
83 |
+
|
84 |
+
def anonymize_texts(text_list, desired_length, output_type):
|
85 |
+
desired_length = int(desired_length)
|
86 |
+
sentences = text_list.split('\n')
|
87 |
+
new_sentences, all_original_token_ids, all_new_token_ids, all_attention_masks = [], [], [], []
|
88 |
+
|
89 |
+
for i in tqdm(range(0, len(sentences), batch_size), desc="Processing batches"):
|
90 |
+
batch = sentences[i:i + batch_size]
|
91 |
+
batch_results, batch_original_ids, batch_new_ids, batch_attention_masks = process_batch(batch, anonymizer, desired_length)
|
92 |
+
new_sentences.extend(batch_results)
|
93 |
+
all_original_token_ids.extend(batch_original_ids)
|
94 |
+
all_new_token_ids.extend(batch_new_ids)
|
95 |
+
all_attention_masks.extend(batch_attention_masks)
|
96 |
+
del batch_results, batch_original_ids, batch_new_ids, batch_attention_masks
|
97 |
+
|
98 |
+
|
99 |
+
if output_type == "New Sentences":
|
100 |
+
return "\n".join(new_sentences)
|
101 |
+
elif output_type == "Token IDs":
|
102 |
+
return "\n".join([str(ids) for ids in all_new_token_ids])
|
103 |
+
elif output_type == "Attention Masks":
|
104 |
+
return "\n".join([str(masks) for masks in all_attention_masks])
|
105 |
+
elif output_type == "Token IDs & Attention Masks":
|
106 |
+
combined_output = []
|
107 |
+
for token_ids, masks in zip(all_new_token_ids, all_attention_masks):
|
108 |
+
combined_output.append(f"Token IDs: {token_ids}\nAttention Masks: {masks}\n")
|
109 |
+
return "\n".join(combined_output)
|
110 |
+
|
111 |
+
interface = gr.Interface(
|
112 |
+
fn=anonymize_texts,
|
113 |
+
inputs=[
|
114 |
+
gr.TextArea(label="Input Text"),
|
115 |
+
gr.Number(label="Desired Length"),
|
116 |
+
gr.Dropdown(choices=["New Sentences", "Token IDs", "Attention Masks", "Token IDs & Attention Masks"], label="Output Type")
|
117 |
+
],
|
118 |
+
outputs=gr.TextArea(label="Output Text"),
|
119 |
+
title="Anonymizer",
|
120 |
+
description="Enter multiple sentences (one per line), select the amount of tokens for anonymization, and choose the output type. Note: must be in Dutch."
|
121 |
+
)
|
122 |
|
123 |
+
interface.launch()
|