Checking in vendor folder for ease of using go get.

This commit is contained in:
Renan DelValle 2018-10-23 23:32:59 -07:00
parent 7a1251853b
commit cdb4b5a1d0
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
3554 changed files with 1270116 additions and 0 deletions

129
vendor/git.apache.org/thrift.git/lib/d/test/Makefile.am generated vendored Executable file
View file

@ -0,0 +1,129 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
AUTOMAKE_OPTIONS = serial-tests
BUILT_SOURCES = trusted-ca-certificate.pem server-certificate.pem
# Thrift compiler rules
THRIFT = $(top_builddir)/compiler/cpp/thrift
debug_proto_gen = $(addprefix gen-d/, DebugProtoTest_types.d)
$(debug_proto_gen): $(top_srcdir)/test/DebugProtoTest.thrift
$(THRIFT) --gen d -nowarn $<
stress_test_gen = $(addprefix gen-d/thrift/test/stress/, Service.d \
StressTest_types.d)
$(stress_test_gen): $(top_srcdir)/test/StressTest.thrift
$(THRIFT) --gen d $<
thrift_test_gen = $(addprefix gen-d/thrift/test/, SecondService.d \
ThriftTest.d ThriftTest_constants.d ThriftTest_types.d)
$(thrift_test_gen): $(top_srcdir)/test/ThriftTest.thrift
$(THRIFT) --gen d $<
# The actual test targets.
# There just must be some way to reassign a variable without warnings in
# Automake...
targets__ = async_test client_pool_test serialization_benchmark \
stress_test_server thrift_test_client thrift_test_server transport_test
ran_tests__ = client_pool_test \
transport_test \
async_test_runner.sh \
thrift_test_runner.sh
libevent_dependent_targets = async_test_client client_pool_test \
stress_test_server thrift_test_server
libevent_dependent_ran_tests = client_pool_test async_test_runner.sh thrift_test_runner.sh
openssl_dependent_targets = async_test thrift_test_client thrift_test_server
openssl_dependent_ran_tests = async_test_runner.sh thrift_test_runner.sh
d_test_flags =
if WITH_D_EVENT_TESTS
d_test_flags += $(DMD_LIBEVENT_FLAGS) ../$(D_EVENT_LIB_NAME)
targets_ = $(targets__)
ran_tests_ = $(ran_tests__)
else
targets_ = $(filter-out $(libevent_dependent_targets), $(targets__))
ran_tests_ = $(filter-out $(libevent_dependent_ran_tests), $(ran_tests__))
endif
if WITH_D_SSL_TESTS
d_test_flags += $(DMD_OPENSSL_FLAGS) ../$(D_SSL_LIB_NAME)
targets = $(targets_)
ran_tests = $(ran_tests_)
else
targets = $(filter-out $(openssl_dependent_targets), $(targets_))
ran_tests = $(filter-out $(openssl_dependent_ran_tests), $(ran_tests_))
endif
d_test_flags += -w -wi -O -release -inline -I$(top_srcdir)/lib/d/src -Igen-d \
$(top_builddir)/lib/d/$(D_LIB_NAME)
async_test client_pool_test transport_test: %: %.d
$(DMD) $(d_test_flags) -of$@ $^
serialization_benchmark: %: %.d $(debug_proto_gen)
$(DMD) $(d_test_flags) -of$@ $^
stress_test_server: %: %.d test_utils.d $(stress_test_gen)
$(DMD) $(d_test_flags) -of$@ $^
thrift_test_client: %: %.d thrift_test_common.d $(thrift_test_gen)
$(DMD) $(d_test_flags) -of$@ $^
thrift_test_server: %: %.d thrift_test_common.d test_utils.d $(thrift_test_gen)
$(DMD) $(d_test_flags) -of$@ $^
# Certificate generation targets (for the SSL tests).
# Currently, we just assume that the "openssl" tool is on the path, could be
# replaced by a more elaborate mechanism.
server-certificate.pem: openssl.test.cnf
openssl req -new -x509 -nodes -config openssl.test.cnf \
-out server-certificate.pem
trusted-ca-certificate.pem: server-certificate.pem
cat server-certificate.pem > $@
check-local: $(targets)
clean-local:
$(RM) -rf gen-d $(targets) $(addsuffix .o, $(targets))
# Tests ran as part of make check.
async_test_runner.sh: async_test trusted-ca-certificate.pem server-certificate.pem
thrift_test_runner.sh: thrift_test_client thrift_test_server \
trusted-ca-certificate.pem server-certificate.pem
TESTS = $(ran_tests)
precross: $(targets)

View file

