From 10c12d5a1343d12c106a9d9069147e3a36307a01 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Mon, 13 Feb 2017 19:31:41 -0500 Subject: [PATCH] Thrift Binary protocol is now a choice for users wishing to use it. Default Thrift protocol continues to be JSON. --- realis.go | 58 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/realis.go b/realis.go index ae6ca95..2d1ee47 100644 --- a/realis.go +++ b/realis.go @@ -17,14 +17,12 @@ package realis import ( "encoding/base64" - "fmt" "git.apache.org/thrift.git/lib/go/thrift" "github.com/pkg/errors" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" "net/http" "net/http/cookiejar" - "os" "time" ) @@ -33,7 +31,7 @@ type Realis interface { AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) - GetTaskStatus(query *aurora.TaskQuery)([]*aurora.ScheduledTask, error) + GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) @@ -52,9 +50,9 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient } -// Wrap object to provide future flexibility +// Wrapper object to provide future flexibility type RealisConfig struct { - transport thrift.TTransport + transport thrift.TTransport protoFactory thrift.TProtocolFactory } @@ -64,26 +62,53 @@ func NewClient(config RealisConfig) Realis { client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)} } -// Create a default configuration of the transport layer, requires a URL to test connection with. -func NewDefaultConfig(url string, timeoutms int) (RealisConfig, error) { +// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client +func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) { jar, err := cookiejar.New(nil) - if err != nil { - return RealisConfig{}, errors.Wrap(err, "Error creating Cookie Jar") + return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar") } - //Custom client to timeout after 10 seconds to avoid hanging - trans, err := thrift.NewTHttpPostClientWithOptions(url+"/api", + trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api", thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Jar: jar}}) if err != nil { - return RealisConfig{}, errors.Wrap(err, "Error creating transport") + return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport") } if err := trans.Open(); err != nil { - fmt.Fprintln(os.Stderr) - return RealisConfig{}, errors.Wrapf(err, "Error opening connection to %s", url) + return &thrift.THttpClient{}, errors.Wrapf(err, "Error opening connection to %s", urlstr) } + + return trans, nil +} + +// Create a default configuration of the transport layer, requires a URL to test connection with. +// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default. +func NewDefaultConfig(url string, timeoutms int) (RealisConfig, error) { + return NewTJSONConfig(url, timeoutms) +} + +// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora. +func NewTJSONConfig(url string, timeoutms int) (RealisConfig, error) { + trans, err := defaultTTransport(url, timeoutms) + if err != nil { + return RealisConfig{}, errors.Wrap(err, "Error creating realis config") + } + + httpTrans := (trans).(*thrift.THttpClient) + httpTrans.SetHeader("Content-Type", "application/x-thrift") + + return RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil +} + +// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora. +func NewTBinaryConfig(url string, timeoutms int) (RealisConfig, error) { + trans, err := defaultTTransport(url, timeoutms) + if err != nil { + return RealisConfig{}, errors.Wrap(err, "Error creating realis config") + } + httpTrans := (trans).(*thrift.THttpClient) httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") @@ -286,7 +311,7 @@ func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*au return response.ResponseCodeCheck(resp) } -func (r realisClient) GetTaskStatus(query *aurora.TaskQuery)(tasks []*aurora.ScheduledTask, e error) { +func (r realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { resp, err := r.client.GetTasksStatus(query) if err != nil { @@ -300,7 +325,6 @@ func (r realisClient) GetTaskStatus(query *aurora.TaskQuery)(tasks []*aurora.Sch return response.ScheduleStatusResult(resp).GetTasks(), nil } - func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { ids := make(map[int32]bool) @@ -319,7 +343,7 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC //Check for response code.. if resp.GetResponseCode() != aurora.ResponseCode_OK { - return nil, errors.New(resp.ResponseCode.String() + "--" +response.CombineMessage(resp)) + return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp)) } tasks := response.ScheduleStatusResult(resp).GetTasks()