RetinaNet comes from the Facebook AI Research team (the Focal Loss paper, first released in 2017), with main contributors Tsung-Yi Lin, Priya Goyal, Ross Girshick, Kaiming He, and Piotr Dollár. At the time of publication it offered one of the best trade-offs between speed, accuracy, and complexity among object detectors. The two figures below show experimental results on the COCO test set:
As the figures show, among one-stage networks RetinaNet reaches an accuracy comparable to two-stage networks, and even surpasses some classic two-stage models.
Focal Loss
For a one-stage network, the imbalance between positive and negative samples has a non-negligible impact on accuracy. A simple example: when detecting objects in an image, a one-stage detector generates a large number of anchors (candidate boxes), of which only a few are positives (objects); most of them merely frame background. When the loss is computed, it ends up dominated by the large number of negatives (background). Two-stage networks handle this better, because they first run a binary classification over the anchors, which acts as a pre-filter and reduces the positive/negative imbalance.
RetinaNet largely removes the impact of this imbalance by changing the classification loss. The loss is defined as follows:
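Restating the formula from the paper (with $p_t$ and $\alpha_t$ defined step by step below):

$$FL(p_t) = -\alpha_t\,(1 - p_t)^{\gamma}\,\log(p_t)$$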
Let's start from the beginning. Binary classification losses are usually computed with cross entropy (CE); if the concept of entropy is unfamiliar, the basics of information theory are worth a look. CE is defined as follows:
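For a ground-truth label $y$ (1 for foreground, 0 for background) and a predicted foreground probability $p$:

$$CE(p, y) = \begin{cases} -\log(p) & \text{if } y = 1 \\ -\log(1 - p) & \text{otherwise} \end{cases}$$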
We can further generalize this formula:
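As in the paper, define

$$p_t = \begin{cases} p & \text{if } y = 1 \\ 1 - p & \text{otherwise} \end{cases}$$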
With this notation the cross entropy loss becomes:
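$$CE(p_t) = -\log(p_t)$$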
A common way to balance the class imbalance is to add a weight α (in the range [0, 1]), using α for the positive class and 1 − α for the negative class:
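$$CE(p_t) = -\alpha_t \log(p_t), \qquad \alpha_t = \begin{cases} \alpha & \text{if } y = 1 \\ 1 - \alpha & \text{otherwise} \end{cases}$$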
Focal loss simply adds one more modulating factor on top of CE(p_t):
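$$FL(p_t) = -\alpha_t\,(1 - p_t)^{\gamma}\,\log(p_t)$$

where γ ≥ 0 is the focusing parameter.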
Why does such a simple extra weight make this much of a difference? An example shows it best.
Set α = 0.25 and γ = 2.
Suppose a foreground (object) anchor is predicted as foreground with probability p = 0.9, and a background anchor is predicted as foreground with probability 0.1, i.e. both are easy, well-classified examples (for simplicity the example applies the same α = 0.25 to both cases). The cross entropy for each is then:
CE(foreground) = -log(0.9) = 0.1053
CE(background) = -log(1–0.1) = 0.1053
FL(foreground) = -1 x 0.25 x (1–0.9)** 2 x log(0.9) = 0.00026
FL(background) = -1 x 0.25 x (1–(1–0.1))** 2 x log(1–0.1) = 0.00026
The loss drops to about 1/400 of the original: 0.1053 / 0.00026 ≈ 400.
If instead the foreground anchor is predicted as foreground with probability p = 0.1, and the background anchor is predicted as foreground with probability 0.9, i.e. both are hard, misclassified examples, the cross entropy is:
CE(foreground) = -log(0.1) = 2.3025
CE(background) = -log(1–0.9) = 2.3025
Again with α = 0.25 and γ = 2:
FL(foreground) = -1 x 0.25 x (1–0.1)** 2 x log(0.1) = 0.4667
FL(background) = -1 x 0.25 x (1–(1–0.9))** 2 x log(1–0.9) = 0.4667
The loss only drops to about 1/5 of the original: 2.3025 / 0.4667 ≈ 5. In other words, easy, well-classified examples are down-weighted by orders of magnitude, while hard examples keep most of their loss, which is exactly the behaviour we want.
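A quick way to sanity-check these numbers (a minimal standalone sketch, not part of the keras-retinanet code):

```python
import math

def ce(p_t):
    # standard cross entropy on the probability assigned to the true class
    return -math.log(p_t)

def fl(p_t, alpha=0.25, gamma=2.0):
    # focal loss, applying the same alpha to both cases as in the example above
    return -alpha * (1 - p_t) ** gamma * math.log(p_t)

for p_t in (0.9, 0.1):
    print(f"p_t={p_t}: CE={ce(p_t):.4f}, FL={fl(p_t):.4f}, ratio={ce(p_t) / fl(p_t):.0f}")
# p_t=0.9: CE=0.1054, FL=0.0003, ratio=400
# p_t=0.1: CE=2.3026, FL=0.4663, ratio=5
```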
The paper also runs experiments on the values of α and γ, concluding that γ = 2 (with α = 0.25) works best:
The authors also ran a series of experiments showing that focal loss brings a decent improvement when plugged into other one-stage networks.
Network Architecture
Despite the strong performance, the network structure is quite simple and can be summarized as: resnet (backbone) + FPN + FCN (fully convolutional classification and regression subnets).
To make the details easier to follow, I will walk through the Keras version of RetinaNet; of all the implementations I have seen so far, this one is the clearest. Its main contributor, Hans Gaiser, works at Fizyr, a company that builds package sorting and handling systems, mainly using deep learning and computer vision.
The image goes through the resnet backbone, and after each res_block the feature map size is halved:
RetinaNet builds anchors on five feature levels: the backbone supplies C3, C4 and C5, and two extra levels, P6 and P7, are computed on top of C5:
This is where the FPN comes in. By combining features from multiple levels, the network handles small objects better; and by fusing deep semantic information with shallow image detail (local features, localization cues), accuracy improves further.
C3, C4 and C5 pass through convolution layers (plus the top-down additions described below) to give P3, P4 and P5. Each pyramid level (P3-P7) then has the output submodels attached, and all submodel outputs are concatenated to give the final result:
The output of each level is as follows:
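As a rough illustration (my own numbers, assuming a 512 × 512 input; they are not from the original article), the strides and feature-map sizes of the five levels are:

- P3: stride 8, 64 × 64
- P4: stride 16, 32 × 32
- P5: stride 32, 16 × 16
- P6: stride 64, 8 × 8
- P7: stride 128, 4 × 4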
Keras Source Code
model
The code supports resnet, mobilenet, vgg and densenet backbones:
def backbone(backbone_name):
    """ Returns a backbone object for the given backbone.
    """
    if 'resnet' in backbone_name:
        from .resnet import ResNetBackbone as b
    elif 'mobilenet' in backbone_name:
        from .mobilenet import MobileNetBackbone as b
    elif 'vgg' in backbone_name:
        from .vgg import VGGBackbone as b
    elif 'densenet' in backbone_name:
        from .densenet import DenseNetBackbone as b
    else:
        raise NotImplementedError('Backbone class for \'{}\' not implemented.'.format(backbone_name))
    return b(backbone_name)
Take the resnet used in the paper as an example:
def retinanet(self, *args, **kwargs):
""" Returns a retinanet model using the correct backbone.
"""
return resnet_retinanet(*args, backbone=self.backbone, **kwargs)
def resnet_retinanet(num_classes, backbone='resnet50', inputs=None, modifier=None, **kwargs):
""" Constructs a retinanet model using a resnet backbone.
Args
num_classes: Number of classes to predict.
backbone: Which backbone to use (one of ('resnet50', 'resnet101', 'resnet152')).
inputs: The inputs to the network (defaults to a Tensor of shape (None, None, 3)).
modifier: A function handler which can modify the backbone before using it in retinanet (this can be used to freeze backbone layers for example).
Returns
RetinaNet model with a ResNet backbone.
"""
# choose default input
if inputs is None:
if keras.backend.image_data_format() == 'channels_first':
inputs = keras.layers.Input(shape=(3, None, None))
else:
inputs = keras.layers.Input(shape=(None, None, 3))
# create the resnet backbone
if backbone == 'resnet50':
resnet = keras_resnet.models.ResNet50(inputs, include_top=False, freeze_bn=True)
elif backbone == 'resnet101':
resnet = keras_resnet.models.ResNet101(inputs, include_top=False, freeze_bn=True)
elif backbone == 'resnet152':
resnet = keras_resnet.models.ResNet152(inputs, include_top=False, freeze_bn=True)
else:
raise ValueError('Backbone (\'{}\') is invalid.'.format(backbone))
# invoke modifier if given
if modifier:
resnet = modifier(resnet)
# create the full model
return retinanet.retinanet(inputs=inputs, num_classes=num_classes, backbone_layers=resnet.outputs[1:], **kwargs)
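For reference, a model can then be built roughly like this (a minimal sketch assuming the backbone() helper above lives in keras_retinanet.models, as it does in the Fizyr repo; the class count of 80 is just an example):

```python
from keras_retinanet import models

# build a training model with a ResNet-50 backbone, e.g. for the 80 COCO classes
model = models.backbone('resnet50').retinanet(num_classes=80)
model.summary()
```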
Next, let's look at how the whole network model is constructed:
def retinanet(
inputs,
backbone_layers,
num_classes,
num_anchors = None,
create_pyramid_features = __create_pyramid_features,
submodels = None,
name = 'retinanet'
):
""" Construct a RetinaNet model on top of a backbone.
This model is the minimum model necessary for training (with the unfortunate exception of anchors as output).
Args
inputs : keras.layers.Input (or list of) for the input to the model.
num_classes : Number of classes to classify.
num_anchors : Number of base anchors.
create_pyramid_features : Functor for creating pyramid features given the features C3, C4, C5 from the backbone.
submodels : Submodels to run on each feature map (default is regression and classification submodels).
name : Name of the model.
Returns
A keras.models.Model which takes an image as input and outputs generated anchors and the result from each submodel on every pyramid level.
The order of the outputs is as defined in submodels:
```
[
regression, classification, other[0], other[1], ...
]
```
"""
if num_anchors is None:
num_anchors = AnchorParameters.default.num_anchors()
if submodels is None:
submodels = default_submodels(num_classes, num_anchors)
C3, C4, C5 = backbone_layers
# compute pyramid features as per https://arxiv.org/abs/1708.02002
features = create_pyramid_features(C3, C4, C5)
# for all pyramid levels, run available submodels
pyramids = __build_pyramid(submodels, features)
return keras.models.Model(inputs=inputs, outputs=pyramids, name=name)
The submodels are the heads attached to each pyramid level; their outputs are used for classification and box regression:
def default_submodels(num_classes, num_anchors):
""" Create a list of default submodels used for object detection.
The default submodels contains a regression submodel and a classification submodel.
Args
num_classes : Number of classes to use.
num_anchors : Number of base anchors.
Returns
A list of tuple, where the first element is the name of the submodel and the second element is the submodel itself.
"""
return [
('regression', default_regression_model(4, num_anchors)),
('classification', default_classification_model(num_classes, num_anchors))
]
The regression submodel:
def default_regression_model(num_values, num_anchors, pyramid_feature_size=256, regression_feature_size=256, name='regression_submodel'):
""" Creates the default regression submodel.
Args
num_values : Number of values to regress.
num_anchors : Number of anchors to regress for each feature level.
pyramid_feature_size : The number of filters to expect from the feature pyramid levels.
regression_feature_size : The number of filters to use in the layers in the regression submodel.
name : The name of the submodel.
Returns
A keras.models.Model that predicts regression values for each anchor.
"""
# All new conv layers except the final one in the
# RetinaNet (classification) subnets are initialized
# with bias b = 0 and a Gaussian weight fill with stddev = 0.01.
options = {
'kernel_size' : 3,
'strides' : 1,
'padding' : 'same',
'kernel_initializer' : keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
'bias_initializer' : 'zeros'
}
if keras.backend.image_data_format() == 'channels_first':
inputs = keras.layers.Input(shape=(pyramid_feature_size, None, None))
else:
inputs = keras.layers.Input(shape=(None, None, pyramid_feature_size))
outputs = inputs
for i in range(4):
outputs = keras.layers.Conv2D(
filters=regression_feature_size,
activation='relu',
name='pyramid_regression_{}'.format(i),
**options
)(outputs)
outputs = keras.layers.Conv2D(num_anchors * num_values, name='pyramid_regression', **options)(outputs)
if keras.backend.image_data_format() == 'channels_first':
outputs = keras.layers.Permute((2, 3, 1), name='pyramid_regression_permute')(outputs)
outputs = keras.layers.Reshape((-1, num_values), name='pyramid_regression_reshape')(outputs)
return keras.models.Model(inputs=inputs, outputs=outputs, name=name)
Each pyramid feature map goes into the regression submodel: four 3 × 3 × 256, stride-1 convolutions, then a final 3 × 3 convolution with num_anchors × 4 filters, followed by a reshape, giving an output of shape (w × h × num_anchors, 4).
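To make the reshape concrete, here is a small sketch (plain numpy with arbitrarily chosen sizes, not part of the repo) of how a (H, W, num_anchors × 4) regression map becomes a (H × W × num_anchors, 4) tensor:

```python
import numpy as np

H, W, num_anchors, num_values = 64, 64, 9, 4  # e.g. the P3 level of a 512x512 input

# fake output of the final regression conv: one 4-vector per anchor per location
conv_out = np.random.randn(H, W, num_anchors * num_values)

# the Reshape((-1, num_values)) layer flattens the location and anchor dimensions together
regression = conv_out.reshape(-1, num_values)
print(regression.shape)  # (36864, 4) == (64 * 64 * 9, 4)
```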
Now the classification submodel:
def default_classification_model(
num_classes,
num_anchors,
pyramid_feature_size=256,
prior_probability=0.01,
classification_feature_size=256,
name='classification_submodel'
):
""" Creates the default regression submodel.
Args
num_classes : Number of classes to predict a score for at each feature level.
num_anchors : Number of anchors to predict classification scores for at each feature level.
pyramid_feature_size : The number of filters to expect from the feature pyramid levels.
classification_feature_size : The number of filters to use in the layers in the classification submodel.
name : The name of the submodel.
Returns
A keras.models.Model that predicts classes for each anchor.
"""
options = {
'kernel_size' : 3,
'strides' : 1,
'padding' : 'same',
}
if keras.backend.image_data_format() == 'channels_first':
inputs = keras.layers.Input(shape=(pyramid_feature_size, None, None))
else:
inputs = keras.layers.Input(shape=(None, None, pyramid_feature_size))
outputs = inputs
for i in range(4):
outputs = keras.layers.Conv2D(
filters=classification_feature_size,
activation='relu',
name='pyramid_classification_{}'.format(i),
kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
bias_initializer='zeros',
**options
)(outputs)
outputs = keras.layers.Conv2D(
filters=num_classes * num_anchors,
kernel_initializer=keras.initializers.normal(mean=0.0, stddev=0.01, seed=None),
bias_initializer=initializers.PriorProbability(probability=prior_probability),
name='pyramid_classification',
**options
)(outputs)
# reshape output and apply sigmoid
if keras.backend.image_data_format() == 'channels_first':
outputs = keras.layers.Permute((2, 3, 1), name='pyramid_classification_permute')(outputs)
outputs = keras.layers.Reshape((-1, num_classes), name='pyramid_classification_reshape')(outputs)
outputs = keras.layers.Activation('sigmoid', name='pyramid_classification_sigmoid')(outputs)
return keras.models.Model(inputs=inputs, outputs=outputs, name=name)
Each pyramid feature map goes into the classification submodel: first four 3 × 3 × 256, stride-1 convolutions, then a 3 × 3, stride-1 convolution with num_classes × 9 filters (9 being the number of anchors generated per location), reshaped into a (w × h × 9, num_classes) tensor, and finally passed through a sigmoid activation to give a class score for every anchor.
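One detail worth pointing out (it comes from the paper and shows up above as `prior_probability=0.01` together with the `PriorProbability` bias initializer): the bias of the final classification convolution is initialized so that every anchor starts out predicting foreground with probability roughly π = 0.01,

$$b = -\log\!\left(\frac{1-\pi}{\pi}\right), \qquad \sigma(b) = \pi = 0.01,$$

which keeps the enormous number of background anchors from producing a huge, destabilizing loss in the first iterations of training.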
Next, the construction of the pyramid:
def __create_pyramid_features(C3, C4, C5, feature_size=256):
""" Creates the FPN layers on top of the backbone features.
Args
C3 : Feature stage C3 from the backbone.
C4 : Feature stage C4 from the backbone.
C5 : Feature stage C5 from the backbone.
feature_size : The feature size to use for the resulting feature levels.
Returns
A list of feature levels [P3, P4, P5, P6, P7].
"""
# upsample C5 to get P5 from the FPN paper
P5 = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C5_reduced')(C5)
P5_upsampled = layers.UpsampleLike(name='P5_upsampled')([P5, C4])
P5 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P5')(P5)
# add P5 elementwise to C4
P4 = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C4_reduced')(C4)
P4 = keras.layers.Add(name='P4_merged')([P5_upsampled, P4])
P4_upsampled = layers.UpsampleLike(name='P4_upsampled')([P4, C3])
P4 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P4')(P4)
# add P4 elementwise to C3
P3 = keras.layers.Conv2D(feature_size, kernel_size=1, strides=1, padding='same', name='C3_reduced')(C3)
P3 = keras.layers.Add(name='P3_merged')([P4_upsampled, P3])
P3 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=1, padding='same', name='P3')(P3)
# "P6 is obtained via a 3x3 stride-2 conv on C5"
P6 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=2, padding='same', name='P6')(C5)
# "P7 is computed by applying ReLU followed by a 3x3 stride-2 conv on P6"
P7 = keras.layers.Activation('relu', name='C6_relu')(P6)
P7 = keras.layers.Conv2D(feature_size, kernel_size=3, strides=2, padding='same', name='P7')(P7)
return [P3, P4, P5, P6, P7]
- C3胧奔,C4逊移,C5先經(jīng)過一個(gè)256 * 1 * 1,stride為1的卷積層
- C5經(jīng)過一個(gè)256 * 3 * 3龙填,stride為1的卷積層得到P5
- C4和C5的上采樣層相加胳泉,再過一個(gè)256 * 3 * 3拐叉,stride為1的卷積層得到P4
- C3和C4的上采樣層相加,再過一個(gè)256 * 3 * 3扇商,stride為1的卷積層得到P3
此時(shí)我們得到了[P3, P4, P5, P6, P7]
最后一步凤瘦,就是將每個(gè)金字塔層和sub_model連接起來:
def __build_pyramid(models, features):
""" Applies all submodels to each FPN level.
Args
models : List of submodels to run on each pyramid level (by default only regression, classification).
features : The FPN features.
Returns
A list of tensors, one for each submodel.
"""
return [__build_model_pyramid(n, m, features) for n, m in models]
def __build_model_pyramid(name, model, features):
""" Applies a single submodel to each FPN level.
Args
name : Name of the submodel.
model : The submodel to evaluate.
features : The FPN features.
Returns
A tensor containing the response from the submodel on the FPN features.
"""
return keras.layers.Concatenate(axis=1, name=name)([model(f) for f in features])
predict
Having looked at the training model, let's now look at the model used for prediction:
# make prediction model
prediction_model = retinanet_bbox(model=model, anchor_params=anchor_params)
def retinanet_bbox(
model = None,
nms = True,
class_specific_filter = True,
name = 'retinanet-bbox',
anchor_params = None,
**kwargs
):
""" Construct a RetinaNet model on top of a backbone and adds convenience functions to output boxes directly.
This model uses the minimum retinanet model and appends a few layers to compute boxes within the graph.
These layers include applying the regression values to the anchors and performing NMS.
Args
model : RetinaNet model to append bbox layers to. If None, it will create a RetinaNet model using **kwargs.
nms : Whether to use non-maximum suppression for the filtering step.
class_specific_filter : Whether to use class specific filtering or filter for the best scoring class only.
name : Name of the model.
anchor_params : Struct containing anchor parameters. If None, default values are used.
*kwargs : Additional kwargs to pass to the minimal retinanet model.
Returns
A keras.models.Model which takes an image as input and outputs the detections on the image.
The order is defined as follows:
```
[
boxes, scores, labels, other[0], other[1], ...
]
```
"""
# if no anchor parameters are passed, use default values
if anchor_params is None:
anchor_params = AnchorParameters.default
# create RetinaNet model
if model is None:
model = retinanet(num_anchors=anchor_params.num_anchors(), **kwargs)
else:
assert_training_model(model)
# compute the anchors
features = [model.get_layer(p_name).output for p_name in ['P3', 'P4', 'P5', 'P6', 'P7']]
anchors = __build_anchors(anchor_params, features)
# we expect the anchors, regression and classification values as first output
regression = model.outputs[0]
classification = model.outputs[1]
# "other" can be any additional output from custom submodels, by default this will be []
other = model.outputs[2:]
# apply predicted regression to anchors
boxes = layers.RegressBoxes(name='boxes')([anchors, regression])
boxes = layers.ClipBoxes(name='clipped_boxes')([model.inputs[0], boxes])
# filter detections (apply NMS / score threshold / select top-k)
detections = layers.FilterDetections(
nms = nms,
class_specific_filter = class_specific_filter,
name = 'filtered_detections'
)([boxes, classification] + other)
# construct the model
return keras.models.Model(inputs=model.inputs, outputs=detections, name=name)
The detailed steps are as follows:
- Get the features, i.e. the outputs of [P3, P4, P5, P6, P7], and build anchors on each of these feature maps. The anchor-building code:
def __build_anchors(anchor_parameters, features):
""" Builds anchors for the shape of the features from FPN.
Args
anchor_parameters : Parameters that determine how anchors are generated.
features : The FPN features.
Returns
A tensor containing the anchors for the FPN features.
The shape is:
```
(batch_size, num_anchors, 4)
```
"""
anchors = [
layers.Anchors(
size=anchor_parameters.sizes[i],
stride=anchor_parameters.strides[i],
ratios=anchor_parameters.ratios,
scales=anchor_parameters.scales,
name='anchors_{}'.format(i)
)(f) for i, f in enumerate(features)
]
return keras.layers.Concatenate(axis=1, name='anchors')(anchors)
Let's look at the layers.Anchors class:
class Anchors(keras.layers.Layer):
""" Keras layer for generating achors for a given shape.
"""
def __init__(self, size, stride, ratios=None, scales=None, *args, **kwargs):
""" Initializer for an Anchors layer.
Args
size: The base size of the anchors to generate.
stride: The stride of the anchors to generate.
ratios: The ratios of the anchors to generate (defaults to AnchorParameters.default.ratios).
scales: The scales of the anchors to generate (defaults to AnchorParameters.default.scales).
"""
self.size = size
self.stride = stride
self.ratios = ratios
self.scales = scales
if ratios is None:
self.ratios = utils_anchors.AnchorParameters.default.ratios
elif isinstance(ratios, list):
self.ratios = np.array(ratios)
if scales is None:
self.scales = utils_anchors.AnchorParameters.default.scales
elif isinstance(scales, list):
self.scales = np.array(scales)
self.num_anchors = len(ratios) * len(scales)
self.anchors = keras.backend.variable(utils_anchors.generate_anchors(
base_size=size,
ratios=ratios,
scales=scales,
))
super(Anchors, self).__init__(*args, **kwargs)
def call(self, inputs, **kwargs):
features = inputs
features_shape = keras.backend.shape(features)
# generate proposals from bbox deltas and shifted anchors
if keras.backend.image_data_format() == 'channels_first':
anchors = backend.shift(features_shape[2:4], self.stride, self.anchors)
else:
anchors = backend.shift(features_shape[1:3], self.stride, self.anchors)
anchors = keras.backend.tile(keras.backend.expand_dims(anchors, axis=0), (features_shape[0], 1, 1))
return anchors
def compute_output_shape(self, input_shape):
if None not in input_shape[1:]:
if keras.backend.image_data_format() == 'channels_first':
total = np.prod(input_shape[2:4]) * self.num_anchors
else:
total = np.prod(input_shape[1:3]) * self.num_anchors
return (input_shape[0], total, 4)
else:
return (input_shape[0], None, 4)
def get_config(self):
config = super(Anchors, self).get_config()
config.update({
'size' : self.size,
'stride' : self.stride,
'ratios' : self.ratios.tolist(),
'scales' : self.scales.tolist(),
})
return config
- Apply the regression outputs of the pyramid levels to the anchors to obtain the box coordinates:
# apply predicted regression to anchors
boxes = layers.RegressBoxes(name='boxes')([anchors, regression])
boxes = layers.ClipBoxes(name='clipped_boxes')([model.inputs[0], boxes])
The author wrote a class RegressBoxes, inheriting from keras.layers.Layer, to do this:
class RegressBoxes(keras.layers.Layer):
""" Keras layer for applying regression values to boxes.
"""
def __init__(self, mean=None, std=None, *args, **kwargs):
""" Initializer for the RegressBoxes layer.
Args
mean: The mean value of the regression values which was used for normalization.
std: The standard value of the regression values which was used for normalization.
"""
if mean is None:
mean = np.array([0, 0, 0, 0])
if std is None:
std = np.array([0.2, 0.2, 0.2, 0.2])
if isinstance(mean, (list, tuple)):
mean = np.array(mean)
elif not isinstance(mean, np.ndarray):
raise ValueError('Expected mean to be a np.ndarray, list or tuple. Received: {}'.format(type(mean)))
if isinstance(std, (list, tuple)):
std = np.array(std)
elif not isinstance(std, np.ndarray):
raise ValueError('Expected std to be a np.ndarray, list or tuple. Received: {}'.format(type(std)))
self.mean = mean
self.std = std
super(RegressBoxes, self).__init__(*args, **kwargs)
def call(self, inputs, **kwargs):
anchors, regression = inputs
return backend.bbox_transform_inv(anchors, regression, mean=self.mean, std=self.std)
def compute_output_shape(self, input_shape):
return input_shape[0]
def get_config(self):
config = super(RegressBoxes, self).get_config()
config.update({
'mean': self.mean.tolist(),
'std' : self.std.tolist(),
})
return config
The key piece of code below shows how RetinaNet turns the pyramid regression outputs into the final object boxes:
def bbox_transform_inv(boxes, deltas, mean=None, std=None):
""" Applies deltas (usually regression results) to boxes (usually anchors).
Before applying the deltas to the boxes, the normalization that was previously applied (in the generator) has to be removed.
The mean and std are the mean and std as applied in the generator. They are unnormalized in this function and then applied to the boxes.
Args
boxes : np.array of shape (B, N, 4), where B is the batch size, N the number of boxes and 4 values for (x1, y1, x2, y2).
deltas: np.array of same shape as boxes. These deltas (d_x1, d_y1, d_x2, d_y2) are a factor of the width/height.
mean : The mean value used when computing deltas (defaults to [0, 0, 0, 0]).
std : The standard deviation used when computing deltas (defaults to [0.2, 0.2, 0.2, 0.2]).
Returns
A np.array of the same shape as boxes, but with deltas applied to each box.
The mean and std are used during training to normalize the regression values (networks love normalization).
"""
if mean is None:
mean = [0, 0, 0, 0]
if std is None:
std = [0.2, 0.2, 0.2, 0.2]
width = boxes[:, :, 2] - boxes[:, :, 0]
height = boxes[:, :, 3] - boxes[:, :, 1]
x1 = boxes[:, :, 0] + (deltas[:, :, 0] * std[0] + mean[0]) * width
y1 = boxes[:, :, 1] + (deltas[:, :, 1] * std[1] + mean[1]) * height
x2 = boxes[:, :, 2] + (deltas[:, :, 2] * std[2] + mean[2]) * width
y2 = boxes[:, :, 3] + (deltas[:, :, 3] * std[3] + mean[3]) * height
pred_boxes = keras.backend.stack([x1, y1, x2, y2], axis=2)
return pred_boxes
Note that, unlike YOLOv3 and SSD, which regress the box center and width/height, this implementation of RetinaNet regresses offsets for the top-left (x1, y1) and bottom-right (x2, y2) corners of each anchor directly.
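A tiny worked example of this transform (a sketch with made-up numbers, mirroring the formulas in bbox_transform_inv above):

```python
import numpy as np

# one anchor (x1, y1, x2, y2) and one predicted delta, batch size 1
boxes  = np.array([[[100.0, 100.0, 200.0, 200.0]]])   # width = height = 100
deltas = np.array([[[0.1, 0.2, -0.1, 0.0]]])
mean, std = [0, 0, 0, 0], [0.2, 0.2, 0.2, 0.2]

width  = boxes[:, :, 2] - boxes[:, :, 0]
height = boxes[:, :, 3] - boxes[:, :, 1]
x1 = boxes[:, :, 0] + (deltas[:, :, 0] * std[0] + mean[0]) * width   # 100 + 0.02 * 100 = 102
y1 = boxes[:, :, 1] + (deltas[:, :, 1] * std[1] + mean[1]) * height  # 100 + 0.04 * 100 = 104
x2 = boxes[:, :, 2] + (deltas[:, :, 2] * std[2] + mean[2]) * width   # 200 - 0.02 * 100 = 198
y2 = boxes[:, :, 3] + (deltas[:, :, 3] * std[3] + mean[3]) * height  # 200 + 0.00 * 100 = 200
print(np.stack([x1, y1, x2, y2], axis=2))  # [[[102. 104. 198. 200.]]]
```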
- Finally, filter the detections to obtain the final boxes:
# filter detections (apply NMS / score threshold / select top-k)
detections = layers.FilterDetections(
nms = nms,
class_specific_filter = class_specific_filter,
name = 'filtered_detections'
)([boxes, classification] + other)
class FilterDetections(keras.layers.Layer):
""" Keras layer for filtering detections using score threshold and NMS.
"""
def __init__(
self,
nms = True,
class_specific_filter = True,
nms_threshold = 0.5,
score_threshold = 0.05,
max_detections = 300,
parallel_iterations = 32,
**kwargs
):
""" Filters detections using score threshold, NMS and selecting the top-k detections.
Args
nms : Flag to enable/disable NMS.
class_specific_filter : Whether to perform filtering per class, or take the best scoring class and filter those.
nms_threshold : Threshold for the IoU value to determine when a box should be suppressed.
score_threshold : Threshold used to prefilter the boxes with.
max_detections : Maximum number of detections to keep.
parallel_iterations : Number of batch items to process in parallel.
"""
self.nms = nms
self.class_specific_filter = class_specific_filter
self.nms_threshold = nms_threshold
self.score_threshold = score_threshold
self.max_detections = max_detections
self.parallel_iterations = parallel_iterations
super(FilterDetections, self).__init__(**kwargs)
def call(self, inputs, **kwargs):
""" Constructs the NMS graph.
Args
inputs : List of [boxes, classification, other[0], other[1], ...] tensors.
"""
boxes = inputs[0]
classification = inputs[1]
other = inputs[2:]
# wrap nms with our parameters
def _filter_detections(args):
boxes = args[0]
classification = args[1]
other = args[2]
return filter_detections(
boxes,
classification,
other,
nms = self.nms,
class_specific_filter = self.class_specific_filter,
score_threshold = self.score_threshold,
max_detections = self.max_detections,
nms_threshold = self.nms_threshold,
)
# call filter_detections on each batch
outputs = backend.map_fn(
_filter_detections,
elems=[boxes, classification, other],
dtype=[keras.backend.floatx(), keras.backend.floatx(), 'int32'] + [o.dtype for o in other],
parallel_iterations=self.parallel_iterations
)
return outputs
def compute_output_shape(self, input_shape):
""" Computes the output shapes given the input shapes.
Args
input_shape : List of input shapes [boxes, classification, other[0], other[1], ...].
Returns
List of tuples representing the output shapes:
[filtered_boxes.shape, filtered_scores.shape, filtered_labels.shape, filtered_other[0].shape, filtered_other[1].shape, ...]
"""
return [
(input_shape[0][0], self.max_detections, 4),
(input_shape[1][0], self.max_detections),
(input_shape[1][0], self.max_detections),
] + [
tuple([input_shape[i][0], self.max_detections] + list(input_shape[i][2:])) for i in range(2, len(input_shape))
]
def compute_mask(self, inputs, mask=None):
""" This is required in Keras when there is more than 1 output.
"""
return (len(inputs) + 1) * [None]
def get_config(self):
""" Gets the configuration of this layer.
Returns
Dictionary containing the parameters of this layer.
"""
config = super(FilterDetections, self).get_config()
config.update({
'nms' : self.nms,
'class_specific_filter' : self.class_specific_filter,
'nms_threshold' : self.nms_threshold,
'score_threshold' : self.score_threshold,
'max_detections' : self.max_detections,
'parallel_iterations' : self.parallel_iterations,
})
return config
The method to focus on is filter_detections:
def filter_detections(
boxes,
classification,
other = [],
class_specific_filter = True,
nms = True,
score_threshold = 0.05,
max_detections = 300,
nms_threshold = 0.5
):
""" Filter detections using the boxes and classification values.
Args
boxes : Tensor of shape (num_boxes, 4) containing the boxes in (x1, y1, x2, y2) format.
classification : Tensor of shape (num_boxes, num_classes) containing the classification scores.
other : List of tensors of shape (num_boxes, ...) to filter along with the boxes and classification scores.
class_specific_filter : Whether to perform filtering per class, or take the best scoring class and filter those.
nms : Flag to enable/disable non maximum suppression.
score_threshold : Threshold used to prefilter the boxes with.
max_detections : Maximum number of detections to keep.
nms_threshold : Threshold for the IoU value to determine when a box should be suppressed.
Returns
A list of [boxes, scores, labels, other[0], other[1], ...].
boxes is shaped (max_detections, 4) and contains the (x1, y1, x2, y2) of the non-suppressed boxes.
scores is shaped (max_detections,) and contains the scores of the predicted class.
labels is shaped (max_detections,) and contains the predicted label.
other[i] is shaped (max_detections, ...) and contains the filtered other[i] data.
In case there are less than max_detections detections, the tensors are padded with -1's.
"""
def _filter_detections(scores, labels):
# threshold based on score
indices = backend.where(keras.backend.greater(scores, score_threshold))
if nms:
filtered_boxes = backend.gather_nd(boxes, indices)
filtered_scores = keras.backend.gather(scores, indices)[:, 0]
# perform NMS
nms_indices = backend.non_max_suppression(filtered_boxes, filtered_scores, max_output_size=max_detections, iou_threshold=nms_threshold)
# filter indices based on NMS
indices = keras.backend.gather(indices, nms_indices)
# add indices to list of all indices
labels = backend.gather_nd(labels, indices)
indices = keras.backend.stack([indices[:, 0], labels], axis=1)
return indices
if class_specific_filter:
all_indices = []
# perform per class filtering
for c in range(int(classification.shape[1])):
scores = classification[:, c]
labels = c * backend.ones((keras.backend.shape(scores)[0],), dtype='int64')
all_indices.append(_filter_detections(scores, labels))
# concatenate indices to single tensor
indices = keras.backend.concatenate(all_indices, axis=0)
else:
scores = keras.backend.max(classification, axis = 1)
labels = keras.backend.argmax(classification, axis = 1)
indices = _filter_detections(scores, labels)
# select top k
scores = backend.gather_nd(classification, indices)
labels = indices[:, 1]
scores, top_indices = backend.top_k(scores, k=keras.backend.minimum(max_detections, keras.backend.shape(scores)[0]))
# filter input using the final set of indices
indices = keras.backend.gather(indices[:, 0], top_indices)
boxes = keras.backend.gather(boxes, indices)
labels = keras.backend.gather(labels, top_indices)
other_ = [keras.backend.gather(o, indices) for o in other]
# zero pad the outputs
pad_size = keras.backend.maximum(0, max_detections - keras.backend.shape(scores)[0])
boxes = backend.pad(boxes, [[0, pad_size], [0, 0]], constant_values=-1)
scores = backend.pad(scores, [[0, pad_size]], constant_values=-1)
labels = backend.pad(labels, [[0, pad_size]], constant_values=-1)
labels = keras.backend.cast(labels, 'int32')
other_ = [backend.pad(o, [[0, pad_size]] + [[0, 0] for _ in range(1, len(o.shape))], constant_values=-1) for o in other_]
# set shapes, since we know what they are
boxes.set_shape([max_detections, 4])
scores.set_shape([max_detections])
labels.set_shape([max_detections])
for o, s in zip(other_, [list(keras.backend.int_shape(o)) for o in other]):
o.set_shape([max_detections] + s[1:])
return [boxes, scores, labels] + other_
This function performs the following steps:
- Decide whether to run the filtering separately for every class, or only for the best-scoring class of each box.
- Drop detections whose score is below the score threshold (0.05).
- Apply NMS as a second round of filtering.
- After these three rounds of filtering, keep the top k detections, with k = min(300, number of detections left).
loss
The loss functions and the optimizer (Adam) are defined in model.compile:
# compile model
training_model.compile(
loss={
'regression' : losses.smooth_l1(),
'classification': losses.focal()
},
optimizer=keras.optimizers.adam(lr=lr, clipnorm=0.001)
)
For the box-coordinate loss, the now-popular smooth L1 loss is used, mainly to prevent exploding gradients and to reduce the influence of outliers. The code is as follows:
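In formula form (matching the comment inside the code below), with x the difference between the regression target and the prediction and σ = 3 by default:

$$\mathrm{smooth}_{L1}(x) = \begin{cases} 0.5\,(\sigma x)^2 & \text{if } |x| < 1/\sigma^2 \\ |x| - 0.5/\sigma^2 & \text{otherwise} \end{cases}$$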
def smooth_l1(sigma=3.0):
""" Create a smooth L1 loss functor.
Args
sigma: This argument defines the point where the loss changes from L2 to L1.
Returns
A functor for computing the smooth L1 loss given target data and predicted data.
"""
sigma_squared = sigma ** 2
def _smooth_l1(y_true, y_pred):
""" Compute the smooth L1 loss of y_pred w.r.t. y_true.
Args
y_true: Tensor from the generator of shape (B, N, 5). The last value for each box is the state of the anchor (ignore, negative, positive).
y_pred: Tensor from the network of shape (B, N, 4).
Returns
The smooth L1 loss of y_pred w.r.t. y_true.
"""
# separate target and state
regression = y_pred
regression_target = y_true[:, :, :-1]
anchor_state = y_true[:, :, -1]
# filter out "ignore" anchors
indices = backend.where(keras.backend.equal(anchor_state, 1))
regression = backend.gather_nd(regression, indices)
regression_target = backend.gather_nd(regression_target, indices)
# compute smooth L1 loss
# f(x) = 0.5 * (sigma * x)^2 if |x| < 1 / sigma / sigma
# |x| - 0.5 / sigma / sigma otherwise
regression_diff = regression - regression_target
regression_diff = keras.backend.abs(regression_diff)
regression_loss = backend.where(
keras.backend.less(regression_diff, 1.0 / sigma_squared),
0.5 * sigma_squared * keras.backend.pow(regression_diff, 2),
regression_diff - 0.5 / sigma_squared
)
# compute the normalizer: the number of positive anchors
normalizer = keras.backend.maximum(1, keras.backend.shape(indices)[0])
normalizer = keras.backend.cast(normalizer, dtype=keras.backend.floatx())
return keras.backend.sum(regression_loss) / normalizer
return _smooth_l1
For the classification loss we use the paper's headline contribution, focal loss:
def focal(alpha=0.25, gamma=2.0):
""" Create a functor for computing the focal loss.
Args
alpha: Scale the focal weight with alpha.
gamma: Take the power of the focal weight with gamma.
Returns
A functor that computes the focal loss using the alpha and gamma.
"""
def _focal(y_true, y_pred):
""" Compute the focal loss given the target tensor and the predicted tensor.
As defined in https://arxiv.org/abs/1708.02002
Args
y_true: Tensor of target data from the generator with shape (B, N, num_classes).
y_pred: Tensor of predicted data from the network with shape (B, N, num_classes).
Returns
The focal loss of y_pred w.r.t. y_true.
"""
labels = y_true[:, :, :-1]
anchor_state = y_true[:, :, -1] # -1 for ignore, 0 for background, 1 for object
classification = y_pred
# filter out "ignore" anchors
indices = backend.where(keras.backend.not_equal(anchor_state, -1))
labels = backend.gather_nd(labels, indices)
classification = backend.gather_nd(classification, indices)
# compute the focal loss
alpha_factor = keras.backend.ones_like(labels) * alpha
alpha_factor = backend.where(keras.backend.equal(labels, 1), alpha_factor, 1 - alpha_factor)
focal_weight = backend.where(keras.backend.equal(labels, 1), 1 - classification, classification)
focal_weight = alpha_factor * focal_weight ** gamma
cls_loss = focal_weight * keras.backend.binary_crossentropy(labels, classification)
# compute the normalizer: the number of positive anchors
normalizer = backend.where(keras.backend.equal(anchor_state, 1))
normalizer = keras.backend.cast(keras.backend.shape(normalizer)[0], keras.backend.floatx())
normalizer = keras.backend.maximum(keras.backend.cast_to_floatx(1.0), normalizer)
return keras.backend.sum(cls_loss) / normalizer
return _focal
One more detail is worth explaining: how y_true is generated.
- Generate the anchors:
def anchors_for_shape(
image_shape,
pyramid_levels=None,
anchor_params=None,
shapes_callback=None,
):
""" Generators anchors for a given shape.
Args
image_shape: The shape of the image.
pyramid_levels: List of ints representing which pyramids to use (defaults to [3, 4, 5, 6, 7]).
anchor_params: Struct containing anchor parameters. If None, default values are used.
shapes_callback: Function to call for getting the shape of the image at different pyramid levels.
Returns
np.array of shape (N, 4) containing the (x1, y1, x2, y2) coordinates for the anchors.
"""
if pyramid_levels is None:
pyramid_levels = [3, 4, 5, 6, 7]
if anchor_params is None:
anchor_params = AnchorParameters.default
if shapes_callback is None:
shapes_callback = guess_shapes
image_shapes = shapes_callback(image_shape, pyramid_levels)
# compute anchors over all pyramid levels
all_anchors = np.zeros((0, 4))
for idx, p in enumerate(pyramid_levels):
anchors = generate_anchors(
base_size=anchor_params.sizes[idx],
ratios=anchor_params.ratios,
scales=anchor_params.scales
)
shifted_anchors = shift(image_shapes[idx], anchor_params.strides[idx], anchors)
all_anchors = np.append(all_anchors, shifted_anchors, axis=0)
return all_anchors
There are two key functions here. (1) generate_anchors creates the set of default (reference) anchors:
def generate_anchors(base_size=16, ratios=None, scales=None):
"""
Generate anchor (reference) windows by enumerating aspect ratios X
scales w.r.t. a reference window.
"""
if ratios is None:
ratios = AnchorParameters.default.ratios
if scales is None:
scales = AnchorParameters.default.scales
num_anchors = len(ratios) * len(scales)
# initialize output anchors
anchors = np.zeros((num_anchors, 4))
# scale base_size
anchors[:, 2:] = base_size * np.tile(scales, (2, len(ratios))).T
# compute areas of anchors
areas = anchors[:, 2] * anchors[:, 3]
# correct for ratios
anchors[:, 2] = np.sqrt(areas / np.repeat(ratios, len(scales)))
anchors[:, 3] = anchors[:, 2] * np.repeat(ratios, len(scales))
# transform from (x_ctr, y_ctr, w, h) -> (x1, y1, x2, y2)
anchors[:, 0::2] -= np.tile(anchors[:, 2] * 0.5, (2, 1)).T
anchors[:, 1::2] -= np.tile(anchors[:, 3] * 0.5, (2, 1)).T
return anchors
(2) shift slides these default anchors across the image:
def shift(shape, stride, anchors):
""" Produce shifted anchors based on shape of the map and stride size.
Args
shape : Shape to shift the anchors over.
stride : Stride to shift the anchors with over the shape.
anchors: The anchors to apply at each location.
"""
# create a grid starting from half stride from the top left corner
shift_x = (np.arange(0, shape[1]) + 0.5) * stride
shift_y = (np.arange(0, shape[0]) + 0.5) * stride
shift_x, shift_y = np.meshgrid(shift_x, shift_y)
shifts = np.vstack((
shift_x.ravel(), shift_y.ravel(),
shift_x.ravel(), shift_y.ravel()
)).transpose()
# add A anchors (1, A, 4) to
# cell K shifts (K, 1, 4) to get
# shift anchors (K, A, 4)
# reshape to (K*A, 4) shifted anchors
A = anchors.shape[0]
K = shifts.shape[0]
all_anchors = (anchors.reshape((1, A, 4)) + shifts.reshape((1, K, 4)).transpose((1, 0, 2)))
all_anchors = all_anchors.reshape((K * A, 4))
return all_anchors
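Putting generate_anchors and shift together, the total number of anchors is fixed by the image size. A rough back-of-the-envelope sketch (the exact per-level shapes come from guess_shapes, so treat these numbers as illustrative):

```python
import numpy as np

image_shape = (800, 800)
strides = [8, 16, 32, 64, 128]   # strides of P3..P7
anchors_per_location = 9         # 3 ratios x 3 scales

total = 0
for s in strides:
    h = int(np.ceil(image_shape[0] / s))
    w = int(np.ceil(image_shape[1] / s))
    total += h * w * anchors_per_location
print(total)  # roughly 120k anchors for an 800 x 800 image
```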
- Label the generated anchors and give each one a state: positives (foreground, IoU above 0.5) get 1, negatives (background, IoU below 0.4) get 0, and anchors with IoU between 0.4 and 0.5 are marked to be ignored (-1). The resulting regression_batch and labels_batch are the y_true used in the loss functions above, and their shapes match the shapes the model predicts.
def anchor_targets_bbox(
anchors,
image_group,
annotations_group,
num_classes,
negative_overlap=0.4,
positive_overlap=0.5
):
""" Generate anchor targets for bbox detection.
Args
anchors: np.array of annotations of shape (N, 4) for (x1, y1, x2, y2).
image_group: List of BGR images.
annotations_group: List of annotations (np.array of shape (N, 5) for (x1, y1, x2, y2, label)).
num_classes: Number of classes to predict.
mask_shape: If the image is padded with zeros, mask_shape can be used to mark the relevant part of the image.
negative_overlap: IoU overlap for negative anchors (all anchors with overlap < negative_overlap are negative).
positive_overlap: IoU overlap or positive anchors (all anchors with overlap > positive_overlap are positive).
Returns
labels_batch: batch that contains labels & anchor states (np.array of shape (batch_size, N, num_classes + 1),
where N is the number of anchors for an image and the last column defines the anchor state (-1 for ignore, 0 for bg, 1 for fg).
regression_batch: batch that contains bounding-box regression targets for an image & anchor states (np.array of shape (batch_size, N, 4 + 1),
where N is the number of anchors for an image, the first 4 columns define regression targets for (x1, y1, x2, y2) and the
last column defines anchor states (-1 for ignore, 0 for bg, 1 for fg).
"""
assert(len(image_group) == len(annotations_group)), "The length of the images and annotations need to be equal."
assert(len(annotations_group) > 0), "No data received to compute anchor targets for."
for annotations in annotations_group:
assert('bboxes' in annotations), "Annotations should contain bboxes."
assert('labels' in annotations), "Annotations should contain labels."
batch_size = len(image_group)
regression_batch = np.zeros((batch_size, anchors.shape[0], 4 + 1), dtype=keras.backend.floatx())
labels_batch = np.zeros((batch_size, anchors.shape[0], num_classes + 1), dtype=keras.backend.floatx())
# compute labels and regression targets
for index, (image, annotations) in enumerate(zip(image_group, annotations_group)):
if annotations['bboxes'].shape[0]:
# obtain indices of gt annotations with the greatest overlap
positive_indices, ignore_indices, argmax_overlaps_inds = compute_gt_annotations(anchors, annotations['bboxes'], negative_overlap, positive_overlap)
labels_batch[index, ignore_indices, -1] = -1
labels_batch[index, positive_indices, -1] = 1
regression_batch[index, ignore_indices, -1] = -1
regression_batch[index, positive_indices, -1] = 1
# compute target class labels
labels_batch[index, positive_indices, annotations['labels'][argmax_overlaps_inds[positive_indices]].astype(int)] = 1
regression_batch[index, :, :-1] = bbox_transform(anchors, annotations['bboxes'][argmax_overlaps_inds, :])
# ignore annotations outside of image
if image.shape:
anchors_centers = np.vstack([(anchors[:, 0] + anchors[:, 2]) / 2, (anchors[:, 1] + anchors[:, 3]) / 2]).T
indices = np.logical_or(anchors_centers[:, 0] >= image.shape[1], anchors_centers[:, 1] >= image.shape[0])
labels_batch[index, indices, -1] = -1
regression_batch[index, indices, -1] = -1
return regression_batch, labels_batch