@ -0,0 +1,396 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless enforced by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module async_test;
import core.atomic;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : dur, Thread, ThreadGroup;
import std.conv : text;
import std.datetime;
import std.getopt;
import std.exception : collectException, enforce;
import std.parallelism : TaskPool;
import std.stdio;
import std.string;
import std.variant : Variant;
import thrift.base;
import thrift.async.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.async.ssl;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.base;
import thrift.codegen.processor;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.server.transport.ssl;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.ssl;
import thrift.util.cancellation;
version (Posix) {
import core.stdc.signal;
import core.sys.posix.signal;
// Disable SIGPIPE because SSL server will write to broken socket after
// client disconnected (see TSSLSocket docs).
shared static this() {
signal(SIGPIPE, SIG_IGN);
}
}
interface AsyncTest {
string echo(string value);
string delayedEcho(string value, long milliseconds);
void fail(string reason);
void delayedFail(string reason, long milliseconds);
enum methodMeta = [
TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
];
alias .AsyncTestException AsyncTestException;
}
class AsyncTestException : TException {
string reason;
mixin TStructHelpers!();
}
void main(string[] args) {
ushort port = 9090;
ushort managerCount = 2;
ushort serversPerManager = 5;
ushort threadsPerServer = 10;
uint iterations = 10;
bool ssl;
bool trace;
getopt(args,
"iterations", &iterations,
"managers", &managerCount,
"port", &port,
"servers-per-manager", &serversPerManager,
"ssl", &ssl,
"threads-per-server", &threadsPerServer,
"trace", &trace,
);
TTransportFactory clientTransportFactory;
TSSLContext serverSSLContext;
if (ssl) {
auto clientSSLContext = new TSSLContext();
with (clientSSLContext) {
ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
authenticate = true;
loadTrustedCertificates("./trusted-ca-certificate.pem");
}
clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);
serverSSLContext = new TSSLContext();
with (serverSSLContext) {
serverSide = true;
loadCertificate("./server-certificate.pem");
loadPrivateKey("./server-private-key.pem");
ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
}
} else {
clientTransportFactory = new TBufferedTransportFactory;
}
auto serverCancel = new TCancellationOrigin;
scope(exit) {
writeln("Triggering server shutdown...");
serverCancel.trigger();
writeln("done.");
}
auto managers = new TLibeventAsyncManager[managerCount];
scope (exit) foreach (ref m; managers) destroy(m);
auto clientsThreads = new ThreadGroup;
foreach (managerIndex, ref manager; managers) {
manager = new TLibeventAsyncManager;
foreach (serverIndex; 0 .. serversPerManager) {
auto currentPort = cast(ushort)
(port + managerIndex * serversPerManager + serverIndex);
// Start the server and wait until it is up and running.
auto servingMutex = new Mutex;
auto servingCondition = new Condition(servingMutex);
auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
synchronized (servingMutex) {
(new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
serverCancel, handler)).start();
servingCondition.wait();
}
// We only run the timing tests for the first server on each async
// manager, so that we don't get spurious timing errors becaue of
// ordering issues.
auto runTimingTests = (serverIndex == 0);
auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
threadsPerServer, iterations, runTimingTests, trace);
clientsThreads.add(c);
c.start();
}
}
clientsThreads.joinAll();
}
class AsyncTestHandler : AsyncTest {
this(bool trace) {
trace_ = trace;
}
override string echo(string value) {
if (trace_) writefln(`echo("%s")`, value);
return value;
}
override string delayedEcho(string value, long milliseconds) {
if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
Thread.sleep(dur!"msecs"(milliseconds));
if (trace_) writeln("returning.");
return value;
}
override void fail(string reason) {
if (trace_) writefln(`fail("%s")`, reason);
auto ate = new AsyncTestException;
ate.reason = reason;
throw ate;
}
override void delayedFail(string reason, long milliseconds) {
if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
Thread.sleep(dur!"msecs"(milliseconds));
if (trace_) writeln("returning.");
auto ate = new AsyncTestException;
ate.reason = reason;
throw ate;
}
private:
bool trace_;
AsyncTestException ate_;
}
class PreServeNotifyHandler : TServerEventHandler {
this(Mutex servingMutex, Condition servingCondition) {
servingMutex_ = servingMutex;
servingCondition_ = servingCondition;
}
void preServe() {
synchronized (servingMutex_) {
servingCondition_.notifyAll();
}
}
Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
void preProcess(Variant serverContext, TTransport transport) {}
private:
Mutex servingMutex_;
Condition servingCondition_;
}
class ServerThread(ServerType) : Thread {
this(ushort port, TSSLContext sslContext, bool trace,
TCancellation cancellation, TServerEventHandler eventHandler
) {
port_ = port;
sslContext_ = sslContext;
trace_ = trace;
cancellation_ = cancellation;
eventHandler_ = eventHandler;
super(&run);
}
void run() {
TServerSocket serverSocket;
if (sslContext_) {
serverSocket = new TSSLServerSocket(port_, sslContext_);
} else {
serverSocket = new TServerSocket(port_);
}
auto transportFactory = new TBufferedTransportFactory;
auto protocolFactory = new TBinaryProtocolFactory!();
auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));
auto server = new ServerType(processor, serverSocket, transportFactory,
protocolFactory);
server.eventHandler = eventHandler_;
writefln("Starting server on port %s...", port_);
server.serve(cancellation_);
writefln("Server thread on port %s done.", port_);
}
private:
ushort port_;
bool trace_;
TCancellation cancellation_;
TSSLContext sslContext_;
TServerEventHandler eventHandler_;
}
class ClientsThread : Thread {
this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
ushort threads, uint iterations, bool runTimingTests, bool trace
) {
manager_ = manager;
port_ = port;
transportFactory_ = tf;
threads_ = threads;
iterations_ = iterations;
runTimingTests_ = runTimingTests;
trace_ = trace;
super(&run);
}
void run() {
auto transport = new TAsyncSocket(manager_, "localhost", port_);
{
auto client = new TAsyncClient!AsyncTest(
transport,
transportFactory_,
new TBinaryProtocolFactory!()
);
transport.open();
auto clientThreads = new ThreadGroup;
foreach (clientId; 0 .. threads_) {
clientThreads.create({
auto c = clientId;
return {
foreach (i; 0 .. iterations_) {
immutable id = text(port_, ":", c, ":", i);
{
if (trace_) writefln(`Calling echo("%s")... `, id);
auto a = client.echo(id);
enforce(a == id);
if (trace_) writefln(`echo("%s") done.`, id);
}
{
if (trace_) writefln(`Calling fail("%s")... `, id);
auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
enforce(a && a.reason == id);
if (trace_) writefln(`fail("%s") done.`, id);
}
}
};
}());
}
clientThreads.joinAll();
transport.close();
}
if (runTimingTests_) {
auto client = new TAsyncClient!AsyncTest(
transport,
transportFactory_,
new TBinaryProtocolFactory!TBufferedTransport
);
// Temporarily redirect error logs to stdout, as SSL errors on the server
// side are expected when the client terminates aburptly (as is the case
// in the timeout test).
auto oldErrorLogSink = g_errorLogSink;
g_errorLogSink = g_infoLogSink;
scope (exit) g_errorLogSink = oldErrorLogSink;
foreach (i; 0 .. iterations_) {
transport.open();
immutable id = text(port_, ":", i);
{
if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
auto a = client.delayedEcho(id, 100);
enforce(!a.completion.wait(dur!"usecs"(1)),
text("wait() succeeded early (", a.get(), ", ", id, ")."));
enforce(!a.completion.wait(dur!"usecs"(1)),
text("wait() succeeded early (", a.get(), ", ", id, ")."));
enforce(a.completion.wait(dur!"msecs"(200)),
text("wait() didn't succeed as expected (", id, ")."));
enforce(a.get() == id);
if (trace_) writefln(`... delayedEcho("%s") done.`, id);
}
{
if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
auto a = client.delayedFail(id, 100);
enforce(!a.completion.wait(dur!"usecs"(1)),
text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
enforce(!a.completion.wait(dur!"usecs"(1)),
text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
enforce(a.completion.wait(dur!"msecs"(200)),
text("wait() didn't succeed as expected (", id, ")."));
auto e = cast(AsyncTestException)collectException(a.get());
enforce(e && e.reason == id);
if (trace_) writefln(`... delayedFail("%s") done.`, id);
}
{
transport.recvTimeout = dur!"msecs"(50);
if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
auto a = client.delayedEcho("socketTimeout", 100);
auto e = cast(TTransportException)collectException(a.waitGet());
enforce(e, text("Operation didn't fail as expected (", id, ")."));
enforce(e.type == TTransportException.Type.TIMED_OUT,
text("Wrong timeout exception type (", id, "): ", e));
if (trace_) writeln(`timed out as expected.`);
// Wait until the server thread reset before the next iteration.
Thread.sleep(dur!"msecs"(50));
transport.recvTimeout = dur!"hnsecs"(0);
}
transport.close();
}
}
writefln("Clients thread for port %s done.", port_);
}
TAsyncSocketManager manager_;
ushort port_;
TTransportFactory transportFactory_;
ushort threads_;
uint iterations_;
bool runTimingTests_;
bool trace_;
}

View file

@ -0,0 +1,28 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Runs the async test in both SSL and non-SSL mode.
${CUR}/async_test > /dev/null || exit 1
echo "Non-SSL tests done."
${CUR}/async_test --ssl > /dev/null || exit 1
echo "SSL tests done."

View file

@ -0,0 +1,416 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module client_pool_test;
import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
import std.array;
import std.conv;
import std.exception;
import std.getopt;
import std.range;
import std.stdio;
import std.typecons;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
import thrift.protocol.binary;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
import thrift.util.future;
// We use this as our RPC-layer exception here to make sure socket/… problems
// (that would usually considered to be RPC layer faults) cause the tests to
// fail, even though we are testing the RPC exception handling.
class TestServiceException : TException {
int port;
}
interface TestService {
int getPort();
alias .TestServiceException TestServiceException;
enum methodMeta = [TMethodMeta("getPort", [],
[TExceptionMeta("a", 1, "TestServiceException")])];
}
// Use some derived service, just to check that the pools handle inheritance
// correctly.
interface ExTestService : TestService {
int[] getPortInArray();
enum methodMeta = [TMethodMeta("getPortInArray", [],
[TExceptionMeta("a", 1, "TestServiceException")])];
}
class ExTestHandler : ExTestService {
this(ushort port, Duration delay, bool failing, bool trace) {
this.port = port;
this.delay = delay;
this.failing = failing;
this.trace = trace;
}
override int getPort() {
if (trace) {
stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
delay, failing);
}
sleep();
failIfEnabled();
return port;
}
override int[] getPortInArray() {
return [getPort()];
}
ushort port;
Duration delay;
bool failing;
bool trace;
private:
void sleep() {
if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
}
void failIfEnabled() {
if (!failing) return;
auto e = new TestServiceException;
e.port = port;
throw e;
}
}
class ServerThread : Thread {
this(ExTestHandler handler, TCancellation cancellation) {
super(&run);
handler_ = handler;
cancellation_ = cancellation;
}
private:
void run() {
try {
auto protocolFactory = new TBinaryProtocolFactory!();
auto processor = new TServiceProcessor!ExTestService(handler_);
auto serverTransport = new TServerSocket(handler_.port);
serverTransport.recvTimeout = dur!"seconds"(3);
auto transportFactory = new TBufferedTransportFactory;
auto server = new TSimpleServer(
processor, serverTransport, transportFactory, protocolFactory);
server.serve(cancellation_);
} catch (Exception e) {
writefln("Server thread on port %s failed: %s", handler_.port, e);
}
}
TCancellation cancellation_;
ExTestHandler handler_;
}
void main(string[] args) {
bool trace;
ushort port = 9090;
getopt(args, "port", &port, "trace", &trace);
auto serverCancellation = new TCancellationOrigin;
scope (exit) serverCancellation.trigger();
immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
version (none) {
// Cannot use this due to multiple DMD @@BUG@@s:
// 1. »function D main is a nested function and cannot be accessed from array«
// when calling array() on the result of the outer map() would have to
// manually do the eager evaluation/array conversion.
// 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
// can be worked around by calling array() on the map result first.
// 3. Even when using the workarounds for the last two points, the DMD-built
// executable crashes when building without (sic!) inlining enabled,
// the backtrace points into the first delegate literal.
auto handlers = array(map!((args){
return new ExTestHandler(args._0, args._1, args._2, trace);
})(zip(
ports,
map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
[false, false, false, true, true, true]
)));
} else {
auto handlers = [
new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
];
}
// Fire up the server threads.
foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
// Give the servers some time to get up. This should really be accomplished
// via a barrier here and in the preServe() hook.
Thread.sleep(dur!"msecs"(10));
syncClientPoolTest(ports, handlers);
asyncClientPoolTest(ports, handlers);
asyncFastestClientPoolTest(ports, handlers);
asyncAggregatorTest(ports, handlers);
}
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto clients = array(map!((a){
return cast(TClientBase!ExTestService)tClient!ExTestService(
tBinaryProtocol(new TSocket("127.0.0.1", a))
);
})(ports));
scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
// Try the case where the first client succeeds.
{
enforce(makePool(clients).getPort() == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makePool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
enforce(pool.getPortInArray() == [ports[0]]);
}
// Make sure a client is properly deactivated when it has failed too often.
{
auto pool = makePool(clients);
pool.faultDisableCount = 1;
pool.faultDisableDuration = dur!"msecs"(50);
handlers[0].failing = true;
enforce(pool.getPort() == ports[1]);
handlers[0].failing = false;
enforce(pool.getPort() == ports[1]);
Thread.sleep(dur!"msecs"(50));
enforce(pool.getPort() == ports[0]);
}
}
auto makePool(TClientBase!ExTestService[] clients) {
auto p = tClientPool(clients);
p.permuteClients = false;
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
// Try the case where the first client succeeds.
{
enforce(makeAsyncPool(clients).getPort() == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makeAsyncPool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
enforce(pool.getPortInArray() == [ports[0]]);
}
// Make sure a client is properly deactivated when it has failed too often.
{
auto pool = makeAsyncPool(clients);
pool.faultDisableCount = 1;
pool.faultDisableDuration = dur!"msecs"(50);
handlers[0].failing = true;
enforce(pool.getPort() == ports[1]);
handlers[0].failing = false;
enforce(pool.getPort() == ports[1]);
Thread.sleep(dur!"msecs"(50));
enforce(pool.getPort() == ports[0]);
}
}
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
auto p = tAsyncClientPool(clients);
p.permuteClients = false;
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
// DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
// to »function D main is a nested function and cannot be accessed from array«.
// Thus, we manually do the array conversion.
auto lazyClients = map!((a){
return new TAsyncClient!ExTestService(
new TAsyncSocket(manager, "127.0.0.1", a),
new TBufferedTransportFactory,
new TBinaryProtocolFactory!(TBufferedTransport)
);
})(ports);
TAsyncClientBase!ExTestService[] clients;
foreach (c; lazyClients) clients ~= c;
return clients;
}
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
// Make sure the fastest client wins, even if they are called in some other
// order.
{
auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
enforce(result == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makeAsyncFastestPool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makeAsyncFastestPool(clients[1 .. $]);
enforce(pool.getPortInArray() == [ports[1]]);
}
}
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
auto p = tAsyncFastestClientPool(clients);
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
auto aggregator = tAsyncAggregator(
cast(TAsyncClientBase!ExTestService[])clients);
// Test aggregator range interface.
{
auto range = aggregator.getPort().range(dur!"msecs"(50));
enforce(equal(range, ports[0 .. 2][]));
enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
ports[3 .. $ - 1]));
enforce(range.completedCount == 4);
}
// Test default accumulator for scalars.
{
auto fullResult = aggregator.getPort().accumulate();
enforce(fullResult.waitGet() == ports[0 .. 3]);
auto partialResult = aggregator.getPort().accumulate();
Thread.sleep(dur!"msecs"(20));
enforce(partialResult.finishGet() == ports[0 .. 2]);
}
// Test default accumulator for arrays.
{
auto fullResult = aggregator.getPortInArray().accumulate();
enforce(fullResult.waitGet() == ports[0 .. 3]);
auto partialResult = aggregator.getPortInArray().accumulate();
Thread.sleep(dur!"msecs"(20));
enforce(partialResult.finishGet() == ports[0 .. 2]);
}
// Test custom accumulator.
{
auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
return reduce!"a + b"(results);
})();
enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
auto partialResult = aggregator.getPort().accumulate!(
function(int[] results, Exception[] exceptions) {
// Return a tuple of the parameters so we can check them outside of
// this function (to verify the values, we need access to »ports«, but
// due to DMD @@BUG5710@@, we can't use a delegate literal).f
return tuple(results, exceptions);
}
)();
Thread.sleep(dur!"msecs"(20));
auto resultTuple = partialResult.finishGet();
enforce(resultTuple._0 == ports[0 .. 2]);
enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple._1),
ports[3 .. $ - 1]));
}
}

