#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from ThriftTest.ttypes import (
    Bonk,
    Bools,
    LargeDeltas,
    ListBonks,
    NestedListsBonk,
    NestedListsI32x2,
    NestedListsI32x3,
    NestedMixedx2,
    Numberz,
    VersioningTestV1,
    VersioningTestV2,
    Xtruct,
    Xtruct2,
)

from DebugProtoTest.ttypes import CompactProtoTestStruct, Empty
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol, TCompactProtocol, TJSONProtocol
from thrift.TSerialization import serialize, deserialize
import sys
import unittest


class AbstractTest(unittest.TestCase):
    """Protocol-independent serialization round-trip tests.

    Concrete subclasses supply a ``protocol_factory`` class attribute; the
    helpers ``_serialize`` and ``_deserialize`` use it to build the protocol
    under test. This class itself defines no ``protocol_factory`` and is
    therefore only meaningful when subclassed.
    """

    def setUp(self):
        """Build the fixture structs shared by every test method."""
        self.v1obj = VersioningTestV1(
            begin_in_both=12345,
            old_string='aaa',
            end_in_both=54321,
        )

        self.v2obj = VersioningTestV2(
            begin_in_both=12345,
            newint=1,
            newbyte=2,
            newshort=3,
            newlong=4,
            newdouble=5.0,
            newstruct=Bonk(message="Hello!", type=123),
            newlist=[7, 8, 9],
            newset=set([42, 1, 8]),
            newmap={1: 2, 2: 3},
            newstring="Hola!",
            end_in_both=54321,
        )

        self.bools = Bools(im_true=True, im_false=False)
        self.bools_flipped = Bools(im_true=False, im_false=True)

        self.large_deltas = LargeDeltas(
            b1=self.bools,
            b10=self.bools_flipped,
            b100=self.bools,
            check_true=True,
            b1000=self.bools_flipped,
            check_false=False,
            vertwo2000=VersioningTestV2(newstruct=Bonk(message='World!', type=314)),
            a_set2500=set(['lazy', 'brown', 'cow']),
            vertwo3000=VersioningTestV2(newset=set([2, 3, 5, 7, 11])),
            big_numbers=[2 ** 8, 2 ** 16, 2 ** 31 - 1, -(2 ** 31 - 1)]
        )

        self.compact_struct = CompactProtoTestStruct(
            a_byte=127,
            a_i16=32000,
            a_i32=1000000000,
            a_i64=0xffffffffff,
            a_double=5.6789,
            a_string="my string",
            true_field=True,
            false_field=False,
            empty_struct_field=Empty(),
            byte_list=[-127, -1, 0, 1, 127],
            i16_list=[-1, 0, 1, 0x7fff],
            i32_list=[-1, 0, 0xff, 0xffff, 0xffffff, 0x7fffffff],
            i64_list=[-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff],
            double_list=[0.1, 0.2, 0.3],
            string_list=["first", "second", "third"],
            boolean_list=[True, True, True, False, False, False],
            struct_list=[Empty(), Empty()],
            byte_set=set([-127, -1, 0, 1, 127]),
            i16_set=set([-1, 0, 1, 0x7fff]),
            i32_set=set([1, 2, 3]),
            i64_set=set([-1, 0, 0xff, 0xffff, 0xffffff, 0xffffffff, 0xffffffffff, 0xffffffffffff, 0xffffffffffffff, 0x7fffffffffffffff]),
            double_set=set([0.1, 0.2, 0.3]),
            string_set=set(["first", "second", "third"]),
            boolean_set=set([True, False]),
            # struct_set=set([Empty()]), # unhashable instance
            byte_byte_map={1: 2},
            i16_byte_map={1: 1, -1: 1, 0x7fff: 1},
            i32_byte_map={1: 1, -1: 1, 0x7fffffff: 1},
            i64_byte_map={0: 1, 1: 1, -1: 1, 0x7fffffffffffffff: 1},
            double_byte_map={-1.1: 1, 1.1: 1},
            string_byte_map={"first": 1, "second": 2, "third": 3, "": 0},
            boolean_byte_map={True: 1, False: 0},
            byte_i16_map={1: 1, 2: -1, 3: 0x7fff},
            byte_i32_map={1: 1, 2: -1, 3: 0x7fffffff},
            byte_i64_map={1: 1, 2: -1, 3: 0x7fffffffffffffff},
            byte_double_map={1: 0.1, 2: -0.1, 3: 1000000.1},
            byte_string_map={1: "", 2: "blah", 3: "loooooooooooooong string"},
            byte_boolean_map={1: True, 2: False},
            # list_byte_map # unhashable
            # set_byte_map={set([1, 2, 3]) : 1, set([0, 1]) : 2, set([]) : 0}, # unhashable
            # map_byte_map # unhashable
            byte_map_map={0: {}, 1: {1: 1}, 2: {1: 1, 2: 2}},
            byte_set_map={0: set([]), 1: set([1]), 2: set([1, 2])},
            byte_list_map={0: [], 1: [1], 2: [1, 2]},
        )

        self.nested_lists_i32x2 = NestedListsI32x2(
            [
                [1, 1, 2],
                [2, 7, 9],
                [3, 5, 8]
            ]
        )

        self.nested_lists_i32x3 = NestedListsI32x3(
            [
                [
                    [2, 7, 9],
                    [3, 5, 8]
                ],
                [
                    [1, 1, 2],
                    [1, 4, 9]
                ]
            ]
        )

        self.nested_mixedx2 = NestedMixedx2(int_set_list=[
            set([1, 2, 3]),
            set([1, 4, 9]),
            set([1, 2, 3, 5, 8, 13, 21]),
            set([-1, 0, 1])
        ],
            # note, the sets below are sets of chars, since the strings are iterated
            map_int_strset={10: set('abc'), 20: set('def'), 30: set('GHI')},
            map_int_strset_list=[
                {10: set('abc'), 20: set('def'), 30: set('GHI')},
                {100: set('lmn'), 200: set('opq'), 300: set('RST')},
                {1000: set('uvw'), 2000: set('wxy'), 3000: set('XYZ')}]
        )

        self.nested_lists_bonk = NestedListsBonk(
            [
                [
                    [
                        Bonk(message='inner A first', type=1),
                        Bonk(message='inner A second', type=1)
                    ],
                    [
                        Bonk(message='inner B first', type=2),
                        Bonk(message='inner B second', type=2)
                    ]
                ]
            ]
        )

        self.list_bonks = ListBonks(
            [
                Bonk(message='inner A', type=1),
                Bonk(message='inner B', type=2),
                Bonk(message='inner C', type=0)
            ]
        )

    def _serialize(self, obj):
        """Write ``obj`` through the protocol under test and return the buffer contents."""
        trans = TTransport.TMemoryBuffer()
        prot = self.protocol_factory.getProtocol(trans)
        obj.write(prot)
        return trans.getvalue()

    def _deserialize(self, objtype, data):
        """Read ``data`` into a fresh ``objtype()`` using the protocol under test."""
        prot = self.protocol_factory.getProtocol(TTransport.TMemoryBuffer(data))
        ret = objtype()
        ret.read(prot)
        return ret

    def testForwards(self):
        # Data written as V1 is read back as V2; the fields common to both
        # versions must carry over.
        obj = self._deserialize(VersioningTestV2, self._serialize(self.v1obj))
        self.assertEqual(obj.begin_in_both, self.v1obj.begin_in_both)
        self.assertEqual(obj.end_in_both, self.v1obj.end_in_both)

    def testBackwards(self):
        # Data written as V2 is read back as V1; the fields common to both
        # versions must carry over.
        obj = self._deserialize(VersioningTestV1, self._serialize(self.v2obj))
        self.assertEqual(obj.begin_in_both, self.v2obj.begin_in_both)
        self.assertEqual(obj.end_in_both, self.v2obj.end_in_both)

    def testSerializeV1(self):
        obj = self._deserialize(VersioningTestV1, self._serialize(self.v1obj))
        self.assertEqual(obj, self.v1obj)

    def testSerializeV2(self):
        obj = self._deserialize(VersioningTestV2, self._serialize(self.v2obj))
        self.assertEqual(obj, self.v2obj)

    def testBools(self):
        # Sanity-check inequality first: different values, then different types.
        self.assertNotEqual(self.bools, self.bools_flipped)
        self.assertNotEqual(self.bools, self.v1obj)
        obj = self._deserialize(Bools, self._serialize(self.bools))
        self.assertEqual(obj, self.bools)
        obj = self._deserialize(Bools, self._serialize(self.bools_flipped))
        self.assertEqual(obj, self.bools_flipped)
        rep = repr(self.bools)
        self.assertTrue(len(rep) > 0)

    def testLargeDeltas(self):
        # test large field deltas (meaningful in CompactProto only)
        obj = self._deserialize(LargeDeltas, self._serialize(self.large_deltas))
        self.assertEqual(obj, self.large_deltas)
        rep = repr(self.large_deltas)
        self.assertTrue(len(rep) > 0)

    def testNestedListsI32x2(self):
        obj = self._deserialize(NestedListsI32x2, self._serialize(self.nested_lists_i32x2))
        self.assertEqual(obj, self.nested_lists_i32x2)
        rep = repr(self.nested_lists_i32x2)
        self.assertTrue(len(rep) > 0)

    def testNestedListsI32x3(self):
        obj = self._deserialize(NestedListsI32x3, self._serialize(self.nested_lists_i32x3))
        self.assertEqual(obj, self.nested_lists_i32x3)
        rep = repr(self.nested_lists_i32x3)
        self.assertTrue(len(rep) > 0)

    def testNestedMixedx2(self):
        obj = self._deserialize(NestedMixedx2, self._serialize(self.nested_mixedx2))
        self.assertEqual(obj, self.nested_mixedx2)
        rep = repr(self.nested_mixedx2)
        self.assertTrue(len(rep) > 0)

    def testNestedListsBonk(self):
        obj = self._deserialize(NestedListsBonk, self._serialize(self.nested_lists_bonk))
        self.assertEqual(obj, self.nested_lists_bonk)
        rep = repr(self.nested_lists_bonk)
        self.assertTrue(len(rep) > 0)

    def testListBonks(self):
        obj = self._deserialize(ListBonks, self._serialize(self.list_bonks))
        self.assertEqual(obj, self.list_bonks)
        rep = repr(self.list_bonks)
        self.assertTrue(len(rep) > 0)

    def testCompactStruct(self):
        # round-trip the fixture that populates scalar, list, set and map
        # fields of CompactProtoTestStruct
        obj = self._deserialize(CompactProtoTestStruct, self._serialize(self.compact_struct))
        self.assertEqual(obj, self.compact_struct)
        rep = repr(self.compact_struct)
        self.assertTrue(len(rep) > 0)

    def testIntegerLimits(self):
        # The check is not performed on Python 2.6 and older 2.x releases.
        if (sys.version_info[0] == 2 and sys.version_info[1] <= 6):
            print('Skipping testIntegerLimits for Python 2.6')
            return
        # Each value lies exactly one step outside the signed 8/16/32/64-bit
        # range implied by its field name, so serialization must fail.
        bad_values = [CompactProtoTestStruct(a_byte=128), CompactProtoTestStruct(a_byte=-129),
                      CompactProtoTestStruct(a_i16=32768), CompactProtoTestStruct(a_i16=-32769),
                      CompactProtoTestStruct(a_i32=2147483648), CompactProtoTestStruct(a_i32=-2147483649),
                      CompactProtoTestStruct(a_i64=9223372036854775808), CompactProtoTestStruct(a_i64=-9223372036854775809)
                      ]

        for value in bad_values:
            self.assertRaises(Exception, self._serialize, value)


