# copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Code is refer from: https://github.com/RuijieJ/pren/blob/main/Nets/EfficientNet.py """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import math import re import collections import paddle import paddle.nn as nn import paddle.nn.functional as F __all__ = ['EfficientNetb3'] GlobalParams = collections.namedtuple('GlobalParams', [ 'batch_norm_momentum', 'batch_norm_epsilon', 'dropout_rate', 'num_classes', 'width_coefficient', 'depth_coefficient', 'depth_divisor', 'min_depth', 'drop_connect_rate', 'image_size' ]) BlockArgs = collections.namedtuple('BlockArgs', [ 'kernel_size', 'num_repeat', 'input_filters', 'output_filters', 'expand_ratio', 'id_skip', 'stride', 'se_ratio' ]) class BlockDecoder: @staticmethod def _decode_block_string(block_string): assert isinstance(block_string, str) ops = block_string.split('_') options = {} for op in ops: splits = re.split(r'(\d.*)', op) if len(splits) >= 2: key, value = splits[:2] options[key] = value assert (('s' in options and len(options['s']) == 1) or (len(options['s']) == 2 and options['s'][0] == options['s'][1])) return BlockArgs( kernel_size=int(options['k']), num_repeat=int(options['r']), input_filters=int(options['i']), output_filters=int(options['o']), expand_ratio=int(options['e']), id_skip=('noskip' not in block_string), se_ratio=float(options['se']) if 'se' in options else None, stride=[int(options['s'][0])]) @staticmethod def decode(string_list): assert isinstance(string_list, list) blocks_args = [] for block_string in string_list: blocks_args.append(BlockDecoder._decode_block_string(block_string)) return blocks_args def efficientnet(width_coefficient=None, depth_coefficient=None, dropout_rate=0.2, drop_connect_rate=0.2, image_size=None, num_classes=1000): blocks_args = [ 'r1_k3_s11_e1_i32_o16_se0.25', 'r2_k3_s22_e6_i16_o24_se0.25', 'r2_k5_s22_e6_i24_o40_se0.25', 'r3_k3_s22_e6_i40_o80_se0.25', 'r3_k5_s11_e6_i80_o112_se0.25', 'r4_k5_s22_e6_i112_o192_se0.25', 'r1_k3_s11_e6_i192_o320_se0.25', ] blocks_args = BlockDecoder.decode(blocks_args) global_params = GlobalParams( batch_norm_momentum=0.99, batch_norm_epsilon=1e-3, dropout_rate=dropout_rate, drop_connect_rate=drop_connect_rate, num_classes=num_classes, width_coefficient=width_coefficient, depth_coefficient=depth_coefficient, depth_divisor=8, min_depth=None, image_size=image_size, ) return blocks_args, global_params class EffUtils: @staticmethod def round_filters(filters, global_params): """ Calculate and round number of filters based on depth multiplier. """ multiplier = global_params.width_coefficient if not multiplier: return filters divisor = global_params.depth_divisor min_depth = global_params.min_depth filters *= multiplier min_depth = min_depth or divisor new_filters = max(min_depth, int(filters + divisor / 2) // divisor * divisor) if new_filters < 0.9 * filters: new_filters += divisor return int(new_filters) @staticmethod def round_repeats(repeats, global_params): """ Round number of filters based on depth multiplier. """ multiplier = global_params.depth_coefficient if not multiplier: return repeats return int(math.ceil(multiplier * repeats)) class MbConvBlock(nn.Layer): def __init__(self, block_args): super(MbConvBlock, self).__init__() self._block_args = block_args self.has_se = (self._block_args.se_ratio is not None) and \ (0 < self._block_args.se_ratio <= 1) self.id_skip = block_args.id_skip # expansion phase self.inp = self._block_args.input_filters oup = self._block_args.input_filters * self._block_args.expand_ratio if self._block_args.expand_ratio != 1: self._expand_conv = nn.Conv2D(self.inp, oup, 1, bias_attr=False) self._bn0 = nn.BatchNorm(oup) # depthwise conv phase k = self._block_args.kernel_size s = self._block_args.stride if isinstance(s, list): s = s[0] self._depthwise_conv = nn.Conv2D( oup, oup, groups=oup, kernel_size=k, stride=s, padding='same', bias_attr=False) self._bn1 = nn.BatchNorm(oup) # squeeze and excitation layer, if desired if self.has_se: num_squeezed_channels = max(1, int(self._block_args.input_filters * self._block_args.se_ratio)) self._se_reduce = nn.Conv2D(oup, num_squeezed_channels, 1) self._se_expand = nn.Conv2D(num_squeezed_channels, oup, 1) # output phase and some util class self.final_oup = self._block_args.output_filters self._project_conv = nn.Conv2D(oup, self.final_oup, 1, bias_attr=False) self._bn2 = nn.BatchNorm(self.final_oup) self._swish = nn.Swish() def _drop_connect(self, inputs, p, training): if not training: return inputs batch_size = inputs.shape[0] keep_prob = 1 - p random_tensor = keep_prob random_tensor += paddle.rand([batch_size, 1, 1, 1], dtype=inputs.dtype) random_tensor = paddle.to_tensor(random_tensor, place=inputs.place) binary_tensor = paddle.floor(random_tensor) output = inputs / keep_prob * binary_tensor return output def forward(self, inputs, drop_connect_rate=None): # expansion and depthwise conv x = inputs if self._block_args.expand_ratio != 1: x = self._swish(self._bn0(self._expand_conv(inputs))) x = self._swish(self._bn1(self._depthwise_conv(x))) # squeeze and excitation if self.has_se: x_squeezed = F.adaptive_avg_pool2d(x, 1) x_squeezed = self._se_expand( self._swish(self._se_reduce(x_squeezed))) x = F.sigmoid(x_squeezed) * x x = self._bn2(self._project_conv(x)) # skip conntection and drop connect if self.id_skip and self._block_args.stride == 1 and \ self.inp == self.final_oup: if drop_connect_rate: x = self._drop_connect( x, p=drop_connect_rate, training=self.training) x = x + inputs return x class EfficientNetb3_PREN(nn.Layer): def __init__(self, in_channels): super(EfficientNetb3_PREN, self).__init__() """ the fllowing are efficientnetb3's superparams, they means efficientnetb3 network's width, depth, resolution and dropout respectively, to fit for text recognition task, the resolution here is changed from 300 to 64. """ w, d, s, p = 1.2, 1.4, 64, 0.3 self._blocks_args, self._global_params = efficientnet( width_coefficient=w, depth_coefficient=d, dropout_rate=p, image_size=s) self.out_channels = [] # stem out_channels = EffUtils.round_filters(32, self._global_params) self._conv_stem = nn.Conv2D( in_channels, out_channels, 3, 2, padding='same', bias_attr=False) self._bn0 = nn.BatchNorm(out_channels) # build blocks self._blocks = [] # to extract three feature maps for fpn based on efficientnetb3 backbone self._concerned_block_idxes = [7, 17, 25] _concerned_idx = 0 for i, block_args in enumerate(self._blocks_args): block_args = block_args._replace( input_filters=EffUtils.round_filters(block_args.input_filters, self._global_params), output_filters=EffUtils.round_filters(block_args.output_filters, self._global_params), num_repeat=EffUtils.round_repeats(block_args.num_repeat, self._global_params)) self._blocks.append( self.add_sublayer(f"{i}-0", MbConvBlock(block_args))) _concerned_idx += 1 if _concerned_idx in self._concerned_block_idxes: self.out_channels.append(block_args.output_filters) if block_args.num_repeat > 1: block_args = block_args._replace( input_filters=block_args.output_filters, stride=1) for j in range(block_args.num_repeat - 1): self._blocks.append( self.add_sublayer(f'{i}-{j+1}', MbConvBlock(block_args))) _concerned_idx += 1 if _concerned_idx in self._concerned_block_idxes: self.out_channels.append(block_args.output_filters) self._swish = nn.Swish() def forward(self, inputs): outs = [] x = self._swish(self._bn0(self._conv_stem(inputs))) for idx, block in enumerate(self._blocks): drop_connect_rate = self._global_params.drop_connect_rate if drop_connect_rate: drop_connect_rate *= float(idx) / len(self._blocks) x = block(x, drop_connect_rate=drop_connect_rate) if idx in self._concerned_block_idxes: outs.append(x) return outs