Spaces:
Running
on
Zero
Running
on
Zero
| import torch | |
| import clip | |
| from PIL import Image | |
| import numpy as np | |
| from typing import List, Dict, Tuple, Optional, Union, Any | |
| from landmark_data import ALL_LANDMARKS, get_all_landmark_prompts | |
| class CLIPZeroShotClassifier: | |
| """ | |
| 使用CLIP模型進行零樣本分類,專注於識別世界知名地標。 | |
| 作為YOLO檢測的補充,處理標準對象檢測無法識別的地標建築。 | |
| """ | |
def __init__(self, model_name: str = "ViT-B/16", device: Optional[str] = None):
    """
    Initialize the CLIP zero-shot landmark classifier.

    Loads the CLIP model, then the landmark data and prompts, and
    precomputes the text features of every landmark prompt.

    Args:
        model_name: Name of the CLIP model to load, "ViT-B/16" by default.
        device: Device to run on; when None, "cuda" is used if available,
            otherwise "cpu".

    Raises:
        Exception: Whatever ``clip.load`` raised; it is printed and re-raised.
    """
    # Select the device
    if device is None:
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        self.device = device
    print(f"Initializing CLIP Zero-Shot Landmark Classifier ({model_name}) on {self.device}")

    try:
        self.model, self.preprocess = clip.load(model_name, device=self.device)
        print("Successfully loaded CLIP model")
    except Exception as e:
        print(f"Error loading CLIP model: {e}")
        raise

    # Tunables and the result cache are configured unconditionally so that an
    # instance created through the fallback path below is still fully usable
    # (previously these attributes only existed when landmark loading succeeded).
    self.batch_size = 16  # default batch size
    # Multiplier applied to the caller's base threshold, per detection type.
    self.confidence_threshold_multipliers = {
        "close_up": 0.9,
        "partial": 0.6,
        "distant": 0.5,
        "full_image": 0.7
    }
    # Per-landmark-type values; the classification methods divide them by 0.5
    # to obtain a multiplier on the adjusted threshold.
    self.landmark_type_thresholds = {
        "tower": 0.5,
        "skyscraper": 0.4,
        "building": 0.55,
        "monument": 0.5,
        "natural": 0.6
    }
    self.results_cache = {}  # keyed by tuples that start with an image hash
    self.cache_max_size = 100  # maximum number of cached entries

    # Load landmark data
    try:
        self.landmark_data = ALL_LANDMARKS
        self.landmark_prompts = get_all_landmark_prompts()
        print(f"Loaded {len(self.landmark_prompts)} landmark prompts for classification")
        # Precompute the text features of the landmark prompts
        self.landmark_text_features = self._precompute_text_features(self.landmark_prompts)
        # Map landmark ID -> position in ALL_LANDMARKS for quick lookup
        self.landmark_id_to_index = {landmark_id: i for i, landmark_id in enumerate(ALL_LANDMARKS.keys())}
    except ImportError:
        print("Warning: landmark_data.py not found. Landmark classification will be limited")
        self.landmark_data = {}
        self.landmark_prompts = []
        self.landmark_text_features = None
        self.landmark_id_to_index = {}
| def _get_image_hash(self, image): | |
| """ | |
| 為圖像生成簡單的 hash 值用於快取 | |
| Args: | |
| image: PIL Image 或 numpy 數組 | |
| Returns: | |
| str: 圖像的 hash 值 | |
| """ | |
| if isinstance(image, np.ndarray): | |
| # 對於 numpy 數組,降採樣並計算簡單 hash | |
| small_img = image[::10, ::10] if image.ndim == 3 else image | |
| return hash(small_img.tobytes()) | |
| else: | |
| # 對於 PIL 圖像,調整大小後轉換為 bytes | |
| small_img = image.resize((32, 32)) | |
| return hash(small_img.tobytes()) | |
| def _manage_cache(self): | |
| """ | |
| 管理結果快取大小 | |
| """ | |
| if len(self.results_cache) > self.cache_max_size: | |
| oldest_key = next(iter(self.results_cache)) | |
| del self.results_cache[oldest_key] | |
def set_batch_size(self, batch_size: int):
    """
    Set the batch size used for batched processing.

    Values below 1 are raised to 1.

    Args:
        batch_size: Requested batch size.
    """
    effective_size = batch_size if batch_size > 1 else 1
    self.batch_size = effective_size
    print(f"Batch size set to {self.batch_size}")
def adjust_confidence_threshold(self, detection_type: str, multiplier: float):
    """
    Adjust the confidence-threshold multiplier of one detection type.

    The multiplier is clamped to the range [0.1, 1.5] before it is stored,
    and the stored (clamped) value is the one reported in the log line.
    Unknown detection types are reported and otherwise ignored.

    Args:
        detection_type: Detection type ('close_up', 'partial', 'distant', 'full_image').
        multiplier: Requested confidence-threshold multiplier.
    """
    if detection_type not in self.confidence_threshold_multipliers:
        print(f"Unknown detection type: {detection_type}")
        return

    applied = max(0.1, min(1.5, multiplier))
    self.confidence_threshold_multipliers[detection_type] = applied
    print(f"Adjusted confidence threshold multiplier for {detection_type} to {applied}")
