Thrift Binary protocol is now a choice for users wishing to use it. Default Thrift protocol continues to be JSON.
This commit is contained in:
parent
75c87f34b3
commit
10c12d5a13
1 changed files with 41 additions and 17 deletions
58
realis.go
58
realis.go
|
@ -17,14 +17,12 @@ package realis
|
|||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"git.apache.org/thrift.git/lib/go/thrift"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rdelval/gorealis/gen-go/apache/aurora"
|
||||
"github.com/rdelval/gorealis/response"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -33,7 +31,7 @@ type Realis interface {
|
|||
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
|
||||
CreateJob(auroraJob Job) (*aurora.Response, error)
|
||||
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||
GetTaskStatus(query *aurora.TaskQuery)([]*aurora.ScheduledTask, error)
|
||||
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
|
||||
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
|
||||
JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error)
|
||||
|
@ -52,9 +50,9 @@ type realisClient struct {
|
|||
client *aurora.AuroraSchedulerManagerClient
|
||||
}
|
||||
|
||||
// Wrap object to provide future flexibility
|
||||
// Wrapper object to provide future flexibility
|
||||
type RealisConfig struct {
|
||||
transport thrift.TTransport
|
||||
transport thrift.TTransport
|
||||
protoFactory thrift.TProtocolFactory
|
||||
}
|
||||
|
||||
|
@ -64,26 +62,53 @@ func NewClient(config RealisConfig) Realis {
|
|||
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)}
|
||||
}
|
||||
|
||||
// Create a default configuration of the transport layer, requires a URL to test connection with.
|
||||
func NewDefaultConfig(url string, timeoutms int) (RealisConfig, error) {
|
||||
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
|
||||
func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) {
|
||||
jar, err := cookiejar.New(nil)
|
||||
|
||||
if err != nil {
|
||||
return RealisConfig{}, errors.Wrap(err, "Error creating Cookie Jar")
|
||||
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating Cookie Jar")
|
||||
}
|
||||
|
||||
//Custom client to timeout after 10 seconds to avoid hanging
|
||||
trans, err := thrift.NewTHttpPostClientWithOptions(url+"/api",
|
||||
trans, err := thrift.NewTHttpPostClientWithOptions(urlstr+"/api",
|
||||
thrift.THttpClientOptions{Client: &http.Client{Timeout: time.Millisecond * time.Duration(timeoutms), Jar: jar}})
|
||||
|
||||
if err != nil {
|
||||
return RealisConfig{}, errors.Wrap(err, "Error creating transport")
|
||||
return &thrift.THttpClient{}, errors.Wrap(err, "Error creating transport")
|
||||
}
|
||||
|
||||
if err := trans.Open(); err != nil {
|
||||
fmt.Fprintln(os.Stderr)
|
||||
return RealisConfig{}, errors.Wrapf(err, "Error opening connection to %s", url)
|
||||
return &thrift.THttpClient{}, errors.Wrapf(err, "Error opening connection to %s", urlstr)
|
||||
}
|
||||
|
||||
return trans, nil
|
||||
}
|
||||
|
||||
// Create a default configuration of the transport layer, requires a URL to test connection with.
|
||||
// Uses HTTP Post as transport layer and Thrift JSON as the wire protocol by default.
|
||||
func NewDefaultConfig(url string, timeoutms int) (RealisConfig, error) {
|
||||
return NewTJSONConfig(url, timeoutms)
|
||||
}
|
||||
|
||||
// Creates a realis config object using HTTP Post and Thrift JSON protocol to communicate with Aurora.
|
||||
func NewTJSONConfig(url string, timeoutms int) (RealisConfig, error) {
|
||||
trans, err := defaultTTransport(url, timeoutms)
|
||||
if err != nil {
|
||||
return RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||
}
|
||||
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Content-Type", "application/x-thrift")
|
||||
|
||||
return RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil
|
||||
}
|
||||
|
||||
// Creates a realis config config using HTTP Post and Thrift Binary protocol to communicate with Aurora.
|
||||
func NewTBinaryConfig(url string, timeoutms int) (RealisConfig, error) {
|
||||
trans, err := defaultTTransport(url, timeoutms)
|
||||
if err != nil {
|
||||
return RealisConfig{}, errors.Wrap(err, "Error creating realis config")
|
||||
}
|
||||
|
||||
httpTrans := (trans).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
|
||||
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
|
||||
|
@ -286,7 +311,7 @@ func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*au
|
|||
return response.ResponseCodeCheck(resp)
|
||||
}
|
||||
|
||||
func (r realisClient) GetTaskStatus(query *aurora.TaskQuery)(tasks []*aurora.ScheduledTask, e error) {
|
||||
func (r realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
|
||||
resp, err := r.client.GetTasksStatus(query)
|
||||
if err != nil {
|
||||
|
@ -300,7 +325,6 @@ func (r realisClient) GetTaskStatus(query *aurora.TaskQuery)(tasks []*aurora.Sch
|
|||
return response.ScheduleStatusResult(resp).GetTasks(), nil
|
||||
}
|
||||
|
||||
|
||||
func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
|
||||
|
||||
ids := make(map[int32]bool)
|
||||
|
@ -319,7 +343,7 @@ func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskC
|
|||
|
||||
//Check for response code..
|
||||
if resp.GetResponseCode() != aurora.ResponseCode_OK {
|
||||
return nil, errors.New(resp.ResponseCode.String() + "--" +response.CombineMessage(resp))
|
||||
return nil, errors.New(resp.ResponseCode.String() + "--" + response.CombineMessage(resp))
|
||||
}
|
||||
|
||||
tasks := response.ScheduleStatusResult(resp).GetTasks()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue