New API to set hosts to DRAINING. Cleaned up some of the client code, and fixed a few error printing bugs.

This commit is contained in:
Renan DelValle 2017-09-22 12:55:03 -07:00
parent d9f4086853
commit 0d3126c468
3 changed files with 177 additions and 33 deletions

View file

@ -56,12 +56,16 @@ type Realis interface {
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
// Admin functions
DrainHosts(hosts ...string) (*aurora.Response, error)
}
type realisClient struct {
config *RealisConfig
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
}
type option func(*RealisConfig)
@ -151,7 +155,7 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
func NewRealisClient(options ...option) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", options)
fmt.Println(" options length: ", len(options))
for _, opt := range options {
opt(config)
}
@ -217,7 +221,8 @@ func NewRealisClient(options ...option) (Realis, error) {
return &realisClient{
config: config,
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory)}, nil
}
@ -350,7 +355,8 @@ func newClient(realisconfig *RealisConfig) Realis {
return &realisClient{
config: realisconfig,
client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory)}
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)}
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
@ -463,6 +469,7 @@ func (r *realisClient) ReestablishConn() error {
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
@ -484,6 +491,7 @@ func (r *realisClient) ReestablishConn() error {
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
@ -679,7 +687,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil {
return response.ResponseCodeCheck(resp)
}
fmt.Println("CreateJob err: %+v\n", err)
fmt.Printf("CreateJob err: %+v\n", err)
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in ReestablishConn: ", err1)
@ -1148,3 +1156,52 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
return nil, errors.Wrap(err, "Unable to roll back job update")
}
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, error) {
var resp *aurora.Response
var clientErr, payloadErr error
fmt.Printf("number of hosts %d", len(hosts))
if len(hosts) == 0 {
return nil, errors.New("no hosts provided to drain")
}
drainList := aurora.NewHosts()
drainList.HostNames = make(map[string]bool)
for _, host := range hosts {
drainList.HostNames[host] = true
}
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
// Send thrift call, if we have a thrift send error, attempt to reconnect
// and continue trying to resend command
if resp, clientErr = r.adminClient.DrainHosts(drainList); clientErr != nil {
// Experienced an error connection
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in re-establishing connection: ", err1)
}
return false, nil
}
// If error is NOT due to connection, exit loop by returning
// a non nil error
if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
return false, payloadErr
}
// Successful call
return true, nil
})
// We can timeout, or we can encounter a non-retriable error
// If this is the case, bubble error up
if retryErr != nil {
return resp, errors.Wrap(clientErr, "Unable to recover connection")
}
return resp, nil
}