Address code review comment

This commit is contained in:
Lawrence Wong 2023-01-25 14:56:02 -08:00
parent e90cc84f8c
commit b26290d922
4 changed files with 95 additions and 52 deletions

View file

@ -6,7 +6,7 @@ import (
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)
func (r *Client) jobExists(key aurora.JobKey) (bool, error) {
func (r *Client) JobExists(key aurora.JobKey) (bool, error) {
resp, err := r.client.GetConfigSummary(context.TODO(), &key)
if err != nil {
return false, err
@ -16,6 +16,8 @@ func (r *Client) jobExists(key aurora.JobKey) (bool, error) {
resp.GetResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_() == nil ||
resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil ||
resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() == nil ||
len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) == 0 ||
resp.GetResponseCode() != aurora.ResponseCode_OK),
nil
}

View file

@ -376,8 +376,6 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) {
func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
var result *aurora.GetJobsResult_
resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
return c.readonlyClient.GetJobs(context.TODO(), role)
},
@ -385,14 +383,13 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) {
)
if retryErr != nil {
return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler")
}
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetJobsResult_
}
return result, nil
return resp.GetResult_().GetJobsResult_, nil
}
// Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call.
@ -462,7 +459,7 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error {
// On a client timeout, attempt to verify that payload made to the Scheduler by
// trying to get the config summary for the job key
func() (*aurora.Response, bool) {
exists, err := c.jobExists(auroraJob.JobKey())
exists, err := c.JobExists(auroraJob.JobKey())
if err != nil {
c.logger.Print("verification failed ", err)
}
@ -644,12 +641,10 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S
return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler")
}
if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil {
return resp.GetResult_().GetStartJobUpdateResult_(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetStartJobUpdateResult_(), nil
}
// AbortJobUpdate terminates a job update in the scheduler.
@ -755,10 +750,11 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate
return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler")
}
if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil {
return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil {
return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler")
}
return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly")
return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil
}
// Scale up the number of instances under a job configuration using the configuration for specific
@ -829,6 +825,9 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask
if retryErr != nil {
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status")
}
if resp == nil {
return nil, errors.New("unexpected response from scheduler")
}
return response.ScheduleStatusResult(resp).GetTasks(), nil
}
@ -848,13 +847,11 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea
return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons")
}
var result []*aurora.PendingReason
if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetPendingReasonResult_() != nil {
result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return result, nil
return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil
}
// GetTasksWithoutConfigs gets information about task including without a task configuration object.
@ -930,10 +927,11 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora.
return nil, errors.Wrap(retryErr, "unable to get job update details")
}
if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil {
return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("unknown Thrift error, field is nil.")
return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil
}
func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {

View file

@ -45,10 +45,11 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
}
// Start SLA Aware Drain.
@ -87,10 +88,11 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ..
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
}
func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@ -114,10 +116,11 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error)
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil {
return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
}
func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
@ -140,18 +143,13 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
if retryErr != nil {
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil {
return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
}
func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {
var result *aurora.MaintenanceStatusResult_
if len(hosts) == 0 {
return nil, errors.New("no hosts provided to get maintenance status from")
}
@ -170,14 +168,13 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe
)
if retryErr != nil {
return result, errors.Wrap(retryErr, "unable to recover connection")
return nil, errors.Wrap(retryErr, "unable to recover connection")
}
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
if resp.GetResult_() != nil {
result = resp.GetResult_().GetMaintenanceStatusResult_()
}
return result, nil
return resp.GetResult_().GetMaintenanceStatusResult_(), nil
}
// SetQuota sets a quota aggregate for the given role
@ -219,10 +216,10 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
return nil, errors.Wrap(retryErr, "unable to get role quota")
}
if resp.GetResult_() != nil {
return resp.GetResult_().GetGetQuotaResult_(), nil
if resp == nil || resp.GetResult_() == nil {
return nil, errors.New("unexpected response from scheduler")
}
return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
return resp.GetResult_().GetGetQuotaResult_(), nil
}
// Force Aurora Scheduler to perform a snapshot and write to Mesos log

View file

@ -1447,3 +1447,49 @@ func TestRealisClient_FitTasks(t *testing.T) {
}
}
}
func TestRealisClient_JobExists(t *testing.T) {
role := "vagrant"
env := "prod"
name := "test_job_exists"
// Create a good single job
job := realis.NewJob().
Environment(env).
Role(role).
Name(name).
ThermosExecutor(thermosExec).
CPU(.25).
RAM(4).
Disk(10).
InstanceCount(3).
IsService(true).
Production(false).
Tier("preemptible").
Priority(0)
err := r.CreateJob(job)
assert.NoError(t, err)
exists, err := r.JobExists(job.JobKey())
assert.NoError(t, err)
assert.True(t, exists)
// Create a single bad job
badJob := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("executordoesntexist").
ExecutorName("idontexist").
ExecutorData("").
CPU(.25).
RAM(4).
Disk(10).
InstanceCount(1)
err = r.CreateJob(badJob)
assert.Error(t, err)
exists, err = r.JobExists(badJob.JobKey())
assert.NoError(t, err)
assert.False(t, exists)
}