Merge pull request #36 from rdelval/auroraAdmin

New API to set hosts to DRAINING. Cleaned up some of the sample client code.
Renan DelValle 2017-09-26 12:33:50 -07:00 committed by GitHub
commit f59940f9a7
3 changed files with 187 additions and 33 deletions
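For reference, a minimal sketch of how a caller might exercise the new DrainHosts admin call outside of the bundled example client. The scheduler URL, credentials, and agent hostnames below are placeholders; everything else follows the client options and the DrainHosts signature added in this change.

```go
package main

import (
	"fmt"
	"os"

	"github.com/rdelval/gorealis"
)

func main() {
	// Placeholder endpoint and credentials; point these at a real scheduler.
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://192.168.33.7:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.ThriftJSON(),
		realis.TimeoutMS(20000))
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer r.Close()

	// DrainHosts returns the raw thrift response, the typed drain result, and an error.
	_, result, err := r.DrainHosts("agent-one.example.com", "agent-two.example.com")
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	fmt.Println(result.String())
}
```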


@ -25,35 +25,44 @@ import (
"github.com/rdelval/gorealis"
"github.com/rdelval/gorealis/gen-go/apache/aurora"
"github.com/rdelval/gorealis/response"
"strings"
)
func main() {
cmd := flag.String("cmd", "", "Job request type to send to Aurora Scheduler")
executor := flag.String("executor", "thermos", "Executor to use")
url := flag.String("url", "", "URL at which the Aurora Scheduler exists as [url]:[port]")
clustersConfig := flag.String("clusters", "", "Location of the clusters.json file used by aurora.")
clusterName := flag.String("cluster", "devcluster", "Name of cluster to run job on (only necessary if clusters is set)")
updateId := flag.String("updateId", "", "Update ID to operate on")
username := flag.String("username", "aurora", "Username to use for authorization")
password := flag.String("password", "secret", "Password to use for authorization")
zkUrl := flag.String("zkurl", "", "zookeeper url")
var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, drainCandidates string
var CONNECTION_TIMEOUT = 20000
func init() {
flag.StringVar(&cmd, "cmd", "", "Job request type to send to Aurora Scheduler")
flag.StringVar(&executor, "executor", "thermos", "Executor to use")
flag.StringVar(&url, "url", "", "URL at which the Aurora Scheduler exists as [url]:[port]")
flag.StringVar(&clustersConfig, "clusters", "", "Location of the clusters.json file used by aurora.")
flag.StringVar(&clusterName, "cluster", "devcluster", "Name of cluster to run job on (only necessary if clusters is set)")
flag.StringVar(&updateId, "updateId", "", "Update ID to operate on")
flag.StringVar(&username, "username", "aurora", "Username to use for authorization")
flag.StringVar(&password, "password", "secret", "Password to use for authorization")
flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url")
flag.StringVar(&drainCandidates, "drainCandidates", "", "Comma separated list of candidate hosts to drain")
flag.Parse()
}
func main() {
// Attempt to load leader from zookeeper
if *clustersConfig != "" {
clusters, err := realis.LoadClusters(*clustersConfig)
if clustersConfig != "" {
clusters, err := realis.LoadClusters(clustersConfig)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
cluster, ok := clusters[*clusterName]
cluster, ok := clusters[clusterName]
if !ok {
fmt.Printf("Cluster %s chosen doesn't exist\n", *clusterName)
fmt.Printf("Cluster %s chosen doesn't exist\n", clusterName)
os.Exit(1)
}
*url, err = realis.LeaderFromZK(cluster)
url, err = realis.LeaderFromZK(cluster)
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -73,19 +82,23 @@ func main() {
}
//check if zkUrl is available.
if *zkUrl != "" {
fmt.Println("zkUrl: ", *zkUrl)
if zkUrl != "" {
fmt.Println("zkUrl: ", zkUrl)
cluster := &realis.Cluster{Name: "example",
AuthMechanism: "UNAUTHENTICATED",
ZK: *zkUrl,
ZK: zkUrl,
SchedZKPath: "/aurora/scheduler",
AgentRunDir: "latest",
AgentRoot: "/var/lib/mesos",
}
fmt.Printf("cluster: %+v \n", cluster)
//r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000))
r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff))
r, err = realis.NewRealisClient(realis.ZKUrl(zkUrl),
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -93,7 +106,11 @@ func main() {
monitor = &realis.Monitor{r}
} else {
r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff))
r, err = realis.NewRealisClient(realis.SchedulerUrl(url),
realis.BasicAuth(username, password),
realis.ThriftJSON(),
realis.TimeoutMS(CONNECTION_TIMEOUT),
realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -103,7 +120,7 @@ func main() {
}
defer r.Close()
switch *executor {
switch executor {
case "thermos":
payload, err := ioutil.ReadFile("examples/thermos_payload.json")
if err != nil {
@ -157,7 +174,7 @@ func main() {
os.Exit(1)
}
switch *cmd {
switch cmd {
case "create":
fmt.Println("Creating job")
resp, err := r.CreateJob(job)
@ -317,7 +334,7 @@ func main() {
currInstances := int32(len(live))
fmt.Println("Current num of instances: ", currInstances)
var instId int32
for k := range live{
for k := range live {
instId = k
break
}
@ -369,7 +386,7 @@ func main() {
os.Exit(1)
}
var instId int32
for k := range live{
for k := range live {
instId = k
break
}
@ -393,7 +410,7 @@ func main() {
break
case "updateDetails":
resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
Key: &aurora.JobUpdateKey{job.JobKey(), *updateId}, Limit: 1})
Key: &aurora.JobUpdateKey{job.JobKey(), updateId}, Limit: 1})
if err != nil {
fmt.Println(err)
@ -403,7 +420,7 @@ func main() {
break
case "abortUpdate":
fmt.Println("Abort update")
resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "")
resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), updateId}, "")
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -412,7 +429,7 @@ func main() {
break
case "rollbackUpdate":
fmt.Println("Abort update")
resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "")
resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{job.JobKey(), updateId}, "")
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -427,7 +444,7 @@ func main() {
os.Exit(1)
}
var instId int32
for k := range live{
for k := range live {
instId = k
break
}
@ -482,6 +499,20 @@ func main() {
fmt.Printf("length: %d\n ", len(tasks))
fmt.Printf("tasks: %+v\n", tasks)
case "drainHosts":
fmt.Println("Setting hosts to DRAINING")
if drainCandidates == "" {
fmt.Println("No hosts specified to drain")
os.Exit(1)
}
hosts := strings.Split(drainCandidates, ",")
_, result, err := r.DrainHosts(hosts...)
if err != nil {
fmt.Printf("error: %+v\n", err.Error())
os.Exit(1)
}
fmt.Print(result.String())
default:
fmt.Println("Command not supported")
os.Exit(1)


@ -56,12 +56,16 @@ type Realis interface {
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
// Admin functions
DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error)
}
type realisClient struct {
config *RealisConfig
client *aurora.AuroraSchedulerManagerClient
readonlyClient *aurora.ReadOnlySchedulerClient
adminClient *aurora.AuroraAdminClient
}
type option func(*RealisConfig)
@ -151,7 +155,7 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
func NewRealisClient(options ...option) (Realis, error) {
config := &RealisConfig{}
fmt.Println(" options length: ", options)
fmt.Println(" options length: ", len(options))
for _, opt := range options {
opt(config)
}
@ -217,7 +221,8 @@ func NewRealisClient(options ...option) (Realis, error) {
return &realisClient{
config: config,
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory)}, nil
}
@ -350,7 +355,8 @@ func newClient(realisconfig *RealisConfig) Realis {
return &realisClient{
config: realisconfig,
client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory),
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory)}
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)}
}
// Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
@ -463,6 +469,7 @@ func (r *realisClient) ReestablishConn() error {
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
fmt.Println("ReestablishConn url: ", r.config.url)
@ -484,6 +491,7 @@ func (r *realisClient) ReestablishConn() error {
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
@ -679,7 +687,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil {
return response.ResponseCodeCheck(resp)
}
fmt.Println("CreateJob err: %+v\n", err)
fmt.Printf("CreateJob err: %+v\n", err)
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in ReestablishConn: ", err1)
@ -1148,3 +1156,62 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
return nil, errors.Wrap(err, "Unable to roll back job update")
}
// Sets a list of nodes to DRAINING: nothing new can be scheduled on them, and any existing
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
// to return to RUNNING unless there is enough capacity in the cluster to run them.
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
var resp *aurora.Response
var result *aurora.DrainHostsResult_
var clientErr, payloadErr error
if len(hosts) == 0 {
return nil, nil, errors.New("no hosts provided to drain")
}
drainList := aurora.NewHosts()
drainList.HostNames = make(map[string]bool)
for _, host := range hosts {
drainList.HostNames[host] = true
}
retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
// Send thrift call, if we have a thrift send error, attempt to reconnect
// and continue trying to resend command
if resp, clientErr = r.adminClient.DrainHosts(drainList); clientErr != nil {
// Experienced a connection error
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in re-establishing connection: ", err1)
}
return false, nil
}
// If error is NOT due to connection
if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil {
// TODO(rdelvalle): a leader election may cause the response to have
// failed when it should have succeeded. Retry everything for now until
// we figure out a more concrete fix.
return false, nil
}
// Successful call
return true, nil
})
if resp != nil && resp.GetResult_() != nil {
result = resp.GetResult_().GetDrainHostsResult_()
}
// Timed out on retries. Note that when we fix the unexpected errors with a correct payload,
// this can become either a timeout error or a payload error
if retryErr != nil {
return resp, result, errors.Wrap(clientErr, "Unable to recover connection")
}
return resp, result, nil
}

retry.go (new file, 56 lines added)

@ -0,0 +1,56 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Modified version of the Kubernetes exponential-backoff code
package realis
import (
"time"
"errors"
)
// ConditionFunc returns true if the condition is satisfied, or an error
// if the loop should be aborted.
type ConditionFunc func() (done bool, err error)
// ExponentialBackoff repeats a condition check with exponential backoff.
//
// It checks the condition up to Steps times, increasing the wait by multiplying
// the previous duration by Factor.
//
// If Jitter is greater than zero, a random amount of each duration is added
// (between duration and duration*(1+jitter)).
//
// If the condition never returns true, ErrWaitTimeout is returned. All other
// errors terminate immediately.
func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
duration := backoff.Duration
for i := 0; i < backoff.Steps; i++ {
if i != 0 {
adjusted := duration
if backoff.Jitter > 0.0 {
adjusted = Jitter(duration, backoff.Jitter)
}
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * backoff.Factor)
}
if ok, err := condition(); err != nil || ok {
return err
}
}
return errors.New("Timed out while retrying")
}
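
For context, a minimal sketch of how this helper might be driven, assuming the exported Backoff struct carries the Duration, Factor, Jitter, and Steps fields read above; the retried operation is a stand-in.

```go
package main

import (
	"fmt"
	"time"

	"github.com/rdelval/gorealis"
)

func main() {
	attempts := 0

	// Retry a flaky operation up to 5 times, doubling the wait each round
	// and adding up to 10% jitter between attempts.
	err := realis.ExponentialBackoff(realis.Backoff{
		Duration: 500 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.1,
		Steps:    5,
	}, func() (bool, error) {
		attempts++
		fmt.Println("attempt", attempts)
		return attempts >= 3, nil // pretend the call succeeds on the third try
	})
	if err != nil {
		fmt.Println("gave up:", err)
	}
}
```

DrainHosts above follows the same pattern: its condition re-sends the thrift call, returns false (retry) on a connection or payload error, and true once the call succeeds.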