# Copyright 2023 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains definitions of dense prediction heads."""
from typing import Any, Dict, List, Mapping, Optional, Union
# Import libraries
import numpy as np
import tensorflow as tf, tf_keras
from official.modeling import tf_utils
@tf_keras.utils.register_keras_serializable(package='Vision')
class RetinaNetHead(tf_keras.layers.Layer):
  """Creates a RetinaNet head.

  Builds the classification and box-regression towers (and optional attribute
  towers) applied to each level of a multilevel feature pyramid. Convs may be
  shared across levels (`share_level_convs=True`) while batch-norm layers are
  always created per level.
  """

  def __init__(
      self,
      min_level: int,
      max_level: int,
      num_classes: int,
      num_anchors_per_location: int,
      num_convs: int = 4,
      num_filters: int = 256,
      attribute_heads: Optional[List[Dict[str, Any]]] = None,
      share_classification_heads: bool = False,
      use_separable_conv: bool = False,
      activation: str = 'relu',
      use_sync_bn: bool = False,
      norm_momentum: float = 0.99,
      norm_epsilon: float = 0.001,
      kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
      bias_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
      num_params_per_anchor: int = 4,
      share_level_convs: bool = True,
      **kwargs,
  ):
    """Initializes a RetinaNet head.

    Args:
      min_level: An `int` number of minimum feature level.
      max_level: An `int` number of maximum feature level.
      num_classes: An `int` number of classes to predict.
      num_anchors_per_location: An `int` number of anchors per pixel location.
      num_convs: An `int` number that represents the number of the intermediate
        conv layers before the prediction.
      num_filters: An `int` number that represents the number of filters of the
        intermediate conv layers.
      attribute_heads: If not None, a list that contains a dict for each
        additional attribute head. Each dict consists of 4 key-value pairs:
        `name`, `type` ('regression' or 'classification'), `size` (number of
        predicted values for each instance), and `prediction_tower_name`
        (optional, specifies shared prediction towers.)
      share_classification_heads: A `bool` that indicates whether sharing
        weights among the main and attribute classification heads.
      use_separable_conv: A `bool` that indicates whether the separable
        convolution layers is used.
      activation: A `str` that indicates which activation is used, e.g. 'relu',
        'swish', etc.
      use_sync_bn: A `bool` that indicates whether to use synchronized batch
        normalization across different replicas.
      norm_momentum: A `float` of normalization momentum for the moving average.
      norm_epsilon: A `float` added to variance to avoid dividing by zero.
      kernel_regularizer: A `tf_keras.regularizers.Regularizer` object for
        Conv2D. Default is None.
      bias_regularizer: A `tf_keras.regularizers.Regularizer` object for Conv2D.
      num_params_per_anchor: Number of parameters required to specify an anchor
        box. For example, `num_params_per_anchor` would be 4 for axis-aligned
        anchor boxes specified by their y-centers, x-centers, heights, and
        widths.
      share_level_convs: An optional bool to enable sharing convs
        across levels for classnet, boxnet, classifier and box regressor.
        If True, convs will be shared across all levels.
      **kwargs: Additional keyword arguments to be passed.
    """
    super().__init__(**kwargs)
    # Stored verbatim so `get_config`/`from_config` can round-trip this layer
    # through Keras serialization.
    self._config_dict = {
        'min_level': min_level,
        'max_level': max_level,
        'num_classes': num_classes,
        'num_anchors_per_location': num_anchors_per_location,
        'num_convs': num_convs,
        'num_filters': num_filters,
        'attribute_heads': attribute_heads,
        'share_classification_heads': share_classification_heads,
        'use_separable_conv': use_separable_conv,
        'activation': activation,
        'use_sync_bn': use_sync_bn,
        'norm_momentum': norm_momentum,
        'norm_epsilon': norm_epsilon,
        'kernel_regularizer': kernel_regularizer,
        'bias_regularizer': bias_regularizer,
        'num_params_per_anchor': num_params_per_anchor,
        'share_level_convs': share_level_convs,
    }
    # Batch norm acts over the channel axis, whose position depends on the
    # image data format.
    if tf_keras.backend.image_data_format() == 'channels_last':
      self._bn_axis = -1
    else:
      self._bn_axis = 1
    self._activation = tf_utils.get_activation(activation)
    # Shared kwargs for the intermediate 3x3 convs in the prediction towers.
    self._conv_kwargs = {
        'filters': self._config_dict['num_filters'],
        'kernel_size': 3,
        'padding': 'same',
        'bias_initializer': tf.zeros_initializer(),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    # SeparableConv2D keeps its own default initializers; plain Conv2D gets an
    # explicit random-normal kernel init and the configured regularizer.
    if not self._config_dict['use_separable_conv']:
      self._conv_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(stddev=0.01),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    self._bn_kwargs = {
        'axis': self._bn_axis,
        'momentum': self._config_dict['norm_momentum'],
        'epsilon': self._config_dict['norm_epsilon'],
    }
    # Final classification conv. The bias is initialized to
    # -log((1 - 0.01) / 0.01) so that sigmoid(bias) == 0.01, i.e. every anchor
    # starts with a low foreground probability.
    self._classifier_kwargs = {
        'filters': (
            self._config_dict['num_classes']
            * self._config_dict['num_anchors_per_location']
        ),
        'kernel_size': 3,
        'padding': 'same',
        'bias_initializer': tf.constant_initializer(-np.log((1 - 0.01) / 0.01)),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    if not self._config_dict['use_separable_conv']:
      self._classifier_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(stddev=1e-5),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    # Final box-regression conv: num_params_per_anchor values per anchor.
    self._box_regressor_kwargs = {
        'filters': (
            self._config_dict['num_params_per_anchor']
            * self._config_dict['num_anchors_per_location']
        ),
        'kernel_size': 3,
        'padding': 'same',
        'bias_initializer': tf.zeros_initializer(),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    if not self._config_dict['use_separable_conv']:
      self._box_regressor_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(stddev=1e-5),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    if self._config_dict['attribute_heads']:
      self._init_attribute_kwargs()

  def _conv_kwargs_new_kernel_init(self, conv_kwargs):
    """Replaces any kernel initializers in `conv_kwargs` with fresh clones.

    Cloning ensures each conv layer created from the shared kwargs dict gets
    an independent initializer instance. Note: this mutates `conv_kwargs` in
    place and also returns it.

    Args:
      conv_kwargs: A `dict` of Conv2D/SeparableConv2D constructor kwargs.

    Returns:
      The same `dict`, with initializer entries cloned.
    """
    if 'kernel_initializer' in conv_kwargs:
      conv_kwargs['kernel_initializer'] = tf_utils.clone_initializer(
          conv_kwargs['kernel_initializer']
      )
    if 'pointwise_initializer' in conv_kwargs:
      conv_kwargs['pointwise_initializer'] = tf_utils.clone_initializer(
          conv_kwargs['pointwise_initializer']
      )
    if 'depthwise_initializer' in conv_kwargs:
      conv_kwargs['depthwise_initializer'] = tf_utils.clone_initializer(
          conv_kwargs['depthwise_initializer']
      )
    return conv_kwargs

  def _init_attribute_kwargs(self):
    """Builds the predictor-conv kwargs for each configured attribute head.

    Populates `self._attribute_kwargs`, one kwargs dict per entry in
    `attribute_heads`, in the same order.

    Raises:
      ValueError: If an attribute head has an unsupported `type`, or if
        `share_classification_heads` is combined with a
        `prediction_tower_name`.
    """
    self._attribute_kwargs = []
    for att_config in self._config_dict['attribute_heads']:
      att_type = att_config['type']
      att_size = att_config['size']
      # NOTE(review): indexes 'prediction_tower_name' directly although the
      # class docstring calls it optional — an absent key would raise
      # KeyError. Presumably the config always carries the key; confirm.
      att_prediction_tower_name = att_config['prediction_tower_name']
      att_predictor_kwargs = {
          'filters': att_size * self._config_dict['num_anchors_per_location'],
          'kernel_size': 3,
          'padding': 'same',
          'bias_initializer': tf.zeros_initializer(),
          'bias_regularizer': self._config_dict['bias_regularizer'],
      }
      if att_type == 'regression':
        att_predictor_kwargs.update(
            {'bias_initializer': tf.zeros_initializer()}
        )
      elif att_type == 'classification':
        # Same low-prior bias as the main classifier (sigmoid(bias) == 0.01).
        att_predictor_kwargs.update(
            {
                'bias_initializer': tf.constant_initializer(
                    -np.log((1 - 0.01) / 0.01)
                )
            }
        )
      else:
        raise ValueError(
            'Attribute head type {} not supported.'.format(att_type)
        )
      if (
          att_prediction_tower_name
          and self._config_dict['share_classification_heads']
      ):
        raise ValueError(
            'share_classification_heads cannot be set as True when'
            ' att_prediction_tower_name is specified.'
        )
      if not self._config_dict['use_separable_conv']:
        att_predictor_kwargs.update({
            'kernel_initializer': tf_keras.initializers.RandomNormal(
                stddev=1e-5
            ),
            'kernel_regularizer': self._config_dict['kernel_regularizer'],
        })
      self._attribute_kwargs.append(att_predictor_kwargs)

  def _apply_prediction_tower(self, features, convs, norms) -> tf.Tensor:
    """Applies a conv -> norm -> activation tower to `features`."""
    x = features
    for conv, norm in zip(convs, norms):
      x = conv(x)
      x = norm(x)
      x = self._activation(x)
    return x

  def _apply_attribute_net(
      self, attributes, level, level_idx, this_level_features, classnet_x
  ):
    """Runs all attribute heads for one feature level.

    Writes predictions into `attributes[att_name][str(level)]` in place.
    Classification heads reuse the classnet tower output when
    `share_classification_heads` is set; heads that declare the same
    `prediction_tower_name` share one tower evaluation per level.

    Args:
      attributes: Output `dict` of per-attribute, per-level predictions,
        mutated in place.
      level: The `int` pyramid level being processed.
      level_idx: The 0-based index of `level` (for per-level norm lists).
      this_level_features: The feature map tensor for this level.
      classnet_x: The classnet tower output for this level, reused when
        classification heads are shared.
    """
    prediction_tower_output = {}
    for att_config in self._config_dict['attribute_heads']:
      att_name = att_config['name']
      att_type = att_config['type']
      if (
          self._config_dict['share_classification_heads']
          and att_type == 'classification'
      ):
        attributes[att_name][str(level)] = self._att_predictors[att_name](
            classnet_x
        )
      else:

        def _apply_attribute_prediction_tower(
            atttribute_name, features, feature_level
        ):
          # Convs are shared across levels; norms are per level.
          return self._apply_prediction_tower(
              features,
              self._att_convs[atttribute_name],
              self._att_norms[atttribute_name][feature_level],
          )

        prediction_tower_name = att_config['prediction_tower_name']
        if not prediction_tower_name:
          attributes[att_name][str(level)] = self._att_predictors[att_name](
              _apply_attribute_prediction_tower(
                  att_name, this_level_features, level_idx
              )
          )
        else:
          # Evaluate each named shared tower at most once per level and
          # reuse its output for every head that references it.
          if prediction_tower_name not in prediction_tower_output:
            prediction_tower_output[prediction_tower_name] = (
                _apply_attribute_prediction_tower(
                    att_name, this_level_features, level_idx
                )
            )
          attributes[att_name][str(level)] = self._att_predictors[att_name](
              prediction_tower_output[prediction_tower_name]
          )

  def _build_prediction_tower(
      self, net_name, predictor_name, conv_op, bn_op, predictor_kwargs
  ):
    """Builds the prediction tower. Convs across levels can be shared or not.

    Args:
      net_name: A `str` prefix used for the tower's conv/norm layer names.
      predictor_name: A `str` name for the final prediction conv.
      conv_op: The conv layer class (Conv2D or SeparableConv2D).
      bn_op: The batch-norm layer class.
      predictor_kwargs: Constructor kwargs for the final prediction conv.

    Returns:
      A tuple of (convs, norms, predictors). With shared level convs, `convs`
      is a flat list (one per conv index) and `predictors` is a single layer;
      otherwise both are per-level lists. `norms` is always a per-level list
      of lists.
    """
    convs = []
    norms = []
    for level in range(
        self._config_dict['min_level'], self._config_dict['max_level'] + 1
    ):
      if not self._config_dict['share_level_convs']:
        this_level_convs = []
      this_level_norms = []
      for i in range(self._config_dict['num_convs']):
        # Fresh initializer clones per created layer.
        conv_kwargs = self._conv_kwargs_new_kernel_init(self._conv_kwargs)
        if not self._config_dict['share_level_convs']:
          # Do not share convs.
          this_level_convs.append(
              conv_op(name=f'{net_name}-conv_{level}_{i}', **conv_kwargs)
          )
        elif level == self._config_dict['min_level']:
          # Shared convs are created only once, at the first level.
          convs.append(conv_op(name=f'{net_name}-conv_{i}', **conv_kwargs))
        this_level_norms.append(
            bn_op(name=f'{net_name}-conv-norm_{level}_{i}', **self._bn_kwargs)
        )
      norms.append(this_level_norms)
      if not self._config_dict['share_level_convs']:
        convs.append(this_level_convs)
    # Create predictors after additional convs.
    if self._config_dict['share_level_convs']:
      predictors = conv_op(name=predictor_name, **predictor_kwargs)
    else:
      predictors = []
      for level in range(
          self._config_dict['min_level'], self._config_dict['max_level'] + 1
      ):
        predictor_kwargs = self._conv_kwargs_new_kernel_init(predictor_kwargs)
        predictors.append(
            conv_op(name=f'{predictor_name}-{level}', **predictor_kwargs)
        )
    return convs, norms, predictors

  def _build_attribute_net(self, conv_op, bn_op):
    """Builds convs, norms and predictors for every attribute head.

    Attribute convs are shared across levels (created at `min_level` only);
    norms are created per level. Results are stored on
    `self._att_convs`, `self._att_norms` and `self._att_predictors`,
    keyed by attribute name.

    Args:
      conv_op: The conv layer class (Conv2D or SeparableConv2D).
      bn_op: The batch-norm layer class.

    Raises:
      ValueError: If an attribute head configures a negative `num_convs` or
        `num_filters`.
    """
    self._att_predictors = {}
    self._att_convs = {}
    self._att_norms = {}
    for att_config, att_predictor_kwargs in zip(
        self._config_dict['attribute_heads'], self._attribute_kwargs
    ):
      att_name = att_config['name']
      # Per-attribute overrides fall back to the head-wide settings.
      att_num_convs = (
          att_config.get('num_convs') or self._config_dict['num_convs']
      )
      att_num_filters = (
          att_config.get('num_filters') or self._config_dict['num_filters']
      )
      if att_num_convs < 0:
        raise ValueError(f'Invalid `num_convs` {att_num_convs} for {att_name}.')
      if att_num_filters < 0:
        raise ValueError(
            f'Invalid `num_filters` {att_num_filters} for {att_name}.'
        )
      att_conv_kwargs = self._conv_kwargs.copy()
      att_conv_kwargs['filters'] = att_num_filters
      att_convs_i = []
      att_norms_i = []
      # Build conv and norm layers.
      for level in range(
          self._config_dict['min_level'], self._config_dict['max_level'] + 1
      ):
        this_level_att_norms = []
        for i in range(att_num_convs):
          if level == self._config_dict['min_level']:
            att_conv_name = '{}-conv_{}'.format(att_name, i)
            att_convs_i.append(conv_op(name=att_conv_name, **att_conv_kwargs))
          att_norm_name = '{}-conv-norm_{}_{}'.format(att_name, level, i)
          this_level_att_norms.append(
              bn_op(name=att_norm_name, **self._bn_kwargs)
          )
        att_norms_i.append(this_level_att_norms)
      self._att_convs[att_name] = att_convs_i
      self._att_norms[att_name] = att_norms_i
      # Build the final prediction layer.
      self._att_predictors[att_name] = conv_op(
          name='{}_attributes'.format(att_name), **att_predictor_kwargs
      )

  def build(self, input_shape: Union[tf.TensorShape, List[tf.TensorShape]]):
    """Creates the variables of the head."""
    conv_op = (
        tf_keras.layers.SeparableConv2D
        if self._config_dict['use_separable_conv']
        else tf_keras.layers.Conv2D
    )
    bn_op = (
        tf_keras.layers.experimental.SyncBatchNormalization
        if self._config_dict['use_sync_bn']
        else tf_keras.layers.BatchNormalization
    )
    # Class net.
    self._cls_convs, self._cls_norms, self._classifier = (
        self._build_prediction_tower(
            'classnet', 'scores', conv_op, bn_op, self._classifier_kwargs
        )
    )
    # Box net.
    self._box_convs, self._box_norms, self._box_regressor = (
        self._build_prediction_tower(
            'boxnet', 'boxes', conv_op, bn_op, self._box_regressor_kwargs
        )
    )
    # Attribute learning nets.
    if self._config_dict['attribute_heads']:
      self._build_attribute_net(conv_op, bn_op)
    super().build(input_shape)

  def call(self, features: Mapping[str, tf.Tensor]):
    """Forward pass of the RetinaNet head.

    Args:
      features: A `dict` of `tf.Tensor` where
        - key: A `str` of the level of the multilevel features.
        - values: A `tf.Tensor`, the feature map tensors, whose shape is
            [batch, height_l, width_l, channels].

    Returns:
      scores: A `dict` of `tf.Tensor` which includes scores of the predictions.
        - key: A `str` of the level of the multilevel predictions.
        - values: A `tf.Tensor` of the box scores predicted from a particular
            feature level, whose shape is
            [batch, height_l, width_l, num_classes * num_anchors_per_location].
      boxes: A `dict` of `tf.Tensor` which includes coordinates of the
        predictions.
        - key: A `str` of the level of the multilevel predictions.
        - values: A `tf.Tensor` of the box scores predicted from a particular
            feature level, whose shape is
            [batch, height_l, width_l,
            num_params_per_anchor * num_anchors_per_location].
      attributes: a dict of (attribute_name, attribute_prediction). Each
        `attribute_prediction` is a dict of:
        - key: `str`, the level of the multilevel predictions.
        - values: `Tensor`, the box scores predicted from a particular feature
            level, whose shape is
            [batch, height_l, width_l,
            attribute_size * num_anchors_per_location].
        Can be an empty dictionary if no attribute learning is required.
    """
    scores = {}
    boxes = {}
    if self._config_dict['attribute_heads']:
      attributes = {
          att_config['name']: {}
          for att_config in self._config_dict['attribute_heads']
      }
    else:
      attributes = {}
    for i, level in enumerate(
        range(self._config_dict['min_level'],
              self._config_dict['max_level'] + 1)):
      this_level_features = features[str(level)]
      # Pick the shared layers, or this level's own layers.
      if self._config_dict['share_level_convs']:
        cls_convs = self._cls_convs
        box_convs = self._box_convs
        classifier = self._classifier
        box_regressor = self._box_regressor
      else:
        cls_convs = self._cls_convs[i]
        box_convs = self._box_convs[i]
        classifier = self._classifier[i]
        box_regressor = self._box_regressor[i]
      # Apply class net.
      x = self._apply_prediction_tower(
          this_level_features, cls_convs, self._cls_norms[i]
      )
      scores[str(level)] = classifier(x)
      # Kept so shared attribute classification heads can reuse the tower
      # output in `_apply_attribute_net`.
      classnet_x = x
      # Apply box net.
      x = self._apply_prediction_tower(
          this_level_features, box_convs, self._box_norms[i]
      )
      boxes[str(level)] = box_regressor(x)
      # Apply attribute nets.
      if self._config_dict['attribute_heads']:
        self._apply_attribute_net(
            attributes, level, i, this_level_features, classnet_x
        )
    return scores, boxes, attributes

  def get_config(self):
    """Returns the layer config for Keras serialization."""
    return self._config_dict

  @classmethod
  def from_config(cls, config):
    """Recreates the layer from a `get_config` dict."""
    return cls(**config)
@tf_keras.utils.register_keras_serializable(package='Vision')
class RPNHead(tf_keras.layers.Layer):
  """Creates a Region Proposal Network (RPN) head.

  Applies a small conv tower (convs shared across levels, norms per level)
  to each pyramid level, followed by 1x1 objectness and box-delta convs that
  are shared across all levels.
  """

  def __init__(
      self,
      min_level: int,
      max_level: int,
      num_anchors_per_location: int,
      num_convs: int = 1,
      num_filters: int = 256,
      use_separable_conv: bool = False,
      activation: str = 'relu',
      use_sync_bn: bool = False,
      norm_momentum: float = 0.99,
      norm_epsilon: float = 0.001,
      kernel_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
      bias_regularizer: Optional[tf_keras.regularizers.Regularizer] = None,
      **kwargs):
    """Initializes a Region Proposal Network head.

    Args:
      min_level: An `int` number of minimum feature level.
      max_level: An `int` number of maximum feature level.
      num_anchors_per_location: An `int` number of number of anchors per pixel
        location.
      num_convs: An `int` number that represents the number of the intermediate
        convolution layers before the prediction.
      num_filters: An `int` number that represents the number of filters of the
        intermediate convolution layers.
      use_separable_conv: A `bool` that indicates whether the separable
        convolution layers is used.
      activation: A `str` that indicates which activation is used, e.g. 'relu',
        'swish', etc.
      use_sync_bn: A `bool` that indicates whether to use synchronized batch
        normalization across different replicas.
      norm_momentum: A `float` of normalization momentum for the moving average.
      norm_epsilon: A `float` added to variance to avoid dividing by zero.
      kernel_regularizer: A `tf_keras.regularizers.Regularizer` object for
        Conv2D. Default is None.
      bias_regularizer: A `tf_keras.regularizers.Regularizer` object for Conv2D.
      **kwargs: Additional keyword arguments to be passed.
    """
    super(RPNHead, self).__init__(**kwargs)
    # Stored verbatim so `get_config`/`from_config` can round-trip this layer.
    self._config_dict = {
        'min_level': min_level,
        'max_level': max_level,
        'num_anchors_per_location': num_anchors_per_location,
        'num_convs': num_convs,
        'num_filters': num_filters,
        'use_separable_conv': use_separable_conv,
        'activation': activation,
        'use_sync_bn': use_sync_bn,
        'norm_momentum': norm_momentum,
        'norm_epsilon': norm_epsilon,
        'kernel_regularizer': kernel_regularizer,
        'bias_regularizer': bias_regularizer,
    }
    # Batch norm acts over the channel axis, whose position depends on the
    # image data format.
    if tf_keras.backend.image_data_format() == 'channels_last':
      self._bn_axis = -1
    else:
      self._bn_axis = 1
    self._activation = tf_utils.get_activation(activation)

  def build(self, input_shape):
    """Creates the variables of the head."""
    conv_op = (tf_keras.layers.SeparableConv2D
               if self._config_dict['use_separable_conv']
               else tf_keras.layers.Conv2D)
    conv_kwargs = {
        'filters': self._config_dict['num_filters'],
        'kernel_size': 3,
        'padding': 'same',
        'bias_initializer': tf.zeros_initializer(),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    # SeparableConv2D keeps its own default initializers; plain Conv2D gets an
    # explicit random-normal kernel init and the configured regularizer.
    if not self._config_dict['use_separable_conv']:
      conv_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(
              stddev=0.01),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    bn_op = (tf_keras.layers.experimental.SyncBatchNormalization
             if self._config_dict['use_sync_bn']
             else tf_keras.layers.BatchNormalization)
    bn_kwargs = {
        'axis': self._bn_axis,
        'momentum': self._config_dict['norm_momentum'],
        'epsilon': self._config_dict['norm_epsilon'],
    }
    # Convs are shared across levels (created at min_level only); norms are
    # created per level.
    self._convs = []
    self._norms = []
    for level in range(
        self._config_dict['min_level'], self._config_dict['max_level'] + 1):
      this_level_norms = []
      for i in range(self._config_dict['num_convs']):
        if level == self._config_dict['min_level']:
          conv_name = 'rpn-conv_{}'.format(i)
          # Fresh initializer clone per created layer.
          if 'kernel_initializer' in conv_kwargs:
            conv_kwargs['kernel_initializer'] = tf_utils.clone_initializer(
                conv_kwargs['kernel_initializer'])
          self._convs.append(conv_op(name=conv_name, **conv_kwargs))
        norm_name = 'rpn-conv-norm_{}_{}'.format(level, i)
        this_level_norms.append(bn_op(name=norm_name, **bn_kwargs))
      self._norms.append(this_level_norms)
    # 1x1 objectness conv: one score per anchor.
    classifier_kwargs = {
        'filters': self._config_dict['num_anchors_per_location'],
        'kernel_size': 1,
        'padding': 'valid',
        'bias_initializer': tf.zeros_initializer(),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    if not self._config_dict['use_separable_conv']:
      classifier_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(
              stddev=1e-5),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    self._classifier = conv_op(name='rpn-scores', **classifier_kwargs)
    # 1x1 box conv: 4 box parameters per anchor.
    box_regressor_kwargs = {
        'filters': 4 * self._config_dict['num_anchors_per_location'],
        'kernel_size': 1,
        'padding': 'valid',
        'bias_initializer': tf.zeros_initializer(),
        'bias_regularizer': self._config_dict['bias_regularizer'],
    }
    if not self._config_dict['use_separable_conv']:
      box_regressor_kwargs.update({
          'kernel_initializer': tf_keras.initializers.RandomNormal(
              stddev=1e-5),
          'kernel_regularizer': self._config_dict['kernel_regularizer'],
      })
    self._box_regressor = conv_op(name='rpn-boxes', **box_regressor_kwargs)
    super(RPNHead, self).build(input_shape)

  def call(self, features: Mapping[str, tf.Tensor]):
    """Forward pass of the RPN head.

    Args:
      features: A `dict` of `tf.Tensor` where
        - key: A `str` of the level of the multilevel features.
        - values: A `tf.Tensor`, the feature map tensors, whose shape is [batch,
          height_l, width_l, channels].

    Returns:
      scores: A `dict` of `tf.Tensor` which includes scores of the predictions.
        - key: A `str` of the level of the multilevel predictions.
        - values: A `tf.Tensor` of the box scores predicted from a particular
            feature level, whose shape is
            [batch, height_l, width_l, num_classes * num_anchors_per_location].
      boxes: A `dict` of `tf.Tensor` which includes coordinates of the
        predictions.
        - key: A `str` of the level of the multilevel predictions.
        - values: A `tf.Tensor` of the box scores predicted from a particular
            feature level, whose shape is
            [batch, height_l, width_l, 4 * num_anchors_per_location].
    """
    scores = {}
    boxes = {}
    for i, level in enumerate(
        range(self._config_dict['min_level'],
              self._config_dict['max_level'] + 1)):
      x = features[str(level)]
      # Shared convs, per-level norms.
      for conv, norm in zip(self._convs, self._norms[i]):
        x = conv(x)
        x = norm(x)
        x = self._activation(x)
      scores[str(level)] = self._classifier(x)
      boxes[str(level)] = self._box_regressor(x)
    return scores, boxes

  def get_config(self):
    """Returns the layer config for Keras serialization."""
    return self._config_dict

  @classmethod
  def from_config(cls, config):
    """Recreates the layer from a `get_config` dict."""
    return cls(**config)