|
from __future__ import absolute_import |
|
from __future__ import division |
|
from __future__ import print_function |
|
from __future__ import unicode_literals |
|
|
|
from tensorboardX import SummaryWriter |
|
import os |
|
import unittest |
|
|
|
|
|
import numpy as np |
|
import caffe2.python.brew as brew |
|
import caffe2.python.cnn as cnn |
|
import caffe2.python.core as core |
|
import caffe2.python.model_helper as model_helper |
|
from caffe2.proto import caffe2_pb2 |
|
from caffe2.python import workspace |
|
import tensorboardX.caffe2_graph as tb |
|
from tensorboardX import x2num |
|
from .expect_reader import compare_proto, write_proto |
|
|
|
|
|
class Caffe2Test(unittest.TestCase):
    """Tests for tensorboardX's caffe2 support.

    `tb` is ``tensorboardX.caffe2_graph``; the ``_``-prefixed helpers tested
    here rewrite caffe2 ``OperatorDef`` protos (naming, SSA conversion,
    TensorFlow-style scoping) before conversion to a TensorBoard GraphDef.
    """

    def test_caffe2_np(self):
        """A workspace blob, referenced by name, converts to a numpy array."""
        workspace.FeedBlob("testBlob", np.random.randn(1, 3, 64, 64).astype(np.float32))
        # make_np accepts the blob *name* and fetches it from the workspace.
        self.assertIsInstance(x2num.make_np('testBlob'), np.ndarray)

    def test_that_operators_gets_non_colliding_names(self):
        """An unnamed op whose type collides with an existing blob gets a '_1' suffix."""
        op = caffe2_pb2.OperatorDef()
        op.type = 'foo'
        op.input.extend(['foo'])
        tb._fill_missing_operator_names([op])
        # Inputs are untouched; only the operator's own (missing) name is assigned.
        self.assertEqual(op.input[0], 'foo')
        self.assertEqual(op.name, 'foo_1')

    def test_that_replacing_colons_gives_non_colliding_names(self):
        """Replacing ':' with '$' must not collide with pre-existing '$' names."""
        op = caffe2_pb2.OperatorDef()
        op.name = 'foo:0'
        op.input.extend(['foo:0', 'foo$0'])
        shapes = {'foo:0': [1]}
        blob_name_tracker = tb._get_blob_names([op])
        tb._replace_colons(shapes, blob_name_tracker, [op], '$')
        self.assertEqual(op.input[0], 'foo$0')
        # The pre-existing 'foo$0' input collided with the renamed blob, so it
        # is deduplicated with a '_1' suffix.
        self.assertEqual(op.input[1], 'foo$0_1')
        self.assertEqual(op.name, 'foo$0')
        # Shape keys follow the renaming; the tracker maps new names -> originals.
        self.assertEqual(len(shapes), 1)
        self.assertEqual(shapes['foo$0'], [1])
        self.assertEqual(len(blob_name_tracker), 2)
        self.assertEqual(blob_name_tracker['foo$0'], 'foo:0')
        self.assertEqual(blob_name_tracker['foo$0_1'], 'foo$0')

    def test_that_adding_gradient_scope_does_no_fancy_renaming(self):
        """'_grad' blobs are moved under 'GRADIENTS/' verbatim, no deduplication tricks."""
        op = caffe2_pb2.OperatorDef()
        op.name = 'foo_grad'
        op.input.extend(['foo_grad', 'foo_grad_1'])
        shapes = {'foo_grad': [1]}
        blob_name_tracker = tb._get_blob_names([op])
        tb._add_gradient_scope(shapes, blob_name_tracker, [op])
        self.assertEqual(op.input[0], 'GRADIENTS/foo_grad')
        self.assertEqual(op.input[1], 'GRADIENTS/foo_grad_1')
        self.assertEqual(op.name, 'GRADIENTS/foo_grad')
        self.assertEqual(len(shapes), 1)
        self.assertEqual(shapes['GRADIENTS/foo_grad'], [1])
        self.assertEqual(len(blob_name_tracker), 2)
        self.assertEqual(
            blob_name_tracker['GRADIENTS/foo_grad'], 'foo_grad')
        self.assertEqual(
            blob_name_tracker['GRADIENTS/foo_grad_1'], 'foo_grad_1')

    def test_that_auto_ssa_gives_non_colliding_names(self):
        """SSA conversion versions re-written blobs and avoids existing names."""
        op1 = caffe2_pb2.OperatorDef()
        op1.output.extend(['foo'])
        op2 = caffe2_pb2.OperatorDef()
        op2.input.extend(['foo'])
        op2.output.extend(['foo'])
        op2.output.extend(['foo_1'])
        shapes = {'foo': [1], 'foo_1': [2]}
        blob_name_tracker = tb._get_blob_names([op1, op2])
        tb._convert_to_ssa(shapes, blob_name_tracker, [op1, op2])
        self.assertEqual(op1.output[0], 'foo')
        self.assertEqual(op2.input[0], 'foo')
        # op2 rewrites 'foo', so its output becomes version 'foo_1'; the
        # original 'foo_1' blob must then be renamed 'foo_1_1' to avoid a clash.
        self.assertEqual(op2.output[0], 'foo_1')
        self.assertEqual(op2.output[1], 'foo_1_1')
        # Versioned copies inherit the shape of the blob they version.
        self.assertEqual(len(shapes), 3)
        self.assertEqual(shapes['foo'], [1])
        self.assertEqual(shapes['foo_1'], [1])
        self.assertEqual(shapes['foo_1_1'], [2])
        self.assertEqual(len(blob_name_tracker), 3)
        self.assertEqual(blob_name_tracker['foo'], 'foo')
        self.assertEqual(blob_name_tracker['foo_1'], 'foo')
        self.assertEqual(blob_name_tracker['foo_1_1'], 'foo_1')

    def test_renaming_tensorflow_style(self):
        """Caffe2 suffixes (_w, _bn, _b, _s, _sum, _branch) become TF-style '/name' scopes."""
        # One op per suffix; op6 additionally covers multiple prefixes and a
        # suffix followed by extra characters ('_branch_2', '_branch4').
        op1 = caffe2_pb2.OperatorDef()
        op1.input.extend(['foo_w'])
        op1.output.extend(['foo_w_2'])

        op2 = caffe2_pb2.OperatorDef()
        op2.input.extend(['foo_bn'])
        op2.output.extend(['foo_bn_2'])

        op3 = caffe2_pb2.OperatorDef()
        op3.input.extend(['foo_b'])
        op3.output.extend(['foo_b_2'])

        op4 = caffe2_pb2.OperatorDef()
        op4.input.extend(['foo_s'])
        op4.output.extend(['foo_s_2'])

        op5 = caffe2_pb2.OperatorDef()
        op5.input.extend(['foo_sum'])
        op5.output.extend(['foo_sum_2'])

        op6 = caffe2_pb2.OperatorDef()
        op6.input.extend(['foo_branch'])
        op6.input.extend(['test_branch_2'])
        op6.output.extend(['foo_branch_3'])
        op6.output.extend(['test_branch4'])
        shapes = {
            'foo_w': [1], 'foo_w_2': [2], 'foo_bn': [3], 'foo_bn_2': [4],
            'foo_b': [5], 'foo_b_2': [6], 'foo_s': [7], 'foo_s_2': [8],
            'foo_sum': [9], 'foo_sum_2': [10], 'foo_branch': [11],
            'test_branch_2': [12], 'foo_branch_3': [13], 'test_branch4': [14],
        }
        ops = [op1, op2, op3, op4, op5, op6]
        blob_name_tracker = tb._get_blob_names(ops)
        tb._rename_tensorflow_style(shapes, blob_name_tracker, ops)

        # Tracker maps each renamed blob back to its original caffe2 name.
        self.assertEqual(blob_name_tracker['foo/weight'], 'foo_w')
        self.assertEqual(blob_name_tracker['foo/weight_2'], 'foo_w_2')
        self.assertEqual(blob_name_tracker['foo/batchnorm'], 'foo_bn')
        self.assertEqual(blob_name_tracker['foo/batchnorm_2'], 'foo_bn_2')
        self.assertEqual(blob_name_tracker['foo/bias'], 'foo_b')
        self.assertEqual(blob_name_tracker['foo/bias_2'], 'foo_b_2')
        self.assertEqual(blob_name_tracker['foo/scale'], 'foo_s')
        self.assertEqual(blob_name_tracker['foo/scale_2'], 'foo_s_2')
        self.assertEqual(blob_name_tracker['foo/sum'], 'foo_sum')
        self.assertEqual(blob_name_tracker['foo/sum_2'], 'foo_sum_2')
        self.assertEqual(blob_name_tracker['foo/branch'], 'foo_branch')
        self.assertEqual(blob_name_tracker['test/branch_2'], 'test_branch_2')
        self.assertEqual(blob_name_tracker['foo/branch_3'], 'foo_branch_3')
        self.assertEqual(blob_name_tracker['test/branch4'], 'test_branch4')

        # Spot-check that shapes moved to the new keys.
        self.assertEqual(shapes['foo/weight'], [1])
        self.assertEqual(shapes['foo/batchnorm_2'], [4])
        self.assertEqual(shapes['foo/sum'], [9])
        self.assertEqual(shapes['test/branch_2'], [12])

        # Operator inputs/outputs are rewritten in place.
        self.assertEqual(op1.input[0], 'foo/weight')
        self.assertEqual(op1.output[0], 'foo/weight_2')
        self.assertEqual(op2.input[0], 'foo/batchnorm')
        self.assertEqual(op2.output[0], 'foo/batchnorm_2')
        self.assertEqual(op3.input[0], 'foo/bias')
        self.assertEqual(op3.output[0], 'foo/bias_2')
        self.assertEqual(op4.input[0], 'foo/scale')
        self.assertEqual(op4.output[0], 'foo/scale_2')
        self.assertEqual(op5.input[0], 'foo/sum')
        self.assertEqual(op5.output[0], 'foo/sum_2')
        self.assertEqual(op6.input[0], 'foo/branch')
        self.assertEqual(op6.input[1], 'test/branch_2')
        self.assertEqual(op6.output[0], 'foo/branch_3')
        self.assertEqual(op6.output[1], 'test/branch4')

    def test_filter_ops(self):
        """_filter_ops removes filtered blobs, drops ops left without outputs,
        and passes everything through when perform_filter is False."""
        op1 = caffe2_pb2.OperatorDef()
        op1.input.extend(['remove_this'])
        op1.output.extend(['random_output'])
        op2 = caffe2_pb2.OperatorDef()
        op2.input.extend(['leave_this'])
        op2.output.extend(['leave_this_also'])
        op3 = caffe2_pb2.OperatorDef()
        op3.input.extend(['random_input'])
        op3.output.extend(['remove_this_also'])

        def filter_fn(blob):
            # Reject any blob whose name contains 'remove'.
            return 'remove' not in str(blob)

        op_set1 = [op1, op2, op3]
        op_set2 = [op1, op2, op3]

        result_ops1 = tb._filter_ops(op_set1, filter_fn, True)
        new_op1, new_op2 = result_ops1[0], result_ops1[1]

        # op1 keeps its output but loses its filtered input.
        self.assertEqual(len(new_op1.input), 0)
        self.assertEqual(new_op1.output, ['random_output'])
        self.assertEqual(new_op2.input, ['leave_this'])
        self.assertEqual(new_op2.output, ['leave_this_also'])
        # op3's only output was filtered away, so the entire op is dropped.
        self.assertEqual(len(result_ops1), 2)

        # perform_filter=False must be a no-op.
        result_ops2 = tb._filter_ops(op_set2, filter_fn, False)
        self.assertEqual(result_ops2, op_set2)

    def test_simple_cnnmodel(self):
        """A small CNNModelHelper network converts to a GraphDef matching the
        stored expect proto."""
        model = cnn.CNNModelHelper("NCHW", name="overfeat")
        workspace.FeedBlob("data", np.random.randn(1, 3, 64, 64).astype(np.float32))
        # np.int was a deprecated alias for the builtin int and was removed in
        # NumPy 1.24 -- use int directly.
        workspace.FeedBlob("label", np.random.randn(1, 1000).astype(int))
        with core.NameScope("conv1"):
            conv1 = model.Conv("data", "conv1", 3, 96, 11, stride=4)
            relu1 = model.Relu(conv1, conv1)
            pool1 = model.MaxPool(relu1, "pool1", kernel=2, stride=2)
        with core.NameScope("classifier"):
            fc = model.FC(pool1, "fc", 4096, 1000)
            pred = model.Softmax(fc, "pred")
            xent = model.LabelCrossEntropy([pred, "label"], "xent")
            # Call for its side effect of adding the op; result blob is unused.
            model.AveragedLoss(xent, "loss")

        blob_name_tracker = {}
        graph = tb.model_to_graph_def(
            model,
            blob_name_tracker=blob_name_tracker,
            shapes={},
            show_simplified=False,
        )
        # Golden-file comparison against the expect proto for this test.
        compare_proto(graph, self)

    def test_simple_model(self):
        """A brew-built MNIST-style model (including gradient ops) converts to
        a GraphDef matching the stored expect proto."""
        model = model_helper.ModelHelper(name="mnist")

        workspace.FeedBlob("data", np.random.randn(1, 3, 64, 64).astype(np.float32))
        # np.int was removed in NumPy 1.24; the builtin int is equivalent here.
        workspace.FeedBlob("label", np.random.randn(1, 1000).astype(int))

        with core.NameScope("conv1"):
            conv1 = brew.conv(model, "data", 'conv1', dim_in=1, dim_out=20, kernel=5)
            pool1 = brew.max_pool(model, conv1, 'pool1', kernel=2, stride=2)
            conv2 = brew.conv(model, pool1, 'conv2', dim_in=20, dim_out=100, kernel=5)
            pool2 = brew.max_pool(model, conv2, 'pool2', kernel=2, stride=2)
        with core.NameScope("classifier"):
            fc3 = brew.fc(model, pool2, 'fc3', dim_in=100 * 4 * 4, dim_out=500)
            relu = brew.relu(model, fc3, fc3)
            pred = brew.fc(model, relu, 'pred', 500, 10)
            softmax = brew.softmax(model, pred, 'softmax')
            xent = model.LabelCrossEntropy([softmax, "label"], 'xent')
            loss = model.AveragedLoss(xent, "loss")
        model.net.RunAllOnMKL()
        model.param_init_net.RunAllOnMKL()
        # Exercise gradient-scope handling in the converter.
        model.AddGradientOperators([loss], skip=1)
        blob_name_tracker = {}
        graph = tb.model_to_graph_def(
            model,
            blob_name_tracker=blob_name_tracker,
            shapes={},
            show_simplified=False,
        )
        # Golden-file comparison against the expect proto for this test.
        compare_proto(graph, self)
|
|
|
|
|
# Allow running this test module directly (e.g. `python this_file.py`).
if __name__ == "__main__":
    unittest.main()
|
|