226 lines
6 KiB
Go
226 lines
6 KiB
Go
/**
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
realis "github.com/paypal/gorealis"
|
|
"github.com/paypal/gorealis/gen-go/apache/aurora"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type URIJson struct {
|
|
URI string `json:"uri"`
|
|
Extract bool `json:"extract"`
|
|
Cache bool `json:"cache"`
|
|
}
|
|
|
|
type JobJson struct {
|
|
Name string `json:"name"`
|
|
CPU float64 `json:"cpu"`
|
|
RAM int64 `json:"ram_mb"`
|
|
Disk int64 `json:"disk_mb"`
|
|
Executor string `json:"executor"`
|
|
ExecutorDataFile string `json:"exec_data_file,omitempty"`
|
|
Instances int32 `json:"instances"`
|
|
URIs []URIJson `json:"uris"`
|
|
Labels map[string]string `json:"labels"`
|
|
Service bool `json:"service"`
|
|
Ports int `json:"ports"`
|
|
}
|
|
|
|
func (j *JobJson) Validate() bool {
|
|
|
|
if j.Name == "" {
|
|
return false
|
|
}
|
|
|
|
if j.CPU <= 0.0 {
|
|
return false
|
|
}
|
|
|
|
if j.RAM <= 0 {
|
|
return false
|
|
}
|
|
|
|
if j.Disk <= 0 {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
type Config struct {
|
|
realis.Cluster `json:"cluster"`
|
|
Username string `json:"username"`
|
|
Password string `json:"password"`
|
|
SchedUrl string `json:"sched_url"`
|
|
Transport string `json:"transport,omitempty"`
|
|
Debug bool `json:"debug,omitempty"`
|
|
}
|
|
|
|
// Command-line arguments for config and job JSON files.
|
|
var configJSONFile, jobJSONFile string
|
|
|
|
var job *JobJson
|
|
var config *Config
|
|
|
|
// Reading command line arguments and validating.
|
|
// If Aurora scheduler URL not provided, then using zookeeper to locate the leader.
|
|
func init() {
|
|
flag.StringVar(&configJSONFile, "config", "./config.json", "The config file that contains username, password, and the cluster configuration information.")
|
|
flag.StringVar(&jobJSONFile, "job", "./job.json", "JSON file containing job definitions.")
|
|
|
|
flag.Parse()
|
|
|
|
job = new(JobJson)
|
|
config = new(Config)
|
|
|
|
if jobsFile, jobJSONReadErr := os.Open(jobJSONFile); jobJSONReadErr != nil {
|
|
flag.Usage()
|
|
fmt.Println("Error reading the job JSON file: ", jobJSONReadErr)
|
|
os.Exit(1)
|
|
} else {
|
|
if unmarshallErr := json.NewDecoder(jobsFile).Decode(job); unmarshallErr != nil {
|
|
flag.Usage()
|
|
fmt.Println("Error parsing job json file: ", unmarshallErr)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Need to validate the job JSON file.
|
|
if !job.Validate() {
|
|
fmt.Println("Invalid Job.")
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
if configFile, configJSONErr := os.Open(configJSONFile); configJSONErr != nil {
|
|
flag.Usage()
|
|
fmt.Println("Error reading the config JSON file: ", configJSONErr)
|
|
os.Exit(1)
|
|
} else {
|
|
if unmarshallErr := json.NewDecoder(configFile).Decode(config); unmarshallErr != nil {
|
|
fmt.Println("Error parsing config JSON file: ", unmarshallErr)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
func CreateRealisClient(config *Config) (realis.Realis, error) {
|
|
var transportOption realis.ClientOption
|
|
// Configuring transport protocol. If not transport is provided, then using JSON as the
|
|
// default transport protocol.
|
|
switch config.Transport {
|
|
case "binary":
|
|
transportOption = realis.ThriftBinary()
|
|
case "json", "":
|
|
transportOption = realis.ThriftJSON()
|
|
default:
|
|
fmt.Println("Invalid transport option provided!")
|
|
os.Exit(1)
|
|
}
|
|
|
|
clientOptions := []realis.ClientOption{
|
|
realis.BasicAuth(config.Username, config.Password),
|
|
transportOption,
|
|
realis.ZKCluster(&config.Cluster),
|
|
// realis.SchedulerUrl(config.SchedUrl),
|
|
realis.SetLogger(log.New(os.Stdout, "realis-debug: ", log.Ldate)),
|
|
realis.BackOff(realis.Backoff{
|
|
Steps: 2,
|
|
Duration: 10 * time.Second,
|
|
Factor: 2.0,
|
|
Jitter: 0.1,
|
|
}),
|
|
}
|
|
|
|
if config.Debug {
|
|
clientOptions = append(clientOptions, realis.Debug())
|
|
}
|
|
|
|
return realis.NewRealisClient(clientOptions...)
|
|
}
|
|
|
|
func main() {
|
|
if r, clientCreationErr := CreateRealisClient(config); clientCreationErr != nil {
|
|
fmt.Println(clientCreationErr)
|
|
os.Exit(1)
|
|
} else {
|
|
monitor := &realis.Monitor{Client: r}
|
|
defer r.Close()
|
|
uris := job.URIs
|
|
labels := job.Labels
|
|
|
|
auroraJob := realis.NewJob().
|
|
Environment("prod").
|
|
Role("vagrant").
|
|
Name(job.Name).
|
|
CPU(job.CPU).
|
|
RAM(job.RAM).
|
|
Disk(job.Disk).
|
|
IsService(job.Service).
|
|
InstanceCount(job.Instances).
|
|
AddPorts(job.Ports)
|
|
|
|
// If thermos executor, then reading in the thermos payload.
|
|
if (job.Executor == aurora.AURORA_EXECUTOR_NAME) || (job.Executor == "thermos") {
|
|
payload, err := ioutil.ReadFile(job.ExecutorDataFile)
|
|
if err != nil {
|
|
fmt.Println(errors.Wrap(err, "Invalid thermos payload file!"))
|
|
os.Exit(1)
|
|
}
|
|
auroraJob.ExecutorName(aurora.AURORA_EXECUTOR_NAME).
|
|
ExecutorData(string(payload))
|
|
} else {
|
|
auroraJob.ExecutorName(job.Executor)
|
|
}
|
|
|
|
// Adding URIs.
|
|
for _, uri := range uris {
|
|
auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI)
|
|
}
|
|
|
|
// Adding Labels.
|
|
for key, value := range labels {
|
|
auroraJob.AddLabel(key, value)
|
|
}
|
|
|
|
fmt.Println("Creating Job...")
|
|
if resp, jobCreationErr := r.CreateJob(auroraJob); jobCreationErr != nil {
|
|
fmt.Println("Error creating Aurora job: ", jobCreationErr)
|
|
os.Exit(1)
|
|
} else {
|
|
if resp.ResponseCode == aurora.ResponseCode_OK {
|
|
if ok, monitorErr := monitor.Instances(auroraJob.JobKey(), auroraJob.GetInstanceCount(), 5, 50); !ok || monitorErr != nil {
|
|
if _, jobErr := r.KillJob(auroraJob.JobKey()); jobErr !=
|
|
nil {
|
|
fmt.Println(jobErr)
|
|
os.Exit(1)
|
|
} else {
|
|
fmt.Println("ok: ", ok)
|
|
fmt.Println("jobErr: ", jobErr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|