/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package realis

import (
	"encoding/json"
	"math"
	"strconv"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/samuel/go-zookeeper/zk"
)

// JSON documents that Aurora and Mesos publish in Zookeeper.
type (
	// Endpoint is a host/port pair as serialized in Aurora's Zookeeper nodes.
	Endpoint struct {
		Host string `json:"host"`
		Port int    `json:"port"`
	}

	// ServiceInstance is the JSON document stored in an Aurora "member_" node.
	// The keys of AdditionalEndpoints are used as the URL scheme when the
	// leader URL is assembled.
	ServiceInstance struct {
		Service             Endpoint            `json:"serviceEndpoint"`
		AdditionalEndpoints map[string]Endpoint `json:"additionalEndpoints"`
		Status              string              `json:"status"`
	}

	// MesosAddress is the network address of a Mesos master as found in its
	// "json.info_" node.
	MesosAddress struct {
		Hostname string `json:"hostname"`
		IP       string `json:"ip"`
		Port     uint16 `json:"port"`
	}

	// MesosInstance is the JSON document a Mesos master stores in a
	// "json.info_" node in Zookeeper.
	MesosInstance struct {
		Address MesosAddress `json:"address"`
		Version string       `json:"version"`
	}
)

type (
	// zkConfig holds the settings the ZK* options populate before a lookup runs.
	zkConfig struct {
		endpoints []string      // Zookeeper servers handed to zk.Connect
		path      string        // Zookeeper path whose children are listed
		backoff   Backoff       // retry policy handed to ExponentialBackoff
		timeout   time.Duration // timeout handed to zk.Connect
		logger    Logger        // receives retry messages and the Zookeeper connection's logs
	}

	// ZKOpt is a functional option that mutates a zkConfig; see ZKEndpoints,
	// ZKPath, ZKBackoff, ZKTimeout and ZKLogger.
	ZKOpt func(z *zkConfig)
)

// ZKEndpoints returns an option that sets the Zookeeper servers passed to zk.Connect.
//
// Each entry is trimmed of surrounding whitespace and empty entries are dropped.
// Splitting an empty or sloppy comma-separated list (e.g. "" or "a:2181, b:2181,")
// therefore cannot produce a blank server address. A blank address would otherwise
// slip past the "no Zookeeper endpoints supplied" check. The cleaned values are held
// in a fresh slice, so later changes to the caller's slice do not affect the option.
func ZKEndpoints(endpoints ...string) ZKOpt {
	cleaned := make([]string, 0, len(endpoints))
	for _, e := range endpoints {
		if e = strings.TrimSpace(e); e != "" {
			cleaned = append(cleaned, e)
		}
	}

	return func(z *zkConfig) {
		z.endpoints = cleaned
	}
}

// ZKPath returns an option that sets the Zookeeper path whose children are
// inspected during a lookup.
func ZKPath(path string) ZKOpt {
	setPath := func(cfg *zkConfig) { cfg.path = path }
	return setPath
}

// ZKBackoff returns an option that sets the Backoff policy handed to
// ExponentialBackoff when retrying Zookeeper lookups. Without it,
// defaultBackoff is used.
func ZKBackoff(b Backoff) ZKOpt {
	opt := ZKOpt(func(cfg *zkConfig) { cfg.backoff = b })
	return opt
}

// ZKTimeout returns an option that sets the timeout passed to zk.Connect.
// Without it, a 10 second timeout is used.
func ZKTimeout(d time.Duration) ZKOpt {
	return func(cfg *zkConfig) { cfg.timeout = d }
}

// ZKLogger returns an option that sets the Logger used for retry messages and
// as the logger of the underlying Zookeeper connection.
//
// A nil Logger is ignored and the previously configured logger (NoopLogger by
// default) stays in place. Storing nil would make the lookup functions' later
// Printf calls panic.
func ZKLogger(l Logger) ZKOpt {
	return func(z *zkConfig) {
		if l == nil {
			return
		}
		z.logger = l
	}
}

// LeaderFromZK retrieves the current Aurora scheduler leader URL from Zookeeper.
// It uses cluster.ZK, a comma-separated list of Zookeeper endpoints, and
// cluster.SchedZKPath as the Zookeeper path. All other settings keep the
// defaults of LeaderFromZKOpts.
func LeaderFromZK(cluster Cluster) (string, error) {
	endpoints := strings.Split(cluster.ZK, ",")
	return LeaderFromZKOpts(
		ZKEndpoints(endpoints...),
		ZKPath(cluster.SchedZKPath),
	)
}

