diff --git a/examples/client.go b/examples/client.go index 53b6d1e..5d7a01f 100644 --- a/examples/client.go +++ b/examples/client.go @@ -25,35 +25,44 @@ import ( "github.com/rdelval/gorealis" "github.com/rdelval/gorealis/gen-go/apache/aurora" "github.com/rdelval/gorealis/response" + "strings" ) -func main() { - cmd := flag.String("cmd", "", "Job request type to send to Aurora Scheduler") - executor := flag.String("executor", "thermos", "Executor to use") - url := flag.String("url", "", "URL at which the Aurora Scheduler exists as [url]:[port]") - clustersConfig := flag.String("clusters", "", "Location of the clusters.json file used by aurora.") - clusterName := flag.String("cluster", "devcluster", "Name of cluster to run job on (only necessary if clusters is set)") - updateId := flag.String("updateId", "", "Update ID to operate on") - username := flag.String("username", "aurora", "Username to use for authorization") - password := flag.String("password", "secret", "Password to use for authorization") - zkUrl := flag.String("zkurl", "", "zookeeper url") +var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, drainCandidates string + +var CONNECTION_TIMEOUT = 20000 + +func init() { + flag.StringVar(&cmd, "cmd", "", "Job request type to send to Aurora Scheduler") + flag.StringVar(&executor, "executor", "thermos", "Executor to use") + flag.StringVar(&url, "url", "", "URL at which the Aurora Scheduler exists as [url]:[port]") + flag.StringVar(&clustersConfig, "clusters", "", "Location of the clusters.json file used by aurora.") + flag.StringVar(&clusterName, "cluster", "devcluster", "Name of cluster to run job on (only necessary if clusters is set)") + flag.StringVar(&updateId, "updateId", "", "Update ID to operate on") + flag.StringVar(&username, "username", "aurora", "Username to use for authorization") + flag.StringVar(&password, "password", "secret", "Password to use for authorization") + flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url") + flag.StringVar(&drainCandidates, "drainCandidates", "", "Comma separated list of candidate hosts to drain") flag.Parse() +} + +func main() { // Attempt to load leader from zookeeper - if *clustersConfig != "" { - clusters, err := realis.LoadClusters(*clustersConfig) + if clustersConfig != "" { + clusters, err := realis.LoadClusters(clustersConfig) if err != nil { fmt.Println(err) os.Exit(1) } - cluster, ok := clusters[*clusterName] + cluster, ok := clusters[clusterName] if !ok { - fmt.Printf("Cluster %s chosen doesn't exist\n", *clusterName) + fmt.Printf("Cluster %s chosen doesn't exist\n", clusterName) os.Exit(1) } - *url, err = realis.LeaderFromZK(cluster) + url, err = realis.LeaderFromZK(cluster) if err != nil { fmt.Println(err) os.Exit(1) @@ -73,19 +82,23 @@ func main() { } //check if zkUrl is available. 
- if *zkUrl != "" { - fmt.Println("zkUrl: ", *zkUrl) + if zkUrl != "" { + fmt.Println("zkUrl: ", zkUrl) cluster := &realis.Cluster{Name: "example", AuthMechanism: "UNAUTHENTICATED", - ZK: *zkUrl, + ZK: zkUrl, SchedZKPath: "/aurora/scheduler", AgentRunDir: "latest", AgentRoot: "/var/lib/mesos", } fmt.Printf("cluster: %+v \n", cluster) - //r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000)) - r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff)) + r, err = realis.NewRealisClient(realis.ZKUrl(zkUrl), + realis.BasicAuth(username, password), + realis.ThriftJSON(), + realis.TimeoutMS(CONNECTION_TIMEOUT), + realis.BackOff(defaultBackoff)) + if err != nil { fmt.Println(err) os.Exit(1) @@ -93,7 +106,11 @@ func main() { monitor = &realis.Monitor{r} } else { - r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff)) + r, err = realis.NewRealisClient(realis.SchedulerUrl(url), + realis.BasicAuth(username, password), + realis.ThriftJSON(), + realis.TimeoutMS(CONNECTION_TIMEOUT), + realis.BackOff(defaultBackoff)) if err != nil { fmt.Println(err) os.Exit(1) @@ -103,7 +120,7 @@ func main() { } defer r.Close() - switch *executor { + switch executor { case "thermos": payload, err := ioutil.ReadFile("examples/thermos_payload.json") if err != nil { @@ -157,7 +174,7 @@ func main() { os.Exit(1) } - switch *cmd { + switch cmd { case "create": fmt.Println("Creating job") resp, err := r.CreateJob(job) @@ -317,7 +334,7 @@ func main() { currInstances := int32(len(live)) fmt.Println("Current num of instances: ", currInstances) var instId int32 - for k := range live{ + for k := range live { instId = k break } @@ -369,7 +386,7 @@ func main() { os.Exit(1) } var instId int32 - for k := range live{ + for k := range live { instId = k break } @@ -393,7 +410,7 @@ func main() { break case "updateDetails": resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{ - Key: &aurora.JobUpdateKey{job.JobKey(), *updateId}, Limit: 1}) + Key: &aurora.JobUpdateKey{job.JobKey(), updateId}, Limit: 1}) if err != nil { fmt.Println(err) @@ -403,7 +420,7 @@ func main() { break case "abortUpdate": fmt.Println("Abort update") - resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "") + resp, err := r.AbortJobUpdate(aurora.JobUpdateKey{job.JobKey(), updateId}, "") if err != nil { fmt.Println(err) os.Exit(1) @@ -412,7 +429,7 @@ func main() { break case "rollbackUpdate": fmt.Println("Abort update") - resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{job.JobKey(), *updateId}, "") + resp, err := r.RollbackJobUpdate(aurora.JobUpdateKey{job.JobKey(), updateId}, "") if err != nil { fmt.Println(err) os.Exit(1) @@ -427,7 +444,7 @@ func main() { os.Exit(1) } var instId int32 - for k := range live{ + for k := range live { instId = k break } @@ -482,6 +499,20 @@ func main() { fmt.Printf("length: %d\n ", len(tasks)) fmt.Printf("tasks: %+v\n", tasks) + case "drainHosts": + fmt.Println("Setting hosts to DRAINING") + if drainCandidates == "" { + fmt.Println("No hosts specified to drain") + os.Exit(1) + } + hosts := strings.Split(drainCandidates, ",") + _, result, err := r.DrainHosts(hosts...) 
+ if err != nil { + fmt.Printf("error: %+v\n", err.Error()) + os.Exit(1) + } + fmt.Print(result.String()) + default: fmt.Println("Command not supported") os.Exit(1) diff --git a/realis.go b/realis.go index b278df1..37fb65a 100644 --- a/realis.go +++ b/realis.go @@ -56,12 +56,16 @@ type Realis interface { ReestablishConn() error RealisConfig() *RealisConfig Close() + + // Admin functions + DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) } type realisClient struct { config *RealisConfig client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient + adminClient *aurora.AuroraAdminClient } type option func(*RealisConfig) @@ -151,7 +155,7 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) { func NewRealisClient(options ...option) (Realis, error) { config := &RealisConfig{} - fmt.Println(" options length: ", options) + fmt.Println(" options length: ", len(options)) for _, opt := range options { opt(config) } @@ -217,7 +221,8 @@ func NewRealisClient(options ...option) (Realis, error) { return &realisClient{ config: config, client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory), - readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory), + adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory)}, nil } @@ -350,7 +355,8 @@ func newClient(realisconfig *RealisConfig) Realis { return &realisClient{ config: realisconfig, client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory), - readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory)} + readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory), + adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)} } // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client @@ -463,6 +469,7 @@ func (r *realisClient) ReestablishConn() error { AddBasicAuth(r.config, r.config.username, r.config.password) r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) + r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. 
fmt.Println("ReestablishConn url: ", r.config.url) @@ -484,6 +491,7 @@ func (r *realisClient) ReestablishConn() error { AddBasicAuth(r.config, r.config.username, r.config.password) r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory) r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) + r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) } else { fmt.Println(" Missing Data for ReestablishConn ") fmt.Println(" r.config.cluster: ", r.config.cluster) @@ -679,7 +687,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil { return response.ResponseCodeCheck(resp) } - fmt.Println("CreateJob err: %+v\n", err) + fmt.Printf("CreateJob err: %+v\n", err) err1 := r.ReestablishConn() if err1 != nil { fmt.Println("error in ReestablishConn: ", err1) @@ -1148,3 +1156,62 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string return nil, errors.Wrap(err, "Unable to roll back job update") } + +// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing +// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed +// to return to running unless there is enough capacity in the cluster to run them. +func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) { + + var resp *aurora.Response + var result *aurora.DrainHostsResult_ + var clientErr, payloadErr error + + if len(hosts) == 0 { + return nil, nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = make(map[string]bool) + for _, host := range hosts { + drainList.HostNames[host] = true + } + + retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) { + + // Send thrift call, if we have a thrift send error, attempt to reconnect + // and continue trying to resend command + if resp, clientErr = r.adminClient.DrainHosts(drainList); clientErr != nil { + // Experienced an connection error + err1 := r.ReestablishConn() + if err1 != nil { + fmt.Println("error in re-establishing connection: ", err1) + } + return false, nil + } + + // If error is NOT due to connection + if _, payloadErr = response.ResponseCodeCheck(resp); payloadErr != nil { + // TODO(rdelvalle): an leader election may cause the response to have + // failed when it should have succeeded. Retry everything for now until + // we figure out a more concrete fix. + return false, nil + } + + // Successful call + return true, nil + + }) + + if resp != nil && resp.GetResult_() != nil { + result = resp.GetResult_().GetDrainHostsResult_() + } + + + // Timed out on retries. *Note that when we fix the unexpected errors with a correct payload, + // this will can become either a timeout error or a payload error + if retryErr != nil { + return resp, result, errors.Wrap(clientErr, "Unable to recover connection") + } + + return resp, result, nil +} diff --git a/retry.go b/retry.go new file mode 100644 index 0000000..fba37d4 --- /dev/null +++ b/retry.go @@ -0,0 +1,56 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Modified version of the Kubernetes exponential-backoff code
+
+package realis
+
+import (
+	"errors"
+	"time"
+)
+
+// ConditionFunc returns true if the condition is satisfied, or an error
+// if the loop should be aborted.
+type ConditionFunc func() (done bool, err error)
+
+// ExponentialBackoff repeats a condition check with exponential backoff.
+//
+// It checks the condition up to Steps times, increasing the wait by multiplying
+// the previous duration by Factor.
+//
+// If Jitter is greater than zero, a random amount of each duration is added
+// (between duration and duration*(1+jitter)).
+//
+// If the condition never returns true, a timeout error is returned. All other
+// errors terminate the retries immediately.
+func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
+	duration := backoff.Duration
+	for i := 0; i < backoff.Steps; i++ {
+		if i != 0 {
+			adjusted := duration
+			if backoff.Jitter > 0.0 {
+				adjusted = Jitter(duration, backoff.Jitter)
+			}
+			time.Sleep(adjusted)
+			duration = time.Duration(float64(duration) * backoff.Factor)
+		}
+		if ok, err := condition(); err != nil || ok {
+			return err
+		}
+	}
+	return errors.New("timed out while retrying")
+}
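
Usage sketch: a minimal, hypothetical example of how client code might exercise the new DrainHosts call and the ExponentialBackoff helper introduced above. The scheduler address, credentials, and host names are placeholders, and the Backoff field names (Duration, Steps, Factor, Jitter) are inferred from how retry.go reads them; this is a sketch under those assumptions, not part of the patch.

// A minimal, hypothetical sketch. The scheduler URL, credentials, and host names are
// placeholders; the Backoff field names are inferred from their use in retry.go.
package main

import (
	"fmt"
	"time"

	realis "github.com/rdelval/gorealis"
)

func main() {
	// Build a client the same way examples/client.go does (options confirmed by the diff).
	r, err := realis.NewRealisClient(
		realis.SchedulerUrl("http://localhost:8081"),
		realis.BasicAuth("aurora", "secret"),
		realis.ThriftJSON(),
		realis.TimeoutMS(20000))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer r.Close()

	// DrainHosts already retries internally via ExponentialBackoff(defaultBackoff, ...),
	// so the caller only inspects the response and the typed drain result.
	resp, result, err := r.DrainHosts("agent-one.example.com", "agent-two.example.com")
	if err != nil {
		fmt.Println("drain failed: ", err)
		return
	}
	fmt.Printf("drain response: %v\ndrain result: %+v\n", resp, result)

	// Direct use of the retry helper, e.g. to wait for some follow-up condition.
	backoff := realis.Backoff{
		Duration: 1 * time.Second, // initial wait between attempts
		Steps:    5,               // maximum number of condition checks
		Factor:   2.0,             // multiply the wait by this factor after each attempt
		Jitter:   0.1,             // add up to 10% random jitter to each wait
	}

	attempts := 0
	if err := realis.ExponentialBackoff(backoff, func() (bool, error) {
		attempts++
		// Stand-in condition: succeed on the third attempt. A real caller might poll
		// the scheduler until tasks from the drained hosts have been rescheduled.
		return attempts >= 3, nil
	}); err != nil {
		fmt.Println("condition never succeeded: ", err)
	}
}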