"""
Helper methods for the Presidio Streamlit app
"""
from typing import List, Optional, Tuple
import logging
import streamlit as st
from presidio_analyzer import (
    AnalyzerEngine,
    RecognizerResult,
    RecognizerRegistry,
    PatternRecognizer,
)
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

logger = logging.getLogger("presidio-streamlit")

@st.cache_resource
def nlp_engine_and_registry(
    model_family: str,
    model_path: str,
) -> Tuple[object, RecognizerRegistry]:
    """Create the NLP Engine instance based on the requested model."""
    registry = RecognizerRegistry()
    
    try:
        if model_family.lower() == "flair":
            from flair.models import SequenceTagger
            tagger = SequenceTagger.load(model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict({
                "name": "flair_recognizer",
                "supported_language": "en",
                "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
                "model": model_path,
                "package": "flair",
            })
            return tagger, registry
        elif model_family.lower() == "huggingface":
            from transformers import pipeline
            nlp = pipeline("ner", model=model_path, tokenizer=model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict({
                "name": "huggingface_recognizer",
                "supported_language": "en",
                "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
                "model": model_path,
                "package": "transformers",
            })
            return nlp, registry
        else:
            raise ValueError(f"Model family {model_family} not supported")
    except Exception as e:
        logger.error(f"Error loading model {model_path} for {model_family}: {e}")
        raise RuntimeError(
            f"Failed to load model: {e}. Ensure the model is downloaded and accessible."
        ) from e
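
# Illustrative calls (the model identifiers are examples only and assume the
# corresponding packages and model weights are installed):
#   tagger, registry = nlp_engine_and_registry("flair", "flair/ner-english-large")
#   nlp, registry = nlp_engine_and_registry("huggingface", "dslim/bert-base-NER")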

@st.cache_resource
def analyzer_engine(
    model_family: str,
    model_path: str,
) -> AnalyzerEngine:
    """Create the Analyzer Engine instance based on the requested model."""
    # The loaded model object is cached by nlp_engine_and_registry and not used
    # directly here; the registry carries the model-specific recognizer entries.
    _nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
    analyzer = AnalyzerEngine(registry=registry)
    return analyzer

@st.cache_data
def get_supported_entities(model_family: str, model_path: str) -> List[str]:
    """Return supported entities for the selected model."""
    if model_family.lower() == "huggingface":
        return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"]
    # Flair models and the default fallback share the same core entity set.
    return ["PERSON", "LOCATION", "ORGANIZATION"]

def analyze(
    analyzer: AnalyzerEngine,
    text: str,
    entities: List[str],
    language: str,
    score_threshold: float,
    return_decision_process: bool,
    allow_list: List[str],
    deny_list: List[str],
) -> List[RecognizerResult]:
    """Analyze text for PHI entities."""
    try:
        results = analyzer.analyze(
            text=text,
            entities=entities,
            language=language,
            score_threshold=score_threshold,
            return_decision_process=return_decision_process,
        )
        # Post-filter the results: allow-listed terms are suppressed, deny-listed
        # terms are always kept (deny takes precedence over allow), and everything
        # else is returned unchanged.
        filtered_results = []
        for result in results:
            text_snippet = text[result.start:result.end].lower()
            is_allowed = any(word.lower() in text_snippet for word in allow_list)
            is_denied = any(word.lower() in text_snippet for word in deny_list)
            if is_allowed and not is_denied:
                continue
            filtered_results.append(result)
        return filtered_results
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        raise

def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: str = "*",
    number_of_chars: int = 15,
) -> dict:
    """Anonymize detected PHI entities in the text."""
    try:
        anonymizer = AnonymizerEngine()
        # Build the operator configuration once; only "mask" needs extra parameters.
        if operator == "mask":
            operator_config = {
                "DEFAULT": OperatorConfig(
                    operator,
                    {
                        "masking_char": mask_char,
                        "chars_to_mask": number_of_chars,
                        "from_end": False,
                    },
                )
            }
        else:
            operator_config = {"DEFAULT": OperatorConfig(operator, {})}
        return anonymizer.anonymize(
            text=text,
            analyzer_results=analyze_results,
            operators=operator_config,
        )
    except Exception as e:
        logger.error(f"Anonymization error: {str(e)}")
        raise
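
# Illustrative: anonymize(text, "mask", results, mask_char="*", number_of_chars=4)
# masks up to the first four characters of each detected span, while
# anonymize(text, "replace", results) substitutes each span with its entity label.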

def create_ad_hoc_deny_list_recognizer(
    deny_list: Optional[List[str]] = None,
) -> Optional[PatternRecognizer]:
    """Create a recognizer for deny list items."""
    if not deny_list:
        return None
    try:
        deny_list_recognizer = PatternRecognizer(
            supported_entity="GENERIC_PII", deny_list=deny_list
        )
        return deny_list_recognizer
    except Exception as e:
        logger.error(f"Error creating deny list recognizer: {str(e)}")
        raise
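
# ---------------------------------------------------------------------------
# Minimal end-to-end sketch of how these helpers fit together. The model
# identifier and sample values below are illustrative and assume flair and its
# weights are installed; the st.cache_* decorators may log warnings when this
# module is executed outside `streamlit run`.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_model_family = "flair"                  # or "huggingface"
    demo_model_path = "flair/ner-english-large"  # illustrative model id

    analyzer = analyzer_engine(demo_model_family, demo_model_path)

    # Optionally flag extra terms by registering an ad-hoc deny-list recognizer.
    deny_recognizer = create_ad_hoc_deny_list_recognizer(["Contoso"])
    if deny_recognizer:
        analyzer.registry.add_recognizer(deny_recognizer)

    sample = "John Smith visited Contoso Hospital in Seattle on 2023-05-01."
    results = analyze(
        analyzer=analyzer,
        text=sample,
        entities=get_supported_entities(demo_model_family, demo_model_path)
        + ["GENERIC_PII"],  # include the deny-list recognizer's entity type
        language="en",
        score_threshold=0.35,
        return_decision_process=False,
        allow_list=[],
        deny_list=["Contoso"],
    )
    anonymized = anonymize(text=sample, operator="replace", analyze_results=results)
    print(anonymized)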