move cfg to loggers + refactor + log fn wrappers

1. Instead of maintaining a global config, each specialized logger
now stores its config.
2. Refactored logInterface to elektronLogger.
3. Refactored loggerImpl to baseElektronLogger to be consistent with
the rest of the code base.
4. Wrapped elektronLogger#Log(...) and elektronLogf(...) so that we
do not have to use the instance of elektronLogger everytime we want
to log. Instead, we just do logging.Log(...) or logging.Logf(...).
5. Wrapped elektronLogger#WithFields(...) and
elektronLogger#WithField(...).
6. Refactored codebase to adhere to the changes.
This commit is contained in:
Pradyumna Kaushik 2019-12-05 21:32:37 -05:00
parent 5a6d1bed4a
commit 6fb0e4a3fe
20 changed files with 396 additions and 250 deletions

View file

@ -250,30 +250,29 @@ func (s *BaseScheduler) StatusUpdate(driver sched.SchedulerDriver, status *mesos
func (s *BaseScheduler) LogTaskStarting(ts *def.Task, offer *mesos.Offer) {
if ts == nil {
elekLog.ElektronLogger.WithFields(log.Fields{"host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "TASKS STARTING...")
elekLog.WithFields(log.Fields{"host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "TASKS STARTING...")
} else {
elekLog.ElektronLogger.WithFields(log.Fields{"task": fmt.Sprintf("%s", ts.Name),
"Instance": fmt.Sprintf("%d", *ts.Instances), "host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "TASK STARTING... ")
elekLog.WithFields(log.Fields{"task": fmt.Sprintf("%s", ts.Name), "Instance": fmt.Sprintf("%d", *ts.Instances),
"host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "TASK STARTING... ")
}
}
func (s *BaseScheduler) LogTaskWattsConsideration(ts def.Task, host string, wattsToConsider float64) {
elekLog.ElektronLogger.WithFields(log.Fields{"task": ts.Name, "host": host, "Watts": fmt.Sprintf("%f", wattsToConsider)}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "Watts considered for ")
elekLog.WithFields(log.Fields{"task": ts.Name, "host": host, "Watts": fmt.Sprintf("%f", wattsToConsider)}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "Watts considered for ")
}
func (s *BaseScheduler) LogOffersReceived(offers []*mesos.Offer) {
elekLog.ElektronLogger.WithFields(log.Fields{"Resource offers received": fmt.Sprintf("%d", len(offers))}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Resource offers received": fmt.Sprintf("%d", len(offers))}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "")
}
func (s *BaseScheduler) LogNoPendingTasksDeclineOffers(offer *mesos.Offer) {
elekLog.ElektronLogger.WithFields(log.Fields{"DECLINING OFFER for host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"DECLINING OFFER for host": fmt.Sprintf("%s", offer.GetHostname())}).Log(elekLogTypes.CONSOLE,
log.WarnLevel, "No tasks left to schedule ")
}
func (s *BaseScheduler) LogNumberOfRunningTasks() {
elekLog.ElektronLogger.WithFields(log.Fields{"Number of tasks still Running": fmt.Sprintf("%d", s.tasksRunning)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Number of tasks still Running": fmt.Sprintf("%d", s.tasksRunning)}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "")
}
@ -284,67 +283,67 @@ func (s *BaseScheduler) LogCoLocatedTasks(slaveID string) {
buffer.WriteString(fmt.Sprintln(taskName))
}
s.TasksRunningMutex.Unlock()
elekLog.ElektronLogger.WithFields(log.Fields{"Colocated with": fmt.Sprintf("%s", buffer.String())}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Colocated with": fmt.Sprintf("%s", buffer.String())}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "")
}
func (s *BaseScheduler) LogSchedTrace(taskToSchedule *mesos.TaskInfo, offer *mesos.Offer) {
elekLog.ElektronLogger.WithFields(log.Fields{offer.GetHostname(): fmt.Sprintf("%s", taskToSchedule.GetTaskId().GetValue())}).Log(elekLogTypes.SCHED_TRACE, log.InfoLevel, "")
elekLog.WithFields(log.Fields{offer.GetHostname(): fmt.Sprintf("%s", taskToSchedule.GetTaskId().GetValue())}).Log(elekLogTypes.SCHED_TRACE, log.InfoLevel, "")
}
func (s *BaseScheduler) LogTerminateScheduler() {
elekLog.ElektronLogger.Log(elekLogTypes.CONSOLE, log.InfoLevel, "Done scheduling all tasks!")
elekLog.Log(elekLogTypes.CONSOLE, log.InfoLevel, "Done scheduling all tasks!")
}
func (s *BaseScheduler) LogInsufficientResourcesDeclineOffer(offer *mesos.Offer,
offerResources ...interface{}) {
buffer := bytes.Buffer{}
buffer.WriteString(fmt.Sprintf("<CPU: %f, RAM: %f, Watts: %f>", offerResources...))
elekLog.ElektronLogger.WithFields(log.Fields{"Offer Resources": fmt.Sprintf("%s", buffer.String())}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Offer Resources": fmt.Sprintf("%s", buffer.String())}).Log(elekLogTypes.CONSOLE,
log.WarnLevel, "DECLINING OFFER... Offer has insufficient resources to launch a task")
}
func (s *BaseScheduler) LogOfferRescinded(offerID *mesos.OfferID) {
elekLog.ElektronLogger.WithFields(log.Fields{"OfferID": fmt.Sprintf("%s", offerID)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"OfferID": fmt.Sprintf("%s", offerID)}).Log(elekLogTypes.CONSOLE,
log.ErrorLevel, "OFFER RESCINDED")
}
func (s *BaseScheduler) LogSlaveLost(slaveID *mesos.SlaveID) {
elekLog.ElektronLogger.WithFields(log.Fields{"SlaveID": fmt.Sprintf("%s", slaveID)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"SlaveID": fmt.Sprintf("%s", slaveID)}).Log(elekLogTypes.CONSOLE,
log.ErrorLevel, "SLAVE LOST")
}
func (s *BaseScheduler) LogExecutorLost(executorID *mesos.ExecutorID, slaveID *mesos.SlaveID) {
elekLog.ElektronLogger.WithFields(log.Fields{"ExecutorID": fmt.Sprintf("%s", executorID), "SlaveID": fmt.Sprintf("%s", slaveID)}).Log(elekLogTypes.CONSOLE, log.ErrorLevel, "EXECUTOR LOST")
elekLog.WithFields(log.Fields{"ExecutorID": fmt.Sprintf("%s", executorID), "SlaveID": fmt.Sprintf("%s", slaveID)}).Log(elekLogTypes.CONSOLE, log.ErrorLevel, "EXECUTOR LOST")
}
func (s *BaseScheduler) LogFrameworkMessage(executorID *mesos.ExecutorID,
slaveID *mesos.SlaveID, message string) {
elekLog.ElektronLogger.WithFields(log.Fields{"Received Framework message from executor": executorID}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Received Framework message from executor": executorID}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, message)
}
func (s *BaseScheduler) LogMesosError(err string) {
elekLog.ElektronLogger.WithFields(log.Fields{"MESOS CONSOLE": fmt.Sprintf("%v", err)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"MESOS CONSOLE": fmt.Sprintf("%v", err)}).Log(elekLogTypes.CONSOLE,
log.ErrorLevel, "")
}
func (s *BaseScheduler) LogElectronError(err error) {
elekLog.ElektronLogger.WithFields(log.Fields{"ELECTRON CONSOLE": fmt.Sprintf("%v", err)}).Log(elekLogTypes.CONSOLE, log.ErrorLevel, "")
elekLog.WithFields(log.Fields{"ELECTRON CONSOLE": fmt.Sprintf("%v", err)}).Log(elekLogTypes.CONSOLE, log.ErrorLevel, "")
}
func (s *BaseScheduler) LogFrameworkRegistered(frameworkID *mesos.FrameworkID,
masterInfo *mesos.MasterInfo) {
elekLog.ElektronLogger.WithFields(log.Fields{"frameworkID": fmt.Sprintf("%s", frameworkID), "master": fmt.Sprintf("%v", masterInfo)}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "FRAMEWORK REGISTERED!")
elekLog.WithFields(log.Fields{"frameworkID": fmt.Sprintf("%s", frameworkID), "master": fmt.Sprintf("%v", masterInfo)}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "FRAMEWORK REGISTERED!")
}
func (s *BaseScheduler) LogFrameworkReregistered(masterInfo *mesos.MasterInfo) {
elekLog.ElektronLogger.WithFields(log.Fields{"master": fmt.Sprintf("%v", masterInfo)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"master": fmt.Sprintf("%v", masterInfo)}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "Framework re-registered")
}
func (s *BaseScheduler) LogDisconnected() {
elekLog.ElektronLogger.Log(elekLogTypes.CONSOLE, log.WarnLevel, "Framework disconnected with master")
elekLog.Log(elekLogTypes.CONSOLE, log.WarnLevel, "Framework disconnected with master")
}
func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
@ -356,12 +355,12 @@ func (s *BaseScheduler) LogTaskStatusUpdate(status *mesos.TaskStatus) {
default:
level = log.InfoLevel
}
elekLog.ElektronLogger.WithFields(log.Fields{"task": fmt.Sprintf("%s", *status.TaskId.Value), "state": NameFor(status.State)}).Log(elekLogTypes.CONSOLE, level, "Task Status received")
elekLog.WithFields(log.Fields{"task": fmt.Sprintf("%s", *status.TaskId.Value), "state": NameFor(status.State)}).Log(elekLogTypes.CONSOLE, level, "Task Status received")
}
func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicyState) {
logSPS := func() {
elekLog.ElektronLogger.WithFields(log.Fields{"Name": name}).Log(elekLogTypes.SPS, log.InfoLevel, "")
elekLog.WithFields(log.Fields{"Name": name}).Log(elekLogTypes.SPS, log.InfoLevel, "")
}
if s.hasReceivedResourceOffers && (s.curSchedPolicy != nextPolicy) {
logSPS()
@ -370,10 +369,10 @@ func (s *BaseScheduler) LogSchedPolicySwitch(name string, nextPolicy SchedPolicy
}
// Logging the size of the scheduling window and the scheduling policy
// that is going to schedule the tasks in the scheduling window.
elekLog.ElektronLogger.WithFields(log.Fields{"Window size": fmt.Sprintf("%d", s.schedWindowSize), "Name": name}).Log(elekLogTypes.SCHED_WINDOW, log.InfoLevel, "")
elekLog.WithFields(log.Fields{"Window size": fmt.Sprintf("%d", s.schedWindowSize), "Name": name}).Log(elekLogTypes.SCHED_WINDOW, log.InfoLevel, "")
}
func (s *BaseScheduler) LogClsfnAndTaskDistOverhead(overhead time.Duration) {
// Logging the overhead in microseconds.
elekLog.ElektronLogger.WithFields(log.Fields{"Overhead in microseconds": fmt.Sprintf("%f", float64(overhead.Nanoseconds())/1000.0)}).Log(elekLogTypes.CLSFN_TASKDISTR_OVERHEAD, log.InfoLevel, "")
elekLog.WithFields(log.Fields{"Overhead in microseconds": fmt.Sprintf("%f", float64(overhead.Nanoseconds())/1000.0)}).Log(elekLogTypes.CLSFN_TASKDISTR_OVERHEAD, log.InfoLevel, "")
}

View file

@ -36,10 +36,10 @@ import (
func coLocated(tasks map[string]bool, s BaseScheduler) {
for _, task := range tasks {
elekLog.ElektronLogger.WithFields(log.Fields{"Task": task}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "")
elekLog.WithFields(log.Fields{"Task": task}).Log(elekLogTypes.CONSOLE, log.InfoLevel, "")
}
elekLog.ElektronLogger.Log(elekLogTypes.CONSOLE, log.InfoLevel, "---------------------")
elekLog.Log(elekLogTypes.CONSOLE, log.InfoLevel, "---------------------")
}
// Get the powerClass of the given hostname.

View file

@ -90,7 +90,7 @@ func switchTaskDistBased(baseSchedRef *BaseScheduler) string {
// Determine the distribution of tasks in the new scheduling window.
taskDist, err := def.GetTaskDistributionInWindow(baseSchedRef.schedWindowSize, baseSchedRef.tasks)
baseSchedRef.LogClsfnAndTaskDistOverhead(time.Now().Sub(startTime))
elekLog.ElektronLogger.WithFields(log.Fields{"Task Distribution": fmt.Sprintf("%f", taskDist)}).Log(elekLogTypes.CONSOLE,
elekLog.WithFields(log.Fields{"Task Distribution": fmt.Sprintf("%f", taskDist)}).Log(elekLogTypes.CONSOLE,
log.InfoLevel, "Switching... ")
if err != nil {
// All the tasks in the window were only classified into 1 cluster.