Added ability to fetch the configuration for a specific instance
This commit is contained in:
parent
714b21631e
commit
4c3d850a9e
3 changed files with 63 additions and 14 deletions
|
@ -29,6 +29,7 @@ func main() {
|
||||||
executor := flag.String("executor", "thermos", "Executor to use")
|
executor := flag.String("executor", "thermos", "Executor to use")
|
||||||
url := flag.String("url", "", "URL at which the Aurora Scheduler exists as [url]:[port]")
|
url := flag.String("url", "", "URL at which the Aurora Scheduler exists as [url]:[port]")
|
||||||
clustersConfig := flag.String("clusters", "", "Location of the clusters.json file used by aurora.")
|
clustersConfig := flag.String("clusters", "", "Location of the clusters.json file used by aurora.")
|
||||||
|
clusterName := flag.String("cluster", "devcluster", "Name of cluster to run job on")
|
||||||
updateId := flag.String("updateId", "", "Update ID to operate on")
|
updateId := flag.String("updateId", "", "Update ID to operate on")
|
||||||
username := flag.String("username", "aurora", "Username to use for authorization")
|
username := flag.String("username", "aurora", "Username to use for authorization")
|
||||||
password := flag.String("password", "secret", "Password to use for authorization")
|
password := flag.String("password", "secret", "Password to use for authorization")
|
||||||
|
@ -42,7 +43,11 @@ func main() {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster, _ := clusters["devcluster"]
|
cluster, ok := clusters[*clusterName];
|
||||||
|
if(!ok) {
|
||||||
|
fmt.Printf("Cluster %s chosen doesn't exist\n", *clusterName)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
*url, err = realis.LeaderFromZK(cluster)
|
*url, err = realis.LeaderFromZK(cluster)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -63,7 +68,7 @@ func main() {
|
||||||
r := realis.NewClient(config)
|
r := realis.NewClient(config)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|
||||||
var job *realis.Job
|
var job realis.Job
|
||||||
|
|
||||||
switch *executor {
|
switch *executor {
|
||||||
case "thermos":
|
case "thermos":
|
||||||
|
@ -141,7 +146,7 @@ func main() {
|
||||||
break
|
break
|
||||||
case "flexUp":
|
case "flexUp":
|
||||||
fmt.Println("Flexing up job")
|
fmt.Println("Flexing up job")
|
||||||
response, err := r.AddInstances(&aurora.InstanceKey{job.JobKey(), 0}, 5)
|
response, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, 5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -150,8 +155,12 @@ func main() {
|
||||||
break
|
break
|
||||||
case "update":
|
case "update":
|
||||||
fmt.Println("Updating a job with a new name")
|
fmt.Println("Updating a job with a new name")
|
||||||
updateJob := realis.NewUpdateJob(job)
|
taskConfig, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
updateJob := realis.NewUpdateJob(taskConfig)
|
||||||
updateJob.InstanceCount(3).RAM(128)
|
updateJob.InstanceCount(3).RAM(128)
|
||||||
|
|
||||||
resposne, err := r.StartJobUpdate(updateJob, "")
|
resposne, err := r.StartJobUpdate(updateJob, "")
|
||||||
|
@ -170,6 +179,15 @@ func main() {
|
||||||
}
|
}
|
||||||
fmt.Println(response.String())
|
fmt.Println(response.String())
|
||||||
break
|
break
|
||||||
|
case "taskConfig":
|
||||||
|
fmt.Println("Getting job info")
|
||||||
|
config, err := r.FetchTaskConfig(aurora.InstanceKey{job.JobKey(), 0})
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
print(config.String())
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now")
|
fmt.Println("Only create, kill, restart, flexUp, update, and abortUpdate are supported now")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
|
43
realis.go
43
realis.go
|
@ -29,8 +29,10 @@ import (
|
||||||
|
|
||||||
type Realis interface {
|
type Realis interface {
|
||||||
AbortJobUpdate(key *aurora.JobKey, updateId string, message string) (*aurora.Response, error)
|
AbortJobUpdate(key *aurora.JobKey, updateId string, message string) (*aurora.Response, error)
|
||||||
AddInstances(instKey *aurora.InstanceKey, count int32) (*aurora.Response, error)
|
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
|
||||||
CreateJob(auroraJob *Job) (*aurora.Response, error)
|
ConfigSummary(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||||
|
CreateJob(auroraJob Job) (*aurora.Response, error)
|
||||||
|
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
|
||||||
KillJob(key *aurora.JobKey) (*aurora.Response, error)
|
KillJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||||
KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error)
|
KillInstance(key *aurora.JobKey, instanceId int32) (*aurora.Response, error)
|
||||||
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
||||||
|
@ -158,8 +160,8 @@ func (r realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends a create job message to the scheduler with a specific job configuration.
|
// Sends a create job message to the scheduler with a specific job configuration.
|
||||||
func (r realisClient) CreateJob(auroraJob *Job) (*aurora.Response, error) {
|
func (r realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
||||||
response, err := r.client.CreateJob(auroraJob.jobConfig)
|
response, err := r.client.CreateJob(auroraJob.JobConfig())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler")
|
return nil, errors.Wrap(err, "Error sending Create command to Aurora Scheduler")
|
||||||
|
@ -218,9 +220,9 @@ func (r realisClient) AbortJobUpdate(
|
||||||
|
|
||||||
// Scale up the number of instances under a job configuration using the configuration for specific
|
// Scale up the number of instances under a job configuration using the configuration for specific
|
||||||
// instance to scale up.
|
// instance to scale up.
|
||||||
func (r realisClient) AddInstances(instKey *aurora.InstanceKey, count int32) (*aurora.Response, error) {
|
func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
|
||||||
|
|
||||||
response, err := r.client.AddInstances(instKey, count)
|
response, err := r.client.AddInstances(&instKey, count)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler")
|
return nil, errors.Wrap(err, "Error sending AddInstances command to Aurora Scheduler")
|
||||||
|
@ -228,3 +230,32 @@ func (r realisClient) AddInstances(instKey *aurora.InstanceKey, count int32) (*a
|
||||||
|
|
||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
|
||||||
|
|
||||||
|
ids := make(map[int32]bool)
|
||||||
|
|
||||||
|
ids[instKey.InstanceId] = true
|
||||||
|
taskQ := &aurora.TaskQuery{Role: instKey.JobKey.Role,
|
||||||
|
Environment: instKey.JobKey.Environment,
|
||||||
|
JobName: instKey.JobKey.Name,
|
||||||
|
InstanceIds: ids, }
|
||||||
|
|
||||||
|
response, err := r.client.GetTasksStatus(taskQ)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "Error querying Aurora Scheduler for task configuration")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
tasks := response.GetResult_().GetScheduleStatusResult_().GetTasks()
|
||||||
|
|
||||||
|
if(len(tasks) == 0) {
|
||||||
|
return nil, errors.Errorf("Instance %d for jobkey %s/%s/%s doesn't exist",
|
||||||
|
instKey.InstanceId,
|
||||||
|
instKey.JobKey.Environment,
|
||||||
|
instKey.JobKey.Role,
|
||||||
|
instKey.JobKey.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tasks[0].AssignedTask.Task, nil
|
||||||
|
}
|
||||||
|
|
|
@ -14,7 +14,9 @@
|
||||||
|
|
||||||
package realis
|
package realis
|
||||||
|
|
||||||
import "gen-go/apache/aurora"
|
import (
|
||||||
|
"gen-go/apache/aurora"
|
||||||
|
)
|
||||||
|
|
||||||
// Structure to collect all information required to create job update
|
// Structure to collect all information required to create job update
|
||||||
type UpdateJob struct {
|
type UpdateJob struct {
|
||||||
|
@ -32,9 +34,7 @@ func NewUpdateJob(config *aurora.TaskConfig) *UpdateJob {
|
||||||
job := NewJob().(AuroraJob)
|
job := NewJob().(AuroraJob)
|
||||||
job.jobConfig.TaskConfig = config
|
job.jobConfig.TaskConfig = config
|
||||||
|
|
||||||
|
|
||||||
// Rebuild resource map from TaskConfig
|
// Rebuild resource map from TaskConfig
|
||||||
job.resources = make(map[string]*aurora.Resource)
|
|
||||||
for ptr := range config.Resources {
|
for ptr := range config.Resources {
|
||||||
if(ptr.NumCpus != nil){
|
if(ptr.NumCpus != nil){
|
||||||
job.resources["cpu"].NumCpus = ptr.NumCpus
|
job.resources["cpu"].NumCpus = ptr.NumCpus
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue