Moving from govendor to dep.

Vendor folder has been deleted. Use dep to regenerate vendor folder.
This commit is contained in:
Renan DelValle 2018-01-02 16:39:22 -08:00
parent 9631aa3aab
commit 03278a882b
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
2181 changed files with 400398 additions and 346 deletions

View file

@ -0,0 +1,30 @@
This directory contains some glue code to allow Thrift RPCs to be sent over
ZeroMQ. Included are client and server implementations for Python and C++,
along with a simple demo interface (with a working client and server for
each language).
Thrift was designed for stream-based interfaces like TCP, but ZeroMQ is
message-based, so there is a small impedance mismatch. Most of issues are
hidden from developers, but one cannot be: oneway methods have to be handled
differently from normal ones. ZeroMQ requires the messaging pattern to be
declared at socket creation time, so an application cannot decide on a
message-by-message basis whether to send a reply. Therefore, this
implementation makes it the client's responsibility to ensure that ZMQ_REQ
sockets are used for normal methods and ZMQ_DOWNSTREAM sockets are used for
oneway methods. In addition, services that expose both types of methods
have to expose two servers (on two ports), but the TZmqMultiServer makes it
easy to run the two together in the same thread.
This code was tested with ZeroMQ 2.0.7 and pyzmq afabbb5b9bd3.
To build, simply install Thrift and ZeroMQ, then run "make". If you install
in a non-standard location, make sure to set THRIFT to the location of the
Thrift code generator on the make command line and PKG_CONFIG_PATH to a path
that includes the pkgconfig files for both Thrift and ZeroMQ. The test
servers take no arguments. Run the test clients with no arguments to
retrieve the stored value or with an integer argument to increment it by
that amount.
This code is not quite what I would consider production-ready. It doesn't
support all of the normal hooks into Thrift, and its performance is
sub-optimal because it does some unnecessary copying.

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "TZmqClient.h"
#include <cstring>
namespace apache { namespace thrift { namespace transport {
uint32_t TZmqClient::read_virt(uint8_t* buf, uint32_t len) {
if (rbuf_.available_read() == 0) {
(void)sock_.recv(&msg_);
rbuf_.resetBuffer((uint8_t*)msg_.data(), msg_.size());
}
return rbuf_.read(buf, len);
}
void TZmqClient::write_virt(const uint8_t* buf, uint32_t len) {
return wbuf_.write(buf, len);
}
uint32_t TZmqClient::writeEnd() {
uint8_t* buf;
uint32_t size;
wbuf_.getBuffer(&buf, &size);
zmq::message_t msg(size);
std::memcpy(msg.data(), buf, size);
(void)sock_.send(msg);
wbuf_.resetBuffer(true);
return size;
}
}}} // apache::thrift::transport

View file

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_
#define _THRIFT_TRANSPORT_TZMQCLIENT_H_ 1
#include <zmq.hpp>
#include <thrift/transport/TBufferTransports.h>
namespace apache { namespace thrift { namespace transport {
class TZmqClient : public TTransport {
public:
TZmqClient(zmq::context_t& ctx, const std::string& endpoint, int type)
: sock_(ctx, type)
, endpoint_(endpoint)
, wbuf_()
, rbuf_()
, msg_()
, zmq_type_(type)
{}
void open() {
if(zmq_type_ == ZMQ_PUB) {
sock_.bind(endpoint_.c_str());
}
else {
sock_.connect(endpoint_.c_str());
}
}
uint32_t read_virt(uint8_t* buf, uint32_t len);
void write_virt(const uint8_t* buf, uint32_t len);
uint32_t writeEnd();
protected:
zmq::socket_t sock_;
std::string endpoint_;
TMemoryBuffer wbuf_;
TMemoryBuffer rbuf_;
zmq::message_t msg_;
int zmq_type_;
};
}}} // apache::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TZMQCLIENT_H_

View file

