From a8a7cf779f1addcf458c898bf14c0704bd555375 Mon Sep 17 00:00:00 2001 From: Renan DelValle Date: Thu, 12 Sep 2019 11:10:55 -0700 Subject: [PATCH] Splitting realis into regular API and admin API files. --- realis.go | 277 ++++-------------------------------------------- realis_admin.go | 268 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+), 257 deletions(-) create mode 100644 realis_admin.go diff --git a/realis.go b/realis.go index 002cd56..1bcff71 100644 --- a/realis.go +++ b/realis.go @@ -60,7 +60,7 @@ type clientConfig struct { transport thrift.TTransport protoFactory thrift.TProtocolFactory logger *LevelLogger - InsecureSkipVerify bool + insecureSkipVerify bool certsPath string clientKey, clientCert string options []ClientOption @@ -146,7 +146,7 @@ func BackOff(b Backoff) ClientOption { func InsecureSkipVerify(InsecureSkipVerify bool) ClientOption { return func(config *clientConfig) { - config.InsecureSkipVerify = InsecureSkipVerify + config.insecureSkipVerify = InsecureSkipVerify } } @@ -204,7 +204,12 @@ func newTJSONTransport(url string, timeout time.Duration, config *clientConfig) if err != nil { return nil, errors.Wrap(err, "error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) + + httpTrans, ok := (trans).(*thrift.THttpClient) + if !ok { + return nil, errors.Wrap(err, "transport does not contain a thrift client") + } + httpTrans.SetHeader("Content-Type", "application/x-thrift") httpTrans.SetHeader("User-Agent", "gorealis v"+VERSION) return trans, err @@ -215,7 +220,12 @@ func newTBinTransport(url string, timeout time.Duration, config *clientConfig) ( if err != nil { return nil, errors.Wrap(err, "error creating realis") } - httpTrans := (trans).(*thrift.THttpClient) + + httpTrans, ok := (trans).(*thrift.THttpClient) + if !ok { + return nil, errors.Wrap(err, "transport does not contain a thrift client") + } + httpTrans.DelHeader("Content-Type") // Workaround for using thrift HttpPostClient httpTrans.SetHeader("Accept", "application/vnd.apache.thrift.binary") httpTrans.SetHeader("Content-Type", "application/vnd.apache.thrift.binary") @@ -287,7 +297,7 @@ func NewClient(options ...ClientOption) (*Client, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } - url, err = validateAndPopulateAuroraURL(url) + url, err = validateAuroraAddress(url) if err != nil { return nil, errors.Wrap(err, "unable to create realis object, invalid url") } @@ -315,7 +325,10 @@ func NewClient(options ...ClientOption) (*Client, error) { // Adding Basic Authentication. if config.username != "" && config.password != "" { - httpTrans := (config.transport).(*thrift.THttpClient) + httpTrans, ok := (config.transport).(*thrift.THttpClient) + if !ok { + return nil, errors.New("transport provided does not contain an THttpClient") + } httpTrans.SetHeader("Authorization", "Basic "+basicAuth(config.username, config.password)) } @@ -354,7 +367,7 @@ func defaultTTransport(url string, timeout time.Duration, config *clientConfig) if config != nil { tlsConfig := &tls.Config{} - if config.InsecureSkipVerify { + if config.insecureSkipVerify { tlsConfig.InsecureSkipVerify = true } if config.certsPath != "" { @@ -949,253 +962,3 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro } return nil } - -/* Admin functions */ -// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change, -// but would add safety. - -// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing -// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed -// to return to running unless there is enough capacity in the cluster to run them. -func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to drain") - } - - drainList := aurora.NewHosts() - drainList.HostNames = hosts - - c.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.DrainHosts(context.TODO(), drainList) - }) - - if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - } -} - -// Start SLA Aware Drain. -// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. -// After timeoutSecs, tasks will be forcefully drained without checking SLA. -func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) ([]*aurora.HostStatus, error) { - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to drain") - } - - drainList := aurora.NewHosts() - drainList.HostNames = hosts - - c.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) - }) - - if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - - if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { - return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - } -} - -func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to start maintenance on") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - c.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.StartMaintenance(context.TODO(), hostList) - }) - - if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - - if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { - return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - } -} - -func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to end maintenance on") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - c.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.EndMaintenance(context.TODO(), hostList) - }) - - if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to recover connection") - } - - if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { - return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - } - -} - -func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { - - var result *aurora.MaintenanceStatusResult_ - - if len(hosts) == 0 { - return nil, errors.New("no hosts provided to get maintenance status from") - } - - hostList := aurora.NewHosts() - hostList.HostNames = hosts - - c.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) - - // Make thrift call. If we encounter an error sending the call, attempt to reconnect - // and continue trying to resend command until we run out of retries. - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.MaintenanceStatus(context.TODO(), hostList) - }) - - if retryErr != nil { - return result, errors.Wrap(retryErr, "unable to recover connection") - } - - if resp.GetResult_() != nil { - result = resp.GetResult_().GetMaintenanceStatusResult_() - } - - return result, nil -} - -// SetQuota sets a quota aggregate for the given role -// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` -func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error { - ramResource := aurora.NewResource() - ramResource.RamMb = ramMb - cpuResource := aurora.NewResource() - cpuResource.NumCpus = cpu - diskResource := aurora.NewResource() - diskResource.DiskMb = diskMb - - quota := aurora.NewResourceAggregate() - quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource} - - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.SetQuota(context.TODO(), role, quota) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "unable to set role quota") - } - return retryErr - -} - -// GetQuota returns the resource aggregate for the given role -func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { - - resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.GetQuota(context.TODO(), role) - }) - - if retryErr != nil { - return nil, errors.Wrap(retryErr, "unable to get role quota") - } - - if resp.GetResult_() != nil { - return resp.GetResult_().GetGetQuotaResult_(), nil - } else { - return nil, errors.New("thrift error: Field in response is nil unexpectedly.") - } -} - -// Force Aurora Scheduler to perform a snapshot and write to Mesos log -func (c *Client) Snapshot() error { - - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.Snapshot(context.TODO()) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "unable to recover connection") - } - - return nil -} - -// Force Aurora Scheduler to write backup file to a file in the backup directory -func (c *Client) PerformBackup() error { - - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.PerformBackup(context.TODO()) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "unable to recover connection") - } - - return nil -} - -// Force an Implicit reconciliation between Mesos and Aurora -func (c *Client) ForceImplicitTaskReconciliation() error { - - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "unable to recover connection") - } - - return nil -} - -// Force an Explicit reconciliation between Mesos and Aurora -func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { - - if batchSize != nil && *batchSize < 1 { - return errors.New("invalid batch size.") - } - settings := aurora.NewExplicitReconciliationSettings() - - settings.BatchSize = batchSize - - _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { - return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) - }) - - if retryErr != nil { - return errors.Wrap(retryErr, "unable to recover connection") - } - - return nil -} diff --git a/realis_admin.go b/realis_admin.go new file mode 100644 index 0000000..25dfb56 --- /dev/null +++ b/realis_admin.go @@ -0,0 +1,268 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package realis + +import ( + "context" + + "github.com/paypal/gorealis/v2/gen-go/apache/aurora" + "github.com/pkg/errors" +) + +// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing +// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed +// to return to running unless there is enough capacity in the cluster to run them. +func (c *Client) DrainHosts(hosts ...string) ([]*aurora.HostStatus, error) { + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = hosts + + c.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList) + + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.DrainHosts(context.TODO(), drainList) + }) + + if retryErr != nil { + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + } +} + +// Start SLA Aware Drain. +// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. +// After timeoutSecs, tasks will be forcefully drained without checking SLA. +func (c *Client) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) ([]*aurora.HostStatus, error) { + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to drain") + } + + drainList := aurora.NewHosts() + drainList.HostNames = hosts + + c.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) + + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) + }) + + if retryErr != nil { + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + + if resp.GetResult_() != nil && resp.GetResult_().GetDrainHostsResult_() != nil { + return resp.GetResult_().GetDrainHostsResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + } +} + +func (c *Client) StartMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to start maintenance on") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + c.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) + + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.StartMaintenance(context.TODO(), hostList) + }) + + if retryErr != nil { + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + + if resp.GetResult_() != nil && resp.GetResult_().GetStartMaintenanceResult_() != nil { + return resp.GetResult_().GetStartMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + } +} + +func (c *Client) EndMaintenance(hosts ...string) ([]*aurora.HostStatus, error) { + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to end maintenance on") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + c.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) + + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.EndMaintenance(context.TODO(), hostList) + }) + + if retryErr != nil { + return nil, errors.Wrap(retryErr, "unable to recover connection") + } + + if resp.GetResult_() != nil && resp.GetResult_().GetEndMaintenanceResult_() != nil { + return resp.GetResult_().GetEndMaintenanceResult_().GetStatuses(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + } + +} + +func (c *Client) MaintenanceStatus(hosts ...string) (*aurora.MaintenanceStatusResult_, error) { + + var result *aurora.MaintenanceStatusResult_ + + if len(hosts) == 0 { + return nil, errors.New("no hosts provided to get maintenance status from") + } + + hostList := aurora.NewHosts() + hostList.HostNames = hosts + + c.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) + + // Make thrift call. If we encounter an error sending the call, attempt to reconnect + // and continue trying to resend command until we run out of retries. + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.MaintenanceStatus(context.TODO(), hostList) + }) + + if retryErr != nil { + return result, errors.Wrap(retryErr, "unable to recover connection") + } + + if resp.GetResult_() != nil { + result = resp.GetResult_().GetMaintenanceStatusResult_() + } + + return result, nil +} + +// SetQuota sets a quota aggregate for the given role +// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu` +func (c *Client) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) error { + ramResource := aurora.NewResource() + ramResource.RamMb = ramMb + cpuResource := aurora.NewResource() + cpuResource.NumCpus = cpu + diskResource := aurora.NewResource() + diskResource.DiskMb = diskMb + + quota := aurora.NewResourceAggregate() + quota.Resources = []*aurora.Resource{ramResource, cpuResource, diskResource} + + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.SetQuota(context.TODO(), role, quota) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "unable to set role quota") + } + return retryErr + +} + +// GetQuota returns the resource aggregate for the given role +func (c *Client) GetQuota(role string) (*aurora.GetQuotaResult_, error) { + + resp, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.GetQuota(context.TODO(), role) + }) + + if retryErr != nil { + return nil, errors.Wrap(retryErr, "unable to get role quota") + } + + if resp.GetResult_() != nil { + return resp.GetResult_().GetGetQuotaResult_(), nil + } else { + return nil, errors.New("thrift error: Field in response is nil unexpectedly.") + } +} + +// Force Aurora Scheduler to perform a snapshot and write to Mesos log +func (c *Client) Snapshot() error { + + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.Snapshot(context.TODO()) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "unable to recover connection") + } + + return nil +} + +// Force Aurora Scheduler to write backup file to a file in the backup directory +func (c *Client) PerformBackup() error { + + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.PerformBackup(context.TODO()) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "unable to recover connection") + } + + return nil +} + +// Force an Implicit reconciliation between Mesos and Aurora +func (c *Client) ForceImplicitTaskReconciliation() error { + + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "unable to recover connection") + } + + return nil +} + +// Force an Explicit reconciliation between Mesos and Aurora +func (c *Client) ForceExplicitTaskReconciliation(batchSize *int32) error { + + if batchSize != nil && *batchSize < 1 { + return errors.New("invalid batch size.") + } + settings := aurora.NewExplicitReconciliationSettings() + + settings.BatchSize = batchSize + + _, retryErr := c.thriftCallWithRetries(false, func() (*aurora.Response, error) { + return c.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) + }) + + if retryErr != nil { + return errors.Wrap(retryErr, "unable to recover connection") + } + + return nil +}