View file

@ -0,0 +1,14 @@
[ req ]
default_bits = 2048
default_keyfile = server-private-key.pem
distinguished_name = req_distinguished_name
x509_extensions = v3_ca
prompt = no
[ req_distinguished_name ]
CN = localhost
[ v3_ca ]
# Add ::1 to the list of allowed IPs so we can use ::1 to explicitly connect
# to localhost via IPv6.
subjectAltName = IP:::1

View file

@ -0,0 +1,70 @@
/**
* An implementation of the mini serialization benchmark also available for
* C++ and Java.
*
* For meaningful results, you might want to make sure that
* the Thrift library is compiled with release build flags,
* e.g. by including the source files with the build instead
* of linking libthriftd:
*
dmd -w -O -release -inline -I../src -Igen-d -ofserialization_benchmark \
$(find ../src/thrift -name '*.d' -not -name index.d) \
gen-d/DebugProtoTest_types.d serialization_benchmark.d
*/
module serialization_benchmark;
import std.datetime : AutoStart, StopWatch;
import std.math : PI;
import std.stdio;
import thrift.protocol.binary;
import thrift.transport.memory;
import thrift.transport.range;
import DebugProtoTest_types;
void main() {
auto buf = new TMemoryBuffer;
enum ITERATIONS = 10_000_000;
{
auto ooe = OneOfEach();
ooe.im_true = true;
ooe.im_false = false;
ooe.a_bite = 0x7f;
ooe.integer16 = 27_000;
ooe.integer32 = 1 << 24;
ooe.integer64 = 6_000_000_000;
ooe.double_precision = PI;
ooe.some_characters = "JSON THIS! \"\1";
ooe.zomg_unicode = "\xd7\n\a\t";
ooe.base64 = "\1\2\3\255";
auto prot = tBinaryProtocol(buf);
auto sw = StopWatch(AutoStart.yes);
foreach (i; 0 .. ITERATIONS) {
buf.reset(120);
ooe.write(prot);
}
sw.stop();
auto msecs = sw.peek().msecs;
writefln("Write: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
}
auto data = buf.getContents().dup;
{
auto readBuf = tInputRangeTransport(data);
auto prot = tBinaryProtocol(readBuf);
auto ooe = OneOfEach();
auto sw = StopWatch(AutoStart.yes);
foreach (i; 0 .. ITERATIONS) {
readBuf.reset(data);
ooe.read(prot);
}
sw.stop();
auto msecs = sw.peek().msecs;
writefln(" Read: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
}
}

View file

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module stress_test_server;
import std.getopt;
import std.parallelism : totalCPUs;
import std.stdio;
import std.typetuple;
import thrift.codegen.processor;
import thrift.protocol.binary;
import thrift.server.base;
import thrift.server.transport.socket;
import thrift.transport.buffered;
import thrift.transport.memory;
import thrift.transport.socket;
import thrift.util.hashset;
import test_utils;
import thrift.test.stress.Service;
class ServiceHandler : Service {
void echoVoid() { return; }
byte echoByte(byte arg) { return arg; }
int echoI32(int arg) { return arg; }
long echoI64(long arg) { return arg; }
byte[] echoList(byte[] arg) { return arg; }
HashSet!byte echoSet(HashSet!byte arg) { return arg; }
byte[byte] echoMap(byte[byte] arg) { return arg; }
string echoString(string arg) {
if (arg != "hello") {
stderr.writefln(`Wrong string received: %s instead of "hello"`, arg);
throw new Exception("Wrong string received.");
}
return arg;
}
}
void main(string[] args) {
ushort port = 9091;
auto serverType = ServerType.threaded;
TransportType transportType;
size_t numIOThreads = 1;
size_t taskPoolSize = totalCPUs;
getopt(args, "port", &port, "server-type", &serverType,
"transport-type", &transportType, "task-pool-size", &taskPoolSize,
"num-io-threads", &numIOThreads);
alias TypeTuple!(TBufferedTransport, TMemoryBuffer) AvailableTransports;
auto processor = new TServiceProcessor!(Service,
staticMap!(TBinaryProtocol, AvailableTransports))(new ServiceHandler());
auto serverSocket = new TServerSocket(port);
auto transportFactory = createTransportFactory(transportType);
auto protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
auto server = createServer(serverType, taskPoolSize, numIOThreads,
processor, serverSocket, transportFactory, protocolFactory);
writefln("Starting %s %s StressTest server on port %s...", transportType,
serverType, port);
server.serve();
writeln("done.");
}

View file

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Various helpers used by more than a single test.
*/
module test_utils;
import std.parallelism : TaskPool;
import thrift.protocol.base;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.nonblocking;
import thrift.server.simple;
import thrift.server.taskpool;
import thrift.server.threaded;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.framed;
import thrift.transport.http;
// This is a likely victim of @@BUG4744@@ when used with command argument
// parsing.
enum ServerType {
simple,
nonblocking,
pooledNonblocking,
taskpool,
threaded
}
TServer createServer(ServerType type, size_t taskPoolSize, size_t numIOThreads,
TProcessor processor, TServerSocket serverTransport,
TTransportFactory transportFactory, TProtocolFactory protocolFactory)
{
final switch (type) {
case ServerType.simple:
return new TSimpleServer(processor, serverTransport,
transportFactory, protocolFactory);
case ServerType.nonblocking:
auto nb = new TNonblockingServer(processor, serverTransport.port,
transportFactory, protocolFactory);
nb.numIOThreads = numIOThreads;
return nb;
case ServerType.pooledNonblocking:
auto nb = new TNonblockingServer(processor, serverTransport.port,
transportFactory, protocolFactory, new TaskPool(taskPoolSize));
nb.numIOThreads = numIOThreads;
return nb;
case ServerType.taskpool:
auto tps = new TTaskPoolServer(processor, serverTransport,
transportFactory, protocolFactory);
tps.taskPool = new TaskPool(taskPoolSize);
return tps;
case ServerType.threaded:
return new TThreadedServer(processor, serverTransport,
transportFactory, protocolFactory);
}
}
enum TransportType {
buffered,
framed,
http,
raw
}
TTransportFactory createTransportFactory(TransportType type) {
final switch (type) {
case TransportType.buffered:
return new TBufferedTransportFactory;
case TransportType.framed:
return new TFramedTransportFactory;
case TransportType.http:
return new TServerHttpTransportFactory;
case TransportType.raw:
return new TTransportFactory;
}
}

View file

@ -0,0 +1,386 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift_test_client;
import std.conv;
import std.datetime;
import std.exception : enforce;
import std.getopt;
import std.stdio;
import std.string;
import std.traits;
import thrift.base;
import thrift.codegen.client;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.protocol.compact;
import thrift.protocol.json;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.framed;
import thrift.transport.http;
import thrift.transport.socket;
import thrift.transport.ssl;
import thrift.util.hashset;
import thrift_test_common;
import thrift.test.ThriftTest;
import thrift.test.ThriftTest_types;
enum TransportType {
buffered,
framed,
http,
raw
}
TProtocol createProtocol(T)(T trans, ProtocolType type) {
final switch (type) {
case ProtocolType.binary:
return tBinaryProtocol(trans);
case ProtocolType.compact:
return tCompactProtocol(trans);
case ProtocolType.json:
return tJsonProtocol(trans);
}
}
void main(string[] args) {
string host = "localhost";
ushort port = 9090;
uint numTests = 1;
bool ssl;
ProtocolType protocolType;
TransportType transportType;
bool trace;
getopt(args,
"numTests|n", &numTests,
"protocol", &protocolType,
"ssl", &ssl,
"transport", &transportType,
"trace", &trace,
"port", &port,
"host", (string _, string value) {
auto parts = split(value, ":");
if (parts.length > 1) {
// IPv6 addresses can contain colons, so take the last part for the
// port.
host = join(parts[0 .. $ - 1], ":");
port = to!ushort(parts[$ - 1]);
} else {
host = value;
}
}
);
port = to!ushort(port);
TSocket socket;
if (ssl) {
auto sslContext = new TSSLContext();
sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
sslContext.authenticate = true;
sslContext.loadTrustedCertificates("../../../test/keys/CA.pem");
socket = new TSSLSocket(sslContext, host, port);
} else {
socket = new TSocket(host, port);
}
TProtocol protocol;
final switch (transportType) {
case TransportType.buffered:
protocol = createProtocol(new TBufferedTransport(socket), protocolType);
break;
case TransportType.framed:
protocol = createProtocol(new TFramedTransport(socket), protocolType);
break;
case TransportType.http:
protocol = createProtocol(
new TClientHttpTransport(socket, host, "/service"), protocolType);
break;
case TransportType.raw:
protocol = createProtocol(socket, protocolType);
break;
}
auto client = tClient!ThriftTest(protocol);
ulong time_min;
ulong time_max;
ulong time_tot;
StopWatch sw;
foreach(test; 0 .. numTests) {
sw.start();
protocol.transport.open();
if (trace) writefln("Test #%s, connect %s:%s", test + 1, host, port);
if (trace) write("testVoid()");
client.testVoid();
if (trace) writeln(" = void");
if (trace) write("testString(\"Test\")");
string s = client.testString("Test");
if (trace) writefln(" = \"%s\"", s);
enforce(s == "Test");
if (trace) write("testByte(1)");
byte u8 = client.testByte(1);
if (trace) writefln(" = %s", u8);
enforce(u8 == 1);
if (trace) write("testI32(-1)");
int i32 = client.testI32(-1);
if (trace) writefln(" = %s", i32);
enforce(i32 == -1);
if (trace) write("testI64(-34359738368)");
long i64 = client.testI64(-34359738368L);
if (trace) writefln(" = %s", i64);
enforce(i64 == -34359738368L);
if (trace) write("testDouble(-5.2098523)");
double dub = client.testDouble(-5.2098523);
if (trace) writefln(" = %s", dub);
enforce(dub == -5.2098523);
// TODO: add testBinary() call
Xtruct out1;
out1.string_thing = "Zero";
out1.byte_thing = 1;
out1.i32_thing = -3;
out1.i64_thing = -5;
if (trace) writef("testStruct(%s)", out1);
auto in1 = client.testStruct(out1);
if (trace) writefln(" = %s", in1);
enforce(in1 == out1);
if (trace) write("testNest({1, {\"Zero\", 1, -3, -5}), 5}");
Xtruct2 out2;
out2.byte_thing = 1;
out2.struct_thing = out1;
out2.i32_thing = 5;
auto in2 = client.testNest(out2);
in1 = in2.struct_thing;
if (trace) writefln(" = {%s, {\"%s\", %s, %s, %s}, %s}", in2.byte_thing,
in1.string_thing, in1.byte_thing, in1.i32_thing, in1.i64_thing,
in2.i32_thing);
enforce(in2 == out2);
int[int] mapout;
for (int i = 0; i < 5; ++i) {
mapout[i] = i - 10;
}
if (trace) writef("testMap({%s})", mapout);
auto mapin = client.testMap(mapout);
if (trace) writefln(" = {%s}", mapin);
enforce(mapin == mapout);
auto setout = new HashSet!int;
for (int i = -2; i < 3; ++i) {
setout ~= i;
}
if (trace) writef("testSet(%s)", setout);
auto setin = client.testSet(setout);
if (trace) writefln(" = %s", setin);
enforce(setin == setout);
int[] listout;
for (int i = -2; i < 3; ++i) {
listout ~= i;
}
if (trace) writef("testList(%s)", listout);
auto listin = client.testList(listout);
if (trace) writefln(" = %s", listin);
enforce(listin == listout);
{
if (trace) write("testEnum(ONE)");
auto ret = client.testEnum(Numberz.ONE);
if (trace) writefln(" = %s", ret);
enforce(ret == Numberz.ONE);
if (trace) write("testEnum(TWO)");
ret = client.testEnum(Numberz.TWO);
if (trace) writefln(" = %s", ret);
enforce(ret == Numberz.TWO);
if (trace) write("testEnum(THREE)");
ret = client.testEnum(Numberz.THREE);
if (trace) writefln(" = %s", ret);
enforce(ret == Numberz.THREE);
if (trace) write("testEnum(FIVE)");
ret = client.testEnum(Numberz.FIVE);
if (trace) writefln(" = %s", ret);
enforce(ret == Numberz.FIVE);
if (trace) write("testEnum(EIGHT)");
ret = client.testEnum(Numberz.EIGHT);
if (trace) writefln(" = %s", ret);
enforce(ret == Numberz.EIGHT);
}
if (trace) write("testTypedef(309858235082523)");
UserId uid = client.testTypedef(309858235082523L);
if (trace) writefln(" = %s", uid);
enforce(uid == 309858235082523L);
if (trace) write("testMapMap(1)");
auto mm = client.testMapMap(1);
if (trace) writefln(" = {%s}", mm);
// Simply doing == doesn't seem to work for nested AAs.
foreach (key, value; mm) {
enforce(testMapMapReturn[key] == value);
}
foreach (key, value; testMapMapReturn) {
enforce(mm[key] == value);
}
Insanity insane;
insane.userMap[Numberz.FIVE] = 5000;
Xtruct truck;
truck.string_thing = "Truck";
truck.byte_thing = 8;
truck.i32_thing = 8;
truck.i64_thing = 8;
insane.xtructs ~= truck;
if (trace) write("testInsanity()");
auto whoa = client.testInsanity(insane);
if (trace) writefln(" = %s", whoa);
// Commented for now, this is cumbersome to write without opEqual getting
// called on AA comparison.
// enforce(whoa == testInsanityReturn);
{
try {
if (trace) write("client.testException(\"Xception\") =>");
client.testException("Xception");
if (trace) writeln(" void\nFAILURE");
throw new Exception("testException failed.");
} catch (Xception e) {
if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message);
}
try {
if (trace) write("client.testException(\"TException\") =>");
client.testException("Xception");
if (trace) writeln(" void\nFAILURE");
throw new Exception("testException failed.");
} catch (TException e) {
if (trace) writefln(" {%s}", e.msg);
}
try {
if (trace) write("client.testException(\"success\") =>");
client.testException("success");
if (trace) writeln(" void");
} catch (Exception e) {
if (trace) writeln(" exception\nFAILURE");
throw new Exception("testException failed.");
}
}
{
try {
if (trace) write("client.testMultiException(\"Xception\", \"test 1\") =>");
auto result = client.testMultiException("Xception", "test 1");
if (trace) writeln(" result\nFAILURE");
throw new Exception("testMultiException failed.");
} catch (Xception e) {
if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message);
}
try {
if (trace) write("client.testMultiException(\"Xception2\", \"test 2\") =>");
auto result = client.testMultiException("Xception2", "test 2");
if (trace) writeln(" result\nFAILURE");
throw new Exception("testMultiException failed.");
} catch (Xception2 e) {
if (trace) writefln(" {%s, {\"%s\"}}",
e.errorCode, e.struct_thing.string_thing);
}
try {
if (trace) writef("client.testMultiException(\"success\", \"test 3\") =>");
auto result = client.testMultiException("success", "test 3");
if (trace) writefln(" {{\"%s\"}}", result.string_thing);
} catch (Exception e) {
if (trace) writeln(" exception\nFAILURE");
throw new Exception("testMultiException failed.");
}
}
// Do not run oneway test when doing multiple iterations, as it blocks the
// server for three seconds.
if (numTests == 1) {
if (trace) writef("client.testOneway(3) =>");
auto onewayWatch = StopWatch(AutoStart.yes);
client.testOneway(3);
onewayWatch.stop();
if (onewayWatch.peek().msecs > 200) {
if (trace) {
writefln(" FAILURE - took %s ms", onewayWatch.peek().usecs / 1000.0);
}
throw new Exception("testOneway failed.");
} else {
if (trace) {
writefln(" success - took %s ms", onewayWatch.peek().usecs / 1000.0);
}
}
// Redo a simple test after the oneway to make sure we aren't "off by
// one", which would be the case if the server treated oneway methods
// like normal ones.
if (trace) write("re-test testI32(-1)");
i32 = client.testI32(-1);
if (trace) writefln(" = %s", i32);
}
// Time metering.
sw.stop();
immutable tot = sw.peek().usecs;
if (trace) writefln("Total time: %s us\n", tot);
time_tot += tot;
if (time_min == 0 || tot < time_min) {
time_min = tot;
}
if (tot > time_max) {
time_max = tot;
}
protocol.transport.close();
sw.reset();
}
writeln("All tests done.");
if (numTests > 1) {
auto time_avg = time_tot / numTests;
writefln("Min time: %s us", time_min);
writefln("Max time: %s us", time_max);
writefln("Avg time: %s us", time_avg);
}
}

