2016-08-24 12:00:26 -07:00
/ * *
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* /
2016-08-19 15:25:15 -07:00
package main
import (
"encoding/json"
"flag"
"fmt"
2018-07-13 02:14:11 -07:00
"io/ioutil"
"log"
"os"
"time"
2018-09-13 17:02:15 -07:00
2020-02-19 12:01:02 -08:00
realis "github.com/aurora-scheduler/gorealis/v2"
"github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora"
2018-09-13 17:02:15 -07:00
"github.com/pkg/errors"
2016-08-19 15:25:15 -07:00
)
type URIJson struct {
2016-08-24 12:00:26 -07:00
URI string ` json:"uri" `
Extract bool ` json:"extract" `
Cache bool ` json:"cache" `
2016-08-19 15:25:15 -07:00
}
type JobJson struct {
2018-07-13 02:14:11 -07:00
Name string ` json:"name" `
CPU float64 ` json:"cpu" `
RAM int64 ` json:"ram_mb" `
Disk int64 ` json:"disk_mb" `
Executor string ` json:"executor" `
ExecutorDataFile string ` json:"exec_data_file,omitempty" `
Instances int32 ` json:"instances" `
URIs [ ] URIJson ` json:"uris" `
Labels map [ string ] string ` json:"labels" `
Service bool ` json:"service" `
Ports int ` json:"ports" `
2016-08-19 15:25:15 -07:00
}
func ( j * JobJson ) Validate ( ) bool {
if j . Name == "" {
return false
}
2016-08-24 12:00:26 -07:00
if j . CPU <= 0.0 {
2016-08-19 15:25:15 -07:00
return false
}
2016-08-24 12:00:26 -07:00
if j . RAM <= 0 {
2016-08-19 15:25:15 -07:00
return false
}
2016-08-24 12:00:26 -07:00
if j . Disk <= 0 {
2016-08-19 15:25:15 -07:00
return false
}
return true
}
2018-07-13 02:14:11 -07:00
type Config struct {
realis . Cluster ` json:"cluster" `
Username string ` json:"username" `
Password string ` json:"password" `
SchedUrl string ` json:"sched_url" `
Transport string ` json:"transport,omitempty" `
Debug bool ` json:"debug,omitempty" `
}
// Command-line arguments for config and job JSON files.
var configJSONFile , jobJSONFile string
var job * JobJson
var config * Config
// Reading command line arguments and validating.
// If Aurora scheduler URL not provided, then using zookeeper to locate the leader.
func init ( ) {
flag . StringVar ( & configJSONFile , "config" , "./config.json" , "The config file that contains username, password, and the cluster configuration information." )
flag . StringVar ( & jobJSONFile , "job" , "./job.json" , "JSON file containing job definitions." )
2016-08-19 15:25:15 -07:00
flag . Parse ( )
2018-07-13 02:14:11 -07:00
job = new ( JobJson )
config = new ( Config )
if jobsFile , jobJSONReadErr := os . Open ( jobJSONFile ) ; jobJSONReadErr != nil {
2016-08-19 15:25:15 -07:00
flag . Usage ( )
2018-07-13 02:14:11 -07:00
fmt . Println ( "Error reading the job JSON file: " , jobJSONReadErr )
2016-08-19 15:25:15 -07:00
os . Exit ( 1 )
2018-07-13 02:14:11 -07:00
} else {
if unmarshallErr := json . NewDecoder ( jobsFile ) . Decode ( job ) ; unmarshallErr != nil {
flag . Usage ( )
fmt . Println ( "Error parsing job json file: " , unmarshallErr )
os . Exit ( 1 )
}
// Need to validate the job JSON file.
if ! job . Validate ( ) {
fmt . Println ( "Invalid Job." )
os . Exit ( 1 )
}
2016-08-19 15:25:15 -07:00
}
2018-07-13 02:14:11 -07:00
if configFile , configJSONErr := os . Open ( configJSONFile ) ; configJSONErr != nil {
flag . Usage ( )
fmt . Println ( "Error reading the config JSON file: " , configJSONErr )
2016-08-19 15:25:15 -07:00
os . Exit ( 1 )
2018-07-13 02:14:11 -07:00
} else {
if unmarshallErr := json . NewDecoder ( configFile ) . Decode ( config ) ; unmarshallErr != nil {
fmt . Println ( "Error parsing config JSON file: " , unmarshallErr )
os . Exit ( 1 )
}
2016-08-19 15:25:15 -07:00
}
2018-07-13 02:14:11 -07:00
}
2016-08-19 15:25:15 -07:00
2018-12-08 08:57:15 -08:00
func CreateRealisClient ( config * Config ) ( * realis . Client , error ) {
2018-07-13 02:14:11 -07:00
var transportOption realis . ClientOption
// Configuring transport protocol. If not transport is provided, then using JSON as the
// default transport protocol.
switch config . Transport {
case "binary" :
transportOption = realis . ThriftBinary ( )
case "json" , "" :
transportOption = realis . ThriftJSON ( )
default :
fmt . Println ( "Invalid transport option provided!" )
2016-08-19 15:25:15 -07:00
os . Exit ( 1 )
}
2018-07-13 02:14:11 -07:00
clientOptions := [ ] realis . ClientOption {
realis . BasicAuth ( config . Username , config . Password ) ,
transportOption ,
realis . ZKCluster ( & config . Cluster ) ,
// realis.SchedulerUrl(config.SchedUrl),
realis . SetLogger ( log . New ( os . Stdout , "realis-debug: " , log . Ldate ) ) ,
realis . BackOff ( realis . Backoff {
Steps : 2 ,
Duration : 10 * time . Second ,
Factor : 2.0 ,
Jitter : 0.1 ,
} ) ,
2016-08-19 15:25:15 -07:00
}
2018-07-13 02:14:11 -07:00
if config . Debug {
clientOptions = append ( clientOptions , realis . Debug ( ) )
2016-08-19 15:25:15 -07:00
}
2018-12-08 08:57:15 -08:00
return realis . NewClient ( clientOptions ... )
2018-07-13 02:14:11 -07:00
}
2016-08-19 15:25:15 -07:00
2018-07-13 02:14:11 -07:00
func main ( ) {
if r , clientCreationErr := CreateRealisClient ( config ) ; clientCreationErr != nil {
fmt . Println ( clientCreationErr )
2016-08-19 15:25:15 -07:00
os . Exit ( 1 )
2018-07-13 02:14:11 -07:00
} else {
defer r . Close ( )
uris := job . URIs
labels := job . Labels
auroraJob := realis . NewJob ( ) .
Environment ( "prod" ) .
Role ( "vagrant" ) .
Name ( job . Name ) .
CPU ( job . CPU ) .
RAM ( job . RAM ) .
Disk ( job . Disk ) .
IsService ( job . Service ) .
2021-10-15 12:18:26 -07:00
Tier ( "preemptible" ) .
Priority ( 0 ) .
2018-07-13 02:14:11 -07:00
InstanceCount ( job . Instances ) .
AddPorts ( job . Ports )
// If thermos executor, then reading in the thermos payload.
if ( job . Executor == aurora . AURORA_EXECUTOR_NAME ) || ( job . Executor == "thermos" ) {
payload , err := ioutil . ReadFile ( job . ExecutorDataFile )
if err != nil {
fmt . Println ( errors . Wrap ( err , "Invalid thermos payload file!" ) )
os . Exit ( 1 )
}
auroraJob . ExecutorName ( aurora . AURORA_EXECUTOR_NAME ) .
ExecutorData ( string ( payload ) )
} else {
auroraJob . ExecutorName ( job . Executor )
}
// Adding URIs.
for _ , uri := range uris {
auroraJob . AddURIs ( uri . Extract , uri . Cache , uri . URI )
}
// Adding Labels.
for key , value := range labels {
auroraJob . AddLabel ( key , value )
}
fmt . Println ( "Creating Job..." )
2018-12-04 15:19:08 -08:00
if jobCreationErr := r . CreateJob ( auroraJob ) ; jobCreationErr != nil {
2018-07-13 02:14:11 -07:00
fmt . Println ( "Error creating Aurora job: " , jobCreationErr )
os . Exit ( 1 )
} else {
2019-09-10 17:42:51 -07:00
if ok , monitorErr := r . MonitorInstances ( auroraJob . JobKey ( ) , auroraJob . GetInstanceCount ( ) , 5 , 50 ) ; ! ok || monitorErr != nil {
2018-12-04 15:19:08 -08:00
if jobErr := r . KillJob ( auroraJob . JobKey ( ) ) ; jobErr !=
nil {
fmt . Println ( jobErr )
os . Exit ( 1 )
} else {
fmt . Println ( "ok: " , ok )
fmt . Println ( "jobErr: " , jobErr )
2018-07-13 02:14:11 -07:00
}
}
}
2016-08-19 15:25:15 -07:00
}
}