KyanChen's picture
Upload 1861 files
3b96cb1
# Copyright (c) OpenMMLab. All rights reserved.
"""Taken from https://github.com/lucidrains/flamingo-pytorch."""
from typing import Optional
import torch
from einops import rearrange, repeat
from torch import einsum, nn
def FeedForward(dim, mult: int = 4):
    """Build the feed-forward stack used by the Flamingo blocks.

    The stack is LayerNorm -> Linear -> GELU -> Linear, and both linear
    projections are created without a bias term.

    Args:
        dim (int): Size of the input and output feature dimension.
        mult (int): Layer expansion multiplier; the hidden width is
            ``int(dim * mult)``. Defaults to 4.

    Returns:
        nn.Sequential: The assembled feed-forward layers.
    """
    hidden_dim = int(dim * mult)
    # Layers are instantiated in the order they are applied, so parameter
    # initialisation happens in the same sequence as the forward pass.
    pre_norm = nn.LayerNorm(dim)
    expand = nn.Linear(dim, hidden_dim, bias=False)
    activation = nn.GELU()
    project = nn.Linear(hidden_dim, dim, bias=False)
    return nn.Sequential(pre_norm, expand, activation, project)
class PerceiverAttention(nn.Module):
    """Perceiver attention layer.

    Queries are computed from the latents, while keys and values are
    computed from the media features concatenated with the latents.

    Args:
        dim (int): Input dimensions.
        dim_head (int): Dimension of each attention head. Defaults to 64.
        heads (int): Number of heads. Defaults to 8.
    """

    def __init__(self, *, dim: int, dim_head: int = 64, heads: int = 8):
        super().__init__()
        inner_dim = dim_head * heads
        self.heads = heads
        # standard 1/sqrt(d) scaling applied to the queries
        self.scale = dim_head**-0.5

        self.norm_media = nn.LayerNorm(dim)
        self.norm_latents = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        # keys and values share one projection and are split afterwards
        self.to_kv = nn.Linear(dim, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

    def forward(self, x: torch.Tensor, latents: torch.Tensor):
        """Attend from the latents to the media features and the latents.

        Args:
            x (torch.Tensor): image features of shape (b, T, n1, D).
            latents (torch.Tensor): latent features of shape (b, T, n2, D).

        Returns:
            torch.Tensor: updated latent features with the same shape as
            ``latents``.
        """
        num_heads = self.heads
        media = self.norm_media(x)
        latents = self.norm_latents(latents)

        queries = self.to_q(latents)
        # latents are appended to the media so they can attend to themselves
        keys, values = self.to_kv(torch.cat((media, latents),
                                            dim=-2)).chunk(2, dim=-1)

        # split the channel dimension into heads
        queries, keys, values = (
            rearrange(t, 'b t n (h d) -> b h t n d', h=num_heads)
            for t in (queries, keys, values))
        queries = queries * self.scale

        # attention
        scores = einsum('... i d, ... j d -> ... i j', queries, keys)
        # subtract the row maximum for a numerically stable softmax
        scores = scores - scores.amax(dim=-1, keepdim=True).detach()
        weights = scores.softmax(dim=-1)

        attended = einsum('... i j, ... j d -> ... i d', weights, values)
        attended = rearrange(
            attended, 'b h t n d -> b t n (h d)', h=num_heads)
        return self.to_out(attended)
class PerceiverResampler(nn.Module):
    """Perceiver resampler layers.

    A learned set of latents repeatedly attends to the (flattened) visual
    features and is returned as a fixed-size summary per media item.

    Args:
        dim (int): Input dimensions.
        depth (int): Depth of resampler. Defaults to 6.
        dim_head (int): Dimension of each attention head. Defaults to 64.
        heads (int): Number of heads. Defaults to 8.
        num_latents (int): Number of latents. Defaults to 64.
        max_num_media (int, optional): Max number of media. When given, a
            learned media-time embedding is added. Defaults to None.
        max_num_frames (int, optional): Max number of frames. When given,
            a learned per-frame embedding is added. Defaults to None.
        ff_mult (int): Feed forward multiplier. Defaults to 4.
    """

    def __init__(
        self,
        *,
        dim: int,
        depth: int = 6,
        dim_head: int = 64,
        heads: int = 8,
        num_latents: int = 64,
        max_num_media: Optional[int] = None,
        max_num_frames: Optional[int] = None,
        ff_mult: int = 4,
    ):
        super().__init__()
        self.latents = nn.Parameter(torch.randn(num_latents, dim))

        # optional learned positional embeddings
        if max_num_frames is None:
            self.frame_embs = None
        else:
            self.frame_embs = nn.Parameter(torch.randn(max_num_frames, dim))

        if max_num_media is None:
            self.media_time_embs = None
        else:
            self.media_time_embs = nn.Parameter(
                torch.randn(max_num_media, 1, dim))

        # each layer is an (attention, feed-forward) pair
        self.layers = nn.ModuleList([
            nn.ModuleList([
                PerceiverAttention(dim=dim, dim_head=dim_head, heads=heads),
                FeedForward(dim=dim, mult=ff_mult),
            ]) for _ in range(depth)
        ])

        self.norm = nn.LayerNorm(dim)

    def forward(self, x: torch.Tensor):
        """Forward function for perceiver sampler.

        Args:
            x (torch.Tensor): image features of shape (b, T, F, v, D)

        Returns:
            torch.Tensor: shape (b, T, n, D) where n is the number of
            latents.
        """
        batch, num_media, num_frames, num_tokens = x.shape[:4]

        # frame and media time embeddings
        if self.frame_embs is not None:
            x = x + repeat(
                self.frame_embs[:num_frames],
                'F d -> b T F v d',
                b=batch,
                T=num_media,
                v=num_tokens)

        # flatten the frame and spatial dimensions
        x = rearrange(x, 'b T F v d -> b T (F v) d')

        if self.media_time_embs is not None:
            x = x + self.media_time_embs[:num_media]

        # blocks: residual attention followed by residual feed-forward
        latents = repeat(
            self.latents, 'n d -> b T n d', b=batch, T=num_media)
        for cross_attn, feed_forward in self.layers:
            latents = cross_attn(x, latents) + latents
            latents = feed_forward(latents) + latents

        return self.norm(latents)
class MaskedCrossAttention(nn.Module):
    """Masked cross attention layers.

    Text tokens act as queries over the visual tokens; an optional mask
    built from the media locations restricts which media each text token
    may attend to.

    Args:
        dim (int): Input text feature dimensions.
        dim_visual (int): Input visual feature dimensions.
        dim_head (int): Dimension of each attention head. Defaults to 64.
        heads (int): Number of heads. Defaults to 8.
        only_attend_immediate_media (bool): Whether attend immediate media.
            Defaults to True.
    """

    def __init__(
        self,
        *,
        dim: int,
        dim_visual: int,
        dim_head: int = 64,
        heads: int = 8,
        only_attend_immediate_media: bool = True,
    ):
        super().__init__()
        inner_dim = dim_head * heads
        self.heads = heads
        self.scale = dim_head**-0.5

        self.norm = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        # whether for text to only attend to immediate preceding image
        # or all previous images
        self.only_attend_immediate_media = only_attend_immediate_media

    def forward(self,
                x: torch.Tensor,
                media: torch.Tensor,
                media_locations: Optional[torch.Tensor] = None,
                attend_previous: bool = True):
        """Cross-attend from text features to media features.

        Args:
            x (torch.Tensor): text features of shape (B, T_txt, D_txt).
            media (torch.Tensor): image features of shape
                (B, T_img, n, D_img), i.e. n tokens per media item.
            media_locations (torch.Tensor, optional): boolean mask identifying
                the media tokens in x of shape (B, T_txt). Defaults to None.
            attend_previous (bool): If false, ignores immediately preceding
                image and starts attending when following image.
                Defaults to True.

        Returns:
            torch.Tensor: attended text features of shape (B, T_txt, D_txt).
        """
        _, num_media, tokens_per_media = media.shape[:3]
        num_heads = self.heads
        use_mask = media_locations is not None

        x = self.norm(x)
        queries = self.to_q(x)

        # all media tokens of a sample form one key/value sequence
        flat_media = rearrange(media, 'b t n d -> b (t n) d')
        keys, values = self.to_kv(flat_media).chunk(2, dim=-1)

        queries, keys, values = (
            rearrange(t, 'b n (h d) -> b h n d', h=num_heads)
            for t in (queries, keys, values))
        queries = queries * self.scale

        sim = einsum('... i d, ... j d -> ... i j', queries, keys)

        if use_mask:
            # at each boolean of True, increment the time counter
            # (relative to media time)
            text_time = media_locations.cumsum(dim=-1)
            media_time = torch.arange(num_media, device=x.device) + 1

            if not attend_previous:
                text_time[~media_locations] += 1
                # make sure max is still the number of images in the sequence
                media_count = torch.count_nonzero(media_locations, dim=1)
                overflow = text_time > repeat(
                    media_count, 'b -> b i', i=text_time.shape[1])
                text_time[overflow] = 0

            # text time must equal media time if only attending to most
            # immediate image otherwise, as long as text time is greater than
            # media time (if attending to all previous images / media)
            if self.only_attend_immediate_media:
                compare = torch.eq
            else:
                compare = torch.ge

            text_to_media_mask = compare(
                rearrange(text_time, 'b i -> b 1 i 1'),
                repeat(media_time, 'j -> 1 1 1 (j n)', n=tokens_per_media),
            )
            sim = sim.masked_fill(~text_to_media_mask,
                                  -torch.finfo(sim.dtype).max)

        # subtract the row maximum for a numerically stable softmax
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        if use_mask and self.only_attend_immediate_media:
            # any text without a preceding media needs to have
            # attention zeroed out
            no_media_rows = rearrange(text_time == 0, 'b i -> b 1 i 1')
            attn = attn.masked_fill(no_media_rows, 0.0)

        out = einsum('... i j, ... j d -> ... i d', attn, values)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)
class GatedCrossAttentionBlock(nn.Module):
    """Gated cross attention layers.

    Masked cross attention followed by a feed-forward network. Each branch
    is scaled by the ``tanh`` of a learned scalar gate, initialised to
    zero, before being added back to its input.

    Args:
        dim (int): Input text feature dimensions.
        dim_visual (int): Input visual feature dimensions.
        dim_head (int): Dimension of each attention head. Defaults to 64.
        heads (int): Number of heads. Defaults to 8.
        ff_mult (int): Feed forward multiplier. Defaults to 4.
        only_attend_immediate_media (bool): Whether attend immediate media.
            Defaults to True.
    """

    def __init__(
        self,
        *,
        dim: int,
        dim_visual: int,
        dim_head: int = 64,
        heads: int = 8,
        ff_mult: int = 4,
        only_attend_immediate_media: bool = True,
    ):
        super().__init__()
        attn_cfg = dict(
            dim=dim,
            dim_visual=dim_visual,
            dim_head=dim_head,
            heads=heads,
            only_attend_immediate_media=only_attend_immediate_media,
        )
        self.attn = MaskedCrossAttention(**attn_cfg)
        # tanh(0) == 0, so both branches contribute nothing at initialisation
        self.attn_gate = nn.Parameter(torch.tensor([0.0]))

        self.ff = FeedForward(dim, mult=ff_mult)
        self.ff_gate = nn.Parameter(torch.tensor([0.0]))

    def forward(self,
                x: torch.Tensor,
                media: torch.Tensor,
                media_locations: Optional[torch.Tensor] = None,
                attend_previous: bool = True):
        """Apply gated cross attention and the gated feed-forward network.

        Args:
            x (torch.Tensor): text features of shape (B, T_txt, D_txt).
            media (torch.Tensor): image features of shape
                (B, T_img, n, D_img), i.e. n tokens per media item.
            media_locations (torch.Tensor, optional): boolean mask identifying
                the media tokens in x of shape (B, T_txt). Defaults to None.
            attend_previous (bool): If false, ignores immediately preceding
                image and starts attending when following image.
                Defaults to True.

        Returns:
            torch.Tensor: updated text features, same shape as ``x``.
        """
        attended = self.attn(
            x,
            media,
            media_locations=media_locations,
            attend_previous=attend_previous,
        )
        x = attended * self.attn_gate.tanh() + x

        x = self.ff(x) * self.ff_gate.tanh() + x
        return x