View file

@ -0,0 +1,92 @@
module thrift_test_common;
import std.stdio;
import thrift.test.ThriftTest_types;
enum ProtocolType {
binary,
compact,
json
}
void writeInsanityReturn(in Insanity[Numberz][UserId] insane) {
write("{");
foreach(key1, value1; insane) {
writef("%s => {", key1);
foreach(key2, value2; value1) {
writef("%s => {", key2);
write("{");
foreach(key3, value3; value2.userMap) {
writef("%s => %s, ", key3, value3);
}
write("}, ");
write("{");
foreach (x; value2.xtructs) {
writef("{\"%s\", %s, %s, %s}, ",
x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing);
}
write("}");
write("}, ");
}
write("}, ");
}
write("}");
}
Insanity[Numberz][UserId] testInsanityReturn;
int[int][int] testMapMapReturn;
static this() {
testInsanityReturn = {
Insanity[Numberz][UserId] insane;
Xtruct hello;
hello.string_thing = "Hello2";
hello.byte_thing = 2;
hello.i32_thing = 2;
hello.i64_thing = 2;
Xtruct goodbye;
goodbye.string_thing = "Goodbye4";
goodbye.byte_thing = 4;
goodbye.i32_thing = 4;
goodbye.i64_thing = 4;
Insanity crazy;
crazy.userMap[Numberz.EIGHT] = 8;
crazy.xtructs ~= goodbye;
Insanity looney;
// The C++ TestServer also assigns these to crazy, but that is probably
// an oversight.
looney.userMap[Numberz.FIVE] = 5;
looney.xtructs ~= hello;
Insanity[Numberz] first_map;
first_map[Numberz.TWO] = crazy;
first_map[Numberz.THREE] = crazy;
insane[1] = first_map;
Insanity[Numberz] second_map;
second_map[Numberz.SIX] = looney;
insane[2] = second_map;
return insane;
}();
testMapMapReturn = {
int[int] pos;
int[int] neg;
for (int i = 1; i < 5; i++) {
pos[i] = i;
neg[-i] = -i;
}
int[int][int] result;
result[4] = pos;
result[-4] = neg;
return result;
}();
}

