Upgrading gorealis v1 to Thrift 0.12.0 code generation. End to end tests cleanup (#96)

* Ported all code from Thrift 0.9.3 to Thrift 0.12.0 while backporting some fixes from gorealis v2

* Removing the git.apache.org dependency from the vendor folder, as the project has migrated to GitHub.

* Adding the Thrift dependency back, now pointing at github.com/apache/thrift.

* Removing unnecessary files from the Thrift vendor folder and adding them to .gitignore.

* Updating dep dependencies to include Thrift 0.12.0 from github.com

* Adding changelog.

* End to end tests: Adding coverage for KillInstances.

* End to end tests: Deleting instances after the partition policy recovers them.

* End to end tests: Adding more coverage to the realis API.

* End to end tests: Allowing arguments to be passed to runTestMac so that '-run <test name>' can be passed in.

* End to end tests: Reducing the resources used by the CreateJob test.

* End to end tests: Adding coverage for pausing and resuming an update.

* End to end tests: Removed checks for the Aurora OK response code, as that case is already covered by the error returned from the API. Changed names to be less verbose and repetitive.

* End to end tests: Reducing the watch time for instances to reach RUNNING when creating a service, shortening the overall end-to-end test runtime.
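For callers upgrading along with this commit, the most visible API change is that Thrift 0.12's Go generator models Thrift sets as slices rather than Go maps, so GetInstanceIds now takes []aurora.ScheduleStatus and returns []int32. A minimal caller sketch against the new signature; the scheduler URL and job key values are placeholders, and realis.SchedulerUrl is assumed to be the usual gorealis client option:

```go
package main

import (
	"fmt"

	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Placeholder endpoint; point this at a real Aurora scheduler.
	client, err := realis.NewRealisClient(realis.SchedulerUrl("http://localhost:8081"))
	if err != nil {
		panic(err)
	}
	defer client.Close()

	key := &aurora.JobKey{Role: "vagrant", Environment: "prod", Name: "hello_world"}

	// Before: GetInstanceIds(key, map[aurora.ScheduleStatus]bool) (map[int32]bool, error).
	// After: both the states argument and the result are plain slices.
	ids, err := client.GetInstanceIds(key, aurora.ACTIVE_STATES)
	if err != nil {
		panic(err)
	}
	fmt.Println("active instance IDs:", ids)
}
```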
Commit 79fa7ba16d (parent 2b7eb3a852)
Renan DelValle, 2019-02-20 11:11:46 -08:00, committed via GitHub
2150 changed files with 32523 additions and 412691 deletions

realis.go (188 changes)
@@ -26,17 +26,18 @@ import (
"net/http/cookiejar"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/apache/thrift/lib/go/thrift"
"github.com/paypal/gorealis/gen-go/apache/aurora"
"github.com/paypal/gorealis/response"
"github.com/pkg/errors"
)
const VERSION = "1.3.1"
const VERSION = "1.4.0"
// TODO(rdelvalle): Move documentation to interface in order to make godoc look better/more accessible
type Realis interface {
@@ -46,7 +47,7 @@ type Realis interface {
CreateService(auroraJob Job, settings *aurora.JobUpdateSettings) (*aurora.Response, *aurora.StartJobUpdateResult_, error)
DescheduleCronJob(key *aurora.JobKey) (*aurora.Response, error)
FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error)
GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error)
GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error)
GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQuery) (*aurora.Response, error)
GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error)
@@ -95,6 +96,7 @@ type realisClient struct {
logger LevelLogger
lock *sync.Mutex
debug bool
transport thrift.TTransport
}
type RealisConfig struct {
@@ -366,7 +368,8 @@ func NewRealisClient(options ...ClientOption) (Realis, error) {
readonlyClient: aurora.NewReadOnlySchedulerClientFactory(config.transport, config.protoFactory),
adminClient: aurora.NewAuroraAdminClientFactory(config.transport, config.protoFactory),
logger: LevelLogger{Logger: config.logger, debug: config.debug, trace: config.trace},
lock: &sync.Mutex{}}, nil
lock: &sync.Mutex{},
transport: config.transport}, nil
}
func GetDefaultClusterFromZKUrl(zkurl string) *Cluster {
@@ -525,13 +528,11 @@ func (r *realisClient) Close() {
r.lock.Lock()
defer r.lock.Unlock()
r.client.Transport.Close()
r.readonlyClient.Transport.Close()
r.adminClient.Transport.Close()
r.transport.Close()
}
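Note the Close() simplification above: the three generated clients (client, readonlyClient, adminClient) are all built over the same config.transport, so closing each client's transport separately closed the same transport three times. The client now stores the shared transport and closes it once. A minimal sketch of the pattern, using thrift's in-memory buffer as a stand-in transport:

```go
package main

import (
	"fmt"

	"github.com/apache/thrift/lib/go/thrift"
)

// realisLike mirrors the new realisClient layout: several generated
// service clients share one transport, which is closed exactly once.
type realisLike struct {
	transport thrift.TTransport
}

func (r *realisLike) Close() {
	// Closing the shared transport disconnects every client built on it.
	r.transport.Close()
}

func main() {
	r := &realisLike{transport: thrift.NewTMemoryBuffer()}
	r.Close()
	fmt.Println("shared transport closed once")
}
```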
// Uses predefined set of states to retrieve a set of active jobs in Apache Aurora.
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.ScheduleStatus]bool) (map[int32]bool, error) {
func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states []aurora.ScheduleStatus) ([]int32, error) {
taskQ := &aurora.TaskQuery{
Role: &key.Role,
Environment: &key.Environment,
@@ -542,7 +543,7 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(taskQ)
return r.client.GetTasksWithoutConfigs(nil, taskQ)
})
// If we encountered an error we couldn't recover from by retrying, return an error to the user
@@ -552,9 +553,9 @@ func (r *realisClient) GetInstanceIds(key *aurora.JobKey, states map[aurora.Sche
// Construct instance id list to stay in line with thrift's representation of sets
tasks := response.ScheduleStatusResult(resp).GetTasks()
jobInstanceIds := make(map[int32]bool)
jobInstanceIds := make([]int32, 0, len(tasks))
for _, task := range tasks {
jobInstanceIds[task.GetAssignedTask().GetInstanceId()] = true
jobInstanceIds = append(jobInstanceIds, task.GetAssignedTask().GetInstanceId())
}
return jobInstanceIds, nil
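The other sweeping change in this port is the leading nil argument added to every generated call: Thrift 0.12 generated client methods take a context.Context as their first parameter. A stand-in sketch of the convention (fakeClient is illustrative, not the generated aurora client):

```go
package main

import (
	"context"
	"fmt"
)

// fakeClient stands in for a Thrift 0.12 generated client.
type fakeClient struct{}

// Generated methods now take a context first; passing nil, as this port
// does, opts out of cancellation and deadlines.
func (fakeClient) GetTasksWithoutConfigs(ctx context.Context, query string) (string, error) {
	return "tasks matching " + query, nil
}

func main() {
	c := fakeClient{}

	out, _ := c.GetTasksWithoutConfigs(nil, "role=vagrant") // nil ctx, as in the diff
	fmt.Println(out)

	// A caller that wants cancellation can thread a real context through.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out, _ = c.GetTasksWithoutConfigs(ctx, "role=vagrant")
	fmt.Println(out)
}
```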
@@ -565,7 +566,7 @@ func (r *realisClient) GetJobUpdateSummaries(jobUpdateQuery *aurora.JobUpdateQue
r.logger.DebugPrintf("GetJobUpdateSummaries Thrift Payload: %+v\n", jobUpdateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobUpdateSummaries(jobUpdateQuery)
return r.readonlyClient.GetJobUpdateSummaries(nil, jobUpdateQuery)
})
if retryErr != nil {
@@ -580,7 +581,7 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
var result *aurora.GetJobsResult_
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.readonlyClient.GetJobs(role)
return r.readonlyClient.GetJobs(nil, role)
})
if retryErr != nil {
@@ -598,14 +599,8 @@ func (r *realisClient) GetJobs(role string) (*aurora.Response, *aurora.GetJobsRe
func (r *realisClient) KillInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("KillTasks Thrift Payload: %+v %v\n", key, instances)
instanceIds := make(map[int32]bool)
for _, instId := range instances {
instanceIds[instId] = true
}
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.KillTasks(key, instanceIds, "")
return r.client.KillTasks(nil, key, instances, "")
})
if retryErr != nil {
@@ -625,7 +620,7 @@ func (r *realisClient) KillJob(key *aurora.JobKey) (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
// Giving the KillTasks thrift call an empty set tells the Aurora scheduler to kill all active shards
return r.client.KillTasks(key, nil, "")
return r.client.KillTasks(nil, key, nil, "")
})
if retryErr != nil {
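KillInstances and KillJob now share the slice-based KillTasks call; as the comment above notes, a nil/empty instance set tells the scheduler to kill all active shards. A hypothetical usage sketch (client construction as in the earlier example):

```go
package main

import (
	realis "github.com/paypal/gorealis"
	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

// killExamples exercises both kill paths against an existing client.
func killExamples(client realis.Realis, key *aurora.JobKey) error {
	// Kill two specific instances of the job.
	if _, err := client.KillInstances(key, 0, 1); err != nil {
		return err
	}
	// Kill every active instance: KillJob passes a nil instance set,
	// which the scheduler interprets as "all active shards".
	_, err := client.KillJob(key)
	return err
}

func main() {}
```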
@@ -643,7 +638,7 @@ func (r *realisClient) CreateJob(auroraJob Job) (*aurora.Response, error) {
r.logger.DebugPrintf("CreateJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.CreateJob(auroraJob.JobConfig())
return r.client.CreateJob(nil, auroraJob.JobConfig())
})
if retryErr != nil {
@@ -674,7 +669,7 @@ func (r *realisClient) ScheduleCronJob(auroraJob Job) (*aurora.Response, error)
r.logger.DebugPrintf("ScheduleCronJob Thrift Payload: %+v\n", auroraJob.JobConfig())
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ScheduleCronJob(auroraJob.JobConfig())
return r.client.ScheduleCronJob(nil, auroraJob.JobConfig())
})
if retryErr != nil {
@@ -688,7 +683,7 @@ func (r *realisClient) DescheduleCronJob(key *aurora.JobKey) (*aurora.Response,
r.logger.DebugPrintf("DescheduleCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.DescheduleCronJob(key)
return r.client.DescheduleCronJob(nil, key)
})
if retryErr != nil {
@@ -704,7 +699,7 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
r.logger.DebugPrintf("StartCronJob Thrift Payload: %+v\n", key)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartCronJob(key)
return r.client.StartCronJob(nil, key)
})
if retryErr != nil {
@@ -718,14 +713,8 @@ func (r *realisClient) StartCronJob(key *aurora.JobKey) (*aurora.Response, error
func (r *realisClient) RestartInstances(key *aurora.JobKey, instances ...int32) (*aurora.Response, error) {
r.logger.DebugPrintf("RestartShards Thrift Payload: %+v %v\n", key, instances)
instanceIds := make(map[int32]bool)
for _, instId := range instances {
instanceIds[instId] = true
}
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
return r.client.RestartShards(nil, key, instances)
})
if retryErr != nil {
@@ -746,7 +735,7 @@ func (r *realisClient) RestartJob(key *aurora.JobKey) (*aurora.Response, error)
if len(instanceIds) > 0 {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RestartShards(key, instanceIds)
return r.client.RestartShards(nil, key, instanceIds)
})
if retryErr != nil {
@@ -765,7 +754,7 @@ func (r *realisClient) StartJobUpdate(updateJob *UpdateJob, message string) (*au
r.logger.DebugPrintf("StartJobUpdate Thrift Payload: %+v %v\n", updateJob, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.StartJobUpdate(updateJob.req, message)
return r.client.StartJobUpdate(nil, updateJob.req, message)
})
if retryErr != nil {
@@ -782,7 +771,7 @@ func (r *realisClient) AbortJobUpdate(updateKey aurora.JobUpdateKey, message str
r.logger.DebugPrintf("AbortJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AbortJobUpdate(&updateKey, message)
return r.client.AbortJobUpdate(nil, &updateKey, message)
})
if retryErr != nil {
@@ -802,7 +791,7 @@ func (r *realisClient) PauseJobUpdate(updateKey *aurora.JobUpdateKey, message st
r.logger.DebugPrintf("PauseJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PauseJobUpdate(updateKey, message)
return r.client.PauseJobUpdate(nil, updateKey, message)
})
if retryErr != nil {
@@ -818,7 +807,7 @@ func (r *realisClient) ResumeJobUpdate(updateKey *aurora.JobUpdateKey, message s
r.logger.DebugPrintf("ResumeJobUpdate Thrift Payload: %+v %v\n", updateKey, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.ResumeJobUpdate(updateKey, message)
return r.client.ResumeJobUpdate(nil, updateKey, message)
})
if retryErr != nil {
@@ -834,7 +823,7 @@ func (r *realisClient) PulseJobUpdate(updateKey *aurora.JobUpdateKey) (*aurora.R
r.logger.DebugPrintf("PulseJobUpdate Thrift Payload: %+v\n", updateKey)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.PulseJobUpdate(updateKey)
return r.client.PulseJobUpdate(nil, updateKey)
})
if retryErr != nil {
@@ -851,7 +840,7 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
r.logger.DebugPrintf("AddInstances Thrift Payload: %+v %v\n", instKey, count)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.AddInstances(&instKey, count)
return r.client.AddInstances(nil, &instKey, count)
})
if retryErr != nil {
@@ -861,35 +850,34 @@ func (r *realisClient) AddInstances(instKey aurora.InstanceKey, count int32) (*a
}
//Scale down the number of instances under a job configuration using the configuration of a specific instance
// Scale down the number of instances under a job configuration using the configuration of a specific instance
func (r *realisClient) RemoveInstances(key *aurora.JobKey, count int32) (*aurora.Response, error) {
instanceIds, err := r.GetInstanceIds(key, aurora.ACTIVE_STATES)
if err != nil {
return nil, errors.Wrap(err, "RemoveInstances: Could not retrieve relevant instance IDs")
}
if len(instanceIds) < int(count) {
return nil, errors.New(fmt.Sprintf("RemoveInstances: No sufficient instances to Kill - "+
"Instances to kill %d Total Instances %d", count, len(instanceIds)))
return nil, errors.Errorf("Insufficient active instances available for killing: "+
" Instances to be killed %d Active instances %d", count, len(instanceIds))
}
instanceList := make([]int32, count)
i := 0
for k := range instanceIds {
instanceList[i] = k
i += 1
if i == int(count) {
break
}
}
return r.KillInstances(key, instanceList...)
// Sort instanceIds in ** decreasing ** order
sort.Slice(instanceIds, func(i, j int) bool {
return instanceIds[i] > instanceIds[j]
})
// Kill the instances with the highest ID number first
return r.KillInstances(key, instanceIds[:count]...)
}
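The rewritten RemoveInstances is also now deterministic: instead of pulling count arbitrary keys out of a map, it sorts the instance IDs in decreasing order and kills the highest-numbered instances first. A self-contained sketch of that selection step:

```go
package main

import (
	"fmt"
	"sort"
)

// pickHighest returns the count highest instance IDs, mirroring how
// RemoveInstances now chooses which instances to kill on scale-down.
func pickHighest(ids []int32, count int) []int32 {
	sort.Slice(ids, func(i, j int) bool { return ids[i] > ids[j] })
	return ids[:count]
}

func main() {
	ids := []int32{2, 0, 4, 1, 3}
	// Scaling down by two removes the two highest-numbered instances.
	fmt.Println(pickHighest(ids, 2)) // [4 3]
}
```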
// Get information about a task, including a fully hydrated task configuration object
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(query)
return r.client.GetTasksStatus(nil, query)
})
if retryErr != nil {
@@ -900,36 +888,34 @@ func (r *realisClient) GetTaskStatus(query *aurora.TaskQuery) (tasks []*aurora.S
}
// Get pending reason
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) (pendingReasons []*aurora.PendingReason, e error) {
func (r *realisClient) GetPendingReason(query *aurora.TaskQuery) ([]*aurora.PendingReason, error) {
r.logger.DebugPrintf("GetPendingReason Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetPendingReason(query)
return r.client.GetPendingReason(nil, query)
})
if retryErr != nil {
return nil, errors.Wrap(retryErr, "Error querying Aurora Scheduler for pending Reasons")
}
var result map[*aurora.PendingReason]bool
var pendingReasons []*aurora.PendingReason
if resp.GetResult_() != nil {
result = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
}
for reason := range result {
pendingReasons = append(pendingReasons, reason)
pendingReasons = resp.GetResult_().GetGetPendingReasonResult_().GetReasons()
}
return pendingReasons, nil
}
// Get information about tasks without a task configuration object
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []*aurora.ScheduledTask, e error) {
func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) ([]*aurora.ScheduledTask, error) {
r.logger.DebugPrintf("GetTasksWithoutConfigs Thrift Payload: %+v\n", query)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksWithoutConfigs(query)
return r.client.GetTasksWithoutConfigs(nil, query)
})
if retryErr != nil {
@@ -942,22 +928,18 @@ func (r *realisClient) GetTasksWithoutConfigs(query *aurora.TaskQuery) (tasks []
// Get the task configuration from the aurora scheduler for a job
func (r *realisClient) FetchTaskConfig(instKey aurora.InstanceKey) (*aurora.TaskConfig, error) {
ids := make(map[int32]bool)
ids[instKey.InstanceId] = true
taskQ := &aurora.TaskQuery{
Role: &instKey.JobKey.Role,
Environment: &instKey.JobKey.Environment,
JobName: &instKey.JobKey.Name,
InstanceIds: ids,
InstanceIds: []int32{instKey.InstanceId},
Statuses: aurora.ACTIVE_STATES,
}
r.logger.DebugPrintf("GetTasksStatus Thrift Payload: %+v\n", taskQ)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetTasksStatus(taskQ)
return r.client.GetTasksStatus(nil, taskQ)
})
if retryErr != nil {
@@ -983,7 +965,7 @@ func (r *realisClient) JobUpdateDetails(updateQuery aurora.JobUpdateQuery) (*aur
r.logger.DebugPrintf("GetJobUpdateDetails Thrift Payload: %+v\n", updateQuery)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.GetJobUpdateDetails(&updateQuery)
return r.client.GetJobUpdateDetails(nil, &updateQuery)
})
if retryErr != nil {
@@ -998,7 +980,7 @@ func (r *realisClient) RollbackJobUpdate(key aurora.JobUpdateKey, message string
r.logger.DebugPrintf("RollbackJobUpdate Thrift Payload: %+v %v\n", key, message)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.client.RollbackJobUpdate(&key, message)
return r.client.RollbackJobUpdate(nil, &key, message)
})
if retryErr != nil {
@@ -1023,15 +1005,12 @@ func (r *realisClient) DrainHosts(hosts ...string) (*aurora.Response, *aurora.Dr
}
drainList := aurora.NewHosts()
drainList.HostNames = make(map[string]bool)
for _, host := range hosts {
drainList.HostNames[host] = true
}
drainList.HostNames = hosts
r.logger.DebugPrintf("DrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.DrainHosts(drainList)
return r.adminClient.DrainHosts(nil, drainList)
})
if retryErr != nil {
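The same set-to-slice migration runs through all of the host maintenance calls (DrainHosts, SLADrainHosts, StartMaintenance, EndMaintenance, MaintenanceStatus): Hosts.HostNames is now a []string, so the map-building loops disappear. A small sketch of the new payload construction; the host names are placeholders:

```go
package main

import (
	"fmt"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	// Under Thrift 0.12 the Hosts set is a plain string slice,
	// so the variadic hosts argument can be assigned directly.
	drainList := aurora.NewHosts()
	drainList.HostNames = []string{"agent-1.example.com", "agent-2.example.com"}
	fmt.Printf("DrainHosts payload: %v\n", drainList)
}
```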
@@ -1056,15 +1035,12 @@ func (r *realisClient) SLADrainHosts(policy *aurora.SlaPolicy, timeout int64, ho
}
drainList := aurora.NewHosts()
drainList.HostNames = make(map[string]bool)
for _, host := range hosts {
drainList.HostNames[host] = true
}
drainList.HostNames = hosts
r.logger.DebugPrintf("SLADrainHosts Thrift Payload: %v\n", drainList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.SlaDrainHosts(drainList, policy, timeout)
return r.adminClient.SlaDrainHosts(nil, drainList, policy, timeout)
})
if retryErr != nil {
@@ -1087,15 +1063,12 @@ func (r *realisClient) StartMaintenance(hosts ...string) (*aurora.Response, *aur
}
hostList := aurora.NewHosts()
hostList.HostNames = make(map[string]bool)
for _, host := range hosts {
hostList.HostNames[host] = true
}
hostList.HostNames = hosts
r.logger.DebugPrintf("StartMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.StartMaintenance(hostList)
return r.adminClient.StartMaintenance(nil, hostList)
})
if retryErr != nil {
@@ -1118,15 +1091,12 @@ func (r *realisClient) EndMaintenance(hosts ...string) (*aurora.Response, *auror
}
hostList := aurora.NewHosts()
hostList.HostNames = make(map[string]bool)
for _, host := range hosts {
hostList.HostNames[host] = true
}
hostList.HostNames = hosts
r.logger.DebugPrintf("EndMaintenance Thrift Payload: %v\n", hostList)
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.EndMaintenance(hostList)
return r.adminClient.EndMaintenance(nil, hostList)
})
if retryErr != nil {
@@ -1149,17 +1119,14 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
}
hostList := aurora.NewHosts()
hostList.HostNames = make(map[string]bool)
for _, host := range hosts {
hostList.HostNames[host] = true
}
hostList.HostNames = hosts
r.logger.DebugPrintf("MaintenanceStatus Thrift Payload: %v\n", hostList)
// Make thrift call. If we encounter an error sending the call, attempt to reconnect
// and continue trying to resend command until we run out of retries.
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.MaintenanceStatus(hostList)
return r.adminClient.MaintenanceStatus(nil, hostList)
})
if retryErr != nil {
@@ -1176,19 +1143,16 @@ func (r *realisClient) MaintenanceStatus(hosts ...string) (*aurora.Response, *au
// SetQuota sets a quota aggregate for the given role
// TODO(zircote) Currently investigating an error that is returned from thrift calls that include resources for `NamedPort` and `NumGpu`
func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb *int64) (*aurora.Response, error) {
ram := aurora.NewResource()
ram.RamMb = ramMb
c := aurora.NewResource()
c.NumCpus = cpu
d := aurora.NewResource()
d.DiskMb = diskMb
ramRes := aurora.NewResource()
ramRes.RamMb = ramMb
cpuRes := aurora.NewResource()
cpuRes.NumCpus = cpu
diskRes := aurora.NewResource()
diskRes.DiskMb = diskMb
quota := aurora.NewResourceAggregate()
quota.Resources = make(map[*aurora.Resource]bool)
quota.Resources[ram] = true
quota.Resources[c] = true
quota.Resources[d] = true
quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.SetQuota(role, quota)
return r.adminClient.SetQuota(nil, role, quota)
})
if retryErr != nil {
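SetQuota follows the same pattern: ResourceAggregate.Resources is now a slice of *aurora.Resource instead of a map-backed set. A sketch of building the quota payload; the numeric values are placeholders:

```go
package main

import (
	"fmt"

	"github.com/paypal/gorealis/gen-go/apache/aurora"
)

func main() {
	cpu, ram, disk := 4.0, int64(8192), int64(10240)

	cpuRes := aurora.NewResource()
	cpuRes.NumCpus = &cpu
	ramRes := aurora.NewResource()
	ramRes.RamMb = &ram
	diskRes := aurora.NewResource()
	diskRes.DiskMb = &disk

	// Thrift 0.12: a slice, not the old map[*aurora.Resource]bool set.
	quota := aurora.NewResourceAggregate()
	quota.Resources = []*aurora.Resource{cpuRes, ramRes, diskRes}
	fmt.Printf("quota payload: %+v\n", quota)
}
```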
@@ -1202,7 +1166,7 @@ func (r *realisClient) SetQuota(role string, cpu *float64, ramMb *int64, diskMb
func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
resp, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.GetQuota(role)
return r.adminClient.GetQuota(nil, role)
})
if retryErr != nil {
@@ -1215,7 +1179,7 @@ func (r *realisClient) GetQuota(role string) (*aurora.Response, error) {
func (r *realisClient) Snapshot() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.Snapshot()
return r.adminClient.Snapshot(nil)
})
if retryErr != nil {
@@ -1229,7 +1193,7 @@ func (r *realisClient) Snapshot() error {
func (r *realisClient) PerformBackup() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.PerformBackup()
return r.adminClient.PerformBackup(nil)
})
if retryErr != nil {
@@ -1242,7 +1206,7 @@ func (r *realisClient) PerformBackup() error {
func (r *realisClient) ForceImplicitTaskReconciliation() error {
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.TriggerImplicitTaskReconciliation()
return r.adminClient.TriggerImplicitTaskReconciliation(nil)
})
if retryErr != nil {
@@ -1262,7 +1226,7 @@ func (r *realisClient) ForceImplicitTaskReconciliation() error {
settings.BatchSize = batchSize
_, retryErr := r.thriftCallWithRetries(func() (*aurora.Response, error) {
return r.adminClient.TriggerExplicitTaskReconciliation(settings)
return r.adminClient.TriggerExplicitTaskReconciliation(nil, settings)
})
if retryErr != nil {