Spaces:
Runtime error
Runtime error
| # copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ | |
| Code is refer from: | |
| https://github.com/RuijieJ/pren/blob/main/Nets/EfficientNet.py | |
| """ | |
| from __future__ import absolute_import | |
| from __future__ import division | |
| from __future__ import print_function | |
| import math | |
| import re | |
| import collections | |
| import paddle | |
| import paddle.nn as nn | |
| import paddle.nn.functional as F | |
| __all__ = ['EfficientNetb3'] | |
| GlobalParams = collections.namedtuple('GlobalParams', [ | |
| 'batch_norm_momentum', 'batch_norm_epsilon', 'dropout_rate', 'num_classes', | |
| 'width_coefficient', 'depth_coefficient', 'depth_divisor', 'min_depth', | |
| 'drop_connect_rate', 'image_size' | |
| ]) | |
| BlockArgs = collections.namedtuple('BlockArgs', [ | |
| 'kernel_size', 'num_repeat', 'input_filters', 'output_filters', | |
| 'expand_ratio', 'id_skip', 'stride', 'se_ratio' | |
| ]) | |
| class BlockDecoder: | |
| def _decode_block_string(block_string): | |
| assert isinstance(block_string, str) | |
| ops = block_string.split('_') | |
| options = {} | |
| for op in ops: | |
| splits = re.split(r'(\d.*)', op) | |
| if len(splits) >= 2: | |
| key, value = splits[:2] | |
| options[key] = value | |
| assert (('s' in options and len(options['s']) == 1) or | |
| (len(options['s']) == 2 and options['s'][0] == options['s'][1])) | |
| return BlockArgs( | |
| kernel_size=int(options['k']), | |
| num_repeat=int(options['r']), | |
| input_filters=int(options['i']), | |
| output_filters=int(options['o']), | |
| expand_ratio=int(options['e']), | |
| id_skip=('noskip' not in block_string), | |
| se_ratio=float(options['se']) if 'se' in options else None, | |
| stride=[int(options['s'][0])]) | |
| def decode(string_list): | |
| assert isinstance(string_list, list) | |
| blocks_args = [] | |
| for block_string in string_list: | |
| blocks_args.append(BlockDecoder._decode_block_string(block_string)) | |
| return blocks_args | |
| def efficientnet(width_coefficient=None, | |
| depth_coefficient=None, | |
| dropout_rate=0.2, | |
| drop_connect_rate=0.2, | |
| image_size=None, | |
| num_classes=1000): | |
| blocks_args = [ | |
| 'r1_k3_s11_e1_i32_o16_se0.25', | |
| 'r2_k3_s22_e6_i16_o24_se0.25', | |
| 'r2_k5_s22_e6_i24_o40_se0.25', | |
| 'r3_k3_s22_e6_i40_o80_se0.25', | |
| 'r3_k5_s11_e6_i80_o112_se0.25', | |
| 'r4_k5_s22_e6_i112_o192_se0.25', | |
| 'r1_k3_s11_e6_i192_o320_se0.25', | |
| ] | |
| blocks_args = BlockDecoder.decode(blocks_args) | |
| global_params = GlobalParams( | |
| batch_norm_momentum=0.99, | |
| batch_norm_epsilon=1e-3, | |
| dropout_rate=dropout_rate, | |
| drop_connect_rate=drop_connect_rate, | |
| num_classes=num_classes, | |
| width_coefficient=width_coefficient, | |
| depth_coefficient=depth_coefficient, | |
| depth_divisor=8, | |
| min_depth=None, | |
| image_size=image_size, ) | |
| return blocks_args, global_params | |
| class EffUtils: | |
| def round_filters(filters, global_params): | |
| """ Calculate and round number of filters based on depth multiplier. """ | |
| multiplier = global_params.width_coefficient | |
| if not multiplier: | |
| return filters | |
| divisor = global_params.depth_divisor | |
| min_depth = global_params.min_depth | |
| filters *= multiplier | |
| min_depth = min_depth or divisor | |
| new_filters = max(min_depth, | |
| int(filters + divisor / 2) // divisor * divisor) | |
| if new_filters < 0.9 * filters: | |
| new_filters += divisor | |
| return int(new_filters) | |
| def round_repeats(repeats, global_params): | |
| """ Round number of filters based on depth multiplier. """ | |
| multiplier = global_params.depth_coefficient | |
| if not multiplier: | |
| return repeats | |
| return int(math.ceil(multiplier * repeats)) | |
| class MbConvBlock(nn.Layer): | |
| def __init__(self, block_args): | |
| super(MbConvBlock, self).__init__() | |
| self._block_args = block_args | |
| self.has_se = (self._block_args.se_ratio is not None) and \ | |
| (0 < self._block_args.se_ratio <= 1) | |
| self.id_skip = block_args.id_skip | |
| # expansion phase | |
| self.inp = self._block_args.input_filters | |
| oup = self._block_args.input_filters * self._block_args.expand_ratio | |
| if self._block_args.expand_ratio != 1: | |
| self._expand_conv = nn.Conv2D(self.inp, oup, 1, bias_attr=False) | |
| self._bn0 = nn.BatchNorm(oup) | |
| # depthwise conv phase | |
| k = self._block_args.kernel_size | |
| s = self._block_args.stride | |
| if isinstance(s, list): | |
| s = s[0] | |
| self._depthwise_conv = nn.Conv2D( | |
| oup, | |
| oup, | |
| groups=oup, | |
| kernel_size=k, | |
| stride=s, | |
| padding='same', | |
| bias_attr=False) | |
| self._bn1 = nn.BatchNorm(oup) | |
| # squeeze and excitation layer, if desired | |
| if self.has_se: | |
| num_squeezed_channels = max(1, | |
| int(self._block_args.input_filters * | |
| self._block_args.se_ratio)) | |
| self._se_reduce = nn.Conv2D(oup, num_squeezed_channels, 1) | |
| self._se_expand = nn.Conv2D(num_squeezed_channels, oup, 1) | |
| # output phase and some util class | |
| self.final_oup = self._block_args.output_filters | |
| self._project_conv = nn.Conv2D(oup, self.final_oup, 1, bias_attr=False) | |
| self._bn2 = nn.BatchNorm(self.final_oup) | |
| self._swish = nn.Swish() | |
| def _drop_connect(self, inputs, p, training): | |
| if not training: | |
| return inputs | |
| batch_size = inputs.shape[0] | |
| keep_prob = 1 - p | |
| random_tensor = keep_prob | |
| random_tensor += paddle.rand([batch_size, 1, 1, 1], dtype=inputs.dtype) | |
| random_tensor = paddle.to_tensor(random_tensor, place=inputs.place) | |
| binary_tensor = paddle.floor(random_tensor) | |
| output = inputs / keep_prob * binary_tensor | |
| return output | |
| def forward(self, inputs, drop_connect_rate=None): | |
| # expansion and depthwise conv | |
| x = inputs | |
| if self._block_args.expand_ratio != 1: | |
| x = self._swish(self._bn0(self._expand_conv(inputs))) | |
| x = self._swish(self._bn1(self._depthwise_conv(x))) | |
| # squeeze and excitation | |
| if self.has_se: | |
| x_squeezed = F.adaptive_avg_pool2d(x, 1) | |
| x_squeezed = self._se_expand( | |
| self._swish(self._se_reduce(x_squeezed))) | |
| x = F.sigmoid(x_squeezed) * x | |
| x = self._bn2(self._project_conv(x)) | |
| # skip conntection and drop connect | |
| if self.id_skip and self._block_args.stride == 1 and \ | |
| self.inp == self.final_oup: | |
| if drop_connect_rate: | |
| x = self._drop_connect( | |
| x, p=drop_connect_rate, training=self.training) | |
| x = x + inputs | |
| return x | |
| class EfficientNetb3_PREN(nn.Layer): | |
| def __init__(self, in_channels): | |
| super(EfficientNetb3_PREN, self).__init__() | |
| """ | |
| the fllowing are efficientnetb3's superparams, | |
| they means efficientnetb3 network's width, depth, resolution and | |
| dropout respectively, to fit for text recognition task, the resolution | |
| here is changed from 300 to 64. | |
| """ | |
| w, d, s, p = 1.2, 1.4, 64, 0.3 | |
| self._blocks_args, self._global_params = efficientnet( | |
| width_coefficient=w, | |
| depth_coefficient=d, | |
| dropout_rate=p, | |
| image_size=s) | |
| self.out_channels = [] | |
| # stem | |
| out_channels = EffUtils.round_filters(32, self._global_params) | |
| self._conv_stem = nn.Conv2D( | |
| in_channels, out_channels, 3, 2, padding='same', bias_attr=False) | |
| self._bn0 = nn.BatchNorm(out_channels) | |
| # build blocks | |
| self._blocks = [] | |
| # to extract three feature maps for fpn based on efficientnetb3 backbone | |
| self._concerned_block_idxes = [7, 17, 25] | |
| _concerned_idx = 0 | |
| for i, block_args in enumerate(self._blocks_args): | |
| block_args = block_args._replace( | |
| input_filters=EffUtils.round_filters(block_args.input_filters, | |
| self._global_params), | |
| output_filters=EffUtils.round_filters(block_args.output_filters, | |
| self._global_params), | |
| num_repeat=EffUtils.round_repeats(block_args.num_repeat, | |
| self._global_params)) | |
| self._blocks.append( | |
| self.add_sublayer(f"{i}-0", MbConvBlock(block_args))) | |
| _concerned_idx += 1 | |
| if _concerned_idx in self._concerned_block_idxes: | |
| self.out_channels.append(block_args.output_filters) | |
| if block_args.num_repeat > 1: | |
| block_args = block_args._replace( | |
| input_filters=block_args.output_filters, stride=1) | |
| for j in range(block_args.num_repeat - 1): | |
| self._blocks.append( | |
| self.add_sublayer(f'{i}-{j+1}', MbConvBlock(block_args))) | |
| _concerned_idx += 1 | |
| if _concerned_idx in self._concerned_block_idxes: | |
| self.out_channels.append(block_args.output_filters) | |
| self._swish = nn.Swish() | |
| def forward(self, inputs): | |
| outs = [] | |
| x = self._swish(self._bn0(self._conv_stem(inputs))) | |
| for idx, block in enumerate(self._blocks): | |
| drop_connect_rate = self._global_params.drop_connect_rate | |
| if drop_connect_rate: | |
| drop_connect_rate *= float(idx) / len(self._blocks) | |
| x = block(x, drop_connect_rate=drop_connect_rate) | |
| if idx in self._concerned_block_idxes: | |
| outs.append(x) | |
| return outs | |