Upgrading dependencies to gorealis v2 and thrift 0.12.0

This commit is contained in:
Renan DelValle 2018-12-26 17:25:59 -08:00
parent 7cbbea498b
commit 54b8d7942a
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
1327 changed files with 137391 additions and 61476 deletions

View file

@ -28,13 +28,16 @@ include_directories(SYSTEM "${OPENSSL_INCLUDE_DIR}")
find_package(Libevent REQUIRED) # Libevent comes with CMake support from upstream
include_directories(SYSTEM ${LIBEVENT_INCLUDE_DIRS})
find_package(ZLIB REQUIRED)
include_directories(SYSTEM ${ZLIB_INCLUDE_DIRS})
#Make sure gen-cpp files can be included
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/gen-cpp")
include_directories("${PROJECT_SOURCE_DIR}/lib/cpp/src")
set(crosstestgencpp_SOURCES
gen-cpp/SecondService.cpp
gen-cpp/ThriftTest.cpp
gen-cpp/ThriftTest_types.cpp
gen-cpp/ThriftTest_constants.cpp
@ -45,7 +48,7 @@ LINK_AGAINST_THRIFT_LIBRARY(crosstestgencpp thrift)
set(crossstressgencpp_SOURCES
gen-cpp/Service.cpp
#gen-cpp/StressTest_types.cpp #basically empty, so omitting
gen-cpp/StressTest_types.cpp
gen-cpp/StressTest_constants.cpp
)
add_library(crossstressgencpp STATIC ${crossstressgencpp_SOURCES})
@ -80,7 +83,7 @@ add_test(NAME StressTestNonBlocking COMMAND StressTestNonBlocking)
# Common thrift code generation rules
#
add_custom_command(OUTPUT gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp
add_custom_command(OUTPUT gen-cpp/SecondService.cpp gen-cpp/SecondService.h gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest.h gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp
COMMAND ${THRIFT_COMPILER} --gen cpp:templates,cob_style -r ${PROJECT_SOURCE_DIR}/test/ThriftTest.thrift
)

View file

@ -21,12 +21,16 @@ AUTOMAKE_OPTIONS = subdir-objects serial-tests
BUILT_SOURCES = gen-cpp/ThriftTest.cpp \
gen-cpp/ThriftTest_types.cpp \
gen-cpp/ThriftTest_constants.cpp \
gen-cpp/SecondService.cpp \
gen-cpp/StressTest_types.cpp \
gen-cpp/StressTest_constants.cpp \
gen-cpp/Service.cpp
noinst_LTLIBRARIES = libtestgencpp.la libstresstestgencpp.la
nodist_libtestgencpp_la_SOURCES = \
gen-cpp/SecondService.cpp \
gen-cpp/SecondService.h \
gen-cpp/SecondService.tcc \
gen-cpp/ThriftTest_constants.cpp \
gen-cpp/ThriftTest_constants.h \
gen-cpp/ThriftTest_types.cpp \
@ -98,16 +102,14 @@ StressTestNonBlocking_LDADD = \
#
# Common thrift code generation rules
#
THRIFT = $(top_builddir)/compiler/cpp/thrift
gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp: $(top_srcdir)/test/ThriftTest.thrift $(THRIFT)
gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/SecondService.cpp gen-cpp/SecondService.h gen-cpp/SecondService.tcc: $(top_srcdir)/test/ThriftTest.thrift $(THRIFT)
$(THRIFT) --gen cpp:templates,cob_style -r $<
gen-cpp/StressTest_types.cpp gen-cpp/StressTest_constants.cpp gen-cpp/Service.cpp: $(top_srcdir)/test/StressTest.thrift $(THRIFT)
$(THRIFT) --gen cpp $<
AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(LIBEVENT_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -Igen-cpp
AM_CXXFLAGS = -Wall -Wextra -pedantic
AM_CXXFLAGS = -Wall -Wextra -pedantic -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS
AM_LDFLAGS = $(BOOST_LDFLAGS) $(LIBEVENT_LDFLAGS) $(ZLIB_LIBS)
clean-local:
@ -120,6 +122,4 @@ EXTRA_DIST = \
src/TestClient.cpp \
src/TestServer.cpp \
src/StressTest.cpp \
src/StressTestNonBlocking.cpp \
realloc/realloc_test.c \
realloc/Makefile
src/StressTestNonBlocking.cpp

View file

@ -1,107 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <dlfcn.h>
int copies;
int non_copies;
void *realloc(void *ptr, size_t size) {
static void *(*real_realloc)(void*, size_t) = NULL;
if (real_realloc == NULL) {
real_realloc = (void* (*) (void*, size_t)) dlsym(RTLD_NEXT, "realloc");
}
void *ret_ptr = (*real_realloc)(ptr, size);
if (ret_ptr == ptr) {
non_copies++;
} else {
copies++;
}
return ret_ptr;
}
struct TMemoryBuffer {
void* ptr;
int size;
};
int main(int argc, char *argv[]) {
int num_buffers;
int init_size;
int max_size;
int doublings;
int iterations;
if (argc < 6 ||
argc > 7 ||
(num_buffers = atoi(argv[1])) == 0 ||
(init_size = atoi(argv[2])) == 0 ||
(max_size = atoi(argv[3])) == 0 ||
init_size > max_size ||
(iterations = atoi(argv[4])) == 0 ||
(doublings = atoi(argv[5])) == 0 ||
(argc == 7 && atoi(argv[6]) == 0)) {
fprintf(stderr, "usage: realloc_test <num_buffers> <init_size> <max_size> <doublings> <iterations> [seed]\n");
exit(EXIT_FAILURE);
}
for ( int i = 0 ; i < argc ; i++ ) {
printf("%s ", argv[i]);
}
printf("\n");
if (argc == 7) {
srand(atoi(argv[6]));
} else {
srand(time(NULL));
}
struct TMemoryBuffer* buffers = calloc(num_buffers, sizeof(*buffers));
if (buffers == NULL) abort();
for ( int i = 0 ; i < num_buffers ; i++ ) {
buffers[i].size = max_size;
}
while (iterations --> 0) {
for ( int i = 0 ; i < doublings * num_buffers ; i++ ) {
struct TMemoryBuffer* buf = &buffers[rand() % num_buffers];
buf->size *= 2;
if (buf->size <= max_size) {
buf->ptr = realloc(buf->ptr, buf->size);
} else {
free(buf->ptr);
buf->size = init_size;
buf->ptr = malloc(buf->size);
}
if (buf->ptr == NULL) abort();
}
}
printf("Non-copied %d/%d (%.2f%%)\n", non_copies, copies + non_copies, 100.0 * non_copies / (copies + non_copies));
return 0;
}

View file

@ -31,6 +31,7 @@
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TFileTransport.h>
#include <thrift/TLogging.h>
#include <thrift/stdcxx.h>
#include "Service.h"
#include <iostream>
@ -107,8 +108,8 @@ enum TransportOpenCloseBehavior {
};
class ClientThread : public Runnable {
public:
ClientThread(boost::shared_ptr<TTransport> transport,
boost::shared_ptr<ServiceIf> client,
ClientThread(stdcxx::shared_ptr<TTransport> transport,
stdcxx::shared_ptr<ServiceIf> client,
Monitor& monitor,
size_t& workerCount,
size_t loopCount,
@ -224,8 +225,8 @@ public:
}
}
boost::shared_ptr<TTransport> _transport;
boost::shared_ptr<ServiceIf> _client;
stdcxx::shared_ptr<TTransport> _transport;
stdcxx::shared_ptr<ServiceIf> _client;
Monitor& _monitor;
size_t& _workerCount;
size_t _loopCount;
@ -285,14 +286,14 @@ int main(int argc, char** argv) {
"server only. Default is " << clientCount << endl
<< "\thelp Prints this help text." << endl
<< "\tcall Service method to call. Default is " << callName << endl
<< "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl
<< "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl
<< "\tport The port the server and clients should bind to "
"for thrift network connections. Default is " << port << endl
<< "\tserver Run the Thrift server in this process. Default is " << runServer << endl
<< "\tserver Run the Thrift server in this process. Default is " << runServer << endl
<< "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl
<< "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl
<< "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl
<< "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl
<< "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl
<< "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl
<< "\tworkers Number of thread pools workers. Only valid "
"for thread-pool server type. Default is " << workerCount << endl
<< "\tclient-type Type of client, \"regular\" or \"concurrent\". Default is " << clientType << endl
@ -390,24 +391,24 @@ int main(int argc, char** argv) {
cerr << usage.str();
}
boost::shared_ptr<PlatformThreadFactory> threadFactory
= boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
stdcxx::shared_ptr<PlatformThreadFactory> threadFactory
= stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
// Dispatcher
boost::shared_ptr<Server> serviceHandler(new Server());
stdcxx::shared_ptr<Server> serviceHandler(new Server());
if (replayRequests) {
boost::shared_ptr<Server> serviceHandler(new Server());
boost::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
stdcxx::shared_ptr<Server> serviceHandler(new Server());
stdcxx::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
// Transports
boost::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
stdcxx::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
fileTransport->setChunkSize(2 * 1024 * 1024);
fileTransport->setMaxEventSize(1024 * 16);
fileTransport->seekToEnd();
// Protocol Factory
boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport);
@ -417,28 +418,28 @@ int main(int argc, char** argv) {
if (runServer) {
boost::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
stdcxx::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
// Transport
boost::shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
stdcxx::shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
// Transport Factory
boost::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
stdcxx::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
// Protocol Factory
boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
if (logRequests) {
// initialize the log file
boost::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
stdcxx::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
fileTransport->setChunkSize(2 * 1024 * 1024);
fileTransport->setMaxEventSize(1024 * 16);
transportFactory
= boost::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
= stdcxx::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
}
boost::shared_ptr<TServer> server;
stdcxx::shared_ptr<TServer> server;
if (serverType == "simple") {
@ -452,7 +453,7 @@ int main(int argc, char** argv) {
} else if (serverType == "thread-pool") {
boost::shared_ptr<ThreadManager> threadManager
stdcxx::shared_ptr<ThreadManager> threadManager
= ThreadManager::newSimpleThreadManager(workerCount);
threadManager->threadFactory(threadFactory);
@ -464,9 +465,9 @@ int main(int argc, char** argv) {
threadManager));
}
boost::shared_ptr<TStartObserver> observer(new TStartObserver);
stdcxx::shared_ptr<TStartObserver> observer(new TStartObserver);
server->setServerEventHandler(observer);
boost::shared_ptr<Thread> serverThread = threadFactory->newThread(server);
stdcxx::shared_ptr<Thread> serverThread = threadFactory->newThread(server);
cerr << "Starting the server on port " << port << endl;
@ -485,7 +486,7 @@ int main(int argc, char** argv) {
size_t threadCount = 0;
set<boost::shared_ptr<Thread> > clientThreads;
set<stdcxx::shared_ptr<Thread> > clientThreads;
if (callName == "echoVoid") {
loopType = T_VOID;
@ -504,28 +505,28 @@ int main(int argc, char** argv) {
if(clientType == "regular") {
for (size_t ix = 0; ix < clientCount; ix++) {
boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
stdcxx::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
stdcxx::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
stdcxx::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
clientThreads.insert(threadFactory->newThread(stdcxx::shared_ptr<ClientThread>(
new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, OpenAndCloseTransportInThread))));
}
} else if(clientType == "concurrent") {
boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
//boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
boost::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol));
stdcxx::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port));
stdcxx::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(bufferedSocket));
//stdcxx::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
stdcxx::shared_ptr<ServiceConcurrentClient> serviceClient(new ServiceConcurrentClient(protocol));
socket->open();
for (size_t ix = 0; ix < clientCount; ix++) {
clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
clientThreads.insert(threadFactory->newThread(stdcxx::shared_ptr<ClientThread>(
new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType, DontOpenAndCloseTransportInThread))));
}
}
for (std::set<boost::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
for (std::set<stdcxx::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
thread != clientThreads.end();
thread++) {
(*thread)->start();
@ -558,12 +559,12 @@ int main(int argc, char** argv) {
int64_t minTime = 9223372036854775807LL;
int64_t maxTime = 0;
for (set<boost::shared_ptr<Thread> >::iterator ix = clientThreads.begin();
for (set<stdcxx::shared_ptr<Thread> >::iterator ix = clientThreads.begin();
ix != clientThreads.end();
ix++) {
boost::shared_ptr<ClientThread> client
= boost::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
stdcxx::shared_ptr<ClientThread> client
= stdcxx::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
int64_t delta = client->_endTime - client->_startTime;

View file

@ -29,14 +29,14 @@
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TFileTransport.h>
#include <thrift/TLogging.h>
#include <thrift/stdcxx.h>
#include "Service.h"
#include <boost/shared_ptr.hpp>
#include <iostream>
#include <set>
#include <stdexcept>
@ -109,8 +109,8 @@ private:
class ClientThread : public Runnable {
public:
ClientThread(boost::shared_ptr<TTransport> transport,
boost::shared_ptr<ServiceClient> client,
ClientThread(stdcxx::shared_ptr<TTransport> transport,
stdcxx::shared_ptr<ServiceClient> client,
Monitor& monitor,
size_t& workerCount,
size_t loopCount,
@ -221,8 +221,8 @@ public:
}
}
boost::shared_ptr<TTransport> _transport;
boost::shared_ptr<ServiceClient> _client;
stdcxx::shared_ptr<TTransport> _transport;
stdcxx::shared_ptr<ServiceClient> _client;
Monitor& _monitor;
size_t& _workerCount;
size_t _loopCount;
@ -344,24 +344,24 @@ int main(int argc, char** argv) {
cerr << usage.str();
}
boost::shared_ptr<PlatformThreadFactory> threadFactory
= boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
stdcxx::shared_ptr<PlatformThreadFactory> threadFactory
= stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
// Dispatcher
boost::shared_ptr<Server> serviceHandler(new Server());
stdcxx::shared_ptr<Server> serviceHandler(new Server());
if (replayRequests) {
boost::shared_ptr<Server> serviceHandler(new Server());
boost::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
stdcxx::shared_ptr<Server> serviceHandler(new Server());
stdcxx::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
// Transports
boost::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
stdcxx::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
fileTransport->setChunkSize(2 * 1024 * 1024);
fileTransport->setMaxEventSize(1024 * 16);
fileTransport->seekToEnd();
// Protocol Factory
boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TFileProcessor fileProcessor(serviceProcessor, protocolFactory, fileTransport);
@ -371,45 +371,51 @@ int main(int argc, char** argv) {
if (runServer) {
boost::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
stdcxx::shared_ptr<ServiceProcessor> serviceProcessor(new ServiceProcessor(serviceHandler));
// Protocol Factory
boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
// Transport Factory
boost::shared_ptr<TTransportFactory> transportFactory;
stdcxx::shared_ptr<TTransportFactory> transportFactory;
if (logRequests) {
// initialize the log file
boost::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
stdcxx::shared_ptr<TFileTransport> fileTransport(new TFileTransport(requestLogPath));
fileTransport->setChunkSize(2 * 1024 * 1024);
fileTransport->setMaxEventSize(1024 * 16);
transportFactory
= boost::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
= stdcxx::shared_ptr<TTransportFactory>(new TPipedTransportFactory(fileTransport));
}
boost::shared_ptr<Thread> serverThread;
boost::shared_ptr<Thread> serverThread2;
stdcxx::shared_ptr<Thread> serverThread;
stdcxx::shared_ptr<Thread> serverThread2;
stdcxx::shared_ptr<transport::TNonblockingServerSocket> nbSocket1;
stdcxx::shared_ptr<transport::TNonblockingServerSocket> nbSocket2;
if (serverType == "simple") {
serverThread = threadFactory->newThread(boost::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, port)));
serverThread2 = threadFactory->newThread(boost::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, port + 1)));
nbSocket1.reset(new transport::TNonblockingServerSocket(port));
serverThread = threadFactory->newThread(stdcxx::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1)));
nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
serverThread2 = threadFactory->newThread(stdcxx::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2)));
} else if (serverType == "thread-pool") {
boost::shared_ptr<ThreadManager> threadManager
stdcxx::shared_ptr<ThreadManager> threadManager
= ThreadManager::newSimpleThreadManager(workerCount);
threadManager->threadFactory(threadFactory);
threadManager->start();
serverThread = threadFactory->newThread(boost::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager)));
serverThread2 = threadFactory->newThread(boost::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, port + 1, threadManager)));
nbSocket1.reset(new transport::TNonblockingServerSocket(port));
serverThread = threadFactory->newThread(stdcxx::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket1, threadManager)));
nbSocket2.reset(new transport::TNonblockingServerSocket(port + 1));
serverThread2 = threadFactory->newThread(stdcxx::shared_ptr<TServer>(
new TNonblockingServer(serviceProcessor, protocolFactory, nbSocket2, threadManager)));
}
cerr << "Starting the server on port " << port << " and " << (port + 1) << endl;
@ -431,7 +437,7 @@ int main(int argc, char** argv) {
size_t threadCount = 0;
set<boost::shared_ptr<Thread> > clientThreads;
set<stdcxx::shared_ptr<Thread> > clientThreads;
if (callName == "echoVoid") {
loopType = T_VOID;
@ -449,16 +455,16 @@ int main(int argc, char** argv) {
for (uint32_t ix = 0; ix < clientCount; ix++) {
boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
boost::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
boost::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
stdcxx::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port + (ix % 2)));
stdcxx::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(framedSocket));
stdcxx::shared_ptr<ServiceClient> serviceClient(new ServiceClient(protocol));
clientThreads.insert(threadFactory->newThread(boost::shared_ptr<ClientThread>(
clientThreads.insert(threadFactory->newThread(stdcxx::shared_ptr<ClientThread>(
new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType))));
}
for (std::set<boost::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
for (std::set<stdcxx::shared_ptr<Thread> >::const_iterator thread = clientThreads.begin();
thread != clientThreads.end();
thread++) {
(*thread)->start();
@ -491,12 +497,12 @@ int main(int argc, char** argv) {
int64_t minTime = 9223372036854775807LL;
int64_t maxTime = 0;
for (set<boost::shared_ptr<Thread> >::iterator ix = clientThreads.begin();
for (set<stdcxx::shared_ptr<Thread> >::iterator ix = clientThreads.begin();
ix != clientThreads.end();
ix++) {
boost::shared_ptr<ClientThread> client
= boost::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
stdcxx::shared_ptr<ClientThread> client
= stdcxx::dynamic_pointer_cast<ClientThread>((*ix)->runnable());
int64_t delta = client->_endTime - client->_startTime;

View file

@ -1,4 +1,4 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -17,8 +17,6 @@
* under the License.
*/
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <limits>
#include <locale>
#include <ios>
@ -28,29 +26,40 @@
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/protocol/TMultiplexedProtocol.h>
#include <thrift/transport/THttpClient.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TSSLSocket.h>
#include <thrift/transport/TZlibTransport.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include <thrift/server/TNonblockingServer.h> // <event.h>
#include <boost/shared_ptr.hpp>
#include <boost/program_options.hpp>
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include <thrift/cxxfunctional.h>
#include <boost/program_options.hpp>
#include <boost/random/random_device.hpp>
#include <thrift/stdcxx.h>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif
#include "SecondService.h"
#include "ThriftTest.h"
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::async;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace thrift::test;
using namespace apache::thrift::async;
// Current time, microseconds since the epoch
uint64_t now() {
@ -89,10 +98,10 @@ static void testVoid_clientReturn(event_base* base, ThriftTestCobClient* client)
for (int testNr = 0; testNr < 10; ++testNr) {
std::ostringstream os;
os << "test" << testNr;
client->testString(tcxx::bind(testString_clientReturn,
client->testString(stdcxx::bind(testString_clientReturn,
base,
testNr,
tcxx::placeholders::_1),
stdcxx::placeholders::_1),
os.str());
}
} catch (TException& exn) {
@ -123,16 +132,22 @@ bool print_eq(T expected, T actual) {
return_code |= ERR_BASETYPES; \
}
int binary_test(ThriftTestClient& testClient, string::size_type siz);
BOOST_CONSTEXPR_OR_CONST int ERR_BASETYPES = 1;
BOOST_CONSTEXPR_OR_CONST int ERR_STRUCTS = 2;
BOOST_CONSTEXPR_OR_CONST int ERR_CONTAINERS = 4;
BOOST_CONSTEXPR_OR_CONST int ERR_EXCEPTIONS = 8;
BOOST_CONSTEXPR_OR_CONST int ERR_UNKNOWN = 64;
int main(int argc, char** argv) {
cout.precision(19);
int ERR_BASETYPES = 1;
int ERR_STRUCTS = 2;
int ERR_CONTAINERS = 4;
int ERR_EXCEPTIONS = 8;
int ERR_UNKNOWN = 64;
string testDir = boost::filesystem::system_complete(argv[0]).parent_path().parent_path().parent_path().string();
string pemPath = testDir + "/keys/CA.pem";
string testDir = boost::filesystem::system_complete(argv[0]).parent_path().parent_path().parent_path().string();
string caPath = testDir + "/keys/CA.pem";
string certPath = testDir + "/keys/client.crt";
string keyPath = testDir + "/keys/client.key";
#if _WIN32
transport::TWinsockSingleton::create();
#endif
@ -140,6 +155,7 @@ int main(int argc, char** argv) {
int port = 9090;
int numTests = 1;
bool ssl = false;
bool zlib = false;
string transport_type = "buffered";
string protocol_type = "binary";
string domain_socket = "";
@ -149,28 +165,35 @@ int main(int argc, char** argv) {
int return_code = 0;
boost::program_options::options_description desc("Allowed options");
desc.add_options()("help,h",
"produce help message")("host",
boost::program_options::value<string>(&host)
->default_value(host),
"Host to connect")("port",
boost::program_options::value<int>(
&port)->default_value(port),
"Port number to connect")(
"domain-socket",
boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
"Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")(
"abstract-namespace",
"Look for the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")(
"transport",
boost::program_options::value<string>(&transport_type)->default_value(transport_type),
"Transport: buffered, framed, http, evhttp")(
"protocol",
boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
"Protocol: binary, header, compact, json")("ssl", "Encrypted Transport using SSL")(
"testloops,n",
boost::program_options::value<int>(&numTests)->default_value(numTests),
"Number of Tests")("noinsane", "Do not run insanity test");
desc.add_options()
("help,h", "produce help message")
("host",
boost::program_options::value<string>(&host)->default_value(host),
"Host to connect")
("port",
boost::program_options::value<int>(&port)->default_value(port),
"Port number to connect")
("domain-socket",
boost::program_options::value<string>(&domain_socket)->default_value(domain_socket),
"Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port")
("abstract-namespace",
"Look for the domain socket in the Abstract Namespace"
" (no connection with filesystem pathnames)")
("transport",
boost::program_options::value<string>(&transport_type)->default_value(transport_type),
"Transport: buffered, framed, http, evhttp, zlib")
("protocol",
boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
"Protocol: binary, compact, header, json, multi, multic, multih, multij")
("ssl",
"Encrypted Transport using SSL")
("zlib",
"Wrap Transport with Zlib")
("testloops,n",
boost::program_options::value<int>(&numTests)->default_value(numTests),
"Number of Tests")
("noinsane",
"Do not run insanity test");
boost::program_options::variables_map vm;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
@ -187,6 +210,10 @@ int main(int argc, char** argv) {
} else if (protocol_type == "compact") {
} else if (protocol_type == "header") {
} else if (protocol_type == "json") {
} else if (protocol_type == "multi") {
} else if (protocol_type == "multic") {
} else if (protocol_type == "multih") {
} else if (protocol_type == "multij") {
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
}
@ -197,6 +224,8 @@ int main(int argc, char** argv) {
} else if (transport_type == "framed") {
} else if (transport_type == "http") {
} else if (transport_type == "evhttp") {
} else if (transport_type == "zlib") {
// crosstest will pass zlib as a transport and as a flag right now..
} else {
throw invalid_argument("Unknown transport type " + transport_type);
}
@ -212,6 +241,10 @@ int main(int argc, char** argv) {
ssl = true;
}
if (vm.count("zlib")) {
zlib = true;
}
if (vm.count("abstract-namespace")) {
abstract_namespace = true;
}
@ -220,16 +253,23 @@ int main(int argc, char** argv) {
noinsane = true;
}
boost::shared_ptr<TTransport> transport;
boost::shared_ptr<TProtocol> protocol;
boost::shared_ptr<TSocket> socket;
boost::shared_ptr<TSSLSocketFactory> factory;
// THRIFT-4164: The factory MUST outlive any sockets it creates for correct behavior!
stdcxx::shared_ptr<TSSLSocketFactory> factory;
stdcxx::shared_ptr<TSocket> socket;
stdcxx::shared_ptr<TTransport> transport;
stdcxx::shared_ptr<TProtocol> protocol;
stdcxx::shared_ptr<TProtocol> protocol2; // SecondService for multiplexed
if (ssl) {
factory = boost::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
cout << "Client Certificate File: " << certPath << endl;
cout << "Client Key File: " << keyPath << endl;
cout << "CA File: " << caPath << endl;
factory = stdcxx::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
factory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
factory->loadTrustedCertificates(pemPath.c_str());
factory->loadTrustedCertificates(caPath.c_str());
factory->loadCertificate(certPath.c_str());
factory->loadPrivateKey(keyPath.c_str());
factory->authenticate(true);
socket = factory->createSocket(host, port);
} else {
@ -237,39 +277,42 @@ int main(int argc, char** argv) {
if (abstract_namespace) {
std::string abstract_socket("\0", 1);
abstract_socket += domain_socket;
socket = boost::shared_ptr<TSocket>(new TSocket(abstract_socket));
socket = stdcxx::shared_ptr<TSocket>(new TSocket(abstract_socket));
} else {
socket = boost::shared_ptr<TSocket>(new TSocket(domain_socket));
socket = stdcxx::shared_ptr<TSocket>(new TSocket(domain_socket));
}
port = 0;
} else {
socket = boost::shared_ptr<TSocket>(new TSocket(host, port));
socket = stdcxx::shared_ptr<TSocket>(new TSocket(host, port));
}
}
if (transport_type.compare("http") == 0) {
boost::shared_ptr<TTransport> httpSocket(new THttpClient(socket, host, "/service"));
transport = httpSocket;
transport = stdcxx::make_shared<THttpClient>(socket, host, "/service");
} else if (transport_type.compare("framed") == 0) {
boost::shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket));
transport = framedSocket;
transport = stdcxx::make_shared<TFramedTransport>(socket);
} else {
boost::shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket));
transport = bufferedSocket;
transport = stdcxx::make_shared<TBufferedTransport>(socket);
}
if (protocol_type.compare("json") == 0) {
boost::shared_ptr<TProtocol> jsonProtocol(new TJSONProtocol(transport));
protocol = jsonProtocol;
} else if (protocol_type.compare("compact") == 0) {
boost::shared_ptr<TProtocol> compactProtocol(new TCompactProtocol(transport));
protocol = compactProtocol;
} else if (protocol_type == "header") {
boost::shared_ptr<TProtocol> headerProtocol(new THeaderProtocol(transport));
protocol = headerProtocol;
if (zlib) {
transport = stdcxx::make_shared<TZlibTransport>(transport);
}
if (protocol_type == "json" || protocol_type == "multij") {
protocol = stdcxx::make_shared<TJSONProtocol>(transport);
} else if (protocol_type == "compact" || protocol_type == "multic") {
protocol = stdcxx::make_shared<TCompactProtocol>(transport);
} else if (protocol_type == "header" || protocol_type == "multih") {
protocol = stdcxx::make_shared<THeaderProtocol>(transport);
} else {
boost::shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport));
protocol = binaryProtocol;
protocol = stdcxx::make_shared<TBinaryProtocol>(transport);
}
if (boost::starts_with(protocol_type, "multi")) {
protocol2 = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "SecondService");
// we don't need access to the original protocol any more, so...
protocol = stdcxx::make_shared<TMultiplexedProtocol>(protocol, "ThriftTest");
}
// Connection info
@ -291,14 +334,14 @@ int main(int argc, char** argv) {
cout << "Libevent Features: 0x" << hex << event_base_get_features(base) << endl;
#endif
boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
boost::shared_ptr<TAsyncChannel> channel(
stdcxx::shared_ptr<TAsyncChannel> channel(
new TEvhttpClientChannel(host.c_str(), "/", host.c_str(), port, base));
ThriftTestCobClient* client = new ThriftTestCobClient(channel, protocolFactory.get());
client->testVoid(tcxx::bind(testVoid_clientReturn,
client->testVoid(stdcxx::bind(testVoid_clientReturn,
base,
tcxx::placeholders::_1));
stdcxx::placeholders::_1));
event_base_loop(base, 0);
return 0;
@ -354,7 +397,30 @@ int main(int argc, char** argv) {
return_code |= ERR_BASETYPES;
}
//
// Multiplexed protocol - call another service method
// in the middle of the ThriftTest
//
if (boost::starts_with(protocol_type, "multi")) {
SecondServiceClient ssc(protocol2);
// transport is already open...
try {
cout << "secondService.secondTestString(\"foo\") => " << flush;
std::string result;
ssc.secondtestString(result, "foo");
cout << "{" << result << "}" << endl;
} catch (std::exception& e) {
cout << " *** FAILED *** " << e.what() << endl;
return_code |= ERR_EXCEPTIONS;
}
}
try {
#ifdef _MSC_VER
#pragma warning( push )
#pragma warning( disable : 4566 )
#endif
string str(
"}{Afrikaans, Alemannisch, Aragonés, العربية, مصرى, "
"Asturianu, Aymar aru, Azərbaycan, Башҡорт, Boarisch, Žemaitėška, "
@ -381,6 +447,9 @@ int main(int argc, char** argv) {
"Türkçe, Татарча/Tatarça, Українська, اردو, Tiếng Việt, Volapük, "
"Walon, Winaray, 吴语, isiXhosa, ייִדיש, Yorùbá, Zeêuws, 中文, "
"Bân-lâm-gú, 粵語");
#ifdef _MSC_VER
#pragma warning( pop )
#endif
cout << "testString(" << str << ") = " << flush;
testClient.testString(s, str);
cout << s << endl;
@ -447,8 +516,8 @@ int main(int argc, char** argv) {
BASETYPE_IDENTITY_TEST(testI32, -1);
BASETYPE_IDENTITY_TEST(testI32, 190000013);
BASETYPE_IDENTITY_TEST(testI32, -190000013);
BASETYPE_IDENTITY_TEST(testI32, numeric_limits<int32_t>::max());
BASETYPE_IDENTITY_TEST(testI32, numeric_limits<int32_t>::min());
BASETYPE_IDENTITY_TEST(testI32, (numeric_limits<int32_t>::max)());
BASETYPE_IDENTITY_TEST(testI32, (numeric_limits<int32_t>::min)());
/**
* I64 TEST
@ -457,12 +526,12 @@ int main(int argc, char** argv) {
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-1);
BASETYPE_IDENTITY_TEST(testI64, (int64_t)7000000000000000123LL);
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-7000000000000000123LL);
BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(2LL, 32));
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(2LL, 32));
BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(2LL, 32) + 1);
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(2LL, 32) - 1);
BASETYPE_IDENTITY_TEST(testI64, numeric_limits<int64_t>::max());
BASETYPE_IDENTITY_TEST(testI64, numeric_limits<int64_t>::min());
BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(static_cast<double>(2LL), 32));
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(static_cast<double>(2LL), 32));
BASETYPE_IDENTITY_TEST(testI64, (int64_t)pow(static_cast<double>(2LL), 32) + 1);
BASETYPE_IDENTITY_TEST(testI64, (int64_t)-pow(static_cast<double>(2LL), 32) - 1);
BASETYPE_IDENTITY_TEST(testI64, (numeric_limits<int64_t>::max)());
BASETYPE_IDENTITY_TEST(testI64, (numeric_limits<int64_t>::min)());
/**
* DOUBLE TEST
@ -472,19 +541,19 @@ int main(int argc, char** argv) {
BASETYPE_IDENTITY_TEST(testDouble, -1.0);
BASETYPE_IDENTITY_TEST(testDouble, -5.2098523);
BASETYPE_IDENTITY_TEST(testDouble, -0.000341012439638598279);
BASETYPE_IDENTITY_TEST(testDouble, pow(2, 32));
BASETYPE_IDENTITY_TEST(testDouble, pow(2, 32) + 1);
BASETYPE_IDENTITY_TEST(testDouble, pow(2, 53) - 1);
BASETYPE_IDENTITY_TEST(testDouble, -pow(2, 32));
BASETYPE_IDENTITY_TEST(testDouble, -pow(2, 32) - 1);
BASETYPE_IDENTITY_TEST(testDouble, -pow(2, 53) + 1);
BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 32));
BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 32) + 1);
BASETYPE_IDENTITY_TEST(testDouble, pow(static_cast<double>(2), 53) - 1);
BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 32));
BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 32) - 1);
BASETYPE_IDENTITY_TEST(testDouble, -pow(static_cast<double>(2), 53) + 1);
try {
double expected = pow(10, 307);
double expected = pow(static_cast<double>(10), 307);
cout << "testDouble(" << expected << ") = " << flush;
double actual = testClient.testDouble(expected);
cout << "(" << actual << ")" << endl;
if (expected - actual > pow(10, 292)) {
if (expected - actual > pow(static_cast<double>(10), 292)) {
cout << "*** FAILED ***" << endl
<< "Expected: " << expected << " but got: " << actual << endl;
}
@ -496,11 +565,11 @@ int main(int argc, char** argv) {
}
try {
double expected = pow(10, -292);
double expected = pow(static_cast<double>(10), -292);
cout << "testDouble(" << expected << ") = " << flush;
double actual = testClient.testDouble(expected);
cout << "(" << actual << ")" << endl;
if (expected - actual > pow(10, -307)) {
if (expected - actual > pow(static_cast<double>(10), -307)) {
cout << "*** FAILED ***" << endl
<< "Expected: " << expected << " but got: " << actual << endl;
}
@ -514,72 +583,9 @@ int main(int argc, char** argv) {
/**
* BINARY TEST
*/
cout << "testBinary(empty)" << endl;
try {
string bin_result;
testClient.testBinary(bin_result, string());
if (!bin_result.empty()) {
cout << endl << "*** FAILED ***" << endl;
cout << "invalid length: " << bin_result.size() << endl;
return_code |= ERR_BASETYPES;
}
} catch (TTransportException&) {
throw;
} catch (exception& ex) {
cout << "*** FAILED ***" << endl << ex.what() << endl;
return_code |= ERR_BASETYPES;
}
cout << "testBinary([-128..127]) = {" << flush;
const signed char bin_data[256]
= {-128, -127, -126, -125, -124, -123, -122, -121, -120, -119, -118, -117, -116, -115, -114,
-113, -112, -111, -110, -109, -108, -107, -106, -105, -104, -103, -102, -101, -100, -99,
-98, -97, -96, -95, -94, -93, -92, -91, -90, -89, -88, -87, -86, -85, -84,
-83, -82, -81, -80, -79, -78, -77, -76, -75, -74, -73, -72, -71, -70, -69,
-68, -67, -66, -65, -64, -63, -62, -61, -60, -59, -58, -57, -56, -55, -54,
-53, -52, -51, -50, -49, -48, -47, -46, -45, -44, -43, -42, -41, -40, -39,
-38, -37, -36, -35, -34, -33, -32, -31, -30, -29, -28, -27, -26, -25, -24,
-23, -22, -21, -20, -19, -18, -17, -16, -15, -14, -13, -12, -11, -10, -9,
-8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6,
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,
52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66,
67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81,
82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111,
112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126,
127};
try {
string bin_result;
testClient.testBinary(bin_result, string(reinterpret_cast<const char *>(bin_data), 256));
if (bin_result.size() != 256) {
cout << endl << "*** FAILED ***" << endl;
cout << "invalid length: " << bin_result.size() << endl;
return_code |= ERR_BASETYPES;
} else {
bool first = true;
bool failed = false;
for (int i = 0; i < 256; ++i) {
if (!first)
cout << ",";
else
first = false;
cout << static_cast<int>(bin_result[i]);
if (!failed && bin_result[i] != i - 128) {
failed = true;
}
}
cout << "}" << endl;
if (failed) {
cout << "*** FAILED ***" << endl;
return_code |= ERR_BASETYPES;
}
}
} catch (TTransportException&) {
throw;
} catch (exception& ex) {
cout << "*** FAILED ***" << endl << ex.what() << endl;
return_code |= ERR_BASETYPES;
for (string::size_type i = 0; i < 131073 && !return_code; ) {
return_code |= binary_test(testClient, i);
if (i > 0) { i *= 2; } else { ++i; }
}
@ -951,7 +957,7 @@ int main(int argc, char** argv) {
failed = true;
} else {
map<Numberz::type, Insanity>::const_iterator it26 = it2->second.find(Numberz::SIX);
if (it26 == it1->second.end() || it26->second != Insanity()) {
if (it26 == it2->second.end() || it26->second != Insanity()) {
failed = true;
}
}
@ -1076,12 +1082,14 @@ int main(int argc, char** argv) {
/**
* I32 TEST
*/
cout << "re-test testI32(-1)";
cout << "re-test testI32(-1)" << flush;
int i32 = testClient.testI32(-1);
cout << " = " << i32 << endl;
if (i32 != -1)
return_code |= ERR_BASETYPES;
cout << endl << "All tests done." << endl << flush;
uint64_t stop = now();
uint64_t tot = stop - start;
@ -1095,10 +1103,10 @@ int main(int argc, char** argv) {
time_max = tot;
}
cout << flush;
transport->close();
}
cout << endl << "All tests done." << endl;
uint64_t time_avg = time_tot / numTests;
@ -1108,3 +1116,66 @@ int main(int argc, char** argv) {
return return_code;
}
void binary_fill(std::string& str, string::size_type siz)
{
static const signed char bin_data[256]
= {-128, -127, -126, -125, -124, -123, -122, -121, -120, -119, -118, -117, -116, -115, -114,
-113, -112, -111, -110, -109, -108, -107, -106, -105, -104, -103, -102, -101, -100, -99,
-98, -97, -96, -95, -94, -93, -92, -91, -90, -89, -88, -87, -86, -85, -84,
-83, -82, -81, -80, -79, -78, -77, -76, -75, -74, -73, -72, -71, -70, -69,
-68, -67, -66, -65, -64, -63, -62, -61, -60, -59, -58, -57, -56, -55, -54,
-53, -52, -51, -50, -49, -48, -47, -46, -45, -44, -43, -42, -41, -40, -39,
-38, -37, -36, -35, -34, -33, -32, -31, -30, -29, -28, -27, -26, -25, -24,
-23, -22, -21, -20, -19, -18, -17, -16, -15, -14, -13, -12, -11, -10, -9,
-8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6,
7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,
52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66,
67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81,
82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96,
97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111,
112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126,
127};
str.resize(siz);
char *ptr = &str[0];
string::size_type pos = 0;
for (string::size_type i = 0; i < siz; ++i)
{
if (pos == 255) { pos = 0; } else { ++pos; }
*ptr++ = bin_data[pos];
}
}
int binary_test(ThriftTestClient& testClient, string::size_type siz)
{
string bin_request;
string bin_result;
cout << "testBinary(siz = " << siz << ")" << endl;
binary_fill(bin_request, siz);
try {
testClient.testBinary(bin_result, bin_request);
if (bin_request.size() != bin_result.size()) {
cout << "*** FAILED: request size " << bin_request.size() << "; result size " << bin_result.size() << endl;
return ERR_BASETYPES;
}
for (string::size_type i = 0; i < siz; ++i) {
if (bin_request.at(i) != bin_result.at(i)) {
cout << "*** FAILED: at position " << i << " request[i] is h" << hex << bin_request.at(i) << " result[i] is h" << hex << bin_result.at(i) << endl;
return ERR_BASETYPES;
}
}
} catch (TTransportException&) {
throw;
} catch (exception& ex) {
cout << "*** FAILED ***" << endl << ex.what() << endl;
return ERR_BASETYPES;
}
return 0;
}

View file

@ -17,39 +17,52 @@
* under the License.
*/
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/async/TAsyncBufferProcessor.h>
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/async/TEvhttpServer.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/processor/TMultiplexedProcessor.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/async/TEvhttpServer.h>
#include <thrift/async/TAsyncBufferProcessor.h>
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSSLServerSocket.h>
#include <thrift/transport/TSSLSocket.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/THttpServer.h>
#include <thrift/transport/THttpTransport.h>
#include <thrift/transport/TNonblockingSSLServerSocket.h>
#include <thrift/transport/TNonblockingServerSocket.h>
#include <thrift/transport/TSSLServerSocket.h>
#include <thrift/transport/TSSLSocket.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TZlibTransport.h>
#include "SecondService.h"
#include "ThriftTest.h"
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif
#ifdef HAVE_SIGNAL_H
#include <signal.h>
#endif
#include <iostream>
#include <stdexcept>
#include <sstream>
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <thrift/cxxfunctional.h>
#include <thrift/stdcxx.h>
#include <signal.h>
#if _WIN32
#include <thrift/windows/TWinsockSingleton.h>
#endif
@ -57,14 +70,25 @@
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::async;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::async;
using namespace thrift::test;
// to handle a controlled shutdown, signal handling is mandatory
#ifdef HAVE_SIGNAL_H
apache::thrift::concurrency::Monitor gMonitor;
void signal_handler(int signum)
{
if (signum == SIGINT) {
gMonitor.notifyAll();
}
}
#endif
class TestHandler : public ThriftTestIf {
public:
TestHandler() {}
@ -104,7 +128,7 @@ public:
void testBinary(std::string& _return, const std::string& thing) {
std::ostringstream hexstr;
hexstr << std::hex << thing;
printf("testBinary(%s)\n", hexstr.str().c_str());
printf("testBinary(%lu: %s)\n", safe_numeric_cast<unsigned long>(thing.size()), hexstr.str().c_str());
_return = thing;
}
@ -326,13 +350,18 @@ public:
}
}
void testOneway(const int32_t sleepFor) {
printf("testOneway(%d): Sleeping...\n", sleepFor);
THRIFT_SLEEP_SEC(sleepFor);
printf("testOneway(%d): done sleeping!\n", sleepFor);
void testOneway(const int32_t aNum) {
printf("testOneway(%d): call received\n", aNum);
}
};
class SecondHandler : public SecondServiceIf
{
public:
void secondtestString(std::string& result, const std::string& thing)
{ result = "testString(\"" + thing + "\")"; }
};
class TestProcessorEventHandler : public TProcessorEventHandler {
virtual void* getContext(const char* fn_name, void* serverContext) {
(void)serverContext;
@ -366,66 +395,66 @@ class TestProcessorEventHandler : public TProcessorEventHandler {
class TestHandlerAsync : public ThriftTestCobSvIf {
public:
TestHandlerAsync(boost::shared_ptr<TestHandler>& handler) : _delegate(handler) {}
TestHandlerAsync(stdcxx::shared_ptr<TestHandler>& handler) : _delegate(handler) {}
virtual ~TestHandlerAsync() {}
virtual void testVoid(tcxx::function<void()> cob) {
virtual void testVoid(stdcxx::function<void()> cob) {
_delegate->testVoid();
cob();
}
virtual void testString(tcxx::function<void(std::string const& _return)> cob,
virtual void testString(stdcxx::function<void(std::string const& _return)> cob,
const std::string& thing) {
std::string res;
_delegate->testString(res, thing);
cob(res);
}
virtual void testBool(tcxx::function<void(bool const& _return)> cob, const bool thing) {
virtual void testBool(stdcxx::function<void(bool const& _return)> cob, const bool thing) {
bool res = _delegate->testBool(thing);
cob(res);
}
virtual void testByte(tcxx::function<void(int8_t const& _return)> cob, const int8_t thing) {
virtual void testByte(stdcxx::function<void(int8_t const& _return)> cob, const int8_t thing) {
int8_t res = _delegate->testByte(thing);
cob(res);
}
virtual void testI32(tcxx::function<void(int32_t const& _return)> cob, const int32_t thing) {
virtual void testI32(stdcxx::function<void(int32_t const& _return)> cob, const int32_t thing) {
int32_t res = _delegate->testI32(thing);
cob(res);
}
virtual void testI64(tcxx::function<void(int64_t const& _return)> cob, const int64_t thing) {
virtual void testI64(stdcxx::function<void(int64_t const& _return)> cob, const int64_t thing) {
int64_t res = _delegate->testI64(thing);
cob(res);
}
virtual void testDouble(tcxx::function<void(double const& _return)> cob, const double thing) {
virtual void testDouble(stdcxx::function<void(double const& _return)> cob, const double thing) {
double res = _delegate->testDouble(thing);
cob(res);
}
virtual void testBinary(tcxx::function<void(std::string const& _return)> cob,
virtual void testBinary(stdcxx::function<void(std::string const& _return)> cob,
const std::string& thing) {
std::string res;
_delegate->testBinary(res, thing);
cob(res);
}
virtual void testStruct(tcxx::function<void(Xtruct const& _return)> cob, const Xtruct& thing) {
virtual void testStruct(stdcxx::function<void(Xtruct const& _return)> cob, const Xtruct& thing) {
Xtruct res;
_delegate->testStruct(res, thing);
cob(res);
}
virtual void testNest(tcxx::function<void(Xtruct2 const& _return)> cob, const Xtruct2& thing) {
virtual void testNest(stdcxx::function<void(Xtruct2 const& _return)> cob, const Xtruct2& thing) {
Xtruct2 res;
_delegate->testNest(res, thing);
cob(res);
}
virtual void testMap(tcxx::function<void(std::map<int32_t, int32_t> const& _return)> cob,
virtual void testMap(stdcxx::function<void(std::map<int32_t, int32_t> const& _return)> cob,
const std::map<int32_t, int32_t>& thing) {
std::map<int32_t, int32_t> res;
_delegate->testMap(res, thing);
@ -433,40 +462,40 @@ public:
}
virtual void testStringMap(
tcxx::function<void(std::map<std::string, std::string> const& _return)> cob,
stdcxx::function<void(std::map<std::string, std::string> const& _return)> cob,
const std::map<std::string, std::string>& thing) {
std::map<std::string, std::string> res;
_delegate->testStringMap(res, thing);
cob(res);
}
virtual void testSet(tcxx::function<void(std::set<int32_t> const& _return)> cob,
virtual void testSet(stdcxx::function<void(std::set<int32_t> const& _return)> cob,
const std::set<int32_t>& thing) {
std::set<int32_t> res;
_delegate->testSet(res, thing);
cob(res);
}
virtual void testList(tcxx::function<void(std::vector<int32_t> const& _return)> cob,
virtual void testList(stdcxx::function<void(std::vector<int32_t> const& _return)> cob,
const std::vector<int32_t>& thing) {
std::vector<int32_t> res;
_delegate->testList(res, thing);
cob(res);
}
virtual void testEnum(tcxx::function<void(Numberz::type const& _return)> cob,
virtual void testEnum(stdcxx::function<void(Numberz::type const& _return)> cob,
const Numberz::type thing) {
Numberz::type res = _delegate->testEnum(thing);
cob(res);
}
virtual void testTypedef(tcxx::function<void(UserId const& _return)> cob, const UserId thing) {
virtual void testTypedef(stdcxx::function<void(UserId const& _return)> cob, const UserId thing) {
UserId res = _delegate->testTypedef(thing);
cob(res);
}
virtual void testMapMap(
tcxx::function<void(std::map<int32_t, std::map<int32_t, int32_t> > const& _return)> cob,
stdcxx::function<void(std::map<int32_t, std::map<int32_t, int32_t> > const& _return)> cob,
const int32_t hello) {
std::map<int32_t, std::map<int32_t, int32_t> > res;
_delegate->testMapMap(res, hello);
@ -474,14 +503,14 @@ public:
}
virtual void testInsanity(
tcxx::function<void(std::map<UserId, std::map<Numberz::type, Insanity> > const& _return)> cob,
stdcxx::function<void(std::map<UserId, std::map<Numberz::type, Insanity> > const& _return)> cob,
const Insanity& argument) {
std::map<UserId, std::map<Numberz::type, Insanity> > res;
_delegate->testInsanity(res, argument);
cob(res);
}
virtual void testMulti(tcxx::function<void(Xtruct const& _return)> cob,
virtual void testMulti(stdcxx::function<void(Xtruct const& _return)> cob,
const int8_t arg0,
const int32_t arg1,
const int64_t arg2,
@ -494,8 +523,8 @@ public:
}
virtual void testException(
tcxx::function<void()> cob,
tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
stdcxx::function<void()> cob,
stdcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
const std::string& arg) {
try {
_delegate->testException(arg);
@ -507,8 +536,8 @@ public:
}
virtual void testMultiException(
tcxx::function<void(Xtruct const& _return)> cob,
tcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
stdcxx::function<void(Xtruct const& _return)> cob,
stdcxx::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
const std::string& arg0,
const std::string& arg1) {
Xtruct res;
@ -521,13 +550,13 @@ public:
cob(res);
}
virtual void testOneway(tcxx::function<void()> cob, const int32_t secondsToSleep) {
virtual void testOneway(stdcxx::function<void()> cob, const int32_t secondsToSleep) {
_delegate->testOneway(secondsToSleep);
cob();
}
protected:
boost::shared_ptr<TestHandler> _delegate;
stdcxx::shared_ptr<TestHandler> _delegate;
};
namespace po = boost::program_options;
@ -543,6 +572,7 @@ int main(int argc, char** argv) {
#endif
int port = 9090;
bool ssl = false;
bool zlib = false;
string transport_type = "buffered";
string protocol_type = "binary";
string server_type = "simple";
@ -559,9 +589,10 @@ int main(int argc, char** argv) {
("domain-socket", po::value<string>(&domain_socket) ->default_value(domain_socket), "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)")
("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")
("server-type", po::value<string>(&server_type)->default_value(server_type), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"")
("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http")
("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json")
("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http, zlib")
("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json, multi, multic, multih, multij")
("ssl", "Encrypted Transport using SSL")
("zlib", "Wrapped Transport using Zlib")
("processor-events", "processor-events")
("workers,n", po::value<size_t>(&workers)->default_value(workers), "Number of thread pools workers. Only valid for thread-pool server type")
("string-limit", po::value<int>(&string_limit))
@ -592,6 +623,10 @@ int main(int argc, char** argv) {
} else if (protocol_type == "compact") {
} else if (protocol_type == "json") {
} else if (protocol_type == "header") {
} else if (protocol_type == "multi") { // multiplexed binary
} else if (protocol_type == "multic") { // multiplexed compact
} else if (protocol_type == "multih") { // multiplexed header
} else if (protocol_type == "multij") { // multiplexed json
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
}
@ -601,6 +636,8 @@ int main(int argc, char** argv) {
if (transport_type == "buffered") {
} else if (transport_type == "framed") {
} else if (transport_type == "http") {
} else if (transport_type == "zlib") {
// crosstester will pass zlib as a flag and a transport right now...
} else {
throw invalid_argument("Unknown transport type " + transport_type);
}
@ -616,22 +653,32 @@ int main(int argc, char** argv) {
ssl = true;
}
if (vm.count("zlib")) {
zlib = true;
}
#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
if (ssl) {
signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly
}
#endif
if (vm.count("abstract-namespace")) {
abstract_namespace = true;
}
// Dispatcher
boost::shared_ptr<TProtocolFactory> protocolFactory;
if (protocol_type == "json") {
boost::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory());
stdcxx::shared_ptr<TProtocolFactory> protocolFactory;
if (protocol_type == "json" || protocol_type == "multij") {
stdcxx::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory());
protocolFactory = jsonProtocolFactory;
} else if (protocol_type == "compact") {
} else if (protocol_type == "compact" || protocol_type == "multic") {
TCompactProtocolFactoryT<TBufferBase> *compactProtocolFactory = new TCompactProtocolFactoryT<TBufferBase>();
compactProtocolFactory->setContainerSizeLimit(container_limit);
compactProtocolFactory->setStringSizeLimit(string_limit);
protocolFactory.reset(compactProtocolFactory);
} else if (protocol_type == "header") {
boost::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
} else if (protocol_type == "header" || protocol_type == "multih") {
stdcxx::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
protocolFactory = headerProtocolFactory;
} else {
TBinaryProtocolFactoryT<TBufferBase>* binaryProtocolFactory = new TBinaryProtocolFactoryT<TBufferBase>();
@ -640,53 +687,57 @@ int main(int argc, char** argv) {
protocolFactory.reset(binaryProtocolFactory);
}
// Processor
boost::shared_ptr<TestHandler> testHandler(new TestHandler());
boost::shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler));
// Processors
stdcxx::shared_ptr<TestHandler> testHandler(new TestHandler());
stdcxx::shared_ptr<TProcessor> testProcessor(new ThriftTestProcessor(testHandler));
if (vm.count("processor-events")) {
testProcessor->setEventHandler(
boost::shared_ptr<TProcessorEventHandler>(new TestProcessorEventHandler()));
stdcxx::shared_ptr<TProcessorEventHandler>(new TestProcessorEventHandler()));
}
// Transport
boost::shared_ptr<TSSLSocketFactory> sslSocketFactory;
boost::shared_ptr<TServerSocket> serverSocket;
stdcxx::shared_ptr<TSSLSocketFactory> sslSocketFactory;
stdcxx::shared_ptr<TServerSocket> serverSocket;
if (ssl) {
sslSocketFactory = boost::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
sslSocketFactory = stdcxx::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
sslSocketFactory->loadCertificate(certPath.c_str());
sslSocketFactory->loadPrivateKey(keyPath.c_str());
sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
serverSocket = boost::shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory));
if (server_type != "nonblocking") {
serverSocket = stdcxx::shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory));
}
} else {
if (domain_socket != "") {
if (abstract_namespace) {
std::string abstract_socket("\0", 1);
abstract_socket += domain_socket;
serverSocket = boost::shared_ptr<TServerSocket>(new TServerSocket(abstract_socket));
serverSocket = stdcxx::shared_ptr<TServerSocket>(new TServerSocket(abstract_socket));
} else {
unlink(domain_socket.c_str());
serverSocket = boost::shared_ptr<TServerSocket>(new TServerSocket(domain_socket));
serverSocket = stdcxx::shared_ptr<TServerSocket>(new TServerSocket(domain_socket));
}
port = 0;
} else {
serverSocket = boost::shared_ptr<TServerSocket>(new TServerSocket(port));
serverSocket = stdcxx::shared_ptr<TServerSocket>(new TServerSocket(port));
}
}
// Factory
boost::shared_ptr<TTransportFactory> transportFactory;
stdcxx::shared_ptr<TTransportFactory> transportFactory;
if (transport_type == "http" && server_type != "nonblocking") {
boost::shared_ptr<TTransportFactory> httpTransportFactory(new THttpServerTransportFactory());
transportFactory = httpTransportFactory;
transportFactory = stdcxx::make_shared<THttpServerTransportFactory>();
} else if (transport_type == "framed") {
boost::shared_ptr<TTransportFactory> framedTransportFactory(new TFramedTransportFactory());
transportFactory = framedTransportFactory;
transportFactory = stdcxx::make_shared<TFramedTransportFactory>();
} else {
boost::shared_ptr<TTransportFactory> bufferedTransportFactory(new TBufferedTransportFactory());
transportFactory = bufferedTransportFactory;
transportFactory = stdcxx::make_shared<TBufferedTransportFactory>();
}
if (zlib) {
// hmm.. doesn't seem to be a way to make it wrap the others...
transportFactory = stdcxx::make_shared<TZlibTransportFactory>();
}
// Server Info
@ -701,20 +752,30 @@ int main(int argc, char** argv) {
}
cout << endl;
// Multiplexed Processor if needed
if (boost::starts_with(protocol_type, "multi")) {
stdcxx::shared_ptr<SecondHandler> secondHandler(new SecondHandler());
stdcxx::shared_ptr<SecondServiceProcessor> secondProcessor(new SecondServiceProcessor(secondHandler));
stdcxx::shared_ptr<TMultiplexedProcessor> multiplexedProcessor(new TMultiplexedProcessor());
multiplexedProcessor->registerDefault(testProcessor); // non-multi clients go to the default processor (multi:binary, multic:compact, ...)
multiplexedProcessor->registerProcessor("ThriftTest", testProcessor);
multiplexedProcessor->registerProcessor("SecondService", secondProcessor);
testProcessor = stdcxx::dynamic_pointer_cast<TProcessor>(multiplexedProcessor);
}
// Server
boost::shared_ptr<apache::thrift::server::TServer> server;
stdcxx::shared_ptr<apache::thrift::server::TServer> server;
if (server_type == "simple") {
server.reset(new TSimpleServer(testProcessor, serverSocket, transportFactory, protocolFactory));
} else if (server_type == "thread-pool") {
boost::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workers);
boost::shared_ptr<PlatformThreadFactory> threadFactory
= boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
stdcxx::shared_ptr<PlatformThreadFactory> threadFactory
= stdcxx::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
stdcxx::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workers);
threadManager->threadFactory(threadFactory);
threadManager->start();
server.reset(new TThreadPoolServer(testProcessor,
@ -723,15 +784,14 @@ int main(int argc, char** argv) {
protocolFactory,
threadManager));
} else if (server_type == "threaded") {
server.reset(
new TThreadedServer(testProcessor, serverSocket, transportFactory, protocolFactory));
} else if (server_type == "nonblocking") {
if (transport_type == "http") {
boost::shared_ptr<TestHandlerAsync> testHandlerAsync(new TestHandlerAsync(testHandler));
boost::shared_ptr<TAsyncProcessor> testProcessorAsync(
stdcxx::shared_ptr<TestHandlerAsync> testHandlerAsync(new TestHandlerAsync(testHandler));
stdcxx::shared_ptr<TAsyncProcessor> testProcessorAsync(
new ThriftTestAsyncProcessor(testHandlerAsync));
boost::shared_ptr<TAsyncBufferProcessor> testBufferProcessor(
stdcxx::shared_ptr<TAsyncBufferProcessor> testBufferProcessor(
new TAsyncProtocolProcessor(testProcessorAsync, protocolFactory));
// not loading nonblockingServer into "server" because
@ -739,8 +799,15 @@ int main(int argc, char** argv) {
// provide a stop method.
TEvhttpServer nonblockingServer(testBufferProcessor, port);
nonblockingServer.serve();
} else if (transport_type == "framed") {
stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
nbSocket.reset(
ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
: new transport::TNonblockingServerSocket(port));
server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));
} else {
server.reset(new TNonblockingServer(testProcessor, protocolFactory, port));
cerr << "server-type nonblocking requires transport of http or framed" << endl;
exit(1);
}
}
@ -748,22 +815,25 @@ int main(int argc, char** argv) {
if (protocol_type == "header") {
// Tell the server to use the same protocol for input / output
// if using header
server->setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory>());
server->setOutputProtocolFactory(stdcxx::shared_ptr<TProtocolFactory>());
}
apache::thrift::concurrency::PlatformThreadFactory factory;
factory.setDetached(false);
boost::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
boost::shared_ptr<apache::thrift::concurrency::Thread> thread
stdcxx::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
stdcxx::shared_ptr<apache::thrift::concurrency::Thread> thread
= factory.newThread(serverThreadRunner);
thread->start();
// HACK: cross language test suite is unable to handle cin properly
// that's why we stay in a endless loop here
while (1) {
}
// FIXME: find another way to stop the server (e.g. a signal)
// cout<<"Press enter to stop the server."<<endl;
// cin.ignore(); //wait until a key is pressed
#ifdef HAVE_SIGNAL_H
signal(SIGINT, signal_handler);
#endif
thread->start();
gMonitor.waitForever(); // wait for a shutdown signal
#ifdef HAVE_SIGNAL_H
signal(SIGINT, SIG_DFL);
#endif
server->stop();
thread->join();
@ -773,3 +843,4 @@ int main(int argc, char** argv) {
cout << "done." << endl;
return 0;
}