class FlamingoLayer(nn.Module):
    """Flamingo layer.

    Wraps a language-model decoder layer and, when present, runs a gated
    cross attention layer over the conditioned visual features first.

    Args:
        gated_cross_attn_layer (nn.Module): Gated cross attention layer.
            May be ``None``, in which case only the decoder layer runs.
        decoder_layer (nn.Module): Decoder layer.
    """

    def __init__(self, gated_cross_attn_layer: nn.Module,
                 decoder_layer: nn.Module):
        super().__init__()
        self.gated_cross_attn_layer = gated_cross_attn_layer
        self.decoder_layer = decoder_layer
        self.vis_x = None
        self.media_locations = None
        # Initialise here so ``forward`` does not raise ``AttributeError``
        # when ``condition_attend_previous`` was never called. ``True``
        # matches the default of the cross attention ``forward``.
        self.attend_previous = True

    def is_conditioned(self) -> bool:
        """Check whether the layer is conditioned."""
        return self.vis_x is not None

    def condition_vis_x(self, vis_x):
        """Set condition vision features."""
        self.vis_x = vis_x

    def condition_media_locations(self, media_locations):
        """Set condition media locations."""
        self.media_locations = media_locations

    def condition_attend_previous(self, attend_previous):
        """Set attend previous."""
        self.attend_previous = attend_previous

    def forward(
        self,
        lang_x: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        **decoder_layer_kwargs,
    ):
        """Forward function.

        Args:
            lang_x (torch.Tensor): language inputs.
            attention_mask (torch.Tensor, optional): text attention mask.
                Defaults to None.
            **decoder_layer_kwargs: Other decoder layer keyword arguments.

        Returns:
            The output of the wrapped decoder layer.

        Raises:
            ValueError: If a cross attention layer is present but the
                vision features or media locations were not conditioned.
        """
        # Plain decoder layer: nothing to cross-attend to.
        if self.gated_cross_attn_layer is None:
            return self.decoder_layer(
                lang_x, attention_mask=attention_mask, **decoder_layer_kwargs)

        if self.vis_x is None:
            raise ValueError('vis_x must be conditioned before forward pass')

        if self.media_locations is None:
            raise ValueError(
                'media_locations must be conditioned before forward pass')

        lang_x = self.gated_cross_attn_layer(
            lang_x,
            self.vis_x,
            media_locations=self.media_locations,
            attend_previous=self.attend_previous,
        )
        lang_x = self.decoder_layer(
            lang_x, attention_mask=attention_mask, **decoder_layer_kwargs)
        return lang_x