Adding a monitor that is friendly with auto pause.

This commit is contained in:
Renan DelValle 2020-05-05 20:47:15 -07:00
parent 45eac75e47
commit 6c954c14fd
No known key found for this signature in database
GPG key ID: C240AD6D6F443EC9
3 changed files with 154 additions and 0 deletions

View file

@ -244,3 +244,68 @@ func (c *Client) MonitorHostMaintenance(hosts []string,
}
}
}
// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration.
func (c *Client) MonitorAutoPausedUpdate(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
key.Job = &aurora.JobKey{
Role: key.Job.Role,
Environment: key.Job.Environment,
Name: key.Job.Name,
}
query := aurora.JobUpdateQuery{
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
Key: &key,
}
updateDetails, err := c.JobUpdateDetails(query)
if err != nil {
return -1, errors.Wrap(err, "unable to get information about update")
}
if len(updateDetails) == 0 {
return -1, errors.Errorf("details for update could not be found")
}
updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
var batchSizes []int32
switch {
case updateStrategy.IsSetVarBatchStrategy():
batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
case updateStrategy.IsSetBatchStrategy():
batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
default:
return -1, errors.Errorf("update is not using a batch update strategy")
}
query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
summary, err := c.MonitorJobUpdateQuery(query, interval, timeout)
if err != nil {
return -1, err
}
// Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is Summaries
if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}
updatingInstances := make(map[int32]struct{})
for _, e := range updateDetails[0].InstanceEvents {
// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
updatingInstances[e.GetInstanceId()] = struct{}{}
}
}
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
}

31
util.go
View file

@ -40,6 +40,18 @@ func init() {
}
}
// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
// This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
return []aurora.JobUpdateStatus{
aurora.JobUpdateStatus_ROLLED_FORWARD,
aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED,
}
}
func validateAuroraAddress(address string) (string, error) {
// If no protocol defined, assume http
@ -73,3 +85,22 @@ func validateAuroraAddress(address string) (string, error) {
return u.String(), nil
}
func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
for i, size := range batchSizes {
updatingInstances -= size
if updatingInstances <= 0 {
return i
}
}
// Overflow batches
batchCount := len(batchSizes) - 1
lastBatchIndex := len(batchSizes) - 1
batchCount += int(updatingInstances / batchSizes[lastBatchIndex])
if updatingInstances%batchSizes[lastBatchIndex] != 0 {
batchCount++
}
return batchCount
}

58
util_test.go Normal file
View file

@ -0,0 +1,58 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCurrentBatchCalculator(t *testing.T) {
t.Run("singleBatchOverflow", func(t *testing.T) {
curBatch := calculateCurrentBatch(10, []int32{2})
assert.Equal(t, 4, curBatch)
})
t.Run("noInstancesUpdating", func(t *testing.T) {
curBatch := calculateCurrentBatch(0, []int32{2})
assert.Equal(t, 0, curBatch)
})
t.Run("evenMatchSingleBatch", func(t *testing.T) {
curBatch := calculateCurrentBatch(2, []int32{2})
assert.Equal(t, 0, curBatch)
})
t.Run("moreInstancesThanBatches", func(t *testing.T) {
curBatch := calculateCurrentBatch(5, []int32{1, 2})
assert.Equal(t, 2, curBatch)
})
t.Run("moreInstancesThanBatchesDecreasing", func(t *testing.T) {
curBatch := calculateCurrentBatch(5, []int32{2, 1})
assert.Equal(t, 3, curBatch)
})
t.Run("unevenFit", func(t *testing.T) {
curBatch := calculateCurrentBatch(2, []int32{1, 2})
assert.Equal(t, 1, curBatch)
})
t.Run("halfWay", func(t *testing.T) {
curBatch := calculateCurrentBatch(1, []int32{1, 2})
assert.Equal(t, 0, curBatch)
})
}