class NormalBinaryTest(AbstractTest):
    """Run the AbstractTest cases with TBinaryProtocol.TBinaryProtocolFactory()."""
    protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()


class AcceleratedBinaryTest(AbstractTest):
    """Run the AbstractTest cases with TBinaryProtocolAcceleratedFactory."""
    # NOTE(review): fallback=False presumably forbids silently falling back to
    # the non-accelerated implementation -- confirm against the thrift library.
    protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False)


class CompactProtocolTest(AbstractTest):
    """Run the AbstractTest cases with TCompactProtocol.TCompactProtocolFactory()."""
    protocol_factory = TCompactProtocol.TCompactProtocolFactory()


class AcceleratedCompactTest(AbstractTest):
    """Run the AbstractTest cases with TCompactProtocolAcceleratedFactory."""
    # NOTE(review): fallback=False presumably forbids silently falling back to
    # the non-accelerated implementation -- confirm against the thrift library.
    protocol_factory = TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False)


class JSONProtocolTest(AbstractTest):
    """Run the AbstractTest cases with TJSONProtocol.TJSONProtocolFactory()."""
    protocol_factory = TJSONProtocol.TJSONProtocolFactory()


class AcceleratedFramedTest(unittest.TestCase):
    """Interaction test between the accelerated binary protocol and TFramedTransport."""

    def testSplit(self):
        """Test FramedTransport and BinaryProtocolAccelerated

        Tests that TBinaryProtocolAccelerated and TFramedTransport
        play nicely together when a read spans a frame"""

        protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
        bigstring = "".join(chr(byte) for byte in range(ord("a"), ord("z") + 1))

        # Produce the raw (unframed) encoding of an i32, a string and an i16.
        databuf = TTransport.TMemoryBuffer()
        prot = protocol_factory.getProtocol(databuf)
        prot.writeI32(42)
        prot.writeString(bigstring)
        prot.writeI16(24)
        data = databuf.getvalue()

        # Cut the payload in half so that the string straddles two frames.
        cutpoint = len(data) // 2
        parts = [data[:cutpoint], data[cutpoint:]]

        # Each write + flush pair emits one frame.
        framed_buffer = TTransport.TMemoryBuffer()
        framed_writer = TTransport.TFramedTransport(framed_buffer)
        for part in parts:
            framed_writer.write(part)
            framed_writer.flush()
        # Two frames are expected to add 8 bytes of framing overhead in total
        # (presumably a 4-byte length prefix per frame).
        self.assertEqual(len(framed_buffer.getvalue()), len(data) + 8)

        # Recreate framed_buffer so we can read from it.
        framed_buffer = TTransport.TMemoryBuffer(framed_buffer.getvalue())
        framed_reader = TTransport.TFramedTransport(framed_buffer)
        prot = protocol_factory.getProtocol(framed_reader)
        self.assertEqual(prot.readI32(), 42)
        self.assertEqual(prot.readString(), bigstring)
        self.assertEqual(prot.readI16(), 24)


