# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Processor class for Spec-Vision."""

import re
from typing import List, Optional, Union

import numpy as np
import torch
import torchvision
from PIL import Image

from transformers import AutoImageProcessor
from transformers.feature_extraction_utils import BatchFeature
from transformers.image_processing_utils import BaseImageProcessor
from transformers.image_transforms import convert_to_rgb
from transformers.image_utils import (
    OPENAI_CLIP_MEAN,
    OPENAI_CLIP_STD,
    ImageInput,
    make_list_of_images,
    valid_images,
)
from transformers.processing_utils import ProcessorMixin
from transformers.tokenization_utils_base import PaddingStrategy, TextInput, TruncationStrategy
from transformers.utils import TensorType, is_vision_available, logging

logger = logging.get_logger(__name__)


def padding_336(image):
    """Pad the image height up to the next multiple of 336 with white, keeping the content centered."""
    width, height = image.size
    target_height = int(np.ceil(height / 336) * 336)
    top_padding = int((target_height - height) / 2)
    bottom_padding = target_height - height - top_padding
    padded_image = torchvision.transforms.functional.pad(
        image, [0, top_padding, 0, bottom_padding], fill=[255, 255, 255]
    )
    return padded_image


def calc_padded_size(width, height, padding_unit=336):
    """Calculate the dimensions an image will have after height padding."""
    target_height = int(np.ceil(height / padding_unit) * padding_unit)
    padded_width = width
    padded_height = target_height
    return padded_width, padded_height


def hd_transform(img, hd_num=16):
    """Apply the HD transform: resize to the largest 336x336 tile grid that fits within `hd_num` crops, then pad."""
    width, height = img.size
    transposed = False

    # Handle portrait images by transposing so that width >= height
    if width < height:
        img = img.transpose(Image.TRANSPOSE)
        width, height = img.size
        transposed = True

    ratio = width / height
    scale = 1
    while scale * np.ceil(scale / ratio) <= hd_num:
        scale += 1
    scale -= 1

    new_width = int(scale * 336)
    new_height = int(new_width / ratio)

    # Resize, then pad the height to a multiple of 336
    img = torchvision.transforms.functional.resize(img, [new_height, new_width])
    img = padding_336(img)

    # Restore the original orientation if needed
    if transposed:
        img = img.transpose(Image.TRANSPOSE)

    return img


def pad_to_max_crops(images, max_crops=5):
    """Pad a batch of crops with zero images so every sample has a consistent number of crops."""
    B, _, H, W = images.shape
    if B < max_crops:
        padding = torch.zeros(max_crops - B, 3, H, W, dtype=images.dtype, device=images.device)
        images = torch.cat([images, padding], dim=0)
    return images
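
# Illustrative trace of hd_transform (hypothetical numbers, not taken from a real checkpoint):
# for a 1000x600 landscape image with hd_num=16, ratio = 1000/600 (about 1.67) and the loop
# above settles on scale = 5, since 5 * ceil(5 / 1.67) = 15 <= 16 while scale = 6 would give 24.
# The image is therefore resized to 1680x1008; padding_336 leaves it untouched (1008 = 3 * 336),
# yielding a 5x3 grid of 336x336 tiles, i.e. 15 local crops.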


class SpecVisionImageProcessor(BaseImageProcessor):
    """
    Image processor for Spec-Vision model.

    This processor handles the preparation of images for the Spec-Vision model, including:

    - HD transformation for high-resolution image processing
    - Multi-crop processing with configurable number of crops
    - Normalization and padding
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        num_crops: int = 1,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = True,
        hd_transform_order: str = "sub_glb",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.num_crops = num_crops
        self.image_mean = image_mean if image_mean is not None else OPENAI_CLIP_MEAN
        self.image_std = image_std if image_std is not None else OPENAI_CLIP_STD
        self.do_convert_rgb = do_convert_rgb
        self.hd_transform_order = hd_transform_order

    def calc_num_image_tokens(self, images: ImageInput) -> List[int]:
        """Calculate number of image tokens needed for each image."""
        images = make_list_of_images(images)
        if not valid_images(images):
            raise ValueError("Invalid image type provided")

        images = [image.convert("RGB") for image in images]
        transformed_images = [hd_transform(im, hd_num=self.num_crops) for im in images]
        shapes = [[im.size[1], im.size[0]] for im in transformed_images]

        # Calculate tokens based on Spec-Vision's architecture
        num_img_tokens = [
            int((h // 336 * w // 336 + 1) * 144 + 1 + (h // 336 + 1) * 12)
            for h, w in shapes
        ]
        return num_img_tokens
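
    # Worked example of the token formula above (illustrative numbers only): an HD-transformed
    # image of 1008x1344 pixels gives h // 336 = 3 and w // 336 = 4, so the formula evaluates to
    # (3 * 4 + 1) * 144 + 1 + (3 + 1) * 12 = 1873 + 48 = 1921 image tokens for that image.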
""" image_mean = image_mean if image_mean is not None else self.image_mean image_std = image_std if image_std is not None else self.image_std do_convert_rgb = do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb # Validate and prepare images images = make_list_of_images(images) if not valid_images(images): raise ValueError("Invalid image type provided") if do_convert_rgb: images = [convert_to_rgb(image) for image in images] # Create image processor pipeline img_processor = torchvision.transforms.Compose([ torchvision.transforms.ToTensor(), torchvision.transforms.Normalize(image_mean, image_std) ]) # Process images according to Spec-Vision's HD transform requirements images = [image.convert('RGB') for image in images] transformed_images = [hd_transform(im, hd_num=self.num_crops) for im in images] # Convert to tensors and normalize hd_images = [img_processor(im) for im in transformed_images] # Create global views global_images = [ torch.nn.functional.interpolate( im.unsqueeze(0).float(), size=(336, 336), mode='bicubic' ).to(im.dtype) for im in hd_images ] # Process shapes and calculate tokens shapes = [[im.size(1), im.size(2)] for im in hd_images] num_img_tokens = [ int(((h//336) * (w//336) + 1) * 144 + 1 + (h//336 + 1) * 12) for h, w in shapes ] # Reshape images according to Spec-Vision's requirements hd_images_reshaped = [ im.reshape(1, 3, h//336, 336, w//336, 336) .permute(0, 2, 4, 1, 3, 5) .reshape(-1, 3, 336, 336) .contiguous() for im, (h, w) in zip(hd_images, shapes) ] # Combine global and local views based on transform order if self.hd_transform_order == "sub_glb": processed_images = [ torch.cat([_im, _global_image], dim=0) for _global_image, _im in zip(global_images, hd_images_reshaped) ] else: # glb_sub processed_images = [ torch.cat([_global_image, _im], dim=0) for _global_image, _im in zip(global_images, hd_images_reshaped) ] # Pad to consistent number of crops image_batch = [ pad_to_max_crops(im, self.num_crops + 1) for im in processed_images ] image_batch = torch.stack(image_batch, dim=0) return BatchFeature( data={ "pixel_values": image_batch, "image_sizes": shapes, "num_img_tokens": num_img_tokens }, tensor_type=return_tensors ) class SpecVisionProcessor(ProcessorMixin): """ Combined processor for Spec-Vision model, handling both image and text inputs. Combines SpecVisionImageProcessor for images and a tokenizer for text, coordinating their interaction for multi-modal inputs. 
""" attributes = ["image_processor", "tokenizer"] image_processor_class = "SpecVisionImageProcessor" tokenizer_class = ("LlamaTokenizer", "LlamaTokenizerFast") special_image_token = "<|image|>" def __init__(self, image_processor, tokenizer): self.image_processor = image_processor self.tokenizer = tokenizer self.num_img_tokens = image_processor.num_crops self.img_tokens = [f"<|image_{i+1}|>" for i in range(1000000)] def __call__( self, text: Union[TextInput, List[TextInput]], images: ImageInput = None, padding: Union[bool, str, PaddingStrategy] = False, truncation: Union[bool, str, TruncationStrategy] = None, max_length=None, return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH, ) -> BatchFeature: """Process both text and image inputs for the model.""" if images is not None: image_features = self.image_processor(images, return_tensors=return_tensors) else: image_features = {} # Process combined inputs inputs = self._process_multimodal_inputs( image_features, text, padding=padding, truncation=truncation, max_length=max_length, return_tensors=return_tensors ) return inputs def _process_multimodal_inputs(self, images, texts, **kwargs): """Process and combine image and text inputs.""" if not images: return BatchFeature(data=self.tokenizer( texts, return_tensors=kwargs.get('return_tensors'), padding=kwargs.get('padding'), truncation=kwargs.get('truncation'), max_length=kwargs.get('max_length') )) # Process text chunks and image tags pattern = r"<\|image_\d+\|>" text_chunks = [ self.tokenizer(chunk).input_ids for chunk in re.split(pattern, texts) ] # Handle image tokens num_img_tokens = ( images['num_img_tokens'] if 'num_img_tokens' in images else [self.num_img_tokens] * len(images['pixel_values']) ) image_tags = re.findall(pattern, texts) image_ids = [int(tag.split("|")[1].split("_")[-1]) for tag in image_tags] # Validate image IDs unique_ids = sorted(set(image_ids)) if unique_ids != list(range(1, len(unique_ids) + 1)): raise ValueError( f"Image IDs must be consecutive integers starting from 1, got {unique_ids}" ) if len(unique_ids) != len(images['pixel_values']): raise ValueError( f"Number of image tags ({len(unique_ids)}) doesn't match " f"number of images ({len(images['pixel_values'])})" ) # Create padded image IDs image_ids_padded = [ [-iid] * num_img_tokens[iid-1] for iid in image_ids ] # Combine text and image tokens input_ids = [] for x in self._interleave_sequences(text_chunks, image_ids_padded): input_ids.extend(x) input_ids = torch.tensor(input_ids, dtype=torch.long).unsqueeze(0) attention_mask = (input_ids > -1000000).to(torch.long) return BatchFeature(data={ "input_ids": input_ids, "attention_mask": attention_mask, "pixel_values": images['pixel_values'], "image_sizes": images['image_sizes'] }) def _interleave_sequences(self, seq1, seq2): """Interleave two sequences, padding second sequence if needed.""" if len(seq1) > len(seq2): seq2.append([]) return [item for pair in zip(seq1, seq2) for item in pair] def batch_decode(self, *args, **kwargs): """Decode a batch of token IDs to text.""" return self.tokenizer.batch_decode(*args, **kwargs) def decode(self, *args, **kwargs): """Decode token IDs to text.""" return self.tokenizer.decode(*args, **kwargs) @property def model_input_names(self): """Get combined input names from both processors.""" return list(dict.fromkeys( self.tokenizer.model_input_names + self.image_processor.model_input_names )) # Register the processor with AutoImageProcessor AutoImageProcessor.register("SpecVisionImageProcessor", SpecVisionImageProcessor)