// LeaderFromZKOpts retrieves the current Aurora scheduler leader from Zookeeper with a custom
// configuration. Options not supplied fall back to defaultBackoff, a 10 second timeout and a
// NoopLogger. ZKEndpoints and ZKPath are mandatory.
//
// The first child of the configured path whose name starts with "member_" is read and decoded
// as a ServiceInstance. Its single AdditionalEndpoints entry becomes a URL of the form
// "<scheme>://<host>:<port>", where the scheme is the entry's key.
//
// The following are wrapped with NewTemporaryError, presumably so that ExponentialBackoff
// retries them:
//   - connection failures and lookup/decoding failures,
//   - zero or several endpoints in the leader's data,
//   - a missing leader.
// An invalid Zookeeper path is returned without that marker. If retrying fails, the error
// reported by ExponentialBackoff is returned.
func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
	var leaderURL string

	// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
	config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
	for _, opt := range options {
		opt(config)
	}

	if len(config.endpoints) == 0 {
		return "", errors.New("no Zookeeper endpoints supplied")
	}

	if config.path == "" {
		return "", errors.New("no Zookeeper path supplied")
	}

	// Each attempt opens its own connection and closes it before returning.
	retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
		c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
		if err != nil {
			return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
		}
		defer c.Close()

		// Children rather than ChildrenW: a watch would never be consumed because the
		// connection is closed as soon as this attempt returns.
		children, _, err := c.Children(config.path)
		if err != nil {
			// Sentinel error check as there is no other way to check.
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
			}

			return false,
				NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
		}

		for _, child := range children {
			// Only the leader will start with member_
			if !strings.HasPrefix(child, "member_") {
				continue
			}

			childPath := config.path + "/" + child
			data, _, err := c.Get(childPath)
			if err != nil {
				if err == zk.ErrInvalidPath {
					return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
				}

				return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
			}

			var serviceInst ServiceInstance
			if err := json.Unmarshal(data, &serviceInst); err != nil {
				return false,
					NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
			}

			// Exactly one endpoint is expected; anything else means Aurora wrote bad info
			// into Zookeeper. With zero endpoints the URL would degenerate to "://:".
			if len(serviceInst.AdditionalEndpoints) > 1 {
				return false,
					NewTemporaryError(
						errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
			}
			if len(serviceInst.AdditionalEndpoints) == 0 {
				return false,
					NewTemporaryError(
						errors.New("no endpoints in json blob, Aurora wrote bad info to ZK"))
			}

			for scheme, endpoint := range serviceInst.AdditionalEndpoints {
				leaderURL = scheme + "://" + endpoint.Host + ":" + strconv.Itoa(endpoint.Port)
			}
			return true, nil
		}

		// Leader data might not be available yet, try to fetch again.
		return false, NewTemporaryError(errors.New("no leader found"))
	})

	if retryErr != nil {
		config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
		return "", retryErr
	}

	return leaderURL, nil
}

// MesosFromZKOpts retrieves the address of the current Mesos leader from Zookeeper with a
// custom configuration. Options not supplied fall back to defaultBackoff, a 10 second timeout
// and a NoopLogger. ZKEndpoints and ZKPath are mandatory.
//
// Among the children of the configured path whose names start with "json.info_", the one with
// the smallest numeric suffix identifies the leader. Only that node is read. Its JSON is
// decoded as a MesosInstance and returned as "<ip>:<port>".
//
// The address is only assigned by an attempt that succeeds. A failed attempt therefore cannot
// leak a stale address into a later one.
func MesosFromZKOpts(options ...ZKOpt) (string, error) {
	var mesosURL string

	// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
	config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
	for _, opt := range options {
		opt(config)
	}

	if len(config.endpoints) == 0 {
		return "", errors.New("no Zookeeper endpoints supplied")
	}

	if config.path == "" {
		return "", errors.New("no Zookeeper path supplied")
	}

	// Each attempt opens its own connection and closes it before returning.
	retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
		c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
		if err != nil {
			return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
		}
		defer c.Close()

		// Children rather than ChildrenW: a watch would never be consumed because the
		// connection is closed as soon as this attempt returns.
		children, _, err := c.Children(config.path)
		if err != nil {
			// Sentinel error check as there is no other way to check.
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
			}

			return false,
				NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
		}

		// Find the json.info_ child with the smallest sequence number; it belongs to the leader.
		leaderChild := ""
		minScore := math.MaxInt64
		for _, child := range children {
			if !strings.HasPrefix(child, "json.info_") {
				continue
			}

			strs := strings.Split(child, "_")
			if len(strs) < 2 {
				config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child)
				continue
			}
			score, err := strconv.Atoi(strs[1])
			if err != nil {
				return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos."))
			}

			if leaderChild == "" || score < minScore {
				minScore = score
				leaderChild = child
			}
		}

		if leaderChild == "" {
			// Leader data might not be available yet, try to fetch again.
			return false, NewTemporaryError(errors.New("no leader found"))
		}

		// Only the winning node is fetched.
		childPath := config.path + "/" + leaderChild
		data, _, err := c.Get(childPath)
		if err != nil {
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
			}

			return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
		}

		var mesosInstance MesosInstance
		if err := json.Unmarshal(data, &mesosInstance); err != nil {
			config.logger.Printf("%s", err)
			return false,
				NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
		}

		mesosURL = mesosInstance.Address.IP + ":" + strconv.Itoa(int(mesosInstance.Address.Port))
		return true, nil
	})

	if retryErr != nil {
		config.logger.Printf("Failed to determine leader after %v attempts", config.backoff.Steps)
		return "", retryErr
	}

	return mesosURL, nil
}

