Upgrade to Aurora 0.22.0 (#5)

* Upgrading to Thrift 0.13.1. This version is a fork of 0.13.0 with a patch on top of it to fix an issue where trying a realis call after the connection has been closed results in a panic.

* Upgrading compose set up to Mesos 1.6.2 and Aurora 0.22.0.

* Adding support for using different update strategies.

* Adding a monitor that is friendly with auto pause.

* Adding tests for new update strategies.
This commit is contained in:
Renán I. Del Valle 2020-05-05 20:55:25 -07:00 committed by GitHub
parent 1d8afcd329
commit 69ced895e2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 2013 additions and 1068 deletions

View file

@ -716,9 +716,40 @@ struct JobUpdateKey {
2: string id
}
/** Limits the amount of active changes being made to instances to groupSize. */
struct QueueJobUpdateStrategy {
1: i32 groupSize
}
/** Similar to Queue strategy but will not start a new group until all instances in an active
* group have finished updating.
*/
struct BatchJobUpdateStrategy {
1: i32 groupSize
/* Update will pause automatically after each batch completes */
2: bool autopauseAfterBatch
}
/** Same as Batch strategy but each time an active group completes, the size of the next active
* group may change.
*/
struct VariableBatchJobUpdateStrategy {
1: list<i32> groupSizes
/* Update will pause automatically after each batch completes */
2: bool autopauseAfterBatch
}
union JobUpdateStrategy {
1: QueueJobUpdateStrategy queueStrategy
2: BatchJobUpdateStrategy batchStrategy
3: VariableBatchJobUpdateStrategy varBatchStrategy
}
/** Job update thresholds and limits. */
struct JobUpdateSettings {
/** Max number of instances being updated at any given moment. */
/** Deprecated, please set value inside of desired update strategy instead.
* Max number of instances being updated at any given moment.
*/
1: i32 updateGroupSize
/** Max number of instance failures to tolerate before marking instance as FAILED. */
@ -736,7 +767,7 @@ struct JobUpdateSettings {
/** Instance IDs to act on. All instances will be affected if this is not set. */
7: set<Range> updateOnlyTheseInstances
/**
/** Deprecated, please set updateStrategy to the Batch strategy instead.
* If true, use updateGroupSize as strict batching boundaries, and avoid proceeding to another
* batch until the preceding batch finishes updating.
*/
@ -755,6 +786,9 @@ struct JobUpdateSettings {
* differs between the old and new task configurations, updates will use the newest configuration.
*/
10: optional bool slaAware
/** Update strategy to be used for the update. See JobUpdateStrategy for choices. */
11: optional JobUpdateStrategy updateStrategy
}
/** Event marking a state transition in job update lifecycle. */

View file

@ -14,7 +14,7 @@ services:
ipv4_address: 192.168.33.2
master:
image: rdelvalle/mesos-master:1.5.1
image: rdelvalle/mesos-master:1.6.2
restart: on-failure
ports:
- "5050:5050"
@ -32,7 +32,7 @@ services:
- zk
agent-one:
image: rdelvalle/mesos-agent:1.5.1
image: rdelvalle/mesos-agent:1.6.2
pid: host
restart: on-failure
ports:
@ -56,7 +56,7 @@ services:
- zk
aurora-one:
image: rdelvalle/aurora:0.21.0
image: rdelvalle/aurora:0.22.0
pid: host
ports:
- "8081:8081"
@ -69,6 +69,7 @@ services:
-http_authentication_mechanism=BASIC
-shiro_realm_modules=INI_AUTHNZ
-shiro_ini_path=/etc/aurora/security.ini
-min_required_instances_for_sla_check=1
volumes:
- ./.aurora-config:/etc/aurora
networks:

View file

@ -1,4 +1,4 @@
// Autogenerated by Thrift Compiler (0.12.0)
// Autogenerated by Thrift Compiler (0.13.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package aurora

View file

@ -1,9 +1,9 @@
// Autogenerated by Thrift Compiler (0.12.0)
// Autogenerated by Thrift Compiler (0.13.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package aurora
import (
import(
"bytes"
"context"
"reflect"

File diff suppressed because it is too large Load diff

View file

@ -1,22 +1,23 @@
// Autogenerated by Thrift Compiler (0.12.0)
// Autogenerated by Thrift Compiler (0.13.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main
import (
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
)
var _ = aurora.GoUnusedProtection__
func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -175,19 +176,19 @@ func main() {
fmt.Fprintln(os.Stderr, "CreateJob requires 1 args")
flag.Usage()
}
arg162 := flag.Arg(1)
mbTrans163 := thrift.NewTMemoryBufferLen(len(arg162))
defer mbTrans163.Close()
_, err164 := mbTrans163.WriteString(arg162)
if err164 != nil {
arg163 := flag.Arg(1)
mbTrans164 := thrift.NewTMemoryBufferLen(len(arg163))
defer mbTrans164.Close()
_, err165 := mbTrans164.WriteString(arg163)
if err165 != nil {
Usage()
return
}
factory165 := thrift.NewTJSONProtocolFactory()
jsProt166 := factory165.GetProtocol(mbTrans163)
factory166 := thrift.NewTJSONProtocolFactory()
jsProt167 := factory166.GetProtocol(mbTrans164)
argvalue0 := aurora.NewJobConfiguration()
err167 := argvalue0.Read(jsProt166)
if err167 != nil {
err168 := argvalue0.Read(jsProt167)
if err168 != nil {
Usage()
return
}
@ -200,19 +201,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ScheduleCronJob requires 1 args")
flag.Usage()
}
arg168 := flag.Arg(1)
mbTrans169 := thrift.NewTMemoryBufferLen(len(arg168))
defer mbTrans169.Close()
_, err170 := mbTrans169.WriteString(arg168)
if err170 != nil {
arg169 := flag.Arg(1)
mbTrans170 := thrift.NewTMemoryBufferLen(len(arg169))
defer mbTrans170.Close()
_, err171 := mbTrans170.WriteString(arg169)
if err171 != nil {
Usage()
return
}
factory171 := thrift.NewTJSONProtocolFactory()
jsProt172 := factory171.GetProtocol(mbTrans169)
factory172 := thrift.NewTJSONProtocolFactory()
jsProt173 := factory172.GetProtocol(mbTrans170)
argvalue0 := aurora.NewJobConfiguration()
err173 := argvalue0.Read(jsProt172)
if err173 != nil {
err174 := argvalue0.Read(jsProt173)
if err174 != nil {
Usage()
return
}
@ -225,19 +226,19 @@ func main() {
fmt.Fprintln(os.Stderr, "DescheduleCronJob requires 1 args")
flag.Usage()
}
arg174 := flag.Arg(1)
mbTrans175 := thrift.NewTMemoryBufferLen(len(arg174))
defer mbTrans175.Close()
_, err176 := mbTrans175.WriteString(arg174)
if err176 != nil {
arg175 := flag.Arg(1)
mbTrans176 := thrift.NewTMemoryBufferLen(len(arg175))
defer mbTrans176.Close()
_, err177 := mbTrans176.WriteString(arg175)
if err177 != nil {
Usage()
return
}
factory177 := thrift.NewTJSONProtocolFactory()
jsProt178 := factory177.GetProtocol(mbTrans175)
factory178 := thrift.NewTJSONProtocolFactory()
jsProt179 := factory178.GetProtocol(mbTrans176)
argvalue0 := aurora.NewJobKey()
err179 := argvalue0.Read(jsProt178)
if err179 != nil {
err180 := argvalue0.Read(jsProt179)
if err180 != nil {
Usage()
return
}
@ -250,19 +251,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartCronJob requires 1 args")
flag.Usage()
}
arg180 := flag.Arg(1)
mbTrans181 := thrift.NewTMemoryBufferLen(len(arg180))
defer mbTrans181.Close()
_, err182 := mbTrans181.WriteString(arg180)
if err182 != nil {
arg181 := flag.Arg(1)
mbTrans182 := thrift.NewTMemoryBufferLen(len(arg181))
defer mbTrans182.Close()
_, err183 := mbTrans182.WriteString(arg181)
if err183 != nil {
Usage()
return
}
factory183 := thrift.NewTJSONProtocolFactory()
jsProt184 := factory183.GetProtocol(mbTrans181)
factory184 := thrift.NewTJSONProtocolFactory()
jsProt185 := factory184.GetProtocol(mbTrans182)
argvalue0 := aurora.NewJobKey()
err185 := argvalue0.Read(jsProt184)
if err185 != nil {
err186 := argvalue0.Read(jsProt185)
if err186 != nil {
Usage()
return
}
@ -275,36 +276,36 @@ func main() {
fmt.Fprintln(os.Stderr, "RestartShards requires 2 args")
flag.Usage()
}
arg186 := flag.Arg(1)
mbTrans187 := thrift.NewTMemoryBufferLen(len(arg186))
defer mbTrans187.Close()
_, err188 := mbTrans187.WriteString(arg186)
if err188 != nil {
arg187 := flag.Arg(1)
mbTrans188 := thrift.NewTMemoryBufferLen(len(arg187))
defer mbTrans188.Close()
_, err189 := mbTrans188.WriteString(arg187)
if err189 != nil {
Usage()
return
}
factory189 := thrift.NewTJSONProtocolFactory()
jsProt190 := factory189.GetProtocol(mbTrans187)
factory190 := thrift.NewTJSONProtocolFactory()
jsProt191 := factory190.GetProtocol(mbTrans188)
argvalue0 := aurora.NewJobKey()
err191 := argvalue0.Read(jsProt190)
if err191 != nil {
err192 := argvalue0.Read(jsProt191)
if err192 != nil {
Usage()
return
}
value0 := argvalue0
arg192 := flag.Arg(2)
mbTrans193 := thrift.NewTMemoryBufferLen(len(arg192))
defer mbTrans193.Close()
_, err194 := mbTrans193.WriteString(arg192)
if err194 != nil {
arg193 := flag.Arg(2)
mbTrans194 := thrift.NewTMemoryBufferLen(len(arg193))
defer mbTrans194.Close()
_, err195 := mbTrans194.WriteString(arg193)
if err195 != nil {
Usage()
return
}
factory195 := thrift.NewTJSONProtocolFactory()
jsProt196 := factory195.GetProtocol(mbTrans193)
factory196 := thrift.NewTJSONProtocolFactory()
jsProt197 := factory196.GetProtocol(mbTrans194)
containerStruct1 := aurora.NewAuroraSchedulerManagerRestartShardsArgs()
err197 := containerStruct1.ReadField2(jsProt196)
if err197 != nil {
err198 := containerStruct1.ReadField2(jsProt197)
if err198 != nil {
Usage()
return
}
@ -318,36 +319,36 @@ func main() {
fmt.Fprintln(os.Stderr, "KillTasks requires 3 args")
flag.Usage()
}
arg198 := flag.Arg(1)
mbTrans199 := thrift.NewTMemoryBufferLen(len(arg198))
defer mbTrans199.Close()
_, err200 := mbTrans199.WriteString(arg198)
if err200 != nil {
arg199 := flag.Arg(1)
mbTrans200 := thrift.NewTMemoryBufferLen(len(arg199))
defer mbTrans200.Close()
_, err201 := mbTrans200.WriteString(arg199)
if err201 != nil {
Usage()
return
}
factory201 := thrift.NewTJSONProtocolFactory()
jsProt202 := factory201.GetProtocol(mbTrans199)
factory202 := thrift.NewTJSONProtocolFactory()
jsProt203 := factory202.GetProtocol(mbTrans200)
argvalue0 := aurora.NewJobKey()
err203 := argvalue0.Read(jsProt202)
if err203 != nil {
err204 := argvalue0.Read(jsProt203)
if err204 != nil {
Usage()
return
}
value0 := argvalue0
arg204 := flag.Arg(2)
mbTrans205 := thrift.NewTMemoryBufferLen(len(arg204))
defer mbTrans205.Close()
_, err206 := mbTrans205.WriteString(arg204)
if err206 != nil {
arg205 := flag.Arg(2)
mbTrans206 := thrift.NewTMemoryBufferLen(len(arg205))
defer mbTrans206.Close()
_, err207 := mbTrans206.WriteString(arg205)
if err207 != nil {
Usage()
return
}
factory207 := thrift.NewTJSONProtocolFactory()
jsProt208 := factory207.GetProtocol(mbTrans205)
factory208 := thrift.NewTJSONProtocolFactory()
jsProt209 := factory208.GetProtocol(mbTrans206)
containerStruct1 := aurora.NewAuroraSchedulerManagerKillTasksArgs()
err209 := containerStruct1.ReadField2(jsProt208)
if err209 != nil {
err210 := containerStruct1.ReadField2(jsProt209)
if err210 != nil {
Usage()
return
}
@ -363,25 +364,25 @@ func main() {
fmt.Fprintln(os.Stderr, "AddInstances requires 2 args")
flag.Usage()
}
arg211 := flag.Arg(1)
mbTrans212 := thrift.NewTMemoryBufferLen(len(arg211))
defer mbTrans212.Close()
_, err213 := mbTrans212.WriteString(arg211)
if err213 != nil {
arg212 := flag.Arg(1)
mbTrans213 := thrift.NewTMemoryBufferLen(len(arg212))
defer mbTrans213.Close()
_, err214 := mbTrans213.WriteString(arg212)
if err214 != nil {
Usage()
return
}
factory214 := thrift.NewTJSONProtocolFactory()
jsProt215 := factory214.GetProtocol(mbTrans212)
factory215 := thrift.NewTJSONProtocolFactory()
jsProt216 := factory215.GetProtocol(mbTrans213)
argvalue0 := aurora.NewInstanceKey()
err216 := argvalue0.Read(jsProt215)
if err216 != nil {
err217 := argvalue0.Read(jsProt216)
if err217 != nil {
Usage()
return
}
value0 := argvalue0
tmp1, err217 := (strconv.Atoi(flag.Arg(2)))
if err217 != nil {
tmp1, err218 := (strconv.Atoi(flag.Arg(2)))
if err218 != nil {
Usage()
return
}
@ -395,19 +396,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ReplaceCronTemplate requires 1 args")
flag.Usage()
}
arg218 := flag.Arg(1)
mbTrans219 := thrift.NewTMemoryBufferLen(len(arg218))
defer mbTrans219.Close()
_, err220 := mbTrans219.WriteString(arg218)
if err220 != nil {
arg219 := flag.Arg(1)
mbTrans220 := thrift.NewTMemoryBufferLen(len(arg219))
defer mbTrans220.Close()
_, err221 := mbTrans220.WriteString(arg219)
if err221 != nil {
Usage()
return
}
factory221 := thrift.NewTJSONProtocolFactory()
jsProt222 := factory221.GetProtocol(mbTrans219)
factory222 := thrift.NewTJSONProtocolFactory()
jsProt223 := factory222.GetProtocol(mbTrans220)
argvalue0 := aurora.NewJobConfiguration()
err223 := argvalue0.Read(jsProt222)
if err223 != nil {
err224 := argvalue0.Read(jsProt223)
if err224 != nil {
Usage()
return
}
@ -420,19 +421,19 @@ func main() {
fmt.Fprintln(os.Stderr, "StartJobUpdate requires 2 args")
flag.Usage()
}
arg224 := flag.Arg(1)
mbTrans225 := thrift.NewTMemoryBufferLen(len(arg224))
defer mbTrans225.Close()
_, err226 := mbTrans225.WriteString(arg224)
if err226 != nil {
arg225 := flag.Arg(1)
mbTrans226 := thrift.NewTMemoryBufferLen(len(arg225))
defer mbTrans226.Close()
_, err227 := mbTrans226.WriteString(arg225)
if err227 != nil {
Usage()
return
}
factory227 := thrift.NewTJSONProtocolFactory()
jsProt228 := factory227.GetProtocol(mbTrans225)
factory228 := thrift.NewTJSONProtocolFactory()
jsProt229 := factory228.GetProtocol(mbTrans226)
argvalue0 := aurora.NewJobUpdateRequest()
err229 := argvalue0.Read(jsProt228)
if err229 != nil {
err230 := argvalue0.Read(jsProt229)
if err230 != nil {
Usage()
return
}
@ -447,19 +448,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PauseJobUpdate requires 2 args")
flag.Usage()
}
arg231 := flag.Arg(1)
mbTrans232 := thrift.NewTMemoryBufferLen(len(arg231))
defer mbTrans232.Close()
_, err233 := mbTrans232.WriteString(arg231)
if err233 != nil {
arg232 := flag.Arg(1)
mbTrans233 := thrift.NewTMemoryBufferLen(len(arg232))
defer mbTrans233.Close()
_, err234 := mbTrans233.WriteString(arg232)
if err234 != nil {
Usage()
return
}
factory234 := thrift.NewTJSONProtocolFactory()
jsProt235 := factory234.GetProtocol(mbTrans232)
factory235 := thrift.NewTJSONProtocolFactory()
jsProt236 := factory235.GetProtocol(mbTrans233)
argvalue0 := aurora.NewJobUpdateKey()
err236 := argvalue0.Read(jsProt235)
if err236 != nil {
err237 := argvalue0.Read(jsProt236)
if err237 != nil {
Usage()
return
}
@ -474,19 +475,19 @@ func main() {
fmt.Fprintln(os.Stderr, "ResumeJobUpdate requires 2 args")
flag.Usage()
}
arg238 := flag.Arg(1)
mbTrans239 := thrift.NewTMemoryBufferLen(len(arg238))
defer mbTrans239.Close()
_, err240 := mbTrans239.WriteString(arg238)
if err240 != nil {
arg239 := flag.Arg(1)
mbTrans240 := thrift.NewTMemoryBufferLen(len(arg239))
defer mbTrans240.Close()
_, err241 := mbTrans240.WriteString(arg239)
if err241 != nil {
Usage()
return
}
factory241 := thrift.NewTJSONProtocolFactory()
jsProt242 := factory241.GetProtocol(mbTrans239)
factory242 := thrift.NewTJSONProtocolFactory()
jsProt243 := factory242.GetProtocol(mbTrans240)
argvalue0 := aurora.NewJobUpdateKey()
err243 := argvalue0.Read(jsProt242)
if err243 != nil {
err244 := argvalue0.Read(jsProt243)
if err244 != nil {
Usage()
return
}
@ -501,19 +502,19 @@ func main() {
fmt.Fprintln(os.Stderr, "AbortJobUpdate requires 2 args")
flag.Usage()
}
arg245 := flag.Arg(1)
mbTrans246 := thrift.NewTMemoryBufferLen(len(arg245))
defer mbTrans246.Close()
_, err247 := mbTrans246.WriteString(arg245)
if err247 != nil {
arg246 := flag.Arg(1)
mbTrans247 := thrift.NewTMemoryBufferLen(len(arg246))
defer mbTrans247.Close()
_, err248 := mbTrans247.WriteString(arg246)
if err248 != nil {
Usage()
return
}
factory248 := thrift.NewTJSONProtocolFactory()
jsProt249 := factory248.GetProtocol(mbTrans246)
factory249 := thrift.NewTJSONProtocolFactory()
jsProt250 := factory249.GetProtocol(mbTrans247)
argvalue0 := aurora.NewJobUpdateKey()
err250 := argvalue0.Read(jsProt249)
if err250 != nil {
err251 := argvalue0.Read(jsProt250)
if err251 != nil {
Usage()
return
}
@ -528,19 +529,19 @@ func main() {
fmt.Fprintln(os.Stderr, "RollbackJobUpdate requires 2 args")
flag.Usage()
}
arg252 := flag.Arg(1)
mbTrans253 := thrift.NewTMemoryBufferLen(len(arg252))
defer mbTrans253.Close()
_, err254 := mbTrans253.WriteString(arg252)
if err254 != nil {
arg253 := flag.Arg(1)
mbTrans254 := thrift.NewTMemoryBufferLen(len(arg253))
defer mbTrans254.Close()
_, err255 := mbTrans254.WriteString(arg253)
if err255 != nil {
Usage()
return
}
factory255 := thrift.NewTJSONProtocolFactory()
jsProt256 := factory255.GetProtocol(mbTrans253)
factory256 := thrift.NewTJSONProtocolFactory()
jsProt257 := factory256.GetProtocol(mbTrans254)
argvalue0 := aurora.NewJobUpdateKey()
err257 := argvalue0.Read(jsProt256)
if err257 != nil {
err258 := argvalue0.Read(jsProt257)
if err258 != nil {
Usage()
return
}
@ -555,19 +556,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PulseJobUpdate requires 1 args")
flag.Usage()
}
arg259 := flag.Arg(1)
mbTrans260 := thrift.NewTMemoryBufferLen(len(arg259))
defer mbTrans260.Close()
_, err261 := mbTrans260.WriteString(arg259)
if err261 != nil {
arg260 := flag.Arg(1)
mbTrans261 := thrift.NewTMemoryBufferLen(len(arg260))
defer mbTrans261.Close()
_, err262 := mbTrans261.WriteString(arg260)
if err262 != nil {
Usage()
return
}
factory262 := thrift.NewTJSONProtocolFactory()
jsProt263 := factory262.GetProtocol(mbTrans260)
factory263 := thrift.NewTJSONProtocolFactory()
jsProt264 := factory263.GetProtocol(mbTrans261)
argvalue0 := aurora.NewJobUpdateKey()
err264 := argvalue0.Read(jsProt263)
if err264 != nil {
err265 := argvalue0.Read(jsProt264)
if err265 != nil {
Usage()
return
}
@ -598,19 +599,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage()
}
arg266 := flag.Arg(1)
mbTrans267 := thrift.NewTMemoryBufferLen(len(arg266))
defer mbTrans267.Close()
_, err268 := mbTrans267.WriteString(arg266)
if err268 != nil {
arg267 := flag.Arg(1)
mbTrans268 := thrift.NewTMemoryBufferLen(len(arg267))
defer mbTrans268.Close()
_, err269 := mbTrans268.WriteString(arg267)
if err269 != nil {
Usage()
return
}
factory269 := thrift.NewTJSONProtocolFactory()
jsProt270 := factory269.GetProtocol(mbTrans267)
factory270 := thrift.NewTJSONProtocolFactory()
jsProt271 := factory270.GetProtocol(mbTrans268)
argvalue0 := aurora.NewTaskQuery()
err271 := argvalue0.Read(jsProt270)
if err271 != nil {
err272 := argvalue0.Read(jsProt271)
if err272 != nil {
Usage()
return
}
@ -623,19 +624,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage()
}
arg272 := flag.Arg(1)
mbTrans273 := thrift.NewTMemoryBufferLen(len(arg272))
defer mbTrans273.Close()
_, err274 := mbTrans273.WriteString(arg272)
if err274 != nil {
arg273 := flag.Arg(1)
mbTrans274 := thrift.NewTMemoryBufferLen(len(arg273))
defer mbTrans274.Close()
_, err275 := mbTrans274.WriteString(arg273)
if err275 != nil {
Usage()
return
}
factory275 := thrift.NewTJSONProtocolFactory()
jsProt276 := factory275.GetProtocol(mbTrans273)
factory276 := thrift.NewTJSONProtocolFactory()
jsProt277 := factory276.GetProtocol(mbTrans274)
argvalue0 := aurora.NewTaskQuery()
err277 := argvalue0.Read(jsProt276)
if err277 != nil {
err278 := argvalue0.Read(jsProt277)
if err278 != nil {
Usage()
return
}
@ -648,19 +649,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage()
}
arg278 := flag.Arg(1)
mbTrans279 := thrift.NewTMemoryBufferLen(len(arg278))
defer mbTrans279.Close()
_, err280 := mbTrans279.WriteString(arg278)
if err280 != nil {
arg279 := flag.Arg(1)
mbTrans280 := thrift.NewTMemoryBufferLen(len(arg279))
defer mbTrans280.Close()
_, err281 := mbTrans280.WriteString(arg279)
if err281 != nil {
Usage()
return
}
factory281 := thrift.NewTJSONProtocolFactory()
jsProt282 := factory281.GetProtocol(mbTrans279)
factory282 := thrift.NewTJSONProtocolFactory()
jsProt283 := factory282.GetProtocol(mbTrans280)
argvalue0 := aurora.NewTaskQuery()
err283 := argvalue0.Read(jsProt282)
if err283 != nil {
err284 := argvalue0.Read(jsProt283)
if err284 != nil {
Usage()
return
}
@ -673,19 +674,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage()
}
arg284 := flag.Arg(1)
mbTrans285 := thrift.NewTMemoryBufferLen(len(arg284))
defer mbTrans285.Close()
_, err286 := mbTrans285.WriteString(arg284)
if err286 != nil {
arg285 := flag.Arg(1)
mbTrans286 := thrift.NewTMemoryBufferLen(len(arg285))
defer mbTrans286.Close()
_, err287 := mbTrans286.WriteString(arg285)
if err287 != nil {
Usage()
return
}
factory287 := thrift.NewTJSONProtocolFactory()
jsProt288 := factory287.GetProtocol(mbTrans285)
factory288 := thrift.NewTJSONProtocolFactory()
jsProt289 := factory288.GetProtocol(mbTrans286)
argvalue0 := aurora.NewJobKey()
err289 := argvalue0.Read(jsProt288)
if err289 != nil {
err290 := argvalue0.Read(jsProt289)
if err290 != nil {
Usage()
return
}
@ -718,19 +719,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage()
}
arg292 := flag.Arg(1)
mbTrans293 := thrift.NewTMemoryBufferLen(len(arg292))
defer mbTrans293.Close()
_, err294 := mbTrans293.WriteString(arg292)
if err294 != nil {
arg293 := flag.Arg(1)
mbTrans294 := thrift.NewTMemoryBufferLen(len(arg293))
defer mbTrans294.Close()
_, err295 := mbTrans294.WriteString(arg293)
if err295 != nil {
Usage()
return
}
factory295 := thrift.NewTJSONProtocolFactory()
jsProt296 := factory295.GetProtocol(mbTrans293)
factory296 := thrift.NewTJSONProtocolFactory()
jsProt297 := factory296.GetProtocol(mbTrans294)
argvalue0 := aurora.NewJobConfiguration()
err297 := argvalue0.Read(jsProt296)
if err297 != nil {
err298 := argvalue0.Read(jsProt297)
if err298 != nil {
Usage()
return
}
@ -743,19 +744,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage()
}
arg298 := flag.Arg(1)
mbTrans299 := thrift.NewTMemoryBufferLen(len(arg298))
defer mbTrans299.Close()
_, err300 := mbTrans299.WriteString(arg298)
if err300 != nil {
arg299 := flag.Arg(1)
mbTrans300 := thrift.NewTMemoryBufferLen(len(arg299))
defer mbTrans300.Close()
_, err301 := mbTrans300.WriteString(arg299)
if err301 != nil {
Usage()
return
}
factory301 := thrift.NewTJSONProtocolFactory()
jsProt302 := factory301.GetProtocol(mbTrans299)
factory302 := thrift.NewTJSONProtocolFactory()
jsProt303 := factory302.GetProtocol(mbTrans300)
argvalue0 := aurora.NewJobUpdateQuery()
err303 := argvalue0.Read(jsProt302)
if err303 != nil {
err304 := argvalue0.Read(jsProt303)
if err304 != nil {
Usage()
return
}
@ -768,19 +769,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage()
}
arg304 := flag.Arg(1)
mbTrans305 := thrift.NewTMemoryBufferLen(len(arg304))
defer mbTrans305.Close()
_, err306 := mbTrans305.WriteString(arg304)
if err306 != nil {
arg305 := flag.Arg(1)
mbTrans306 := thrift.NewTMemoryBufferLen(len(arg305))
defer mbTrans306.Close()
_, err307 := mbTrans306.WriteString(arg305)
if err307 != nil {
Usage()
return
}
factory307 := thrift.NewTJSONProtocolFactory()
jsProt308 := factory307.GetProtocol(mbTrans305)
factory308 := thrift.NewTJSONProtocolFactory()
jsProt309 := factory308.GetProtocol(mbTrans306)
argvalue0 := aurora.NewJobUpdateQuery()
err309 := argvalue0.Read(jsProt308)
if err309 != nil {
err310 := argvalue0.Read(jsProt309)
if err310 != nil {
Usage()
return
}
@ -793,19 +794,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage()
}
arg310 := flag.Arg(1)
mbTrans311 := thrift.NewTMemoryBufferLen(len(arg310))
defer mbTrans311.Close()
_, err312 := mbTrans311.WriteString(arg310)
if err312 != nil {
arg311 := flag.Arg(1)
mbTrans312 := thrift.NewTMemoryBufferLen(len(arg311))
defer mbTrans312.Close()
_, err313 := mbTrans312.WriteString(arg311)
if err313 != nil {
Usage()
return
}
factory313 := thrift.NewTJSONProtocolFactory()
jsProt314 := factory313.GetProtocol(mbTrans311)
factory314 := thrift.NewTJSONProtocolFactory()
jsProt315 := factory314.GetProtocol(mbTrans312)
argvalue0 := aurora.NewJobUpdateRequest()
err315 := argvalue0.Read(jsProt314)
if err315 != nil {
err316 := argvalue0.Read(jsProt315)
if err316 != nil {
Usage()
return
}

View file

@ -1,22 +1,23 @@
// Autogenerated by Thrift Compiler (0.12.0)
// Autogenerated by Thrift Compiler (0.13.0)
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
package main
import (
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
"context"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
"strconv"
"strings"
"github.com/apache/thrift/lib/go/thrift"
"apache/aurora"
)
var _ = aurora.GoUnusedProtection__
func Usage() {
fmt.Fprintln(os.Stderr, "Usage of ", os.Args[0], " [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]:")
@ -179,19 +180,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksStatus requires 1 args")
flag.Usage()
}
arg81 := flag.Arg(1)
mbTrans82 := thrift.NewTMemoryBufferLen(len(arg81))
defer mbTrans82.Close()
_, err83 := mbTrans82.WriteString(arg81)
if err83 != nil {
arg82 := flag.Arg(1)
mbTrans83 := thrift.NewTMemoryBufferLen(len(arg82))
defer mbTrans83.Close()
_, err84 := mbTrans83.WriteString(arg82)
if err84 != nil {
Usage()
return
}
factory84 := thrift.NewTJSONProtocolFactory()
jsProt85 := factory84.GetProtocol(mbTrans82)
factory85 := thrift.NewTJSONProtocolFactory()
jsProt86 := factory85.GetProtocol(mbTrans83)
argvalue0 := aurora.NewTaskQuery()
err86 := argvalue0.Read(jsProt85)
if err86 != nil {
err87 := argvalue0.Read(jsProt86)
if err87 != nil {
Usage()
return
}
@ -204,19 +205,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetTasksWithoutConfigs requires 1 args")
flag.Usage()
}
arg87 := flag.Arg(1)
mbTrans88 := thrift.NewTMemoryBufferLen(len(arg87))
defer mbTrans88.Close()
_, err89 := mbTrans88.WriteString(arg87)
if err89 != nil {
arg88 := flag.Arg(1)
mbTrans89 := thrift.NewTMemoryBufferLen(len(arg88))
defer mbTrans89.Close()
_, err90 := mbTrans89.WriteString(arg88)
if err90 != nil {
Usage()
return
}
factory90 := thrift.NewTJSONProtocolFactory()
jsProt91 := factory90.GetProtocol(mbTrans88)
factory91 := thrift.NewTJSONProtocolFactory()
jsProt92 := factory91.GetProtocol(mbTrans89)
argvalue0 := aurora.NewTaskQuery()
err92 := argvalue0.Read(jsProt91)
if err92 != nil {
err93 := argvalue0.Read(jsProt92)
if err93 != nil {
Usage()
return
}
@ -229,19 +230,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetPendingReason requires 1 args")
flag.Usage()
}
arg93 := flag.Arg(1)
mbTrans94 := thrift.NewTMemoryBufferLen(len(arg93))
defer mbTrans94.Close()
_, err95 := mbTrans94.WriteString(arg93)
if err95 != nil {
arg94 := flag.Arg(1)
mbTrans95 := thrift.NewTMemoryBufferLen(len(arg94))
defer mbTrans95.Close()
_, err96 := mbTrans95.WriteString(arg94)
if err96 != nil {
Usage()
return
}
factory96 := thrift.NewTJSONProtocolFactory()
jsProt97 := factory96.GetProtocol(mbTrans94)
factory97 := thrift.NewTJSONProtocolFactory()
jsProt98 := factory97.GetProtocol(mbTrans95)
argvalue0 := aurora.NewTaskQuery()
err98 := argvalue0.Read(jsProt97)
if err98 != nil {
err99 := argvalue0.Read(jsProt98)
if err99 != nil {
Usage()
return
}
@ -254,19 +255,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetConfigSummary requires 1 args")
flag.Usage()
}
arg99 := flag.Arg(1)
mbTrans100 := thrift.NewTMemoryBufferLen(len(arg99))
defer mbTrans100.Close()
_, err101 := mbTrans100.WriteString(arg99)
if err101 != nil {
arg100 := flag.Arg(1)
mbTrans101 := thrift.NewTMemoryBufferLen(len(arg100))
defer mbTrans101.Close()
_, err102 := mbTrans101.WriteString(arg100)
if err102 != nil {
Usage()
return
}
factory102 := thrift.NewTJSONProtocolFactory()
jsProt103 := factory102.GetProtocol(mbTrans100)
factory103 := thrift.NewTJSONProtocolFactory()
jsProt104 := factory103.GetProtocol(mbTrans101)
argvalue0 := aurora.NewJobKey()
err104 := argvalue0.Read(jsProt103)
if err104 != nil {
err105 := argvalue0.Read(jsProt104)
if err105 != nil {
Usage()
return
}
@ -299,19 +300,19 @@ func main() {
fmt.Fprintln(os.Stderr, "PopulateJobConfig requires 1 args")
flag.Usage()
}
arg107 := flag.Arg(1)
mbTrans108 := thrift.NewTMemoryBufferLen(len(arg107))
defer mbTrans108.Close()
_, err109 := mbTrans108.WriteString(arg107)
if err109 != nil {
arg108 := flag.Arg(1)
mbTrans109 := thrift.NewTMemoryBufferLen(len(arg108))
defer mbTrans109.Close()
_, err110 := mbTrans109.WriteString(arg108)
if err110 != nil {
Usage()
return
}
factory110 := thrift.NewTJSONProtocolFactory()
jsProt111 := factory110.GetProtocol(mbTrans108)
factory111 := thrift.NewTJSONProtocolFactory()
jsProt112 := factory111.GetProtocol(mbTrans109)
argvalue0 := aurora.NewJobConfiguration()
err112 := argvalue0.Read(jsProt111)
if err112 != nil {
err113 := argvalue0.Read(jsProt112)
if err113 != nil {
Usage()
return
}
@ -324,19 +325,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateSummaries requires 1 args")
flag.Usage()
}
arg113 := flag.Arg(1)
mbTrans114 := thrift.NewTMemoryBufferLen(len(arg113))
defer mbTrans114.Close()
_, err115 := mbTrans114.WriteString(arg113)
if err115 != nil {
arg114 := flag.Arg(1)
mbTrans115 := thrift.NewTMemoryBufferLen(len(arg114))
defer mbTrans115.Close()
_, err116 := mbTrans115.WriteString(arg114)
if err116 != nil {
Usage()
return
}
factory116 := thrift.NewTJSONProtocolFactory()
jsProt117 := factory116.GetProtocol(mbTrans114)
factory117 := thrift.NewTJSONProtocolFactory()
jsProt118 := factory117.GetProtocol(mbTrans115)
argvalue0 := aurora.NewJobUpdateQuery()
err118 := argvalue0.Read(jsProt117)
if err118 != nil {
err119 := argvalue0.Read(jsProt118)
if err119 != nil {
Usage()
return
}
@ -349,19 +350,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDetails requires 1 args")
flag.Usage()
}
arg119 := flag.Arg(1)
mbTrans120 := thrift.NewTMemoryBufferLen(len(arg119))
defer mbTrans120.Close()
_, err121 := mbTrans120.WriteString(arg119)
if err121 != nil {
arg120 := flag.Arg(1)
mbTrans121 := thrift.NewTMemoryBufferLen(len(arg120))
defer mbTrans121.Close()
_, err122 := mbTrans121.WriteString(arg120)
if err122 != nil {
Usage()
return
}
factory122 := thrift.NewTJSONProtocolFactory()
jsProt123 := factory122.GetProtocol(mbTrans120)
factory123 := thrift.NewTJSONProtocolFactory()
jsProt124 := factory123.GetProtocol(mbTrans121)
argvalue0 := aurora.NewJobUpdateQuery()
err124 := argvalue0.Read(jsProt123)
if err124 != nil {
err125 := argvalue0.Read(jsProt124)
if err125 != nil {
Usage()
return
}
@ -374,19 +375,19 @@ func main() {
fmt.Fprintln(os.Stderr, "GetJobUpdateDiff requires 1 args")
flag.Usage()
}
arg125 := flag.Arg(1)
mbTrans126 := thrift.NewTMemoryBufferLen(len(arg125))
defer mbTrans126.Close()
_, err127 := mbTrans126.WriteString(arg125)
if err127 != nil {
arg126 := flag.Arg(1)
mbTrans127 := thrift.NewTMemoryBufferLen(len(arg126))
defer mbTrans127.Close()
_, err128 := mbTrans127.WriteString(arg126)
if err128 != nil {
Usage()
return
}
factory128 := thrift.NewTJSONProtocolFactory()
jsProt129 := factory128.GetProtocol(mbTrans126)
factory129 := thrift.NewTJSONProtocolFactory()
jsProt130 := factory129.GetProtocol(mbTrans127)
argvalue0 := aurora.NewJobUpdateRequest()
err130 := argvalue0.Read(jsProt129)
if err130 != nil {
err131 := argvalue0.Read(jsProt130)
if err131 != nil {
Usage()
return
}

View file

@ -1,6 +1,6 @@
#! /bin/bash
THRIFT_VER=0.12.0
THRIFT_VER=0.13.0
if [[ $(thrift -version | grep -e $THRIFT_VER -c) -ne 1 ]]; then
echo "Warning: This wrapper has only been tested with version" $THRIFT_VER;

2
go.mod
View file

@ -7,6 +7,6 @@ require (
github.com/stretchr/testify v1.5.0
)
replace github.com/apache/thrift v0.12.0 => github.com/ridv/thrift v0.12.2
replace github.com/apache/thrift v0.13.0 => github.com/ridv/thrift v0.13.1
go 1.13

View file

@ -31,32 +31,36 @@ type JobUpdate struct {
func NewJobUpdate() *JobUpdate {
newTask := NewTask()
req := aurora.JobUpdateRequest{}
req.TaskConfig = newTask.TaskConfig()
req.Settings = newUpdateSettings()
return &JobUpdate{task: newTask, request: &req}
return &JobUpdate{
task: newTask,
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
}
}
// Creates an update with default values using an AuroraTask as the underlying task configuration.
// This function has a high level understanding of Aurora Tasks and thus will support copying a task that is configured
// to use Thermos.
func JobUpdateFromAuroraTask(task *AuroraTask) *JobUpdate {
newTask := task.Clone()
req := aurora.JobUpdateRequest{}
req.TaskConfig = newTask.TaskConfig()
req.Settings = newUpdateSettings()
return &JobUpdate{task: newTask, request: &req}
return &JobUpdate{
task: newTask,
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
}
}
// JobUpdateFromConfig creates an update with default values using an aurora.TaskConfig
// primitive as the underlying task configuration.
// This function should not be used unless the implications of using a primitive value are understood.
// For example, the primitive has no concept of Thermos.
func JobUpdateFromConfig(task *aurora.TaskConfig) *JobUpdate {
// Perform a deep copy to avoid unexpected behavior
newTask := TaskFromThrift(task)
req := aurora.JobUpdateRequest{}
req.TaskConfig = newTask.TaskConfig()
req.Settings = newUpdateSettings()
return &JobUpdate{task: newTask, request: &req}
return &JobUpdate{
task: newTask,
request: &aurora.JobUpdateRequest{TaskConfig: newTask.TaskConfig(), Settings: newUpdateSettings()},
}
}
// Set instance count the job will have after the update.
@ -106,6 +110,26 @@ func (j *JobUpdate) PulseIntervalTimeout(timeout time.Duration) *JobUpdate {
j.request.Settings.BlockIfNoPulsesAfterMs = thrift.Int32Ptr(int32(timeout.Seconds() * 1000))
return j
}
func (j *JobUpdate) BatchUpdateStrategy(autoPause bool, batchSize int32) *JobUpdate {
j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
BatchStrategy: &aurora.BatchJobUpdateStrategy{GroupSize: batchSize, AutopauseAfterBatch: autoPause},
}
return j
}
func (j *JobUpdate) QueueUpdateStrategy(groupSize int32) *JobUpdate {
j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
QueueStrategy: &aurora.QueueJobUpdateStrategy{GroupSize: groupSize},
}
return j
}
func (j *JobUpdate) VariableBatchStrategy(autoPause bool, batchSizes ...int32) *JobUpdate {
j.request.Settings.UpdateStrategy = &aurora.JobUpdateStrategy{
VarBatchStrategy: &aurora.VariableBatchJobUpdateStrategy{GroupSizes: batchSizes, AutopauseAfterBatch: autoPause},
}
return j
}
func newUpdateSettings() *aurora.JobUpdateSettings {

View file

@ -244,3 +244,68 @@ func (c *Client) MonitorHostMaintenance(hosts []string,
}
}
}
// AutoPaused monitor is a special monitor for auto pause enabled batch updates. This monitor ensures that the update
// being monitored is capable of auto pausing and has auto pausing enabled. After verifying this information,
// the monitor watches for the job to enter the ROLL_FORWARD_PAUSED state and calculates the current batch
// the update is in using information from the update configuration.
func (c *Client) MonitorAutoPausedUpdate(key aurora.JobUpdateKey, interval, timeout time.Duration) (int, error) {
key.Job = &aurora.JobKey{
Role: key.Job.Role,
Environment: key.Job.Environment,
Name: key.Job.Name,
}
query := aurora.JobUpdateQuery{
UpdateStatuses: aurora.ACTIVE_JOB_UPDATE_STATES,
Limit: 1,
Key: &key,
}
updateDetails, err := c.JobUpdateDetails(query)
if err != nil {
return -1, errors.Wrap(err, "unable to get information about update")
}
if len(updateDetails) == 0 {
return -1, errors.Errorf("details for update could not be found")
}
updateStrategy := updateDetails[0].Update.Instructions.Settings.UpdateStrategy
var batchSizes []int32
switch {
case updateStrategy.IsSetVarBatchStrategy():
batchSizes = updateStrategy.VarBatchStrategy.GroupSizes
if !updateStrategy.VarBatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
case updateStrategy.IsSetBatchStrategy():
batchSizes = []int32{updateStrategy.BatchStrategy.GroupSize}
if !updateStrategy.BatchStrategy.AutopauseAfterBatch {
return -1, errors.Errorf("update does not have auto pause enabled")
}
default:
return -1, errors.Errorf("update is not using a batch update strategy")
}
query.UpdateStatuses = append(TerminalUpdateStates(), aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED)
summary, err := c.MonitorJobUpdateQuery(query, interval, timeout)
if err != nil {
return -1, err
}
// Summary 0 is assumed to exist because MonitorJobUpdateQuery will return an error if there is Summaries
if summary[0].State.Status != aurora.JobUpdateStatus_ROLL_FORWARD_PAUSED {
return -1, errors.Errorf("update is in a terminal state %v", summary[0].State.Status)
}
updatingInstances := make(map[int32]struct{})
for _, e := range updateDetails[0].InstanceEvents {
// We only care about INSTANCE_UPDATING actions because we only care that they've been attempted
if e != nil && e.GetAction() == aurora.JobUpdateAction_INSTANCE_UPDATING {
updatingInstances[e.GetInstanceId()] = struct{}{}
}
}
return calculateCurrentBatch(int32(len(updatingInstances)), batchSizes), nil
}

View file

@ -26,6 +26,7 @@ import (
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var r *realis.Client
@ -698,7 +699,6 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
Environment("prod").
Role(role).
Name("create_thermos_job_partition_policy_test").
ExecutorName(aurora.AURORA_EXECUTOR_NAME).
ThermosExecutor(thermosExec).
CPU(.5).
RAM(64).
@ -723,3 +723,103 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
}
}
func TestRealisClient_UpdateStrategies(t *testing.T) {
// Create a single job
job := realis.NewJob().
Environment("prod").
Role("vagrant").
ThermosExecutor(thermosExec).
CPU(.01).
RAM(4).
Disk(10).
InstanceCount(6).
IsService(true)
// Needed to populate the task config correctly
assert.NoError(t, job.BuildThermosPayload())
strategies := []struct {
jobUpdate *realis.JobUpdate
Name string
}{
{
jobUpdate: realis.JobUpdateFromAuroraTask(job.AuroraTask()).
QueueUpdateStrategy(2).
InstanceCount(6).
WatchTime(1000),
Name: "Queue",
},
{
jobUpdate: realis.JobUpdateFromAuroraTask(job.AuroraTask()).
BatchUpdateStrategy(false, 2).
InstanceCount(6).
WatchTime(1000),
Name: "Batch",
},
{
jobUpdate: realis.JobUpdateFromAuroraTask(job.AuroraTask()).
VariableBatchStrategy(false, 1, 2, 3).
InstanceCount(6).
WatchTime(1000),
Name: "VarBatch",
},
}
for _, strategy := range strategies {
t.Run("TestRealisClient_UpdateStrategies_"+strategy.Name, func(t *testing.T) {
strategy.jobUpdate.Name("update_strategies_" + strategy.Name)
result, err := r.StartJobUpdate(strategy.jobUpdate, "")
require.NoError(t, err)
assert.NotNil(t, result)
var ok bool
var mErr error
key := *result.GetKey()
if ok, mErr = r.MonitorJobUpdate(key, 5, 240); !ok || mErr != nil {
// Update may already be in a terminal state so don't check for error
assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out."))
}
assert.NoError(t, r.KillJob(strategy.jobUpdate.JobKey()))
})
}
}
func TestRealisClient_BatchAwareAutoPause(t *testing.T) {
// Create a single job
job := realis.NewJob().
Environment("prod").
Role("vagrant").
Name("BatchAwareAutoPauseTest").
ThermosExecutor(thermosExec).
CPU(.01).
RAM(4).
Disk(10).
InstanceCount(6).
IsService(true)
updateGroups := []int32{1, 2, 3}
strategy := realis.JobUpdateFromAuroraTask(job.AuroraTask()).
VariableBatchStrategy(true, updateGroups...).
InstanceCount(6).
WatchTime(1000)
result, err := r.StartJobUpdate(strategy, "")
require.NoError(t, err)
require.NotNil(t, result)
key := *result.GetKey()
for i := range updateGroups {
curStep, mErr := r.MonitorAutoPausedUpdate(key, time.Second*5, time.Second*240)
if mErr != nil {
// Update may already be in a terminal state so don't check for error
assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out."))
}
assert.Equal(t, i, curStep)
require.NoError(t, r.ResumeJobUpdate(key, "auto resuming test"))
}
assert.NoError(t, r.KillJob(strategy.JobKey()))
}

31
util.go
View file

@ -40,6 +40,18 @@ func init() {
}
}
// TerminalJobUpdateStates returns a slice containing all the terminal states an update may end up in.
// This is a function in order to avoid having a slice that can be accidentally mutated.
func TerminalUpdateStates() []aurora.JobUpdateStatus {
return []aurora.JobUpdateStatus{
aurora.JobUpdateStatus_ROLLED_FORWARD,
aurora.JobUpdateStatus_ROLLED_BACK,
aurora.JobUpdateStatus_ABORTED,
aurora.JobUpdateStatus_ERROR,
aurora.JobUpdateStatus_FAILED,
}
}
func validateAuroraAddress(address string) (string, error) {
// If no protocol defined, assume http
@ -73,3 +85,22 @@ func validateAuroraAddress(address string) (string, error) {
return u.String(), nil
}
func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
for i, size := range batchSizes {
updatingInstances -= size
if updatingInstances <= 0 {
return i
}
}
// Overflow batches
batchCount := len(batchSizes) - 1
lastBatchIndex := len(batchSizes) - 1
batchCount += int(updatingInstances / batchSizes[lastBatchIndex])
if updatingInstances%batchSizes[lastBatchIndex] != 0 {
batchCount++
}
return batchCount
}

58
util_test.go Normal file
View file

@ -0,0 +1,58 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCurrentBatchCalculator(t *testing.T) {
t.Run("singleBatchOverflow", func(t *testing.T) {
curBatch := calculateCurrentBatch(10, []int32{2})
assert.Equal(t, 4, curBatch)
})
t.Run("noInstancesUpdating", func(t *testing.T) {
curBatch := calculateCurrentBatch(0, []int32{2})
assert.Equal(t, 0, curBatch)
})
t.Run("evenMatchSingleBatch", func(t *testing.T) {
curBatch := calculateCurrentBatch(2, []int32{2})
assert.Equal(t, 0, curBatch)
})
t.Run("moreInstancesThanBatches", func(t *testing.T) {
curBatch := calculateCurrentBatch(5, []int32{1, 2})
assert.Equal(t, 2, curBatch)
})
t.Run("moreInstancesThanBatchesDecreasing", func(t *testing.T) {
curBatch := calculateCurrentBatch(5, []int32{2, 1})
assert.Equal(t, 3, curBatch)
})
t.Run("unevenFit", func(t *testing.T) {
curBatch := calculateCurrentBatch(2, []int32{1, 2})
assert.Equal(t, 1, curBatch)
})
t.Run("halfWay", func(t *testing.T) {
curBatch := calculateCurrentBatch(1, []int32{1, 2})
assert.Equal(t, 0, curBatch)
})
}