View file

@ -0,0 +1,93 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Runs the D ThriftTest client and servers for all combinations of transport,
# protocol, SSL-mode and server type.
# Pass -k to keep going after failed tests.
CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
protocols="binary compact json"
# TODO: fix and enable http
# transports="buffered framed raw http"
transports="buffered framed raw"
servers="simple taskpool threaded"
framed_only_servers="nonblocking pooledNonblocking"
# Don't leave any server instances behind when interrupted (e.g. by Ctrl+C)
# or terminated.
trap "kill $(jobs -p) 2>/dev/null" INT TERM
for protocol in $protocols; do
for ssl in "" " --ssl"; do
for transport in $transports; do
for server in $servers $framed_only_servers; do
case $framed_only_servers in
*$server*) if [ $transport != "framed" ] || [ $ssl != "" ]; then continue; fi;;
esac
args="--transport=$transport --protocol=$protocol$ssl"
${CUR}/thrift_test_server $args --server-type=$server > /dev/null &
server_pid=$!
# Give the server some time to get up and check if it runs (yes, this
# is a huge kludge, should add a connect timeout to test client).
client_rc=-1
if [ "$server" = "taskpool" ]; then
sleep 0.5
else
sleep 0.02
fi
kill -0 $server_pid 2>/dev/null
if [ $? -eq 0 ]; then
${CUR}/thrift_test_client $args --numTests=10 > /dev/null
client_rc=$?
# Temporarily redirect stderr to null to avoid job control messages,
# restore it afterwards.
exec 3>&2
exec 2>/dev/null
kill $server_pid
exec 3>&2
fi
# Get the server exit code (wait should immediately return).
wait $server_pid
server_rc=$?
if [ $client_rc -ne 0 -o $server_rc -eq 1 ]; then
echo -e "\nTests failed for: $args --server-type=$server"
failed="true"
if [ "$1" != "-k" ]; then
exit 1
fi
else
echo -n "."
fi
done
done
done
done
echo
if [ -z "$failed" ]; then
echo "All tests passed."
fi

View file