// MasterNodesFromZK retrieves the Aurora scheduler nodes and the current leader from
// Zookeeper. It uses cluster.ZK, a comma-separated list of Zookeeper endpoints, and
// cluster.SchedZKPath as the Zookeeper path. The returned map holds the "masterNodes"
// and, when found, "leader" entries produced by MasterNodesFromZKOpts.
func MasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
	opts := []ZKOpt{
		ZKEndpoints(strings.Split(cluster.ZK, ",")...),
		ZKPath(cluster.SchedZKPath),
	}
	return MasterNodesFromZKOpts(opts...)
}

// MasterNodesFromZKOpts retrieves the Aurora scheduler nodes and the current leader from
// Zookeeper with a custom configuration. Options not supplied fall back to defaultBackoff, a
// 10 second timeout and a NoopLogger. ZKEndpoints and ZKPath are mandatory.
//
// Every child of the configured path is read:
//   - Children whose name starts with "member_" hold a JSON ServiceInstance. The Host of its
//     AdditionalEndpoints entry is collected under the "leader" key.
//   - The raw contents of every other child are collected under "masterNodes".
// An attempt that finds no "masterNodes" is reported as a temporary error.
//
// The map is rebuilt on every attempt and only published once an attempt succeeds. A failed
// attempt therefore cannot leave duplicate "leader" entries behind for the next one.
func MasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
	var result map[string][]string

	// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
	config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
	for _, opt := range options {
		opt(config)
	}

	if len(config.endpoints) == 0 {
		return nil, errors.New("no Zookeeper endpoints supplied")
	}

	if config.path == "" {
		return nil, errors.New("no Zookeeper path supplied")
	}

	// Each attempt opens its own connection and closes it before returning.
	retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
		c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
		if err != nil {
			return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
		}
		defer c.Close()

		// Children rather than ChildrenW: a watch would never be consumed because the
		// connection is closed as soon as this attempt returns.
		children, _, err := c.Children(config.path)
		if err != nil {
			// Sentinel error check as there is no other way to check.
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
			}

			return false,
				NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
		}

		attempt := make(map[string][]string)
		var hosts []string
		for _, child := range children {
			childPath := config.path + "/" + child
			data, _, err := c.Get(childPath)
			if err != nil {
				if err == zk.ErrInvalidPath {
					return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
				}

				return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
			}

			// Only the leader's node is in JSON format; every other child holds plain data.
			if !strings.HasPrefix(child, "member_") {
				hosts = append(hosts, string(data))
				continue
			}

			// Decode into a fresh value: json.Unmarshal reuses an existing map and keeps its
			// entries, so a shared ServiceInstance would merge endpoints across member_ nodes.
			var serviceInst ServiceInstance
			if err := json.Unmarshal(data, &serviceInst); err != nil {
				return false,
					NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
			}

			// Should only be one endpoint.
			// This should never be encountered as it would indicate Aurora
			// writing bad info into Zookeeper but is kept here as a safety net.
			if len(serviceInst.AdditionalEndpoints) > 1 {
				return false,
					NewTemporaryError(
						errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK"))
			}

			for _, v := range serviceInst.AdditionalEndpoints {
				attempt["leader"] = append(attempt["leader"], v.Host)
			}
		}
		attempt["masterNodes"] = hosts

		// Master nodes data might not be available yet, try to fetch again.
		if len(hosts) == 0 {
			return false, NewTemporaryError(errors.New("no master nodes found"))
		}

		result = attempt
		return true, nil
	})

	if retryErr != nil {
		config.logger.Printf("Failed to get master nodes after %v attempts", config.backoff.Steps)
		return nil, retryErr
	}

	return result, nil
}

