jukebox / tensorboardX /tests /test_caffe2.py
bds2714's picture
Upload 331 files
c508d7f
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from tensorboardX import SummaryWriter
import os
import unittest
# try:
import numpy as np
import caffe2.python.brew as brew
import caffe2.python.cnn as cnn
import caffe2.python.core as core
import caffe2.python.model_helper as model_helper
from caffe2.proto import caffe2_pb2
from caffe2.python import workspace
import tensorboardX.caffe2_graph as tb
from tensorboardX import x2num
from .expect_reader import compare_proto, write_proto
class Caffe2Test(unittest.TestCase):
def test_caffe2_np(self):
workspace.FeedBlob("testBlob", np.random.randn(1, 3, 64, 64).astype(np.float32))
assert isinstance(x2num.make_np('testBlob'), np.ndarray)
# assert isinstance(x2num.make_np('testBlob', 'IMG'), np.ndarray)
def test_that_operators_gets_non_colliding_names(self):
op = caffe2_pb2.OperatorDef()
op.type = 'foo'
op.input.extend(['foo'])
tb._fill_missing_operator_names([op])
self.assertEqual(op.input[0], 'foo')
self.assertEqual(op.name, 'foo_1')
def test_that_replacing_colons_gives_non_colliding_names(self):
# .. and update shapes
op = caffe2_pb2.OperatorDef()
op.name = 'foo:0'
op.input.extend(['foo:0', 'foo$0'])
shapes = {'foo:0': [1]}
blob_name_tracker = tb._get_blob_names([op])
tb._replace_colons(shapes, blob_name_tracker, [op], '$')
self.assertEqual(op.input[0], 'foo$0')
self.assertEqual(op.input[1], 'foo$0_1')
# Collision but blobs and op names are handled later by
# _fill_missing_operator_names.
self.assertEqual(op.name, 'foo$0')
self.assertEqual(len(shapes), 1)
self.assertEqual(shapes['foo$0'], [1])
self.assertEqual(len(blob_name_tracker), 2)
self.assertEqual(blob_name_tracker['foo$0'], 'foo:0')
self.assertEqual(blob_name_tracker['foo$0_1'], 'foo$0')
def test_that_adding_gradient_scope_does_no_fancy_renaming(self):
# because it cannot create collisions
op = caffe2_pb2.OperatorDef()
op.name = 'foo_grad'
op.input.extend(['foo_grad', 'foo_grad_1'])
shapes = {'foo_grad': [1]}
blob_name_tracker = tb._get_blob_names([op])
tb._add_gradient_scope(shapes, blob_name_tracker, [op])
self.assertEqual(op.input[0], 'GRADIENTS/foo_grad')
self.assertEqual(op.input[1], 'GRADIENTS/foo_grad_1')
self.assertEqual(op.name, 'GRADIENTS/foo_grad')
self.assertEqual(len(shapes), 1)
self.assertEqual(shapes['GRADIENTS/foo_grad'], [1])
self.assertEqual(len(blob_name_tracker), 2)
self.assertEqual(
blob_name_tracker['GRADIENTS/foo_grad'], 'foo_grad')
self.assertEqual(
blob_name_tracker['GRADIENTS/foo_grad_1'], 'foo_grad_1')
def test_that_auto_ssa_gives_non_colliding_names(self):
op1 = caffe2_pb2.OperatorDef()
op1.output.extend(['foo'])
op2 = caffe2_pb2.OperatorDef()
op2.input.extend(['foo'])
op2.output.extend(['foo'])
op2.output.extend(['foo_1'])
shapes = {'foo': [1], 'foo_1': [2]}
blob_name_tracker = tb._get_blob_names([op1, op2])
tb._convert_to_ssa(shapes, blob_name_tracker, [op1, op2])
self.assertEqual(op1.output[0], 'foo')
self.assertEqual(op2.input[0], 'foo')
self.assertEqual(op2.output[0], 'foo_1')
# Unfortunate name but we do not parse original `_` for now.
self.assertEqual(op2.output[1], 'foo_1_1')
self.assertEqual(len(shapes), 3)
self.assertEqual(shapes['foo'], [1])
self.assertEqual(shapes['foo_1'], [1])
self.assertEqual(shapes['foo_1_1'], [2])
self.assertEqual(len(blob_name_tracker), 3)
self.assertEqual(blob_name_tracker['foo'], 'foo')
self.assertEqual(blob_name_tracker['foo_1'], 'foo')
self.assertEqual(blob_name_tracker['foo_1_1'], 'foo_1')
def test_renaming_tensorflow_style(self):
# Construct some dummy operators here
# NOTE: '_w', '_bn', etc without the postfix '_' are only renamed when
# they are at the very end of the name.
# Test that '_w', '_w_' are renamed to '/weight', '/weight_', resp.
op1 = caffe2_pb2.OperatorDef()
op1.input.extend(['foo_w'])
op1.output.extend(['foo_w_2'])
# Test that '_bn', '_bn_' are renamed to '/batchnorm', '/batchnorm_',
# respectively.
op2 = caffe2_pb2.OperatorDef()
op2.input.extend(['foo_bn'])
op2.output.extend(['foo_bn_2'])
# Test that '_b', '_b_', are renamed to '/bias', '/bias_', resp.
op3 = caffe2_pb2.OperatorDef()
op3.input.extend(['foo_b'])
op3.output.extend(['foo_b_2'])
# Test that '_s', '_s_', are renamed to '/scale', '/scale_', resp.
op4 = caffe2_pb2.OperatorDef()
op4.input.extend(['foo_s'])
op4.output.extend(['foo_s_2'])
# Test that '_sum', '_sum_', are renamed to '/sum', '/sum_', resp.
op5 = caffe2_pb2.OperatorDef()
op5.input.extend(['foo_sum'])
op5.output.extend(['foo_sum_2'])
# Test that '_branch', '_branch_', are renamed to '/branch', '/branch_',
# respectively. Multiple inputs/outputs are also tested in this case.
op6 = caffe2_pb2.OperatorDef()
op6.input.extend(['foo_branch'])
op6.input.extend(['test_branch_2'])
op6.output.extend(['foo_branch_3'])
op6.output.extend(['test_branch4'])
shapes = {
'foo_w': [1], 'foo_w_2': [2], 'foo_bn': [3], 'foo_bn_2': [4],
'foo_b': [5], 'foo_b_2': [6], 'foo_s': [7], 'foo_s_2': [8],
'foo_sum': [9], 'foo_sum_2': [10], 'foo_branch': [11],
'test_branch_2': [12], 'foo_branch_3': [13], 'test_branch4': [14],
}
ops = [op1, op2, op3, op4, op5, op6]
blob_name_tracker = tb._get_blob_names(ops)
tb._rename_tensorflow_style(shapes, blob_name_tracker, ops)
# Testing that keys in blob name tracker were renamed correctly
self.assertEqual(blob_name_tracker['foo/weight'], 'foo_w')
self.assertEqual(blob_name_tracker['foo/weight_2'], 'foo_w_2')
self.assertEqual(blob_name_tracker['foo/batchnorm'], 'foo_bn')
self.assertEqual(blob_name_tracker['foo/batchnorm_2'], 'foo_bn_2')
self.assertEqual(blob_name_tracker['foo/bias'], 'foo_b')
self.assertEqual(blob_name_tracker['foo/bias_2'], 'foo_b_2')
self.assertEqual(blob_name_tracker['foo/scale'], 'foo_s')
self.assertEqual(blob_name_tracker['foo/scale_2'], 'foo_s_2')
self.assertEqual(blob_name_tracker['foo/sum'], 'foo_sum')
self.assertEqual(blob_name_tracker['foo/sum_2'], 'foo_sum_2')
self.assertEqual(blob_name_tracker['foo/branch'], 'foo_branch')
self.assertEqual(blob_name_tracker['test/branch_2'], 'test_branch_2')
self.assertEqual(blob_name_tracker['foo/branch_3'], 'foo_branch_3')
self.assertEqual(blob_name_tracker['test/branch4'], 'test_branch4')
# Testing that keys in shapes were renamed correctly
self.assertEqual(shapes['foo/weight'], [1])
self.assertEqual(shapes['foo/batchnorm_2'], [4])
self.assertEqual(shapes['foo/sum'], [9])
self.assertEqual(shapes['test/branch_2'], [12])
# Testing that the ops were renamed correctly
self.assertEqual(op1.input[0], 'foo/weight')
self.assertEqual(op1.output[0], 'foo/weight_2')
self.assertEqual(op2.input[0], 'foo/batchnorm')
self.assertEqual(op2.output[0], 'foo/batchnorm_2')
self.assertEqual(op3.input[0], 'foo/bias')
self.assertEqual(op3.output[0], 'foo/bias_2')
self.assertEqual(op4.input[0], 'foo/scale')
self.assertEqual(op4.output[0], 'foo/scale_2')
self.assertEqual(op5.input[0], 'foo/sum')
self.assertEqual(op5.output[0], 'foo/sum_2')
self.assertEqual(op6.input[0], 'foo/branch')
self.assertEqual(op6.input[1], 'test/branch_2')
self.assertEqual(op6.output[0], 'foo/branch_3')
self.assertEqual(op6.output[1], 'test/branch4')
def test_filter_ops(self):
op1 = caffe2_pb2.OperatorDef()
op1.input.extend(['remove_this'])
op1.output.extend(['random_output'])
op2 = caffe2_pb2.OperatorDef()
op2.input.extend(['leave_this'])
op2.output.extend(['leave_this_also'])
op3 = caffe2_pb2.OperatorDef()
op3.input.extend(['random_input'])
op3.output.extend(['remove_this_also'])
def filter_fn(blob):
# Filter all blobs with names containing 'remove'
return 'remove' not in str(blob)
op_set1 = [op1, op2, op3]
op_set2 = [op1, op2, op3]
# Test case for when perform_filter = True.
result_ops1 = tb._filter_ops(op_set1, filter_fn, True)
new_op1, new_op2 = result_ops1[0], result_ops1[1]
# input named 'remove_this' should have been filtered
self.assertEqual(len(new_op1.input), 0)
self.assertEqual(new_op1.output, ['random_output'])
self.assertEqual(new_op2.input, ['leave_this'])
self.assertEqual(new_op2.output, ['leave_this_also'])
# output named 'remove_this_also' should have been filtered as well.
# This should have also removed op3 as the filter function excludes ops
# with no outputs.
self.assertEqual(len(result_ops1), 2)
# Test case for when perform_filter = False. op_set2 should remain
# unchanged.
result_ops2 = tb._filter_ops(op_set2, filter_fn, False)
self.assertEqual(result_ops2, op_set2)
# Use show_simplified=False. This shows the original style of graph
# visualization from caffe2.contrib.tensorboard.
# TODO: Add test for show_simplified=True.
def test_simple_cnnmodel(self):
model = cnn.CNNModelHelper("NCHW", name="overfeat")
workspace.FeedBlob("data", np.random.randn(1, 3, 64, 64).astype(np.float32))
workspace.FeedBlob("label", np.random.randn(1, 1000).astype(np.int))
with core.NameScope("conv1"):
conv1 = model.Conv("data", "conv1", 3, 96, 11, stride=4)
relu1 = model.Relu(conv1, conv1)
pool1 = model.MaxPool(relu1, "pool1", kernel=2, stride=2)
with core.NameScope("classifier"):
fc = model.FC(pool1, "fc", 4096, 1000)
pred = model.Softmax(fc, "pred")
xent = model.LabelCrossEntropy([pred, "label"], "xent")
loss = model.AveragedLoss(xent, "loss")
blob_name_tracker = {}
graph = tb.model_to_graph_def(
model,
blob_name_tracker=blob_name_tracker,
shapes={},
show_simplified=False,
)
compare_proto(graph, self)
# cnn.CNNModelHelper is deprecated, so we also test with
# model_helper.ModelHelper. The model used in this test is taken from the
# Caffe2 MNIST tutorial. Also use show_simplified=False here.
def test_simple_model(self):
model = model_helper.ModelHelper(name="mnist")
# how come those inputs don't break the forward pass =.=a
workspace.FeedBlob("data", np.random.randn(1, 3, 64, 64).astype(np.float32))
workspace.FeedBlob("label", np.random.randn(1, 1000).astype(np.int))
with core.NameScope("conv1"):
conv1 = brew.conv(model, "data", 'conv1', dim_in=1, dim_out=20, kernel=5)
# Image size: 24 x 24 -> 12 x 12
pool1 = brew.max_pool(model, conv1, 'pool1', kernel=2, stride=2)
# Image size: 12 x 12 -> 8 x 8
conv2 = brew.conv(model, pool1, 'conv2', dim_in=20, dim_out=100, kernel=5)
# Image size: 8 x 8 -> 4 x 4
pool2 = brew.max_pool(model, conv2, 'pool2', kernel=2, stride=2)
with core.NameScope("classifier"):
# 50 * 4 * 4 stands for dim_out from previous layer multiplied by the image size
fc3 = brew.fc(model, pool2, 'fc3', dim_in=100 * 4 * 4, dim_out=500)
relu = brew.relu(model, fc3, fc3)
pred = brew.fc(model, relu, 'pred', 500, 10)
softmax = brew.softmax(model, pred, 'softmax')
xent = model.LabelCrossEntropy([softmax, "label"], 'xent')
# compute the expected loss
loss = model.AveragedLoss(xent, "loss")
model.net.RunAllOnMKL()
model.param_init_net.RunAllOnMKL()
model.AddGradientOperators([loss], skip=1)
blob_name_tracker = {}
graph = tb.model_to_graph_def(
model,
blob_name_tracker=blob_name_tracker,
shapes={},
show_simplified=False,
)
compare_proto(graph, self)
if __name__ == "__main__":
unittest.main()