#!/usr/bin/env python # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from ThriftTest.ttypes import ( Bonk, Bools, LargeDeltas, ListBonks, NestedListsBonk, NestedListsI32x2, NestedListsI32x3, NestedMixedx2, Numberz, VersioningTestV1, VersioningTestV2, Xtruct, Xtruct2, ) from DebugProtoTest.ttypes import CompactProtoTestStruct, Empty from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol, TCompactProtocol, TJSONProtocol from thrift.TSerialization import serialize, deserialize import sys import unittest class AbstractTest(unittest.TestCase): def setUp(self): self.v1obj = VersioningTestV1( begin_in_both=12345, old_string='aaa', end_in_both=54321, ) self.v2obj = VersioningTestV2( begin_in_both=12345, newint=1, newbyte=2, newshort=3, newlong=4, newdouble=5.0, newstruct=Bonk(message="Hello!", type=123), newlist=[7, 8, 9], newset=set([42, 1, 8]), newmap={1: 2, 2: 3}, newstring="Hola!", end_in_both=54321, ) self.bools = Bools(im_true=True, im_false=False) self.bools_flipped = Bools(im_true=False, im_false=True) self.large_deltas = LargeDeltas( b1=self.bools, b10=self.bools_flipped, b100=self.bools, check_true=True, b1000=self.bools_flipped, check_false=False, vertwo2000=VersioningTestV2(newstruct=Bonk(message='World!', type=314)), a_set2500=set(['lazy', 'brown', 'cow']), vertwo3000=VersioningTestV2(newset=set([2, 3, 5, 7, 11])), big_numbers=[2 ** 8, 2 ** 16, 2 ** 31 - 1, -(2 ** 31 - 1)] ) self.compact_struct = CompactProtoTestStruct( a_byte=127, a_i16=32000, a_i32=1000000000, a_i64=0xffffffffff, a_double=5.6789, a_string="my string", true_field=True, false_field=False, empty_struct_field=Empty(), byte_list=[-127, -1, 0, 1, 127], i16_list=[-1, 0, 1, 0x7fff], i32_list=[-1, 0, 0xff, 0xffff, 0xffffff, 0x7fffffff], i64_list=[-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff], double_list=[0.1, 0.2, 0.3], string_list=["first", "second", "third"], boolean_list=[True, True, True, False, False, False], struct_list=[Empty(), Empty()], byte_set=set([-127, -1, 0, 1, 127]), i16_set=set([-1, 0, 1, 0x7fff]), i32_set=set([1, 2, 3]), i64_set=set([-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff]), double_set=set([0.1, 0.2, 0.3]), string_set=set(["first", "second", "third"]), boolean_set=set([True, False]), # struct_set=set([Empty()]), # unhashable instance byte_byte_map={1: 2}, i16_byte_map={1: 1, -1: 1, 0x7fff: 1}, i32_byte_map={1: 1, -1: 1, 0x7fffffff: 1}, i64_byte_map={0: 1, 1: 1, -1: 1, 0x7fffffffffffffff: 1}, double_byte_map={-1.1: 1, 1.1: 1}, string_byte_map={"first": 1, "second": 2, "third": 3, "": 0}, boolean_byte_map={True: 1, False: 0}, byte_i16_map={1: 1, 2: -1, 3: 0x7fff}, byte_i32_map={1: 1, 2: -1, 3: 0x7fffffff}, byte_i64_map={1: 1, 2: -1, 3: 0x7fffffffffffffff}, byte_double_map={1: 0.1, 2: -0.1, 3: 1000000.1}, byte_string_map={1: "", 2: "blah", 3: "loooooooooooooong string"}, byte_boolean_map={1: True, 2: False}, # list_byte_map # unhashable # set_byte_map={set([1, 2, 3]) : 1, set([0, 1]) : 2, set([]) : 0}, # unhashable # map_byte_map # unhashable byte_map_map={0: {}, 1: {1: 1}, 2: {1: 1, 2: 2}}, byte_set_map={0: set([]), 1: set([1]), 2: set([1, 2])}, byte_list_map={0: [], 1: [1], 2: [1, 2]}, ) self.nested_lists_i32x2 = NestedListsI32x2( [ [1, 1, 2], [2, 7, 9], [3, 5, 8] ] ) self.nested_lists_i32x3 = NestedListsI32x3( [ [ [2, 7, 9], [3, 5, 8] ], [ [1, 1, 2], [1, 4, 9] ] ] ) self.nested_mixedx2 = NestedMixedx2(int_set_list=[ set([1, 2, 3]), set([1, 4, 9]), set([1, 2, 3, 5, 8, 13, 21]), set([-1, 0, 1]) ], # note, the sets below are sets of chars, since the strings are iterated map_int_strset={10: set('abc'), 20: set('def'), 30: set('GHI')}, map_int_strset_list=[ {10: set('abc'), 20: set('def'), 30: set('GHI')}, {100: set('lmn'), 200: set('opq'), 300: set('RST')}, {1000: set('uvw'), 2000: set('wxy'), 3000: set('XYZ')}] ) self.nested_lists_bonk = NestedListsBonk( [ [ [ Bonk(message='inner A first', type=1), Bonk(message='inner A second', type=1) ], [ Bonk(message='inner B first', type=2), Bonk(message='inner B second', type=2) ] ] ] ) self.list_bonks = ListBonks( [ Bonk(message='inner A', type=1), Bonk(message='inner B', type=2), Bonk(message='inner C', type=0) ] ) def _serialize(self, obj): trans = TTransport.TMemoryBuffer() prot = self.protocol_factory.getProtocol(trans) obj.write(prot) return trans.getvalue() def _deserialize(self, objtype, data): prot = self.protocol_factory.getProtocol(TTransport.TMemoryBuffer(data)) ret = objtype() ret.read(prot) return ret def testForwards(self): obj = self._deserialize(VersioningTestV2, self._serialize(self.v1obj)) self.assertEquals(obj.begin_in_both, self.v1obj.begin_in_both) self.assertEquals(obj.end_in_both, self.v1obj.end_in_both) def testBackwards(self): obj = self._deserialize(VersioningTestV1, self._serialize(self.v2obj)) self.assertEquals(obj.begin_in_both, self.v2obj.begin_in_both) self.assertEquals(obj.end_in_both, self.v2obj.end_in_both) def testSerializeV1(self): obj = self._deserialize(VersioningTestV1, self._serialize(self.v1obj)) self.assertEquals(obj, self.v1obj) def testSerializeV2(self): obj = self._deserialize(VersioningTestV2, self._serialize(self.v2obj)) self.assertEquals(obj, self.v2obj) def testBools(self): self.assertNotEquals(self.bools, self.bools_flipped) self.assertNotEquals(self.bools, self.v1obj) obj = self._deserialize(Bools, self._serialize(self.bools)) self.assertEquals(obj, self.bools) obj = self._deserialize(Bools, self._serialize(self.bools_flipped)) self.assertEquals(obj, self.bools_flipped) rep = repr(self.bools) self.assertTrue(len(rep) > 0) def testLargeDeltas(self): # test large field deltas (meaningful in CompactProto only) obj = self._deserialize(LargeDeltas, self._serialize(self.large_deltas)) self.assertEquals(obj, self.large_deltas) rep = repr(self.large_deltas) self.assertTrue(len(rep) > 0) def testNestedListsI32x2(self): obj = self._deserialize(NestedListsI32x2, self._serialize(self.nested_lists_i32x2)) self.assertEquals(obj, self.nested_lists_i32x2) rep = repr(self.nested_lists_i32x2) self.assertTrue(len(rep) > 0) def testNestedListsI32x3(self): obj = self._deserialize(NestedListsI32x3, self._serialize(self.nested_lists_i32x3)) self.assertEquals(obj, self.nested_lists_i32x3) rep = repr(self.nested_lists_i32x3) self.assertTrue(len(rep) > 0) def testNestedMixedx2(self): obj = self._deserialize(NestedMixedx2, self._serialize(self.nested_mixedx2)) self.assertEquals(obj, self.nested_mixedx2) rep = repr(self.nested_mixedx2) self.assertTrue(len(rep) > 0) def testNestedListsBonk(self): obj = self._deserialize(NestedListsBonk, self._serialize(self.nested_lists_bonk)) self.assertEquals(obj, self.nested_lists_bonk) rep = repr(self.nested_lists_bonk) self.assertTrue(len(rep) > 0) def testListBonks(self): obj = self._deserialize(ListBonks, self._serialize(self.list_bonks)) self.assertEquals(obj, self.list_bonks) rep = repr(self.list_bonks) self.assertTrue(len(rep) > 0) def testCompactStruct(self): # test large field deltas (meaningful in CompactProto only) obj = self._deserialize(CompactProtoTestStruct, self._serialize(self.compact_struct)) self.assertEquals(obj, self.compact_struct) rep = repr(self.compact_struct) self.assertTrue(len(rep) > 0) def testIntegerLimits(self): if (sys.version_info[0] == 2 and sys.version_info[1] <= 6): print('Skipping testIntegerLimits for Python 2.6') return bad_values = [CompactProtoTestStruct(a_byte=128), CompactProtoTestStruct(a_byte=-129), CompactProtoTestStruct(a_i16=32768), CompactProtoTestStruct(a_i16=-32769), CompactProtoTestStruct(a_i32=2147483648), CompactProtoTestStruct(a_i32=-2147483649), CompactProtoTestStruct(a_i64=9223372036854775808), CompactProtoTestStruct(a_i64=-9223372036854775809) ] for value in bad_values: self.assertRaises(Exception, self._serialize, value) class NormalBinaryTest(AbstractTest): protocol_factory = TBinaryProtocol.TBinaryProtocolFactory() class AcceleratedBinaryTest(AbstractTest): protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False) class CompactProtocolTest(AbstractTest): protocol_factory = TCompactProtocol.TCompactProtocolFactory() class AcceleratedCompactTest(AbstractTest): protocol_factory = TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False) class JSONProtocolTest(AbstractTest): protocol_factory = TJSONProtocol.TJSONProtocolFactory() class AcceleratedFramedTest(unittest.TestCase): def testSplit(self): """Test FramedTransport and BinaryProtocolAccelerated Tests that TBinaryProtocolAccelerated and TFramedTransport play nicely together when a read spans a frame""" protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory() bigstring = "".join(chr(byte) for byte in range(ord("a"), ord("z") + 1)) databuf = TTransport.TMemoryBuffer() prot = protocol_factory.getProtocol(databuf) prot.writeI32(42) prot.writeString(bigstring) prot.writeI16(24) data = databuf.getvalue() cutpoint = len(data) // 2 parts = [data[:cutpoint], data[cutpoint:]] framed_buffer = TTransport.TMemoryBuffer() framed_writer = TTransport.TFramedTransport(framed_buffer) for part in parts: framed_writer.write(part) framed_writer.flush() self.assertEquals(len(framed_buffer.getvalue()), len(data) + 8) # Recreate framed_buffer so we can read from it. framed_buffer = TTransport.TMemoryBuffer(framed_buffer.getvalue()) framed_reader = TTransport.TFramedTransport(framed_buffer) prot = protocol_factory.getProtocol(framed_reader) self.assertEqual(prot.readI32(), 42) self.assertEqual(prot.readString(), bigstring) self.assertEqual(prot.readI16(), 24) class SerializersTest(unittest.TestCase): def testSerializeThenDeserialize(self): obj = Xtruct2(i32_thing=1, struct_thing=Xtruct(string_thing="foo")) s1 = serialize(obj) for i in range(10): self.assertEquals(s1, serialize(obj)) objcopy = Xtruct2() deserialize(objcopy, serialize(obj)) self.assertEquals(obj, objcopy) obj = Xtruct(string_thing="bar") objcopy = Xtruct() deserialize(objcopy, serialize(obj)) self.assertEquals(obj, objcopy) # test booleans obj = Bools(im_true=True, im_false=False) objcopy = Bools() deserialize(objcopy, serialize(obj)) self.assertEquals(obj, objcopy) # test enums for num, name in Numberz._VALUES_TO_NAMES.items(): obj = Bonk(message='enum Numberz value %d is string %s' % (num, name), type=num) objcopy = Bonk() deserialize(objcopy, serialize(obj)) self.assertEquals(obj, objcopy) def suite(): suite = unittest.TestSuite() loader = unittest.TestLoader() suite.addTest(loader.loadTestsFromTestCase(NormalBinaryTest)) suite.addTest(loader.loadTestsFromTestCase(AcceleratedBinaryTest)) suite.addTest(loader.loadTestsFromTestCase(AcceleratedCompactTest)) suite.addTest(loader.loadTestsFromTestCase(CompactProtocolTest)) suite.addTest(loader.loadTestsFromTestCase(JSONProtocolTest)) suite.addTest(loader.loadTestsFromTestCase(AcceleratedFramedTest)) suite.addTest(loader.loadTestsFromTestCase(SerializersTest)) return suite if __name__ == "__main__": unittest.main(defaultTest="suite", testRunner=unittest.TextTestRunner(verbosity=2))