| def _precompute_text_features(self, text_prompts: List[str]) -> torch.Tensor: | |
| """ | |
| 預計算文本提示的CLIP特徵,提高批處理效率 | |
| Args: | |
| text_prompts: 文本提示列表 | |
| Returns: | |
| torch.Tensor: 預計算的文本特徵 | |
| """ | |
| if not text_prompts: | |
| return None | |
| with torch.no_grad(): | |
| # Process in batches to avoid CUDA memory issues | |
| batch_size = 128 # Adjust based on GPU memory | |
| features_list = [] | |
| for i in range(0, len(text_prompts), batch_size): | |
| batch_prompts = text_prompts[i:i+batch_size] | |
| text_tokens = clip.tokenize(batch_prompts).to(self.device) | |
| batch_features = self.model.encode_text(text_tokens) | |
| batch_features = batch_features / batch_features.norm(dim=-1, keepdim=True) | |
| features_list.append(batch_features) | |
| # Concatenate all batches | |
| if len(features_list) > 1: | |
| text_features = torch.cat(features_list, dim=0) | |
| else: | |
| text_features = features_list[0] | |
| return text_features | |
def _perform_pyramid_analysis(self,
                              image: Union[Image.Image, np.ndarray],
                              levels: int = 4,
                              base_threshold: float = 0.25,
                              aspect_ratios: Optional[List[float]] = None) -> Dict[str, Any]:
    """
    Perform multi-scale pyramid analysis on the image to improve landmark detection.

    The image is resized for every combination of pyramid level (scale
    1.0, 0.8, 0.6, ...) and aspect ratio, each variant is encoded with CLIP
    and compared against the precomputed landmark text features.

    Args:
        image: Input image (PIL Image or numpy array).
        levels: Number of pyramid levels; levels whose scale factor would be
            zero or negative are skipped.
        base_threshold: Minimum score for a variant's best match to be recorded.
        aspect_ratios: Aspect ratios to try (for tall buildings vs wide
            landscapes); defaults to [1.0, 0.75, 1.5].

    Returns:
        Dict: {"is_landmark": bool, "results": list sorted by confidence
        (highest first), "best_result": first entry of "results" or None}.

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    # Ensure image is PIL format
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    if aspect_ratios is None:
        aspect_ratios = [1.0, 0.75, 1.5]

    # Without text features there is nothing to compare against.
    if self.landmark_text_features is None:
        return {"is_landmark": False, "results": [], "best_result": None}

    width, height = image.size
    # NOTE(review): the best prompt index is used as an index into the landmark
    # keys, which assumes one prompt per landmark in the same order — confirm
    # against get_all_landmark_prompts().
    landmark_ids = list(self.landmark_data.keys())
    pyramid_results = []

    for level in range(levels):
        scale_factor = 1.0 - (level * 0.2)
        if scale_factor <= 0:
            # Further levels would request empty or negative image sizes.
            break

        for aspect_ratio in aspect_ratios:
            # Change the aspect ratio while keeping the area approximately constant
            if aspect_ratio != 1.0:
                new_width = int(width * scale_factor * (1/aspect_ratio)**0.5)
                new_height = int(height * scale_factor * aspect_ratio**0.5)
            else:
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)

            # Small inputs can truncate to 0, which resize() rejects.
            new_width = max(1, new_width)
            new_height = max(1, new_height)

            scaled_image = image.resize((new_width, new_height), Image.LANCZOS)
            image_input = self.preprocess(scaled_image).unsqueeze(0).to(self.device)

            with torch.no_grad():
                image_features = self.model.encode_image(image_input)
                image_features = image_features / image_features.norm(dim=-1, keepdim=True)
                similarity = (100.0 * image_features @ self.landmark_text_features.T).softmax(dim=-1)

            # .cpu() is a no-op for CPU tensors and is required for every other
            # device ("cuda", "cuda:0", "mps", ...) before converting to numpy.
            similarity = similarity.cpu().numpy()[0]

            best_idx = similarity.argmax().item()
            best_score = similarity[best_idx]
            if best_score >= base_threshold:
                landmark_id = landmark_ids[best_idx]
                landmark_info = self.landmark_data[landmark_id]
                pyramid_results.append({
                    "landmark_id": landmark_id,
                    "landmark_name": landmark_info["name"],
                    "confidence": float(best_score),
                    "scale_factor": scale_factor,
                    "aspect_ratio": aspect_ratio,
                    "location": landmark_info["location"]
                })

    # Highest confidence first
    pyramid_results.sort(key=lambda x: x["confidence"], reverse=True)
    return {
        "is_landmark": len(pyramid_results) > 0,
        "results": pyramid_results,
        "best_result": pyramid_results[0] if pyramid_results else None
    }
def _enhance_features(self, image: Union[Image.Image, np.ndarray]) -> Image.Image:
    """
    Enhance image contrast to improve landmark detection.

    The image is converted to LAB, the lightness channel is stretched between
    its 2nd and 98th percentiles, and the result is converted back to RGB.
    Grayscale images are returned unchanged, as is the input whenever
    scikit-image is unavailable or the enhancement fails.

    Args:
        image: Input image (PIL Image or numpy array).

    Returns:
        PIL.Image: Enhanced image, or the (PIL-converted) input on fallback.

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    # Ensure image is PIL format
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    img_array = np.array(image)

    # Skip processing for grayscale images
    if len(img_array.shape) < 3:
        return image

    # The import must sit inside a try block for the ImportError fallback to work.
    try:
        from skimage import color, exposure
    except ImportError:
        print("Warning: skimage not available for feature enhancement")
        return image

    try:
        # Drop the alpha channel of RGBA input
        rgb = img_array[:, :, :3]
        lab = color.rgb2lab(rgb / 255.0)
        l_channel = lab[:, :, 0]

        # Stretch the lightness channel. out_range must be given explicitly:
        # the default for float input is [0, 1], while LAB lightness spans
        # [0, 100], so the default would turn the image almost black.
        p2, p98 = np.percentile(l_channel, (2, 98))
        lab[:, :, 0] = exposure.rescale_intensity(
            l_channel, in_range=(p2, p98), out_range=(0.0, 100.0)
        )

        # Back to 8-bit RGB
        enhanced_img = np.clip(color.lab2rgb(lab) * 255.0, 0, 255).astype(np.uint8)
        return Image.fromarray(enhanced_img)
    except Exception as e:
        # Best effort: fall back to the unmodified image
        print(f"Error in feature enhancement: {e}")
        return image