class SerializersTest(unittest.TestCase):
    """Tests for the thrift.TSerialization serialize/deserialize helpers."""

    def testSerializeThenDeserialize(self):
        # Nested struct: output must be identical on every call and must
        # round-trip back to an equal object.
        obj = Xtruct2(i32_thing=1,
                      struct_thing=Xtruct(string_thing="foo"))

        s1 = serialize(obj)
        for _ in range(10):
            self.assertEqual(s1, serialize(obj))
            objcopy = Xtruct2()
            deserialize(objcopy, serialize(obj))
            self.assertEqual(obj, objcopy)

        # flat struct
        obj = Xtruct(string_thing="bar")
        objcopy = Xtruct()
        deserialize(objcopy, serialize(obj))
        self.assertEqual(obj, objcopy)

        # test booleans
        obj = Bools(im_true=True, im_false=False)
        objcopy = Bools()
        deserialize(objcopy, serialize(obj))
        self.assertEqual(obj, objcopy)

        # test enums: every value listed in Numberz._VALUES_TO_NAMES is
        # round-tripped through Bonk.type
        for num, name in Numberz._VALUES_TO_NAMES.items():
            obj = Bonk(message='enum Numberz value %d is string %s' % (num, name), type=num)
            objcopy = Bonk()
            deserialize(objcopy, serialize(obj))
            self.assertEqual(obj, objcopy)


def suite():
    """Return a TestSuite holding every concrete test case class in this module."""
    # Order matters for reporting only; it mirrors the historical run order.
    case_classes = (
        NormalBinaryTest,
        AcceleratedBinaryTest,
        AcceleratedCompactTest,
        CompactProtocolTest,
        JSONProtocolTest,
        AcceleratedFramedTest,
        SerializersTest,
    )

    test_loader = unittest.TestLoader()
    combined = unittest.TestSuite()
    for case_class in case_classes:
        combined.addTest(test_loader.loadTestsFromTestCase(case_class))
    return combined

# When executed as a script, run the tests returned by suite() (rather than
# default discovery) through a TextTestRunner at verbosity 2.
if __name__ == "__main__":
    unittest.main(defaultTest="suite", testRunner=unittest.TextTestRunner(verbosity=2))