#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from ThriftTest.ttypes import Xtruct
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
import unittest


class TestEof(unittest.TestCase):

    def make_data(self, pfactory=None):
        trans = TTransport.TMemoryBuffer()
        if pfactory:
            prot = pfactory.getProtocol(trans)
        else:
            prot = TBinaryProtocol.TBinaryProtocol(trans)

        x = Xtruct()
        x.string_thing = "Zero"
        x.byte_thing = 0

        x.write(prot)

        x = Xtruct()
        x.string_thing = "One"
        x.byte_thing = 1

        x.write(prot)

        return trans.getvalue()

    def testTransportReadAll(self):
        """Test that readAll on any type of transport throws an EOFError"""
        trans = TTransport.TMemoryBuffer(self.make_data())
        trans.readAll(1)

        try:
            trans.readAll(10000)
        except EOFError:
            return

        self.fail("Should have gotten EOFError")

    def eofTestHelper(self, pfactory):
        trans = TTransport.TMemoryBuffer(self.make_data(pfactory))
        prot = pfactory.getProtocol(trans)

        x = Xtruct()
        x.read(prot)
        self.assertEqual(x.string_thing, "Zero")
        self.assertEqual(x.byte_thing, 0)

        x = Xtruct()
        x.read(prot)
        self.assertEqual(x.string_thing, "One")
        self.assertEqual(x.byte_thing, 1)

        try:
            x = Xtruct()
            x.read(prot)
        except EOFError:
            return

        self.fail("Should have gotten EOFError")

    def eofTestHelperStress(self, pfactory):
        """Test the ability of TBinaryProtocol to deal with the removal of every byte in the file"""
        # TODO: we should make sure this covers more of the code paths

        data = self.make_data(pfactory)
        for i in range(0, len(data) + 1):
            trans = TTransport.TMemoryBuffer(data[0:i])
            prot = pfactory.getProtocol(trans)
            try:
                x = Xtruct()
                x.read(prot)
                x.read(prot)
                x.read(prot)
            except EOFError:
                continue
            self.fail("Should have gotten an EOFError")

    def testBinaryProtocolEof(self):
        """Test that TBinaryProtocol throws an EOFError when it reaches the end of the stream"""
        self.eofTestHelper(TBinaryProtocol.TBinaryProtocolFactory())
        self.eofTestHelperStress(TBinaryProtocol.TBinaryProtocolFactory())

    def testBinaryProtocolAcceleratedBinaryEof(self):
        """Test that TBinaryProtocolAccelerated throws an EOFError when it reaches the end of the stream"""
        self.eofTestHelper(TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False))
        self.eofTestHelperStress(TBinaryProtocol.TBinaryProtocolAcceleratedFactory(fallback=False))

    def testCompactProtocolEof(self):
        """Test that TCompactProtocol throws an EOFError when it reaches the end of the stream"""
        self.eofTestHelper(TCompactProtocol.TCompactProtocolFactory())
        self.eofTestHelperStress(TCompactProtocol.TCompactProtocolFactory())

    def testCompactProtocolAcceleratedCompactEof(self):
        """Test that TCompactProtocolAccelerated throws an EOFError when it reaches the end of the stream"""
        self.eofTestHelper(TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False))
        self.eofTestHelperStress(TCompactProtocol.TCompactProtocolAcceleratedFactory(fallback=False))


def suite():
    suite = unittest.TestSuite()
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(TestEof))
    return suite


if __name__ == "__main__":
    unittest.main(defaultTest="suite", testRunner=unittest.TextTestRunner(verbosity=2))