| def _determine_landmark_type(self, landmark_id): | |
| """ | |
| 自動判斷地標類型,基於地標數據和命名 | |
| Returns: | |
| str: 地標類型,用於調整閾值 | |
| """ | |
| if not landmark_id: | |
| return "building" # 預設類型 | |
| # 獲取地標詳細數據 | |
| landmark_data = self.landmark_data if hasattr(self, 'landmark_data') else {} | |
| landmark_info = landmark_data.get(landmark_id, {}) | |
| # 獲取地標相關文本 | |
| landmark_id_lower = landmark_id.lower() | |
| landmark_name = landmark_info.get("name", "").lower() | |
| landmark_location = landmark_info.get("location", "").lower() | |
| landmark_aliases = [alias.lower() for alias in landmark_info.get("aliases", [])] | |
| # 合併所有文本數據用於特徵判斷 | |
| combined_text = " ".join([landmark_id_lower, landmark_name] + landmark_aliases) | |
| # 地標類型的特色特徵 | |
| type_features = { | |
| "skyscraper": ["skyscraper", "tall", "tower", "高樓", "摩天", "大厦", "タワー"], | |
| "tower": ["tower", "bell", "clock", "塔", "鐘樓", "タワー", "campanile"], | |
| "monument": ["monument", "memorial", "statue", "紀念", "雕像", "像", "memorial"], | |
| "natural": ["mountain", "lake", "canyon", "falls", "beach", "山", "湖", "峽谷", "瀑布", "海灘"], | |
| "temple": ["temple", "shrine", "寺", "神社", "廟"], | |
| "palace": ["palace", "castle", "宮", "城", "皇宮", "宫殿"], | |
| "distinctive": ["unique", "leaning", "slanted", "傾斜", "斜", "獨特", "傾く"] | |
| } | |
| # 檢查是否位於亞洲地區 | |
| asian_regions = ["china", "japan", "korea", "taiwan", "singapore", "vietnam", "thailand", | |
| "hong kong", "中國", "日本", "韓國", "台灣", "新加坡", "越南", "泰國", "香港"] | |
| is_asian = any(region in landmark_location for region in asian_regions) | |
| # 判斷地標類型 | |
| best_type = None | |
| max_matches = 0 | |
| for type_name, features in type_features.items(): | |
| # 計算特徵詞匹配數量 | |
| matches = sum(1 for feature in features if feature in combined_text) | |
| if matches > max_matches: | |
| max_matches = matches | |
| best_type = type_name | |
| # 處理亞洲地區特例 | |
| if is_asian and best_type == "tower": | |
| best_type = "skyscraper" # 亞洲地區的塔型建築閾值較低 | |
| # 特例處理:檢測傾斜建築 | |
| if any(term in combined_text for term in ["leaning", "slanted", "tilt", "inclined", "斜", "傾斜"]): | |
| return "distinctive" # 傾斜建築需要特殊處理 | |
| return best_type if best_type and max_matches > 0 else "building" # 預設為一般建築 | |
def classify_image_region(self,
                          image: Union[Image.Image, np.ndarray],
                          box: List[float],
                          threshold: float = 0.25,
                          detection_type: str = "close_up") -> Dict[str, Any]:
    """
    Classify one region of an image against the landmark prompts.

    The region is cropped, its viewpoint is analysed on a contrast-enhanced
    copy, and the crop is then scored with CLIP at several scales and aspect
    ratios; the best-scoring landmark across all variants is reported.

    Args:
        image: Source image (PIL Image or numpy array).
        box: Bounding box [x1, y1, x2, y2]; coordinates are truncated to int.
        threshold: Base classification confidence threshold.
        detection_type: 'close_up', 'partial', 'distant', or 'auto' to derive
            the type from the fraction of the image covered by the box. The
            detected viewpoint may override the type.

    Returns:
        Dict: Always contains "landmark_id", "landmark_name", "confidence"
        and "is_landmark". When a match was scored it also carries
        "location", "scale_used", "aspect_ratio_used", "viewpoint",
        "landmark_type", "threshold_applied" and any of "year_built",
        "architectural_style", "significance" present in the landmark data.
        The returned dict is independent of the cached copy, so callers may
        annotate it freely.

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    # Ensure image is PIL format
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    # Without text features nothing can be matched.
    if self.landmark_text_features is None:
        return {
            "landmark_id": None,
            "landmark_name": None,
            "confidence": 0.0,
            "is_landmark": False
        }

    # The threshold is part of the key: the cached "is_landmark" verdict
    # depends on it, so results for different thresholds must not be shared.
    region_key = (self._get_image_hash(image), tuple(box), detection_type, threshold)
    cached = self.results_cache.get(region_key)
    if cached is not None:
        # Hand out a copy so callers annotating the result do not alter the cache.
        return dict(cached)

    # Crop the region
    x1, y1, x2, y2 = map(int, box)
    cropped_image = image.crop((x1, y1, x2, y2))
    enhanced_image = self._enhance_features(cropped_image)

    # Viewpoint analysis (runs on the enhanced crop)
    viewpoint_info = self._analyze_viewpoint(enhanced_image)
    dominant_viewpoint = viewpoint_info["dominant_viewpoint"]

    # Share of the full image covered by the region
    region_width = x2 - x1
    region_height = y2 - y1
    image_width, image_height = image.size
    region_area_ratio = (region_width * region_height) / (image_width * image_height)

    if detection_type == "auto":
        if region_area_ratio > 0.5:
            detection_type = "close_up"
        elif region_area_ratio > 0.2:
            detection_type = "partial"
        else:
            detection_type = "distant"

    # The detected viewpoint overrides the detection type
    if dominant_viewpoint == "close_up" and detection_type != "close_up":
        detection_type = "close_up"
    elif dominant_viewpoint == "distant" and detection_type != "distant":
        detection_type = "distant"
    elif dominant_viewpoint == "angled_view":
        detection_type = "partial"  # an angled view may show the landmark only partially

    # Adjust the confidence threshold for the detection type
    base_multiplier = self.confidence_threshold_multipliers.get(detection_type, 1.0)
    adjusted_threshold = threshold * base_multiplier

    # Scales to try
    scales = [1.0]
    if detection_type in ["partial", "distant"]:
        scales = [0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3]
    if dominant_viewpoint in ["angled_view", "low_angle"]:
        scales = [0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4]  # wider range for unusual viewpoints

    # Aspect ratios to try (covers both horizontal and vertical landmarks)
    aspect_ratios = [1.0, 0.8, 1.2]
    if dominant_viewpoint in ["angled_view", "unique_feature"]:
        aspect_ratios = [0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5]

    best_result = {
        "landmark_id": None,
        "landmark_name": None,
        "confidence": 0.0,
        "is_landmark": False
    }

    # NOTE(review): the best prompt index is used as an index into the landmark
    # keys, which assumes one prompt per landmark in the same order — confirm.
    landmark_ids = list(self.landmark_data.keys())
    current_width, current_height = cropped_image.size

    # Multi-scale / multi-aspect-ratio analysis
    for scale in scales:
        for aspect_ratio in aspect_ratios:
            # Change the aspect ratio while keeping the area approximately constant
            if aspect_ratio != 1.0:
                new_width = int(current_width * scale * (1/aspect_ratio)**0.5)
                new_height = int(current_height * scale * aspect_ratio**0.5)
            else:
                new_width = int(current_width * scale)
                new_height = int(current_height * scale)

            # Keep both dimensions at least 1 pixel
            new_width = max(1, new_width)
            new_height = max(1, new_height)

            try:
                scaled_image = cropped_image.resize((new_width, new_height), Image.LANCZOS)
            except Exception as e:
                print(f"Failed to resize image to {new_width}x{new_height}: {e}")
                continue

            try:
                image_input = self.preprocess(scaled_image).unsqueeze(0).to(self.device)
            except Exception as e:
                print(f"Failed to preprocess image: {e}")
                continue

            with torch.no_grad():
                try:
                    image_features = self.model.encode_image(image_input)
                    image_features = image_features / image_features.norm(dim=-1, keepdim=True)

                    # Similarity against the landmark prompts
                    similarity = (100.0 * image_features @ self.landmark_text_features.T).softmax(dim=-1)
                    # .cpu() is a no-op on CPU and required for any other device
                    # string ("cuda:0", "mps", ...) before converting to numpy.
                    similarity = similarity.cpu().numpy()[0]

                    best_idx = similarity.argmax().item()
                    best_score = similarity[best_idx]

                    # Keep the best result over all variants
                    if best_score > best_result["confidence"]:
                        landmark_id = landmark_ids[best_idx]
                        landmark_info = self.landmark_data[landmark_id]
                        best_result = {
                            "landmark_id": landmark_id,
                            "landmark_name": landmark_info["name"],
                            "location": landmark_info["location"],
                            "confidence": float(best_score),
                            "is_landmark": bool(best_score >= adjusted_threshold),
                            "scale_used": scale,
                            "aspect_ratio_used": aspect_ratio,
                            "viewpoint": dominant_viewpoint
                        }
                        # Copy optional metadata when available
                        for key in ["year_built", "architectural_style", "significance"]:
                            if key in landmark_info:
                                best_result[key] = landmark_info[key]
                except Exception as e:
                    print(f"Error in calculating similarity: {e}")
                    continue

    # Apply the landmark-type threshold adjustment once a landmark was matched
    if best_result["landmark_id"]:
        landmark_type = self._determine_landmark_type(best_result["landmark_id"])

        if landmark_type == "distinctive":
            # Distinctive structures (e.g. leaning towers): threshold lowered by 25%
            type_multiplier = 0.75
        else:
            type_multiplier = self.landmark_type_thresholds.get(landmark_type, 1.0) / 0.5

        final_threshold = adjusted_threshold * type_multiplier
        best_result["is_landmark"] = best_result["confidence"] >= final_threshold
        best_result["landmark_type"] = landmark_type
        best_result["threshold_applied"] = final_threshold

    # Cache a private copy of the result
    self.results_cache[region_key] = dict(best_result)
    self._manage_cache()
    return best_result
def classify_batch_regions(self,
                           image: Union[Image.Image, np.ndarray],
                           boxes: List[List[float]],
                           threshold: float = 0.28) -> List[Dict[str, Any]]:
    """
    Classify several image regions in batches.

    Regions are cropped, preprocessed and encoded ``self.batch_size`` at a
    time (16 when the attribute is not set), which bounds memory use for
    long box lists.

    Args:
        image: Source image (PIL Image or numpy array).
        boxes: List of bounding boxes [x1, y1, x2, y2]; coordinates are
            truncated to int.
        threshold: Confidence threshold.

    Returns:
        List[Dict]: One result per box, in input order. Without landmark
        text features every entry is {"is_landmark": False, "confidence": 0.0}.

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    if self.landmark_text_features is None:
        return [{"is_landmark": False, "confidence": 0.0} for _ in boxes]

    # Ensure image is PIL format
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    # Nothing to process
    if not boxes:
        return []

    # NOTE(review): the best prompt index is used as an index into the landmark
    # keys, which assumes one prompt per landmark in the same order — confirm.
    landmark_ids = list(self.landmark_data.keys())
    chunk_size = max(1, int(getattr(self, "batch_size", 16)))

    results = []
    for start in range(0, len(boxes), chunk_size):
        chunk_boxes = boxes[start:start + chunk_size]

        # Crop and preprocess the regions of this chunk
        cropped_inputs = []
        for box in chunk_boxes:
            x1, y1, x2, y2 = map(int, box)
            cropped_image = image.crop((x1, y1, x2, y2))
            cropped_inputs.append(self.preprocess(cropped_image).unsqueeze(0))
        batch_tensor = torch.cat(cropped_inputs).to(self.device)

        # Encode the chunk and score it against the landmark prompts
        with torch.no_grad():
            image_features = self.model.encode_image(batch_tensor)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            similarity = (100.0 * image_features @ self.landmark_text_features.T).softmax(dim=-1)

        # .cpu() is a no-op on CPU and required for any other device string
        similarity = similarity.cpu().numpy()

        for box, sim in zip(chunk_boxes, similarity):
            best_idx = sim.argmax().item()
            best_score = sim[best_idx]
            if best_score >= threshold:
                landmark_id = landmark_ids[best_idx]
                landmark_info = self.landmark_data[landmark_id]
                results.append({
                    "landmark_id": landmark_id,
                    "landmark_name": landmark_info["name"],
                    "location": landmark_info["location"],
                    "confidence": float(best_score),
                    "is_landmark": True,
                    "box": box
                })
            else:
                results.append({
                    "landmark_id": None,
                    "landmark_name": None,
                    "confidence": float(best_score),
                    "is_landmark": False,
                    "box": box
                })
    return results
