Adding support for SLA Drain Host API.
parent 384e476f91
commit f27dc4a6ab

4 changed files with 96 additions and 3 deletions
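For orientation, a minimal sketch of calling the new API from client code. The import path, scheduler address, credentials, and hostname below are assumptions for illustration, not part of this commit:

package main

import (
	"fmt"
	"log"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Assumed scheduler location and credentials; adjust for your cluster.
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://localhost:8081"),
		realis.BasicAuth("aurora", "secret"))
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	// Drain while honoring a 50% SLA; after 30 seconds the drain
	// proceeds without further SLA checks.
	policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}}
	result, err := r.SLADrainHosts(&policy, 30, "agent-1.example.com") // hostname illustrative
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(result.String())
}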
@@ -554,6 +554,37 @@ func main() {

 		fmt.Print(result.String())

+	case "SLADrainHosts":
+		fmt.Println("Setting hosts to DRAINING using SLA aware draining")
+		if hostList == "" {
+			log.Fatal("No hosts specified to drain")
+		}
+		hosts := strings.Split(hostList, ",")
+
+		policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}}
+
+		result, err := r.SLADrainHosts(&policy, 30, hosts...)
+		if err != nil {
+			log.Fatalf("error: %+v\n", err.Error())
+		}
+
+		// Monitor change to DRAINING and DRAINED mode
+		hostResult, err := monitor.HostMaintenance(
+			hosts,
+			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
+			5,
+			10)
+		if err != nil {
+			for host, ok := range hostResult {
+				if !ok {
+					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
+				}
+			}
+			log.Fatalf("error: %+v\n", err.Error())
+		}
+
+		fmt.Print(result.String())
+
 	case "endMaintenance":
 		fmt.Println("Setting hosts to ACTIVE")
 		if hostList == "" {
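The example above drains with a PercentageSlaPolicy. Aurora's SlaPolicy union (api.thrift) also offers count-based and coordinator-based variants; a sketch of the count-based form, with illustrative values:

// Require at least 2 RUNNING instances per task group while draining,
// measured over a 60-second window. CountSlaPolicy and its fields come
// from Aurora's api.thrift; the specific values here are illustrative.
policy := aurora.SlaPolicy{
	CountSlaPolicy: &aurora.CountSlaPolicy{
		Count:        2,
		DurationSecs: 60,
	},
}
result, err := r.SLADrainHosts(&policy, 30, hosts...)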
job.go (3 changes)
@@ -155,21 +155,18 @@ func (j *AuroraJob) ExecutorData(data string) Job {

 func (j *AuroraJob) CPU(cpus float64) Job {
 	*j.resources["cpu"].NumCpus = cpus
-	j.jobConfig.TaskConfig.NumCpus = cpus //Will be deprecated soon

 	return j
 }

 func (j *AuroraJob) RAM(ram int64) Job {
 	*j.resources["ram"].RamMb = ram
-	j.jobConfig.TaskConfig.RamMb = ram //Will be deprecated soon

 	return j
 }

 func (j *AuroraJob) Disk(disk int64) Job {
 	*j.resources["disk"].DiskMb = disk
-	j.jobConfig.TaskConfig.DiskMb = disk //Will be deprecated

 	return j
 }
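The job.go change only drops the mirrored writes to the legacy TaskConfig scalar fields; resource values now live solely in the job's resources map, and builder usage is unchanged. A short sketch with illustrative values:

// Builder usage after this change. CPU/RAM/Disk write only to the
// resources map; the job values below are illustrative.
job := realis.NewJob().
	Environment("prod").
	Role("vagrant").
	Name("sla_drain_example").
	CPU(0.25).
	RAM(64).
	Disk(100).
	InstanceCount(1)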
realis.go (34 changes)
@@ -73,6 +73,7 @@ type Realis interface {

 	// Admin functions
 	DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error)
+	SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error)
 	StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error)
 	EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error)
 	MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error)
@@ -1021,6 +1022,39 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {

 	return resp, result, nil
 }

+// SLADrainHosts starts an SLA-aware drain.
+// policy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
+// After timeout seconds, tasks are drained forcefully without checking the SLA.
+func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
+	var result *aurora.DrainHostsResult_
+
+	if len(hosts) == 0 {
+		return nil, errors.New("no hosts provided to drain")
+	}
+
+	drainList := aurora.NewHosts()
+	drainList.HostNames = make(map[string]bool)
+	for _, host := range hosts {
+		drainList.HostNames[host] = true
+	}
+
+	r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
+
+	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+		return r.adminClient.SlaDrainHosts(drainList, policy, timeout)
+	})
+
+	if retryErr != nil {
+		return result, errors.Wrap(retryErr, "Unable to recover connection")
+	}
+
+	if resp.GetResult_() != nil {
+		result = resp.GetResult_().GetDrainHostsResult_()
+	}
+
+	return result, nil
+}
+
 func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {

 	var result *aurora.StartMaintenanceResult_
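Note the narrower signature: unlike DrainHosts, SLADrainHosts does not surface the raw *aurora.Response, returning only the typed result and an error. Side by side (hostname illustrative):

// Existing call shape: raw response, typed result, error.
resp, drainRes, err := r.DrainHosts("agent-1.example.com")

// New call shape: typed result and error; the raw response stays internal.
slaRes, err := r.SLADrainHosts(&policy, 30, "agent-1.example.com")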
@@ -567,6 +567,37 @@ func TestRealisClient_DrainHosts(t *testing.T) {

 }

+func TestRealisClient_SLADrainHosts(t *testing.T) {
+	hosts := []string{"localhost"}
+	policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}}
+
+	_, err := r.SLADrainHosts(&policy, 30, hosts...)
+	if err != nil {
+		fmt.Printf("error: %+v\n", err.Error())
+		os.Exit(1)
+	}
+
+	// Monitor change to DRAINING and DRAINED mode
+	hostResults, err := monitor.HostMaintenance(
+		hosts,
+		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
+		1,
+		50)
+	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
+	assert.NoError(t, err)
+
+	_, _, err = r.EndMaintenance(hosts...)
+	assert.NoError(t, err)
+
+	// Monitor change back to NONE mode
+	_, err = monitor.HostMaintenance(
+		hosts,
+		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
+		5,
+		10)
+	assert.NoError(t, err)
+}
+
 // Test multiple go routines using a single connection
 func TestRealisClient_SessionThreadSafety(t *testing.T) {