Splitting realis into regular API and admin API files.
parent 98f2cab4a2
commit a8a7cf779f
2 changed files with 288 additions and 257 deletions
realis.go: 277 changes
@@ -60,7 +60,7 @@ type clientConfig struct {
     transport thrift.TTransport
     protoFactory thrift.TProtocolFactory
     logger *LevelLogger
-    InsecureSkipVerify bool
+    insecureSkipVerify bool
     certsPath string
     clientKey, clientCert string
     options []ClientOption
@@ -146,7 +146,7 @@ func BackOff(b Backoff) ClientOption {
 
 func InsecureSkipVerify(InsecureSkipVerify bool) ClientOption {
     return func(config *clientConfig) {
-        config.InsecureSkipVerify = InsecureSkipVerify
+        config.insecureSkipVerify = InsecureSkipVerify
     }
 }
 
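Note on this hunk: unexporting the struct field makes the exported InsecureSkipVerify option above the only way for callers to set it. A minimal, self-contained sketch of that functional-options pattern (illustrative names only, not the gorealis API):

    package main

    import "fmt"

    // config mirrors the shape of clientConfig: the field is unexported,
    // so it can only be set through an exported option function.
    type config struct {
        insecureSkipVerify bool
    }

    // Option is the analogue of ClientOption.
    type Option func(*config)

    // InsecureSkipVerify is the exported knob; the field stays private.
    func InsecureSkipVerify(skip bool) Option {
        return func(c *config) {
            c.insecureSkipVerify = skip
        }
    }

    func main() {
        c := &config{}
        for _, opt := range []Option{InsecureSkipVerify(true)} {
            opt(c)
        }
        fmt.Println(c.insecureSkipVerify) // prints: true
    }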
@@ -204,7 +204,12 @@ func newTJSONTransport(url string, timeout time.Duration, config *clientConfig)
     if err != nil {
         return nil, errors.Wrap(err, "error creating realis")
     }
-    httpTrans := (trans).(*thrift.THttpClient)
+
+    httpTrans, ok := (trans).(*thrift.THttpClient)
+    if !ok {
+        return nil, errors.Wrap(err, "transport does not contain a thrift client")
+    }
+
     httpTrans.SetHeader("Content-Type", "application/x-thrift")
     httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION)
     return trans, err
@@ -215,7 +220,12 @@ func newTBinTransport(url string, timeout time.Duration, config *clientConfig) (
     if err != nil {
         return nil, errors.Wrap(err, "error creating realis")
     }
-    httpTrans := (trans).(*thrift.THttpClient)
+
+    httpTrans, ok := (trans).(*thrift.THttpClient)
+    if !ok {
+        return nil, errors.Wrap(err, "transport does not contain a thrift client")
+    }
+
     httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient
     httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary")
     httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary")
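The two hunks above replace an unchecked type assertion with the comma-ok form, which reports failure instead of panicking at runtime. Worth noting when reading the new !ok branch: err is necessarily nil at that point (the err != nil branch already returned), and errors.Wrap from github.com/pkg/errors returns nil when given a nil error. A standalone sketch of the pattern, using hypothetical types rather than this repo's code, that builds a fresh error instead:

    package main

    import (
        "errors"
        "fmt"
    )

    type transport interface{ Open() error }

    type httpClient struct{}

    func (h *httpClient) Open() error { return nil }

    func asHTTPClient(t transport) (*httpClient, error) {
        // Comma-ok assertion: a mismatch sets ok to false instead of panicking.
        h, ok := t.(*httpClient)
        if !ok {
            // Build a fresh error; wrapping a nil err would itself be nil.
            return nil, errors.New("transport does not contain an HTTP client")
        }
        return h, nil
    }

    func main() {
        h, err := asHTTPClient(&httpClient{})
        fmt.Println(h != nil, err) // prints: true <nil>
    }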
@@ -287,7 +297,7 @@ func NewClient(options ...ClientOption) (*Client, error) {
         return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
     }
 
-    url, err = validateAndPopulateAuroraURL(url)
+    url, err = validateAuroraAddress(url)
     if err != nil {
         return nil, errors.Wrap(err, "unable to create realis object, invalid url")
     }
@@ -315,7 +325,10 @@ func NewClient(options ...ClientOption) (*Client, error) {
 
     // Adding Basic Authentication.
     if config.username != "" && config.password != "" {
-        httpTrans := (config.transport).(*thrift.THttpClient)
+        httpTrans, ok := (config.transport).(*thrift.THttpClient)
+        if !ok {
+            return nil, errors.New("transport provided does not contain an THttpClient")
+        }
         httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password))
     }
 
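For context, the Authorization header assembled here follows the standard HTTP Basic scheme. Assuming basicAuth base64-encodes "username:password" (the helper's body is outside this diff and may differ), a minimal equivalent is:

    package main

    import (
        "encoding/base64"
        "fmt"
    )

    // basicAuth here is an assumed equivalent of the repo's helper:
    // the standard Basic scheme, base64 of "username:password".
    func basicAuth(username, password string) string {
        return base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
    }

    func main() {
        fmt.Println("Basic " + basicAuth("aurora", "secret"))
    }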
@@ -354,7 +367,7 @@ func defaultTTransport(url string, timeout time.Duration, config *clientConfig)
 
     if config != nil {
         tlsConfig := &tls.Config{}
-        if config.InsecureSkipVerify {
+        if config.insecureSkipVerify {
             tlsConfig.InsecureSkipVerify = true
         }
         if config.certsPath != "" {
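Taken together with the option rename, client construction is unchanged for callers. A usage sketch; SchedulerUrl is an assumed name for the option supplying the Aurora endpoint, which this diff does not show, and the import path should match your fork's module path:

    package main

    import (
        "log"

        realis "github.com/paypal/gorealis/v2" // import path assumed
    )

    func main() {
        // InsecureSkipVerify(true) sets the now-unexported insecureSkipVerify
        // field, which defaultTTransport reads when building the TLS config.
        client, err := realis.NewClient(
            realis.SchedulerUrl("https://aurora.example.com:8081"), // assumed option name
            realis.InsecureSkipVerify(true),
        )
        if err != nil {
            log.Fatal(err)
        }
        _ = client // ready for regular and admin API calls
    }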
@@ -949,253 +962,3 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
     }
     return nil
 }
-
-/* Admin functions */
-// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change,
-// but would add safety.
-
-// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
-// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
-// to return to running unless there is enough capacity in the cluster to run them.
-func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) {
-
-    if len(hosts) == 0 {
-        return nil, errors.New("no hosts provided to drain")
-    }
-
-    drainList := aurora.NewHosts()
-    drainList.HostNames = hosts
-
-    c.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
-
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.DrainHosts(context.TODO(), drainList)
-    })
-
-    if retryErr != nil {
-        return nil, errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
-        return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
-    } else {
-        return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
-    }
-}
-
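The admin functions from here down are removed from realis.go; per the commit message they move to the admin API file, with behavior unchanged. A minimal usage sketch for the relocated DrainHosts, continuing from the client constructed in the earlier sketch (host names are placeholders):

    // Drain two agents; the call returns the resulting host statuses.
    statuses, err := client.DrainHosts("agent-1.example.com", "agent-2.example.com")
    if err != nil {
        log.Fatal(err)
    }
    for _, s := range statuses {
        log.Printf("%s -> %v", s.GetHost(), s.GetMode())
    }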
-// Start SLA Aware Drain.
-// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
-// After timeoutSecs, tasks will be forcefully drained without checking SLA.
-func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) ([]*aurora.HostStatus, error) {
-
-    if len(hosts) == 0 {
-        return nil, errors.New("no hosts provided to drain")
-    }
-
-    drainList := aurora.NewHosts()
-    drainList.HostNames = hosts
-
-    c.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
-
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout)
-    })
-
-    if retryErr != nil {
-        return nil, errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil {
-        return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil
-    } else {
-        return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
-    }
-}
-
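SLADrainHosts adds a fallback SLA policy plus a hard timeout after which hosts drain regardless of SLA. A sketch with the same client; aurora is the generated Apache Thrift package bundled with gorealis, and the SlaPolicy field names follow the Aurora thrift definitions, so verify them against your generated bindings:

    // Fall back to an 80%-for-30-minutes percentage policy for tasks that
    // define no SlaPolicy of their own; force the drain after one hour.
    policy := &aurora.SlaPolicy{
        PercentageSlaPolicy: &aurora.PercentageSlaPolicy{
            Percentage:   80.0,
            DurationSecs: 1800,
        },
    }
    statuses, err := client.SLADrainHosts(policy, 3600, "agent-1.example.com")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("draining %d host(s)", len(statuses))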
-func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
-
-    if len(hosts) == 0 {
-        return nil, errors.New("no hosts provided to start maintenance on")
-    }
-
-    hostList := aurora.NewHosts()
-    hostList.HostNames = hosts
-
-    c.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
-
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.StartMaintenance(context.TODO(), hostList)
-    })
-
-    if retryErr != nil {
-        return nil, errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil {
-        return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil
-    } else {
-        return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
-    }
-}
-
-func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) {
-
-    if len(hosts) == 0 {
-        return nil, errors.New("no hosts provided to end maintenance on")
-    }
-
-    hostList := aurora.NewHosts()
-    hostList.HostNames = hosts
-
-    c.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
-
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.EndMaintenance(context.TODO(), hostList)
-    })
-
-    if retryErr != nil {
-        return nil, errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil {
-        return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil
-    } else {
-        return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
-    }
-
-}
-
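StartMaintenance, DrainHosts, and EndMaintenance form the usual lifecycle: schedule maintenance, drain running tasks, do the work, then return the hosts to service. A sketch with the same client (placeholder hosts, error handling simplified):

    hosts := []string{"agent-1.example.com"}

    // 1. Mark the hosts as SCHEDULED for maintenance.
    if _, err := client.StartMaintenance(hosts...); err != nil {
        log.Fatal(err)
    }

    // 2. Drain running tasks off the hosts.
    if _, err := client.DrainHosts(hosts...); err != nil {
        log.Fatal(err)
    }

    // ... perform the actual maintenance work ...

    // 3. Return the hosts to the schedulable NONE mode.
    if _, err := client.EndMaintenance(hosts...); err != nil {
        log.Fatal(err)
    }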
-func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) {
-
-    var result *aurora.MaintenanceStatusResult_
-
-    if len(hosts) == 0 {
-        return nil, errors.New("no hosts provided to get maintenance status from")
-    }
-
-    hostList := aurora.NewHosts()
-    hostList.HostNames = hosts
-
-    c.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
-
-    // Make thrift call. If we encounter an error sending the call, attempt to reconnect
-    // and continue trying to resend command until we run out of retries.
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.MaintenanceStatus(context.TODO(), hostList)
-    })
-
-    if retryErr != nil {
-        return result, errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    if resp.GetResult_() != nil {
-        result = resp.GetResult_().GetMaintenanceStatusResult_()
-    }
-
-    return result, nil
-}
-
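MaintenanceStatus reports the current maintenance mode per host, which is useful for polling until drained hosts reach DRAINED. Note it can return a nil result with a nil error, so check before use. A sketch (GetStatuses is assumed to follow the generated thrift getters):

    result, err := client.MaintenanceStatus("agent-1.example.com")
    if err != nil {
        log.Fatal(err)
    }
    if result != nil {
        for _, s := range result.GetStatuses() {
            log.Printf("%s is in mode %v", s.GetHost(), s.GetMode())
        }
    }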
-// SetQuota sets a quota aggregate for the given role
-// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
-func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error {
-    ramResource := aurora.NewResource()
-    ramResource.RamMb = ramMb
-    cpuResource := aurora.NewResource()
-    cpuResource.NumCpus = cpu
-    diskResource := aurora.NewResource()
-    diskResource.DiskMb = diskMb
-
-    quota := aurora.NewResourceAggregate()
-    quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource}
-
-    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.SetQuota(context.TODO(), role, quota)
-    })
-
-    if retryErr != nil {
-        return errors.Wrap(retryErr, "unable to set role quota")
-    }
-    return retryErr
-
-}
-
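SetQuota takes pointer parameters because the underlying thrift Resource fields are optional. A sketch with addressable values ("vagrant" is a placeholder role):

    // The quota parameters are pointers (optional thrift fields), so use
    // addressable variables rather than literals.
    cpu := 8.0
    ramMb := int64(16384)  // MiB
    diskMb := int64(65536) // MiB
    if err := client.SetQuota("vagrant", &cpu, &ramMb, &diskMb); err != nil {
        log.Fatal(err)
    }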
-// GetQuota returns the resource aggregate for the given role
-func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) {
-
-    resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.GetQuota(context.TODO(), role)
-    })
-
-    if retryErr != nil {
-        return nil, errors.Wrap(retryErr, "unable to get role quota")
-    }
-
-    if resp.GetResult_() != nil {
-        return resp.GetResult_().GetGetQuotaResult_(), nil
-    } else {
-        return nil, errors.New("thrift error: Field in response is nil unexpectedly.")
-    }
-}
-
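GetQuota reads the aggregate back. A sketch that walks the returned resources; the getter chain is assumed to follow the generated thrift code for GetQuotaResult_ and ResourceAggregate:

    quotaResult, err := client.GetQuota("vagrant")
    if err != nil {
        log.Fatal(err)
    }
    for _, r := range quotaResult.GetQuota().GetResources() {
        // Generated getters dereference the optional fields, returning
        // zero values when unset.
        log.Printf("cpus=%v ramMb=%v diskMb=%v", r.GetNumCpus(), r.GetRamMb(), r.GetDiskMb())
    }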
-// Force Aurora Scheduler to perform a snapshot and write to Mesos log
-func (c *Client) Snapshot() error {
-
-    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.Snapshot(context.TODO())
-    })
-
-    if retryErr != nil {
-        return errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    return nil
-}
-
-// Force Aurora Scheduler to write backup file to a file in the backup directory
-func (c *Client) PerformBackup() error {
-
-    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.PerformBackup(context.TODO())
-    })
-
-    if retryErr != nil {
-        return errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    return nil
-}
-
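Snapshot and PerformBackup are argument-free admin calls; only transport or retry errors surface to the caller. A sketch with the same client:

    // Force a snapshot to the Mesos log, then a backup file on the scheduler.
    if err := client.Snapshot(); err != nil {
        log.Fatal(err)
    }
    if err := client.PerformBackup(); err != nil {
        log.Fatal(err)
    }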
-// Force an Implicit reconciliation between Mesos and Aurora
-func (c *Client) ForceImplicitTaskReconciliation() error {
-
-    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO())
-    })
-
-    if retryErr != nil {
-        return errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    return nil
-}
-
-// Force an Explicit reconciliation between Mesos and Aurora
-func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error {
-
-    if batchSize != nil && *batchSize < 1 {
-        return errors.New("invalid batch size.")
-    }
-    settings := aurora.NewExplicitReconciliationSettings()
-
-    settings.BatchSize = batchSize
-
-    _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) {
-        return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings)
-    })
-
-    if retryErr != nil {
-        return errors.Wrap(retryErr, "unable to recover connection")
-    }
-
-    return nil
-}
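Both reconciliation triggers in one sketch; the explicit variant rejects batch sizes below 1, and passing nil uses the scheduler's default:

    // Explicit reconciliation in batches of 100 tasks.
    batchSize := int32(100)
    if err := client.ForceExplicitTaskReconciliation(&batchSize); err != nil {
        log.Fatal(err)
    }

    // Implicit reconciliation takes no arguments.
    if err := client.ForceImplicitTaskReconciliation(); err != nil {
        log.Fatal(err)
    }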