// MesosMasterNodesFromZK retrieves the Mesos master nodes and the current Mesos leader from
// Zookeeper. It uses cluster.ZK, a comma-separated list of Zookeeper endpoints, and
// cluster.MesosZKPath as the Zookeeper path. All other settings keep the defaults of
// MesosMasterNodesFromZKOpts.
func MesosMasterNodesFromZK(cluster Cluster) (map[string][]string, error) {
	zkPath := cluster.MesosZKPath
	zkServers := strings.Split(cluster.ZK, ",")
	return MesosMasterNodesFromZKOpts(ZKEndpoints(zkServers...), ZKPath(zkPath))
}

// MesosMasterNodesFromZKOpts retrieves the Mesos master nodes and the current Mesos leader
// from Zookeeper with a custom configuration. Options not supplied fall back to
// defaultBackoff, a 10 second timeout and a NoopLogger. ZKEndpoints and ZKPath are mandatory.
//
// Every child of the configured path whose name starts with "json.info_" is decoded as a
// MesosInstance. The returned map holds:
//   - "masterNodes": the Address.Hostname of every such child, in the order Zookeeper listed them;
//   - "leader": a single-element slice with the hostname of the child that has the smallest
//     numeric suffix.
// Hostnames rather than IPs are returned to be consistent with the Aurora master nodes.
//
// The map is rebuilt on every attempt and only published once an attempt succeeds.
func MesosMasterNodesFromZKOpts(options ...ZKOpt) (map[string][]string, error) {
	var result map[string][]string

	// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
	config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
	for _, opt := range options {
		opt(config)
	}

	if len(config.endpoints) == 0 {
		return nil, errors.New("no Zookeeper endpoints supplied")
	}

	if config.path == "" {
		return nil, errors.New("no Zookeeper path supplied")
	}

	// Each attempt opens its own connection and closes it before returning.
	retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {
		c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
		if err != nil {
			return false, NewTemporaryError(errors.Wrap(err, "Failed to connect to Zookeeper"))
		}
		defer c.Close()

		// Children rather than ChildrenW: a watch would never be consumed because the
		// connection is closed as soon as this attempt returns.
		children, _, err := c.Children(config.path)
		if err != nil {
			// Sentinel error check as there is no other way to check.
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
			}

			return false,
				NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
		}

		attempt := make(map[string][]string)
		var hosts []string
		var leader string
		haveLeader := false
		minScore := math.MaxInt64
		for _, child := range children {
			// Only the master nodes will start with json.info_
			if !strings.HasPrefix(child, "json.info_") {
				continue
			}

			strs := strings.Split(child, "_")
			if len(strs) < 2 {
				config.logger.Printf("Zk node %v/%v's name is malformed.", config.path, child)
				continue
			}
			score, err := strconv.Atoi(strs[1])
			if err != nil {
				return false, NewTemporaryError(errors.Wrap(err, "unable to read the zk node for Mesos."))
			}

			childPath := config.path + "/" + child
			data, _, err := c.Get(childPath)
			if err != nil {
				if err == zk.ErrInvalidPath {
					return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
				}

				return false, NewTemporaryError(errors.Wrap(err, "error fetching contents of leader"))
			}

			// Decode into a fresh value so fields missing from this node cannot inherit
			// values decoded from a previous one.
			var mesosInstance MesosInstance
			if err := json.Unmarshal(data, &mesosInstance); err != nil {
				config.logger.Printf("%s", err)
				return false,
					NewTemporaryError(errors.Wrap(err, "unable to unmarshal contents of leader"))
			}

			// Return hostname instead of ip to be consistent with aurora master nodes.
			hostname := mesosInstance.Address.Hostname
			hosts = append(hosts, hostname)

			// The leader is the child with the smallest sequence number. Zookeeper does not
			// list children in any guaranteed order, so keep only the best candidate seen so
			// far instead of appending every new minimum.
			if !haveLeader || score < minScore {
				minScore = score
				leader = hostname
				haveLeader = true
			}
		}

		if haveLeader {
			attempt["leader"] = []string{leader}
		}
		attempt["masterNodes"] = hosts

		// Master nodes data might not be available yet, try to fetch again.
		if len(hosts) == 0 {
			return false, NewTemporaryError(errors.New("no mesos master nodes found"))
		}

		result = attempt
		return true, nil
	})

	if retryErr != nil {
		config.logger.Printf("Failed to get mesos master nodes after %v attempts", config.backoff.Steps)
		return nil, retryErr
	}

	return result, nil
}