diff --git a/examples/client.go b/examples/client.go index ac21a28..5dadf1b 100644 --- a/examples/client.go +++ b/examples/client.go @@ -45,11 +45,10 @@ func init() { flag.StringVar(&zkUrl, "zkurl", "", "zookeeper url") flag.StringVar(&hostList, "hostList", "", "Comma separated list of hosts to operate on") flag.Parse() -} -func main() { - - // Attempt to load leader from zookeeper + // Attempt to load leader from zookeeper using a + // cluster.json file used for the default aurora client if provided. + // This will override the provided url in the arguments if clustersConfig != "" { clusters, err := realis.LoadClusters(clustersConfig) if err != nil { @@ -59,7 +58,7 @@ func main() { cluster, ok := clusters[clusterName] if !ok { - fmt.Printf("Cluster %s chosen doesn't exist\n", clusterName) + fmt.Printf("Cluster %s doesn't exist in the file provided\n", clusterName) os.Exit(1) } @@ -69,17 +68,25 @@ func main() { os.Exit(1) } } +} + +func main() { var job realis.Job var err error var monitor *realis.Monitor var r realis.Realis - var defaultBackoff = &realis.Backoff{ - Steps: 2, - Duration: 10 * time.Second, - Factor: 2.0, - Jitter: 0.1, + clientOptions := []realis.ClientOption{ + realis.BasicAuth(username, password), + realis.ThriftJSON(), + realis.TimeoutMS(CONNECTION_TIMEOUT), + realis.BackOff(&realis.Backoff{ + Steps: 2, + Duration: 10 * time.Second, + Factor: 2.0, + Jitter: 0.1, + }), } //check if zkUrl is available. 
@@ -93,32 +100,17 @@ func main() { AgentRoot: "/var/lib/mesos", } fmt.Printf("cluster: %+v \n", cluster) - - r, err = realis.NewRealisClient(realis.ZKUrl(zkUrl), - realis.BasicAuth(username, password), - realis.ThriftJSON(), - realis.TimeoutMS(CONNECTION_TIMEOUT), - realis.BackOff(defaultBackoff)) - - if err != nil { - fmt.Println(err) - os.Exit(1) - } - monitor = &realis.Monitor{r} - + clientOptions = append(clientOptions, realis.ZKUrl(zkUrl)) } else { - r, err = realis.NewRealisClient(realis.SchedulerUrl(url), - realis.BasicAuth(username, password), - realis.ThriftJSON(), - realis.TimeoutMS(CONNECTION_TIMEOUT), - realis.BackOff(defaultBackoff)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - monitor = &realis.Monitor{r} + clientOptions = append(clientOptions, realis.SchedulerUrl(url)) } + + r, err = realis.NewRealisClient(clientOptions...) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + monitor = &realis.Monitor{r} defer r.Close() switch executor { diff --git a/job.go b/job.go index 6a08ff4..8753500 100644 --- a/job.go +++ b/job.go @@ -151,8 +151,6 @@ func (j *AuroraJob) RAM(ram int64) Job { *j.resources["ram"].RamMb = ram j.jobConfig.TaskConfig.RamMb = ram //Will be deprecated soon - - return j } diff --git a/realis.go b/realis.go index ac6c20d..a1c219f 100644 --- a/realis.go +++ b/realis.go @@ -25,10 +25,15 @@ import ( "math/rand" + "log" + + "bytes" + "git.apache.org/thrift.git/lib/go/thrift" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/paypal/gorealis/response" "github.com/pkg/errors" + "os" ) const VERSION = "1.0.4" @@ -36,23 +41,23 @@ const VERSION = "1.0.4" type Realis interface { AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) - RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) CreateJob(auroraJob Job) (*aurora.Response, error) DescheduleCronJob(key *aurora.JobKey) 
(*aurora.Response, error) - GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) - GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) + GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) + GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) + GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) KillJob(key *aurora.JobKey) (*aurora.Response, error) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) + RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) RestartJob(key *aurora.JobKey) (*aurora.Response, error) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) - GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) ReestablishConn() error RealisConfig() *RealisConfig Close() @@ -68,69 +73,94 @@ type realisClient struct { client *aurora.AuroraSchedulerManagerClient readonlyClient *aurora.ReadOnlySchedulerClient adminClient *aurora.AuroraAdminClient + logger *log.Logger } -type option func(*RealisConfig) +type RealisConfig struct { + username, password string + url string + timeoutms int + binTransport, jsonTransport bool + cluster *Cluster + backoff *Backoff + transport thrift.TTransport + protoFactory thrift.TProtocolFactory + logger *log.Logger +} + +type Backoff struct { + Duration time.Duration // the 
base duration + Factor float64 // Duration is multiplied by factor each iteration + Jitter float64 // The amount of jitter applied each iteration + Steps int // Exit with error after this many steps +} + +var defaultBackoff = Backoff{ + Steps: 3, + Duration: 10 * time.Second, + Factor: 5.0, + Jitter: 0.1, +} + +type ClientOption func(*RealisConfig) //Config sets for options in RealisConfig. -func BasicAuth(username, password string) option { - +func BasicAuth(username, password string) ClientOption { return func(config *RealisConfig) { config.username = username config.password = password } } -func SchedulerUrl(url string) option { +func SchedulerUrl(url string) ClientOption { return func(config *RealisConfig) { config.url = url } } -func TimeoutMS(timeout int) option { +func TimeoutMS(timeout int) ClientOption { return func(config *RealisConfig) { config.timeoutms = timeout } } -func ZKCluster(cluster *Cluster) option { +func ZKCluster(cluster *Cluster) ClientOption { return func(config *RealisConfig) { config.cluster = cluster } } -func ZKUrl(url string) option { +func ZKUrl(url string) ClientOption { return func(config *RealisConfig) { config.cluster = GetDefaultClusterFromZKUrl(url) } } -func Retries(backoff *Backoff) option { +func Retries(backoff *Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = backoff } } -func ThriftJSON() option { +func ThriftJSON() ClientOption { return func(config *RealisConfig) { config.jsonTransport = true } } -func ThriftBinary() option { +func ThriftBinary() ClientOption { return func(config *RealisConfig) { config.binTransport = true } } -func BackOff(b *Backoff) option { +func BackOff(b *Backoff) ClientOption { return func(config *RealisConfig) { config.backoff = b } } func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) { - trans, err := defaultTTransport(url, timeout) if err != nil { return nil, errors.Wrap(err, "Error creating realis") @@ -155,12 +185,20 @@ func 
newTBinTransport(url string, timeout int) (thrift.TTransport, error) { return trans, err } -func NewRealisClient(options ...option) (Realis, error) { +func NewRealisClient(options ...ClientOption) (Realis, error) { config := &RealisConfig{} - fmt.Println(" options length: ", len(options)) for _, opt := range options { opt(config) } + + if config.logger == nil { + buf := new(bytes.Buffer) + config.logger = log.New(buf, "gorealis debug: ", 0) + config.logger.SetOutput(os.Stdout) + } + + config.logger.Println("Number of options applied to config: ", len(options)) + //Default timeout if config.timeoutms == 0 { config.timeoutms = 10000 @@ -171,19 +209,19 @@ func NewRealisClient(options ...option) (Realis, error) { } var url string var err error - //Cluster or URL? + + // Determine how to get information to connect to the scheduler. + // Prioritize getting leader from ZK over using a direct URL. if config.cluster != nil { url, err = LeaderFromZK(*config.cluster) - // If ZK is configured, throw an error if the leader is unable to be determined if err != nil { return nil, errors.Wrap(err, "LeaderFromZK error") } - - fmt.Println("schedURLFromZK: ", url) + config.logger.Println("Scheduler URL from ZK: ", url) } else if config.url != "" { - fmt.Println("Scheduler URL: ", config.url) url = config.url + config.logger.Println("Scheduler URL: ", url) } else { return nil, errors.New("Incomplete Options -- url or cluster required") } @@ -215,10 +253,10 @@ func NewRealisClient(options ...option) (Realis, error) { config.backoff = &defaultBackoff } else { defaultBackoff = *config.backoff - fmt.Printf(" updating default backoff : %+v\n", *config.backoff) + config.logger.Printf("Updating default backoff : %+v\n", *config.backoff) } - fmt.Printf("gorealis config url: %+v\n", config.url) + config.logger.Printf("gorealis config url: %+v\n", config.url) return &realisClient{ config: config, @@ -228,32 +266,6 @@ func NewRealisClient(options ...option) (Realis, error) { } -// Wrapper object 
to provide future flexibility -type RealisConfig struct { - username, password string - url string - timeoutms int - binTransport, jsonTransport bool - cluster *Cluster - backoff *Backoff - transport thrift.TTransport - protoFactory thrift.TProtocolFactory -} - -type Backoff struct { - Duration time.Duration // the base duration - Factor float64 // Duration is multipled by factor each iteration - Jitter float64 // The amount of jitter applied each iteration - Steps int // Exit with error after this many steps -} - -var defaultBackoff = Backoff{ - Steps: 3, - Duration: 10 * time.Second, - Factor: 5.0, - Jitter: 0.1, -} - // Jitter returns a time.Duration between duration and duration + maxFactor * // duration. // @@ -267,33 +279,6 @@ func Jitter(duration time.Duration, maxFactor float64) time.Duration { return wait } -// Create a new Client with Cluster information and other details. - -func NewDefaultClientUsingCluster(cluster *Cluster, user, passwd string) (Realis, error) { - - url, err := LeaderFromZK(*cluster) - if err != nil { - fmt.Println(err) - return nil, err - } - fmt.Printf(" url: %s\n", url) - - //Create new configuration with default transport layer - config, err := newDefaultConfig(url, 10000) - if err != nil { - fmt.Println(err) - return nil, err - } - config.username = user - config.password = passwd - config.cluster = cluster - config.url = "" - // Configured for vagrant - AddBasicAuth(config, user, passwd) - r := newClient(config) - return r, nil -} - func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { return &Cluster{Name: "defaultCluster", AuthMechanism: "UNAUTHENTICATED", @@ -304,65 +289,6 @@ func GetDefaultClusterFromZKUrl(zkurl string) *Cluster { } } -//This api would create default cluster object.. 
-func NewDefaultClientUsingZKUrl(zkUrl, user, passwd string) (Realis, error) { - - fmt.Printf(" zkUrl: %s\n", zkUrl) - cluster := GetDefaultClusterFromZKUrl(zkUrl) - - url, err := LeaderFromZK(*cluster) - if err != nil { - fmt.Println(err) - return nil, err - } - fmt.Printf(" url: %s\n", url) - - //Create new configuration with default transport layer - config, err := newDefaultConfig(url, 10000) - if err != nil { - fmt.Println(err) - return nil, err - } - config.username = user - config.password = passwd - config.cluster = cluster - config.url = "" - // Configured for vagrant - AddBasicAuth(config, user, passwd) - r := newClient(config) - return r, nil -} - -func NewDefaultClientUsingUrl(url, user, passwd string) (Realis, error) { - - fmt.Printf(" url: %s\n", url) - //Create new configuration with default transport layer - config, err := newDefaultConfig(url, 10000) - if err != nil { - fmt.Println(err) - return nil, err - } - config.username = user - config.password = passwd - config.url = url - config.cluster = nil - // Configured for vagrant - AddBasicAuth(config, user, passwd) - config.backoff = &Backoff{Steps: 2, Duration: 10 * time.Second, Factor: 2.0, Jitter: 0.1} - r := newClient(config) - - return r, nil -} - -// Create a new Client with a default transport layer -func newClient(realisconfig *RealisConfig) Realis { - return &realisClient{ - config: realisconfig, - client: aurora.NewAuroraSchedulerManagerClientFactory(realisconfig.transport, realisconfig.protoFactory), - readonlyClient: aurora.NewReadOnlySchedulerClientFactory(realisconfig.transport, realisconfig.protoFactory), - adminClient: aurora.NewAuroraAdminClientFactory(realisconfig.transport, realisconfig.protoFactory)} -} - // Creates a default Thrift Transport object for communications in gorealis using an HTTP Post Client func defaultTTransport(urlstr string, timeoutms int) (thrift.TTransport, error) { jar, err := cookiejar.New(nil) @@ -439,7 +365,7 @@ func basicAuth(username, password string) 
string { func (r *realisClient) ReestablishConn() error { //close existing connection.. - fmt.Println("ReestablishConn begin ....") + r.config.logger.Println("ReestablishConn begin ....") r.Close() //First check cluster object for re-establish; if not available then try with scheduler url. //var config *RealisConfig @@ -452,7 +378,7 @@ func (r *realisClient) ReestablishConn() error { if err != nil { fmt.Errorf("LeaderFromZK error: %+v\n ", err) } - fmt.Println("ReestablishConn url: ", url) + r.config.logger.Println("ReestablishConn url: ", url) if r.config.jsonTransport { trans, err := newTJSONTransport(url, r.config.timeoutms) if err != nil { @@ -469,7 +395,7 @@ func (r *realisClient) ReestablishConn() error { r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault() } if err != nil { - fmt.Println("error creating config: ", err) + r.config.logger.Println("error creating config: ", err) } // Configured for basic-auth AddBasicAuth(r.config, r.config.username, r.config.password) @@ -478,7 +404,7 @@ func (r *realisClient) ReestablishConn() error { r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) } else if r.config.url != "" && r.config.username != "" && r.config.password != "" { //Re-establish using scheduler url. 
- fmt.Println("ReestablishConn url: ", r.config.url) + r.config.logger.Println("ReestablishConn url: ", r.config.url) if r.config.jsonTransport { trans, err := newTJSONTransport(url, r.config.timeoutms) if err != nil { @@ -499,14 +425,14 @@ func (r *realisClient) ReestablishConn() error { r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory) r.adminClient = aurora.NewAuroraAdminClientFactory(r.config.transport, r.config.protoFactory) } else { - fmt.Println(" Missing Data for ReestablishConn ") - fmt.Println(" r.config.cluster: ", r.config.cluster) - fmt.Println(" r.config.username: ", r.config.username) - fmt.Println(" r.config.passwd: ", r.config.password) - fmt.Println(" r.config.url: ", r.config.url) + r.config.logger.Println(" Missing Data for ReestablishConn ") + r.config.logger.Println(" r.config.cluster: ", r.config.cluster) + r.config.logger.Println(" r.config.username: ", r.config.username) + r.config.logger.Println(" r.config.passwd: ", r.config.password) + r.config.logger.Println(" r.config.url: ", r.config.url) return errors.New(" Missing Data for ReestablishConn ") } - fmt.Printf(" config url before return: %+v\n", r.config.url) + r.config.logger.Printf(" config url before return: %+v\n", r.config.url) return nil } @@ -645,6 +571,9 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) { } // Sends a create job message to the scheduler with a specific job configuration. +// Although this API is able to create service jobs, it is better to use CreateService instead +// as that API uses the update thrift call which has a few extra features available. +// Use this API to create ad-hoc jobs. 
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) { var resp *aurora.Response var clientErr error @@ -962,6 +891,7 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks [] } +// Get the task configuration from the aurora scheduler for a job func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) { ids := make(map[int32]bool) @@ -1166,7 +1096,7 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au // Experienced an connection error err1 := r.ReestablishConn() if err1 != nil { - fmt.Println("error in re-establishing connection: ", err1) + r.config.logger.Println("error in re-establishing connection: ", err1) } return false, nil } diff --git a/updatejob.go b/updatejob.go index 9b253a4..da0ac96 100644 --- a/updatejob.go +++ b/updatejob.go @@ -60,7 +60,6 @@ func NewDefaultUpdateJob(config *aurora.TaskConfig) *UpdateJob { req.Settings.MaxPerInstanceFailures = 0 req.Settings.MaxFailedInstances = 0 req.Settings.RollbackOnFailure = true - req.Settings.WaitForBatchCompletion = false //TODO(rdelvalle): Deep copy job struct to avoid unexpected behavior return &UpdateJob{job, req}