diff --git a/monitors.go b/monitors.go
index e8d4380..ab8ba10 100644
--- a/monitors.go
+++ b/monitors.go
@@ -34,8 +34,12 @@ type Monitor struct {
 
 // Polls the scheduler every certain amount of time to see if the update has succeeded
 func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval, timeout time.Duration) (bool, error) {
-	if interval < 1*time.Second || timeout < 1*time.Second {
-		return false, errors.New("Interval or timeout cannot be below one second.")
+	if interval < 1*time.Second {
+		interval = interval * time.Second
+	}
+
+	if timeout < 1*time.Second {
+		timeout = timeout * time.Second
 	}
 
 	updateQ := aurora.JobUpdateQuery{
@@ -55,20 +59,15 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval, timeout tim
 			return false, cliErr
 		}
 
-		if len(updateDetail.GetDetailsList()) == 0 {
+		if len(updateDetail) == 0 {
 			m.Client.RealisConfig().logger.Println("No update found")
 			return false, errors.New("No update found for " + updateKey.String())
 		}
 
-		status := updateDetail.GetDetailsList()[0].Update.Summary.State.Status
+		status := updateDetail[0].Update.Summary.State.Status
 
 		// Convert Thrift Set to Golang map for quick lookup
-		activeStatus := map[aurora.JobUpdateStatus]bool{}
-		for _, stat := range aurora.ACTIVE_JOB_UPDATE_STATES {
-			activeStatus[stat] = true
-		}
-
-		if _, ok := activeStatus[status]; !ok {
+		if _, ok := ActiveJobUpdateStates[status]; !ok {
 
 			// Rolled forward is the only state in which an update has been successfully updated
 			// if we encounter an inactive state and it is not at rolled forward, update failed
@@ -92,17 +91,21 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval, timeout tim
 	}
 }
 
-// Monitor a AuroraJob until all instances enter one of the LIVE_STATES
+// Monitor an AuroraJob until all instances enter one of the LiveStates
 func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval, timeout time.Duration) (bool, error) {
 	return m.ScheduleStatus(key, instances, aurora.LIVE_STATES, interval, timeout)
 }
 
 // Monitor a AuroraJob until all instances enter a desired status.
 // Defaults sets of desired statuses provided by the thrift API include:
-// ACTIVE_STATES, SLAVE_ASSIGNED_STATES, LIVE_STATES, and TERMINAL_STATES
+// ActiveStates, SlaveAssignedStates, LiveStates, and TerminalStates
 func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desiredStatuses []aurora.ScheduleStatus, interval, timeout time.Duration) (bool, error) {
-	if interval < 1*time.Second || timeout < 1*time.Second {
-		return false, errors.New("Interval or timeout cannot be below one second.")
+	if interval < 1*time.Second {
+		interval = interval * time.Second
+	}
+
+	if timeout < 1*time.Second {
+		timeout = timeout * time.Second
 	}
 
 	ticker := time.NewTicker(interval)
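The coercion above keeps old callers that passed bare integers working while the monitor signatures move to time.Duration: any value under one second is scaled up to seconds. A minimal caller sketch, with a placeholder endpoint and job key, assuming a Monitor is constructed the way the e2e suite constructs one:

    package main

    import (
        "fmt"
        "log"
        "time"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        // Placeholder endpoint.
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        monitor := &realis.Monitor{Client: r}

        // Placeholder job key.
        key := &aurora.JobKey{Role: "vagrant", Environment: "prod", Name: "hello_world"}

        // Durations are passed explicitly; a legacy call like
        // monitor.Instances(key, 1, 1, 50) still behaves as 1s/50s
        // because sub-second values are coerced up to seconds.
        live, err := monitor.Instances(key, 1, 1*time.Second, 50*time.Second)
        fmt.Println(live, err)
    }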
@@ -133,8 +136,12 @@ func (m *Monitor) ScheduleStatus(key *aurora.JobKey, instanceCount int32, desire
 // Monitor host status until all hosts match the status provided. Returns a map where the value is true if the host
 // is in one of the desired mode(s) or false if it is not as of the time when the monitor exited.
 func (m *Monitor) HostMaintenance(hosts []string, modes []aurora.MaintenanceMode, interval, timeout time.Duration) (map[string]bool, error) {
-	if interval < 1*time.Second || timeout < 1*time.Second {
-		return nil, errors.New("Interval or timeout cannot be below one second.")
+	if interval < 1*time.Second {
+		interval = interval * time.Second
+	}
+
+	if timeout < 1*time.Second {
+		timeout = timeout * time.Second
 	}
 
 	// Transform modes to monitor for into a set for easy lookup
diff --git a/realis.go b/realis.go
index 6aeb155..b59a0eb 100644
--- a/realis.go
+++ b/realis.go
@@ -606,34 +606,34 @@ func (r *RealisClient) ScheduleCronJob(auroraJob *AuroraJob) error {
 	return nil
 }
 
-func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) DescheduleCronJob(key *aurora.JobKey) error {
 
 	r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.DescheduleCronJob(nil, key)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Cron AuroraJob De-schedule message to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Cron AuroraJob De-schedule message to Aurora Scheduler")
 	}
 
-	return resp, nil
+	return nil
 
 }
 
-func (r *RealisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
+func (r *RealisClient) StartCronJob(key *aurora.JobKey) error {
 
 	r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
 
-	resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
+	_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
 		return r.client.StartCronJob(nil, key)
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending Start Cron AuroraJob message to Aurora Scheduler")
+		return errors.Wrap(retryErr, "Error sending Start Cron AuroraJob message to Aurora Scheduler")
 	}
 
-	return resp, nil
+	return nil
 
 }
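DescheduleCronJob and StartCronJob now return only an error, so callers stop unwrapping a *aurora.Response. A sketch of the new call pattern, with placeholder endpoint and job key:

    package main

    import (
        "log"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        // Placeholder cron job key.
        key := &aurora.JobKey{Role: "vagrant", Environment: "prod", Name: "cron_hello"}

        // Trigger the cron job immediately; there is no scheduler response to inspect anymore.
        if err := r.StartCronJob(key); err != nil {
            log.Fatal(err)
        }

        // Remove the cron schedule once it is no longer needed.
        if err := r.DescheduleCronJob(key); err != nil {
            log.Fatal(err)
        }
    }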
@@ -739,7 +739,7 @@ func (r *RealisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
 }
 
 // Pulse AuroraJob Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
-func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.PulseJobUpdateResult_, error) {
+func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (aurora.JobUpdatePulseStatus, error) {
 
 	r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
 
@@ -748,10 +748,15 @@ func (r *RealisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.P
 	})
 
 	if retryErr != nil {
-		return nil, errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
+		return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "Error sending PulseJobUpdate command to Aurora Scheduler")
+	}
+
+	if resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil {
+		return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
+	} else {
+		return aurora.JobUpdatePulseStatus(0), errors.New("Thrift error, field was nil unexpectedly")
 	}
 
-	return resp.GetResult_().GetPulseJobUpdateResult_(), nil
 }
 
 // Scale up the number of instances under a job configuration using the configuration for specific
@@ -892,7 +897,7 @@ func (r *RealisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
 	return tasks[0].AssignedTask.Task, nil
 }
 
-func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.GetJobUpdateDetailsResult_, error) {
+func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.JobUpdateDetails, error) {
 
 	r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
 
@@ -903,7 +908,12 @@ func (r *RealisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
 	if retryErr != nil {
 		return nil, errors.Wrap(retryErr, "Unable to get job update details")
 	}
-	return resp.GetResult_().GetGetJobUpdateDetailsResult_(), nil
+
+	if resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil {
+		return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
+	} else {
+		return nil, errors.New("Unknown Thrift error, field is nil.")
+	}
 }
 
 func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
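PulseJobUpdate now hands back the aurora.JobUpdatePulseStatus itself, and JobUpdateDetails returns the details slice, so a heartbeat loop no longer digs through nested thrift results. A sketch under the new signatures; the update key is a placeholder for one returned by StartJobUpdate:

    package main

    import (
        "fmt"
        "log"
        "time"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        // Placeholder update key; in practice this comes from StartJobUpdate.
        updateKey := &aurora.JobUpdateKey{
            Job: &aurora.JobKey{Role: "vagrant", Environment: "prod", Name: "pulsed"},
            ID:  "some-update-id",
        }

        for {
            // The pulse status is returned directly instead of a raw thrift result.
            status, err := r.PulseJobUpdate(updateKey)
            if err != nil {
                log.Fatal(err)
            }
            if status == aurora.JobUpdatePulseStatus_FINISHED {
                break
            }

            // JobUpdateDetails likewise returns the details slice directly.
            details, err := r.JobUpdateDetails(aurora.JobUpdateQuery{Key: updateKey, Limit: 1})
            if err != nil {
                log.Fatal(err)
            }
            if len(details) == 0 {
                log.Fatal("no update found for key")
            }

            // Stop pulsing once the update leaves the active states.
            if _, active := realis.ActiveJobUpdateStates[details[0].Update.Summary.State.Status]; !active {
                fmt.Println("update reached an inactive state")
                break
            }

            time.Sleep(30 * time.Second)
        }
    }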
@@ -927,9 +937,7 @@ func (r *RealisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
 // Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
 // tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
 // to return to running unless there is enough capacity in the cluster to run them.
-func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, error) {
-
-	var result *aurora.DrainHostsResult_
+func (r *RealisClient) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if len(hosts) == 0 {
 		return nil, errors.New("no hosts provided to drain")
@@ -945,21 +953,20 @@ func (r *RealisClient) DrainHosts(hosts ...string) (*aurora.DrainHostsResult_, e
 	})
 
 	if retryErr != nil {
-		return result, errors.Wrap(retryErr, "Unable to recover connection")
+		return nil, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetDrainHostsResult_()
+	if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
+		return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
+	} else {
+		return nil, errors.New("Thrift error: Field in response is nil unexpectedly.")
 	}
-
-	return result, nil
 }
 
 // Start SLA Aware Drain.
 // defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
 // After timeoutSecs, tasks will be forcefully drained without checking SLA.
-func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
-	var result *aurora.DrainHostsResult_
+func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if len(hosts) == 0 {
 		return nil, errors.New("no hosts provided to drain")
@@ -975,19 +982,17 @@ func (r *RealisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
 	})
 
 	if retryErr != nil {
-		return result, errors.Wrap(retryErr, "Unable to recover connection")
+		return nil, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetDrainHostsResult_()
+	if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
+		return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
+	} else {
+		return nil, errors.New("Thrift error: Field in response is nil unexpectedly.")
 	}
-
-	return result, nil
 }
 
-func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenanceResult_, error) {
-
-	var result *aurora.StartMaintenanceResult_
+func (r *RealisClient) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if len(hosts) == 0 {
 		return nil, errors.New("no hosts provided to start maintenance on")
@@ -1003,19 +1008,17 @@ func (r *RealisClient) StartMaintenance(hosts ...string) (*aurora.StartMaintenan
 	})
 
 	if retryErr != nil {
-		return result, errors.Wrap(retryErr, "Unable to recover connection")
+		return nil, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetStartMaintenanceResult_()
+	if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil {
+		return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
+	} else {
+		return nil, errors.New("Thrift error: Field in response is nil unexpectedly.")
 	}
-
-	return result, nil
 }
 
-func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceResult_, error) {
-
-	var result *aurora.EndMaintenanceResult_
+func (r *RealisClient) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
 
 	if len(hosts) == 0 {
 		return nil, errors.New("no hosts provided to end maintenance on")
@@ -1031,14 +1034,15 @@ func (r *RealisClient) EndMaintenance(hosts ...string) (*aurora.EndMaintenanceRe
 	})
 
 	if retryErr != nil {
-		return result, errors.Wrap(retryErr, "Unable to recover connection")
+		return nil, errors.Wrap(retryErr, "Unable to recover connection")
 	}
 
-	if resp.GetResult_() != nil {
-		result = resp.GetResult_().GetEndMaintenanceResult_()
+	if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil {
+		return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
+	} else {
+		return nil, errors.New("Thrift error: Field in response is nil unexpectedly.")
 	}
 
-	return result, nil
 }
 
 func (r *RealisClient) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {
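All four host maintenance calls now surface []*aurora.HostStatus directly, so one range loop works for every call. A sketch of draining placeholder hosts and then watching the transition with the monitor:

    package main

    import (
        "fmt"
        "log"
        "time"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        hosts := []string{"agent-1", "agent-2"} // placeholder host names

        statuses, err := r.DrainHosts(hosts...)
        if err != nil {
            log.Fatal(err)
        }
        for _, status := range statuses {
            fmt.Printf("%s moved to mode %v\n", status.GetHost(), status.GetMode())
        }

        // Block until the scheduler reports DRAINING/DRAINED for every host.
        monitor := &realis.Monitor{Client: r}
        transitioned, err := monitor.HostMaintenance(
            hosts,
            []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINING, aurora.MaintenanceMode_DRAINED},
            5*time.Second,
            1*time.Minute)
        fmt.Println(transitioned, err)
    }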
errors.New("Thrift error: Field in response is nil unexpectedly.") + } } // Force Aurora Scheduler to perform a snapshot and write to Mesos log diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 5ecdbda..80f3973 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -24,7 +24,6 @@ import ( "github.com/paypal/gorealis" "github.com/paypal/gorealis/gen-go/apache/aurora" - "github.com/paypal/gorealis/response" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -39,7 +38,7 @@ func TestMain(m *testing.M) { // New configuration to connect to docker container r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000)) + realis.Timeout(20*time.Second)) if err != nil { fmt.Println("Please run docker-compose up -d before running test suite") @@ -69,7 +68,7 @@ func TestNonExistentEndpoint(t *testing.T) { // Attempt to connect to a bad endpoint r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/doesntexist/"), - realis.TimeoutMS(200), + realis.Timeout(200*time.Millisecond), realis.BackOff(backoff), ) defer r.Close() @@ -93,7 +92,7 @@ func TestNonExistentEndpoint(t *testing.T) { func TestThriftBinary(t *testing.T) { r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000), + realis.Timeout(20*time.Second), realis.ThriftBinary()) assert.NoError(t, err) @@ -115,7 +114,7 @@ func TestThriftBinary(t *testing.T) { func TestThriftJSON(t *testing.T) { r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"), realis.BasicAuth("aurora", "secret"), - realis.TimeoutMS(20000), + realis.Timeout(20*time.Second), realis.ThriftJSON()) assert.NoError(t, err) @@ -193,18 +192,16 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { InstanceCount(2). AddPorts(1) - resp, err := r.CreateJob(job) + err := r.CreateJob(job) assert.NoError(t, err) - assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - // Test Instances Monitor - success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1, 50) + success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second) assert.True(t, success) assert.NoError(t, err) //Fetch all Jobs - _, result, err := r.GetJobs(role) + result, err := r.GetJobs(role) fmt.Printf("GetJobs length: %+v \n", len(result.Configs)) assert.Len(t, result.Configs, 1) assert.NoError(t, err) @@ -223,12 +220,10 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) { // Tasks must exist for it to, be killed t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) { - resp, err := r.KillJob(job.JobKey()) + err := r.KillJob(job.JobKey()) assert.NoError(t, err) - assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode) - - success, err := monitor.Instances(job.JobKey(), 0, 1, 50) + success, err := monitor.Instances(job.JobKey(), 0, 1*time.Second, 50*time.Second) assert.True(t, success) assert.NoError(t, err) }) @@ -249,9 +244,8 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) { Disk(10). InstanceCount(1) - resp, err := r.CreateJob(job) + err := r.CreateJob(job) assert.Error(t, err) - assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode()) } // Test configuring an executor that doesn't exist for CreateJob API @@ -273,9 +267,8 @@ func TestRealisClient_GetPendingReason(t *testing.T) { Disk(100). 
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 5ecdbda..80f3973 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/paypal/gorealis"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
-	"github.com/paypal/gorealis/response"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 )
@@ -39,7 +38,7 @@ func TestMain(m *testing.M) {
 	// New configuration to connect to docker container
 	r, err = realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(20000))
+		realis.Timeout(20*time.Second))
 
 	if err != nil {
 		fmt.Println("Please run docker-compose up -d before running test suite")
@@ -69,7 +68,7 @@ func TestNonExistentEndpoint(t *testing.T) {
 
 	// Attempt to connect to a bad endpoint
 	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081/doesntexist/"),
-		realis.TimeoutMS(200),
+		realis.Timeout(200*time.Millisecond),
 		realis.BackOff(backoff),
 	)
 	defer r.Close()
@@ -93,7 +92,7 @@ func TestNonExistentEndpoint(t *testing.T) {
 func TestThriftBinary(t *testing.T) {
 	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(20000),
+		realis.Timeout(20*time.Second),
 		realis.ThriftBinary())
 
 	assert.NoError(t, err)
@@ -115,7 +114,7 @@ func TestThriftBinary(t *testing.T) {
 func TestThriftJSON(t *testing.T) {
 	r, err := realis.NewRealisClient(realis.SchedulerUrl("http://192.168.33.7:8081"),
 		realis.BasicAuth("aurora", "secret"),
-		realis.TimeoutMS(20000),
+		realis.Timeout(20*time.Second),
 		realis.ThriftJSON())
 
 	assert.NoError(t, err)
@@ -193,18 +192,16 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 		InstanceCount(2).
 		AddPorts(1)
 
-	resp, err := r.CreateJob(job)
+	err := r.CreateJob(job)
 	assert.NoError(t, err)
 
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-
 	// Test Instances Monitor
-	success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1, 50)
+	success, err := monitor.Instances(job.JobKey(), job.GetInstanceCount(), 1*time.Second, 50*time.Second)
 	assert.True(t, success)
 	assert.NoError(t, err)
 
 	//Fetch all Jobs
-	_, result, err := r.GetJobs(role)
+	result, err := r.GetJobs(role)
 	fmt.Printf("GetJobs length: %+v \n", len(result.Configs))
 	assert.Len(t, result.Configs, 1)
 	assert.NoError(t, err)
@@ -223,12 +220,10 @@ func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 
 	// Tasks must exist for it to, be killed
 	t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
-		resp, err := r.KillJob(job.JobKey())
+		err := r.KillJob(job.JobKey())
 		assert.NoError(t, err)
 
-		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-
-		success, err := monitor.Instances(job.JobKey(), 0, 1, 50)
+		success, err := monitor.Instances(job.JobKey(), 0, 1*time.Second, 50*time.Second)
 		assert.True(t, success)
 		assert.NoError(t, err)
 	})
@@ -249,9 +244,8 @@ func TestRealisClient_CreateJob_ExecutorDoesNotExist(t *testing.T) {
 		Disk(10).
 		InstanceCount(1)
 
-	resp, err := r.CreateJob(job)
+	err := r.CreateJob(job)
 	assert.Error(t, err)
-	assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
 }
 
 // Test configuring an executor that doesn't exist for CreateJob API
@@ -273,9 +267,8 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
 		Disk(100).
 		InstanceCount(1)
 
-	resp, err := r.CreateJob(job)
+	err := r.CreateJob(job)
 	assert.NoError(t, err)
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
 	taskQ := &aurora.TaskQuery{
 		Role:        &role,
@@ -287,7 +280,7 @@ func TestRealisClient_GetPendingReason(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Len(t, reasons, 1)
 
-	resp, err = r.KillJob(job.JobKey())
+	err = r.KillJob(job.JobKey())
 	assert.NoError(t, err)
 }
 
@@ -316,61 +309,52 @@ func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) {
 	settings.UpdateGroupSize = 1
 	settings.WaitForBatchCompletion = true
 	job.InstanceCount(2)
-	resp, result, err := r.CreateService(job, settings)
+	result, err := r.CreateService(job, settings)
 	fmt.Println(result.String())
 
 	assert.NoError(t, err)
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
 	updateQ := aurora.JobUpdateQuery{
 		Key:   result.GetKey(),
 		Limit: 1,
 	}
 
-	start := time.Now()
+	var updateDetails []*aurora.JobUpdateDetails
+
 	for i := 0; i*int(pulse) <= timeout; i++ {
-		fmt.Println("sending PulseJobUpdate....")
-		resp, err = r.PulseJobUpdate(result.GetKey())
-		assert.NotNil(t, resp)
-		assert.Nil(t, err)
+		pulseStatus, err := r.PulseJobUpdate(result.GetKey())
 
-		respDetail, err := r.JobUpdateDetails(updateQ)
 		assert.Nil(t, err)
-
-		updateDetail := response.JobUpdateDetails(respDetail)
-		if len(updateDetail) == 0 {
-			fmt.Println("No update found")
-			assert.NotEqual(t, len(updateDetail), 0)
+		if pulseStatus != aurora.JobUpdatePulseStatus_OK && pulseStatus != aurora.JobUpdatePulseStatus_FINISHED {
+			assert.Fail(t, "Pulse update status received doesn't exist")
 		}
 
-		status := updateDetail[0].Update.Summary.State.Status
-		if _, ok := aurora.ACTIVE_JOB_UPDATE_STATES[status]; !ok {
+		updateDetails, err = r.JobUpdateDetails(updateQ)
+		assert.Nil(t, err)
+
+		assert.Equal(t, len(updateDetails), 1, "No update matching query found")
+		status := updateDetails[0].Update.Summary.State.Status
+
+		if _, ok := realis.ActiveJobUpdateStates[status]; !ok {
 
 			// Rolled forward is the only state in which an update has been successfully updated
 			// if we encounter an inactive state and it is not at rolled forward, update failed
 			if status == aurora.JobUpdateStatus_ROLLED_FORWARD {
-				fmt.Println("Update succeded")
+				fmt.Println("Update succeeded")
 				break
 			} else {
 				fmt.Println("Update failed")
 				break
 			}
 		}
 
-		fmt.Println("Polling, update still active...")
 		time.Sleep(time.Duration(pulse) * time.Second)
 	}
 
-	end := time.Now()
-	fmt.Printf("Update call took %d ns\n", (end.UnixNano() - start.UnixNano()))
-
 	t.Run("TestRealisClient_KillJob_Thermos", func(t *testing.T) {
-		start := time.Now()
-		resp, err := r.KillJob(job.JobKey())
-		end := time.Now()
+		r.AbortJobUpdate(*updateDetails[0].GetUpdate().GetSummary().GetKey(), "")
+		err := r.KillJob(job.JobKey())
 		assert.NoError(t, err)
-		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Kill call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
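The test changes above also track the client option rename from TimeoutMS(int) to Timeout(time.Duration). A sketch of constructing a client with the new option, reusing the docker-compose endpoint and credentials from the tests:

    package main

    import (
        "log"
        "time"

        realis "github.com/paypal/gorealis"
    )

    func main() {
        r, err := realis.NewRealisClient(
            realis.SchedulerUrl("http://192.168.33.7:8081"),
            realis.BasicAuth("aurora", "secret"),
            // Previously realis.TimeoutMS(20000); the option now takes a time.Duration.
            realis.Timeout(20*time.Second))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()
    }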
@@ -394,20 +378,19 @@ func TestRealisClient_CreateService(t *testing.T) {
 	settings := realis.NewUpdateSettings()
 	settings.UpdateGroupSize = 2
 	job.InstanceCount(3)
-	resp, result, err := r.CreateService(job, settings)
+	result, err := r.CreateService(job, settings)
 	assert.NoError(t, err)
 	assert.NotNil(t, result)
-	assert.Equal(t, aurora.ResponseCode_OK, resp.GetResponseCode())
 
 	var ok bool
 	var mErr error
 
-	if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5, 180); !ok || mErr != nil {
+	if ok, mErr = monitor.JobUpdate(*result.GetKey(), 5*time.Second, 180*time.Second); !ok || mErr != nil {
 
 		// Update may already be in a terminal state so don't check for error
-		_, err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
+		err := r.AbortJobUpdate(*result.GetKey(), "Monitor timed out.")
 
-		_, err = r.KillJob(job.JobKey())
+		err = r.KillJob(job.JobKey())
 		assert.NoError(t, err)
 	}
 
@@ -416,7 +399,7 @@ func TestRealisClient_CreateService(t *testing.T) {
 	assert.NoError(t, mErr)
 
 	// Kill task test task after confirming it came up fine
-	_, err = r.KillJob(job.JobKey())
+	err = r.KillJob(job.JobKey())
 	assert.NoError(t, err)
 }
 
@@ -438,11 +421,10 @@ func TestRealisClient_CreateService_ExecutorDoesNotExist(t *testing.T) {
 	settings := realis.NewUpdateSettings()
 	job.InstanceCount(3)
 
-	resp, result, err := r.CreateService(job, settings)
+	result, err := r.CreateService(job, settings)
 
 	assert.Error(t, err)
 	assert.Nil(t, result)
-	assert.Equal(t, aurora.ResponseCode_INVALID_REQUEST, resp.GetResponseCode())
 }
 
 func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
@@ -465,77 +447,56 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) {
 		CronSchedule("* * * * *").
 		IsService(false)
 
-	resp, err := r.ScheduleCronJob(job)
-	if err != nil {
-		fmt.Println(err)
-		os.Exit(1)
-	}
-
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
+	err = r.ScheduleCronJob(job)
+	assert.NoError(t, err)
 
 	t.Run("TestRealisClient_StartCronJob_Thermos", func(t *testing.T) {
-		start := time.Now()
-		resp, err := r.StartCronJob(job.JobKey())
-		end := time.Now()
-
+		err := r.StartCronJob(job.JobKey())
 		assert.NoError(t, err)
-		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Schedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 
 	t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) {
-		start := time.Now()
-		resp, err := r.DescheduleCronJob(job.JobKey())
-		end := time.Now()
-
+		err := r.DescheduleCronJob(job.JobKey())
 		assert.NoError(t, err)
-		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		fmt.Printf("Deschedule cron call took %d ns\n", (end.UnixNano() - start.UnixNano()))
 	})
 }
 
 func TestRealisClient_StartMaintenance(t *testing.T) {
 	hosts := []string{"localhost"}
 
-	_, _, err := r.StartMaintenance(hosts...)
-	if err != nil {
-		fmt.Printf("error: %+v\n", err.Error())
-		os.Exit(1)
-	}
+	_, err := r.StartMaintenance(hosts...)
+	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
 	hostResults, err := monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED},
-		1,
-		50)
+		1*time.Second,
+		50*time.Second)
 	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
 	assert.NoError(t, err)
 
-	_, _, err = r.EndMaintenance(hosts...)
+	_, err = r.EndMaintenance(hosts...)
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
 	_, err = monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
-		5,
-		10)
+		5*time.Second,
+		10*time.Second)
 	assert.NoError(t, err)
 }
 
 func TestRealisClient_DrainHosts(t *testing.T) {
 	hosts := []string{"localhost"}
 
-	_, _, err := r.DrainHosts(hosts...)
-	if err != nil {
-		fmt.Printf("error: %+v\n", err.Error())
-		os.Exit(1)
-	}
+	_, err := r.DrainHosts(hosts...)
+	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
 	hostResults, err := monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
-		1,
-		50)
+		1*time.Second,
+		50*time.Second)
 	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
 	assert.NoError(t, err)
 
@@ -544,8 +505,8 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 		hostResults, err := monitor.HostMaintenance(
 			append(hosts, "IMAGINARY_HOST"),
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
-			1,
-			1)
+			1*time.Second,
+			1*time.Second)
 
 		// Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts
 		assert.Error(t, err)
@@ -553,15 +514,15 @@ func TestRealisClient_DrainHosts(t *testing.T) {
 	})
 
 	t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) {
-		_, _, err := r.EndMaintenance(hosts...)
+		_, err := r.EndMaintenance(hosts...)
 		assert.NoError(t, err)
 
 		// Monitor change to DRAINING and DRAINED mode
 		_, err = monitor.HostMaintenance(
 			hosts,
 			[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
-			5,
-			10)
+			5*time.Second,
+			10*time.Second)
 		assert.NoError(t, err)
 	})
 
@@ -581,20 +542,20 @@ func TestRealisClient_SLADrainHosts(t *testing.T) {
 	hostResults, err := monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING},
-		1,
-		50)
+		1*time.Second,
+		50*time.Second)
 	assert.Equal(t, map[string]bool{"localhost": true}, hostResults)
 	assert.NoError(t, err)
 
-	_, _, err = r.EndMaintenance(hosts...)
+	_, err = r.EndMaintenance(hosts...)
 	assert.NoError(t, err)
 
 	// Monitor change to DRAINING and DRAINED mode
 	_, err = monitor.HostMaintenance(
 		hosts,
 		[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
-		5,
-		10)
+		5*time.Second,
+		10*time.Second)
 	assert.NoError(t, err)
 }
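SLADrainHosts takes a fallback policy for tasks that do not define one, plus a timeout in seconds after which hosts are drained regardless of SLA. A sketch using a hypothetical 50-percent-for-30-seconds percentage policy (struct and field names per the Aurora thrift definitions):

    package main

    import (
        "fmt"
        "log"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        r, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
        if err != nil {
            log.Fatal(err)
        }
        defer r.Close()

        // Fallback policy: a task is considered safe to drain while at least
        // 50% of its instances have been RUNNING for the last 30 seconds.
        policy := &aurora.SlaPolicy{
            PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
                Percentage:   50,
                DurationSecs: 30,
            },
        }

        // Force-drain anything still running after 300 seconds.
        statuses, err := r.SLADrainHosts(policy, 300, "agent-1") // placeholder host
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(statuses)
    }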
@@ -613,11 +574,9 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
 		Disk(10).
 		InstanceCount(1000) // Impossible amount to go live in any sane machine
 
-	resp, err := r.CreateJob(job)
+	err := r.CreateJob(job)
 	assert.NoError(t, err)
 
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-
 	wg := sync.WaitGroup{}
 	for i := 0; i < 20; i++ {
@@ -632,11 +591,9 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) {
 			assert.False(t, success)
 			assert.Error(t, err)
 
-			resp, err := r.KillJob(job.JobKey())
+			err = r.KillJob(job.JobKey())
 			assert.NoError(t, err)
 
-			assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-
 		}()
 	}
 
@@ -648,19 +605,14 @@ func TestRealisClient_SetQuota(t *testing.T) {
 	var cpu = 3.5
 	var ram int64 = 20480
 	var disk int64 = 10240
-	resp, err := r.SetQuota("vagrant", &cpu, &ram, &disk)
+	err := r.SetQuota("vagrant", &cpu, &ram, &disk)
 	assert.NoError(t, err)
-	assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
 
 	t.Run("TestRealisClient_GetQuota", func(t *testing.T) {
 		// Test GetQuota based on previously set values
-		var result *aurora.GetQuotaResult_
-		resp, err = r.GetQuota("vagrant")
-		if resp.GetResult_() != nil {
-			result = resp.GetResult_().GetQuotaResult_
-		}
+		quotaResult, err := r.GetQuota("vagrant")
+		assert.NoError(t, err)
 
-		assert.Equal(t, aurora.ResponseCode_OK, resp.ResponseCode)
-		for res := range result.Quota.GetResources() {
+		for _, res := range quotaResult.GetQuota().GetResources() {
 			switch true {
 			case res.DiskMb != nil:
 				assert.Equal(t, disk, *res.DiskMb)
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..e697432
--- /dev/null
+++ b/util.go
@@ -0,0 +1,37 @@
+package realis
+
+import (
+	"github.com/paypal/gorealis/gen-go/apache/aurora"
+)
+
+var ActiveStates = make(map[aurora.ScheduleStatus]bool)
+var SlaveAssignedStates = make(map[aurora.ScheduleStatus]bool)
+var LiveStates = make(map[aurora.ScheduleStatus]bool)
+var TerminalStates = make(map[aurora.ScheduleStatus]bool)
+var ActiveJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
+var AwaitingPulseJobUpdateStates = make(map[aurora.JobUpdateStatus]bool)
+
+func init() {
+	for _, status := range aurora.ACTIVE_STATES {
+		ActiveStates[status] = true
+	}
+
+	for _, status := range aurora.SLAVE_ASSIGNED_STATES {
+		SlaveAssignedStates[status] = true
+	}
+
+	for _, status := range aurora.LIVE_STATES {
+		LiveStates[status] = true
+	}
+
+	for _, status := range aurora.TERMINAL_STATES {
+		TerminalStates[status] = true
+	}
+
+	for _, status := range aurora.ACTIVE_JOB_UPDATE_STATES {
+		ActiveJobUpdateStates[status] = true
+	}
+
+	for _, status := range aurora.AWAITNG_PULSE_JOB_UPDATE_STATES {
+		AwaitingPulseJobUpdateStates[status] = true
+	}
+}
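The new util.go materializes the thrift state lists into maps once at package init, so the monitors (and external callers) get O(1) membership checks instead of rebuilding a lookup map or scanning a slice on every poll. A sketch of using the exported sets, e.g. to classify a task's ScheduleStatus:

    package main

    import (
        "fmt"

        realis "github.com/paypal/gorealis"
        "github.com/paypal/gorealis/gen-go/apache/aurora"
    )

    func main() {
        status := aurora.ScheduleStatus_RUNNING

        // Membership tests against the precomputed sets replace loops over
        // aurora.LIVE_STATES / aurora.TERMINAL_STATES and friends.
        if realis.LiveStates[status] {
            fmt.Println("task is live")
        }
        if realis.TerminalStates[status] {
            fmt.Println("task has reached a terminal state")
        }
    }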