diff --git a/clusters_test.go b/clusters_test.go
index 2e7118b..6f854dd 100644
--- a/clusters_test.go
+++ b/clusters_test.go
@@ -12,17 +12,18 @@
  * limitations under the License.
  */
 
-package realis
+package realis_test
 
 import (
 	"fmt"
 	"github.com/stretchr/testify/assert"
 	"testing"
+	"github.com/paypal/gorealis"
 )
 
 func TestLoadClusters(t *testing.T) {
 
-	clusters, err := LoadClusters("examples/clusters.json")
+	clusters, err := realis.LoadClusters("examples/clusters.json")
 	if err != nil {
 		fmt.Print(err)
 	}
diff --git a/examples/client.go b/examples/client.go
index ac21a28..4d15634 100644
--- a/examples/client.go
+++ b/examples/client.go
@@ -27,6 +27,7 @@ import (
 	"github.com/paypal/gorealis"
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
 	"github.com/paypal/gorealis/response"
+	"log"
 )
 
 var cmd, executor, url, clustersConfig, clusterName, updateId, username, password, zkUrl, hostList string
@@ -45,11 +46,10 @@ func init() {
 	flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url")
 	flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on")
 	flag.Parse()
-}
 
-func main() {
-
-	// Attempt to load leader from zookeeper
+	// Attempt to load the leader from ZooKeeper using a clusters.json file
+	// (the same format the default Aurora client uses) if one is provided.
+	// This overrides the url passed in the arguments.
 	if clustersConfig != "" {
 		clusters, err := realis.LoadClusters(clustersConfig)
 		if err != nil {
@@ -59,7 +59,7 @@ func main() {
 
 		cluster, ok := clusters[clusterName]
 		if !ok {
-			fmt.Printf("Cluster %s chosen doesn't exist\n", clusterName)
+			fmt.Printf("Cluster %s doesn't exist in the file provided\n", clusterName)
 			os.Exit(1)
 		}
 
@@ -69,56 +69,42 @@ func main() {
 			os.Exit(1)
 		}
 	}
+}
+
+func main() {
 
 	var job realis.Job
 	var err error
 	var monitor *realis.Monitor
 	var r realis.Realis
 
-	var defaultBackoff = &realis.Backoff{
-		Steps:    2,
-		Duration: 10 * time.Second,
-		Factor:   2.0,
-		Jitter:   0.1,
+	clientOptions := []realis.ClientOption{
+		realis.BasicAuth(username, password),
+		realis.ThriftJSON(),
+		realis.TimeoutMS(CONNECTION_TIMEOUT),
+		realis.BackOff(&realis.Backoff{
+			Steps:    2,
+			Duration: 10 * time.Second,
+			Factor:   2.0,
+			Jitter:   0.1,
+		}),
+		realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)),
 	}
 
 	//check if zkUrl is available.
 	if zkUrl != "" {
 		fmt.Println("zkUrl: ", zkUrl)
-		cluster := &realis.Cluster{Name: "example",
-			AuthMechanism: "UNAUTHENTICATED",
-			ZK:            zkUrl,
-			SchedZKPath:   "/aurora/scheduler",
-			AgentRunDir:   "latest",
-			AgentRoot:     "/var/lib/mesos",
-		}
-		fmt.Printf("cluster: %+v \n", cluster)
-
-		r, err = realis.NewRealisClient(realis.ZKUrl(zkUrl),
-			realis.BasicAuth(username, password),
-			realis.ThriftJSON(),
-			realis.TimeoutMS(CONNECTION_TIMEOUT),
-			realis.BackOff(defaultBackoff))
-
-		if err != nil {
-			fmt.Println(err)
-			os.Exit(1)
-		}
-		monitor = &realis.Monitor{r}
-
+		clientOptions = append(clientOptions, realis.ZKUrl(zkUrl))
 	} else {
-		r, err = realis.NewRealisClient(realis.SchedulerUrl(url),
-			realis.BasicAuth(username, password),
-			realis.ThriftJSON(),
-			realis.TimeoutMS(CONNECTION_TIMEOUT),
-			realis.BackOff(defaultBackoff))
-		if err != nil {
-			fmt.Println(err)
-			os.Exit(1)
-		}
-
-		monitor = &realis.Monitor{r}
+		clientOptions = append(clientOptions, realis.SchedulerUrl(url))
 	}
+
+	r, err = realis.NewRealisClient(clientOptions...)
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+	monitor = &realis.Monitor{r}
 
 	defer r.Close()
 
 	switch executor {
@@ -197,6 +183,29 @@ func main() {
 			}
 		}
 
+		break
+	case "createService":
+		// Create a service with three instances using the update API instead of the createJob API
+		fmt.Println("Creating service")
+		settings := realis.NewUpdateSettings()
+		job.InstanceCount(3)
+		_, resp, err := r.CreateService(job, *settings)
+		if err != nil {
+			fmt.Println(err)
+			os.Exit(1)
+		}
+		fmt.Println(resp.String())
+
+		if ok, err := monitor.JobUpdate(*resp.GetKey(), 5, 50); !ok || err != nil {
+			_, err := r.KillJob(job.JobKey())
+			if err != nil {
+				fmt.Println(err)
+				os.Exit(1)
+			}
+			fmt.Println("ok: ", ok)
+			fmt.Println("err: ", err)
+		}
+		break
 	case "createDocker":
 		fmt.Println("Creating a docker based job")
@@ -352,7 +361,6 @@ func main() {
 		}
 		fmt.Println(resp.String())
 		break
-
 	case "flexDown":
 		fmt.Println("Flexing down job")
@@ -407,7 +415,6 @@ func main() {
 
 		jobUpdateKey := response.JobUpdateKey(resp)
 		monitor.JobUpdate(*jobUpdateKey, 5, 500)
-
 		break
 	case "updateDetails":
 		resp, err := r.JobUpdateDetails(aurora.JobUpdateQuery{
@@ -457,9 +464,7 @@ func main() {
 		}
 		print(config.String())
 		break
-
 	case "updatesummary":
-
 		fmt.Println("Getting job update summary")
 		jobquery := &aurora.JobUpdateQuery{
 			Role: &job.JobKey().Role,
@@ -471,7 +476,6 @@ func main() {
 			os.Exit(1)
 		}
 		fmt.Println(updatesummary)
-
 	case "taskStatus":
 		fmt.Println("Getting task status")
 		taskQ := &aurora.TaskQuery{Role: job.JobKey().Role,
@@ -485,7 +489,6 @@ func main() {
 		}
 		fmt.Printf("length: %d\n ", len(tasks))
 		fmt.Printf("tasks: %+v\n", tasks)
-
 	case "tasksWithoutConfig":
 		fmt.Println("Getting task status")
 		taskQ := &aurora.TaskQuery{Role: job.JobKey().Role,
@@ -499,7 +502,6 @@ func main() {
 		}
 		fmt.Printf("length: %d\n ", len(tasks))
 		fmt.Printf("tasks: %+v\n", tasks)
-
 	case "drainHosts":
 		fmt.Println("Setting hosts to DRAINING")
 		if hostList == "" {
@@ -531,7 +533,6 @@ func main() {
 		}
 
 		fmt.Print(result.String())
-
 	case "endMaintenance":
 		fmt.Println("Setting hosts to ACTIVE")
 		if hostList == "" {
@@ -563,7 +564,6 @@ func main() {
 		}
 
 		fmt.Print(result.String())
-
 	default:
 		fmt.Println("Command not supported")
 		os.Exit(1)
diff --git a/job.go b/job.go
index 6a08ff4..8753500 100644
--- a/job.go
+++ b/job.go
@@ -151,8 +151,6 @@ func (j *AuroraJob) RAM(ram int64) Job {
 	*j.resources["ram"].RamMb = ram
 	j.jobConfig.TaskConfig.RamMb = ram //Will be deprecated soon
-
-
 	return j
 }
diff --git a/monitors.go b/monitors.go
index 16ed7b2..75afff9 100644
--- a/monitors.go
+++ b/monitors.go
@@ -16,7 +16,6 @@ package realis
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/paypal/gorealis/gen-go/apache/aurora"
@@ -59,7 +58,7 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 		updateDetail := response.JobUpdateDetails(respDetail)
 
 		if len(updateDetail) == 0 {
-			fmt.Println("No update found")
+			m.Client.RealisConfig().logger.Println("No update found")
 			return false, errors.New("No update found for " + updateKey.String())
 		}
 		status := updateDetail[0].Update.Summary.State.Status
@@ -70,13 +69,13 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
 			// if we encounter an inactive state and it is not at rolled forward, update failed
 			switch status {
 			case aurora.JobUpdateStatus_ROLLED_FORWARD:
-				fmt.Println("Update succeded")
+				m.Client.RealisConfig().logger.Println("Update succeeded")
 				return true, nil
 			case aurora.JobUpdateStatus_FAILED:
-				fmt.Println("Update failed")
+				m.Client.RealisConfig().logger.Println("Update failed")
 				return false, errors.New(UpdateFailed)
 			case aurora.JobUpdateStatus_ROLLED_BACK:
-				fmt.Println("rolled back")
+				m.Client.RealisConfig().logger.Println("rolled back")
 				return false, errors.New(RolledBack)
 			default:
 				return false, nil
diff --git a/realis.go b/realis.go
index ac6c20d..bfa8e3f 100644
--- a/realis.go
+++ b/realis.go
@@ -31,28 +31,29 @@ import (
 	"github.com/pkg/errors"
 )
 
-const VERSION = "1.0.4"
+const VERSION = "1.1.0"
 
 type Realis interface {
 	AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
-	RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
 	CreateJob(auroraJob Job) (*aurora.Response, error)
+	CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
 	DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
-	GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
-	GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
 	FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
 	GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
+	GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
+	GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
+	GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
 	JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error)
 	KillJob(key *aurora.JobKey) (*aurora.Response, error)
 	KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
+	RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error)
 	RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error)
 	RestartJob(key *aurora.JobKey) (*aurora.Response, error)
 	RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error)
 	ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
 	StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error)
 	StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
-	GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
 	ReestablishConn() error
 	RealisConfig() *RealisConfig
 	Close()
@@ -68,69 +69,101 @@ type realisClient struct {
 	client         *aurora.AuroraSchedulerManagerClient
 	readonlyClient *aurora.ReadOnlySchedulerClient
 	adminClient    *aurora.AuroraAdminClient
+	logger         Logger
 }
 
-type option func(*RealisConfig)
+type RealisConfig struct {
+	username, password          string
+	url                         string
+	timeoutms                   int
+	binTransport, jsonTransport bool
+	cluster                     *Cluster
+	backoff                     *Backoff
+	transport                   thrift.TTransport
+	protoFactory                thrift.TProtocolFactory
+	logger                      Logger
+}
+
+type Backoff struct {
+	Duration time.Duration // the base duration
+	Factor   float64       // Duration is multiplied by factor each iteration
+	Jitter   float64       // The amount of jitter applied each iteration
+	Steps    int           // Exit with error after this many steps
+}
+
+var defaultBackoff = Backoff{
+	Steps:    3,
+	Duration: 10 * time.Second,
+	Factor:   5.0,
+	Jitter:   0.1,
+}
+
+type ClientOption func(*RealisConfig)
 
 //Config sets for options in RealisConfig.
-func BasicAuth(username, password string) option {
-
+func BasicAuth(username, password string) ClientOption {
 	return func(config *RealisConfig) {
 		config.username = username
 		config.password = password
 	}
 }
 
-func SchedulerUrl(url string) option {
+func SchedulerUrl(url string) ClientOption {
 	return func(config *RealisConfig) {
 		config.url = url
 	}
 }
 
-func TimeoutMS(timeout int) option {
+func TimeoutMS(timeout int) ClientOption {
 	return func(config *RealisConfig) {
 		config.timeoutms = timeout
 	}
 }
 
-func ZKCluster(cluster *Cluster) option {
+func ZKCluster(cluster *Cluster) ClientOption {
 	return func(config *RealisConfig) {
 		config.cluster = cluster
 	}
 }
 
-func ZKUrl(url string) option {
+func ZKUrl(url string) ClientOption {
 	return func(config *RealisConfig) {
 		config.cluster = GetDefaultClusterFromZKUrl(url)
 	}
 }
 
-func Retries(backoff *Backoff) option {
+func Retries(backoff *Backoff) ClientOption {
 	return func(config *RealisConfig) {
 		config.backoff = backoff
 	}
 }
 
-func ThriftJSON() option {
+func ThriftJSON() ClientOption {
 	return func(config *RealisConfig) {
 		config.jsonTransport = true
 	}
 }
 
-func ThriftBinary() option {
+func ThriftBinary() ClientOption {
 	return func(config *RealisConfig) {
 		config.binTransport = true
 	}
 }
 
-func BackOff(b *Backoff) option {
+func BackOff(b *Backoff) ClientOption {
 	return func(config *RealisConfig) {
 		config.backoff = b
 	}
 }
 
-func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
+// Using the word Set to avoid a name collision with the Logger interface.
+func SetLogger(l Logger) ClientOption {
+	return func(config *RealisConfig) {
+		config.logger = l
+	}
+}
 
+func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
 	trans, err := defaultTTransport(url, timeout)
 	if err != nil {
 		return nil, errors.Wrap(err, "Error creating realis")
@@ -155,35 +188,41 @@ func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
 	return trans, err
 }
 
-func NewRealisClient(options ...option) (Realis, error) {
+func NewRealisClient(options ...ClientOption) (Realis, error) {
 	config := &RealisConfig{}
-	fmt.Println(" options length: ", len(options))
+
+	// Default configs
+	config.timeoutms = 10000
+	config.backoff = &defaultBackoff
+	config.logger = NoopLogger{}
+
+	// Override default configs where necessary
 	for _, opt := range options {
 		opt(config)
 	}
-	//Default timeout
-	if config.timeoutms == 0 {
-		config.timeoutms = 10000
-	}
+
+	config.logger.Println("Number of options applied to config: ", len(options))
+
 	//Set default Transport to JSON if needed.
 	if !config.jsonTransport && !config.binTransport {
 		config.jsonTransport = true
 	}
+
 	var url string
 	var err error
-	//Cluster or URL?
+
+	// Determine how to get information to connect to the scheduler.
+	// Prioritize getting leader from ZK over using a direct URL.
 	if config.cluster != nil {
 		url, err = LeaderFromZK(*config.cluster)
-		// If ZK is configured, throw an error if the leader is unable to be determined
 		if err != nil {
 			return nil, errors.Wrap(err, "LeaderFromZK error")
 		}
-
-		fmt.Println("schedURLFromZK: ", url)
+		config.logger.Println("Scheduler URL from ZK: ", url)
 	} else if config.url != "" {
-		fmt.Println("Scheduler URL: ", config.url)
 		url = config.url
+		config.logger.Println("Scheduler URL: ", url)
 	} else {
 		return nil, errors.New("Incomplete Options -- url or cluster required")
 	}
@@ -193,9 +232,9 @@ func NewRealisClient(options ...option) (Realis, error) {
 		if err != nil {
 			return nil, errors.Wrap(err, "Error creating realis")
 		}
-
 		config.transport = trans
 		config.protoFactory = thrift.NewTJSONProtocolFactory()
+
 	} else if config.binTransport {
 		trans, err := newTBinTransport(url, config.timeoutms)
 		if err != nil {
@@ -205,55 +244,22 @@ func NewRealisClient(options ...option) (Realis, error) {
 		config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 	}
 
+	config.logger.Printf("gorealis config url: %+v\n", config.url)
+
 	//Basic Authentication.
 	if config.username != "" && config.password != "" {
 		AddBasicAuth(config, config.username, config.password)
 	}
 
-	//Set defaultBackoff if required.
-	if config.backoff == nil {
-		config.backoff = &defaultBackoff
-	} else {
-		defaultBackoff = *config.backoff
-		fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
-	}
-
-	fmt.Printf("gorealis config url: %+v\n", config.url)
-
 	return &realisClient{
 		config:         config,
 		client:         aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
 		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
-		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory)}, nil
+		adminClient:    aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
+		logger:         config.logger}, nil
 }
 
-// Wrapper object to provide future flexibility
-type RealisConfig struct {
-	username, password          string
-	url                         string
-	timeoutms                   int
-	binTransport, jsonTransport bool
-	cluster                     *Cluster
-	backoff                     *Backoff
-	transport                   thrift.TTransport
-	protoFactory                thrift.TProtocolFactory
-}
-
-type Backoff struct {
-	Duration time.Duration // the base duration
-	Factor   float64       // Duration is multipled by factor each iteration
-	Jitter   float64       // The amount of jitter applied each iteration
-	Steps    int           // Exit with error after this many steps
-}
-
-var defaultBackoff = Backoff{
-	Steps:    3,
-	Duration: 10 * time.Second,
-	Factor:   5.0,
-	Jitter:   0.1,
-}
-
 // Jitter returns a time.Duration between duration and duration + maxFactor *
 // duration.
 //
@@ -267,33 +273,6 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration {
 	return wait
 }
 
-// Create a new Client with Cluster information and other details.
-
-func NewDefaultClientUsingCluster(cluster *Cluster, user, passwd string) (Realis, error) {
-
-	url, err := LeaderFromZK(*cluster)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	fmt.Printf(" url: %s\n", url)
-
-	//Create new configuration with default transport layer
-	config, err := newDefaultConfig(url, 10000)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	config.username = user
-	config.password = passwd
-	config.cluster = cluster
-	config.url = ""
-	// Configured for vagrant
-	AddBasicAuth(config, user, passwd)
-	r := newClient(config)
-	return r, nil
-}
-
 func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	return &Cluster{Name: "defaultCluster",
 		AuthMechanism: "UNAUTHENTICATED",
@@ -304,65 +283,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
 	}
 }
 
-//This api would create default cluster object..
-func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) {
-
-	fmt.Printf(" zkUrl: %s\n", zkUrl)
-	cluster := GetDefaultClusterFromZKUrl(zkUrl)
-
-	url, err := LeaderFromZK(*cluster)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	fmt.Printf(" url: %s\n", url)
-
-	//Create new configuration with default transport layer
-	config, err := newDefaultConfig(url, 10000)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	config.username = user
-	config.password = passwd
-	config.cluster = cluster
-	config.url = ""
-	// Configured for vagrant
-	AddBasicAuth(config, user, passwd)
-	r := newClient(config)
-	return r, nil
-}
-
-func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) {
-
-	fmt.Printf(" url: %s\n", url)
-	//Create new configuration with default transport layer
-	config, err := newDefaultConfig(url, 10000)
-	if err != nil {
-		fmt.Println(err)
-		return nil, err
-	}
-	config.username = user
-	config.password = passwd
-	config.url = url
-	config.cluster = nil
-	// Configured for vagrant
-	AddBasicAuth(config, user, passwd)
-	config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1}
-	r := newClient(config)
-
-	return r, nil
-}
-
-// Create a new Client with a default transport layer
-func newClient(realisconfig *RealisConfig) Realis {
-	return &realisClient{
-		config:         realisconfig,
-		client:         aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory),
-		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory),
-		adminClient:    aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)}
-}
-
 // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client
 func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) {
 	jar, err := cookiejar.New(nil)
@@ -439,7 +359,7 @@ func basicAuth(username, password string) string {
 
 func (r *realisClient) ReestablishConn() error {
 	//close existing connection..
-	fmt.Println("ReestablishConn begin ....")
+	r.logger.Println("ReestablishConn begin ....")
 	r.Close()
 	//First check cluster object for re-establish; if not available then try with scheduler url.
 	//var config *RealisConfig
@@ -452,7 +372,7 @@ func (r *realisClient) ReestablishConn() error {
 		if err != nil {
 			fmt.Errorf("LeaderFromZK error: %+v\n ", err)
 		}
-		fmt.Println("ReestablishConn url: ", url)
+		r.logger.Println("ReestablishConn url: ", url)
 		if r.config.jsonTransport {
 			trans, err := newTJSONTransport(url, r.config.timeoutms)
 			if err != nil {
@@ -469,7 +389,7 @@ func (r *realisClient) ReestablishConn() error {
 			r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
 		}
 		if err != nil {
-			fmt.Println("error creating config: ", err)
+			r.logger.Println("error creating config: ", err)
 		}
 		// Configured for basic-auth
 		AddBasicAuth(r.config, r.config.username, r.config.password)
@@ -478,7 +398,7 @@ func (r *realisClient) ReestablishConn() error {
 		r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
 	} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
 		//Re-establish using scheduler url.
-		fmt.Println("ReestablishConn url: ", r.config.url)
+		r.logger.Println("ReestablishConn url: ", r.config.url)
 		if r.config.jsonTransport {
 			trans, err := newTJSONTransport(url, r.config.timeoutms)
 			if err != nil {
@@ -499,14 +419,14 @@ func (r *realisClient) ReestablishConn() error {
 		r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
 		r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory)
 	} else {
-		fmt.Println(" Missing Data for ReestablishConn ")
-		fmt.Println(" r.config.cluster: ", r.config.cluster)
-		fmt.Println(" r.config.username: ", r.config.username)
-		fmt.Println(" r.config.passwd: ", r.config.password)
-		fmt.Println(" r.config.url: ", r.config.url)
+		r.logger.Println(" Missing Data for ReestablishConn ")
+		r.logger.Println(" r.config.cluster: ", r.config.cluster)
+		r.logger.Println(" r.config.username: ", r.config.username)
+		r.logger.Println(" r.config.passwd: ", r.config.password)
+		r.logger.Println(" r.config.url: ", r.config.url)
 		return errors.New(" Missing Data for ReestablishConn ")
 	}
-	fmt.Printf(" config url before return: %+v\n", r.config.url)
+	r.logger.Printf(" config url before return: %+v\n", r.config.url)
 	return nil
 }
 
@@ -645,6 +565,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
 }
 
 // Sends a create job message to the scheduler with a specific job configuration.
+// Although this API is able to create service jobs, it is better to use CreateService instead,
+// as that API uses the update thrift call, which has a few extra features available.
+// Use this API to create ad-hoc jobs.
 func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 	var resp *aurora.Response
 	var clientErr error
@@ -669,6 +592,24 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
 
 }
 
+// This API uses an update thrift call to create services, giving access to a few more robust features.
+func (r *realisClient) CreateService(auroraJob Job, settings UpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error) {
+	// Create a new job update object and ship it to the StartJobUpdate api
+	update := NewUpdateJob(auroraJob.TaskConfig(), &settings.settings)
+	update.InstanceCount(auroraJob.GetInstanceCount())
+
+	resp, err := r.StartJobUpdate(update, "")
+	if err != nil {
+		return resp, nil, errors.Wrap(err, "unable to create service")
+	}
+
+	if resp != nil && resp.GetResult_() != nil {
+		return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
+	}
+
+	return resp, nil, errors.New("results object is nil")
+}
+
 func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
 	var resp *aurora.Response
 	var clientErr error
@@ -962,6 +903,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
 
 }
 
+// Get the task configuration from the Aurora scheduler for a job.
 func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
 
 	ids := make(map[int32]bool)
@@ -1166,7 +1108,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
 			// Experienced an connection error
 			err1 := r.ReestablishConn()
 			if err1 != nil {
-				fmt.Println("error in re-establishing connection: ", err1)
+				r.logger.Println("error in re-establishing connection: ", err1)
 			}
 			return false, nil
 		}
diff --git a/realis_e2e_test.go b/realis_e2e_test.go
index 413d334..82b339a 100644
--- a/realis_e2e_test.go
+++ b/realis_e2e_test.go
@@ -55,6 +55,14 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
+func TestLeaderFromZK(t *testing.T) {
+	cluster := realis.GetDefaultClusterFromZKUrl("192.168.33.7:2181")
+	url, err := realis.LeaderFromZK(*cluster)
+
+	assert.NoError(t, err)
+	assert.Equal(t, "http://aurora.local:8081", url)
+}
+
 func TestRealisClient_CreateJob_Thermos(t *testing.T) {
 
 	job := realis.NewJob().
diff --git a/updatejob.go b/updatejob.go
index 9b253a4..d772141 100644
--- a/updatejob.go
+++ b/updatejob.go
@@ -24,12 +24,15 @@ type UpdateJob struct {
 	req *aurora.JobUpdateRequest
 }
 
+
+
 // Create a default UpdateJob object.
 func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 
 	req := aurora.NewJobUpdateRequest()
 	req.TaskConfig = config
-	req.Settings = aurora.NewJobUpdateSettings()
+	s := NewUpdateSettings().Settings()
+	req.Settings = &s
 
 	job := NewJob().(*AuroraJob)
 	job.jobConfig.TaskConfig = config
@@ -60,7 +63,6 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob {
 	req.Settings.MaxPerInstanceFailures = 0
 	req.Settings.MaxFailedInstances = 0
 	req.Settings.RollbackOnFailure = true
-	req.Settings.WaitForBatchCompletion = false
 
 	//TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior
 	return &UpdateJob{job, req}
@@ -138,3 +140,66 @@ func (u *UpdateJob) RollbackOnFail(rollback bool) *UpdateJob {
 	u.req.Settings.RollbackOnFailure = rollback
 	return u
 }
+
+
+// TODO(rdelvalle): Integrate this struct with the JobUpdate struct so that we don't repeat code
+type UpdateSettings struct {
+	settings aurora.JobUpdateSettings
+}
+
+func NewUpdateSettings() *UpdateSettings {
+
+	us := new(UpdateSettings)
+
+	// Mirrors defaults set by Pystachio
+	us.settings.UpdateOnlyTheseInstances = make(map[*aurora.Range]bool)
+	us.settings.UpdateGroupSize = 1
+	us.settings.WaitForBatchCompletion = false
+	us.settings.MinWaitInInstanceRunningMs = 45000
+	us.settings.MaxPerInstanceFailures = 0
+	us.settings.MaxFailedInstances = 0
+	us.settings.RollbackOnFailure = true
+
+	return us
+}
+
+// Max number of instances being updated at any given moment.
+func (u *UpdateSettings) BatchSize(size int32) *UpdateSettings {
+	u.settings.UpdateGroupSize = size
+	return u
+}
+
+// Minimum number of milliseconds an instance must remain in RUNNING state before it is considered a success.
+func (u *UpdateSettings) WatchTime(ms int32) *UpdateSettings {
+	u.settings.MinWaitInInstanceRunningMs = ms
+	return u
+}
+
+// Wait for all instances in a group to be done before moving on.
+func (u *UpdateSettings) WaitForBatchCompletion(batchWait bool) *UpdateSettings {
+	u.settings.WaitForBatchCompletion = batchWait
+	return u
+}
+
+// Max number of instance failures to tolerate before marking an instance as FAILED.
+func (u *UpdateSettings) MaxPerInstanceFailures(inst int32) *UpdateSettings {
+	u.settings.MaxPerInstanceFailures = inst
+	return u
+}
+
+// Max number of FAILED instances to tolerate before terminating the update.
+func (u *UpdateSettings) MaxFailedInstances(inst int32) *UpdateSettings {
+	u.settings.MaxFailedInstances = inst
+	return u
+}
+
+// When false, prevents auto rollback of a failed update.
+func (u *UpdateSettings) RollbackOnFail(rollback bool) *UpdateSettings {
+	u.settings.RollbackOnFailure = rollback
+	return u
+}
+
+// Return the internal Thrift API structure.
+func (u UpdateSettings) Settings() aurora.JobUpdateSettings {
+	return u.settings
+}
diff --git a/zk.go b/zk.go
index 4a6f593..3344d8d 100644
--- a/zk.go
+++ b/zk.go
@@ -36,89 +36,70 @@ type ServiceInstance struct {
 	Status string `json:"status"`
 }
 
-type NoopLogger struct{}
-
-func (NoopLogger) Printf(format string, a ...interface{}) {
-}
 
 // Retrieves current Aurora leader from ZK.
 func LeaderFromZK(cluster Cluster) (string, error) {
-	var err error
 	var zkurl string
 
-	duration := defaultBackoff.Duration
-	for step := 0; step < defaultBackoff.Steps; step++ {
+	retryErr := ExponentialBackoff(defaultBackoff, func() (bool, error) {
 
-		// Attempt to find leader
-		zkurl, err = leaderFromZK(cluster)
-		if err == nil {
-			return zkurl, err
+		endpoints := strings.Split(cluster.ZK, ",")
+
+		//TODO (rdelvalle): When enabling debugging, change logger here
+		c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
+		if err != nil {
+			return false, errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
 		}
 
-		// Backoff if we failed to determine leader
-		adjusted := duration
-		if defaultBackoff.Jitter > 0.0 {
-			adjusted = Jitter(duration, defaultBackoff.Jitter)
+		defer c.Close()
+
+		// Fetch the children of the given ZK path (ChildrenW also sets a watch)
+		children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
+		if err != nil {
+			return false, errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
 		}
 
-		fmt.Printf("Error determining Aurora leader: %v; retrying in %v\n", err, adjusted)
-		time.Sleep(adjusted)
-		duration = time.Duration(float64(duration) * defaultBackoff.Factor)
+		// Search for the leader through all the children in the given path
+		serviceInst := new(ServiceInstance)
+		for _, child := range children {
+
+			// Only the leader will start with member_
+			if strings.HasPrefix(child, "member_") {
+
+				data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
+				if err != nil {
+					return false, errors.Wrap(err, "Error fetching contents of leader")
+				}
+
+				err = json.Unmarshal([]byte(data), serviceInst)
+				if err != nil {
+					return false, errors.Wrap(err, "Unable to unmarshal contents of leader")
+				}
+
+				// Should only be one endpoint
+				if len(serviceInst.AdditionalEndpoints) > 1 {
+					return false, fmt.Errorf("Ambiguous endpoint schemes")
+				}
+
+				var scheme, host, port string
+				for k, v := range serviceInst.AdditionalEndpoints {
+					scheme = k
+					host = v.Host
+					port = strconv.Itoa(v.Port)
+				}
+
+				zkurl = scheme + "://" + host + ":" + port
+				return true, nil
+			}
+		}
+
+		return false, errors.New("No leader found")
+	})
+
+	if retryErr != nil {
+		return "", errors.Wrapf(retryErr, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
 	}
 
-	return "", errors.Wrapf(err, "Failed to determine leader after %v attempts", defaultBackoff.Steps)
-}
-
-func leaderFromZK(cluster Cluster) (string, error) {
-
-	endpoints := strings.Split(cluster.ZK, ",")
-
-	//TODO (rdelvalle): When enabling debugging, change logger here
-	c, _, err := zk.Connect(endpoints, time.Second*10, func(c *zk.Conn) { c.SetLogger(NoopLogger{}) })
-	if err != nil {
-		return "", errors.Wrap(err, "Failed to connect to Zookeeper at "+cluster.ZK)
-	}
-
-	defer c.Close()
-
-	children, _, _, err := c.ChildrenW(cluster.SchedZKPath)
-	if err != nil {
-		return "", errors.Wrapf(err, "Path %s doesn't exist on Zookeeper ", cluster.SchedZKPath)
-	}
-
-	serviceInst := new(ServiceInstance)
-
-	for _, child := range children {
-
-		// Only the leader will start with member_
-		if strings.HasPrefix(child, "member_") {
-
-			data, _, err := c.Get(cluster.SchedZKPath + "/" + child)
-			if err != nil {
-				return "", errors.Wrap(err, "Error fetching contents of leader")
-			}
-
-			err = json.Unmarshal([]byte(data), serviceInst)
-			if err != nil {
-				return "", errors.Wrap(err, "Unable to unmarshall contents of leader")
-			}
-
-			// Should only be one endpoint
-			if len(serviceInst.AdditionalEndpoints) > 1 {
-				fmt.Errorf("Ambiguous end points schemes")
fmt.Errorf("Ambiguous end points schemes") - } - - var scheme, host, port string - for k, v := range serviceInst.AdditionalEndpoints { - scheme = k - host = v.Host - port = strconv.Itoa(v.Port) - } - - return scheme + "://" + host + ":" + port, nil - } - } - - return "", errors.New("No leader found") - + return zkurl, nil }