package realis import ( "context" "github.com/paypal/gorealis/gen-go/apache/aurora" "github.com/pkg/errors" ) // TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change, // but would add safety. // Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing // tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed // to return to running unless there is enough capacity in the cluster to run them. func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) { var result *aurora.DrainHostsResult_ if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to drain") } drainList := aurora.NewHosts() drainList.HostNames = hosts r.logger.debugPrintf("DrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.DrainHosts(context.TODO(), drainList) }, nil, ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } if resp.GetResult_() != nil { result = resp.GetResult_().GetDrainHostsResult_() } return resp, result, nil } // Start SLA Aware Drain. // defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy. // After timeoutSecs, tasks will be forcefully drained without checking SLA. func (r *realisClient) SLADrainHosts( policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) { var result *aurora.DrainHostsResult_ if len(hosts) == 0 { return nil, errors.New("no hosts provided to drain") } if policy == nil || policy.CountSetFieldsSlaPolicy() == 0 { policy = &defaultSlaPolicy r.logger.Printf("Warning: start draining with default sla policy %v", policy) } if timeout < 0 { r.logger.Printf("Warning: timeout %d secs is invalid, draining with default timeout %d secs", timeout, defaultSlaDrainTimeoutSecs) timeout = defaultSlaDrainTimeoutSecs } drainList := aurora.NewHosts() drainList.HostNames = hosts r.logger.debugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.SlaDrainHosts(context.TODO(), drainList, policy, timeout) }, nil, ) if retryErr != nil { return result, errors.Wrap(retryErr, "Unable to recover connection") } if resp.GetResult_() != nil { result = resp.GetResult_().GetDrainHostsResult_() } return result, nil } func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) { var result *aurora.StartMaintenanceResult_ if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to start maintenance on") } hostList := aurora.NewHosts() hostList.HostNames = hosts r.logger.debugPrintf("StartMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.StartMaintenance(context.TODO(), hostList) }, nil, ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } if resp.GetResult_() != nil { result = resp.GetResult_().GetStartMaintenanceResult_() } return resp, result, nil } func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) { var result *aurora.EndMaintenanceResult_ if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to end maintenance on") } hostList := aurora.NewHosts() hostList.HostNames = hosts r.logger.debugPrintf("EndMaintenance Thrift Payload: %v\n", hostList) resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.EndMaintenance(context.TODO(), hostList) }, nil, ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } if resp.GetResult_() != nil { result = resp.GetResult_().GetEndMaintenanceResult_() } return resp, result, nil } func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) { var result *aurora.MaintenanceStatusResult_ if len(hosts) == 0 { return nil, nil, errors.New("no hosts provided to get maintenance status from") } hostList := aurora.NewHosts() hostList.HostNames = hosts r.logger.debugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList) // Make thrift call. If we encounter an error sending the call, attempt to reconnect // and continue trying to resend command until we run out of retries. resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.MaintenanceStatus(context.TODO(), hostList) }, nil, ) if retryErr != nil { return resp, result, errors.Wrap(retryErr, "Unable to recover connection") } if resp.GetResult_() != nil { result = resp.GetResult_().GetMaintenanceStatusResult_() } return resp, result, nil } // SetQuota sets a quota aggregate for the given role // TODO(zircote) Currently investigating an error that is returned // from thrift calls that include resources for `NamedPort` and `NumGpu` func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) { quota := &aurora.ResourceAggregate{ Resources: []*aurora.Resource{{NumCpus: cpu}, {RamMb: ramMb}, {DiskMb: diskMb}}, } resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.SetQuota(context.TODO(), role, quota) }, nil, ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to set role quota") } return resp, retryErr } // GetQuota returns the resource aggregate for the given role func (r *realisClient) GetQuota(role string) (*aurora.Response, error) { resp, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.GetQuota(context.TODO(), role) }, nil, ) if retryErr != nil { return resp, errors.Wrap(retryErr, "Unable to get role quota") } return resp, retryErr } // Force Aurora Scheduler to perform a snapshot and write to Mesos log func (r *realisClient) Snapshot() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.Snapshot(context.TODO()) }, nil, ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") } return nil } // Force Aurora Scheduler to write backup file to a file in the backup directory func (r *realisClient) PerformBackup() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.PerformBackup(context.TODO()) }, nil, ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") } return nil } func (r *realisClient) ForceImplicitTaskReconciliation() error { _, retryErr := r.thriftCallWithRetries( false, func() (*aurora.Response, error) { return r.adminClient.TriggerImplicitTaskReconciliation(context.TODO()) }, nil, ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") } return nil } func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error { if batchSize != nil && *batchSize < 1 { return errors.New("invalid batch size") } settings := aurora.NewExplicitReconciliationSettings() settings.BatchSize = batchSize _, retryErr := r.thriftCallWithRetries(false, func() (*aurora.Response, error) { return r.adminClient.TriggerExplicitTaskReconciliation(context.TODO(), settings) }, nil, ) if retryErr != nil { return errors.Wrap(retryErr, "Unable to recover connection") } return nil }