Misc. bug fixes and addition of debug logging (#61)
* Fixing possible race condition when passing backoff around as a pointer. * Adding a debug logger that is turned off by default. If debug is turned on, but a logger has not been assigned, a default logger that will print to STDOUT will be created. * Making Mutex a pointer so that there's no chance it can accidentally be copied. * Removing a leftover helper function from before we changed how we configured the client. * Minor changes to demonstrate how a logger can be used in conjunction to debug mode in the sample client.
This commit is contained in:
parent
c0d2969976
commit
4f5766b443
6 changed files with 156 additions and 63 deletions
102
realis.go
102
realis.go
|
@ -21,8 +21,10 @@ import (
|
|||
"encoding/base64"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
|
@ -80,8 +82,9 @@ type realisClient struct {
|
|||
client *aurora.AuroraSchedulerManagerClient
|
||||
readonlyClient *aurora.ReadOnlySchedulerClient
|
||||
adminClient *aurora.AuroraAdminClient
|
||||
logger Logger
|
||||
lock sync.Mutex
|
||||
logger LevelLogger
|
||||
lock *sync.Mutex
|
||||
debug bool
|
||||
}
|
||||
|
||||
type RealisConfig struct {
|
||||
|
@ -90,7 +93,7 @@ type RealisConfig struct {
|
|||
timeoutms int
|
||||
binTransport, jsonTransport bool
|
||||
cluster *Cluster
|
||||
backoff *Backoff
|
||||
backoff Backoff
|
||||
transport thrift.TTransport
|
||||
protoFactory thrift.TProtocolFactory
|
||||
logger Logger
|
||||
|
@ -98,6 +101,7 @@ type RealisConfig struct {
|
|||
certspath string
|
||||
clientkey, clientcert string
|
||||
options []ClientOption
|
||||
debug bool
|
||||
}
|
||||
|
||||
var defaultBackoff = Backoff{
|
||||
|
@ -141,7 +145,7 @@ func ZKUrl(url string) ClientOption {
|
|||
}
|
||||
}
|
||||
|
||||
func Retries(backoff *Backoff) ClientOption {
|
||||
func Retries(backoff Backoff) ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.backoff = backoff
|
||||
}
|
||||
|
@ -159,7 +163,7 @@ func ThriftBinary() ClientOption {
|
|||
}
|
||||
}
|
||||
|
||||
func BackOff(b *Backoff) ClientOption {
|
||||
func BackOff(b Backoff) ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.backoff = b
|
||||
}
|
||||
|
@ -183,13 +187,20 @@ func ClientCerts(clientKey, clientCert string) ClientOption {
|
|||
}
|
||||
}
|
||||
|
||||
// Using the word set to avoid name collision with Interface
|
||||
// Using the word set to avoid name collision with Interface.
|
||||
func SetLogger(l Logger) ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.logger = l
|
||||
}
|
||||
}
|
||||
|
||||
// Enable debug statements.
|
||||
func Debug() ClientOption {
|
||||
return func(config *RealisConfig) {
|
||||
config.debug = true
|
||||
}
|
||||
}
|
||||
|
||||
func newTJSONTransport(url string, timeout int, config *RealisConfig) (thrift.TTransport, error) {
|
||||
trans, err := defaultTTransport(url, timeout, config)
|
||||
if err != nil {
|
||||
|
@ -220,8 +231,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
|
||||
// Default configs
|
||||
config.timeoutms = 10000
|
||||
config.backoff = &defaultBackoff
|
||||
config.logger = NoopLogger{}
|
||||
config.backoff = defaultBackoff
|
||||
config.logger = LevelLogger{NoopLogger{}, false}
|
||||
|
||||
// Save options to recreate client if a connection error happens
|
||||
config.options = options
|
||||
|
||||
// Override default configs where necessary
|
||||
|
@ -231,6 +244,11 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
|
||||
config.logger.Println("Number of options applied to config: ", len(options))
|
||||
|
||||
// Set a logger if debug has been set to true but no logger has been set
|
||||
if config.logger == nil && config.debug {
|
||||
config.logger = log.New(os.Stdout, "realis: ", log.Ltime|log.Ldate|log.LUTC)
|
||||
}
|
||||
|
||||
//Set default Transport to JSON if needed.
|
||||
if !config.jsonTransport && !config.binTransport {
|
||||
config.jsonTransport = true
|
||||
|
@ -274,9 +292,10 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
|
||||
config.logger.Printf("gorealis config url: %+v\n", url)
|
||||
|
||||
//Basic Authentication.
|
||||
// Adding Basic Authentication.
|
||||
if config.username != "" && config.password != "" {
|
||||
AddBasicAuth(config, config.username, config.password)
|
||||
httpTrans := (config.transport).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password))
|
||||
}
|
||||
|
||||
return &realisClient{
|
||||
|
@ -284,7 +303,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
|
|||
client: aurora.NewAuroraSchedulerManagerClientFactory(config.transport, config.protoFactory),
|
||||
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
|
||||
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
|
||||
logger: config.logger}, nil
|
||||
logger: LevelLogger{config.logger, config.debug},
|
||||
lock: &sync.Mutex{}}, nil
|
||||
}
|
||||
|
||||
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
|
||||
|
@ -404,14 +424,6 @@ func newTBinaryConfig(url string, timeoutms int, config *RealisConfig) (*RealisC
|
|||
|
||||
}
|
||||
|
||||
// Helper function to add basic authorization needed to communicate with Apache Aurora.
|
||||
func AddBasicAuth(config *RealisConfig, username string, password string) {
|
||||
config.username = username
|
||||
config.password = password
|
||||
httpTrans := (config.transport).(*thrift.THttpClient)
|
||||
httpTrans.SetHeader("Authorization", "Basic "+basicAuth(username, password))
|
||||
}
|
||||
|
||||
func basicAuth(username, password string) string {
|
||||
auth := username + ":" + password
|
||||
return base64.StdEncoding.EncodeToString([]byte(auth))
|
||||
|
@ -458,6 +470,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
Statuses: states,
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(taskQ)
|
||||
})
|
||||
|
@ -478,6 +492,8 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
|
|||
}
|
||||
|
||||
func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
|
||||
})
|
||||
|
@ -510,6 +526,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
|
|||
|
||||
// Kill specific instances of a job.
|
||||
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
|
@ -534,6 +551,8 @@ func (r *realisClient) RealisConfig() *RealisConfig {
|
|||
// Sends a kill message to the scheduler for all active tasks under a job.
|
||||
func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
|
||||
return r.client.KillTasks(key, nil, "")
|
||||
|
@ -551,6 +570,8 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
|
|||
// Use this API to create ad-hoc jobs.
|
||||
func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.CreateJob(auroraJob.JobConfig())
|
||||
})
|
||||
|
@ -570,17 +591,18 @@ func (r *realisClient) CreateService(auroraJob Job, settings *aurora.JobUpdateSe
|
|||
|
||||
resp, err := r.StartJobUpdate(update, "")
|
||||
if err != nil {
|
||||
return resp, nil, errors.Wrap(err, "unable to create service")
|
||||
return nil, nil, errors.Wrap(err, "unable to create service")
|
||||
}
|
||||
|
||||
if resp != nil && resp.GetResult_() != nil {
|
||||
return resp, resp.GetResult_().GetStartJobUpdateResult_(), nil
|
||||
}
|
||||
|
||||
return resp, nil, errors.New("results object is nil")
|
||||
return nil, nil, errors.New("results object is nil")
|
||||
}
|
||||
|
||||
func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ScheduleCronJob(auroraJob.JobConfig())
|
||||
|
@ -594,6 +616,8 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
|
|||
|
||||
func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.DescheduleCronJob(key)
|
||||
})
|
||||
|
@ -608,6 +632,8 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
|
|||
|
||||
func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartCronJob(key)
|
||||
})
|
||||
|
@ -621,6 +647,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
|
|||
|
||||
// Restarts specific instances specified
|
||||
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
|
||||
|
||||
instanceIds := make(map[int32]bool)
|
||||
|
||||
for _, instId := range instances {
|
||||
|
@ -645,6 +673,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
return nil, errors.Wrap(err1, "Could not retrieve relevant task instance IDs")
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instanceIds)
|
||||
|
||||
if len(instanceIds) > 0 {
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RestartShards(key, instanceIds)
|
||||
|
@ -663,6 +693,8 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
|
|||
// Update all tasks under a job configuration. Currently gorealis doesn't support for canary deployments.
|
||||
func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.StartJobUpdate(updateJob.req, message)
|
||||
})
|
||||
|
@ -676,6 +708,8 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
|
|||
// Abort Job Update on Aurora. Requires the updateId which can be obtained on the Aurora web UI.
|
||||
func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AbortJobUpdate(&updateKey, message)
|
||||
})
|
||||
|
@ -689,6 +723,8 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
|
|||
//Pause Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PauseJobUpdate(updateKey, message)
|
||||
})
|
||||
|
@ -703,6 +739,8 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
|
|||
//Resume Paused Job Update. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.ResumeJobUpdate(updateKey, message)
|
||||
})
|
||||
|
@ -717,6 +755,8 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
|
|||
//Pulse Job Update on Aurora. UpdateID is returned from StartJobUpdate or the Aurora web UI.
|
||||
func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.PulseJobUpdate(updateKey)
|
||||
})
|
||||
|
@ -732,6 +772,8 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
|
|||
// instance to scale up.
|
||||
func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.AddInstances(&instKey, count)
|
||||
})
|
||||
|
@ -768,6 +810,8 @@ func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora
|
|||
// Get information about task including a fully hydrated task configuration object
|
||||
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(query)
|
||||
})
|
||||
|
@ -782,6 +826,8 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
|
|||
// Get information about task including without a task configuration object
|
||||
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
|
||||
|
||||
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksWithoutConfigs(query)
|
||||
})
|
||||
|
@ -808,6 +854,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
Statuses: aurora.ACTIVE_STATES,
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetTasksStatus(taskQ)
|
||||
})
|
||||
|
@ -832,6 +880,8 @@ func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.Task
|
|||
|
||||
func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.GetJobUpdateDetails(&updateQuery)
|
||||
})
|
||||
|
@ -845,6 +895,8 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
|
|||
|
||||
func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string) (*aurora.Response, error) {
|
||||
|
||||
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.client.RollbackJobUpdate(&key, message)
|
||||
})
|
||||
|
@ -872,6 +924,8 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
|
|||
drainList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.DrainHosts(drainList)
|
||||
})
|
||||
|
@ -901,6 +955,8 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
|
|||
hostList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
return r.adminClient.EndMaintenance(hostList)
|
||||
})
|
||||
|
@ -930,6 +986,8 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
|
|||
hostList.HostNames[host] = true
|
||||
}
|
||||
|
||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||
|
||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||
// and continue trying to resend command until we run out of retries.
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
|
@ -977,7 +1035,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
|
|||
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
||||
|
||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||
resp, retryErr :=r.adminClient.GetQuota(role)
|
||||
resp, retryErr := r.adminClient.GetQuota(role)
|
||||
|
||||
if retryErr != nil {
|
||||
return nil, errors.Wrap(retryErr, "Unable to get role quota")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue