Merge pull request #17 from sunilmvn/master

Adding removeInstances method for a job -
This commit is contained in:
kkrishna 2017-03-10 14:00:36 -08:00 committed by GitHub
commit 58088a139c
2 changed files with 64 additions and 5 deletions

View file

@ -74,7 +74,7 @@ func main() {
switch *executor { switch *executor {
case "thermos": case "thermos":
payload, err := ioutil.ReadFile("thermos_payload.json") payload, err := ioutil.ReadFile("examples/thermos_payload.json")
if err != nil { if err != nil {
fmt.Println("Error reading json config file: ", err) fmt.Println("Error reading json config file: ", err)
os.Exit(1) os.Exit(1)
@ -100,11 +100,11 @@ func main() {
Name("docker-compose"). Name("docker-compose").
ExecutorName("docker-compose-executor"). ExecutorName("docker-compose-executor").
ExecutorData("{}"). ExecutorData("{}").
CPU(1). CPU(0.5).
RAM(64). RAM(64).
Disk(100). Disk(100).
IsService(false). IsService(false).
InstanceCount(1). InstanceCount(3).
AddPorts(1). AddPorts(1).
AddLabel("fileName", "sample-app/docker-compose.yml"). AddLabel("fileName", "sample-app/docker-compose.yml").
AddURIs(true, true, "https://github.com/mesos/docker-compose-executor/releases/download/0.1.0/sample-app.tar.gz") AddURIs(true, true, "https://github.com/mesos/docker-compose-executor/releases/download/0.1.0/sample-app.tar.gz")
@ -273,7 +273,15 @@ func main() {
case "flexUp": case "flexUp":
fmt.Println("Flexing up job") fmt.Println("Flexing up job")
numOfInstances := int32(5) numOfInstances := int32(2)
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
currInstances := int32(len(live))
fmt.Println("Current num of instances: ", currInstances)
resp, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, numOfInstances) resp, err := r.AddInstances(aurora.InstanceKey{job.JobKey(), 0}, numOfInstances)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -281,7 +289,34 @@ func main() {
} }
if resp.ResponseCode == aurora.ResponseCode_OK { if resp.ResponseCode == aurora.ResponseCode_OK {
if ok, err := monitor.Instances(job.JobKey(), job.GetInstanceCount()+numOfInstances, 5, 50); !ok || err != nil { if ok, err := monitor.Instances(job.JobKey(), currInstances+numOfInstances, 5, 50); !ok || err != nil {
fmt.Println("Flexing up failed")
}
}
fmt.Println(resp.String())
break
case "flexDown":
fmt.Println("Flexing down job")
numOfInstances := int32(2)
live, err := r.GetInstanceIds(job.JobKey(), aurora.ACTIVE_STATES)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
currInstances := int32(len(live))
fmt.Println("Current num of instances: ", currInstances)
resp, err := r.RemoveInstances(job.JobKey(), numOfInstances)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if resp.ResponseCode == aurora.ResponseCode_OK {
if ok, err := monitor.Instances(job.JobKey(), currInstances-numOfInstances, 5, 50); !ok || err != nil {
fmt.Println("Flexing up failed") fmt.Println("Flexing up failed")
} }
} }

View file

@ -25,11 +25,13 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/gen-go/apache/aurora"
"github.com/rdelval/gorealis/response" "github.com/rdelval/gorealis/response"
"fmt"
) )
type Realis interface { type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
CreateJob(auroraJob Job) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
@ -320,6 +322,28 @@ func (r realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*au
return response.ResponseCodeCheck(resp) return response.ResponseCodeCheck(resp)
} }
//Scale down the number of instances under a job configuration using the configuratipn of a specific instance
func (r realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil {
return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
}
if len(instanceIds) < int(count) {
return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - " +
"Instances to kill %d Total Instances %d", count, len(instanceIds)))
}
instanceList := make([]int32, count)
i := 0
for k := range instanceIds {
instanceList[i] = k
i += 1
if i == int(count) {
break
}
}
return r.KillInstances(key, instanceList...)
}
func (r realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) { func (r realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
resp, err := r.client.GetTasksStatus(query) resp, err := r.client.GetTasksStatus(query)