Capacity report (#18)
- pull capacity report via /offers endpoint. - calculate how many tasks (with resource and constraints) can be fit in the cluster. examples of using the above 2 features are in aurora-scheduler/australis#33
This commit is contained in:
parent
5d0998647a
commit
4258634ccf
5 changed files with 1012 additions and 10 deletions
434
offer.go
Normal file
434
offer.go
Normal file
|
@ -0,0 +1,434 @@
|
|||
/**
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package realis
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
|
||||
)
|
||||
|
||||
// Offers on [aurora-scheduler]/offers endpoint
|
||||
type Offer struct {
|
||||
ID struct {
|
||||
Value string `json:"value"`
|
||||
} `json:"id"`
|
||||
FrameworkID struct {
|
||||
Value string `json:"value"`
|
||||
} `json:"framework_id"`
|
||||
AgentID struct {
|
||||
Value string `json:"value"`
|
||||
} `json:"agent_id"`
|
||||
Hostname string `json:"hostname"`
|
||||
URL struct {
|
||||
Scheme string `json:"scheme"`
|
||||
Address struct {
|
||||
Hostname string `json:"hostname"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
} `json:"address"`
|
||||
Path string `json:"path"`
|
||||
Query []interface{} `json:"query"`
|
||||
} `json:"url"`
|
||||
Resources []struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Ranges struct {
|
||||
Range []struct {
|
||||
Begin int `json:"begin"`
|
||||
End int `json:"end"`
|
||||
} `json:"range"`
|
||||
} `json:"ranges,omitempty"`
|
||||
Role string `json:"role"`
|
||||
Reservations []interface{} `json:"reservations"`
|
||||
Scalar struct {
|
||||
Value float64 `json:"value"`
|
||||
} `json:"scalar,omitempty"`
|
||||
} `json:"resources"`
|
||||
Attributes []struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Text struct {
|
||||
Value string `json:"value"`
|
||||
} `json:"text"`
|
||||
} `json:"attributes"`
|
||||
ExecutorIds []struct {
|
||||
Value string `json:"value"`
|
||||
} `json:"executor_ids"`
|
||||
}
|
||||
|
||||
// hosts on [aurora-scheduler]/maintenance endpoint
|
||||
type MaintenanceList struct {
|
||||
Drained []string `json:"DRAINED"`
|
||||
Scheduled []string `json:"SCHEDULED"`
|
||||
Draining map[string][]string `json:"DRAINING"`
|
||||
}
|
||||
|
||||
type OfferCount map[float64]int64
|
||||
type OfferGroupReport map[string]OfferCount
|
||||
type OfferReport map[string]OfferGroupReport
|
||||
|
||||
// MaintenanceHosts list all the hosts under maintenance
|
||||
func (c *Client) MaintenanceHosts() ([]string, error) {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
|
||||
}
|
||||
|
||||
request := &http.Client{Transport: tr}
|
||||
|
||||
resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err := buf.ReadFrom(resp.Body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var list MaintenanceList
|
||||
|
||||
if err := json.Unmarshal(buf.Bytes(), &list); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hosts := append(list.Drained, list.Scheduled...)
|
||||
|
||||
for drainingHost := range list.Draining {
|
||||
hosts = append(hosts, drainingHost)
|
||||
}
|
||||
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
// Offers pulls data from /offers endpoint
|
||||
func (c *Client) Offers() ([]Offer, error) {
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
|
||||
}
|
||||
|
||||
request := &http.Client{Transport: tr}
|
||||
|
||||
resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL()))
|
||||
if err != nil {
|
||||
return []Offer{}, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err := buf.ReadFrom(resp.Body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var offers []Offer
|
||||
|
||||
if err := json.Unmarshal(buf.Bytes(), &offers); err != nil {
|
||||
return []Offer{}, err
|
||||
}
|
||||
|
||||
return offers, nil
|
||||
}
|
||||
|
||||
// AvailOfferReport returns a detailed summary of offers available for use.
|
||||
// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus.
|
||||
func (c *Client) AvailOfferReport() (OfferReport, error) {
|
||||
maintHosts, err := c.MaintenanceHosts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
maintHostSet := map[string]bool{}
|
||||
for _, h := range maintHosts {
|
||||
maintHostSet[h] = true
|
||||
}
|
||||
|
||||
// Get a list of offers
|
||||
offers, err := c.Offers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
report := OfferReport{}
|
||||
|
||||
for _, o := range offers {
|
||||
if maintHostSet[o.Hostname] {
|
||||
continue
|
||||
}
|
||||
|
||||
group := "non-dedicated"
|
||||
for _, a := range o.Attributes {
|
||||
if a.Name == "dedicated" {
|
||||
group = a.Text.Value
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := report[group]; !ok {
|
||||
report[group] = map[string]OfferCount{}
|
||||
}
|
||||
|
||||
for _, r := range o.Resources {
|
||||
|
||||
if _, ok := report[group][r.Name]; !ok {
|
||||
report[group][r.Name] = OfferCount{}
|
||||
}
|
||||
|
||||
val := 0.0
|
||||
switch r.Type {
|
||||
case "SCALAR":
|
||||
val = r.Scalar.Value
|
||||
case "RANGES":
|
||||
for _, pr := range r.Ranges.Range {
|
||||
val += float64(pr.End - pr.Begin + 1)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("%s is not supported", r.Type)
|
||||
}
|
||||
|
||||
report[group][r.Name][val]++
|
||||
}
|
||||
}
|
||||
|
||||
return report, nil
|
||||
}
|
||||
|
||||
// FitTasks computes the number tasks can be fit in a list of offer
|
||||
func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) {
|
||||
// count the number of tasks per limit contraint: limit.name -> limit.value -> count
|
||||
limitCounts := map[string]map[string]int64{}
|
||||
for _, c := range taskConfig.Constraints {
|
||||
if c.Constraint.Limit != nil {
|
||||
limitCounts[c.Name] = map[string]int64{}
|
||||
}
|
||||
}
|
||||
|
||||
request := ResourcesToMap(taskConfig.Resources)
|
||||
|
||||
// validate resource request
|
||||
if len(request) == 0 {
|
||||
return -1, fmt.Errorf("Resource request %v must not be empty", request)
|
||||
}
|
||||
|
||||
isValid := false
|
||||
for _, resVal := range request {
|
||||
if resVal > 0 {
|
||||
isValid = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !isValid {
|
||||
return -1, fmt.Errorf("Resource request %v is not valid", request)
|
||||
}
|
||||
|
||||
// pull the list of hosts under maintenance
|
||||
maintHosts, err := c.MaintenanceHosts()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
maintHostSet := map[string]bool{}
|
||||
for _, h := range maintHosts {
|
||||
maintHostSet[h] = true
|
||||
}
|
||||
|
||||
numTasks := int64(0)
|
||||
|
||||
for _, o := range offers {
|
||||
// skip the hosts under maintenance
|
||||
if maintHostSet[o.Hostname] {
|
||||
continue
|
||||
}
|
||||
|
||||
numTasksPerOffer := int64(-1)
|
||||
|
||||
for resName, resVal := range request {
|
||||
// skip as we can fit a infinite number of tasks with 0 demand.
|
||||
if resVal == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
avail := 0.0
|
||||
for _, r := range o.Resources {
|
||||
if r.Name != resName {
|
||||
continue
|
||||
}
|
||||
|
||||
switch r.Type {
|
||||
case "SCALAR":
|
||||
avail = r.Scalar.Value
|
||||
case "RANGES":
|
||||
for _, pr := range r.Ranges.Range {
|
||||
avail += float64(pr.End - pr.Begin + 1)
|
||||
}
|
||||
default:
|
||||
return -1, fmt.Errorf("%s is not supported", r.Type)
|
||||
}
|
||||
}
|
||||
|
||||
numTasksPerResource := int64(avail / resVal)
|
||||
|
||||
if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 {
|
||||
numTasksPerOffer = numTasksPerResource
|
||||
}
|
||||
}
|
||||
|
||||
numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer)
|
||||
}
|
||||
|
||||
return numTasks, nil
|
||||
}
|
||||
|
||||
func fitConstraints(taskConfig *aurora.TaskConfig,
|
||||
offer *Offer,
|
||||
limitCounts map[string]map[string]int64,
|
||||
numTasksPerOffer int64) int64 {
|
||||
|
||||
// check dedicated attributes vs. constraints
|
||||
if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) {
|
||||
return 0
|
||||
}
|
||||
|
||||
limitConstraints := []*aurora.Constraint{}
|
||||
|
||||
for _, c := range taskConfig.Constraints {
|
||||
// look for corresponding attribute
|
||||
attFound := false
|
||||
for _, a := range offer.Attributes {
|
||||
if a.Name == c.Name {
|
||||
attFound = true
|
||||
}
|
||||
}
|
||||
|
||||
// constraint not found in offer's attributes
|
||||
if !attFound {
|
||||
return 0
|
||||
}
|
||||
|
||||
if c.Constraint.Value != nil && !valueConstraint(offer, c) {
|
||||
// value constraint is not satisfied
|
||||
return 0
|
||||
} else if c.Constraint.Limit != nil {
|
||||
limitConstraints = append(limitConstraints, c)
|
||||
limit := limitConstraint(offer, c, limitCounts)
|
||||
|
||||
if numTasksPerOffer > limit && limit >= 0 {
|
||||
numTasksPerOffer = limit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// update limitCounts
|
||||
for _, c := range limitConstraints {
|
||||
for _, a := range offer.Attributes {
|
||||
if a.Name == c.Name {
|
||||
limitCounts[a.Name][a.Text.Value] += numTasksPerOffer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return numTasksPerOffer
|
||||
}
|
||||
|
||||
func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool {
|
||||
// get all dedicated attributes of an offer
|
||||
dedicatedAtts := map[string]bool{}
|
||||
for _, a := range offer.Attributes {
|
||||
if a.Name == "dedicated" {
|
||||
dedicatedAtts[a.Text.Value] = true
|
||||
}
|
||||
}
|
||||
|
||||
if len(dedicatedAtts) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
// check if constraints are matching dedicated attributes
|
||||
matched := false
|
||||
for _, c := range constraints {
|
||||
if c.Name == "dedicated" && c.Constraint.Value != nil {
|
||||
found := false
|
||||
|
||||
for _, v := range c.Constraint.Value.Values {
|
||||
if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
matched = true
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return matched
|
||||
}
|
||||
|
||||
// valueConstraint checks Value Contraints of task if the are matched by the offer.
|
||||
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
|
||||
func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool {
|
||||
matched := false
|
||||
|
||||
for _, a := range offer.Attributes {
|
||||
if a.Name == constraint.Name {
|
||||
for _, v := range constraint.Constraint.Value.Values {
|
||||
matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) ||
|
||||
(a.Text.Value != v && constraint.Constraint.Value.Negated)
|
||||
|
||||
if matched {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if matched {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return matched
|
||||
}
|
||||
|
||||
// limitConstraint limits the number of pods on a group which has the same attribute.
|
||||
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
|
||||
func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 {
|
||||
limit := int64(-1)
|
||||
for _, a := range offer.Attributes {
|
||||
// limit constraint found
|
||||
if a.Name == constraint.Name {
|
||||
curr := limitCounts[a.Name][a.Text.Value]
|
||||
currLimit := int64(constraint.Constraint.Limit.Limit)
|
||||
|
||||
if curr >= currLimit {
|
||||
return 0
|
||||
}
|
||||
|
||||
if currLimit-curr < limit || limit < 0 {
|
||||
limit = currLimit - curr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return limit
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue