Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
"""Taken from https://github.com/lucidrains/flamingo-pytorch.""" | |
from typing import Optional | |
import torch | |
from einops import rearrange, repeat | |
from torch import einsum, nn | |
def FeedForward(dim, mult: int = 4): | |
"""Feedforward layers. | |
Args: | |
mult (int): Layer expansion muliplier. Defaults to 4. | |
""" | |
inner_dim = int(dim * mult) | |
return nn.Sequential( | |
nn.LayerNorm(dim), | |
nn.Linear(dim, inner_dim, bias=False), | |
nn.GELU(), | |
nn.Linear(inner_dim, dim, bias=False), | |
) | |
class PerceiverAttention(nn.Module): | |
"""Perceiver attetion layers. | |
Args: | |
dim (int): Input dimensions. | |
dim_head (int): Number of dimension heads. Defaults to 64. | |
heads (int): Number of heads. Defaults to 8. | |
""" | |
def __init__(self, *, dim: int, dim_head: int = 64, heads: int = 8): | |
super().__init__() | |
self.scale = dim_head**-0.5 | |
self.heads = heads | |
inner_dim = dim_head * heads | |
self.norm_media = nn.LayerNorm(dim) | |
self.norm_latents = nn.LayerNorm(dim) | |
self.to_q = nn.Linear(dim, inner_dim, bias=False) | |
self.to_kv = nn.Linear(dim, inner_dim * 2, bias=False) | |
self.to_out = nn.Linear(inner_dim, dim, bias=False) | |
def forward(self, x: torch.Tensor, latents: torch.Tensor): | |
"""Forward function. | |
Args: | |
x (torch.Tensor): image features of shape (b, T, n1, D). | |
latent (torch.Tensor): latent features of shape (b, T, n2, D). | |
""" | |
x = self.norm_media(x) | |
latents = self.norm_latents(latents) | |
h = self.heads | |
q = self.to_q(latents) | |
kv_input = torch.cat((x, latents), dim=-2) | |
k, v = self.to_kv(kv_input).chunk(2, dim=-1) | |
q = rearrange(q, 'b t n (h d) -> b h t n d', h=h) | |
k = rearrange(k, 'b t n (h d) -> b h t n d', h=h) | |
v = rearrange(v, 'b t n (h d) -> b h t n d', h=h) | |
q = q * self.scale | |
# attention | |
sim = einsum('... i d, ... j d -> ... i j', q, k) | |
sim = sim - sim.amax(dim=-1, keepdim=True).detach() | |
attn = sim.softmax(dim=-1) | |
out = einsum('... i j, ... j d -> ... i d', attn, v) | |
out = rearrange(out, 'b h t n d -> b t n (h d)', h=h) | |
return self.to_out(out) | |
class PerceiverResampler(nn.Module): | |
"""Perceiver resampler layers. | |
Args: | |
dim (int): Input dimensions. | |
depth (int): Depth of resampler. Defaults to 6. | |
dim_head (int): Number of dimension heads. Defaults to 64. | |
heads (int): Number of heads. Defaults to 8. | |
num_latents (int): Number of latents. Defaults to 64. | |
max_num_media (int, optional): Max number of media. | |
Defaults to None. | |
max_num_frames (int, optional): Max number of frames. | |
Defaults to None. | |
ff_mult (int): Feed forward multiplier. Defaults to 4. | |
""" | |
def __init__( | |
self, | |
*, | |
dim: int, | |
depth: int = 6, | |
dim_head: int = 64, | |
heads: int = 8, | |
num_latents: int = 64, | |
max_num_media: Optional[int] = None, | |
max_num_frames: Optional[int] = None, | |
ff_mult: int = 4, | |
): | |
super().__init__() | |
self.latents = nn.Parameter(torch.randn(num_latents, dim)) | |
self.frame_embs = ( | |
nn.Parameter(torch.randn(max_num_frames, dim)) | |
if max_num_frames is not None else None) | |
self.media_time_embs = ( | |
nn.Parameter(torch.randn(max_num_media, 1, dim)) | |
if max_num_media is not None else None) | |
self.layers = nn.ModuleList([]) | |
for _ in range(depth): | |
self.layers.append( | |
nn.ModuleList([ | |
PerceiverAttention( | |
dim=dim, dim_head=dim_head, heads=heads), | |
FeedForward(dim=dim, mult=ff_mult), | |
])) | |
self.norm = nn.LayerNorm(dim) | |
def forward(self, x: torch.Tensor): | |
"""Forward function for perceiver sampler. | |
Args: | |
x (torch.Tensor): image features of shape (b, T, F, v, D) | |
Returns: | |
torch.Tensor: shape (b, T, n, D) where n is self.num_latents | |
""" | |
b, T, F, v = x.shape[:4] | |
# frame and media time embeddings | |
if self.frame_embs is not None: | |
frame_embs = repeat( | |
self.frame_embs[:F], 'F d -> b T F v d', b=b, T=T, v=v) | |
x = x + frame_embs | |
x = rearrange(x, 'b T F v d -> b T (F v) d' | |
) # flatten the frame and spatial dimensions | |
if self.media_time_embs is not None: | |
x = x + self.media_time_embs[:T] | |
# blocks | |
latents = repeat(self.latents, 'n d -> b T n d', b=b, T=T) | |
for attn, ff in self.layers: | |
latents = attn(x, latents) + latents | |
latents = ff(latents) + latents | |
return self.norm(latents) | |
class MaskedCrossAttention(nn.Module): | |
"""Masked cross attention layers. | |
Args: | |
dim (int): Input text feature dimensions. | |
dim_visual (int): Input visual feature dimensions. | |
dim_head (int): Number of dimension heads. Defaults to 64. | |
heads (int): Number of heads. Defaults to 8. | |
only_attend_immediate_media (bool): Whether attend immediate media. | |
Defaults to True. | |
""" | |
def __init__( | |
self, | |
*, | |
dim: int, | |
dim_visual: int, | |
dim_head: int = 64, | |
heads: int = 8, | |
only_attend_immediate_media: bool = True, | |
): | |
super().__init__() | |
self.scale = dim_head**-0.5 | |
self.heads = heads | |
inner_dim = dim_head * heads | |
self.norm = nn.LayerNorm(dim) | |
self.to_q = nn.Linear(dim, inner_dim, bias=False) | |
self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False) | |
self.to_out = nn.Linear(inner_dim, dim, bias=False) | |
# whether for text to only attend to immediate preceding image | |
# or all previous images | |
self.only_attend_immediate_media = only_attend_immediate_media | |
def forward(self, | |
x: torch.Tensor, | |
media: torch.Tensor, | |
media_locations: Optional[torch.Tensor] = None, | |
attend_previous: bool = True): | |
"""Forward function for perceiver sampler. | |
Args: | |
x (torch.Tensor): text features of shape (B, T_txt, D_txt). | |
media (torch.Tensor): image features of shape | |
(B, T_img, n, D_img) where n is the dim of the latents. | |
media_locations (torch.Tensor, optional): boolean mask identifying | |
the media tokens in x of shape (B, T_txt). Defaults to None. | |
attend_previous (bool): If false, ignores immediately preceding | |
image and starts attending when following image. | |
Defaults to True. | |
""" | |
_, T_img, n = media.shape[:3] | |
h = self.heads | |
x = self.norm(x) | |
q = self.to_q(x) | |
media = rearrange(media, 'b t n d -> b (t n) d') | |
k, v = self.to_kv(media).chunk(2, dim=-1) | |
q = rearrange(q, 'b n (h d) -> b h n d', h=h) | |
k = rearrange(k, 'b n (h d) -> b h n d', h=h) | |
v = rearrange(v, 'b n (h d) -> b h n d', h=h) | |
q = q * self.scale | |
sim = einsum('... i d, ... j d -> ... i j', q, k) | |
if media_locations is not None: | |
# at each boolean of True, increment the time counter | |
# (relative to media time) | |
text_time = media_locations.cumsum(dim=-1) | |
media_time = torch.arange(T_img, device=x.device) + 1 | |
if not attend_previous: | |
text_time[~media_locations] += 1 | |
# make sure max is still the number of images in the sequence | |
text_time[text_time > repeat( | |
torch.count_nonzero(media_locations, dim=1), | |
'b -> b i', | |
i=text_time.shape[1], | |
)] = 0 | |
# text time must equal media time if only attending to most | |
# immediate image otherwise, as long as text time is greater than | |
# media time (if attending to all previous images / media) | |
mask_op = torch.eq if self.only_attend_immediate_media else torch.ge # noqa | |
text_to_media_mask = mask_op( | |
rearrange(text_time, 'b i -> b 1 i 1'), | |
repeat(media_time, 'j -> 1 1 1 (j n)', n=n), | |
) | |
sim = sim.masked_fill(~text_to_media_mask, | |
-torch.finfo(sim.dtype).max) | |
sim = sim - sim.amax(dim=-1, keepdim=True).detach() | |
attn = sim.softmax(dim=-1) | |
if media_locations is not None and self.only_attend_immediate_media: | |
# any text without a preceding media needs to have | |
# attention zeroed out | |
text_without_media_mask = text_time == 0 | |
text_without_media_mask = rearrange(text_without_media_mask, | |
'b i -> b 1 i 1') | |
attn = attn.masked_fill(text_without_media_mask, 0.0) | |
out = einsum('... i j, ... j d -> ... i d', attn, v) | |
out = rearrange(out, 'b h n d -> b n (h d)') | |
return self.to_out(out) | |
class GatedCrossAttentionBlock(nn.Module): | |
"""Gated cross attention layers. | |
Args: | |
dim (int): Input text feature dimensions. | |
dim_visual (int): Input visual feature dimensions. | |
dim_head (int): Number of dimension heads. Defaults to 64. | |
heads (int): Number of heads. Defaults to 8. | |
ff_mult (int): Feed forward multiplier. Defaults to 4. | |
only_attend_immediate_media (bool): Whether attend immediate media. | |
Defaults to True. | |
""" | |
def __init__( | |
self, | |
*, | |
dim: int, | |
dim_visual: int, | |
dim_head: int = 64, | |
heads: int = 8, | |
ff_mult: int = 4, | |
only_attend_immediate_media: bool = True, | |
): | |
super().__init__() | |
self.attn = MaskedCrossAttention( | |
dim=dim, | |
dim_visual=dim_visual, | |
dim_head=dim_head, | |
heads=heads, | |
only_attend_immediate_media=only_attend_immediate_media, | |
) | |
self.attn_gate = nn.Parameter(torch.tensor([0.0])) | |
self.ff = FeedForward(dim, mult=ff_mult) | |
self.ff_gate = nn.Parameter(torch.tensor([0.0])) | |
def forward(self, | |
x: torch.Tensor, | |
media: torch.Tensor, | |
media_locations: Optional[torch.Tensor] = None, | |
attend_previous: bool = True): | |
"""Forward function for perceiver sampler. | |
Args: | |
x (torch.Tensor): text features of shape (B, T_txt, D_txt). | |
media (torch.Tensor): image features of shape | |
(B, T_img, n, D_img) where n is the dim of the latents. | |
media_locations (torch.Tensor, optional): boolean mask identifying | |
the media tokens in x of shape (B, T_txt). Defaults to None. | |
attend_previous (bool): If false, ignores immediately preceding | |
image and starts attending when following image. | |
Defaults to True. | |
""" | |
x = ( | |
self.attn( | |
x, | |
media, | |
media_locations=media_locations, | |
attend_previous=attend_previous, | |
) * self.attn_gate.tanh() + x) | |
x = self.ff(x) * self.ff_gate.tanh() + x | |
return x | |
class FlamingoLayer(nn.Module): | |
"""Faminogo layers. | |
Args: | |
gated_cross_attn_layer (nn.Module): Gated cross attention layer. | |
decoder_layer (nn.Module): Decoder layer. | |
""" | |
def __init__(self, gated_cross_attn_layer: nn.Module, | |
decoder_layer: nn.Module): | |
super().__init__() | |
self.gated_cross_attn_layer = gated_cross_attn_layer | |
self.decoder_layer = decoder_layer | |
self.vis_x = None | |
self.media_locations = None | |
def is_conditioned(self) -> bool: | |
"""Check whether the layer is conditioned.""" | |
return self.vis_x is not None | |
def condition_vis_x(self, vis_x): | |
"""Set condition vision features.""" | |
self.vis_x = vis_x | |
def condition_media_locations(self, media_locations): | |
"""Set condition media locations.""" | |
self.media_locations = media_locations | |
def condition_attend_previous(self, attend_previous): | |
"""Set attend previous.""" | |
self.attend_previous = attend_previous | |
def forward( | |
self, | |
lang_x: torch.Tensor, | |
attention_mask: Optional[torch.Tensor] = None, | |
**decoder_layer_kwargs, | |
): | |
"""Forward function. | |
Args: | |
lang_x (torch.Tensor): language inputs. | |
attention_mask (torch.Tensor, optional): text attention mask. | |
Defaults to None. | |
**decoder_layer_kwargs: Other decoder layer keyword arguments. | |
""" | |
if self.gated_cross_attn_layer is None: | |
return self.decoder_layer( | |
lang_x, attention_mask=attention_mask, **decoder_layer_kwargs) | |
if self.vis_x is None: | |
raise ValueError('vis_x must be conditioned before forward pass') | |
if self.media_locations is None: | |
raise ValueError( | |
'media_locations must be conditioned before forward pass') | |
lang_x = self.gated_cross_attn_layer( | |
lang_x, | |
self.vis_x, | |
media_locations=self.media_locations, | |
attend_previous=self.attend_previous, | |
) | |
lang_x = self.decoder_layer( | |
lang_x, attention_mask=attention_mask, **decoder_layer_kwargs) | |
return lang_x | |