def search_entire_image(self,
                        image: Union[Image.Image, np.ndarray],
                        threshold: float = 0.35,
                        detailed_analysis: bool = False) -> Dict[str, Any]:
    """
    Check whether the image as a whole shows a known landmark.

    The best prompt match must pass the threshold scaled by the "full_image"
    multiplier and then the landmark-type specific threshold.

    Args:
        image: Source image (PIL Image or numpy array).
        threshold: Confidence threshold.
        detailed_analysis: When True and a landmark was found, five
            sub-regions (center, left, right, top, bottom) are classified
            as well and reported under "region_analyses".

    Returns:
        Dict: Always contains "landmark_id", "landmark_name", "confidence",
        "is_landmark" and "top_landmarks" (the three best candidates).
        Positive results add "location", "landmark_type" and any optional
        metadata present in the landmark data.

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    # Ensure image is PIL format
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    # Without text features nothing can be matched.
    if self.landmark_text_features is None:
        return {
            "landmark_id": None,
            "landmark_name": None,
            "confidence": 0.0,
            "is_landmark": False,
            "top_landmarks": []
        }

    # The threshold is part of the key: the cached verdict depends on it.
    image_key = (self._get_image_hash(image), "entire_image", detailed_analysis, threshold)
    if image_key in self.results_cache:
        return self.results_cache[image_key]

    adjusted_threshold = threshold * self.confidence_threshold_multipliers.get("full_image", 1.0)

    # Encode the image and score it against the landmark prompts
    image_input = self.preprocess(image).unsqueeze(0).to(self.device)
    with torch.no_grad():
        image_features = self.model.encode_image(image_input)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        similarity = (100.0 * image_features @ self.landmark_text_features.T).softmax(dim=-1)
    # .cpu() is a no-op on CPU and required for any other device string
    similarity = similarity.cpu().numpy()[0]

    # NOTE(review): prompt indices are used as indices into the landmark keys,
    # which assumes one prompt per landmark in the same order — confirm.
    landmark_ids = list(self.landmark_data.keys())
    optional_keys = ("year_built", "architectural_style", "significance")

    best_idx = similarity.argmax().item()
    best_score = similarity[best_idx]

    # Three best candidates, highest score first
    top_landmarks = []
    for idx in similarity.argsort()[-3:][::-1]:
        candidate_id = landmark_ids[idx]
        candidate_info = self.landmark_data[candidate_id]
        candidate = {
            "landmark_id": candidate_id,
            "landmark_name": candidate_info["name"],
            "location": candidate_info["location"],
            "confidence": float(similarity[idx])
        }
        for key in optional_keys:
            if key in candidate_info:
                candidate[key] = candidate_info[key]
        top_landmarks.append(candidate)

    # Main result: the best match must pass both threshold stages
    accepted = False
    landmark_id = None
    landmark_type = None
    if best_score >= adjusted_threshold:
        landmark_id = landmark_ids[best_idx]
        landmark_type = self._determine_landmark_type(landmark_id)
        type_multiplier = self.landmark_type_thresholds.get(landmark_type, 1.0) / 0.5
        final_threshold = adjusted_threshold * type_multiplier
        accepted = bool(best_score >= final_threshold)

    if accepted:
        landmark_info = self.landmark_data[landmark_id]
        result = {
            "landmark_id": landmark_id,
            "landmark_name": landmark_info["name"],
            "location": landmark_info["location"],
            "confidence": float(best_score),
            "is_landmark": True,
            "landmark_type": landmark_type,
            "top_landmarks": top_landmarks
        }
        for key in optional_keys:
            if key in landmark_info:
                result[key] = landmark_info[key]
    else:
        # Also covers a score that passed the adjusted threshold but failed
        # the type-specific one (this case used to yield an empty dict).
        result = {
            "landmark_id": None,
            "landmark_name": None,
            "confidence": float(best_score),
            "is_landmark": False,
            "top_landmarks": top_landmarks
        }

    # Optional per-region analysis of a detected landmark
    if detailed_analysis and result.get("is_landmark", False):
        width, height = image.size
        regions = [
            ("center", [width * 0.25, height * 0.25, width * 0.75, height * 0.75]),
            ("left", [0, 0, width * 0.5, height]),
            ("right", [width * 0.5, 0, width, height]),
            ("top", [0, 0, width, height * 0.5]),
            ("bottom", [0, height * 0.5, width, height])
        ]
        region_results = []
        for region_name, box in regions:
            # Copy before annotating: the returned dict may be owned by the cache.
            region_result = dict(self.classify_image_region(
                image,
                box,
                threshold=threshold * 0.9,
                detection_type="partial"
            ))
            if region_result["is_landmark"]:
                region_result["region_name"] = region_name
                region_results.append(region_result)
        if region_results:
            result["region_analyses"] = region_results

    # Cache the result
    self.results_cache[image_key] = result
    self._manage_cache()
    return result
def enhanced_landmark_detection(self,
                                image: Union[Image.Image, np.ndarray],
                                threshold: float = 0.3) -> Dict[str, Any]:
    """
    Detect landmarks by combining several analysis passes.

    Runs, in order, a viewpoint analysis, a pyramid analysis, a grid of
    region classifications and a whole-image search, then groups all
    detections by landmark and derives an aggregate confidence per landmark.

    Args:
        image: Input image (PIL Image or numpy array).
        threshold: Base confidence threshold; scaled by 0.7 for distant
            views and by 1.1 for close-ups.

    Returns:
        Dict: "is_landmark_scene", "detected_landmarks" (sorted by aggregate
        confidence, highest first), "viewpoint_info" and "primary_landmark"
        (first detected landmark or None).

    Raises:
        ValueError: If image is neither a PIL Image nor a numpy array.
    """
    # Normalize the input to a PIL image
    if not isinstance(image, Image.Image):
        if not isinstance(image, np.ndarray):
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")
        image = Image.fromarray(image)

    # Phase 1: the viewpoint drives both the threshold and the grid resolution
    viewpoint_info = self._analyze_viewpoint(image)
    viewpoint = viewpoint_info["dominant_viewpoint"]
    if viewpoint == "distant":
        adjusted_threshold, grid_size = threshold * 0.7, 3  # lower threshold, coarser grid
    elif viewpoint == "close_up":
        adjusted_threshold, grid_size = threshold * 1.1, 5  # higher threshold, finer grid
    else:
        adjusted_threshold, grid_size = threshold, 4

    # Phase 2: multi-scale pyramid analysis
    pyramid_results = self._perform_pyramid_analysis(image, levels=3, base_threshold=adjusted_threshold)

    # Phase 3: grid-based region analysis, row by row
    width, height = image.size
    grid_results = []
    for row, col in ((r, c) for r in range(grid_size) for c in range(grid_size)):
        cell_box = [
            width * (col/grid_size),
            height * (row/grid_size),
            width * ((col+1)/grid_size),
            height * ((row+1)/grid_size)
        ]
        cell_result = self.classify_image_region(
            image,
            cell_box,
            threshold=adjusted_threshold,
            detection_type="auto"
        )
        if cell_result["is_landmark"]:
            cell_result["grid_position"] = (row, col)
            grid_results.append(cell_result)

    # Phase 4: collect the detections of every source
    all_detections = []
    pyramid_best = pyramid_results["best_result"]
    if pyramid_results["is_landmark"] and pyramid_best:
        all_detections.append({
            "source": "pyramid",
            "landmark_id": pyramid_best["landmark_id"],
            "landmark_name": pyramid_best["landmark_name"],
            "confidence": pyramid_best["confidence"],
            "scale_factor": pyramid_best.get("scale_factor", 1.0)
        })

    all_detections.extend(
        {
            "source": "grid",
            "landmark_id": cell["landmark_id"],
            "landmark_name": cell["landmark_name"],
            "confidence": cell["confidence"],
            "grid_position": cell.get("grid_position", (0, 0))
        }
        for cell in grid_results
    )

    full_image_result = self.search_entire_image(image, threshold=adjusted_threshold)
    if full_image_result and full_image_result.get("is_landmark", False):
        all_detections.append({
            "source": "full_image",
            "landmark_id": full_image_result["landmark_id"],
            "landmark_name": full_image_result["landmark_name"],
            "confidence": full_image_result["confidence"]
        })

    # Group detections by landmark
    landmark_groups = {}
    for detection in all_detections:
        group_key = detection["landmark_id"]
        group = landmark_groups.get(group_key)
        if group is None:
            group = {
                "landmark_id": group_key,
                "landmark_name": detection["landmark_name"],
                "detections": [],
                "sources": set()
            }
            landmark_groups[group_key] = group
        group["detections"].append(detection)
        group["sources"].add(detection["source"])

    # Aggregate confidence: best single score plus capped bonuses
    for group in landmark_groups.values():
        hits = group["detections"]
        hit_count = len(hits)
        distinct_sources = len(group["sources"])
        peak_confidence = max(hit["confidence"] for hit in hits)
        source_bonus = min(0.15, (distinct_sources - 1) * 0.05)  # up to 15% for agreeing sources
        consistency_bonus = min(0.1, (hit_count - 1) * 0.02)  # up to 10% for repeated detections
        group["confidence"] = min(1.0, peak_confidence + source_bonus + consistency_bonus)
        group["detection_count"] = hit_count
        group["source_count"] = distinct_sources

    # Highest aggregate confidence first
    sorted_landmarks = list(landmark_groups.values())
    sorted_landmarks.sort(key=lambda entry: entry["confidence"], reverse=True)

    return {
        "is_landmark_scene": len(sorted_landmarks) > 0,
        "detected_landmarks": sorted_landmarks,
        "viewpoint_info": viewpoint_info,
        "primary_landmark": sorted_landmarks[0] if sorted_landmarks else None
    }
| def _analyze_architectural_features(self, image): | |
| """ | |
| Analyzes the architectural features of a structure in the image without hardcoding specific landmarks. | |
| Args: | |
| image: Input image | |
| Returns: | |
| Dict: Architectural feature analysis results | |
| """ | |
| # Define universal architectural feature prompts that apply to all types of landmarks | |
| architecture_prompts = { | |
| "tall_structure": "a tall vertical structure standing alone", | |
| "tiered_building": "a building with multiple stacked tiers or segments", | |
| "historical_structure": "a building with historical architectural elements", | |
| "modern_design": "a modern structure with contemporary architectural design", | |
| "segmented_exterior": "a structure with visible segmented or sectioned exterior", | |
| "viewing_platform": "a tall structure with observation area at the top", | |
| "time_display": "a structure with timepiece features", | |
| "glass_facade": "a building with prominent glass exterior surfaces", | |
| "memorial_structure": "a monument or memorial structure", | |
| "ancient_construction": "ancient constructed elements or archaeological features", | |
| "natural_landmark": "a natural geographic formation or landmark", | |
| "slanted_design": "a structure with non-vertical or leaning profile" | |
| } | |
| # Calculate similarity scores against universal architectural patterns | |
| context_scores = self.calculate_similarity_scores(image, architecture_prompts) | |
| # Determine most relevant architectural features | |
| top_features = sorted(context_scores.items(), key=lambda x: x[1], reverse=True)[:3] | |
| # Calculate feature confidence | |
| context_confidence = sum(score for _, score in top_features) / 3 | |
| # Determine primary architectural category based on top features | |
| architectural_categories = { | |
| "tower": ["tall_structure", "viewing_platform", "time_display"], | |
| "skyscraper": ["tall_structure", "modern_design", "glass_facade"], | |
| "historical": ["historical_structure", "ancient_construction", "memorial_structure"], | |
| "natural": ["natural_landmark"], | |
| "distinctive": ["tiered_building", "segmented_exterior", "slanted_design"] | |
| } | |
| # Score each category based on the top features | |
| category_scores = {} | |
| for category, features in architectural_categories.items(): | |
| category_score = 0 | |
| for feature, score in context_scores.items(): | |
| if feature in features: | |
| category_score += score | |
| category_scores[category] = category_score | |
| primary_category = max(category_scores.items(), key=lambda x: x[1])[0] | |
| return { | |
| "architectural_features": top_features, | |
| "context_confidence": context_confidence, | |
| "primary_category": primary_category, | |
| "category_scores": category_scores | |
| } | |
def intelligent_landmark_search(self,
                                image: Union[Image.Image, np.ndarray],
                                yolo_boxes: Optional[List[List[float]]] = None,
                                base_threshold: float = 0.25) -> Dict[str, Any]:
    """
    Search an image for landmarks by combining whole-image and region-level analysis.

    The search proceeds in stages:
      1. Classify the whole image. Without YOLO boxes the threshold is relaxed
         to 85% of ``base_threshold``; if nothing is found in that case, a
         multi-scale pyramid analysis is attempted as a fallback.
      2. If the two best whole-image candidates are within 0.1 confidence of
         each other, generic architectural features are used to break the tie.
      3. Classify every YOLO box, merging results that share a landmark ID.
      4. If no detection reached a confidence of 0.5, scan a 5x5 grid of
         regions to catch landmarks the earlier stages missed.

    Failures inside an individual stage are printed and swallowed so that the
    remaining stages still run (best-effort behaviour).

    Args:
        image: Source image, as a PIL image or a numpy array.
        yolo_boxes: Optional bounding boxes from YOLO. Any sized sequence is
            accepted, including a numpy array of boxes.
        base_threshold: Base confidence threshold for a positive detection.

    Returns:
        Dict with the keys:
            "full_image_analysis": whole-image result dict ({} if unavailable)
            "is_landmark_scene": True when at least one landmark was detected
            "detected_landmarks": detections sorted by descending confidence
            "primary_landmark": best detection (only when one exists)
            "primary_landmark_activities": activities for the whole-image
                landmark (only when landmark-specific activities exist)
            "clip_analysis_on_full_image": forwarded "clip_analysis" entry of
                the whole-image result (only when present)

    Raises:
        ValueError: If ``image`` is neither a PIL image nor a numpy array.
    """
    import traceback

    # Normalise the input to a PIL image.
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    # Length-based emptiness test: unlike plain truthiness it also works when
    # the boxes arrive as a numpy array (whose truth value is ambiguous).
    has_yolo_boxes = yolo_boxes is not None and len(yolo_boxes) > 0

    # Without YOLO boxes, relax the threshold slightly to improve recall.
    actual_threshold = base_threshold if has_yolo_boxes else base_threshold * 0.85

    # ---- Stage 1: whole-image analysis (with pyramid fallback) ----
    try:
        full_image_result = self.search_entire_image(
            image,
            threshold=actual_threshold,
            detailed_analysis=True  # detailed analysis supplies the ranked candidates used below
        )

        found_on_full_image = bool(full_image_result) and full_image_result.get("is_landmark", False)
        if not has_yolo_boxes and not found_on_full_image:
            print("No YOLO boxes provided, attempting multi-scale pyramid analysis")
            try:
                if hasattr(self, '_perform_pyramid_analysis'):
                    pyramid_results = self._perform_pyramid_analysis(
                        image,
                        levels=4,
                        base_threshold=actual_threshold,
                        aspect_ratios=[1.0, 0.75, 1.5, 0.5, 2.0]
                    )
                    if pyramid_results and pyramid_results.get("is_landmark", False):
                        best = pyramid_results.get("best_result", {})
                        if best.get("confidence", 0) > actual_threshold:
                            # The whole-image pass found nothing, so the pyramid
                            # result replaces it outright.
                            full_image_result = {
                                "is_landmark": True,
                                "landmark_id": best["landmark_id"],
                                "landmark_name": best["landmark_name"],
                                "confidence": best["confidence"],
                                "location": best.get("location", "Unknown Location")
                            }
                            print(f"Pyramid analysis detected landmark: {best['landmark_name']} with confidence {best['confidence']:.3f}")
                else:
                    print("Pyramid analysis not available, skipping multi-scale detection")
            except Exception as e:
                print(f"Error in pyramid analysis: {e}")
    except Exception as e:
        print(f"Error in search_entire_image: {e}")
        traceback.print_exc()
        full_image_result = None

    result = {
        "full_image_analysis": full_image_result if full_image_result else {},
        "is_landmark_scene": False,  # updated once any landmark is confirmed
        "detected_landmarks": []
    }
    detected = result["detected_landmarks"]

    def find_detected(landmark_id):
        # Return the first already-recorded detection with this ID, or None.
        for entry in detected:
            if entry.get("landmark_id") == landmark_id:
                return entry
        return None

    def region_entry(region_result, region_type, region_box):
        # Build a detection record from a region classification result.
        return {
            "landmark_id": region_result.get("landmark_id", "unknown"),
            "landmark_name": region_result.get("landmark_name", "Unknown Landmark"),
            "confidence": region_result.get("confidence", 0.0),
            "location": region_result.get("location", "Unknown Location"),
            "region_type": region_type,
            "box": region_box
        }

    # ---- Stage 2: context-aware tie-break between close candidates ----
    if full_image_result and "top_landmarks" in full_image_result and len(full_image_result["top_landmarks"]) >= 2:
        top_landmarks = full_image_result["top_landmarks"]
        # Only intervene when the two best candidates are nearly tied.
        if abs(top_landmarks[0]["confidence"] - top_landmarks[1]["confidence"]) < 0.1:
            try:
                if hasattr(self, '_analyze_architectural_features'):
                    architectural_analysis = self._analyze_architectural_features(image)
                    top_features = architectural_analysis.get("architectural_features", [])
                    primary_category = architectural_analysis.get("primary_category", "")

                    # Generic keyword tables (no landmark-specific names): a
                    # candidate is boosted when its ID mentions a term that
                    # matches the architecture observed in the image.
                    category_terms = {
                        "tower": ("tower", "spire", "needle"),
                        "skyscraper": ("building", "skyscraper", "tall"),
                        "historical": ("monument", "castle", "palace", "temple"),
                        "distinctive": ("unusual", "unique", "special", "famous"),
                    }
                    feature_terms = {
                        "time_display": "clock",
                        "segmented_exterior": "segmented",
                        "slanted_design": "leaning",
                    }

                    for landmark in top_landmarks[:2]:
                        landmark_id = landmark.get("landmark_id", "").lower()
                        confidence_boost = 0

                        # Boost for agreement with the primary architectural category.
                        if any(term in landmark_id for term in category_terms.get(primary_category, ())):
                            confidence_boost += 0.05

                        # Smaller boost for each matching individual feature.
                        for feature, _score in top_features:
                            keyword = feature_terms.get(feature)
                            if keyword is not None and keyword in landmark_id:
                                confidence_boost += 0.03

                        if confidence_boost > 0:
                            landmark["confidence"] += confidence_boost
                            print(f"Boosted {landmark['landmark_name']} confidence by {confidence_boost:.2f} based on architectural features ({primary_category})")

                    # Re-rank and promote the (possibly new) best candidate.
                    top_landmarks.sort(key=lambda x: x["confidence"], reverse=True)
                    full_image_result["top_landmarks"] = top_landmarks
                    winner = top_landmarks[0]
                    full_image_result["landmark_id"] = winner["landmark_id"]
                    full_image_result["landmark_name"] = winner["landmark_name"]
                    full_image_result["confidence"] = winner["confidence"]
                    full_image_result["location"] = winner.get("location", "Unknown Location")
            except Exception as e:
                print(f"Error in architectural feature analysis: {e}")
                traceback.print_exc()

    # ---- Stage 3: record the whole-image landmark ----
    if full_image_result and full_image_result.get("is_landmark", False):
        result["is_landmark_scene"] = True
        landmark_id = full_image_result.get("landmark_id", "unknown")
        landmark_specific_info = self._extract_landmark_specific_info(landmark_id)

        landmark_info = {
            "landmark_id": landmark_id,
            "landmark_name": full_image_result.get("landmark_name", "Unknown Landmark"),
            "confidence": full_image_result.get("confidence", 0.0),
            "location": full_image_result.get("location", "Unknown Location"),
            "region_type": "full_image",
            "box": [0, 0, image.width, image.height]
        }
        # Landmark-specific data takes precedence over the classifier output,
        # including "landmark_name" and "location" when it provides them.
        landmark_info.update(landmark_specific_info)
        detected.append(landmark_info)

        if landmark_specific_info.get("has_specific_activities", False):
            result["primary_landmark_activities"] = landmark_specific_info.get("landmark_specific_activities", [])
            print(f"Set primary landmark activities: {len(result['primary_landmark_activities'])} activities for {landmark_info['landmark_name']}")

    can_classify_regions = hasattr(self, 'classify_image_region')

    # ---- Stage 4: analyse the YOLO boxes ----
    if has_yolo_boxes and can_classify_regions:
        for box in yolo_boxes:
            try:
                box_result = self.classify_image_region(
                    image,
                    box,
                    threshold=base_threshold,
                    detection_type="auto"
                )
                if not (box_result and box_result.get("is_landmark", False)):
                    continue

                existing = find_detected(box_result.get("landmark_id"))
                if existing is None:
                    detected.append(region_entry(box_result, "yolo_box", box))
                elif box_result.get("confidence", 0) > existing.get("confidence", 0):
                    # Same landmark seen again with higher confidence:
                    # keep the better-localised detection.
                    existing.update({
                        "confidence": box_result.get("confidence", 0),
                        "region_type": "yolo_box",
                        "box": box
                    })
            except Exception as e:
                print(f"Error in analyzing YOLO box: {e}")
                continue

    # ---- Stage 5: grid search, only when nothing confident was found ----
    best_confidence = max((landmark.get("confidence", 0) for landmark in detected), default=0)
    if best_confidence < 0.5 and can_classify_regions:
        try:
            width, height = image.size
            if width > 0 and height > 0:
                # 5x5 grid of [x1, y1, x2, y2] cells covering the image, row by row.
                grid_boxes = [
                    [
                        width * (j/5), height * (i/5),
                        width * ((j+1)/5), height * ((i+1)/5)
                    ]
                    for i in range(5)
                    for j in range(5)
                ]
                for box in grid_boxes:
                    try:
                        grid_result = self.classify_image_region(
                            image,
                            box,
                            threshold=base_threshold * 0.9,  # slightly relaxed for partial views
                            detection_type="partial"
                        )
                        if not (grid_result and grid_result.get("is_landmark", False)):
                            continue
                        # Grid hits never overwrite an existing detection.
                        if find_detected(grid_result.get("landmark_id")) is None:
                            detected.append(region_entry(grid_result, "grid", box))
                    except Exception as e:
                        print(f"Error in analyzing grid region: {e}")
                        continue
        except Exception as e:
            print(f"Error in grid search: {e}")
            traceback.print_exc()

    # ---- Finalise ----
    detected.sort(key=lambda x: x.get("confidence", 0), reverse=True)
    if detected:
        result["is_landmark_scene"] = True
        result["primary_landmark"] = detected[0]

    # Forward the raw CLIP analysis so downstream consumers (e.g. an LLM) get more context.
    if full_image_result and "clip_analysis" in full_image_result:
        result["clip_analysis_on_full_image"] = full_image_result["clip_analysis"]

    return result
def _extract_landmark_specific_info(self, landmark_id: str) -> Dict[str, Any]:
    """
    Collect the detailed information available for one landmark.

    Basic data is read from ``self.landmark_data`` when the landmark is listed
    there, otherwise from the module-level ``ALL_LANDMARKS`` table. Activity
    suggestions come from ``LANDMARK_ACTIVITIES``; when that name is not
    already defined at module level it is imported lazily from the
    ``landmark_activities`` module. Lookup failures are printed and treated as
    "no data" rather than raised.

    Args:
        landmark_id: Landmark ID. An empty value or "unknown" yields a result
            containing only the ``has_specific_activities`` flag.

    Returns:
        Dict that always contains "has_specific_activities" (bool) and, when
        available: "landmark_name", "feature_templates" (up to the first five
        prompts), "primary_template" (the first prompt), "aliases",
        "location", "year_built", "architectural_style", "significance",
        "description" and "landmark_specific_activities".
    """
    specific_info: Dict[str, Any] = {"has_specific_activities": False}
    if not landmark_id or landmark_id == "unknown":
        return specific_info

    # ---- Locate the landmark's basic data ----
    landmark_data_source = None
    instance_data = getattr(self, 'landmark_data', None)
    if instance_data and landmark_id in instance_data:
        landmark_data_source = instance_data[landmark_id]
        print(f"Using landmark data from class attribute for {landmark_id}")
    else:
        try:
            if landmark_id in ALL_LANDMARKS:
                landmark_data_source = ALL_LANDMARKS[landmark_id]
                print(f"Using landmark data from ALL_LANDMARKS for {landmark_id}")
        except Exception as e:
            print(f"Error accessing ALL_LANDMARKS: {e}")

    # ---- Copy over the basic fields ----
    if landmark_data_source:
        if "name" in landmark_data_source:
            specific_info["landmark_name"] = landmark_data_source["name"]

        if "prompts" in landmark_data_source:
            prompts = landmark_data_source["prompts"]
            # Guard against an empty prompt list: indexing [0] would raise.
            if prompts:
                specific_info["feature_templates"] = prompts[:5]
                specific_info["primary_template"] = prompts[0]

        for key in ("aliases", "location", "year_built",
                    "architectural_style", "significance", "description"):
            if key in landmark_data_source:
                specific_info[key] = landmark_data_source[key]

    # ---- Activity suggestions ----
    try:
        try:
            # Use the module-level table when the file already provides it.
            activities_table = LANDMARK_ACTIVITIES
        except NameError:
            # Otherwise load it on demand; a missing module is reported by the
            # ImportError handler below instead of silently disabling activities.
            from landmark_activities import LANDMARK_ACTIVITIES as activities_table

        if landmark_id in activities_table:
            activities = activities_table[landmark_id]
            specific_info["landmark_specific_activities"] = activities
            specific_info["has_specific_activities"] = True
            print(f"Found {len(activities)} specific activities for landmark {landmark_id}")
        else:
            print(f"No specific activities found for landmark {landmark_id} in LANDMARK_ACTIVITIES")
            specific_info["has_specific_activities"] = False
    except ImportError:
        print("Warning: Could not import LANDMARK_ACTIVITIES from landmark_activities")
        specific_info["has_specific_activities"] = False
    except Exception as e:
        print(f"Error loading landmark activities for {landmark_id}: {e}")
        specific_info["has_specific_activities"] = False

    return specific_info
def _analyze_viewpoint(self, image: Union[Image.Image, np.ndarray]) -> Dict[str, float]:
    """
    Score an image against a fixed set of camera-viewpoint descriptions.

    Args:
        image: Input image, forwarded unchanged to ``calculate_similarity_scores``.

    Returns:
        Dict with:
            "viewpoint_scores": score for every viewpoint name
            "dominant_viewpoint": name of the highest-scoring viewpoint
            "confidence": score of that viewpoint
    """
    prompt_by_viewpoint = dict(
        aerial_view="an aerial view from above looking down",
        street_level="a street level view looking up at a tall structure",
        eye_level="an eye-level horizontal view of a landmark",
        distant="a distant view of a landmark on the horizon",
        close_up="a close-up detailed view of architectural features",
        interior="an interior view inside a structure",
    )

    scores = self.calculate_similarity_scores(image, prompt_by_viewpoint)

    # Highest-scoring viewpoint; on a tie the earliest entry wins.
    best_name = max(scores, key=scores.get)

    analysis = {"viewpoint_scores": scores}
    analysis["dominant_viewpoint"] = best_name
    analysis["confidence"] = scores[best_name]
    return analysis
def calculate_similarity_scores(self, image: Union[Image.Image, np.ndarray],
                                prompts: Dict[str, str]) -> Dict[str, float]:
    """
    Compute CLIP similarity scores between an image and a set of named prompts.

    The scores are a softmax over the supplied prompts (cosine similarity
    scaled by 100), so they are relative to this prompt set and sum to 1.

    Args:
        image: Input image, as a PIL image or a numpy array.
        prompts: Mapping of {name: prompt text}.

    Returns:
        Dict[str, float]: Score for each prompt name, in the order of
        ``prompts``. Empty when ``prompts`` is empty.

    Raises:
        ValueError: If ``image`` is neither a PIL image nor a numpy array.
    """
    # Normalise the input to a PIL image.
    if not isinstance(image, Image.Image):
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        else:
            raise ValueError("Unsupported image format. Expected PIL Image or numpy array.")

    # Nothing to compare against: skip the model entirely.
    if not prompts:
        return {}

    prompt_names = list(prompts.keys())
    prompt_texts = list(prompts.values())

    image_input = self.preprocess(image).unsqueeze(0).to(self.device)
    prompt_tokens = clip.tokenize(prompt_texts).to(self.device)

    with torch.no_grad():
        # L2-normalise both embeddings so the dot product is a cosine similarity.
        image_features = self.model.encode_image(image_input)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        prompt_features = self.model.encode_text(prompt_tokens)
        prompt_features = prompt_features / prompt_features.norm(dim=-1, keepdim=True)

        similarity = (100.0 * image_features @ prompt_features.T).softmax(dim=-1)

    # Always move the result to the CPU: .cpu() is a no-op for CPU tensors,
    # and comparing the device against the literal "cuda" misses "cuda:0",
    # "mps" and torch.device objects. Widening to float32 is exact for
    # half-precision outputs.
    probabilities = similarity[0].float().cpu().tolist()

    return {name: float(score) for name, score in zip(prompt_names, probabilities)}