diff --git a/examples/client.go b/examples/client.go index d681886..15f85f2 100644 --- a/examples/client.go +++ b/examples/client.go @@ -29,6 +29,7 @@ func main() { executor := flag.String("executor", "thermos", "Executor to use") url := flag.String("url", "", "URL at which the Aurora Scheduler exists as [url]:[port]") clustersConfig := flag.String("clusters", "", "Location of the clusters.json file used by aurora.") + clusterName := flag.String("cluster", "devcluster", "Name of cluster to run job on") updateId := flag.String("updateId", "", "Update ID to operate on") username := flag.String("username", "aurora", "Username to use for authorization") password := flag.String("password", "secret", "Password to use for authorization") @@ -42,7 +43,11 @@ func main() { os.Exit(1) } - cluster, _ := clusters["devcluster"] + cluster, ok := clusters[*clusterName]; + if(!ok) { + fmt.Printf("Cluster %s chosen doesn't exist\n", *clusterName) + os.Exit(1) + } *url, err = realis.LeaderFromZK(cluster) if err != nil { @@ -63,7 +68,7 @@ func main() { r := realis.NewClient(config) defer r.Close() - var job *realis.Job + var job realis.Job switch *executor { case "thermos": @@ -141,7 +146,7 @@ func main() { break case "flexUp": fmt.Println("Flexing up job") - response, err := r.AddInstances(&aurora.InstanceKey{job.JobKey(), 0}, 5) + response, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5) if err != nil { fmt.Println(err) os.Exit(1) @@ -150,8 +155,12 @@ func main() { break case "update": fmt.Println("Updating a job with a new name") - updateJob := realis.NewUpdateJob(job) - + taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0}) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + updateJob := realis.NewUpdateJob(taskConfig) updateJob.InstanceCount(3).RAM(128) resposne, err := r.StartJobUpdate(updateJob, "") @@ -170,6 +179,15 @@ func main() { } fmt.Println(response.String()) break + case "taskConfig": + fmt.Println("Getting job info") + config, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0}) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + print(config.String()) + break default: fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now") os.Exit(1) diff --git a/realis.go b/realis.go index d7f0803..e9d653b 100644 --- a/realis.go +++ b/realis.go @@ -29,8 +29,10 @@ import ( type Realis interface { AbortJobUpdate(key *aurora.JobKey, updateId string, message string) (*aurora.Response, error) - AddInstances(instKey *aurora.InstanceKey, count int32) (*aurora.Response, error) - CreateJob(auroraJob *Job) (*aurora.Response, error) + AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) + ConfigSummary(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) + CreateJob(auroraJob Job) (*aurora.Response, error) + FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) KillJob(key *aurora.JobKey) (*aurora.Response, error) KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error) @@ -158,8 +160,8 @@ func (r realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } // Sends a create job message to the scheduler with a specific job configuration. -func (r realisClient) CreateJob(auroraJob *Job) (*aurora.Response, error) { - response, err := r.client.CreateJob(auroraJob.jobConfig) +func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { + response, err := r.client.CreateJob(auroraJob.JobConfig()) if err != nil { return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler") @@ -218,9 +220,9 @@ func (r realisClient) AbortJobUpdate( // Scale up the number of instances under a job configuration using the configuration for specific // instance to scale up. -func (r realisClient) AddInstances(instKey *aurora.InstanceKey, count int32) (*aurora.Response, error) { +func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) { - response, err := r.client.AddInstances(instKey, count) + response, err := r.client.AddInstances(&instKey, count) if err != nil { return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler") @@ -228,3 +230,32 @@ func (r realisClient) AddInstances(instKey *aurora.InstanceKey, count int32) (*a return response, nil } + +func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { + + ids := make(map[int32]bool) + + ids[instKey.InstanceId] = true + taskQ := &aurora.TaskQuery{Role: instKey.JobKey.Role, + Environment: instKey.JobKey.Environment, + JobName: instKey.JobKey.Name, + InstanceIds: ids, } + + response, err := r.client.GetTasksStatus(taskQ) + if err != nil { + return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration") + } + + + tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks() + + if(len(tasks) == 0) { + return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist", + instKey.InstanceId, + instKey.JobKey.Environment, + instKey.JobKey.Role, + instKey.JobKey.Name) + } + + return tasks[0].AssignedTask.Task, nil +} diff --git a/updatejob.go b/updatejob.go index 7dbbc09..2b0770c 100644 --- a/updatejob.go +++ b/updatejob.go @@ -14,7 +14,9 @@ package realis -import "gen-go/apache/aurora" +import ( + "gen-go/apache/aurora" +) // Structure to collect all information required to create job update type UpdateJob struct { @@ -32,9 +34,7 @@ func NewUpdateJob(config *aurora.TaskConfig) *UpdateJob { job := NewJob().(AuroraJob) job.jobConfig.TaskConfig = config - // Rebuild resource map from TaskConfig - job.resources = make(map[string]*aurora.Resource) for ptr := range config.Resources { if(ptr.NumCpus != nil){ job.resources["cpu"].NumCpus = ptr.NumCpus