/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package realis

import (
	"encoding/json"
	"strconv"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/samuel/go-zookeeper/zk"
)

// endpoint is a host/port pair as it appears in the JSON blob read from Zookeeper.
// Host is decoded from the "host" key and Port from the "port" key.
type endpoint struct {
	Host string `json:"host"`
	Port int    `json:"port"`
}

// serviceInstance is the structure the contents of a leader node are unmarshalled into.
//
// Service is decoded from "serviceEndpoint", AdditionalEndpoints from "additionalEndpoints" and
// Status from "status". Within this file only AdditionalEndpoints is read: its key is used as the
// URL scheme and its value supplies the host and port of the leader URL.
type serviceInstance struct {
	Service             endpoint            `json:"serviceEndpoint"`
	AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
	Status              string              `json:"status"`
}

// zkConfig collects the settings used when looking up the leader in Zookeeper.
// It is populated by applying ZKOpt functions to it.
//
//   - endpoints: server addresses handed to zk.Connect.
//   - path: Zookeeper path whose children are searched for the leader node.
//   - backoff: retry configuration handed to ExponentialBackoff.
//   - timeout: duration handed to zk.Connect.
//   - logger: logger handed to both ExponentialBackoff and the Zookeeper connection.
type zkConfig struct {
	endpoints []string
	path      string
	backoff   Backoff
	timeout   time.Duration
	logger    logger
}

// ZKOpt - Configuration option for the Zookeeper client used.
// Each option is a function that mutates the zkConfig it is given; options are applied
// in the order they are passed, so a later option overrides an earlier one for the same field.
type ZKOpt func(z *zkConfig)

// ZKEndpoints - Addresses of the Zookeeper instances the client should connect to.
// The supplied list replaces any endpoints configured by an earlier option.
func ZKEndpoints(endpoints ...string) ZKOpt {
	setEndpoints := func(cfg *zkConfig) {
		cfg.endpoints = endpoints
	}

	return setEndpoints
}

// ZKPath - Zookeeper path under which the client searches for information once connected.
func ZKPath(path string) ZKOpt {
	setPath := func(cfg *zkConfig) {
		cfg.path = path
	}

	return setPath
}

// ZKBackoff - Retry settings applied while trying to reach Zookeeper.
// TODO(rdelvalle): Determine if this is really necessary as the ZK library already has a retry built in.
func ZKBackoff(b Backoff) ZKOpt {
	setBackoff := func(cfg *zkConfig) {
		cfg.backoff = b
	}

	return setBackoff
}

// ZKTimeout - Maximum time to wait for the Zookeeper instance to respond before treating it as dead.
func ZKTimeout(d time.Duration) ZKOpt {
	setTimeout := func(cfg *zkConfig) {
		cfg.timeout = d
	}

	return setTimeout
}

// ZKLogger - Logger the Zookeeper client reports to, useful when debugging issues.
func ZKLogger(l logger) ZKOpt {
	setLogger := func(cfg *zkConfig) {
		cfg.logger = l
	}

	return setLogger
}

// LeaderFromZK - Retrieves current Aurora leader from ZK.
//
// cluster.ZK is treated as a comma separated list of Zookeeper endpoints and cluster.SchedZKPath as the
// path to search. Whitespace around each endpoint is trimmed and empty entries are dropped, so an unset
// or malformed list is rejected by LeaderFromZKOpts ("no Zookeeper endpoints supplied") instead of being
// handed to the Zookeeper client as an empty address.
func LeaderFromZK(cluster Cluster) (string, error) {
	var endpoints []string

	// strings.Split never returns an empty slice for a non-empty separator: an empty input yields [""].
	// Filter here so the emptiness check in LeaderFromZKOpts actually triggers.
	for _, e := range strings.Split(cluster.ZK, ",") {
		if e = strings.TrimSpace(e); e != "" {
			endpoints = append(endpoints, e)
		}
	}

	return LeaderFromZKOpts(ZKEndpoints(endpoints...), ZKPath(cluster.SchedZKPath))
}