@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import zmq
from cStringIO import StringIO
from thrift.transport.TTransport import TTransportBase, CReadableTransport
class TZmqClient(TTransportBase, CReadableTransport):
def __init__(self, ctx, endpoint, sock_type):
self._sock = ctx.socket(sock_type)
self._endpoint = endpoint
self._wbuf = StringIO()
self._rbuf = StringIO()
def open(self):
self._sock.connect(self._endpoint)
def read(self, size):
ret = self._rbuf.read(size)
if len(ret) != 0:
return ret
self._read_message()
return self._rbuf.read(size)
def _read_message(self):
msg = self._sock.recv()
self._rbuf = StringIO(msg)
def write(self, buf):
self._wbuf.write(buf)
def flush(self):
msg = self._wbuf.getvalue()
self._wbuf = StringIO()
self._sock.send(msg)
# Implement the CReadableTransport interface.
@property
def cstringio_buf(self):
return self._rbuf
# NOTE: This will probably not actually work.
def cstringio_refill(self, prefix, reqlen):
while len(prefix) < reqlen:
self.read_message()
prefix += self._rbuf.getvalue()
self._rbuf = StringIO(prefix)
return self._rbuf

View file

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "TZmqServer.h"
#include <thrift/transport/TBufferTransports.h>
#include <boost/scoped_ptr.hpp>
using boost::shared_ptr;
using apache::thrift::transport::TMemoryBuffer;
using apache::thrift::protocol::TProtocol;
namespace apache { namespace thrift { namespace server {
bool TZmqServer::serveOne(int recv_flags) {
zmq::message_t msg;
bool received = sock_.recv(&msg, recv_flags);
if (!received) {
return false;
}
shared_ptr<TMemoryBuffer> inputTransport(new TMemoryBuffer((uint8_t*)msg.data(), msg.size()));
shared_ptr<TMemoryBuffer> outputTransport(new TMemoryBuffer());
shared_ptr<TProtocol> inputProtocol(
inputProtocolFactory_->getProtocol(inputTransport));
shared_ptr<TProtocol> outputProtocol(
outputProtocolFactory_->getProtocol(outputTransport));
shared_ptr<TMemoryBuffer> transport(new TMemoryBuffer);
processor_->process(inputProtocol, outputProtocol, NULL);
if (zmq_type_ == ZMQ_REP) {
uint8_t* buf;
uint32_t size;
outputTransport->getBuffer(&buf, &size);
msg.rebuild(size);
std::memcpy(msg.data(), buf, size);
(void)sock_.send(msg);
}
return true;
}
void TZmqMultiServer::serveOne(long timeout) {
boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
serveActive(items.get(), timeout);
}
void TZmqMultiServer::serveForever() {
boost::scoped_ptr<zmq::pollitem_t> items(setupPoll());
while (true) {
serveActive(items.get(), -1);
}
}
zmq::pollitem_t* TZmqMultiServer::setupPoll() {
zmq::pollitem_t* items = new zmq::pollitem_t[servers_.size()];
for (int i = 0; i < servers_.size(); ++i) {
items[i].socket = servers_[i]->getSocket();
items[i].events = ZMQ_POLLIN;
}
return items;
}
void TZmqMultiServer::serveActive(zmq::pollitem_t* items, long timeout) {
int rc = zmq::poll(items, servers_.size(), timeout);
if (rc == 0) {
return;
}
for (int i = 0; i < servers_.size(); ++i) {
if ((items[i].revents & ZMQ_POLLIN) != 0) {
// Should we pass ZMQ_NOBLOCK here to be safe?
servers_[i]->serveOne();
}
}
}
}}} // apache::thrift::server

