Merge pull request #37 from rdelval/auroraAdmin
Added end maintenance API which allows DRAINED hosts to be transition…
Commit 8a4a9bdb8c
4 changed files with 320 additions and 34 deletions
@@ -28,7 +28,7 @@ import (
	"strings"
)

var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, drainCandidates string
var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList string

var CONNECTION_TIMEOUT = 20000

@@ -42,7 +42,7 @@ func init() {
	flag.StringVar(&username, "username", "aurora", "Username to use for authorization")
	flag.StringVar(&password, "password", "secret", "Password to use for authorization")
	flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url")
	flag.StringVar(&drainCandidates, "drainCandidates", "", "Comma separated list of candidate hosts to drain")
	flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on")
	flag.Parse()
}

@@ -501,16 +501,66 @@ func main() {

	case "drainHosts":
		fmt.Println("Setting hosts to DRAINING")
		if drainCandidates == "" {
		if hostList == "" {
			fmt.Println("No hosts specified to drain")
			os.Exit(1)
		}
		hosts := strings.Split(drainCandidates, ",")
		hosts := strings.Split(hostList, ",")
		_, result, err := r.DrainHosts(hosts...)
		if err != nil {
			fmt.Printf("error: %+v\n", err.Error())
			os.Exit(1)
		}

		// Monitor change to DRAINING and DRAINED mode
		hostResult, err := monitor.HostMaintenance(
			hosts,
			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
			5,
			10)
		if err != nil {
			for host, ok := range hostResult {
				if !ok {
					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
				}
			}

			fmt.Printf("error: %+v\n", err.Error())
			os.Exit(1)
		}

		fmt.Print(result.String())

	case "endMaintenance":
		fmt.Println("Setting hosts to ACTIVE")
		if hostList == "" {
			fmt.Println("No hosts specified to end maintenance on")
			os.Exit(1)
		}
		hosts := strings.Split(hostList, ",")
		_, result, err := r.EndMaintenance(hosts...)
		if err != nil {
			fmt.Printf("error: %+v\n", err.Error())
			os.Exit(1)
		}

		// Monitor change back to ACTIVE (MaintenanceMode_NONE)
		hostResult, err := monitor.HostMaintenance(
			hosts,
			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
			5,
			10)
		if err != nil {
			for host, ok := range hostResult {
				if !ok {
					fmt.Printf("Host %s did not transition into desired mode(s)\n", host)
				}
			}

			fmt.Printf("error: %+v\n", err.Error())
			os.Exit(1)
		}

		fmt.Print(result.String())

	default:
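The two new CLI cases above are thin wrappers over the client and monitor calls introduced by this PR. For reference, a minimal sketch of the same drain-and-restore round trip as a standalone program; the scheduler URL, credentials, and host name mirror the vagrant values used elsewhere in this change, and error handling is reduced to panics for brevity:

package main

import (
	"fmt"

	"github.com/rdelval/gorealis"
	"github.com/rdelval/gorealis/gen-go/apache/aurora"
)

func main() {
	// Connect to the vagrant scheduler used throughout this PR's examples and tests.
	r, err := realis.NewDefaultClientUsingUrl("http://192.168.33.7:8081", "aurora", "secret")
	if err != nil {
		panic(err)
	}
	monitor := &realis.Monitor{r}
	hosts := []string{"192.168.33.7"} // illustrative host list

	// Drain the hosts and wait until each one reports DRAINING or DRAINED.
	if _, _, err := r.DrainHosts(hosts...); err != nil {
		panic(err)
	}
	if _, err := monitor.HostMaintenance(hosts,
		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 5, 10); err != nil {
		panic(err)
	}

	// Return the hosts to service and wait for MaintenanceMode_NONE.
	if _, _, err := r.EndMaintenance(hosts...); err != nil {
		panic(err)
	}
	if _, err := monitor.HostMaintenance(hosts,
		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, 5, 10); err != nil {
		panic(err)
	}
	fmt.Println("hosts back to ACTIVE")
}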
monitors.go +56
@@ -152,3 +152,59 @@ func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, t
	fmt.Println("Timed out")
	return false, nil
}

// Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
// is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, sleepTime, steps int) (map[string]bool, error) {

	// Transform modes to monitor for into a set for easy lookup
	desiredMode := make(map[aurora.MaintenanceMode]struct{})
	for _, mode := range modes {
		desiredMode[mode] = struct{}{}
	}

	// Turn slice into a host set to eliminate duplicates.
	// We also can't use a simple count because multiple modes means we can have multiple matches for a single host.
	// I.e. host A transitions from ACTIVE to DRAINING to DRAINED while monitored
	remainingHosts := make(map[string]struct{})
	for _, host := range hosts {
		remainingHosts[host] = struct{}{}
	}

	hostResult := make(map[string]bool)

	for step := 0; step < steps; step++ {
		if step != 0 {
			time.Sleep(time.Duration(sleepTime) * time.Second)
		}

		// Client call has multiple retries internally
		_, result, err := m.Client.MaintenanceStatus(hosts...)
		if err != nil {
			// Error is either a payload error or a severe connection error
			for host := range remainingHosts {
				hostResult[host] = false
			}
			return hostResult, errors.Wrap(err, "client error in monitor")
		}

		for status := range result.GetStatuses() {

			if _, ok := desiredMode[status.GetMode()]; ok {
				hostResult[status.GetHost()] = true
				delete(remainingHosts, status.GetHost())

				if len(remainingHosts) == 0 {
					return hostResult, nil
				}
			}
		}
	}

	for host := range remainingHosts {
		hostResult[host] = false
	}

	return hostResult, errors.New("Timed out")
}
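A nil error from the monitor means every host reached one of the requested modes; on a timeout or client failure the returned map still records per-host progress. A small usage sketch under those semantics, where waitForDrain is an illustrative helper name and not part of this change:

package maintenance

import (
	"fmt"

	"github.com/rdelval/gorealis"
	"github.com/rdelval/gorealis/gen-go/apache/aurora"
)

// waitForDrain is an illustrative helper. A nil error means every host reached DRAINED or
// DRAINING; otherwise the returned map still shows which hosts transitioned before the
// monitor gave up.
func waitForDrain(monitor *realis.Monitor, hosts []string) error {
	hostResult, err := monitor.HostMaintenance(
		hosts,
		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
		5,  // seconds to sleep between polls
		10) // polls before giving up
	if err != nil {
		for host, ok := range hostResult {
			if !ok {
				fmt.Printf("host %s never reached a desired mode\n", host)
			}
		}
	}
	return err
}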
realis.go +142
@@ -59,6 +59,8 @@ type Realis interface {

	// Admin functions
	DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error)
	EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error)
	MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error)
}

type realisClient struct {

@@ -346,7 +348,9 @@ func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) {
	config.cluster = nil
	// Configured for vagrant
	AddBasicAuth(config, user, passwd)
	config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}
	r := newClient(config)

	return r, nil
}

@@ -1164,7 +1168,7 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr

	var resp *aurora.Response
	var result *aurora.DrainHostsResult_
	var clientErr, payloadErr error
	var returnErr, clientErr, payloadErr error

	if len(hosts) == 0 {
		return nil, nil, errors.New("no hosts provided to drain")

@@ -1206,12 +1210,146 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
		result = resp.GetResult_().GetDrainHostsResult_()
	}

	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
	// a deeper issue
	if payloadErr != nil {
		returnErr = payloadErr
	} else {
		returnErr = clientErr
	}

	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
	// this can become either a timeout error or a payload error
	if retryErr != nil {
		return resp, result, errors.Wrap(clientErr, "Unable to recover connection")
		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
	}

	return resp, result, nil
}

func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {

	var resp *aurora.Response
	var result *aurora.EndMaintenanceResult_
	var returnErr, clientErr, payloadErr error

	if len(hosts) == 0 {
		return nil, nil, errors.New("no hosts provided to end maintenance on")
	}

	hostList := aurora.NewHosts()
	hostList.HostNames = make(map[string]bool)
	for _, host := range hosts {
		hostList.HostNames[host] = true
	}

	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {

		// Send thrift call; if we have a thrift send error, attempt to reconnect
		// and continue trying to resend the command
		if resp, clientErr = r.adminClient.EndMaintenance(hostList); clientErr != nil {
			// Experienced a connection error
			err1 := r.ReestablishConn()
			if err1 != nil {
				fmt.Println("error in re-establishing connection: ", err1)
			}
			return false, nil
		}

		// If error is NOT due to connection
		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
			// TODO(rdelvalle): a leader election may cause the response to have
			// failed when it should have succeeded. Retry everything for now until
			// we figure out a more concrete fix.
			return false, nil
		}

		// Successful call
		return true, nil

	})

	if resp != nil && resp.GetResult_() != nil {
		result = resp.GetResult_().GetEndMaintenanceResult_()
	}

	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
	// a deeper issue
	if payloadErr != nil {
		returnErr = payloadErr
	} else {
		returnErr = clientErr
	}

	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
	// this can become either a timeout error or a payload error
	if retryErr != nil {
		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
	}

	return resp, result, nil
}

func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {

	var resp *aurora.Response
	var result *aurora.MaintenanceStatusResult_
	var returnErr, clientErr, payloadErr error

	if len(hosts) == 0 {
		return nil, nil, errors.New("no hosts provided to get maintenance status from")
	}

	hostList := aurora.NewHosts()
	hostList.HostNames = make(map[string]bool)
	for _, host := range hosts {
		hostList.HostNames[host] = true
	}

	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {

		// Send thrift call; if we have a thrift send error, attempt to reconnect
		// and continue trying to resend the command
		if resp, clientErr = r.adminClient.MaintenanceStatus(hostList); clientErr != nil {
			// Experienced a connection error
			err1 := r.ReestablishConn()
			if err1 != nil {
				fmt.Println("error in re-establishing connection: ", err1)
			}
			return false, nil
		}

		// If error is NOT due to connection
		if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
			// TODO(rdelvalle): a leader election may cause the response to have
			// failed when it should have succeeded. Retry everything for now until
			// we figure out a more concrete fix.
			return false, nil
		}

		// Successful call
		return true, nil

	})

	if resp != nil && resp.GetResult_() != nil {
		result = resp.GetResult_().GetMaintenanceStatusResult_()
	}

	// Prioritize returning a bad payload error over a client error as a bad payload error indicates
	// a deeper issue
	if payloadErr != nil {
		returnErr = payloadErr
	} else {
		returnErr = clientErr
	}

	// Timed out on retries. *Note that when we fix the unexpected errors with a correct payload,
	// this can become either a timeout error or a payload error
	if retryErr != nil {
		return resp, result, errors.Wrap(returnErr, "Unable to recover connection")
	}

	return resp, result, nil
}
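MaintenanceStatus is the read-only call the monitor polls internally; it can also be used on its own to inspect the current mode of each host. A hedged sketch, where printMaintenanceModes is an illustrative helper and not part of this change:

package maintenance

import (
	"fmt"

	"github.com/rdelval/gorealis"
)

// printMaintenanceModes is an illustrative helper: it issues a single MaintenanceStatus call
// (the same call HostMaintenance polls in a loop) and prints the current mode of each host.
func printMaintenanceModes(r realis.Realis, hosts []string) error {
	_, result, err := r.MaintenanceStatus(hosts...)
	if err != nil {
		return err
	}
	// GetStatuses() is a thrift set of HostStatus entries; ranging over it yields each entry.
	for status := range result.GetStatuses() {
		fmt.Printf("%s: %v\n", status.GetHost(), status.GetMode())
	}
	return nil
}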
@@ -12,10 +12,11 @@
 * limitations under the License.
 */

package realis
package realis_test

import (
	"fmt"
	"github.com/rdelval/gorealis"
	"github.com/rdelval/gorealis/gen-go/apache/aurora"
	"github.com/stretchr/testify/assert"
	"io/ioutil"

@@ -24,19 +25,27 @@ import (
	"time"
)

var r Realis
var r realis.Realis
var monitor *realis.Monitor
var thermosPayload []byte

func TestMain(m *testing.M) {
	var err error

	// New configuration to connect to Vagrant image
	r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081", "aurora", "secret")
	r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.ThriftJSON(),
		realis.TimeoutMS(20000),
		realis.BackOff(&realis.Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}))
	if err != nil {
		fmt.Println("Please run vagrant box before running test suite")
		os.Exit(1)
	}

	// Create monitor
	monitor = &realis.Monitor{r}

	thermosPayload, err = ioutil.ReadFile("examples/thermos_payload.json")
	if err != nil {
		fmt.Println("Error reading thermos payload file: ", err)

@@ -48,10 +57,10 @@ func TestMain(m *testing.M) {

func TestRealisClient_CreateJob_Thermos(t *testing.T) {

	job := NewJob().
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("create_job_test").
		Name("create_thermos_job_test").
		ExecutorName(aurora.AURORA_EXECUTOR_NAME).
		ExecutorData(string(thermosPayload)).
		CPU(1).

@@ -64,38 +73,29 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
	start := time.Now()
	resp, err := r.CreateJob(job)
	end := time.Now()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	assert.NoError(t, err)

	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
	fmt.Printf("Create call took %d ns\n", (end.UnixNano()- start.UnixNano()))
	fmt.Printf("Create call took %d ns\n", (end.UnixNano() - start.UnixNano()))

	// Tasks must exist for it to be killed
	t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
		start := time.Now()
		resp, err := r.KillJob(job.JobKey())
		end := time.Now()
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		assert.NoError(t, err)

		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
		fmt.Printf("Kill call took %d ns\n", (end.UnixNano()- start.UnixNano()))
		fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
	})
}

func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {

	thermosCronPayload, err := ioutil.ReadFile("examples/thermos_cron_payload.json")
	if err != nil {
		fmt.Println("Error reading thermos payload file: ", err)
		os.Exit(1)
	}
	assert.NoError(t, err)

	job := NewJob().
	job := realis.NewJob().
		Environment("prod").
		Role("vagrant").
		Name("cronsched_job_test").

@@ -122,24 +122,66 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
	start := time.Now()
	resp, err := r.StartCronJob(job.JobKey())
	end := time.Now()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	assert.NoError(t, err)
	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
	fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano()))
	fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
	})

	t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) {
		start := time.Now()
		resp, err := r.DescheduleCronJob(job.JobKey())
		end := time.Now()

		assert.NoError(t, err)
		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
	})
}

func TestRealisClient_DrainHosts(t *testing.T) {
	hosts := []string{"192.168.33.7"}
	_, _, err := r.DrainHosts(hosts...)
	if err != nil {
		fmt.Printf("error: %+v\n", err.Error())
		os.Exit(1)
	}

	// Monitor change to DRAINING and DRAINED mode
	hostResults, err := monitor.HostMaintenance(
		hosts,
		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
		5,
		10)
	assert.Equal(t, map[string]bool{"192.168.33.7": true}, hostResults)
	assert.NoError(t, err)

	t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) {
		// Monitor change to DRAINING and DRAINED mode
		hostResults, err := monitor.HostMaintenance(
			append(hosts, "IMAGINARY_HOST"),
			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
			1,
			1)

		// Assert that the monitor returned an error and a map identifying the non-transitioned host
		assert.Error(t, err)
		assert.Equal(t, map[string]bool{"192.168.33.7": true, "IMAGINARY_HOST": false}, hostResults)
	})

	t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
		_, _, err := r.EndMaintenance(hosts...)
		if err != nil {
			fmt.Println(err)
			fmt.Printf("error: %+v\n", err.Error())
			os.Exit(1)
		}

		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano()- start.UnixNano()))
		// Monitor change back to ACTIVE (MaintenanceMode_NONE)
		_, err = monitor.HostMaintenance(
			hosts,
			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
			5,
			10)
		assert.NoError(t, err)
	})

}