// LeaderFromZKOpts - Retrieves current Aurora leader from ZK with a custom configuration.
//
// The configuration starts from the defaults (defaultBackoff, a 10 second timeout and a NoopLogger) and
// is then overridden by the supplied options in order; nil options are skipped. An error is returned
// before any connection is attempted when no endpoints or no path have been configured.
//
// Every attempt made through ExponentialBackoff opens its own connection, lists the children of the
// configured path and reads the first child whose name starts with "member_". The JSON stored in that
// node is turned into a URL of the form scheme://host:port.
//
// Invalid Zookeeper paths are returned as plain errors; every other failure is wrapped with
// NewTemporaryError (presumably so ExponentialBackoff retries it — confirm against its implementation).
// On failure the returned string is empty.
func LeaderFromZKOpts(options ...ZKOpt) (string, error) {
	var leaderURL string

	// Load the default configuration for Zookeeper followed by overriding values with those provided by the caller.
	config := &zkConfig{backoff: defaultBackoff, timeout: time.Second * 10, logger: NoopLogger{}}
	for _, opt := range options {
		// Calling a nil option would panic, so tolerate it.
		if opt != nil {
			opt(config)
		}
	}

	if len(config.endpoints) == 0 {
		return "", errors.New("no Zookeeper endpoints supplied")
	}

	if config.path == "" {
		return "", errors.New("no Zookeeper path supplied")
	}

	// Create a closure that allows us to use the ExponentialBackoff function.
	retryErr := ExponentialBackoff(config.backoff, config.logger, func() (bool, error) {

		c, _, err := zk.Connect(config.endpoints, config.timeout, func(c *zk.Conn) { c.SetLogger(config.logger) })
		if err != nil {
			return false, NewTemporaryError(errors.Wrap(err, "failed to connect to Zookeeper"))
		}

		// The connection only lives for the duration of a single attempt.
		defer c.Close()

		// A plain read is enough here: the connection is closed before any watch event could be consumed.
		children, _, err := c.Children(config.path)
		if err != nil {

			// Sentinel error check as there is no other way to check.
			if err == zk.ErrInvalidPath {
				return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", config.path)
			}

			return false, NewTemporaryError(errors.Wrapf(err, "path %s doesn't exist on Zookeeper ", config.path))
		}

		// Search for the leader through all the children in the given path
		for _, child := range children {

			// Only the leader will start with member_
			if !strings.HasPrefix(child, "member_") {
				continue
			}

			childPath := config.path + "/" + child
			data, _, err := c.Get(childPath)
			if err != nil {
				if err == zk.ErrInvalidPath {
					return false, errors.Wrapf(err, "path %s is an invalid Zookeeper path", childPath)
				}

				return false, NewTemporaryError(errors.Wrap(err, "unable to fetch contents of leader"))
			}

			url, err := leaderURLFromJSON(data)
			if err != nil {
				return false, NewTemporaryError(err)
			}

			leaderURL = url
			return true, nil
		}

		// Leader data might not be available yet, try to fetch again.
		return false, NewTemporaryError(errors.New("no leader found"))
	})

	if retryErr != nil {
		config.logger.Printf("failed to determine leader after %v attempts", config.backoff.Steps)
		return "", retryErr
	}

	return leaderURL, nil
}

// leaderURLFromJSON decodes the JSON blob stored in a leader node and builds a URL of the form
// scheme://host:port from its single additional endpoint, where the endpoint's key is the scheme.
// It returns an error when the blob cannot be unmarshalled or does not contain exactly one
// additional endpoint.
func leaderURLFromJSON(data []byte) (string, error) {
	var serviceInst serviceInstance
	if err := json.Unmarshal(data, &serviceInst); err != nil {
		return "", errors.Wrap(err, "unable to unmarshal contents of leader")
	}

	// Should be exactly one endpoint.
	// Anything else should never be encountered as it would indicate Aurora
	// writing bad info into Zookeeper but is kept here as a safety net.
	if len(serviceInst.AdditionalEndpoints) == 0 {
		// Without this check the result would be the meaningless URL "://:".
		return "", errors.New("no endpoints in json blob, Aurora wrote bad info to ZK")
	}

	if len(serviceInst.AdditionalEndpoints) > 1 {
		return "", errors.New("ambiguous endpoints in json blob, Aurora wrote bad info to ZK")
	}

	// The map holds exactly one entry at this point; ranging is just the way to get at it.
	var scheme, host, port string
	for k, v := range serviceInst.AdditionalEndpoints {
		scheme = k
		host = v.Host
		port = strconv.Itoa(v.Port)
	}

	return scheme + "://" + host + ":" + port, nil
}