/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package realis

import (
	"bytes"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"

	"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
)

// Offers on [aurora-scheduler]/offers endpoint
type Offer struct {
	ID struct {
		Value string `json:"value"`
	} `json:"id"`
	FrameworkID struct {
		Value string `json:"value"`
	} `json:"framework_id"`
	AgentID struct {
		Value string `json:"value"`
	} `json:"agent_id"`
	Hostname string `json:"hostname"`
	URL      struct {
		Scheme  string `json:"scheme"`
		Address struct {
			Hostname string `json:"hostname"`
			IP       string `json:"ip"`
			Port     int    `json:"port"`
		} `json:"address"`
		Path  string        `json:"path"`
		Query []interface{} `json:"query"`
	} `json:"url"`
	Resources []struct {
		Name   string `json:"name"`
		Type   string `json:"type"`
		Ranges struct {
			Range []struct {
				Begin int `json:"begin"`
				End   int `json:"end"`
			} `json:"range"`
		} `json:"ranges,omitempty"`
		Role         string        `json:"role"`
		Reservations []interface{} `json:"reservations"`
		Scalar       struct {
			Value float64 `json:"value"`
		} `json:"scalar,omitempty"`
	} `json:"resources"`
	Attributes []struct {
		Name string `json:"name"`
		Type string `json:"type"`
		Text struct {
			Value string `json:"value"`
		} `json:"text"`
	} `json:"attributes"`
	ExecutorIds []struct {
		Value string `json:"value"`
	} `json:"executor_ids"`
}

// hosts on [aurora-scheduler]/maintenance endpoint
type MaintenanceList struct {
	Drained   []string            `json:"DRAINED"`
	Scheduled []string            `json:"SCHEDULED"`
	Draining  map[string][]string `json:"DRAINING"`
}

type OfferCount map[float64]int64
type OfferGroupReport map[string]OfferCount
type OfferReport map[string]OfferGroupReport

// MaintenanceHosts list all the hosts under maintenance
func (c *Client) MaintenanceHosts() ([]string, error) {
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
	}

	request := &http.Client{Transport: tr}

	resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL()))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(resp.Body); err != nil {
		return nil, err
	}

	var list MaintenanceList

	if err := json.Unmarshal(buf.Bytes(), &list); err != nil {
		return nil, err
	}

	hosts := append(list.Drained, list.Scheduled...)

	for drainingHost := range list.Draining {
		hosts = append(hosts, drainingHost)
	}

	return hosts, nil
}

// Offers pulls data from /offers endpoint
func (c *Client) Offers() ([]Offer, error) {
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify},
	}

	request := &http.Client{Transport: tr}

	resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL()))
	if err != nil {
		return []Offer{}, err
	}
	defer resp.Body.Close()

	buf := new(bytes.Buffer)
	if _, err := buf.ReadFrom(resp.Body); err != nil {
		return nil, err
	}

	var offers []Offer

	if err := json.Unmarshal(buf.Bytes(), &offers); err != nil {
		return []Offer{}, err
	}

	return offers, nil
}

// AvailOfferReport returns a detailed summary of offers available for use.
// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus.
func (c *Client) AvailOfferReport() (OfferReport, error) {
	maintHosts, err := c.MaintenanceHosts()
	if err != nil {
		return nil, err
	}

	maintHostSet := map[string]bool{}
	for _, h := range maintHosts {
		maintHostSet[h] = true
	}

	// Get a list of offers
	offers, err := c.Offers()
	if err != nil {
		return nil, err
	}

	report := OfferReport{}

	for _, o := range offers {
		if maintHostSet[o.Hostname] {
			continue
		}

		group := "non-dedicated"
		for _, a := range o.Attributes {
			if a.Name == "dedicated" {
				group = a.Text.Value
				break
			}
		}

		if _, ok := report[group]; !ok {
			report[group] = map[string]OfferCount{}
		}

		for _, r := range o.Resources {

			if _, ok := report[group][r.Name]; !ok {
				report[group][r.Name] = OfferCount{}
			}

			val := 0.0
			switch r.Type {
			case "SCALAR":
				val = r.Scalar.Value
			case "RANGES":
				for _, pr := range r.Ranges.Range {
					val += float64(pr.End - pr.Begin + 1)
				}
			default:
				return nil, fmt.Errorf("%s is not supported", r.Type)
			}

			report[group][r.Name][val]++
		}
	}

	return report, nil
}

// FitTasks computes the number tasks can be fit in a list of offer
func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) {
	// count the number of tasks per limit contraint: limit.name -> limit.value -> count
	limitCounts := map[string]map[string]int64{}
	for _, c := range taskConfig.Constraints {
		if c.Constraint.Limit != nil {
			limitCounts[c.Name] = map[string]int64{}
		}
	}

	request := ResourcesToMap(taskConfig.Resources)

	// validate resource request
	if len(request) == 0 {
		return -1, fmt.Errorf("Resource request %v must not be empty", request)
	}

	isValid := false
	for _, resVal := range request {
		if resVal > 0 {
			isValid = true
			break
		}
	}

	if !isValid {
		return -1, fmt.Errorf("Resource request %v is not valid", request)
	}

	// pull the list of hosts under maintenance
	maintHosts, err := c.MaintenanceHosts()
	if err != nil {
		return -1, err
	}

	maintHostSet := map[string]bool{}
	for _, h := range maintHosts {
		maintHostSet[h] = true
	}

	numTasks := int64(0)

	for _, o := range offers {
		// skip the hosts under maintenance
		if maintHostSet[o.Hostname] {
			continue
		}

		numTasksPerOffer := int64(-1)

		for resName, resVal := range request {
			// skip as we can fit a infinite number of tasks with 0 demand.
			if resVal == 0 {
				continue
			}

			avail := 0.0
			for _, r := range o.Resources {
				if r.Name != resName {
					continue
				}

				switch r.Type {
				case "SCALAR":
					avail = r.Scalar.Value
				case "RANGES":
					for _, pr := range r.Ranges.Range {
						avail += float64(pr.End - pr.Begin + 1)
					}
				default:
					return -1, fmt.Errorf("%s is not supported", r.Type)
				}
			}

			numTasksPerResource := int64(avail / resVal)

			if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 {
				numTasksPerOffer = numTasksPerResource
			}
		}

		numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer)
	}

	return numTasks, nil
}

func fitConstraints(taskConfig *aurora.TaskConfig,
	offer *Offer,
	limitCounts map[string]map[string]int64,
	numTasksPerOffer int64) int64 {

	// check dedicated attributes vs. constraints
	if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) {
		return 0
	}

	limitConstraints := []*aurora.Constraint{}

	for _, c := range taskConfig.Constraints {
		// look for corresponding attribute
		attFound := false
		for _, a := range offer.Attributes {
			if a.Name == c.Name {
				attFound = true
			}
		}

		// constraint not found in offer's attributes
		if !attFound {
			return 0
		}

		if c.Constraint.Value != nil && !valueConstraint(offer, c) {
			// value constraint is not satisfied
			return 0
		} else if c.Constraint.Limit != nil {
			limitConstraints = append(limitConstraints, c)
			limit := limitConstraint(offer, c, limitCounts)

			if numTasksPerOffer > limit && limit >= 0 {
				numTasksPerOffer = limit
			}
		}
	}

	// update limitCounts
	for _, c := range limitConstraints {
		for _, a := range offer.Attributes {
			if a.Name == c.Name {
				limitCounts[a.Name][a.Text.Value] += numTasksPerOffer
			}
		}
	}

	return numTasksPerOffer
}

func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool {
	// get all dedicated attributes of an offer
	dedicatedAtts := map[string]bool{}
	for _, a := range offer.Attributes {
		if a.Name == "dedicated" {
			dedicatedAtts[a.Text.Value] = true
		}
	}

	if len(dedicatedAtts) == 0 {
		return true
	}

	// check if constraints are matching dedicated attributes
	matched := false
	for _, c := range constraints {
		if c.Name == "dedicated" && c.Constraint.Value != nil {
			found := false

			for _, v := range c.Constraint.Value.Values {
				if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) {
					found = true
					break
				}
			}

			if found {
				matched = true
			} else {
				return false
			}
		}
	}

	return matched
}

// valueConstraint checks Value Contraints of task if the are matched by the offer.
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool {
	matched := false

	for _, a := range offer.Attributes {
		if a.Name == constraint.Name {
			for _, v := range constraint.Constraint.Value.Values {
				matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) ||
					(a.Text.Value != v && constraint.Constraint.Value.Negated)

				if matched {
					break
				}
			}

			if matched {
				break
			}
		}
	}

	return matched
}

// limitConstraint limits the number of pods on a group which has the same attribute.
// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/
func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 {
	limit := int64(-1)
	for _, a := range offer.Attributes {
		// limit constraint found
		if a.Name == constraint.Name {
			curr := limitCounts[a.Name][a.Text.Value]
			currLimit := int64(constraint.Constraint.Limit.Limit)

			if curr >= currLimit {
				return 0
			}

			if currLimit-curr < limit || limit < 0 {
				limit = currLimit - curr
			}
		}
	}

	return limit
}