|
""" |
|
NLP utilities for policy text processing |
|
""" |
|
|
|
import re
import logging
from typing import Any, Dict, List

import numpy as np
|
|
class PolicyTextProcessor:
    """Text processing utilities for policy documents."""

    def __init__(self):
        """Initialize the processor with entity patterns and keyword vocabularies."""
|
        # Lowercase country names mapped to two-letter country codes.
        self.country_mappings = {
            "united states": "US",
            "china": "CN",
            "singapore": "SG",
            "malaysia": "MY",
            "thailand": "TH",
            "vietnam": "VN",
            "indonesia": "ID",
            "philippines": "PH",
            "japan": "JP",
            "south korea": "KR",
            "germany": "DE",
            "france": "FR",
            "united kingdom": "UK",
            "canada": "CA",
            "mexico": "MX",
            "brazil": "BR",
            "india": "IN",
            "australia": "AU",
        }
|
        # Policy categories and the keywords that signal each one.
        self.policy_indicators = {
            "tariff": ["tariff", "duty", "customs", "import tax"],
            "quota": ["quota", "limit", "restriction", "ceiling"],
            "subsidy": ["subsidy", "support", "assistance", "aid"],
            "sanction": ["sanction", "penalty", "embargo", "ban"],
            "agreement": ["agreement", "treaty", "accord", "pact"],
        }
|
        # Regex patterns whose presence suggests an urgent or reactive measure.
        self.urgency_patterns = [
            r"immediate(?:ly)?",
            r"urgent(?:ly)?",
            r"emergency",
            r"temporary",
            r"suspension",
            r"retaliation",
            r"response\s+to",
            r"investigation",
            r"anti[-\s]?dumping",
            r"safeguard",
        ]
|
        # Date formats: numeric d/m/y and y/m/d, plus "Month DD, YYYY".
        self.date_patterns = [
            r"\d{1,2}[/-]\d{1,2}[/-]\d{2,4}",
            r"\d{4}[/-]\d{1,2}[/-]\d{1,2}",
            r"(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}",
        ]
|
    def clean_text(self, text: str) -> str:
        """Lowercase, de-noise, and normalize whitespace in policy text."""
        if not isinstance(text, str):
            return ""

        # Drop characters outside a small allowlist (note: this removes "$",
        # "%", and "/", so currency, percentage, and slash-date extraction
        # must run on the raw text), then collapse runs of whitespace.
        text = re.sub(r"[^\w\s.,;:!?()-]", " ", text)
        text = re.sub(r"\s+", " ", text)

        return text.strip().lower()
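
    # For example, clean_text("  The 25% Tariff!  ") returns "the 25 tariff!":
    # "%" is dropped and the surrounding whitespace is collapsed.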
|
    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract countries, dates, HS codes, amounts, and policy types."""
        text_clean = self.clean_text(text)

        entities = {
            "countries": [],
            "dates": [],
            "hs_codes": [],
            "amounts": [],
            "policy_types": [],
        }

        # Countries: substring match against the lowercase name mapping.
        for country_name, country_code in self.country_mappings.items():
            if country_name in text_clean:
                entities["countries"].append(country_code)

        # Dates: match the raw text, since clean_text strips "/" separators.
        for pattern in self.date_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            entities["dates"].extend(matches)

        # HS codes and tariff-line references (text_clean is already
        # lowercase, so no IGNORECASE flag is needed).
        hs_patterns = [
            r"hs\s*(\d{2,10})",
            r"heading\s*(\d{2,4})",
            r"tariff\s*line\s*(\d+)",
            r"classification\s*(\d+)",
        ]
        for pattern in hs_patterns:
            matches = re.findall(pattern, text_clean)
            entities["hs_codes"].extend(matches)

        # Monetary amounts and percentages: match the raw text, since
        # clean_text strips "$" and "%" and these patterns could never
        # match the cleaned string.
        amount_patterns = [
            r"\$\s*(\d+(?:,\d{3})*(?:\.\d{2})?)",
            r"(\d+(?:,\d{3})*(?:\.\d{2})?)\s*(?:dollar|usd|million|billion)",
            r"(\d+(?:\.\d+)?)\s*%",
        ]
        for pattern in amount_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            entities["amounts"].extend(matches)

        # Policy types: any keyword hit marks the type as present.
        for policy_type, keywords in self.policy_indicators.items():
            if any(keyword in text_clean for keyword in keywords):
                entities["policy_types"].append(policy_type)

        return entities
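
    # Illustrative (hypothetical) input: for "The United States will impose a
    # 25% tariff on China effective 01/02/2024", extract_entities returns
    # countries ["US", "CN"], dates ["01/02/2024"], amounts ["25"], and
    # policy_types ["tariff"].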
|
    def calculate_sentiment_score(self, text: str) -> float:
        """Score text in [-1, 1] using simple keyword counts (lexicon-based)."""
        positive_words = {
            "benefit", "growth", "increase", "improve", "support",
            "enhance", "strengthen", "boost", "advantage", "opportunity",
        }

        # Note: trade-restricting terms such as "tariff" count as negative
        # by construction.
        negative_words = {
            "tariff", "penalty", "restriction", "ban", "limit",
            "reduce", "decrease", "harm", "damage", "threat",
            "sanction", "retaliation", "dispute", "conflict",
        }

        # Neutral terms dilute the score without shifting its sign.
        neutral_words = {
            "policy", "measure", "regulation", "standard", "procedure",
            "implement", "establish", "maintain", "review", "monitor",
        }

        text_clean = self.clean_text(text)
        words = text_clean.split()

        positive_count = sum(1 for word in words if word in positive_words)
        negative_count = sum(1 for word in words if word in negative_words)
        neutral_count = sum(1 for word in words if word in neutral_words)

        total_sentiment_words = positive_count + negative_count + neutral_count
        if total_sentiment_words == 0:
            return 0.0

        sentiment_score = (positive_count - negative_count) / total_sentiment_words
        return max(-1.0, min(1.0, sentiment_score))
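
    # Worked example: in "tariff increase to support growth", "increase",
    # "support", and "growth" are positive (3) and "tariff" is negative (1),
    # so the score is (3 - 1) / 4 = 0.5.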
|
    def extract_numerical_features(self, text: str) -> Dict[str, float]:
        """Extract numerical features from text for ML models."""
        text_clean = self.clean_text(text)

        features = {
            "text_length": len(text),
            "word_count": len(text_clean.split()),
            "sentence_count": len([s for s in re.split(r"[.!?]+", text) if s.strip()]),
            "avg_word_length": (
                np.mean([len(word) for word in text_clean.split()])
                if text_clean.split()
                else 0
            ),
            "punctuation_ratio": (
                sum(1 for char in text if char in ".,;:!?") / len(text) if text else 0
            ),
            "uppercase_ratio": (
                sum(1 for char in text if char.isupper()) / len(text) if text else 0
            ),
            "digit_ratio": (
                sum(1 for char in text if char.isdigit()) / len(text) if text else 0
            ),
            "urgency_score": self._calculate_urgency_score(text_clean),
            "sentiment_score": self.calculate_sentiment_score(text),
            "entity_density": self._calculate_entity_density(text),
        }

        return features
|
    def _calculate_urgency_score(self, text: str) -> float:
        """Calculate an urgency score from keyword-pattern density."""
        urgency_count = 0
        for pattern in self.urgency_patterns:
            urgency_count += len(re.findall(pattern, text, re.IGNORECASE))

        words = text.split()
        if not words:
            return 0.0

        # Matches per word, scaled by 100 and capped at 1.0.
        return min(urgency_count / len(words) * 100, 1.0)
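
    # Worked example: two pattern matches in a 50-word text give
    # min(2 / 50 * 100, 1.0) = min(4.0, 1.0) = 1.0, so the cap binds quickly.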
|
    def _calculate_entity_density(self, text: str) -> float:
        """Calculate extracted entities per word in the text."""
        entities = self.extract_entities(text)
        total_entities = sum(len(entity_list) for entity_list in entities.values())

        words = text.split()
        if not words:
            return 0.0

        return total_entities / len(words)
|
    def identify_policy_scope(self, text: str) -> Dict[str, Any]:
        """Identify the scope and impact level of a policy."""
        text_clean = self.clean_text(text)
        entities = self.extract_entities(text)

        scope_indicators = {
            "bilateral": ["between", "bilateral", "two countries", "agreement with"],
            "multilateral": ["multilateral", "multiple countries", "wto", "regional"],
            "unilateral": ["unilateral", "impose", "implement", "domestic"],
            "global": ["global", "worldwide", "international", "all countries"],
        }

        # Count indicator hits per scope type.
        scope_scores = {}
        for scope_type, indicators in scope_indicators.items():
            scope_scores[scope_type] = sum(
                1 for indicator in indicators if indicator in text_clean
            )

        # The highest-scoring scope wins; "unknown" when nothing matched.
        primary_scope = (
            max(scope_scores.items(), key=lambda x: x[1])[0]
            if any(scope_scores.values())
            else "unknown"
        )

        return {
            "primary_scope": primary_scope,
            "scope_scores": scope_scores,
            "affected_countries": entities["countries"],
            "policy_types": entities["policy_types"],
            # Confidence is the winning score's share of all indicator hits.
            "confidence": (
                max(scope_scores.values()) / sum(scope_scores.values())
                if sum(scope_scores.values()) > 0
                else 0
            ),
        }
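
    # Worked example: if scope_scores is {"bilateral": 2, "multilateral": 0,
    # "unilateral": 1, "global": 0}, primary_scope is "bilateral" and
    # confidence is 2 / 3 (about 0.67).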
|
    def parse_policy_timeline(self, text: str) -> List[Dict[str, Any]]:
        """Parse timeline information from policy text."""
        timeline_patterns = [
            (
                r"effective\s+(?:from\s+)?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})",
                "effective_date",
            ),
            (r"expires?\s+(?:on\s+)?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", "expiry_date"),
            (r"review\s+(?:on\s+)?(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", "review_date"),
            (r"(?:within\s+)?(\d+)\s+(?:days?|months?|years?)", "duration"),
            (r"immediate(?:ly)?", "immediate"),
        ]

        # finditer keeps each match's own offset; the previous str.find()
        # lookup pinned repeated values (e.g. the same date appearing twice)
        # to the first occurrence.
        timeline_events = []
        for pattern, event_type in timeline_patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                timeline_events.append(
                    {
                        "type": event_type,
                        "value": match.group(1) if match.groups() else match.group(0),
                        "text_position": match.start(),
                    }
                )

        # Order events by where they appear in the text.
        timeline_events.sort(key=lambda x: x["text_position"])

        return timeline_events
|
|
def extract_policy_triggers(text: str) -> Any:
    """Extract policy triggers from text (stub, not yet implemented)."""
    logging.info("Extracting policy triggers from text: %s...", text[:30])
    return None
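

# Minimal usage sketch. The sample sentence is hypothetical; output values
# depend on the vocabularies and patterns defined above.
if __name__ == "__main__":
    processor = PolicyTextProcessor()
    sample = (
        "The United States will impose a 25% tariff on China, "
        "effective 01/02/2024, in response to the investigation."
    )

    print(processor.extract_entities(sample))
    print(processor.calculate_sentiment_score(sample))
    print(processor.extract_numerical_features(sample))
    print(processor.identify_policy_scope(sample))
    print(processor.parse_policy_timeline(sample))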
|
|