Moving admin functions to a new file. They are still part of the same pointer receiver type.
This commit is contained in:
parent
4520fb2a2a
commit
986e41aa62
2 changed files with 252 additions and 247 deletions
247
realis.go
247
realis.go
|
@ -988,250 +988,3 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
|
||||||
}
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Admin functions */
|
|
||||||
// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change,
|
|
||||||
// but would add safety.
|
|
||||||
|
|
||||||
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
|
|
||||||
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
|
|
||||||
// to return to running unless there is enough capacity in the cluster to run them.
|
|
||||||
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
|
|
||||||
|
|
||||||
var result *aurora.DrainHostsResult_
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
return nil, nil, errors.New("no hosts provided to drain")
|
|
||||||
}
|
|
||||||
|
|
||||||
drainList := aurora.NewHosts()
|
|
||||||
drainList.HostNames = hosts
|
|
||||||
|
|
||||||
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
|
||||||
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.DrainHosts(nil, drainList)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetResult_() != nil {
|
|
||||||
result = resp.GetResult_().GetDrainHostsResult_()
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start SLA Aware Drain.
|
|
||||||
// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
|
|
||||||
// After timeoutSecs, tasks will be forcefully drained without checking SLA.
|
|
||||||
func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
|
|
||||||
var result *aurora.DrainHostsResult_
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
return nil, errors.New("no hosts provided to drain")
|
|
||||||
}
|
|
||||||
|
|
||||||
drainList := aurora.NewHosts()
|
|
||||||
drainList.HostNames = hosts
|
|
||||||
|
|
||||||
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
|
|
||||||
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return result, errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetResult_() != nil {
|
|
||||||
result = resp.GetResult_().GetDrainHostsResult_()
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
|
|
||||||
|
|
||||||
var result *aurora.StartMaintenanceResult_
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
return nil, nil, errors.New("no hosts provided to start maintenance on")
|
|
||||||
}
|
|
||||||
|
|
||||||
hostList := aurora.NewHosts()
|
|
||||||
hostList.HostNames = hosts
|
|
||||||
|
|
||||||
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
|
|
||||||
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.StartMaintenance(nil, hostList)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetResult_() != nil {
|
|
||||||
result = resp.GetResult_().GetStartMaintenanceResult_()
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
|
|
||||||
|
|
||||||
var result *aurora.EndMaintenanceResult_
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
return nil, nil, errors.New("no hosts provided to end maintenance on")
|
|
||||||
}
|
|
||||||
|
|
||||||
hostList := aurora.NewHosts()
|
|
||||||
hostList.HostNames = hosts
|
|
||||||
|
|
||||||
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
|
||||||
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.EndMaintenance(nil, hostList)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetResult_() != nil {
|
|
||||||
result = resp.GetResult_().GetEndMaintenanceResult_()
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
|
|
||||||
|
|
||||||
var result *aurora.MaintenanceStatusResult_
|
|
||||||
|
|
||||||
if len(hosts) == 0 {
|
|
||||||
return nil, nil, errors.New("no hosts provided to get maintenance status from")
|
|
||||||
}
|
|
||||||
|
|
||||||
hostList := aurora.NewHosts()
|
|
||||||
hostList.HostNames = hosts
|
|
||||||
|
|
||||||
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
|
||||||
|
|
||||||
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
|
||||||
// and continue trying to resend command until we run out of retries.
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.MaintenanceStatus(nil, hostList)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetResult_() != nil {
|
|
||||||
result = resp.GetResult_().GetMaintenanceStatusResult_()
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetQuota sets a quota aggregate for the given role
|
|
||||||
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
|
|
||||||
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
|
|
||||||
ramRes := aurora.NewResource()
|
|
||||||
ramRes.RamMb = ramMb
|
|
||||||
cpuRes := aurora.NewResource()
|
|
||||||
cpuRes.NumCpus = cpu
|
|
||||||
diskRes := aurora.NewResource()
|
|
||||||
diskRes.DiskMb = diskMb
|
|
||||||
quota := aurora.NewResourceAggregate()
|
|
||||||
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.SetQuota(nil, role, quota)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, errors.Wrap(retryErr, "Unable to set role quota")
|
|
||||||
}
|
|
||||||
return resp, retryErr
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetQuota returns the resource aggregate for the given role
|
|
||||||
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
|
||||||
|
|
||||||
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.GetQuota(nil, role)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return resp, errors.Wrap(retryErr, "Unable to get role quota")
|
|
||||||
}
|
|
||||||
return resp, retryErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
|
|
||||||
func (r *realisClient) Snapshot() error {
|
|
||||||
|
|
||||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.Snapshot(nil)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Force Aurora Scheduler to write backup file to a file in the backup directory
|
|
||||||
func (r *realisClient) PerformBackup() error {
|
|
||||||
|
|
||||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.PerformBackup(nil)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *realisClient) ForceImplicitTaskReconciliation() error {
|
|
||||||
|
|
||||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
|
|
||||||
|
|
||||||
if batchSize != nil && *batchSize < 1 {
|
|
||||||
return errors.New("Invalid batch size.")
|
|
||||||
}
|
|
||||||
settings := aurora.NewExplicitReconciliationSettings()
|
|
||||||
|
|
||||||
settings.BatchSize = batchSize
|
|
||||||
|
|
||||||
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
|
||||||
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
|
|
||||||
})
|
|
||||||
|
|
||||||
if retryErr != nil {
|
|
||||||
return errors.Wrap(retryErr, "Unable to recover connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
252
realis_admin.go
Normal file
252
realis_admin.go
Normal file
|
@ -0,0 +1,252 @@
|
||||||
|
package realis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO(rdelvalle): Consider moving these functions to another interface. It would be a backwards incompatible change,
|
||||||
|
// but would add safety.
|
||||||
|
|
||||||
|
// Set a list of nodes to DRAINING. This means nothing will be able to be scheduled on them and any existing
|
||||||
|
// tasks will be killed and re-scheduled elsewhere in the cluster. Tasks from DRAINING nodes are not guaranteed
|
||||||
|
// to return to running unless there is enough capacity in the cluster to run them.
|
||||||
|
func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.DrainHostsResult_, error) {
|
||||||
|
|
||||||
|
var result *aurora.DrainHostsResult_
|
||||||
|
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
return nil, nil, errors.New("no hosts provided to drain")
|
||||||
|
}
|
||||||
|
|
||||||
|
drainList := aurora.NewHosts()
|
||||||
|
drainList.HostNames = hosts
|
||||||
|
|
||||||
|
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
|
||||||
|
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.DrainHosts(nil, drainList)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.GetResult_() != nil {
|
||||||
|
result = resp.GetResult_().GetDrainHostsResult_()
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start SLA Aware Drain.
|
||||||
|
// defaultSlaPolicy is the fallback SlaPolicy to use if a task does not have an SlaPolicy.
|
||||||
|
// After timeoutSecs, tasks will be forcefully drained without checking SLA.
|
||||||
|
func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, hosts ...string) (*aurora.DrainHostsResult_, error) {
|
||||||
|
var result *aurora.DrainHostsResult_
|
||||||
|
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
return nil, errors.New("no hosts provided to drain")
|
||||||
|
}
|
||||||
|
|
||||||
|
drainList := aurora.NewHosts()
|
||||||
|
drainList.HostNames = hosts
|
||||||
|
|
||||||
|
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
|
||||||
|
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.GetResult_() != nil {
|
||||||
|
result = resp.GetResult_().GetDrainHostsResult_()
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aurora.StartMaintenanceResult_, error) {
|
||||||
|
|
||||||
|
var result *aurora.StartMaintenanceResult_
|
||||||
|
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
return nil, nil, errors.New("no hosts provided to start maintenance on")
|
||||||
|
}
|
||||||
|
|
||||||
|
hostList := aurora.NewHosts()
|
||||||
|
hostList.HostNames = hosts
|
||||||
|
|
||||||
|
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
|
||||||
|
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.StartMaintenance(nil, hostList)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.GetResult_() != nil {
|
||||||
|
result = resp.GetResult_().GetStartMaintenanceResult_()
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *aurora.EndMaintenanceResult_, error) {
|
||||||
|
|
||||||
|
var result *aurora.EndMaintenanceResult_
|
||||||
|
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
return nil, nil, errors.New("no hosts provided to end maintenance on")
|
||||||
|
}
|
||||||
|
|
||||||
|
hostList := aurora.NewHosts()
|
||||||
|
hostList.HostNames = hosts
|
||||||
|
|
||||||
|
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
|
||||||
|
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.EndMaintenance(nil, hostList)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.GetResult_() != nil {
|
||||||
|
result = resp.GetResult_().GetEndMaintenanceResult_()
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *aurora.MaintenanceStatusResult_, error) {
|
||||||
|
|
||||||
|
var result *aurora.MaintenanceStatusResult_
|
||||||
|
|
||||||
|
if len(hosts) == 0 {
|
||||||
|
return nil, nil, errors.New("no hosts provided to get maintenance status from")
|
||||||
|
}
|
||||||
|
|
||||||
|
hostList := aurora.NewHosts()
|
||||||
|
hostList.HostNames = hosts
|
||||||
|
|
||||||
|
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
|
||||||
|
|
||||||
|
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
|
||||||
|
// and continue trying to resend command until we run out of retries.
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.MaintenanceStatus(nil, hostList)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, result, errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.GetResult_() != nil {
|
||||||
|
result = resp.GetResult_().GetMaintenanceStatusResult_()
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetQuota sets a quota aggregate for the given role
|
||||||
|
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
|
||||||
|
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
|
||||||
|
ramRes := aurora.NewResource()
|
||||||
|
ramRes.RamMb = ramMb
|
||||||
|
cpuRes := aurora.NewResource()
|
||||||
|
cpuRes.NumCpus = cpu
|
||||||
|
diskRes := aurora.NewResource()
|
||||||
|
diskRes.DiskMb = diskMb
|
||||||
|
quota := aurora.NewResourceAggregate()
|
||||||
|
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.SetQuota(nil, role, quota)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, errors.Wrap(retryErr, "Unable to set role quota")
|
||||||
|
}
|
||||||
|
return resp, retryErr
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetQuota returns the resource aggregate for the given role
|
||||||
|
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
|
||||||
|
|
||||||
|
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.GetQuota(nil, role)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return resp, errors.Wrap(retryErr, "Unable to get role quota")
|
||||||
|
}
|
||||||
|
return resp, retryErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force Aurora Scheduler to perform a snapshot and write to Mesos log
|
||||||
|
func (r *realisClient) Snapshot() error {
|
||||||
|
|
||||||
|
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.Snapshot(nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force Aurora Scheduler to write backup file to a file in the backup directory
|
||||||
|
func (r *realisClient) PerformBackup() error {
|
||||||
|
|
||||||
|
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.PerformBackup(nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realisClient) ForceImplicitTaskReconciliation() error {
|
||||||
|
|
||||||
|
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *realisClient) ForceExplicitTaskReconciliation(batchSize *int32) error {
|
||||||
|
|
||||||
|
if batchSize != nil && *batchSize < 1 {
|
||||||
|
return errors.New("Invalid batch size.")
|
||||||
|
}
|
||||||
|
settings := aurora.NewExplicitReconciliationSettings()
|
||||||
|
|
||||||
|
settings.BatchSize = batchSize
|
||||||
|
|
||||||
|
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
|
||||||
|
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
|
||||||
|
})
|
||||||
|
|
||||||
|
if retryErr != nil {
|
||||||
|
return errors.Wrap(retryErr, "Unable to recover connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue