This repository has been archived on 2024-04-10. You can view files and clone it, but you cannot make any changes to its state, such as pushing and creating new issues, pull requests or comments.
elektron/utilities/trackResourceUsage.go
Bhargavi Hanumant Alandikar 3543960689 Elektron Logging library (#16)
switch to logrus for logging.

replaced old logging library with a wrapper around logrus.
We now just need to use the exported Log(...) and Logf(...) from the logging/
package that wraps around a set of loggers constituting a chain (following COR).
Loggers are configured using a YAML file that specifies the following.
1. enabled/disabled
2. whether the message should be logged on console.
3. filename extension.
4. minimum log level.

Retrofitted source code to now use the updated logging library.
Updated the documentation with information regarding the specification
of the log config file.

Currently, the log format in the config file is not adhered to. This is going to be
addressed in a future commit.
2019-12-09 20:15:33 -05:00

180 lines
5.5 KiB
Go

// Copyright (C) 2018 spdfg
//
// This file is part of Elektron.
//
// Elektron is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Elektron is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Elektron. If not, see <http://www.gnu.org/licenses/>.
//
package utilities
import (
"errors"
"sync"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
"github.com/spdfg/elektron/def"
"github.com/spdfg/elektron/utilities/offerUtils"
)
// TrackResourceUsage tracks, for each node in the cluster, how much of each
// resource (CPU, RAM, Watts) is available in total and how much is currently
// unused. The embedded mutex guards perHostResourceAvailability; callers lock
// the tracker before reading or mutating the map.
type TrackResourceUsage struct {
	// Per-node resource counts, keyed by the Mesos slave (agent) ID.
	perHostResourceAvailability map[string]*ResourceCount
	sync.Mutex
}
// Maintain information regarding the usage of the cluster resources.
// This information is maintained for each node in the cluster.
type ResourceCount struct {
	// Total resources available on the node, as reported by its first
	// Mesos resource offer.
	TotalCPU   float64
	TotalRAM   float64
	TotalWatts float64

	// Resources currently unused (not consumed by any running task).
	UnusedCPU   float64
	UnusedRAM   float64
	UnusedWatts float64
}
// IncrUnusedResources returns the given task's resource requirements to the
// node's pool of unused resources (e.g. when the task terminates).
func (rc *ResourceCount) IncrUnusedResources(tr def.TaskResources) {
	rc.UnusedCPU, rc.UnusedRAM, rc.UnusedWatts =
		rc.UnusedCPU+tr.CPU, rc.UnusedRAM+tr.Ram, rc.UnusedWatts+tr.Watts
}
// DecrUnusedResources claims the given task's resource requirements from the
// node's pool of unused resources (e.g. when the task becomes active).
func (rc *ResourceCount) DecrUnusedResources(tr def.TaskResources) {
	rc.UnusedCPU, rc.UnusedRAM, rc.UnusedWatts =
		rc.UnusedCPU-tr.CPU, rc.UnusedRAM-tr.Ram, rc.UnusedWatts-tr.Watts
}
// truInstance is the process-wide singleton resource usage tracker.
var truInstance *TrackResourceUsage

// truOnce guards the lazy initialization of truInstance. The previous
// `if truInstance == nil` check was racy: every caller of getTRUInstance
// runs concurrently and only acquires the tracker's lock *after* obtaining
// the instance, so two goroutines could both observe nil and create
// separate trackers.
var truOnce sync.Once

// getTRUInstance lazily initializes (exactly once, even under concurrent
// callers) and returns the singleton resource usage tracker.
func getTRUInstance() *TrackResourceUsage {
	truOnce.Do(func() {
		truInstance = newResourceUsageTracker()
	})
	return truInstance
}
// newResourceUsageTracker constructs an empty tracker with no per-host data
// recorded yet.
func newResourceUsageTracker() *TrackResourceUsage {
	tracker := new(TrackResourceUsage)
	tracker.perHostResourceAvailability = map[string]*ResourceCount{}
	return tracker
}
// RecordTotalResourceAvailability determines the total available resources
// from the first round of mesos resource offers. Only the first offer seen
// from each Mesos agent contributes; subsequent offers from an already
// recorded agent are ignored.
func RecordTotalResourceAvailability(offers []*mesos.Offer) {
	tru := getTRUInstance()
	tru.Lock()
	defer tru.Unlock()
	for _, off := range offers {
		agentID := *off.SlaveId.Value
		// Skip agents whose first offer has already been recorded.
		if _, seen := tru.perHostResourceAvailability[agentID]; seen {
			continue
		}
		cpu, mem, watts := offerUtils.OfferAgg(off)
		tru.perHostResourceAvailability[agentID] = &ResourceCount{
			TotalCPU:   cpu,
			TotalRAM:   mem,
			TotalWatts: watts,
			// Initially no tasks are running, so all resources are unused.
			UnusedCPU:   cpu,
			UnusedRAM:   mem,
			UnusedWatts: watts,
		}
	}
}
// Resource availability update scenarios, keyed by scenario name.
// Each handler adjusts the unused-resource counts of the node (slaveID)
// hosting the task identified by taskID.
var resourceAvailabilityUpdateScenario = map[string]func(mesos.TaskID, mesos.SlaveID) error{
	// Task reached a terminal state: return its resources to the unused pool.
	"ON_TASK_TERMINAL_STATE": func(taskID mesos.TaskID, slaveID mesos.SlaveID) error {
		tru := getTRUInstance()
		tru.Lock()
		defer tru.Unlock()
		taskResources, err := def.GetResourceRequirement(*taskID.Value)
		if err != nil {
			return err
		}
		// Checking if first resource offer already recorded for slaveID.
		resCount, ok := tru.perHostResourceAvailability[*slaveID.Value]
		if !ok {
			// Shouldn't be here.
			// First round of mesos resource offers not recorded.
			// (Typo fix: was "Recource", now consistent with the
			// ON_TASK_ACTIVE_STATE message.)
			return errors.New("Resource Availability not recorded for " + *slaveID.Value)
		}
		resCount.IncrUnusedResources(taskResources)
		return nil
	},
	// Task became active: claim its resources from the unused pool.
	"ON_TASK_ACTIVE_STATE": func(taskID mesos.TaskID, slaveID mesos.SlaveID) error {
		tru := getTRUInstance()
		tru.Lock()
		defer tru.Unlock()
		taskResources, err := def.GetResourceRequirement(*taskID.Value)
		if err != nil {
			return err
		}
		// Checking if first resource offer already recorded for slaveID.
		resCount, ok := tru.perHostResourceAvailability[*slaveID.Value]
		if !ok {
			// Shouldn't be here.
			// First round of mesos resource offers not recorded.
			return errors.New("Resource Availability not recorded for " + *slaveID.Value)
		}
		resCount.DecrUnusedResources(taskResources)
		return nil
	},
}
// ResourceAvailabilityUpdate updates cluster resource availability based on
// the given scenario (a key of resourceAvailabilityUpdateScenario, e.g.
// "ON_TASK_TERMINAL_STATE" or "ON_TASK_ACTIVE_STATE").
// Returns an error if the scenario is unknown, or if the update itself fails.
func ResourceAvailabilityUpdate(scenario string, taskID mesos.TaskID, slaveID mesos.SlaveID) error {
	updateFunc, ok := resourceAvailabilityUpdateScenario[scenario]
	if !ok {
		// Incorrect scenario specified.
		return errors.New("Incorrect scenario specified for resource availability update: " + scenario)
	}
	// Applying the update function and propagating its error.
	// (Bug fix: the error returned by updateFunc was previously discarded
	// and nil was returned unconditionally.)
	return updateFunc(taskID, slaveID)
}
// GetClusterwideResourceAvailability retrieves clusterwide resource
// availability by summing the total and unused CPU, RAM and Watts counts
// across every recorded host.
func GetClusterwideResourceAvailability() ResourceCount {
	tru := getTRUInstance()
	tru.Lock()
	defer tru.Unlock()
	var aggregate ResourceCount
	for _, host := range tru.perHostResourceAvailability {
		// Aggregating the total CPU, RAM and Watts.
		aggregate.TotalCPU += host.TotalCPU
		aggregate.TotalRAM += host.TotalRAM
		aggregate.TotalWatts += host.TotalWatts
		// Aggregating the total unused CPU, RAM and Watts.
		aggregate.UnusedCPU += host.UnusedCPU
		aggregate.UnusedRAM += host.UnusedRAM
		aggregate.UnusedWatts += host.UnusedWatts
	}
	return aggregate
}
// GetPerHostResourceAvailability retrieves resource availability for each
// host in the cluster, keyed by Mesos slave (agent) ID.
//
// A snapshot (deep copy) is returned rather than the internal map: the
// previous implementation handed out the live map and shared *ResourceCount
// values after releasing the lock, so callers raced against concurrent
// updates performed by the scenario handlers.
func GetPerHostResourceAvailability() map[string]*ResourceCount {
	tru := getTRUInstance()
	tru.Lock()
	defer tru.Unlock()
	snapshot := make(map[string]*ResourceCount, len(tru.perHostResourceAvailability))
	for host, resCount := range tru.perHostResourceAvailability {
		rc := *resCount // copy the counts, not the shared pointer
		snapshot[host] = &rc
	}
	return snapshot
}