Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Any, Optional | |
| from scene_type import SCENE_TYPES | |
| from enhance_scene_describer import EnhancedSceneDescriber | |
| class SpatialAnalyzer: | |
| """ | |
| Analyzes spatial relationships between objects in an image. | |
| Handles region assignment, object positioning, and functional zone identification. | |
| """ | |
def __init__(self, class_names: Dict[int, str] = None, object_categories=None):
    """
    Set up the 3x3 region grid and the collaborators used for spatial analysis.

    Args:
        class_names: Mapping from class id to class name; stored as given
            (may be None).
        object_categories: Optional mapping of category name to class ids;
            an empty dict is stored when the argument is falsy.
    """
    # Cell boundaries of the normalized grid, shared by both axes.
    edges = (0, 1/3, 2/3, 1)
    row_labels = ("top", "middle", "bottom")
    col_labels = ("left", "center", "right")

    # Every region is (x_min, y_min, x_max, y_max), registered row by row.
    grid = {}
    for row_idx, row_label in enumerate(row_labels):
        for col_idx, col_label in enumerate(col_labels):
            grid[f"{row_label}_{col_label}"] = (
                edges[col_idx],
                edges[row_idx],
                edges[col_idx + 1],
                edges[row_idx + 1],
            )
    self.regions = grid

    self.class_names = class_names
    self.OBJECT_CATEGORIES = object_categories if object_categories else {}
    self.enhance_descriptor = EnhancedSceneDescriber(scene_types=SCENE_TYPES)

    # Normalized distance threshold for proximity analysis.
    self.proximity_threshold = 0.2
