This commit is contained in:
Mukkavilli, Sunil 2017-04-18 16:10:58 -07:00
commit 89337f9c7f
7 changed files with 298 additions and 48 deletions

View file

@ -1,11 +1,10 @@
# gorealis [![GoDoc](https://godoc.org/github.com/rdelval/gorealis?status.svg)](https://godoc.org/github.com/rdelval/gorealis)
Go library for communicating with [Apache Aurora](https://github.com/apache/aurora).
Named after the northern lights (Aurora Borealis).
Go library for interacting with [Apache Aurora](https://github.com/apache/aurora).
### Aurora version compatibility
Please refer to [.auroraversion](./.auroraversion) for the latest Aurora version against which this
library has been tested. Vendoring a working version of this library is highly recommended.
library has been tested.
## Usage
@ -17,9 +16,5 @@ library has been tested. Vendoring a working version of this library is highly r
* Create or import a custom transport that uses https://github.com/jmcvetta/napping to improve efficiency
* End to end testing with Vagrant setup
## Importing
* We suggest using a vendoring tool such as [govendor](https://github.com/kardianos/govendor) and
fetching by version, for example: `govendor fetch github.com/rdelval/gorealis@v1`
## Contributions
Contributions are very much welcome. Please raise an issue so that the contribution may be discussed before it's made.
Contributions are always welcome. Please raise an issue so that the contribution may be discussed before it's made.

View file

@ -20,6 +20,8 @@ import (
"io/ioutil"
"os"
"time"
"github.com/rdelval/gorealis"
"github.com/rdelval/gorealis/gen-go/apache/aurora"
"github.com/rdelval/gorealis/response"
@ -63,6 +65,13 @@ func main() {
var monitor *realis.Monitor
var r realis.Realis
var defaultBackoff = &realis.Backoff{
Steps: 2,
Duration: 10 * time.Second,
Factor: 2.0,
Jitter: 0.1,
}
//check if zkUrl is available.
if *zkUrl != "" {
fmt.Println("zkUrl: ", *zkUrl)
@ -75,7 +84,8 @@ func main() {
}
fmt.Printf("cluster: %+v \n", cluster)
r, err = realis.NewDefaultClientUsingCluster(cluster, *username, *password)
//r, err = realis.NewRealisClient(realis.ZKCluster(cluster), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000))
r, err = realis.NewRealisClient(realis.ZKUrl(*zkUrl), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(15000), realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
@ -83,11 +93,12 @@ func main() {
monitor = &realis.Monitor{r}
} else {
r, err = realis.NewDefaultClientUsingUrl(*url, *username, *password)
r, err = realis.NewRealisClient(realis.SchedulerUrl(*url), realis.BasicAuth(*username, *password), realis.ThriftJSON(), realis.TimeoutMS(20000), realis.BackOff(defaultBackoff))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
monitor = &realis.Monitor{r}
}
defer r.Close()

View file

@ -36,13 +36,14 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
Limit: 1,
}
duration := defaultBackoff.Duration
defaultBackoff := m.Client.RealisConfig().backoff
duration := defaultBackoff.Duration //defaultBackoff.Duration
var err error
var respDetail *aurora.Response
for i := 0; i*interval <= timeout; i++ {
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
for step := 0; step < defaultBackoff.Steps; step++ {
if step != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)
@ -94,13 +95,15 @@ func (m *Monitor) JobUpdate(updateKey aurora.JobUpdateKey, interval int, timeout
}
func (m *Monitor) Instances(key *aurora.JobKey, instances int32, interval int, timeout int) (bool, error) {
defaultBackoff := m.Client.RealisConfig().backoff
duration := defaultBackoff.Duration
var err error
var live map[int32]bool
for i := 0; i*interval < timeout; i++ {
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
for step := 0; step < defaultBackoff.Steps; step++ {
if step != 0 {
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)

262
realis.go
View file

@ -31,6 +31,8 @@ import (
"github.com/rdelval/gorealis/response"
)
// VERSION is the gorealis release string. The transport constructors in this
// file append it to the "GoRealis v" prefix to form the User-Agent header.
const VERSION = "1.0.4"
type Realis interface {
AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error)
AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error)
@ -51,6 +53,7 @@ type Realis interface {
StartCronJob(key *aurora.JobKey) (*aurora.Response, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
ReestablishConn() error
RealisConfig() *RealisConfig
Close()
}
@ -60,13 +63,170 @@ type realisClient struct {
readonlyClient *aurora.ReadOnlySchedulerClient
}
// option is a functional option: a function that mutates a RealisConfig.
// NewRealisClient applies the supplied options in the order they are given.
type option func(*RealisConfig)
// BasicAuth returns an option that stores the given credentials on the
// RealisConfig. NewRealisClient only enables basic authentication when both
// the username and the password are non-empty.
func BasicAuth(username, password string) option {
	return func(cfg *RealisConfig) {
		cfg.username, cfg.password = username, password
	}
}
// SchedulerUrl returns an option that records the scheduler URL to connect to.
// NewRealisClient consults it only when no ZK cluster has been configured.
func SchedulerUrl(url string) option {
	setURL := func(cfg *RealisConfig) {
		cfg.url = url
	}
	return setURL
}
// TimeoutMS returns an option that sets the transport timeout, expressed in
// milliseconds. A value of 0 makes NewRealisClient fall back to 10000.
func TimeoutMS(timeout int) option {
	setTimeout := func(cfg *RealisConfig) {
		cfg.timeoutms = timeout
	}
	return setTimeout
}
// ZKCluster returns an option that stores the cluster whose ZooKeeper ensemble
// is used to locate the scheduler leader. In NewRealisClient a non-nil cluster
// takes precedence over a scheduler URL.
func ZKCluster(cluster *Cluster) option {
	setCluster := func(cfg *RealisConfig) {
		cfg.cluster = cluster
	}
	return setCluster
}
// ZKUrl returns an option that configures the cluster from a ZooKeeper URL by
// way of GetDefaultClusterFromZKUrl. The cluster is built when the option is
// applied, not when ZKUrl is called.
func ZKUrl(url string) option {
	return func(cfg *RealisConfig) {
		cluster := GetDefaultClusterFromZKUrl(url)
		cfg.cluster = cluster
	}
}
// Retries returns an option that sets the retry/backoff policy. It is
// equivalent to BackOff and simply delegates to it.
func Retries(backoff *Backoff) option {
	return BackOff(backoff)
}
// ThriftJSON returns an option that selects the Thrift JSON transport.
// NewRealisClient checks this flag before the binary one, so JSON wins when
// both have been requested.
func ThriftJSON() option {
	useJSON := func(cfg *RealisConfig) {
		cfg.jsonTransport = true
	}
	return useJSON
}
// ThriftBinary returns an option that selects the Thrift binary transport.
// It only takes effect in NewRealisClient when ThriftJSON was not also given.
func ThriftBinary() option {
	useBinary := func(cfg *RealisConfig) {
		cfg.binTransport = true
	}
	return useBinary
}
// BackOff returns an option that sets the retry/backoff policy stored on the
// RealisConfig. Passing nil leaves the policy unset, in which case
// NewRealisClient falls back to the package default.
func BackOff(b *Backoff) option {
	setBackoff := func(cfg *RealisConfig) {
		cfg.backoff = b
	}
	return setBackoff
}
// newTJSONTransport creates the default transport for url with the given
// timeout and sets the headers used for Thrift JSON traffic
// (Content-Type and User-Agent).
func newTJSONTransport(url string, timeout int) (thrift.TTransport, error) {
	transport, err := defaultTTransport(url, timeout)
	if err != nil {
		return nil, errors.Wrap(err, "Error creating realis")
	}

	// Headers can only be set through the concrete HTTP client type.
	client := transport.(*thrift.THttpClient)
	client.SetHeader("Content-Type", "application/x-thrift")
	client.SetHeader("User-Agent", "GoRealis v"+VERSION)

	return transport, nil
}
// newTBinTransport creates the default transport for url with the given
// timeout and sets the headers used for Thrift binary traffic
// (Accept, Content-Type and User-Agent).
func newTBinTransport(url string, timeout int) (thrift.TTransport, error) {
	transport, err := defaultTTransport(url, timeout)
	if err != nil {
		return nil, errors.Wrap(err, "Error creating realis")
	}

	const binaryMIME = "application/vnd.apache.thrift.binary"

	// Headers can only be set through the concrete HTTP client type.
	client := transport.(*thrift.THttpClient)
	client.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
	client.SetHeader("Accept", binaryMIME)
	client.SetHeader("Content-Type", binaryMIME)
	client.SetHeader("User-Agent", "GoRealis v"+VERSION)

	return transport, nil
}
// NewRealisClient builds a Realis client from the supplied functional options.
//
// Defaults applied when the corresponding option is absent:
//   - timeout: 10000 ms
//   - transport: Thrift JSON (JSON is also used when both JSON and binary
//     were requested)
//   - backoff: a copy of the package-level defaultBackoff
//
// A ZK cluster (ZKCluster / ZKUrl) takes precedence over SchedulerUrl; one of
// the two must be provided. Basic authentication is enabled only when both a
// username and a password were given.
//
// An error is returned when neither a cluster nor a URL is configured, when
// the scheduler leader cannot be resolved from ZooKeeper, or when the
// transport cannot be created.
func NewRealisClient(options ...option) (Realis, error) {
	config := &RealisConfig{}
	fmt.Println(" options length: ", len(options))
	for _, opt := range options {
		opt(config)
	}

	//Default timeout
	if config.timeoutms == 0 {
		config.timeoutms = 10000
	}

	//Set default Transport to JSON if needed.
	if !config.jsonTransport && !config.binTransport {
		config.jsonTransport = true
	}

	//Cluster or URL?
	var url string
	switch {
	case config.cluster != nil:
		leader, err := LeaderFromZK(*config.cluster)
		if err != nil {
			// Without a leader there is no URL to connect to, so fail here
			// instead of building a transport around an empty URL.
			return nil, errors.Wrap(err, "LeaderFromZK error")
		}
		url = leader
		fmt.Println("schedURLFromZK: ", url)
	case config.url != "":
		fmt.Println("Scheduler URL: ", config.url)
		url = config.url
	default:
		return nil, errors.New("Incomplete Options -- url or cluster required")
	}

	if config.jsonTransport {
		trans, err := newTJSONTransport(url, config.timeoutms)
		if err != nil {
			return nil, errors.Wrap(err, "Error creating realis")
		}
		config.transport = trans
		config.protoFactory = thrift.NewTJSONProtocolFactory()
	} else if config.binTransport {
		trans, err := newTBinTransport(url, config.timeoutms)
		if err != nil {
			return nil, errors.Wrap(err, "Error creating realis")
		}
		config.transport = trans
		config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
	}

	//Basic Authentication.
	if config.username != "" && config.password != "" {
		AddBasicAuth(config, config.username, config.password)
	}

	//Set defaultBackoff if required.
	if config.backoff == nil {
		// Take a private copy so that a later client overriding the package
		// default does not change this client's retry policy behind its back.
		backoff := defaultBackoff
		config.backoff = &backoff
	} else {
		// The package default is also read by LeaderFromZK, so keep it in
		// step with the caller's policy.
		defaultBackoff = *config.backoff
		fmt.Printf(" updating default backoff : %+v\n", *config.backoff)
	}

	// The config is deliberately not printed: it holds the basic-auth
	// password, which must not end up on stdout.

	return &realisClient{
		config:         config,
		client:         aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
		readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)}, nil
}
// Wrapper object to provide future flexibility
type RealisConfig struct {
username, password string
url string
cluster *Cluster
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
username, password string
url string
timeoutms int
binTransport, jsonTransport bool
cluster *Cluster
backoff *Backoff
transport thrift.TTransport
protoFactory thrift.TProtocolFactory
}
type Backoff struct {
@ -225,6 +385,7 @@ func newTJSONConfig(url string, timeoutms int) (*RealisConfig, error) {
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.SetHeader("Content-Type", "application/x-thrift")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return &RealisConfig{transport: trans, protoFactory: thrift.NewTJSONProtocolFactory()}, nil
}
@ -237,9 +398,11 @@ func newTBinaryConfig(url string, timeoutms int) (*RealisConfig, error) {
}
httpTrans := (trans).(*thrift.THttpClient)
httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
httpTrans.SetHeader("User-Agent", "GoRealis v1.0.4")
httpTrans.SetHeader("User-Agent", "GoRealis v"+VERSION)
return &RealisConfig{transport: trans, protoFactory: thrift.NewTBinaryProtocolFactoryDefault()}, nil
@ -263,34 +426,60 @@ func (r *realisClient) ReestablishConn() error {
fmt.Println("ReestablishConn begin ....")
r.Close()
//First check cluster object for re-establish; if not available then try with scheduler url.
//var config *RealisConfig
var err error
var url string
if r.config.cluster != nil && r.config.username != "" && r.config.password != "" {
//Re-establish using cluster object.
url, err := LeaderFromZK(*r.config.cluster)
url, err = LeaderFromZK(*r.config.cluster)
if err != nil {
fmt.Errorf("LeaderFromZK error: %+v\n ", err)
}
fmt.Println("ReestablishConn url: ", url)
config, err := newDefaultConfig(url, 10000)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
if err != nil {
fmt.Println(err)
fmt.Println("error creating config: ", err)
}
// Configured for basic-auth
AddBasicAuth(config, r.config.username, r.config.password)
config.cluster = r.config.cluster
r.config = config
r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
} else if r.config.url != "" && r.config.username != "" && r.config.password != "" {
//Re-establish using scheduler url.
//Create new configuration with default transport layer
config, err := newDefaultConfig(r.config.url, 10000)
if err != nil {
fmt.Println(err)
fmt.Println("ReestablishConn url: ", r.config.url)
if r.config.jsonTransport {
trans, err := newTJSONTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
r.config.transport = trans
r.config.protoFactory = thrift.NewTJSONProtocolFactory()
} else if r.config.binTransport {
trans, err := newTBinTransport(url, r.config.timeoutms)
if err != nil {
return errors.Wrap(err, "Error creating realis")
}
r.config.transport = trans
r.config.protoFactory = thrift.NewTBinaryProtocolFactoryDefault()
}
AddBasicAuth(config, r.config.username, r.config.password)
r.config = config
r.client = aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory)
AddBasicAuth(r.config, r.config.username, r.config.password)
r.client = aurora.NewAuroraSchedulerManagerClientFactory(r.config.transport, r.config.protoFactory)
r.readonlyClient = aurora.NewReadOnlySchedulerClientFactory(r.config.transport, r.config.protoFactory)
} else {
fmt.Println(" Missing Data for ReestablishConn ")
fmt.Println(" r.config.cluster: ", r.config.cluster)
@ -299,6 +488,7 @@ func (r *realisClient) ReestablishConn() error {
fmt.Println(" r.config.url: ", r.config.url)
return errors.New(" Missing Data for ReestablishConn ")
}
fmt.Printf(" config before return: %+v\n", r.config)
return nil
}
@ -318,6 +508,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -356,6 +547,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -385,6 +577,8 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
instanceIds := make(map[int32]bool)
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for _, instId := range instances {
@ -413,6 +607,10 @@ func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*a
return nil, errors.Wrap(err, "Error sending Kill command to Aurora Scheduler")
}
// RealisConfig returns the configuration held by this client. The returned
// pointer is the client's own config, not a copy.
func (r *realisClient) RealisConfig() *RealisConfig {
return r.config
}
// Sends a kill message to the scheduler for all active tasks under a job.
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
@ -425,7 +623,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
}
if len(instanceIds) > 0 {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -459,9 +657,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
fmt.Println(" STEPS: ", i)
adjusted := duration
if defaultBackoff.Jitter > 0.0 {
adjusted = Jitter(duration, defaultBackoff.Jitter)
@ -470,10 +670,11 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
time.Sleep(adjusted)
duration = time.Duration(float64(duration) * defaultBackoff.Factor)
}
fmt.Println(" calling CreateJob")
if resp, err = r.client.CreateJob(auroraJob.JobConfig()); err == nil {
return response.ResponseCodeCheck(resp)
}
fmt.Println("CreateJob err: %+v\n", err)
err1 := r.ReestablishConn()
if err1 != nil {
fmt.Println("error in ReestablishConn: ", err1)
@ -486,6 +687,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -514,6 +716,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -542,6 +745,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -575,6 +779,8 @@ func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32)
}
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -610,6 +816,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
var resp *aurora.Response
var err error
if len(instanceIds) > 0 {
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -644,6 +851,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -673,6 +881,7 @@ func (r *realisClient) AbortJobUpdate(
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -702,6 +911,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -750,6 +960,7 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -795,6 +1006,7 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -844,6 +1056,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {
@ -870,6 +1083,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
var resp *aurora.Response
var err error
defaultBackoff := r.config.backoff
duration := defaultBackoff.Duration
for i := 0; i < defaultBackoff.Steps; i++ {
if i != 0 {

View file

@ -28,8 +28,10 @@ var r Realis
var thermosPayload []byte
func TestMain(m *testing.M) {
var err error
// New configuration to connect to Vagrant image
config, err := NewDefaultConfig("http://192.168.33.7:8081",10000)
r, err = NewDefaultClientUsingUrl("http://192.168.33.7:8081","aurora", "secret")
if err != nil {
fmt.Println("Please run vagrant box before running test suite")
os.Exit(1)
@ -41,10 +43,6 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
// Configured for vagrant
AddBasicAuth(&config, "aurora", "secret")
r = NewClient(config)
os.Exit(m.Run())
}

View file

@ -103,7 +103,7 @@ func NewTHttpClientWithOptions(urlstr string, options THttpClientOptions) (TTran
if client == nil {
client = DefaultHttpClient
}
httpHeader := map[string][]string{}
httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
return &THttpClient{client: client, response: response, url: parsedURL, header: httpHeader}, nil
}
@ -121,7 +121,7 @@ func NewTHttpPostClientWithOptions(urlstr string, options THttpClientOptions) (T
if client == nil {
client = DefaultHttpClient
}
httpHeader := map[string][]string{}
httpHeader := map[string][]string{"Content-Type": []string{"application/x-thrift"}}
return &THttpClient{client: client, url: parsedURL, requestBuffer: bytes.NewBuffer(buf), header: httpHeader}, nil
}

33
zk.go
View file

@ -17,11 +17,12 @@ package realis
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
type Endpoint struct {
@ -43,6 +44,33 @@ func (NoopLogger) Printf(format string, a ...interface{}) {
// LeaderFromZK loads the scheduler leader for cluster from ZooKeeper, retrying
// according to the package-level defaultBackoff policy: up to Steps attempts,
// sleeping between attempts for a duration that starts at Duration, is passed
// through Jitter when Jitter > 0, and is multiplied by Factor after each sleep.
//
// It returns the leader URL from the first successful attempt. If every
// attempt fails, the error from the last attempt is returned. If the policy
// allows no attempts at all (Steps <= 0), an error is returned instead of an
// empty URL with a nil error.
func LeaderFromZK(cluster Cluster) (string, error) {
	// Work on a snapshot: defaultBackoff is a package variable that
	// NewRealisClient may overwrite.
	backoff := defaultBackoff
	if backoff.Steps <= 0 {
		return "", errors.New("No leader lookup attempted: backoff steps must be positive")
	}

	var err error
	duration := backoff.Duration
	for i := 0; i < backoff.Steps; i++ {
		if i != 0 {
			adjusted := duration
			if backoff.Jitter > 0.0 {
				adjusted = Jitter(duration, backoff.Jitter)
			}
			fmt.Println(" sleeping for: ", adjusted)
			time.Sleep(adjusted)
			duration = time.Duration(float64(duration) * backoff.Factor)
		}

		var zkurl string
		if zkurl, err = leaderFromZK(cluster); err == nil {
			return zkurl, nil
		}
		fmt.Println("error in LeaderFromZK: ", err)
	}
	return "", err
}
func leaderFromZK(cluster Cluster) (string, error) {
endpoints := strings.Split(cluster.ZK, ",")
//TODO (rdelvalle): When enabling debugging, change logger here
@ -92,4 +120,5 @@ func LeaderFromZK(cluster Cluster) (string, error) {
}
return "", errors.New("No leader found")
}