capacity report in terms of offers and tasks

nhatle 2022-07-26 10:51:47 -07:00
parent 7ccb38f5e6
commit b184c4c0d2
5 changed files with 912 additions and 1 deletion

docker-compose.yml

@@ -41,10 +41,11 @@ services:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
- MESOS_HOSTNAME: localhost
+ MESOS_HOSTNAME: agent-one
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-one;rack:1;zone:west'
networks:
aurora_cluster:
ipv4_address: 192.168.33.4
@@ -55,6 +56,56 @@ services:
depends_on:
- zk
agent-two:
image: quay.io/aurorascheduler/mesos-agent:1.9.0
pid: host
restart: on-failure
ports:
- "5052:5051"
environment:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
MESOS_HOSTNAME: agent-two
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-two;rack:2;zone:west'
networks:
aurora_cluster:
ipv4_address: 192.168.33.5
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zk
agent-three:
image: quay.io/aurorascheduler/mesos-agent:1.9.0
pid: host
restart: on-failure
ports:
- "5053:5051"
environment:
MESOS_MASTER: zk://192.168.33.2:2181/mesos
MESOS_CONTAINERIZERS: docker,mesos
MESOS_PORT: 5051
MESOS_HOSTNAME: agent-three
MESOS_RESOURCES: ports(*):[11000-11999]
MESOS_SYSTEMD_ENABLE_SUPPORT: 'false'
MESOS_WORK_DIR: /tmp/mesos
MESOS_ATTRIBUTES: 'host:agent-three;rack:2;zone:west;dedicated:vagrant/bar'
networks:
aurora_cluster:
ipv4_address: 192.168.33.6
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zk
aurora-one:
image: quay.io/aurorascheduler/scheduler:0.25.0
pid: host
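The MESOS_ATTRIBUTES value above uses Mesos's "name:value;name:value" attribute syntax; Mesos attaches each pair to the agent's offers, and the grouping and constraint logic in offer.go below keys on exactly these attributes (host, rack, zone, and the dedicated role on agent-three). A minimal sketch of how such a string breaks apart (editor's illustration only, not part of this commit):

package main

import (
	"fmt"
	"strings"
)

func main() {
	attrs := map[string]string{}
	for _, pair := range strings.Split("host:agent-one;rack:1;zone:west", ";") {
		// each pair is "name:value"; keep everything after the first colon as the value
		if parts := strings.SplitN(pair, ":", 2); len(parts) == 2 {
			attrs[parts[0]] = parts[1]
		}
	}
	fmt.Println(attrs) // map[host:agent-one rack:1 zone:west]
}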

offer.go (new file, 427 additions)

@@ -0,0 +1,427 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package realis
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)
// Offer models a single offer returned by the [aurora-scheduler]/offers endpoint
type Offer struct {
ID struct {
Value string `json:"value"`
} `json:"id"`
FrameworkID struct {
Value string `json:"value"`
} `json:"framework_id"`
AgentID struct {
Value string `json:"value"`
} `json:"agent_id"`
Hostname string `json:"hostname"`
URL struct {
Scheme string `json:"scheme"`
Address struct {
Hostname string `json:"hostname"`
IP string `json:"ip"`
Port int `json:"port"`
} `json:"address"`
Path string `json:"path"`
Query []interface{} `json:"query"`
} `json:"url"`
Resources []struct {
Name string `json:"name"`
Type string `json:"type"`
Ranges struct {
Range []struct {
Begin int `json:"begin"`
End int `json:"end"`
} `json:"range"`
} `json:"ranges,omitempty"`
Role string `json:"role"`
Reservations []interface{} `json:"reservations"`
Scalar struct {
Value float64 `json:"value"`
} `json:"scalar,omitempty"`
} `json:"resources"`
Attributes []struct {
Name string `json:"name"`
Type string `json:"type"`
Text struct {
Value string `json:"value"`
} `json:"text"`
} `json:"attributes"`
ExecutorIds []struct {
Value string `json:"value"`
} `json:"executor_ids"`
}
// MaintenanceList models the hosts listed on the [aurora-scheduler]/maintenance endpoint
type MaintenanceList struct {
Drained []string `json:"DRAINED"`
Scheduled []string `json:"SCHEDULED"`
Draining map[string][]string `json:"DRAINING"`
}
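// For illustration (assumed payload shape, not part of this commit), the
// /maintenance endpoint answers with JSON along these lines, where DRAINING
// maps a hostname to the IDs of work still draining off it:
//
//	{"DRAINED": ["agent-one"], "SCHEDULED": [], "DRAINING": {"agent-two": ["task-id-1"]}}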
// OfferCount maps a resource quantity to how many offers carry exactly that amount.
type OfferCount map[float64]int64
// OfferGroupReport maps a resource name (cpus, mem, disk, ports) to its OfferCount.
type OfferGroupReport map[string]OfferCount
// OfferReport maps a dedicated group (or "non-dedicated") to its OfferGroupReport.
type OfferReport map[string]OfferGroupReport
// MaintenanceHosts lists all the hosts under maintenance
func (c *Client) MaintenanceHosts() ([]string, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
}
request := &http.Client{Transport: tr}
resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL()))
if err != nil {
return nil, err
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return nil, err
}
var list MaintenanceList
if err := json.Unmarshal(buf.Bytes(), &list); err != nil {
return nil, err
}
hosts := append(list.Drained, list.Scheduled...)
for drainingHost := range list.Draining {
hosts = append(hosts, drainingHost)
}
return hosts, nil
}
// Offers pulls data from the /offers endpoint
func (c *Client) Offers() ([]Offer, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
}
request := &http.Client{Transport: tr}
resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL()))
if err != nil {
return nil, err
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(resp.Body); err != nil {
return nil, err
}
var offers []Offer
if err := json.Unmarshal(buf.Bytes(), &offers); err != nil {
return nil, err
}
return offers, nil
}
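// Sketch (editor's illustration, not part of this commit): with an existing
// client, listing which agent made each outstanding offer is a direct call.
func printOfferHosts(c *Client) error {
	offers, err := c.Offers()
	if err != nil {
		return err
	}
	for _, o := range offers {
		fmt.Printf("%s offered by agent %s\n", o.Hostname, o.AgentID.Value)
	}
	return nil
}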
// AvailOfferReport returns a detailed summary of the offers available for use.
// For example: 2 nodes offer 32 cpus and 10 nodes offer 1 cpu.
func (c *Client) AvailOfferReport() (OfferReport, error) {
maintHosts, err := c.MaintenanceHosts()
if err != nil {
return nil, err
}
maintHostSet := map[string]bool{}
for _, h := range maintHosts {
maintHostSet[h] = true
}
// Get a list of offers
offers, err := c.Offers()
if err != nil {
return nil, err
}
report := OfferReport{}
for _, o := range offers {
if maintHostSet[o.Hostname] {
continue
}
group := "non-dedicated"
for _, a := range o.Attributes {
if a.Name == "dedicated" {
group = a.Text.Value
}
}
if _, ok := report[group]; !ok {
report[group] = map[string]OfferCount{}
}
for _, r := range o.Resources {
if _, ok := report[group][r.Name]; !ok {
report[group][r.Name] = OfferCount{}
}
val := 0.0
switch r.Type {
case "SCALAR":
val = r.Scalar.Value
case "RANGES":
for _, pr := range r.Ranges.Range {
val += float64(pr.End - pr.Begin + 1)
}
default:
return nil, fmt.Errorf("resource type %s is not supported", r.Type)
}
report[group][r.Name][val]++
}
}
return report, nil
}
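// Sketch (editor's illustration, not part of this commit): flattening an
// OfferReport into the "N offers with M cpus" form the doc comment describes.
func printOfferReport(report OfferReport) {
	for group, resources := range report {
		for name, counts := range resources {
			for quantity, n := range counts {
				fmt.Printf("%s: %d offers with %g %s\n", group, n, quantity, name)
			}
		}
	}
}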
// FitTasks computes the number of tasks that can be fit into a list of offers
func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) {
// count the number of tasks per limit constraint: limit.name -> limit.value -> count
limitCounts := map[string]map[string]int64{}
for _, c := range taskConfig.Constraints {
if c.Constraint.Limit != nil {
limitCounts[c.Name] = map[string]int64{}
}
}
request := ResourcesToMap(taskConfig.Resources)
// validate resource request
if len(request) == 0 {
return -1, fmt.Errorf("Resource request %v must not be empty", request)
}
isValid := false
for _, resVal := range request {
if resVal > 0 {
isValid = true
}
}
if !isValid {
return -1, fmt.Errorf("Resource request %v is not valid", request)
}
// pull the list of hosts under maintenance
maintHosts, err := c.MaintenanceHosts()
if err != nil {
return -1, err
}
maintHostSet := map[string]bool{}
for _, h := range maintHosts {
maintHostSet[h] = true
}
numTasks := int64(0)
for _, o := range offers {
// skip the hosts under maintenance
if maintHostSet[o.Hostname] {
continue
}
numTasksPerOffer := int64(-1)
for resName, resVal := range request {
// skip, as we can fit an infinite number of tasks with 0 demand.
if resVal == 0 {
continue
}
avail := 0.0
for _, r := range o.Resources {
if r.Name != resName {
continue
}
switch r.Type {
case "SCALAR":
avail = r.Scalar.Value
case "RANGES":
for _, pr := range r.Ranges.Range {
avail += float64(pr.End - pr.Begin + 1)
}
default:
return -1, fmt.Errorf("resource type %s is not supported", r.Type)
}
}
numTasksPerResource := int64(avail / resVal)
if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 {
numTasksPerOffer = numTasksPerResource
}
}
numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer)
}
return numTasks, nil
}
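// Sketch (editor's illustration, not part of this commit): estimating how many
// copies of a task needing 1.5 CPUs and pinned to zone "west" the cluster's
// current offers could hold; the e2e tests below exercise many more cases.
func fitExampleTask(c *Client) (int64, error) {
	cpus := 1.5
	task := aurora.NewTaskConfig()
	task.Resources = []*aurora.Resource{{NumCpus: &cpus}}
	task.Constraints = []*aurora.Constraint{{
		Name: "zone",
		Constraint: &aurora.TaskConstraint{
			Value: &aurora.ValueConstraint{Negated: false, Values: []string{"west"}},
		},
	}}
	offers, err := c.Offers()
	if err != nil {
		return -1, err
	}
	return c.FitTasks(task, offers)
}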
// fitConstraints caps numTasksPerOffer using the offer's attributes against the
// task's value and limit constraints, and tallies limit usage in limitCounts.
func fitConstraints(taskConfig *aurora.TaskConfig,
offer *Offer,
limitCounts map[string]map[string]int64,
numTasksPerOffer int64) int64 {
// check dedicated attributes vs. constraints
if !isDedicated(offer, taskConfig.Constraints) {
return 0
}
limitConstraints := []*aurora.Constraint{}
for _, c := range taskConfig.Constraints {
// look for corresponding attribute
attFound := false
for _, a := range offer.Attributes {
if a.Name == c.Name {
attFound = true
}
}
// constraint not found in offer's attributes
if !attFound {
return 0
}
if c.Constraint.Value != nil && !valueConstraint(offer, c) {
// value constraint is not satisfied
return 0
} else if c.Constraint.Limit != nil {
limitConstraints = append(limitConstraints, c)
limit := limitConstraint(offer, c, limitCounts)
if numTasksPerOffer > limit && limit >= 0 {
numTasksPerOffer = limit
}
}
}
// update limitCounts
for _, c := range limitConstraints {
for _, a := range offer.Attributes {
if a.Name == c.Name {
limitCounts[a.Name][a.Text.Value] += numTasksPerOffer
}
}
}
return numTasksPerOffer
}
// isDedicated reports whether the offer's dedicated attributes (if any) allow
// the given constraints; offers without dedicated attributes allow everything.
func isDedicated(offer *Offer, constraints []*aurora.Constraint) bool {
// get all dedicated attributes of an offer
dedicatedAtts := map[string]bool{}
for _, a := range offer.Attributes {
if a.Name == "dedicated" {
dedicatedAtts[a.Text.Value] = true
}
}
if len(dedicatedAtts) == 0 {
return true
}
// check whether the constraints match the dedicated attributes
matched := false
for _, c := range constraints {
if c.Name == "dedicated" && c.Constraint.Value != nil {
found := false
for _, v := range c.Constraint.Value.Values {
if dedicatedAtts[v] {
found = true
break
}
}
if found {
matched = true
} else {
return false
}
}
}
return matched
}
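// Sketch (editor's illustration, not part of this commit): a dedicated offer
// only matches tasks whose "dedicated" value constraint names one of its
// values, while an offer without dedicated attributes matches everything.
func dedicatedExample() {
	var o Offer
	// hand-built payload in the /offers wire format modeled above
	_ = json.Unmarshal([]byte(`{"hostname":"agent-three","attributes":`+
		`[{"name":"dedicated","type":"TEXT","text":{"value":"vagrant/bar"}}]}`), &o)
	match := []*aurora.Constraint{{
		Name: "dedicated",
		Constraint: &aurora.TaskConstraint{
			Value: &aurora.ValueConstraint{Values: []string{"vagrant/bar"}},
		},
	}}
	fmt.Println(isDedicated(&o, match)) // true: the constraint names the offer's dedicated value
	fmt.Println(isDedicated(&o, nil))   // false: unconstrained tasks stay off dedicated hosts
}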
// valueConstraint reports whether any of the offer's attributes satisfy the
// given value constraint (honoring negation).
func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool {
matched := false
for _, a := range offer.Attributes {
if a.Name == constraint.Name {
for _, v := range constraint.Constraint.Value.Values {
matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) ||
(a.Text.Value != v && constraint.Constraint.Value.Negated)
if matched {
break
}
}
if matched {
break
}
}
}
return matched
}
// limitConstraint returns how many more tasks this offer may take under the
// given limit constraint, or -1 if the constraint's attribute is absent.
func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 {
limit := int64(-1)
for _, a := range offer.Attributes {
// limit constraint found
if a.Name == constraint.Name {
curr := limitCounts[a.Name][a.Text.Value]
currLimit := int64(constraint.Constraint.Limit.Limit)
if curr >= currLimit {
return 0
}
if currLimit-curr < limit || limit < 0 {
limit = currLimit - curr
}
}
}
return limit
}

realis.go

@@ -147,6 +147,8 @@ func NewClient(options ...ClientOption) (*Client, error) {
return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required")
}
config.url = url
url, err = validateAuroraAddress(url)
if err != nil {
return nil, errors.Wrap(err, "unable to create realis object, invalid url")
@@ -841,3 +843,7 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) error {
}
return nil
}
// GetSchedulerURL returns the scheduler address this client was configured with.
func (c *Client) GetSchedulerURL() string {
return c.config.url
}

realis_e2e_test.go

@@ -767,6 +767,8 @@ func TestRealisClient_PartitionPolicy(t *testing.T) {
assert.NoError(t, err)
}
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
}
func TestRealisClient_UpdateStrategies(t *testing.T) {
@@ -925,3 +927,408 @@ func TestRealisClient_GetJobSummary(t *testing.T) {
err = r.KillJob(job.JobKey())
assert.NoError(t, err)
}
func TestRealisClient_Offers(t *testing.T) {
var offers []realis.Offer
// since offers are being recycled, it may take a few tries to get all of them.
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 5)
}
func TestRealisClient_MaintenanceHosts(t *testing.T) {
offers, err := r.Offers()
assert.NoError(t, err)
for i := 0; i < len(offers); i++ {
_, err := r.DrainHosts(offers[i].Hostname)
assert.NoError(t, err)
hosts, err := r.MaintenanceHosts()
assert.NoError(t, err)
assert.Equal(t, i+1, len(hosts))
}
// clean up
for i := 0; i < len(offers); i++ {
_, err := r.EndMaintenance(offers[i].Hostname)
assert.NoError(t, err)
// monitor the host until it transitions back to NONE mode
_, err = r.MonitorHostMaintenance(
[]string{offers[i].Hostname},
[]aurora.MaintenanceMode{aurora.MaintenanceMode_NONE},
5*time.Second,
10*time.Second)
assert.NoError(t, err)
}
}
func TestRealisClient_AvailOfferReport(t *testing.T) {
var offers []realis.Offer
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 5)
capacity, err := r.AvailOfferReport()
assert.NoError(t, err)
// 2 groups for non-dedicated & dedicated
assert.Equal(t, 2, len(capacity))
// 4 resources: cpus, disk, mem, ports
assert.Equal(t, 4, len(capacity["non-dedicated"]))
}
func TestRealisClient_FitTasks(t *testing.T) {
var offers []realis.Offer
i := 0
for ; len(offers) < 3 && i < 5; i++ {
offers, _ = r.Offers()
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, i, 5)
cpuPerOffer := 0.0
for _, r := range offers[0].Resources {
if r.Name == "cpus" {
cpuPerOffer = r.Scalar.Value
}
}
validCpu := cpuPerOffer / 5
inValidCpu := cpuPerOffer + 1
gpu := int64(1)
tests := []struct {
message string
request aurora.Resource
constraints []*aurora.Constraint
expected int64
isError bool
}{
{
message: "task with gpu request",
request: aurora.Resource{
NumGpus: &gpu,
},
expected: 0,
isError: false,
},
{
message: "empty resource request",
request: aurora.Resource{},
expected: -1,
isError: true,
},
{
message: "valid resource request",
request: aurora.Resource{
NumCpus: &validCpu,
},
expected: 10,
isError: false,
},
{
message: "invalid cpu request",
request: aurora.Resource{
NumCpus: &inValidCpu,
},
expected: 0,
isError: false,
},
{
message: "dedicated constraint",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "dedicated",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"vagrant/bar"},
},
},
},
},
expected: 5,
isError: false,
},
{
message: "value constraint on zone",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"west"},
},
},
},
},
expected: 10,
isError: false,
},
{
message: "negative value constraint on zone",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: true,
Values: []string{"west"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "negative value constraint on host",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: true,
Values: []string{"agent-one"},
},
},
},
},
expected: 5,
isError: false,
},
{
message: "value constraint on unavailable zone",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"east"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "value constraint on unavailable attribute",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "os",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"windows"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "1 value constraint with 2 values",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"agent-one", "agent-two"},
},
},
},
},
expected: 10,
isError: false,
},
{
message: "2 value constraints",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"agent-one"},
},
},
},
{
Name: "rack",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"2"},
},
},
},
},
expected: 0,
isError: false,
},
{
message: "limit constraint on host",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 2,
isError: false,
},
{
message: "limit constraint on zone",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
{
message: "limit constraint on zone & host",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
{
Name: "zone",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
{
message: "limit constraint on unavailable zone",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "gpu-host", // no host has gpu-host attribute
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 0,
isError: false,
},
{
message: "limit & dedicated constraint",
request: aurora.Resource{
NumCpus: &validCpu,
},
constraints: []*aurora.Constraint{
{
Name: "dedicated",
Constraint: &aurora.TaskConstraint{
Value: &aurora.ValueConstraint{
Negated: false,
Values: []string{"vagrant/bar"},
},
},
},
{
Name: "host",
Constraint: &aurora.TaskConstraint{
Limit: &aurora.LimitConstraint{
Limit: 1,
},
},
},
},
expected: 1,
isError: false,
},
}
for _, tc := range tests {
task := aurora.NewTaskConfig()
task.Resources = []*aurora.Resource{&tc.request}
task.Constraints = tc.constraints
numTasks, err := r.FitTasks(task, offers)
if !tc.isError {
assert.NoError(t, err)
assert.Equal(t, tc.expected, numTasks, tc.message)
} else {
assert.Error(t, err)
}
}
}

util.go (20 additions)

@@ -104,3 +104,23 @@ func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int {
}
return batchCount
}
// ResourcesToMap aggregates a task's resources into a resource-name -> total-quantity map.
func ResourcesToMap(resources []*aurora.Resource) map[string]float64 {
result := map[string]float64{}
for _, resource := range resources {
if resource.NumCpus != nil {
result["cpus"] += *resource.NumCpus
} else if resource.RamMb != nil {
result["mem"] += float64(*resource.RamMb)
} else if resource.DiskMb != nil {
result["disk"] += float64(*resource.DiskMb)
} else if resource.NamedPort != nil {
result["ports"]++
} else if resource.NumGpus != nil {
result["gpus"] += float64(*resource.NumGpus)
}
}
return result
}
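ResourcesToMap is what FitTasks divides each offer against: the task's resource list collapses into a name -> total-quantity map, with every named port counting as one unit of "ports". A quick sketch of the aggregation (editor's illustration only; assumes the aurora-generated types used above):

cpus := 2.0
ram := int64(1024)
port := "http"
m := ResourcesToMap([]*aurora.Resource{
	{NumCpus: &cpus},
	{RamMb: &ram},
	{NamedPort: &port},
})
fmt.Println(m) // map[cpus:2 mem:1024 ports:1]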