| def _determine_region(self, x: float, y: float) -> str: | |
| """ | |
| Determine which region a point falls into. | |
| Args: | |
| x: Normalized x-coordinate (0-1) | |
| y: Normalized y-coordinate (0-1) | |
| Returns: | |
| Region name | |
| """ | |
| for region_name, (x1, y1, x2, y2) in self.regions.items(): | |
| if x1 <= x < x2 and y1 <= y < y2: | |
| return region_name | |
| return "unknown" | |
| def _analyze_regions(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| Analyze object distribution across image regions. | |
| Args: | |
| detected_objects: List of detected objects with position information | |
| Returns: | |
| Dictionary with region analysis | |
| """ | |
| # Count objects in each region | |
| region_counts = {region: 0 for region in self.regions.keys()} | |
| region_objects = {region: [] for region in self.regions.keys()} | |
| for obj in detected_objects: | |
| region = obj["region"] | |
| if region in region_counts: | |
| region_counts[region] += 1 | |
| region_objects[region].append({ | |
| "class_id": obj["class_id"], | |
| "class_name": obj["class_name"] | |
| }) | |
| # Determine main focus regions (top 1-2 regions by object count) | |
| sorted_regions = sorted(region_counts.items(), key=lambda x: x[1], reverse=True) | |
| main_regions = [region for region, count in sorted_regions if count > 0][:2] | |
| return { | |
| "counts": region_counts, | |
| "main_focus": main_regions, | |
| "objects_by_region": region_objects | |
| } | |
| def _extract_detected_objects(self, detection_result: Any, confidence_threshold: float = 0.25) -> List[Dict]: | |
| """ | |
| Extract detected objects from detection result with position information. | |
| Args: | |
| detection_result: Detection result from YOLOv8 | |
| confidence_threshold: Minimum confidence threshold | |
| Returns: | |
| List of dictionaries with detected object information | |
| """ | |
| boxes = detection_result.boxes.xyxy.cpu().numpy() | |
| classes = detection_result.boxes.cls.cpu().numpy().astype(int) | |
| confidences = detection_result.boxes.conf.cpu().numpy() | |
| # Image dimensions | |
| img_height, img_width = detection_result.orig_shape[:2] | |
| detected_objects = [] | |
| for box, class_id, confidence in zip(boxes, classes, confidences): | |
| # Skip objects with confidence below threshold | |
| if confidence < confidence_threshold: | |
| continue | |
| x1, y1, x2, y2 = box | |
| width = x2 - x1 | |
| height = y2 - y1 | |
| # Center point | |
| center_x = (x1 + x2) / 2 | |
| center_y = (y1 + y2) / 2 | |
| # Normalized positions (0-1) | |
| norm_x = center_x / img_width | |
| norm_y = center_y / img_height | |
| norm_width = width / img_width | |
| norm_height = height / img_height | |
| # Area calculation | |
| area = width * height | |
| norm_area = area / (img_width * img_height) | |
| # Region determination | |
| object_region = self._determine_region(norm_x, norm_y) | |
| detected_objects.append({ | |
| "class_id": int(class_id), | |
| "class_name": self.class_names[int(class_id)], | |
| "confidence": float(confidence), | |
| "box": [float(x1), float(y1), float(x2), float(y2)], | |
| "center": [float(center_x), float(center_y)], | |
| "normalized_center": [float(norm_x), float(norm_y)], | |
| "size": [float(width), float(height)], | |
| "normalized_size": [float(norm_width), float(norm_height)], | |
| "area": float(area), | |
| "normalized_area": float(norm_area), | |
| "region": object_region | |
| }) | |
| return detected_objects | |
| def _detect_scene_viewpoint(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| 檢測場景視角並識別特殊場景模式。 | |
| Args: | |
| detected_objects: 檢測到的物體列表 | |
| Returns: | |
| Dict: 包含視角和場景模式信息的字典 | |
| """ | |
| if not detected_objects: | |
| return {"viewpoint": "eye_level", "patterns": []} | |
| # 從物體位置中提取信息 | |
| patterns = [] | |
| # 檢測行人位置模式 | |
| pedestrian_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| # 檢查是否有足夠的行人來識別模式 | |
| if len(pedestrian_objs) >= 4: | |
| pedestrian_positions = [obj["normalized_center"] for obj in pedestrian_objs] | |
| # 檢測十字交叉模式 | |
| if self._detect_cross_pattern(pedestrian_positions): | |
| patterns.append("crosswalk_intersection") | |
| # 檢測多方向行人流 | |
| directions = self._analyze_movement_directions(pedestrian_positions) | |
| if len(directions) >= 2: | |
| patterns.append("multi_directional_movement") | |
| # 檢查物體的大小一致性 - 在空中俯視圖中,物體大小通常更一致 | |
| if len(detected_objects) >= 5: | |
| sizes = [obj.get("normalized_area", 0) for obj in detected_objects] | |
| size_variance = np.var(sizes) / (np.mean(sizes) ** 2) # 標準化變異數,不會受到平均值影響 | |
| if size_variance < 0.3: # 低變異表示大小一致 | |
| patterns.append("consistent_object_size") | |
| # 基本視角檢測 | |
| viewpoint = self.enhance_descriptor._detect_viewpoint(detected_objects) | |
| # 根據檢測到的模式增強視角判斷 | |
| if "crosswalk_intersection" in patterns and viewpoint != "aerial": | |
| # 如果檢測到斑馬線交叉但視角判斷不是空中視角,優先採用模式判斷 | |
| viewpoint = "aerial" | |
| return { | |
| "viewpoint": viewpoint, | |
| "patterns": patterns | |
| } | |
| def _detect_cross_pattern(self, positions): | |
| """ | |
| 檢測位置中的十字交叉模式 | |
| Args: | |
| positions: 位置列表 [[x1, y1], [x2, y2], ...] | |
| Returns: | |
| bool: 是否檢測到十字交叉模式 | |
| """ | |
| if len(positions) < 8: # 需要足夠多的點 | |
| return False | |
| # 提取 x 和 y 坐標 | |
| x_coords = [pos[0] for pos in positions] | |
| y_coords = [pos[1] for pos in positions] | |
| # 檢測 x 和 y 方向的聚類 | |
| x_clusters = [] | |
| y_clusters = [] | |
| # 簡化的聚類分析 | |
| x_mean = np.mean(x_coords) | |
| y_mean = np.mean(y_coords) | |
| # 計算在中心線附近的點 | |
| near_x_center = sum(1 for x in x_coords if abs(x - x_mean) < 0.1) | |
| near_y_center = sum(1 for y in y_coords if abs(y - y_mean) < 0.1) | |
| # 如果有足夠的點在中心線附近,可能是十字交叉 | |
| return near_x_center >= 3 and near_y_center >= 3 | |
| def _analyze_movement_directions(self, positions): | |
| """ | |
| 分析位置中的移動方向 | |
| Args: | |
| positions: 位置列表 [[x1, y1], [x2, y2], ...] | |
| Returns: | |
| list: 檢測到的主要方向 | |
| """ | |
| if len(positions) < 6: | |
| return [] | |
| # extract x 和 y 坐標 | |
| x_coords = [pos[0] for pos in positions] | |
| y_coords = [pos[1] for pos in positions] | |
| directions = [] | |
| # horizontal move (left --> right) | |
| x_std = np.std(x_coords) | |
| x_range = max(x_coords) - min(x_coords) | |
| # vertical move(up --> down) | |
| y_std = np.std(y_coords) | |
| y_range = max(y_coords) - min(y_coords) | |
| # 足夠大的範圍表示該方向有運動 | |
| if x_range > 0.4: | |
| directions.append("horizontal") | |
| if y_range > 0.4: | |
| directions.append("vertical") | |
| return directions | |
| def _identify_functional_zones(self, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| Identify functional zones within the scene with improved detection for different viewpoints | |
| and cultural contexts. | |
| Args: | |
| detected_objects: List of detected objects | |
| scene_type: Identified scene type | |
| Returns: | |
| Dictionary of functional zones with their descriptions | |
| """ | |
| # Group objects by category and region | |
| category_regions = {} | |
| if not getattr(self, 'enable_landmark', True): | |
| detected_objects = [obj for obj in detected_objects if not obj.get("is_landmark", False)] | |
| # 過濾地標相關場景類型 | |
| if scene_type in ["tourist_landmark", "natural_landmark", "historical_monument"]: | |
| scene_type = "city_street" | |
| # MODIFIED: Smart threshold evaluation instead of fixed values | |
| should_identify = self._evaluate_zone_identification_feasibility(detected_objects, scene_type) | |
| if not should_identify: | |
| return {} | |
| # MODIFIED: Build category_regions mapping (was missing in original) | |
| for obj in detected_objects: | |
| category = self._categorize_object(obj) | |
| if not category: | |
| continue | |
| if category not in category_regions: | |
| category_regions[category] = {} | |
| region = obj.get("region", "center") | |
| if region not in category_regions[category]: | |
| category_regions[category][region] = [] | |
| category_regions[category][region].append(obj) | |
| # Identify zones based on object groupings | |
| zones = {} | |
| # Detect viewpoint to adjust zone identification strategy | |
| viewpoint = self._detect_scene_viewpoint(detected_objects) | |
| # Choose appropriate zone identification strategy based on scene type and viewpoint | |
| if scene_type in ["living_room", "bedroom", "dining_area", "kitchen", "office_workspace", "meeting_room"]: | |
| # Indoor scenes | |
| zones.update(self._identify_indoor_zones(category_regions, detected_objects, scene_type)) | |
| elif scene_type in ["city_street", "parking_lot", "park_area"]: | |
| # Outdoor general scenes | |
| zones.update(self._identify_outdoor_general_zones(category_regions, detected_objects, scene_type)) | |
| elif "aerial" in scene_type or viewpoint == "aerial": | |
| # Aerial viewpoint scenes | |
| zones.update(self._identify_aerial_view_zones(category_regions, detected_objects, scene_type)) | |
| elif "asian" in scene_type: | |
| # Asian cultural context scenes | |
| zones.update(self._identify_asian_cultural_zones(category_regions, detected_objects, scene_type)) | |
| elif scene_type == "urban_intersection": | |
| # Specific urban intersection logic | |
| zones.update(self._identify_intersection_zones(category_regions, detected_objects, viewpoint)) | |
| elif scene_type == "financial_district": | |
| # Financial district specific logic | |
| zones.update(self._identify_financial_district_zones(category_regions, detected_objects)) | |
| elif scene_type == "upscale_dining": | |
| # Upscale dining specific logic | |
| zones.update(self._identify_upscale_dining_zones(category_regions, detected_objects)) | |
| elif scene_type == "tourist_landmark" or "landmark" in scene_type: | |
| # 處理地標場景類型 | |
| landmark_objects = [obj for obj in detected_objects if obj.get("is_landmark", False)] | |
| if landmark_objects: | |
| landmark_zones = self._identify_landmark_zones(landmark_objects) | |
| zones.update(landmark_zones) | |
| else: | |
| # Default zone identification for other scene types | |
| zones.update(self._identify_default_zones(category_regions, detected_objects)) | |
| # 檢查是否有地標物體但場景類型不是地標類型 | |
| if scene_type != "tourist_landmark" and "landmark" not in scene_type: | |
| landmark_objects = [obj for obj in detected_objects if obj.get("is_landmark", False)] | |
| if landmark_objects: | |
| # 添加地標功能區,但不覆蓋已有的功能區 | |
| landmark_zones = self._identify_landmark_zones(landmark_objects) | |
| # 確保地標區域不會覆蓋已識別的其他重要功能區 | |
| for zone_id, zone_info in landmark_zones.items(): | |
| if zone_id not in zones: | |
| zones[zone_id] = zone_info | |
| # MODIFIED: Enhanced fallback strategy - try simplified identification if no zones found | |
| if not zones: | |
| zones.update(self._identify_default_zones(category_regions, detected_objects)) | |
| # Final fallback: create basic zones from high-confidence objects | |
| if not zones: | |
| zones.update(self._create_basic_zones_from_objects(detected_objects, scene_type)) | |
| return zones | |
| def _identify_core_objects_for_scene(self, detected_objects: List[Dict], scene_type: str) -> List[Dict]: | |
| """ | |
| Identify core objects that define a particular scene type. | |
| Args: | |
| detected_objects: List of detected objects | |
| scene_type: Scene type | |
| Returns: | |
| List of core objects for the scene | |
| """ | |
| core_objects = [] | |
| scene_core_mapping = { | |
| "bedroom": [59], # bed | |
| "kitchen": [68, 69, 71, 72], # microwave, oven, sink, refrigerator | |
| "living_room": [57, 58, 62], # sofa, chair, tv | |
| "dining_area": [60, 46, 47], # dining table, fork, knife | |
| "office_workspace": [63, 64, 66, 73] # laptop, mouse, keyboard, book | |
| } | |
| if scene_type in scene_core_mapping: | |
| core_class_ids = scene_core_mapping[scene_type] | |
| for obj in detected_objects: | |
| if obj["class_id"] in core_class_ids and obj.get("confidence", 0) >= 0.4: | |
| core_objects.append(obj) | |
| return core_objects | |
| def _get_object_categories(self, detected_objects: List[Dict]) -> set: | |
| """Get unique object categories from detected objects.""" | |
| object_categories = set() | |
| for obj in detected_objects: | |
| category = self._categorize_object(obj) | |
| if category: | |
| object_categories.add(category) | |
| return object_categories | |
| def _create_basic_zones_from_objects(self, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| Create basic functional zones from individual high-confidence objects. | |
| This is a fallback when standard zone identification fails. | |
| Args: | |
| detected_objects: List of detected objects | |
| scene_type: Scene type | |
| Returns: | |
| Dictionary of basic zones | |
| """ | |
| zones = {} | |
| # Focus on high-confidence objects | |
| high_conf_objects = [obj for obj in detected_objects if obj.get("confidence", 0) >= 0.6] | |
| if not high_conf_objects: | |
| high_conf_objects = detected_objects # Fallback to all objects | |
| # Create zones based on individual important objects | |
| for i, obj in enumerate(high_conf_objects[:3]): # Limit to top 3 objects | |
| class_name = obj["class_name"] | |
| region = obj.get("region", "center") | |
| # Create descriptive zone based on object type | |
| zone_description = self._get_basic_zone_description(class_name, scene_type) | |
| if zone_description: | |
| zones[f"functional_area_{i+1}"] = { | |
| "region": region, | |
| "objects": [class_name], | |
| "description": zone_description | |
| } | |
| return zones | |
| def _get_basic_zone_description(self, class_name: str, scene_type: str) -> str: | |
| """Generate basic zone description based on object and scene type.""" | |
| # Object-specific descriptions | |
| descriptions = { | |
| "bed": "Sleeping and rest area", | |
| "sofa": "Seating and relaxation area", | |
| "chair": "Seating area", | |
| "dining table": "Dining and meal area", | |
| "tv": "Entertainment and media area", | |
| "laptop": "Work and computing area", | |
| "potted plant": "Decorative and green space area", | |
| "refrigerator": "Food storage and kitchen area", | |
| "car": "Vehicle and transportation area", | |
| "person": "Activity and social area" | |
| } | |
| return descriptions.get(class_name, f"Functional area with {class_name}") | |
| def _categorize_object(self, obj: Dict) -> str: | |
| """ | |
| Categorize detected objects into functional categories for zone identification. | |
| """ | |
| class_id = obj.get("class_id", -1) | |
| class_name = obj.get("class_name", "").lower() | |
| # Use existing category mapping if available | |
| if hasattr(self, 'OBJECT_CATEGORIES') and self.OBJECT_CATEGORIES: | |
| for category, ids in self.OBJECT_CATEGORIES.items(): | |
| if class_id in ids: | |
| return category | |
| # Fallback categorization based on class names for common COCO classes | |
| furniture_items = ["chair", "couch", "bed", "dining table", "toilet"] | |
| plant_items = ["potted plant"] | |
| electronic_items = ["tv", "laptop", "mouse", "remote", "keyboard", "cell phone"] | |
| vehicle_items = ["bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat"] | |
| person_items = ["person"] | |
| kitchen_items = ["bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", | |
| "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", | |
| "pizza", "donut", "cake", "refrigerator", "oven", "toaster", "sink", "microwave"] | |
| sports_items = ["frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", | |
| "baseball glove", "skateboard", "surfboard", "tennis racket"] | |
| personal_items = ["handbag", "tie", "suitcase", "umbrella", "backpack"] | |
| if any(item in class_name for item in furniture_items): | |
| return "furniture" | |
| elif any(item in class_name for item in plant_items): | |
| return "plant" | |
| elif any(item in class_name for item in electronic_items): | |
| return "electronics" | |
| elif any(item in class_name for item in vehicle_items): | |
| return "vehicle" | |
| elif any(item in class_name for item in person_items): | |
| return "person" | |
| elif any(item in class_name for item in kitchen_items): | |
| return "kitchen_items" | |
| elif any(item in class_name for item in sports_items): | |
| return "sports" | |
| elif any(item in class_name for item in personal_items): | |
| return "personal_items" | |
| else: | |
| return "misc" | |
| def _evaluate_zone_identification_feasibility(self, detected_objects: List[Dict], scene_type: str) -> bool: | |
| """ | |
| 基於物件關聯性和分布特徵的彈性可行性評估 | |
| """ | |
| if len(detected_objects) < 2: | |
| return False | |
| # 計算不同置信度層級的物件分布 | |
| high_conf_objects = [obj for obj in detected_objects if obj.get("confidence", 0) >= 0.6] | |
| medium_conf_objects = [obj for obj in detected_objects if obj.get("confidence", 0) >= 0.4] | |
| # 基礎條件:至少需要一定數量的可信物件 | |
| if len(medium_conf_objects) < 2: | |
| return False | |
| # evalure relationships | |
| functional_relationships = self._calculate_functional_relationships(detected_objects) | |
| # 評估space的分布多樣性 | |
| spatial_diversity = self._calculate_spatial_diversity(detected_objects) | |
| # 綜合評分機制 | |
| feasibility_score = 0 | |
| # 物件數量的貢獻值(權重30%) | |
| object_count_score = min(len(detected_objects) / 5.0, 1.0) * 0.3 | |
| # 信心度質量貢獻(權重25%) | |
| confidence_score = len(high_conf_objects) / max(len(detected_objects), 1) * 0.25 | |
| # 功能關聯性貢獻(權重25%) | |
| relationship_score = functional_relationships * 0.25 | |
| # space多樣性貢獻(權重20%) | |
| diversity_score = spatial_diversity * 0.20 | |
| feasibility_score = object_count_score + confidence_score + relationship_score + diversity_score | |
| # 動態閾值:基於場景複雜度調整 | |
| complexity_threshold = self._get_complexity_threshold(scene_type) | |
| return feasibility_score >= complexity_threshold | |
| def _calculate_functional_relationships(self, detected_objects: List[Dict]) -> float: | |
| """ | |
| 計算物件間的功能關聯性評分 | |
| 基於常見的物件組合模式評估功能相關性 | |
| """ | |
| relationship_pairs = { | |
| # 家具組合關係 | |
| frozenset([56, 60]): 1.0, # 椅子+桌子 (dining/work area) | |
| frozenset([57, 62]): 0.9, # 沙發+電視 (living area) | |
| frozenset([59, 58]): 0.7, # 床+植物 (bedroom decor) | |
| # 工作相關組合 | |
| frozenset([63, 66]): 0.9, # 筆電+鍵盤 (workspace) | |
| frozenset([63, 64]): 0.8, # 筆電+滑鼠 (workspace) | |
| frozenset([60, 63]): 0.8, # 桌子+筆電 (workspace) | |
| # 廚房相關組合 | |
| frozenset([68, 72]): 0.9, # 微波爐+冰箱 (kitchen) | |
| frozenset([69, 71]): 0.8, # 烤箱+水槽 (kitchen) | |
| # 用餐相關組合 | |
| frozenset([60, 40]): 0.8, # 桌子+酒杯 (dining) | |
| frozenset([60, 41]): 0.8, # 桌子+杯子 (dining) | |
| frozenset([56, 40]): 0.7, # 椅子+酒杯 (dining) | |
| # 交通相關組合 | |
| frozenset([2, 9]): 0.8, # 汽車+交通燈 (traffic) | |
| frozenset([0, 9]): 0.7, # 行人+交通燈 (crosswalk) | |
| } | |
| detected_class_ids = set(obj["class_id"] for obj in detected_objects) | |
| max_possible_score = 0 | |
| actual_score = 0 | |
| for pair, score in relationship_pairs.items(): | |
| max_possible_score += score | |
| if pair.issubset(detected_class_ids): | |
| actual_score += score | |
| return actual_score / max_possible_score if max_possible_score > 0 else 0 | |
| def _calculate_spatial_diversity(self, detected_objects: List[Dict]) -> float: | |
| """ | |
| 計算物件空間分布的多樣性 | |
| 評估物件是否分散在不同區域,避免所有物件集中在單一區域 | |
| """ | |
| regions = set(obj.get("region", "center") for obj in detected_objects) | |
| unique_regions = len(regions) | |
| return min(unique_regions / 2.0, 1.0) | |
| def _get_complexity_threshold(self, scene_type: str) -> float: | |
| """ | |
| 可根據場景類型返回適當的複雜度閾值 | |
| 平衡不同場景的區域劃分需求 | |
| """ | |
| # 較簡單場景需要較高分數才進行區域劃分 | |
| simple_scenes = ["bedroom", "bathroom", "closet"] | |
| # 較複雜場景可以較低分數進行區域劃分 | |
| complex_scenes = ["living_room", "kitchen", "office_workspace", "dining_area"] | |
| if scene_type in simple_scenes: | |
| return 0.65 # 較高閾值,避免過度細分 | |
| elif scene_type in complex_scenes: | |
| return 0.45 # 較低閾值,允許合理劃分 | |
| else: | |
| return 0.55 # 中等閾值,平衡策略 | |
| def _identify_indoor_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| 平衡化的室內功能區域識別 | |
| 採用通用的物件關聯性分析,避免場景特定的硬編碼 | |
| """ | |
| zones = {} | |
| # 辨識到主要功能區域(基於物件關聯性而非場景類型) | |
| primary_zone = self._identify_primary_functional_area(detected_objects) | |
| if primary_zone: | |
| zones["primary_area"] = primary_zone | |
| # 只有明確證據且物件數量足夠時創建次要功能區域 | |
| if len(zones) >= 1 and len(detected_objects) >= 6: | |
| secondary_zone = self._identify_secondary_functional_area(detected_objects, zones) | |
| if secondary_zone: | |
| zones["secondary_area"] = secondary_zone | |
| return zones | |
| def _identify_primary_functional_area(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| 辨識主要功能區域,基於最強的物件關聯性組合 | |
| 採用通用邏輯處理各種室內場景 | |
| """ | |
| # 用餐區域檢測(桌椅組合) | |
| dining_area = self._detect_functional_combination( | |
| detected_objects, | |
| primary_objects=[60], # dining table | |
| supporting_objects=[56, 40, 41, 42, 43], # chair, wine glass, cup, fork, knife | |
| min_supporting=2, | |
| description_template="Dining area with table and seating arrangement" | |
| ) | |
| if dining_area: | |
| return dining_area | |
| # 休息區域檢測(沙發電視組合或床) | |
| seating_area = self._detect_functional_combination( | |
| detected_objects, | |
| primary_objects=[57, 59], # sofa, bed | |
| supporting_objects=[62, 58, 56], # tv, potted plant, chair | |
| min_supporting=1, | |
| description_template="Seating and relaxation area" | |
| ) | |
| if seating_area: | |
| return seating_area | |
| # 工作區域檢測(電子設備與家具組合) | |
| work_area = self._detect_functional_combination( | |
| detected_objects, | |
| primary_objects=[63, 66], # laptop, keyboard | |
| supporting_objects=[60, 56, 64], # dining table, chair, mouse | |
| min_supporting=2, | |
| description_template="Workspace area with electronics and furniture" | |
| ) | |
| if work_area: | |
| return work_area | |
| return None | |
| def _identify_secondary_functional_area(self, detected_objects: List[Dict], existing_zones: Dict) -> Dict: | |
| """ | |
| 識別次要功能區域,避免與主要區域重疊 | |
| """ | |
| # 獲取已使用的區域 | |
| used_regions = set(zone["region"] for zone in existing_zones.values()) | |
| # 裝飾區域檢測(植物集中區域) | |
| decorative_area = self._detect_functional_combination( | |
| detected_objects, | |
| primary_objects=[58], # potted plant | |
| supporting_objects=[75], # vase | |
| min_supporting=0, | |
| min_primary=3, # 至少需要3個植物 | |
| description_template="Decorative area with plants and ornamental items", | |
| exclude_regions=used_regions | |
| ) | |
| if decorative_area: | |
| return decorative_area | |
| # 儲存區域檢測(廚房電器組合) | |
| storage_area = self._detect_functional_combination( | |
| detected_objects, | |
| primary_objects=[72, 68, 69], # refrigerator, microwave, oven | |
| supporting_objects=[71], # sink | |
| min_supporting=0, | |
| min_primary=2, | |
| description_template="Kitchen appliance and storage area", | |
| exclude_regions=used_regions | |
| ) | |
| if storage_area: | |
| return storage_area | |
| return None | |
| def _detect_functional_combination(self, detected_objects: List[Dict], primary_objects: List[int], | |
| supporting_objects: List[int], min_supporting: int, | |
| description_template: str, min_primary: int = 1, | |
| exclude_regions: set = None) -> Dict: | |
| """ | |
| 通用的功能組合檢測方法 | |
| 基於主要物件和支持物件的組合判斷功能區域 | |
| Args: | |
| detected_objects: 檢測到的物件列表 | |
| primary_objects: 主要物件的class_id列表 | |
| supporting_objects: 支持物件的class_id列表 | |
| min_supporting: 最少需要的支持物件數量 | |
| description_template: 描述模板 | |
| min_primary: 最少需要的主要物件數量 | |
| exclude_regions: 需要排除的區域集合 | |
| Returns: | |
| Dict: 功能區域資訊,如果不符合條件則返回None | |
| """ | |
| if exclude_regions is None: | |
| exclude_regions = set() | |
| # 收集主要物件 | |
| primary_objs = [obj for obj in detected_objects | |
| if obj["class_id"] in primary_objects and obj.get("confidence", 0) >= 0.4] | |
| # 收集支持物件 | |
| supporting_objs = [obj for obj in detected_objects | |
| if obj["class_id"] in supporting_objects and obj.get("confidence", 0) >= 0.4] | |
| # 檢查是否滿足最少數量要求 | |
| if len(primary_objs) < min_primary or len(supporting_objs) < min_supporting: | |
| return None | |
| # 按區域組織物件 | |
| region_combinations = {} | |
| all_relevant_objs = primary_objs + supporting_objs | |
| for obj in all_relevant_objs: | |
| region = obj["region"] | |
| # 排除指定區域 | |
| if region in exclude_regions: | |
| continue | |
| if region not in region_combinations: | |
| region_combinations[region] = {"primary": [], "supporting": [], "all": []} | |
| region_combinations[region]["all"].append(obj) | |
| if obj["class_id"] in primary_objects: | |
| region_combinations[region]["primary"].append(obj) | |
| else: | |
| region_combinations[region]["supporting"].append(obj) | |
| # 找到最佳區域組合 | |
| best_region = None | |
| best_score = 0 | |
| for region, objs in region_combinations.items(): | |
| # 計算該區域的評分 | |
| primary_count = len(objs["primary"]) | |
| supporting_count = len(objs["supporting"]) | |
| # 必須滿足最低要求 | |
| if primary_count < min_primary or supporting_count < min_supporting: | |
| continue | |
| # 計算組合評分(主要物件權重較高) | |
| score = primary_count * 2 + supporting_count | |
| if score > best_score: | |
| best_score = score | |
| best_region = region | |
| if best_region is None: | |
| return None | |
| best_combination = region_combinations[best_region] | |
| all_objects = [obj["class_name"] for obj in best_combination["all"]] | |
| return { | |
| "region": best_region, | |
| "objects": all_objects, | |
| "description": description_template | |
| } | |
def _identify_intersection_zones(self, category_regions: Dict, detected_objects: List[Dict], viewpoint: str) -> Dict:
    """
    Identify functional zones for urban intersections.

    Combines pedestrian crossing zones, vehicle traffic zones and one traffic
    control zone per region that contains traffic lights.

    Args:
        category_regions: Objects grouped by category and region (not used here)
        detected_objects: List of detected objects
        viewpoint: Detected viewpoint (not used here)

    Returns:
        Dict: Intersection functional zones
    """
    vehicle_class_ids = [1, 2, 3, 5, 7]  # bicycle, car, motorcycle, bus, truck

    # Split the detections into pedestrians (0), vehicles and traffic lights (9).
    pedestrians, vehicles, signals = [], [], []
    for item in detected_objects:
        class_id = item["class_id"]
        if class_id == 0:
            pedestrians.append(item)
        elif class_id in vehicle_class_ids:
            vehicles.append(item)
        elif class_id == 9:
            signals.append(item)

    # Distribution map shared by the crossing and traffic analyses.
    distribution = self._create_distribution_map(detected_objects)

    intersection_zones = {}
    intersection_zones.update(self._analyze_crossing_patterns(pedestrians, signals, distribution))
    intersection_zones.update(self._analyze_traffic_zones(vehicles, distribution))

    # Group the traffic lights by region, keeping first-seen region order.
    signals_by_region = {}
    for item in signals:
        signals_by_region.setdefault(item["region"], []).append(item)

    # One traffic control zone per region that holds signals.
    for number, (region, region_signals) in enumerate(signals_by_region.items(), start=1):
        direction = self._get_directional_description(region)
        suffix = f" in {direction} area" if direction else ""
        intersection_zones[f"traffic_control_zone_{number}"] = {
            "region": region,
            "objects": ["traffic light"] * len(region_signals),
            "description": f"Traffic control area with {len(region_signals)} traffic signals" + suffix,
        }

    return intersection_zones
| def _identify_landmark_zones(self, landmark_objects: List[Dict]) -> Dict: | |
| """ | |
| 識別與地標相關的功能區域 | |
| Args: | |
| landmark_objects: 被識別為地標的物體列表 | |
| Returns: | |
| Dict: 地標相關的功能區域 | |
| """ | |
| landmark_zones = {} | |
| if not landmark_objects: | |
| print("Warning: No landmark objects provided to _identify_landmark_zones") | |
| return landmark_zones | |
| try: | |
| for i, landmark in enumerate(landmark_objects): | |
| if not isinstance(landmark, dict): | |
| print(f"Warning: Landmark object at index {i} is not a dictionary: {type(landmark)}") | |
| continue | |
| landmark_id = landmark.get("landmark_id") | |
| if not landmark_id: | |
| print(f"Warning: Missing landmark_id for landmark at index {i}") | |
| landmark_id = f"unknown_landmark_{i}" | |
| landmark_name = landmark.get("class_name", "Landmark") | |
| landmark_type = landmark.get("landmark_type", "architectural") | |
| landmark_region = landmark.get("region", "middle_center") | |
| # 為地標創建主要觀景區 | |
| zone_id = f"landmark_zone_{i+1}" | |
| zone_name = f"{landmark_name} Viewing Area" | |
| # 根據地標類型調整描述 | |
| if landmark_type == "natural": | |
| zone_description = f"Scenic viewpoint for observing {landmark_name}, a notable natural landmark in {landmark.get('location', 'this area')}." | |
| primary_function = "Nature observation and photography" | |
| elif landmark_type == "monument": | |
| zone_description = f"Viewing area around {landmark_name}, a significant monument in {landmark.get('location', 'this area')}." | |
| primary_function = "Historical appreciation and cultural tourism" | |
| else: # architectural | |
| zone_description = f"Area centered around {landmark_name}, where visitors can observe and appreciate this iconic structure in {landmark.get('location', 'this area')}." | |
| primary_function = "Architectural tourism and photography" | |
| # 確定與地標相關的物體 | |
| related_objects = ["person", "camera", "cell phone", "backpack"] | |
| # 創建功能區域 | |
| landmark_zones[zone_id] = { | |
| "name": zone_name, | |
| "description": zone_description, | |
| "objects": ["landmark"] + [obj for obj in related_objects if obj in [o.get("class_name") for o in landmark_objects]], | |
| "region": landmark_region, | |
| "primary_function": primary_function | |
| } | |
| # 如果有建造年份信息,加到描述中 | |
| if "year_built" in landmark: | |
| landmark_zones[zone_id]["description"] += f" Built in {landmark['year_built']}." | |
| # 如果有建築風格信息,加到描述中 | |
| if "architectural_style" in landmark: | |
| landmark_zones[zone_id]["description"] += f" Features {landmark['architectural_style']} architectural style." | |
| # 如果有重要性信息,加到描述中 | |
| if "significance" in landmark: | |
| landmark_zones[zone_id]["description"] += f" {landmark['significance']}." | |
| try: | |
| # 創建照相區 | |
| photo_region = landmark_region # 默認與地標在同一區域 | |
| # 根據地標位置調整照相區位置(地標前方通常是照相區) | |
| region_mapping = { | |
| "top_left": "bottom_right", | |
| "top_center": "bottom_center", | |
| "top_right": "bottom_left", | |
| "middle_left": "middle_right", | |
| "middle_center": "bottom_center", | |
| "middle_right": "middle_left", | |
| "bottom_left": "top_right", | |
| "bottom_center": "top_center", | |
| "bottom_right": "top_left" | |
| } | |
| if landmark_region in region_mapping: | |
| photo_region = region_mapping[landmark_region] | |
| landmark_zones[f"photo_spot_{i+1}"] = { | |
| "name": f"{landmark_name} Photography Spot", | |
| "description": f"Popular position for photographing {landmark_name} with optimal viewing angle.", | |
| "objects": ["camera", "person", "cell phone"], | |
| "region": photo_region, | |
| "primary_function": "Tourist photography" | |
| } | |
| except Exception as e: | |
| print(f"Error creating photo spot zone: {e}") | |
| try: | |
| # 如果是著名地標,可能有紀念品販售區 | |
| if landmark.get("confidence", 0) > 0.7: # 高置信度地標更可能有紀念品區 | |
| # 根據地標位置找到適合的紀念品區位置(通常在地標附近但不直接在地標上) | |
| adjacent_regions = { | |
| "top_left": ["top_center", "middle_left"], | |
| "top_center": ["top_left", "top_right"], | |
| "top_right": ["top_center", "middle_right"], | |
| "middle_left": ["top_left", "bottom_left"], | |
| "middle_center": ["middle_left", "middle_right"], | |
| "middle_right": ["top_right", "bottom_right"], | |
| "bottom_left": ["middle_left", "bottom_center"], | |
| "bottom_center": ["bottom_left", "bottom_right"], | |
| "bottom_right": ["bottom_center", "middle_right"] | |
| } | |
| if landmark_region in adjacent_regions: | |
| souvenir_region = adjacent_regions[landmark_region][0] # 選擇第一個相鄰區域 | |
| landmark_zones[f"souvenir_area_{i+1}"] = { | |
| "name": f"{landmark_name} Souvenir Area", | |
| "description": f"Area where visitors can purchase souvenirs and memorabilia related to {landmark_name}.", | |
| "objects": ["person", "handbag", "backpack"], | |
| "region": souvenir_region, | |
| "primary_function": "Tourism commerce" | |
| } | |
| except Exception as e: | |
| print(f"Error creating souvenir area zone: {e}") | |
| except Exception as e: | |
| print(f"Error in _identify_landmark_zones: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return landmark_zones | |
| def _analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict], | |
| region_distribution: Dict) -> Dict: | |
| """ | |
| Analyze pedestrian crossing patterns to identify crosswalk zones. | |
| Args: | |
| pedestrians: List of pedestrian objects | |
| traffic_lights: List of traffic light objects | |
| region_distribution: Distribution of objects by region | |
| Returns: | |
| Dict: Identified crossing zones | |
| """ | |
| crossing_zones = {} | |
| if not pedestrians: | |
| return crossing_zones | |
| # Group pedestrians by region | |
| pedestrian_regions = {} | |
| for p in pedestrians: | |
| region = p["region"] | |
| if region not in pedestrian_regions: | |
| pedestrian_regions[region] = [] | |
| pedestrian_regions[region].append(p) | |
| # Sort regions by pedestrian count to find main crossing areas | |
| sorted_regions = sorted(pedestrian_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
| # Create crossing zones for regions with pedestrians | |
| for idx, (region, peds) in enumerate(sorted_regions[:2]): # Focus on top 2 regions | |
| # Check if there are traffic lights nearby to indicate a crosswalk | |
| has_nearby_signals = any(t["region"] == region for t in traffic_lights) | |
| # Create crossing zone with descriptive naming | |
| zone_name = f"crossing_zone_{idx+1}" | |
| direction = self._get_directional_description(region) | |
| description = f"Pedestrian crossing area with {len(peds)} " | |
| description += "person" if len(peds) == 1 else "people" | |
| if direction: | |
| description += f" in {direction} direction" | |
| if has_nearby_signals: | |
| description += " near traffic signals" | |
| crossing_zones[zone_name] = { | |
| "region": region, | |
| "objects": ["pedestrian"] * len(peds), | |
| "description": description | |
| } | |
| return crossing_zones | |
| def _analyze_traffic_zones(self, vehicles: List[Dict], region_distribution: Dict) -> Dict: | |
| """ | |
| Analyze vehicle distribution to identify traffic zones with directional awareness. | |
| Args: | |
| vehicles: List of vehicle objects | |
| region_distribution: Distribution of objects by region | |
| Returns: | |
| Dict: Identified traffic zones | |
| """ | |
| traffic_zones = {} | |
| if not vehicles: | |
| return traffic_zones | |
| # 把運輸工具歸成一區 | |
| vehicle_regions = {} | |
| for v in vehicles: | |
| region = v["region"] | |
| if region not in vehicle_regions: | |
| vehicle_regions[region] = [] | |
| vehicle_regions[region].append(v) | |
| # Create traffic zones for regions with vehicles | |
| main_traffic_region = max(vehicle_regions.items(), key=lambda x: len(x[1]), default=(None, [])) | |
| if main_traffic_region[0] is not None: | |
| region = main_traffic_region[0] | |
| vehicles_in_region = main_traffic_region[1] | |
| # Get a list of vehicle types for description | |
| vehicle_types = [v["class_name"] for v in vehicles_in_region] | |
| unique_types = list(set(vehicle_types)) | |
| # Get directional description | |
| direction = self._get_directional_description(region) | |
| # Create descriptive zone | |
| traffic_zones["vehicle_zone"] = { | |
| "region": region, | |
| "objects": vehicle_types, | |
| "description": f"Vehicle traffic area with {', '.join(unique_types[:3])}" + | |
| (f" in {direction} area" if direction else "") | |
| } | |
| # If vehicles are distributed across multiple regions, create secondary zones | |
| if len(vehicle_regions) > 1: | |
| # Get second most populated region | |
| sorted_regions = sorted(vehicle_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
| if len(sorted_regions) > 1: | |
| second_region, second_vehicles = sorted_regions[1] | |
| direction = self._get_directional_description(second_region) | |
| vehicle_types = [v["class_name"] for v in second_vehicles] | |
| unique_types = list(set(vehicle_types)) | |
| traffic_zones["secondary_vehicle_zone"] = { | |
| "region": second_region, | |
| "objects": vehicle_types, | |
| "description": f"Secondary traffic area with {', '.join(unique_types[:2])}" + | |
| (f" in {direction} direction" if direction else "") | |
| } | |
| return traffic_zones | |
| def _get_directional_description(self, region: str) -> str: | |
| """ | |
| 把方向轉換成方位(東西南北) | |
| Args: | |
| region: Region name from the grid | |
| Returns: | |
| str: Directional description | |
| """ | |
| if "top" in region and "left" in region: | |
| return "northwest" | |
| elif "top" in region and "right" in region: | |
| return "northeast" | |
| elif "bottom" in region and "left" in region: | |
| return "southwest" | |
| elif "bottom" in region and "right" in region: | |
| return "southeast" | |
| elif "top" in region: | |
| return "north" | |
| elif "bottom" in region: | |
| return "south" | |
| elif "left" in region: | |
| return "west" | |
| elif "right" in region: | |
| return "east" | |
| else: | |
| return "central" | |
| def _create_distribution_map(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| Create a distribution map of objects across regions for spatial analysis. | |
| Args: | |
| detected_objects: List of detected objects | |
| Returns: | |
| Dict: Distribution map of objects by region and class | |
| """ | |
| distribution = {} | |
| # Initialize all regions | |
| for region in self.regions.keys(): | |
| distribution[region] = { | |
| "total": 0, | |
| "objects": {}, | |
| "density": 0 | |
| } | |
| # Populate the distribution | |
| for obj in detected_objects: | |
| region = obj["region"] | |
| class_id = obj["class_id"] | |
| class_name = obj["class_name"] | |
| distribution[region]["total"] += 1 | |
| if class_id not in distribution[region]["objects"]: | |
| distribution[region]["objects"][class_id] = { | |
| "name": class_name, | |
| "count": 0, | |
| "positions": [] | |
| } | |
| distribution[region]["objects"][class_id]["count"] += 1 | |
| # Store position for spatial relationship analysis | |
| if "normalized_center" in obj: | |
| distribution[region]["objects"][class_id]["positions"].append(obj["normalized_center"]) | |
| # Calculate object density for each region | |
| for region, data in distribution.items(): | |
| # Assuming all regions are equal size in the grid | |
| data["density"] = data["total"] / 1 | |
| return distribution | |
| def _identify_asian_cultural_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| Identify functional zones for scenes with Asian cultural context. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| scene_type: Specific scene type | |
| Returns: | |
| Dict: Asian cultural functional zones | |
| """ | |
| zones = {} | |
| # Identify storefront zone | |
| storefront_items = [] | |
| storefront_regions = {} | |
| # Since storefronts aren't directly detectable, infer from context | |
| # For example, look for regions with signs, people, and smaller objects | |
| sign_regions = set() | |
| for obj in detected_objects: | |
| if obj["class_id"] == 0: # Person | |
| region = obj["region"] | |
| if region not in storefront_regions: | |
| storefront_regions[region] = [] | |
| storefront_regions[region].append(obj) | |
| # Add regions with people as potential storefront areas | |
| sign_regions.add(region) | |
| # Use the areas with most people as storefront zones | |
| if storefront_regions: | |
| main_storefront_regions = sorted(storefront_regions.items(), | |
| key=lambda x: len(x[1]), | |
| reverse=True)[:2] # Top 2 regions | |
| for idx, (region, objs) in enumerate(main_storefront_regions): | |
| zones[f"commercial_zone_{idx+1}"] = { | |
| "region": region, | |
| "objects": [obj["class_name"] for obj in objs], | |
| "description": f"Asian commercial storefront with pedestrian activity" | |
| } | |
| # Identify pedestrian pathway - enhanced to better detect linear pathways | |
| pathway_items = [] | |
| pathway_regions = {} | |
| # Extract people for pathway analysis | |
| people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| # Analyze if people form a line (typical of shopping streets) | |
| people_positions = [obj["normalized_center"] for obj in people_objs] | |
| structured_path = False | |
| if len(people_positions) >= 3: | |
| # Check if people are arranged along a similar y-coordinate (horizontal path) | |
| y_coords = [pos[1] for pos in people_positions] | |
| y_mean = sum(y_coords) / len(y_coords) | |
| y_variance = sum((y - y_mean)**2 for y in y_coords) / len(y_coords) | |
| horizontal_path = y_variance < 0.05 # Low variance indicates horizontal alignment | |
| # Check if people are arranged along a similar x-coordinate (vertical path) | |
| x_coords = [pos[0] for pos in people_positions] | |
| x_mean = sum(x_coords) / len(x_coords) | |
| x_variance = sum((x - x_mean)**2 for x in x_coords) / len(x_coords) | |
| vertical_path = x_variance < 0.05 # Low variance indicates vertical alignment | |
| structured_path = horizontal_path or vertical_path | |
| path_direction = "horizontal" if horizontal_path else "vertical" if vertical_path else "meandering" | |
| # Collect pathway objects (people, bicycles, motorcycles in middle area) | |
| for obj in detected_objects: | |
| if obj["class_id"] in [0, 1, 3]: # Person, bicycle, motorcycle | |
| y_pos = obj["normalized_center"][1] | |
| # Group by vertical position (middle of image likely pathway) | |
| if 0.25 <= y_pos <= 0.75: | |
| region = obj["region"] | |
| if region not in pathway_regions: | |
| pathway_regions[region] = [] | |
| pathway_regions[region].append(obj) | |
| pathway_items.append(obj["class_name"]) | |
| if pathway_items: | |
| path_desc = "Pedestrian walkway with people moving through the commercial area" | |
| if structured_path: | |
| path_desc = f"{path_direction.capitalize()} pedestrian walkway with organized foot traffic" | |
| zones["pedestrian_pathway"] = { | |
| "region": "middle_center", # Assumption: pathway often in middle | |
| "objects": list(set(pathway_items)), | |
| "description": path_desc | |
| } | |
| # Identify vendor zone (small stalls/shops - inferred from context) | |
| has_small_objects = any(obj["class_id"] in [24, 26, 39, 41] for obj in detected_objects) # bags, bottles, cups | |
| has_people = any(obj["class_id"] == 0 for obj in detected_objects) | |
| if has_small_objects and has_people: | |
| # Likely vendor areas are where people and small objects cluster | |
| small_obj_regions = {} | |
| for obj in detected_objects: | |
| if obj["class_id"] in [24, 26, 39, 41, 67]: # bags, bottles, cups, phones | |
| region = obj["region"] | |
| if region not in small_obj_regions: | |
| small_obj_regions[region] = [] | |
| small_obj_regions[region].append(obj) | |
| if small_obj_regions: | |
| main_vendor_region = max(small_obj_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_vendor_region[0] is not None: | |
| vendor_items = [obj["class_name"] for obj in main_vendor_region[1]] | |
| zones["vendor_zone"] = { | |
| "region": main_vendor_region[0], | |
| "objects": list(set(vendor_items)), | |
| "description": "Vendor or market stall area with small merchandise" | |
| } | |
| # For night markets, identify illuminated zones | |
| if scene_type == "asian_night_market": | |
| # Night markets typically have bright spots for food stalls | |
| # This would be enhanced with lighting analysis integration | |
| zones["food_stall_zone"] = { | |
| "region": "middle_center", | |
| "objects": ["inferred food stalls"], | |
| "description": "Food stall area typical of Asian night markets" | |
| } | |
| return zones | |
| def _identify_upscale_dining_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| Identify functional zones for upscale dining settings. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| Returns: | |
| Dict: Upscale dining functional zones | |
| """ | |
| zones = {} | |
| # Identify dining table zone | |
| dining_items = [] | |
| dining_regions = {} | |
| for obj in detected_objects: | |
| if obj["class_id"] in [40, 41, 42, 43, 44, 45, 60]: # Wine glass, cup, fork, knife, spoon, bowl, table | |
| region = obj["region"] | |
| if region not in dining_regions: | |
| dining_regions[region] = [] | |
| dining_regions[region].append(obj) | |
| dining_items.append(obj["class_name"]) | |
| if dining_items: | |
| main_dining_region = max(dining_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_dining_region[0] is not None: | |
| zones["formal_dining_zone"] = { | |
| "region": main_dining_region[0], | |
| "objects": list(set(dining_items)), | |
| "description": f"Formal dining area with {', '.join(list(set(dining_items))[:3])}" | |
| } | |
| # Identify decorative zone with enhanced detection | |
| decor_items = [] | |
| decor_regions = {} | |
| # Look for decorative elements (vases, wine glasses, unused dishes) | |
| for obj in detected_objects: | |
| if obj["class_id"] in [75, 40]: # Vase, wine glass | |
| region = obj["region"] | |
| if region not in decor_regions: | |
| decor_regions[region] = [] | |
| decor_regions[region].append(obj) | |
| decor_items.append(obj["class_name"]) | |
| if decor_items: | |
| main_decor_region = max(decor_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_decor_region[0] is not None: | |
| zones["decorative_zone"] = { | |
| "region": main_decor_region[0], | |
| "objects": list(set(decor_items)), | |
| "description": f"Decorative area with {', '.join(list(set(decor_items)))}" | |
| } | |
| # Identify seating arrangement zone | |
| chairs = [obj for obj in detected_objects if obj["class_id"] == 56] # chairs | |
| if len(chairs) >= 2: | |
| chair_regions = {} | |
| for obj in chairs: | |
| region = obj["region"] | |
| if region not in chair_regions: | |
| chair_regions[region] = [] | |
| chair_regions[region].append(obj) | |
| if chair_regions: | |
| main_seating_region = max(chair_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_seating_region[0] is not None: | |
| zones["dining_seating_zone"] = { | |
| "region": main_seating_region[0], | |
| "objects": ["chair"] * len(main_seating_region[1]), | |
| "description": f"Formal dining seating arrangement with {len(main_seating_region[1])} chairs" | |
| } | |
| # Identify serving area (if different from dining area) | |
| serving_items = [] | |
| serving_regions = {} | |
| # Serving areas might have bottles, bowls, containers | |
| for obj in detected_objects: | |
| if obj["class_id"] in [39, 45]: # Bottle, bowl | |
| # Check if it's in a different region from the main dining table | |
| if "formal_dining_zone" in zones and obj["region"] != zones["formal_dining_zone"]["region"]: | |
| region = obj["region"] | |
| if region not in serving_regions: | |
| serving_regions[region] = [] | |
| serving_regions[region].append(obj) | |
| serving_items.append(obj["class_name"]) | |
| if serving_items: | |
| main_serving_region = max(serving_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_serving_region[0] is not None: | |
| zones["serving_zone"] = { | |
| "region": main_serving_region[0], | |
| "objects": list(set(serving_items)), | |
| "description": f"Serving or sideboard area with {', '.join(list(set(serving_items)))}" | |
| } | |
| return zones | |
| def _identify_financial_district_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| Identify functional zones for financial district scenes. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| Returns: | |
| Dict: Financial district functional zones | |
| """ | |
| zones = {} | |
| # Identify traffic zone | |
| traffic_items = [] | |
| traffic_regions = {} | |
| for obj in detected_objects: | |
| if obj["class_id"] in [1, 2, 3, 5, 6, 7, 9]: # Various vehicles and traffic lights | |
| region = obj["region"] | |
| if region not in traffic_regions: | |
| traffic_regions[region] = [] | |
| traffic_regions[region].append(obj) | |
| traffic_items.append(obj["class_name"]) | |
| if traffic_items: | |
| main_traffic_region = max(traffic_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_traffic_region[0] is not None: | |
| zones["traffic_zone"] = { | |
| "region": main_traffic_region[0], | |
| "objects": list(set(traffic_items)), | |
| "description": f"Urban traffic area with {', '.join(list(set(traffic_items))[:3])}" | |
| } | |
| # Building zones on the sides (inferred from scene context) | |
| # Enhanced to check if there are actual regions that might contain buildings | |
| # Check for regions without vehicles or pedestrians - likely building areas | |
| left_side_regions = ["top_left", "middle_left", "bottom_left"] | |
| right_side_regions = ["top_right", "middle_right", "bottom_right"] | |
| # Check left side | |
| left_building_evidence = True | |
| for region in left_side_regions: | |
| # If many vehicles or people in this region, less likely to be buildings | |
| vehicle_in_region = any(obj["region"] == region and obj["class_id"] in [1, 2, 3, 5, 7] | |
| for obj in detected_objects) | |
| people_in_region = any(obj["region"] == region and obj["class_id"] == 0 | |
| for obj in detected_objects) | |
| if vehicle_in_region or people_in_region: | |
| left_building_evidence = False | |
| break | |
| # Check right side | |
| right_building_evidence = True | |
| for region in right_side_regions: | |
| # If many vehicles or people in this region, less likely to be buildings | |
| vehicle_in_region = any(obj["region"] == region and obj["class_id"] in [1, 2, 3, 5, 7] | |
| for obj in detected_objects) | |
| people_in_region = any(obj["region"] == region and obj["class_id"] == 0 | |
| for obj in detected_objects) | |
| if vehicle_in_region or people_in_region: | |
| right_building_evidence = False | |
| break | |
| # Add building zones if evidence supports them | |
| if left_building_evidence: | |
| zones["building_zone_left"] = { | |
| "region": "middle_left", | |
| "objects": ["building"], # Inferred | |
| "description": "Tall buildings line the left side of the street" | |
| } | |
| if right_building_evidence: | |
| zones["building_zone_right"] = { | |
| "region": "middle_right", | |
| "objects": ["building"], # Inferred | |
| "description": "Tall buildings line the right side of the street" | |
| } | |
| # Identify pedestrian zone if people are present | |
| people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| if people_objs: | |
| people_regions = {} | |
| for obj in people_objs: | |
| region = obj["region"] | |
| if region not in people_regions: | |
| people_regions[region] = [] | |
| people_regions[region].append(obj) | |
| if people_regions: | |
| main_pedestrian_region = max(people_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_pedestrian_region[0] is not None: | |
| zones["pedestrian_zone"] = { | |
| "region": main_pedestrian_region[0], | |
| "objects": ["person"] * len(main_pedestrian_region[1]), | |
| "description": f"Pedestrian area with {len(main_pedestrian_region[1])} people navigating the financial district" | |
| } | |
| return zones | |
| def _identify_aerial_view_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| Identify functional zones for scenes viewed from an aerial perspective. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| scene_type: Specific scene type | |
| Returns: | |
| Dict: Aerial view functional zones | |
| """ | |
| zones = {} | |
| # For aerial views, we focus on patterns and flows rather than specific zones | |
| # Identify pedestrian patterns | |
| people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| if people_objs: | |
| # Convert positions to arrays for pattern analysis | |
| positions = np.array([obj["normalized_center"] for obj in people_objs]) | |
| if len(positions) >= 3: | |
| # Calculate distribution metrics | |
| x_coords = positions[:, 0] | |
| y_coords = positions[:, 1] | |
| x_mean = np.mean(x_coords) | |
| y_mean = np.mean(y_coords) | |
| x_std = np.std(x_coords) | |
| y_std = np.std(y_coords) | |
| # Determine if people are organized in a linear pattern | |
| if x_std < 0.1 or y_std < 0.1: | |
| # Linear distribution along one axis | |
| pattern_direction = "vertical" if x_std < y_std else "horizontal" | |
| zones["pedestrian_pattern"] = { | |
| "region": "central", | |
| "objects": ["person"] * len(people_objs), | |
| "description": f"Aerial view shows a {pattern_direction} pedestrian movement pattern" | |
| } | |
| else: | |
| # More dispersed pattern | |
| zones["pedestrian_distribution"] = { | |
| "region": "wide", | |
| "objects": ["person"] * len(people_objs), | |
| "description": f"Aerial view shows pedestrians distributed across the area" | |
| } | |
| # Identify vehicle patterns for traffic analysis | |
| vehicle_objs = [obj for obj in detected_objects if obj["class_id"] in [1, 2, 3, 5, 6, 7]] | |
| if vehicle_objs: | |
| # Convert positions to arrays for pattern analysis | |
| positions = np.array([obj["normalized_center"] for obj in vehicle_objs]) | |
| if len(positions) >= 2: | |
| # Calculate distribution metrics | |
| x_coords = positions[:, 0] | |
| y_coords = positions[:, 1] | |
| x_mean = np.mean(x_coords) | |
| y_mean = np.mean(y_coords) | |
| x_std = np.std(x_coords) | |
| y_std = np.std(y_coords) | |
| # Determine if vehicles are organized in lanes | |
| if x_std < y_std * 0.5: | |
| # Vehicles aligned vertically - indicates north-south traffic | |
| zones["vertical_traffic_flow"] = { | |
| "region": "central_vertical", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "North-south traffic flow visible from aerial view" | |
| } | |
| elif y_std < x_std * 0.5: | |
| # Vehicles aligned horizontally - indicates east-west traffic | |
| zones["horizontal_traffic_flow"] = { | |
| "region": "central_horizontal", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "East-west traffic flow visible from aerial view" | |
| } | |
| else: | |
| # Vehicles in multiple directions - indicates intersection | |
| zones["intersection_traffic"] = { | |
| "region": "central", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "Multi-directional traffic at intersection visible from aerial view" | |
| } | |
| # For intersection specific aerial views, identify crossing patterns | |
| if "intersection" in scene_type: | |
| # Check for traffic signals | |
| traffic_light_objs = [obj for obj in detected_objects if obj["class_id"] == 9] | |
| if traffic_light_objs: | |
| zones["traffic_control_pattern"] = { | |
| "region": "intersection", | |
| "objects": ["traffic light"] * len(traffic_light_objs), | |
| "description": f"Intersection traffic control with {len(traffic_light_objs)} signals visible from above" | |
| } | |
| # Crosswalks are inferred from context in aerial views | |
| zones["crossing_pattern"] = { | |
| "region": "central", | |
| "objects": ["inferred crosswalk"], | |
| "description": "Crossing pattern visible from aerial perspective" | |
| } | |
| # For plaza aerial views, identify gathering patterns | |
| if "plaza" in scene_type: | |
| # Plazas typically have central open area with people | |
| if people_objs: | |
| # Check if people are clustered in central region | |
| central_people = [obj for obj in people_objs | |
| if "middle" in obj["region"]] | |
| if central_people: | |
| zones["central_gathering"] = { | |
| "region": "middle_center", | |
| "objects": ["person"] * len(central_people), | |
| "description": f"Central plaza gathering area with {len(central_people)} people viewed from above" | |
| } | |
| return zones | |
| def _identify_outdoor_general_zones(self, category_regions: Dict, detected_objects: List[Dict], scene_type: str) -> Dict: | |
| """ | |
| Identify functional zones for general outdoor scenes. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| scene_type: Specific outdoor scene type | |
| Returns: | |
| Dict: Outdoor functional zones | |
| """ | |
| zones = {} | |
| # Identify pedestrian zones | |
| people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| if people_objs: | |
| people_regions = {} | |
| for obj in people_objs: | |
| region = obj["region"] | |
| if region not in people_regions: | |
| people_regions[region] = [] | |
| people_regions[region].append(obj) | |
| if people_regions: | |
| # Find main pedestrian areas | |
| main_people_regions = sorted(people_regions.items(), | |
| key=lambda x: len(x[1]), | |
| reverse=True)[:2] # Top 2 regions | |
| for idx, (region, objs) in enumerate(main_people_regions): | |
| if len(objs) > 0: | |
| zones[f"pedestrian_zone_{idx+1}"] = { | |
| "region": region, | |
| "objects": ["person"] * len(objs), | |
| "description": f"Pedestrian area with {len(objs)} {'people' if len(objs) > 1 else 'person'}" | |
| } | |
| # Identify vehicle zones for streets and parking lots | |
| vehicle_objs = [obj for obj in detected_objects if obj["class_id"] in [1, 2, 3, 5, 6, 7]] | |
| if vehicle_objs: | |
| vehicle_regions = {} | |
| for obj in vehicle_objs: | |
| region = obj["region"] | |
| if region not in vehicle_regions: | |
| vehicle_regions[region] = [] | |
| vehicle_regions[region].append(obj) | |
| if vehicle_regions: | |
| main_vehicle_region = max(vehicle_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_vehicle_region[0] is not None: | |
| vehicle_types = [obj["class_name"] for obj in main_vehicle_region[1]] | |
| zones["vehicle_zone"] = { | |
| "region": main_vehicle_region[0], | |
| "objects": vehicle_types, | |
| "description": f"Traffic area with {', '.join(list(set(vehicle_types))[:3])}" | |
| } | |
| # For park areas, identify recreational zones | |
| if scene_type == "park_area": | |
| # Look for recreational objects (sports balls, kites, etc.) | |
| rec_items = [] | |
| rec_regions = {} | |
| for obj in detected_objects: | |
| if obj["class_id"] in [32, 33, 34, 35, 38]: # sports ball, kite, baseball bat, glove, tennis racket | |
| region = obj["region"] | |
| if region not in rec_regions: | |
| rec_regions[region] = [] | |
| rec_regions[region].append(obj) | |
| rec_items.append(obj["class_name"]) | |
| if rec_items: | |
| main_rec_region = max(rec_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_rec_region[0] is not None: | |
| zones["recreational_zone"] = { | |
| "region": main_rec_region[0], | |
| "objects": list(set(rec_items)), | |
| "description": f"Recreational area with {', '.join(list(set(rec_items)))}" | |
| } | |
| # For parking lots, identify parking zones | |
| if scene_type == "parking_lot": | |
| # Look for parked cars with consistent spacing | |
| car_objs = [obj for obj in detected_objects if obj["class_id"] == 2] # cars | |
| if len(car_objs) >= 3: | |
| # Check if cars are arranged in patterns (simplified) | |
| car_positions = [obj["normalized_center"] for obj in car_objs] | |
| # Check for row patterns by analyzing vertical positions | |
| y_coords = [pos[1] for pos in car_positions] | |
| y_clusters = {} | |
| # Simplified clustering - group cars by similar y-coordinates | |
| for i, y in enumerate(y_coords): | |
| assigned = False | |
| for cluster_y in y_clusters.keys(): | |
| if abs(y - cluster_y) < 0.1: # Within 10% of image height | |
| y_clusters[cluster_y].append(i) | |
| assigned = True | |
| break | |
| if not assigned: | |
| y_clusters[y] = [i] | |
| # If we have row patterns | |
| if max(len(indices) for indices in y_clusters.values()) >= 2: | |
| zones["parking_row"] = { | |
| "region": "central", | |
| "objects": ["car"] * len(car_objs), | |
| "description": f"Organized parking area with vehicles arranged in rows" | |
| } | |
| else: | |
| zones["parking_area"] = { | |
| "region": "wide", | |
| "objects": ["car"] * len(car_objs), | |
| "description": f"Parking area with {len(car_objs)} vehicles" | |
| } | |
| return zones | |
| def _identify_default_zones(self, category_regions: Dict, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| Identify general functional zones when no specific scene type is matched. | |
| Args: | |
| category_regions: Objects grouped by category and region | |
| detected_objects: List of detected objects | |
| Returns: | |
| Dict: Default functional zones | |
| """ | |
| zones = {} | |
| # Group objects by category and find main concentrations | |
| for category, regions in category_regions.items(): | |
| if not regions: | |
| continue | |
| # Find region with most objects in this category | |
| main_region = max(regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_region[0] is None or len(main_region[1]) < 2: | |
| continue | |
| # Create zone based on object category | |
| zone_objects = [obj["class_name"] for obj in main_region[1]] | |
| # Skip if too few objects | |
| if len(zone_objects) < 2: | |
| continue | |
| # Create appropriate zone name and description based on category | |
| if category == "furniture": | |
| zones["furniture_zone"] = { | |
| "region": main_region[0], | |
| "objects": zone_objects, | |
| "description": f"Area with furniture including {', '.join(zone_objects[:3])}" | |
| } | |
| elif category == "electronics": | |
| zones["electronics_zone"] = { | |
| "region": main_region[0], | |
| "objects": zone_objects, | |
| "description": f"Area with electronic devices including {', '.join(zone_objects[:3])}" | |
| } | |
| elif category == "kitchen_items": | |
| zones["dining_zone"] = { | |
| "region": main_region[0], | |
| "objects": zone_objects, | |
| "description": f"Dining or food area with {', '.join(zone_objects[:3])}" | |
| } | |
| elif category == "vehicles": | |
| zones["vehicle_zone"] = { | |
| "region": main_region[0], | |
| "objects": zone_objects, | |
| "description": f"Area with vehicles including {', '.join(zone_objects[:3])}" | |
| } | |
| elif category == "personal_items": | |
| zones["personal_items_zone"] = { | |
| "region": main_region[0], | |
| "objects": zone_objects, | |
| "description": f"Area with personal items including {', '.join(zone_objects[:3])}" | |
| } | |
| # Check for people groups | |
| people_objs = [obj for obj in detected_objects if obj["class_id"] == 0] | |
| if len(people_objs) >= 2: | |
| people_regions = {} | |
| for obj in people_objs: | |
| region = obj["region"] | |
| if region not in people_regions: | |
| people_regions[region] = [] | |
| people_regions[region].append(obj) | |
| if people_regions: | |
| main_people_region = max(people_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_people_region[0] is not None: | |
| zones["people_zone"] = { | |
| "region": main_people_region[0], | |
| "objects": ["person"] * len(main_people_region[1]), | |
| "description": f"Area with {len(main_people_region[1])} people" | |
| } | |
| return zones | |
| def _find_main_region(self, region_objects_dict: Dict) -> str: | |
| """Find the main region with the most objects""" | |
| if not region_objects_dict: | |
| return "unknown" | |
| return max(region_objects_dict.items(), | |
| key=lambda x: len(x[1]), | |
| default=("unknown", []))[0] | |