diff --git a/helpers.go b/helpers.go index adaa9a6..1cd04f2 100644 --- a/helpers.go +++ b/helpers.go @@ -6,7 +6,7 @@ import ( "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" ) -func (r *Client) jobExists(key aurora.JobKey) (bool, error) { +func (r *Client) JobExists(key aurora.JobKey) (bool, error) { resp, err := r.client.GetConfigSummary(context.TODO(), &key) if err != nil { return false, err @@ -16,6 +16,8 @@ func (r *Client) jobExists(key aurora.JobKey) (bool, error) { resp.GetResult_() == nil || resp.GetResult_().GetConfigSummaryResult_() == nil || resp.GetResult_().GetConfigSummaryResult_().GetSummary() == nil || + resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups() == nil || + len(resp.GetResult_().GetConfigSummaryResult_().GetSummary().GetGroups()) == 0 || resp.GetResponseCode() != aurora.ResponseCode_OK), nil } diff --git a/realis.go b/realis.go index 3552f61..c331918 100644 --- a/realis.go +++ b/realis.go @@ -376,8 +376,6 @@ func (c *Client) GetJobSummary(role string) (*aurora.JobSummaryResult_, error) { func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { - var result *aurora.GetJobsResult_ - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { return c.readonlyClient.GetJobs(context.TODO(), role) }, @@ -385,14 +383,13 @@ func (c *Client) GetJobs(role string) (*aurora.GetJobsResult_, error) { ) if retryErr != nil { - return result, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + return nil, errors.Wrap(retryErr, "error getting Jobs from Aurora Scheduler") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp != nil && resp.GetResult_() != nil { - result = resp.GetResult_().GetJobsResult_ - } - - return result, nil + return resp.GetResult_().GetJobsResult_, nil } // Kill specific instances of a job. Returns true, nil if a task was actually killed as a result of this API call. @@ -462,7 +459,7 @@ func (c *Client) CreateJob(auroraJob *AuroraJob) error { // On a client timeout, attempt to verify that payload made to the Scheduler by // trying to get the config summary for the job key func() (*aurora.Response, bool) { - exists, err := c.jobExists(auroraJob.JobKey()) + exists, err := c.JobExists(auroraJob.JobKey()) if err != nil { c.logger.Print("verification failed ", err) } @@ -644,12 +641,10 @@ func (c *Client) StartJobUpdate(updateJob *JobUpdate, message string) (*aurora.S return nil, errors.Wrap(retryErr, "error sending StartJobUpdate command to Aurora Scheduler") } - - if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetStartJobUpdateResult_() != nil { - return resp.GetResult_().GetStartJobUpdateResult_(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartJobUpdateResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + return resp.GetResult_().GetStartJobUpdateResult_(), nil } // AbortJobUpdate terminates a job update in the scheduler. @@ -755,10 +750,11 @@ func (c *Client) PulseJobUpdate(updateKey aurora.JobUpdateKey) (aurora.JobUpdate return aurora.JobUpdatePulseStatus(0), errors.Wrap(retryErr, "error sending PulseJobUpdate command to Aurora Scheduler") } - if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetPulseJobUpdateResult_() != nil { - return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetPulseJobUpdateResult_() == nil { + return aurora.JobUpdatePulseStatus(0), errors.New("unexpected response from scheduler") } - return aurora.JobUpdatePulseStatus(0), errors.New("thrift error, field was nil unexpectedly") + + return resp.GetResult_().GetPulseJobUpdateResult_().GetStatus(), nil } // Scale up the number of instances under a job configuration using the configuration for specific @@ -829,6 +825,9 @@ func (c *Client) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask if retryErr != nil { return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for task status") } + if resp == nil { + return nil, errors.New("unexpected response from scheduler") + } return response.ScheduleStatusResult(resp).GetTasks(), nil } @@ -848,13 +847,11 @@ func (c *Client) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingRea return nil, errors.Wrap(retryErr, "error querying Aurora Scheduler for pending Reasons") } - var result []*aurora.PendingReason - - if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetPendingReasonResult_() != nil { - result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons() + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetPendingReasonResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return result, nil + return resp.GetResult_().GetGetPendingReasonResult_().GetReasons(), nil } // GetTasksWithoutConfigs gets information about task including without a task configuration object. @@ -930,10 +927,11 @@ func (c *Client) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) ([]*aurora. return nil, errors.Wrap(retryErr, "unable to get job update details") } - if resp != nil && resp.GetResult_() != nil && resp.GetResult_().GetGetJobUpdateDetailsResult_() != nil { - return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetGetJobUpdateDetailsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("unknown Thrift error, field is nil.") + + return resp.GetResult_().GetGetJobUpdateDetailsResult_().GetDetailsList(), nil } func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error { diff --git a/realis_admin.go b/realis_admin.go index a1fa50f..f100f46 100644 --- a/realis_admin.go +++ b/realis_admin.go @@ -45,10 +45,11 @@ func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } // Start SLA Aware Drain. @@ -87,10 +88,11 @@ func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts .. return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetDrainHostsResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil } func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -114,10 +116,11 @@ func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) return nil, errors.Wrap(retryErr, "unable to recover connection") } - if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetStartMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil } func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { @@ -140,18 +143,13 @@ func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { if retryErr != nil { return nil, errors.Wrap(retryErr, "unable to recover connection") } - - if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + if resp == nil || resp.GetResult_() == nil || resp.GetResult_().GetEndMaintenanceResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil } func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { - - var result *aurora.MaintenanceStatusResult_ - if len(hosts) == 0 { return nil, errors.New("no hosts provided to get maintenance status from") } @@ -170,14 +168,13 @@ func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusRe ) if retryErr != nil { - return result, errors.Wrap(retryErr, "unable to recover connection") + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - if resp.GetResult_() != nil { - result = resp.GetResult_().GetMaintenanceStatusResult_() - } - - return result, nil + return resp.GetResult_().GetMaintenanceStatusResult_(), nil } // SetQuota sets a quota aggregate for the given role @@ -219,10 +216,10 @@ func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { return nil, errors.Wrap(retryErr, "unable to get role quota") } - if resp.GetResult_() != nil { - return resp.GetResult_().GetGetQuotaResult_(), nil + if resp == nil || resp.GetResult_() == nil { + return nil, errors.New("unexpected response from scheduler") } - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + return resp.GetResult_().GetGetQuotaResult_(), nil } // Force Aurora Scheduler to perform a snapshot and write to Mesos log diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 04fb307..9482964 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -1447,3 +1447,49 @@ func TestRealisClient_FitTasks(t *testing.T) { } } } + +func TestRealisClient_JobExists(t *testing.T) { + role := "vagrant" + env := "prod" + name := "test_job_exists" + // Create a good single job + job := realis.NewJob(). + Environment(env). + Role(role). + Name(name). + ThermosExecutor(thermosExec). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(3). + IsService(true). + Production(false). + Tier("preemptible"). + Priority(0) + + err := r.CreateJob(job) + assert.NoError(t, err) + + exists, err := r.JobExists(job.JobKey()) + assert.NoError(t, err) + assert.True(t, exists) + + // Create a single bad job + badJob := realis.NewJob(). + Environment("prod"). + Role("vagrant"). + Name("executordoesntexist"). + ExecutorName("idontexist"). + ExecutorData(""). + CPU(.25). + RAM(4). + Disk(10). + InstanceCount(1) + + err = r.CreateJob(badJob) + assert.Error(t, err) + + exists, err = r.JobExists(badJob.JobKey()) + assert.NoError(t, err) + assert.False(t, exists) +}