View file

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _THRIFT_SERVER_TZMQSERVER_H_
#define _THRIFT_SERVER_TZMQSERVER_H_ 1
#include <zmq.hpp>
#include <thrift/server/TServer.h>
namespace apache { namespace thrift { namespace server {
class TZmqServer : public TServer {
public:
TZmqServer(
boost::shared_ptr<TProcessor> processor,
zmq::context_t& ctx, const std::string& endpoint, int type)
: TServer(processor)
, processor_(processor)
, zmq_type_(type)
, sock_(ctx, type)
{
if(zmq_type_ == ZMQ_SUB) {
sock_.setsockopt(ZMQ_SUBSCRIBE, "", 0) ; // listen to all messages
sock_.connect(endpoint.c_str()) ;
}
else {
sock_.bind(endpoint.c_str());
}
}
bool serveOne(int recv_flags = 0);
void serve() {
while (true) {
serveOne();
}
}
zmq::socket_t& getSocket() {
return sock_;
}
private:
boost::shared_ptr<TProcessor> processor_;
int zmq_type_;
zmq::socket_t sock_;
};
class TZmqMultiServer {
public:
void serveOne(long timeout = -1);
void serveForever();
std::vector<TZmqServer*>& servers() {
return servers_;
}
private:
zmq::pollitem_t* setupPoll();
void serveActive(zmq::pollitem_t* items, long timeout);
std::vector<TZmqServer*> servers_;
};
}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TZMQSERVER_H_

View file

@ -0,0 +1,79 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import zmq
import thrift.server.TServer
import thrift.transport.TTransport
class TZmqServer(thrift.server.TServer.TServer):
def __init__(self, processor, ctx, endpoint, sock_type):
thrift.server.TServer.TServer.__init__(self, processor, None)
self.zmq_type = sock_type
self.socket = ctx.socket(sock_type)
self.socket.bind(endpoint)
def serveOne(self):
msg = self.socket.recv()
itrans = thrift.transport.TTransport.TMemoryBuffer(msg)
otrans = thrift.transport.TTransport.TMemoryBuffer()
iprot = self.inputProtocolFactory.getProtocol(itrans)
oprot = self.outputProtocolFactory.getProtocol(otrans)
try:
self.processor.process(iprot, oprot)
except Exception:
logging.exception("Exception while processing request")
# Fall through and send back a response, even if empty or incomplete.
if self.zmq_type == zmq.REP:
msg = otrans.getvalue()
self.socket.send(msg)
def serve(self):
while True:
self.serveOne()
class TZmqMultiServer(object):
def __init__(self):
self.servers = []
def serveOne(self, timeout=-1):
self._serveActive(self._setupPoll(), timeout)
def serveForever(self):
poll_info = self._setupPoll()
while True:
self._serveActive(poll_info, -1)
def _setupPoll(self):
server_map = {}
poller = zmq.Poller()
for server in self.servers:
server_map[server.socket] = server
poller.register(server.socket, zmq.POLLIN)
return (server_map, poller)
def _serveActive(self, poll_info, timeout):
(server_map, poller) = poll_info
ready = dict(poller.poll())
for sock, state in ready.items():
assert (state & zmq.POLLIN) != 0
server_map[sock].serveOne()

View file

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using System.Reflection;
using System.Runtime.CompilerServices;
// Information about this assembly is defined by the following attributes.
// Change them to the values specific to your project.
[assembly: AssemblyTitle("ZmqServer")]
[assembly: AssemblyDescription("Zmq Examples")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("The Apache Software Foundation")]
[assembly: AssemblyProduct("")]
[assembly: AssemblyCopyright("The Apache Software Foundation")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}".
// The form "{Major}.{Minor}.*" will automatically update the build and revision,
// and "{Major}.{Minor}.{Build}.*" will update just the revision.
[assembly: AssemblyVersion("1.0.*")]
// The following attributes are used to specify the signing key for the assembly,
// if desired. See the Mono documentation for more information about signing.
//[assembly: AssemblyDelaySign(false)]
//[assembly: AssemblyKeyFile("")]

View file

@ -0,0 +1,60 @@
using System;
using System.Threading;
using Thrift.Protocol;
using ZMQ;
using ZmqServer;
using ZmqClient;
namespace ZmqServer
{
class MainClass
{
public static void Main (string[] args)
{
new Thread(Server.serve).Start();
Client.work();
}
static class Server{
public static void serve(){
StorageHandler s=new StorageHandler();
Storage.Processor p=new Storage.Processor(s);
ZMQ.Context c=new ZMQ.Context();
TZmqServer tzs=new TZmqServer(p,c,"tcp://127.0.0.1:9090",ZMQ.SocketType.PAIR);
tzs.Serve();
}
class StorageHandler:Storage.Iface{
int val=0;
public void incr(int amount){
val+=amount;
Console.WriteLine("incr({0})",amount);
}
public int get(){
return val;
}
}
}
static class Client{
public static void work()
{
Context ctx=new Context();
TZmqClient tzc=new TZmqClient(ctx,"tcp://127.0.0.1:9090",SocketType.PAIR);
TBinaryProtocol p=new TBinaryProtocol(tzc);
Storage.Client client=new Storage.Client(p);
tzc.Open();
Console.WriteLine(client.@get());
client.incr(1);
client.incr(41);
Console.WriteLine(client.@get());
}
}
}
}

View file

@ -0,0 +1,78 @@
using System;
using ZMQ;
using System.IO;
using Thrift.Transport;
namespace ZmqClient
{
public class TZmqClient : TTransport
{
Socket _sock;
String _endpoint;
MemoryStream _wbuf = new MemoryStream ();
MemoryStream _rbuf = new MemoryStream ();
void debug (string msg)
{
//Uncomment to enable debug
// Console.WriteLine (msg);
}
public TZmqClient (Context ctx, String endpoint, SocketType sockType)
{
_sock = ctx.Socket (sockType);
_endpoint = endpoint;
}
public override void Open ()
{
_sock.Connect (_endpoint);
}
public override void Close ()
{
throw new NotImplementedException ();
}
public override bool IsOpen {
get {
throw new NotImplementedException ();
}
}
public override int Read (byte[] buf, int off, int len)
{
debug ("Client_Read");
if (off != 0 || len != buf.Length)
throw new NotImplementedException ();
if (_rbuf.Length == 0) {
//Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response
debug ("Client_Read Filling buffer..");
byte[] tmpBuf = _sock.Recv ();
debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
_rbuf.Write (tmpBuf, 0, tmpBuf.Length);
_rbuf.Position = 0; //For reading
}
int ret = _rbuf.Read (buf, 0, len);
if (_rbuf.Length == _rbuf.Position) //Finished reading
_rbuf.SetLength (0);
debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
return ret;
}
public override void Write (byte[] buf, int off, int len)
{
debug ("Client_Write");
_wbuf.Write (buf, off, len);
}
public override void Flush ()
{
debug ("Client_Flush");
_sock.Send (_wbuf.GetBuffer ());
_wbuf = new MemoryStream ();
}
}
}

View file

@ -0,0 +1,56 @@
using System;
using Thrift;
using Thrift.Server;
using Thrift.Transport;
using Thrift.Protocol;
using ZMQ;
using System.IO;
using System.Collections.Generic;
namespace ZmqServer
{
public class TZmqServer
{
Socket _socket ;
TProcessor _processor;
void debug (string msg)
{
//Uncomment to enable debug
// Console.WriteLine (msg);
}
public TZmqServer (TProcessor processor, Context ctx, String endpoint, SocketType sockType)
{
new TSimpleServer (processor,null);
_socket = ctx.Socket (sockType);
_socket.Bind (endpoint);
_processor = processor;
}
public void ServeOne ()
{
debug ("Server_ServeOne");
Byte[] msg = _socket.Recv ();
MemoryStream istream = new MemoryStream (msg);
MemoryStream ostream = new MemoryStream ();
TProtocol tProtocol = new TBinaryProtocol (new TStreamTransport (istream, ostream));
_processor.Process (tProtocol, tProtocol);
if (ostream.Length != 0) {
byte[] newBuf = new byte[ostream.Length];
Array.Copy (ostream.GetBuffer (), newBuf, ostream.Length);
debug (string.Format ("Server_ServeOne sending {0}b", ostream.Length));
_socket.Send (newBuf);
}
}
public void Serve ()
{
while (true)
ServeOne ();
}
}
}

View file

@ -0,0 +1,91 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">x86</Platform>
<ProductVersion>0.10.0</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}</ProjectGuid>
<OutputType>Exe</OutputType>
<RootNamespace>ZmqServer</RootNamespace>
<AssemblyName>ThriftZMQ</AssemblyName>
<TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
<FileUpgradeFlags>
</FileUpgradeFlags>
<OldToolsVersion>3.5</OldToolsVersion>
<UpgradeBackupLocation />
<PublishUrl>publish\</PublishUrl>
<Install>true</Install>
<InstallFrom>Disk</InstallFrom>
<UpdateEnabled>false</UpdateEnabled>
<UpdateMode>Foreground</UpdateMode>
<UpdateInterval>7</UpdateInterval>
<UpdateIntervalUnits>Days</UpdateIntervalUnits>
<UpdatePeriodically>false</UpdatePeriodically>
<UpdateRequired>false</UpdateRequired>
<MapFileExtensions>true</MapFileExtensions>
<ApplicationRevision>0</ApplicationRevision>
<ApplicationVersion>0.10.0.%2a</ApplicationVersion>
<IsWebBootstrapper>false</IsWebBootstrapper>
<UseApplicationTrust>false</UseApplicationTrust>
<BootstrapperEnabled>true</BootstrapperEnabled>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<PlatformTarget>x86</PlatformTarget>
<Externalconsole>true</Externalconsole>
<CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
<DebugType>none</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Release</OutputPath>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<PlatformTarget>x86</PlatformTarget>
<Externalconsole>true</Externalconsole>
<CodeAnalysisRuleSet>AllRules.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>
<ItemGroup>
<Reference Include="clrzmq, Version=2.1.0.0, Culture=neutral, processorArchitecture=x86">
<SpecificVersion>False</SpecificVersion>
<HintPath>.\clrzmq.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="Thrift, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\..\lib\csharp\Thrift.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Main.cs" />
<Compile Include="AssemblyInfo.cs" />
<Compile Include="TZmqServer.cs" />
<Compile Include="TZmqClient.cs" />
<Compile Include="..\gen-csharp\Storage.cs" />
</ItemGroup>
<ItemGroup>
<BootstrapperPackage Include="Microsoft.Net.Client.3.5">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1 Client Profile</ProductName>
<Install>false</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Net.Framework.3.5.SP1">
<Visible>False</Visible>
<ProductName>.NET Framework 3.5 SP1</ProductName>
<Install>true</Install>
</BootstrapperPackage>
<BootstrapperPackage Include="Microsoft.Windows.Installer.3.1">
<Visible>False</Visible>
<ProductName>Windows Installer 3.1</ProductName>
<Install>true</Install>
</BootstrapperPackage>
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
</Project>

View file

@ -0,0 +1,42 @@

Microsoft Visual Studio Solution File, Format Version 11.00
# Visual Studio 2010
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ThriftZMQ", "ThriftZMQ.csproj", "{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thrift", "..\..\..\lib\csharp\src\Thrift.csproj", "{499EB63C-D74C-47E8-AE48-A2FC94538E9D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|Mixed Platforms = Debug|Mixed Platforms
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|Mixed Platforms = Release|Mixed Platforms
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Any CPU.ActiveCfg = Debug|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.ActiveCfg = Debug|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|Mixed Platforms.Build.0 = Debug|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.ActiveCfg = Debug|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Debug|x86.Build.0 = Debug|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Any CPU.ActiveCfg = Release|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.ActiveCfg = Release|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|Mixed Platforms.Build.0 = Release|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.ActiveCfg = Release|x86
{17C63B90-DFD7-42AC-A7B0-749E6876C0A1}.Release|x86.Build.0 = Release|x86
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Debug|x86.ActiveCfg = Debug|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Any CPU.Build.0 = Release|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{499EB63C-D74C-47E8-AE48-A2FC94538E9D}.Release|x86.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal

View file

@ -0,0 +1,4 @@
service Storage {
oneway void incr(1: i32 amount);
i32 get();
}

View file

@ -0,0 +1,40 @@
#include <iostream>
#include <cstdlib>
#include <thrift/protocol/TBinaryProtocol.h>
#include "zmq.hpp"
#include "TZmqClient.h"
#include "Storage.h"
using boost::shared_ptr;
using apache::thrift::transport::TZmqClient;
using apache::thrift::protocol::TBinaryProtocol;
int main(int argc, char** argv) {
const char* endpoint = "tcp://127.0.0.1:9090";
int socktype = ZMQ_REQ;
int incr = 0;
if (argc > 1) {
incr = atoi(argv[1]);
if (incr) {
socktype = ZMQ_DOWNSTREAM;
endpoint = "tcp://127.0.0.1:9091";
}
}
zmq::context_t ctx(1);
shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
StorageClient client(protocol);
transport->open();
if (incr) {
client.incr(incr);
usleep(50000);
} else {
int value = client.get();
std::cout << value << std::endl;
}
return 0;
}

View file

@ -0,0 +1,36 @@
#!/usr/bin/env python
import sys
import time
import zmq
import TZmqClient
import thrift.protocol.TBinaryProtocol
import storage.ttypes
import storage.Storage
def main(args):
endpoint = "tcp://127.0.0.1:9090"
socktype = zmq.REQ
incr = 0
if len(args) > 1:
incr = int(args[1])
if incr:
socktype = zmq.DOWNSTREAM
endpoint = "tcp://127.0.0.1:9091"
ctx = zmq.Context()
transport = TZmqClient.TZmqClient(ctx, endpoint, socktype)
protocol = thrift.protocol.TBinaryProtocol.TBinaryProtocolAccelerated(transport)
client = storage.Storage.Client(protocol)
transport.open()
if incr:
client.incr(incr)
time.sleep(0.05)
else:
value = client.get()
print value
if __name__ == "__main__":
main(sys.argv)

View file

@ -0,0 +1,40 @@
#include "zmq.hpp"
#include "TZmqServer.h"
#include "Storage.h"
using boost::shared_ptr;
using apache::thrift::TProcessor;
using apache::thrift::server::TZmqServer;
using apache::thrift::server::TZmqMultiServer;
class StorageHandler : virtual public StorageIf {
public:
StorageHandler()
: value_(0)
{}
void incr(const int32_t amount) {
value_ += amount;
printf("value_: %i\n", value_) ;
}
int32_t get() {
return value_;
}
private:
int32_t value_;
};
int main(int argc, char *argv[]) {
shared_ptr<StorageHandler> handler(new StorageHandler());
shared_ptr<TProcessor> processor(new StorageProcessor(handler));
zmq::context_t ctx(1);
TZmqServer oneway_server(processor, ctx, "epgm://eth0;239.192.1.1:5555", ZMQ_SUB);
oneway_server.serve();
return 0;
}

View file

@ -0,0 +1,32 @@
#include <iostream>
#include <cstdlib>
#include <thrift/protocol/TBinaryProtocol.h>
#include "zmq.hpp"
#include "TZmqClient.h"
#include "Storage.h"
using boost::shared_ptr;
using apache::thrift::transport::TZmqClient;
using apache::thrift::protocol::TBinaryProtocol;
int main(int argc, char** argv) {
const char* endpoint = "epgm://eth0;239.192.1.1:5555";
int socktype = ZMQ_PUB;
int incr = 1;
if (argc > 1) {
incr = atoi(argv[1]);
}
zmq::context_t ctx(1);
shared_ptr<TZmqClient> transport(new TZmqClient(ctx, endpoint, socktype));
shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
StorageClient client(protocol);
transport->open();
client.incr(incr);
usleep(50000);
return 0;
}

View file

@ -0,0 +1,43 @@
#include "zmq.hpp"
#include "TZmqServer.h"
#include "Storage.h"
using boost::shared_ptr;
using apache::thrift::TProcessor;
using apache::thrift::server::TZmqServer;
using apache::thrift::server::TZmqMultiServer;
class StorageHandler : virtual public StorageIf {
public:
StorageHandler()
: value_(0)
{}
void incr(const int32_t amount) {
value_ += amount;
}
int32_t get() {
return value_;
}
private:
int32_t value_;
};
int main(int argc, char *argv[]) {
shared_ptr<StorageHandler> handler(new StorageHandler());
shared_ptr<TProcessor> processor(new StorageProcessor(handler));
zmq::context_t ctx(1);
TZmqServer reqrep_server(processor, ctx, "tcp://0.0.0.0:9090", ZMQ_REP);
TZmqServer oneway_server(processor, ctx, "tcp://0.0.0.0:9091", ZMQ_UPSTREAM);
TZmqMultiServer multiserver;
multiserver.servers().push_back(&reqrep_server);
multiserver.servers().push_back(&oneway_server);
multiserver.serveForever();
return 0;
}

View file

@ -0,0 +1,33 @@
#!/usr/bin/env python
import zmq
import TZmqServer
import storage.ttypes
import storage.Storage
class StorageHandler(storage.Storage.Iface):
def __init__(self):
self.value = 0
def incr(self, amount):
self.value += amount
def get(self):
return self.value
def main():
handler = StorageHandler()
processor = storage.Storage.Processor(handler)
ctx = zmq.Context()
reqrep_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9090", zmq.REP)
oneway_server = TZmqServer.TZmqServer(processor, ctx, "tcp://0.0.0.0:9091", zmq.UPSTREAM)
multiserver = TZmqServer.TZmqMultiServer()
multiserver.servers.append(reqrep_server)
multiserver.servers.append(oneway_server)
multiserver.serveForever()
if __name__ == "__main__":
main()