From f27dc4a6abf5a2ad8675d7a4ba0059309631e473 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Fri, 9 Nov 2018 15:44:40 -0800 Subject: [PATCH] Adding support for SLA Drain Host API. --- examples/client.go | 31 +++++++++++++++++++++++++++++++ job.go | 3 --- realis.go | 34 ++++++++++++++++++++++++++++++++++ realis_e2e_test.go | 31 +++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/examples/client.go b/examples/client.go index 23ffdc8..4d62630 100644 --- a/examples/client.go +++ b/examples/client.go @@ -554,6 +554,37 @@ func main() { fmt.Print(result.String()) + case "SLADrainHosts": + fmt.Println("Setting hosts to DRAINING using SLA aware draining") + if hostList == "" { + log.Fatal("No hosts specified to drain") + } + hosts := strings.Split(hostList, ",") + + policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}} + + result, err := r.SLADrainHosts(&policy, 30, hosts...) + if err != nil { + log.Fatalf("error: %+v\n", err.Error()) + } + + // Monitor change to DRAINING and DRAINED mode + hostResult, err := monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 5, + 10) + if err != nil { + for host, ok := range hostResult { + if !ok { + fmt.Printf("Host %s did not transtion into desired mode(s)\n", host) + } + } + log.Fatalf("error: %+v\n", err.Error()) + } + + fmt.Print(result.String()) + case "endMaintenance": fmt.Println("Setting hosts to ACTIVE") if hostList == "" { diff --git a/job.go b/job.go index eb2da87..0ff2aac 100644 --- a/job.go +++ b/job.go @@ -155,21 +155,18 @@ func (j *AuroraJob) ExecutorData(data string) Job { func (j *AuroraJob) CPU(cpus float64) Job { *j.resources["cpu"].NumCpus = cpus - j.jobConfig.TaskConfig.NumCpus = cpus //Will be deprecated soon return j } func (j *AuroraJob) RAM(ram int64) Job { *j.resources["ram"].RamMb = ram - j.jobConfig.TaskConfig.RamMb = ram //Will be deprecated soon return j } func (j *AuroraJob) Disk(disk int64) Job { *j.resources["disk"].DiskMb = disk - j.jobConfig.TaskConfig.DiskMb = disk //Will be deprecated return j } diff --git a/realis.go b/realis.go index 97e588f..8878b45 100644 --- a/realis.go +++ b/realis.go @@ -73,6 +73,7 @@ type Realis interface { // Admin functions DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) + SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) @@ -1021,6 +1022,39 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr return resp, result, nil } +// Start SLA Aware Drain. +// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. +// After timeoutSecs, tasks will be forcefully drained without checking SLA. +func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) { + var result *aurora.DrainHostsResult_ + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = make(map[string]bool) + for _, host := range hosts { + drainList.HostNames[host] = true + } + + r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) + + resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) { + return r.adminClient.SlaDrainHosts(drainList, policy, timeout) + }) + + if retryErr != nil { + return result, errors.Wrap(retryErr, "Unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetDrainHostsResult_() + } + + return result, nil +} + func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) { var result *aurora.StartMaintenanceResult_ diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 7c7d25d..9108903 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -567,6 +567,37 @@ func TestRealisClient_DrainHosts(t *testing.T) { } +func TestRealisClient_SLADrainHosts(t *testing.T) { + hosts := []string{"localhost"} + policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}} + + _, err := r.SLADrainHosts(&policy, 30, hosts...) + if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + + // Monitor change to DRAINING and DRAINED mode + hostResults, err := monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, + 1, + 50) + assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.NoError(t, err) + + _, _, err = r.EndMaintenance(hosts...) + assert.NoError(t, err) + + // Monitor change to DRAINING and DRAINED mode + _, err = monitor.HostMaintenance( + hosts, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5, + 10) + assert.NoError(t, err) +} + // Test multiple go routines using a single connection func TestRealisClient_SessionThreadSafety(t *testing.T) {