@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift_test_server;
import core.thread : dur, Thread;
import std.algorithm;
import std.exception : enforce;
import std.getopt;
import std.parallelism : totalCPUs;
import std.string;
import std.stdio;
import std.typetuple : TypeTuple, staticMap;
import thrift.base;
import thrift.codegen.processor;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.protocol.compact;
import thrift.protocol.json;
import thrift.server.base;
import thrift.server.transport.socket;
import thrift.server.transport.ssl;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.framed;
import thrift.transport.http;
import thrift.transport.ssl;
import thrift.util.hashset;
import test_utils;
import thrift_test_common;
import thrift.test.ThriftTest_types;
import thrift.test.ThriftTest;
class TestHandler : ThriftTest {
this(bool trace) {
trace_ = trace;
}
override void testVoid() {
if (trace_) writeln("testVoid()");
}
override string testString(string thing) {
if (trace_) writefln("testString(\"%s\")", thing);
return thing;
}
override byte testByte(byte thing) {
if (trace_) writefln("testByte(%s)", thing);
return thing;
}
override int testI32(int thing) {
if (trace_) writefln("testI32(%s)", thing);
return thing;
}
override long testI64(long thing) {
if (trace_) writefln("testI64(%s)", thing);
return thing;
}
override double testDouble(double thing) {
if (trace_) writefln("testDouble(%s)", thing);
return thing;
}
override string testBinary(string thing) {
if (trace_) writefln("testBinary(\"%s\")", thing);
return thing;
}
override bool testBool(bool thing) {
if (trace_) writefln("testBool(\"%s\")", thing);
return thing;
}
override Xtruct testStruct(ref const(Xtruct) thing) {
if (trace_) writefln("testStruct({\"%s\", %s, %s, %s})",
thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing);
return thing;
}
override Xtruct2 testNest(ref const(Xtruct2) nest) {
auto thing = nest.struct_thing;
if (trace_) writefln("testNest({%s, {\"%s\", %s, %s, %s}, %s})",
nest.byte_thing, thing.string_thing, thing.byte_thing, thing.i32_thing,
thing.i64_thing, nest.i32_thing);
return nest;
}
override int[int] testMap(int[int] thing) {
if (trace_) writefln("testMap({%s})", thing);
return thing;
}
override HashSet!int testSet(HashSet!int thing) {
if (trace_) writefln("testSet({%s})",
join(map!`to!string(a)`(thing[]), ", "));
return thing;
}
override int[] testList(int[] thing) {
if (trace_) writefln("testList(%s)", thing);
return thing;
}
override Numberz testEnum(Numberz thing) {
if (trace_) writefln("testEnum(%s)", thing);
return thing;
}
override UserId testTypedef(UserId thing) {
if (trace_) writefln("testTypedef(%s)", thing);
return thing;
}
override string[string] testStringMap(string[string] thing) {
if (trace_) writefln("testStringMap(%s)", thing);
return thing;
}
override int[int][int] testMapMap(int hello) {
if (trace_) writefln("testMapMap(%s)", hello);
return testMapMapReturn;
}
override Insanity[Numberz][UserId] testInsanity(ref const(Insanity) argument) {
if (trace_) writeln("testInsanity()");
Insanity[Numberz][UserId] ret;
Insanity[Numberz] m1;
Insanity[Numberz] m2;
Insanity tmp;
tmp = cast(Insanity)argument;
m1[Numberz.TWO] = tmp;
m1[Numberz.THREE] = tmp;
m2[Numberz.SIX] = Insanity();
ret[1] = m1;
ret[2] = m2;
return ret;
}
override Xtruct testMulti(byte arg0, int arg1, long arg2, string[short] arg3,
Numberz arg4, UserId arg5)
{
if (trace_) writeln("testMulti()");
return Xtruct("Hello2", arg0, arg1, arg2);
}
override void testException(string arg) {
if (trace_) writefln("testException(%s)", arg);
if (arg == "Xception") {
auto e = new Xception();
e.errorCode = 1001;
e.message = arg;
throw e;
} else if (arg == "TException") {
throw new TException();
} else if (arg == "ApplicationException") {
throw new TException();
}
}
override Xtruct testMultiException(string arg0, string arg1) {
if (trace_) writefln("testMultiException(%s, %s)", arg0, arg1);
if (arg0 == "Xception") {
auto e = new Xception();
e.errorCode = 1001;
e.message = "This is an Xception";
throw e;
} else if (arg0 == "Xception2") {
auto e = new Xception2();
e.errorCode = 2002;
e.struct_thing.string_thing = "This is an Xception2";
throw e;
} else {
return Xtruct(arg1);
}
}
override void testOneway(int sleepFor) {
if (trace_) writefln("testOneway(%s): Sleeping...", sleepFor);
Thread.sleep(dur!"seconds"(sleepFor));
if (trace_) writefln("testOneway(%s): done sleeping!", sleepFor);
}
private:
bool trace_;
}
void main(string[] args) {
ushort port = 9090;
ServerType serverType;
ProtocolType protocolType;
size_t numIOThreads = 1;
TransportType transportType;
bool ssl;
bool trace;
size_t taskPoolSize = totalCPUs;
getopt(args, "port", &port, "protocol", &protocolType, "server-type",
&serverType, "ssl", &ssl, "num-io-threads", &numIOThreads,
"task-pool-size", &taskPoolSize, "trace", &trace,
"transport", &transportType);
if (serverType == ServerType.nonblocking ||
serverType == ServerType.pooledNonblocking
) {
enforce(transportType == TransportType.framed,
"Need to use framed transport with non-blocking server.");
enforce(!ssl, "The non-blocking server does not support SSL yet.");
// Don't wrap the contents into another layer of framing.
transportType = TransportType.raw;
}
version (ThriftTestTemplates) {
// Only exercise the specialized template code paths if explicitly enabled
// to reduce memory consumption on regular test suite runs there should
// not be much that can go wrong with that specifically anyway.
alias TypeTuple!(TBufferedTransport, TFramedTransport, TServerHttpTransport)
AvailableTransports;
alias TypeTuple!(
staticMap!(TBinaryProtocol, AvailableTransports),
staticMap!(TCompactProtocol, AvailableTransports)
) AvailableProtocols;
} else {
alias TypeTuple!() AvailableTransports;
alias TypeTuple!() AvailableProtocols;
}
TProtocolFactory protocolFactory;
final switch (protocolType) {
case ProtocolType.binary:
protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
break;
case ProtocolType.compact:
protocolFactory = new TCompactProtocolFactory!AvailableTransports;
break;
case ProtocolType.json:
protocolFactory = new TJsonProtocolFactory!AvailableTransports;
break;
}
auto processor = new TServiceProcessor!(ThriftTest, AvailableProtocols)(
new TestHandler(trace));
TServerSocket serverSocket;
if (ssl) {
auto sslContext = new TSSLContext();
sslContext.serverSide = true;
sslContext.loadCertificate("../../../test/keys/server.crt");
sslContext.loadPrivateKey("../../../test/keys/server.key");
sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
serverSocket = new TSSLServerSocket(port, sslContext);
} else {
serverSocket = new TServerSocket(port);
}
auto transportFactory = createTransportFactory(transportType);
auto server = createServer(serverType, numIOThreads, taskPoolSize,
processor, serverSocket, transportFactory, protocolFactory);
writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
transportType, serverType, ssl ? "(using SSL) ": "", port);
server.serve();
writeln("done.");
}

View file

@ -0,0 +1,803 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Exercises various transports, combined with the buffered/framed wrappers.
*
* Originally ported from the C++ version, with Windows support code added.
*/
module transport_test;
import core.atomic;
import core.time : Duration;
import core.thread : Thread;
import std.conv : to;
import std.datetime;
import std.exception : enforce;
static import std.file;
import std.getopt;
import std.random : rndGen, uniform, unpredictableSeed;
import std.socket;
import std.stdio;
import std.string;
import std.typetuple;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.framed;
import thrift.transport.file;
import thrift.transport.http;
import thrift.transport.memory;
import thrift.transport.socket;
import thrift.transport.zlib;
/*
* Size generation helpers used to be able to run the same testing code
* with both constant and random total/chunk sizes.
*/
interface SizeGenerator {
size_t nextSize();
string toString();
}
class ConstantSizeGenerator : SizeGenerator {
this(size_t value) {
value_ = value;
}
override size_t nextSize() {
return value_;
}
override string toString() const {
return to!string(value_);
}
private:
size_t value_;
}
class RandomSizeGenerator : SizeGenerator {
this(size_t min, size_t max) {
min_ = min;
max_ = max;
}
override size_t nextSize() {
return uniform!"[]"(min_, max_);
}
override string toString() const {
return format("rand(%s, %s)", min_, max_);
}
size_t min() const @property {
return min_;
}
size_t max() const @property {
return max_;
}
private:
size_t min_;
size_t max_;
}
/*
* Classes to set up coupled transports
*/
/**
* Helper class to represent a coupled pair of transports.
*
* Data written to the output transport can be read from the input transport.
*
* This is used as the base class for the various coupled transport
* implementations. It shouldn't be used directly.
*/
class CoupledTransports(Transport) if (isTTransport!Transport) {
Transport input;
Transport output;
}
template isCoupledTransports(T) {
static if (is(T _ : CoupledTransports!U, U)) {
enum isCoupledTransports = true;
} else {
enum isCoupledTransports = false;
}
}
/**
* Helper template class for creating coupled transports that wrap
* another transport.
*/
class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if (
isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports
) : CoupledTransports!WrapperTransport {
this() {
inner_ = new InnerCoupledTransports();
if (inner_.input) {
input = new WrapperTransport(inner_.input);
}
if (inner_.output) {
output = new WrapperTransport(inner_.output);
}
}
~this() {
destroy(inner_);
}
private:
InnerCoupledTransports inner_;
}
import thrift.internal.codegen : PApply;
alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports;
alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports;
alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports;
/**
* Coupled TMemoryBuffers.
*/
class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer {
this() {
buf = new TMemoryBuffer;
input = buf;
output = buf;
}
TMemoryBuffer buf;
}
/**
* Coupled TSockets.
*/
class CoupledSocketTransports : CoupledTransports!TSocket {
this() {
auto sockets = socketPair();
input = new TSocket(sockets[0]);
output = new TSocket(sockets[1]);
}
~this() {
input.close();
output.close();
}
}
/**
* Coupled TFileTransports
*/
class CoupledFileTransports : CoupledTransports!TTransport {
this() {
// We actually need the file name of the temp file here, so we can't just
// use the usual tempfile facilities.
do {
fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front);
rndGen().popFront();
} while (std.file.exists(fileName_));
writefln("Using temp file: %s", fileName_);
auto writer = new TFileWriterTransport(fileName_);
writer.open();
output = writer;
// Wait until the file has been created.
writer.flush();
auto reader = new TFileReaderTransport(fileName_);
reader.open();
reader.readTimeout(dur!"msecs"(-1));
input = reader;
}
~this() {
input.close();
output.close();
std.file.remove(fileName_);
}
static string tmpDir;
private:
string fileName_;
}
/*
* Test functions
*/
/**
* Test interleaved write and read calls.
*
* Generates a buffer totalSize bytes long, then writes it to the transport,
* and verifies the written data can be read back correctly.
*
* Mode of operation:
* - call wChunkGenerator to figure out how large of a chunk to write
* - call wSizeGenerator to get the size for individual write() calls,
* and do this repeatedly until the entire chunk is written.
* - call rChunkGenerator to figure out how large of a chunk to read
* - call rSizeGenerator to get the size for individual read() calls,
* and do this repeatedly until the entire chunk is read.
* - repeat until the full buffer is written and read back,
* then compare the data read back against the original buffer
*
*
* - If any of the size generators return 0, this means to use the maximum
* possible size.
*
* - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
* there are never more than maxOutstanding bytes waiting to be read back.
*/
void testReadWrite(CoupledTransports)(
size_t totalSize,
SizeGenerator wSizeGenerator,
SizeGenerator rSizeGenerator,
SizeGenerator wChunkGenerator,
SizeGenerator rChunkGenerator,
size_t maxOutstanding
) if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
auto wbuf = new ubyte[totalSize];
auto rbuf = new ubyte[totalSize];
// Store some data in wbuf.
foreach (i, ref b; wbuf) {
b = i & 0xff;
}
size_t totalWritten;
size_t totalRead;
while (totalRead < totalSize) {
// Determine how large a chunk of data to write.
auto wChunkSize = wChunkGenerator.nextSize();
if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) {
wChunkSize = totalSize - totalWritten;
}
// Make sure (totalWritten - totalRead) + wChunkSize is less than
// maxOutstanding.
if (maxOutstanding > 0 &&
wChunkSize > maxOutstanding - (totalWritten - totalRead)) {
wChunkSize = maxOutstanding - (totalWritten - totalRead);
}
// Write the chunk.
size_t chunkWritten = 0;
while (chunkWritten < wChunkSize) {
auto writeSize = wSizeGenerator.nextSize();
if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) {
writeSize = wChunkSize - chunkWritten;
}
transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]);
chunkWritten += writeSize;
totalWritten += writeSize;
}
// Flush the data, so it will be available in the read transport
// Don't flush if wChunkSize is 0. (This should only happen if
// totalWritten == totalSize already, and we're only reading now.)
if (wChunkSize > 0) {
transports.output.flush();
}
// Determine how large a chunk of data to read back.
auto rChunkSize = rChunkGenerator.nextSize();
if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) {
rChunkSize = totalWritten - totalRead;
}
// Read the chunk.
size_t chunkRead;
while (chunkRead < rChunkSize) {
auto readSize = rSizeGenerator.nextSize();
if (readSize == 0 || readSize > rChunkSize - chunkRead) {
readSize = rChunkSize - chunkRead;
}
size_t bytesRead;
try {
bytesRead = transports.input.read(
rbuf[totalRead .. totalRead + readSize]);
} catch (TTransportException e) {
throw new Exception(format(`read(pos = %s, size = %s) threw ` ~
`exception "%s"; written so far: %s/%s bytes`, totalRead, readSize,
e.msg, totalWritten, totalSize));
}
enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~
`written so far: %s/%s bytes`, totalRead, readSize, bytesRead,
totalWritten, totalSize));
chunkRead += bytesRead;
totalRead += bytesRead;
}
}
// make sure the data read back is identical to the data written
if (rbuf != wbuf) {
stderr.writefln("%s vs. %s", wbuf[$ - 4 .. $], rbuf[$ - 4 .. $]);
stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length);
}
enforce(rbuf == wbuf);
}
void testReadPartAvailable(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
ubyte[10] writeBuf = 'a';
ubyte[10] readBuf;
// Attemping to read 10 bytes when only 9 are available should return 9
// immediately.
transports.output.write(writeBuf[0 .. 9]);
transports.output.flush();
auto t = Trigger(dur!"seconds"(3), transports.output, 1);
auto bytesRead = transports.input.read(readBuf);
enforce(t.fired == 0);
enforce(bytesRead == 9);
}
void testReadPartialMidframe(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
ubyte[13] writeBuf = 'a';
ubyte[14] readBuf;
// Attempt to read 10 bytes, when only 9 are available, but after we have
// already read part of the data that is available. This exercises a
// different code path for several of the transports.
//
// For transports that add their own framing (e.g., TFramedTransport and
// TFileTransport), the two flush calls break up the data in to a 10 byte
// frame and a 3 byte frame. The first read then puts us partway through the
// first frame, and then we attempt to read past the end of that frame, and
// through the next frame, too.
//
// For buffered transports that perform read-ahead (e.g.,
// TBufferedTransport), the read-ahead will most likely see all 13 bytes
// written on the first read. The next read will then attempt to read past
// the end of the read-ahead buffer.
//
// Flush 10 bytes, then 3 bytes. This creates 2 separate frames for
// transports that track framing internally.
transports.output.write(writeBuf[0 .. 10]);
transports.output.flush();
transports.output.write(writeBuf[10 .. 13]);
transports.output.flush();
// Now read 4 bytes, so that we are partway through the written data.
auto bytesRead = transports.input.read(readBuf[0 .. 4]);
enforce(bytesRead == 4);
// Now attempt to read 10 bytes. Only 9 more are available.
//
// We should be able to get all 9 bytes, but it might take multiple read
// calls, since it is valid for read() to return fewer bytes than requested.
// (Most transports do immediately return 9 bytes, but the framing transports
// tend to only return to the end of the current frame, which is 6 bytes in
// this case.)
size_t totalRead = 0;
while (totalRead < 9) {
auto t = Trigger(dur!"seconds"(3), transports.output, 1);
bytesRead = transports.input.read(readBuf[4 + totalRead .. 14]);
enforce(t.fired == 0);
enforce(bytesRead > 0);
totalRead += bytesRead;
enforce(totalRead <= 9);
}
enforce(totalRead == 9);
}
void testBorrowPartAvailable(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
ubyte[9] writeBuf = 'a';
ubyte[10] readBuf;
// Attemping to borrow 10 bytes when only 9 are available should return NULL
// immediately.
transports.output.write(writeBuf);
transports.output.flush();
auto t = Trigger(dur!"seconds"(3), transports.output, 1);
auto borrowLen = readBuf.length;
auto borrowedBuf = transports.input.borrow(readBuf.ptr, borrowLen);
enforce(t.fired == 0);
enforce(borrowedBuf is null);
}
void testReadNoneAvailable(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
// Attempting to read when no data is available should either block until
// some data is available, or fail immediately. (e.g., TSocket blocks,
// TMemoryBuffer just fails.)
//
// If the transport blocks, it should succeed once some data is available,
// even if less than the amount requested becomes available.
ubyte[10] readBuf;
auto t = Trigger(dur!"seconds"(1), transports.output, 2);
t.add(dur!"seconds"(1), transports.output, 8);
auto bytesRead = transports.input.read(readBuf);
if (bytesRead == 0) {
enforce(t.fired == 0);
} else {
enforce(t.fired == 1);
enforce(bytesRead == 2);
}
}
void testBorrowNoneAvailable(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
scope transports = new CoupledTransports;
assert(transports.input);
assert(transports.output);
ubyte[16] writeBuf = 'a';
// Attempting to borrow when no data is available should fail immediately
auto t = Trigger(dur!"seconds"(1), transports.output, 10);
auto borrowLen = 10;
auto borrowedBuf = transports.input.borrow(null, borrowLen);
enforce(borrowedBuf is null);
enforce(t.fired == 0);
}
void doRwTest(CoupledTransports)(
size_t totalSize,
SizeGenerator wSizeGen,
SizeGenerator rSizeGen,
SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0),
SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0),
size_t maxOutstanding = 0
) if (
isCoupledTransports!CoupledTransports
) {
totalSize = cast(size_t)(totalSize * g_sizeMultiplier);
scope(failure) {
writefln("Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)",
CoupledTransports.stringof, totalSize, wSizeGen, rSizeGen,
wChunkSizeGen, rChunkSizeGen, maxOutstanding);
}
testReadWrite!CoupledTransports(totalSize, wSizeGen, rSizeGen,
wChunkSizeGen, rChunkSizeGen, maxOutstanding);
}
void doBlockingTest(CoupledTransports)() if (
isCoupledTransports!CoupledTransports
) {
void writeFailure(string name) {
writefln("Test failed for %s: %s()", CoupledTransports.stringof, name);
}
{
scope(failure) writeFailure("testReadPartAvailable");
testReadPartAvailable!CoupledTransports();
}
{
scope(failure) writeFailure("testReadPartialMidframe");
testReadPartialMidframe!CoupledTransports();
}
{
scope(failure) writeFailure("testReadNoneAvaliable");
testReadNoneAvailable!CoupledTransports();
}
{
scope(failure) writeFailure("testBorrowPartAvailable");
testBorrowPartAvailable!CoupledTransports();
}
{
scope(failure) writeFailure("testBorrowNoneAvailable");
testBorrowNoneAvailable!CoupledTransports();
}
}
SizeGenerator getGenerator(T)(T t) {
static if (is(T : SizeGenerator)) {
return t;
} else {
return new ConstantSizeGenerator(t);
}
}
template WrappedTransports(T) if (isCoupledTransports!T) {
alias TypeTuple!(
T,
CoupledBufferedTransports!T,
CoupledFramedTransports!T,
CoupledZlibTransports!T
) WrappedTransports;
}
void testRw(C, R, S)(
size_t totalSize,
R wSize,
S rSize
) if (
isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
is(typeof(getGenerator(rSize)))
) {
testRw!C(totalSize, wSize, rSize, 0, 0, 0);
}
void testRw(C, R, S, T, U)(
size_t totalSize,
R wSize,
S rSize,
T wChunkSize,
U rChunkSize,
size_t maxOutstanding = 0
) if (
isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) &&
is(typeof(getGenerator(rChunkSize)))
) {
foreach (T; WrappedTransports!C) {
doRwTest!T(
totalSize,
getGenerator(wSize),
getGenerator(rSize),
getGenerator(wChunkSize),
getGenerator(rChunkSize),
maxOutstanding
);
}
}
void testBlocking(C)() if (isCoupledTransports!C) {
foreach (T; WrappedTransports!C) {
doBlockingTest!T();
}
}
// A quick hack, for the sake of brevity…
float g_sizeMultiplier = 1;
version (Posix) {
immutable defaultTempDir = "/tmp";
} else version (Windows) {
import core.sys.windows.windows;
extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer);
string defaultTempDir() @property {
char[MAX_PATH + 1] dir;
enforce(GetTempPathA(dir.length, dir.ptr));
return to!string(dir.ptr)[0 .. $ - 1];
}
} else static assert(false);
void main(string[] args) {
int seed = unpredictableSeed();
string tmpDir = defaultTempDir;
getopt(args, "seed", &seed, "size-multiplier", &g_sizeMultiplier,
"tmp-dir", &tmpDir);
enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative.");
writefln("Using seed: %s", seed);
rndGen().seed(seed);
CoupledFileTransports.tmpDir = tmpDir;
auto rand4k = new RandomSizeGenerator(1, 4096);
/*
* We do the basically the same set of tests for each transport type,
* although we tweak the parameters in some places.
*/
// TMemoryBuffer tests
testRw!CoupledMemoryBuffers(1024 * 1024, 0, 0);
testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k);
testRw!CoupledMemoryBuffers(1024 * 256, 167, 163);
testRw!CoupledMemoryBuffers(1024 * 16, 1, 1);
testRw!CoupledMemoryBuffers(1024 * 256, 0, 0, rand4k, rand4k);
testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k, rand4k, rand4k);
testRw!CoupledMemoryBuffers(1024 * 256, 167, 163, rand4k, rand4k);
testRw!CoupledMemoryBuffers(1024 * 16, 1, 1, rand4k, rand4k);
testBlocking!CoupledMemoryBuffers();
// TSocket tests
enum socketMaxOutstanding = 4096;
testRw!CoupledSocketTransports(1024 * 1024, 0, 0,
0, 0, socketMaxOutstanding);
testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
0, 0, socketMaxOutstanding);
testRw!CoupledSocketTransports(1024 * 256, 167, 163,
0, 0, socketMaxOutstanding);
// Doh. Apparently writing to a socket has some additional overhead for
// each send() call. If we have more than ~400 outstanding 1-byte write
// requests, additional send() calls start blocking.
testRw!CoupledSocketTransports(1024 * 16, 1, 1,
0, 0, 250);
testRw!CoupledSocketTransports(1024 * 256, 0, 0,
rand4k, rand4k, socketMaxOutstanding);
testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
rand4k, rand4k, socketMaxOutstanding);
testRw!CoupledSocketTransports(1024 * 256, 167, 163,
rand4k, rand4k, socketMaxOutstanding);
testRw!CoupledSocketTransports(1024 * 16, 1, 1,
rand4k, rand4k, 250);
testBlocking!CoupledSocketTransports();
// File transport tests.
// Cannot write more than the frame size at once.
enum maxWriteAtOnce = 1024 * 1024 * 16 - 4;
testRw!CoupledFileTransports(1024 * 1024, maxWriteAtOnce, 0);
testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k);
testRw!CoupledFileTransports(1024 * 256, 167, 163);
testRw!CoupledFileTransports(1024 * 16, 1, 1);
testRw!CoupledFileTransports(1024 * 256, 0, 0, rand4k, rand4k);
testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k, rand4k, rand4k);
testRw!CoupledFileTransports(1024 * 256, 167, 163, rand4k, rand4k);
testRw!CoupledFileTransports(1024 * 16, 1, 1, rand4k, rand4k);
testBlocking!CoupledFileTransports();
}
/*
* Timer handling code for use in tests that check the transport blocking
* semantics.
*
* The implementation has been hacked together in a hurry and wastes a lot of
* threads, but speed should not be the concern here.
*/
struct Trigger {
this(Duration timeout, TTransport transport, size_t writeLength) {
mutex_ = new Mutex;
cancelCondition_ = new Condition(mutex_);
info_ = new Info(timeout, transport, writeLength);
startThread();
}
~this() {
synchronized (mutex_) {
info_ = null;
cancelCondition_.notifyAll();
}
if (thread_) thread_.join();
}
@disable this(this) { assert(0); }
void add(Duration timeout, TTransport transport, size_t writeLength) {
synchronized (mutex_) {
auto info = new Info(timeout, transport, writeLength);
if (info_) {
auto prev = info_;
while (prev.next) prev = prev.next;
prev.next = info;
} else {
info_ = info;
startThread();
}
}
}
@property short fired() {
return atomicLoad(fired_);
}
private:
void timerThread() {
// KLUDGE: Make sure the std.concurrency mbox is initialized on the timer
// thread to be able to unblock the file transport.
import std.concurrency;
thisTid;
synchronized (mutex_) {
while (info_) {
auto cancelled = cancelCondition_.wait(info_.timeout);
if (cancelled) {
info_ = null;
break;
}
atomicOp!"+="(fired_, 1);
// Write some data to the transport to unblock it.
auto buf = new ubyte[info_.writeLength];
buf[] = 'b';
info_.transport.write(buf);
info_.transport.flush();
info_ = info_.next;
}
}
thread_ = null;
}
void startThread() {
thread_ = new Thread(&timerThread);
thread_.start();
}
struct Info {
this(Duration timeout, TTransport transport, size_t writeLength) {
this.timeout = timeout;
this.transport = transport;
this.writeLength = writeLength;
}
Duration timeout;
TTransport transport;
size_t writeLength;
Info* next;
}
Info* info_;
Thread thread_;
shared short fired_;
import core.sync.mutex;
Mutex mutex_;
import core.sync.condition;
Condition cancelCondition_;
}