Documentation ¶
Index ¶
- Constants
- Variables
- func RunJob(jobName string, jConfig *JobConfig, gConfig *Config, storage *Storage) string
- func SendFailedJobNotification(cfg SMTPConfig, meta *JobMetadata) error
- func SetupLogging(logLevel string) error
- type BashExecutor
- type Config
- type Executer
- type Job
- type JobConfig
- type JobMetadata
- type JobMetadataFile
- type JobTemplateContext
- type Jober
- type NotificationTemplateContext
- type RemoteReader
- type RunAtSpec
- type SMTPConfig
- type Storage
- type StorageConn
- type StorageConnState
- type StorageCurrentJob
- type StorageJobManager
- func (m *StorageJobManager) AddConnection(id TaskId)
- func (m *StorageJobManager) AddJob(job *StorageCurrentJob)
- func (m *StorageJobManager) GetJob(id TaskId) (StorageCurrentJob, bool)
- func (m *StorageJobManager) JobConnectionCount(taskId TaskId) int
- func (m *StorageJobManager) RemoveConnection(id TaskId)
- func (m *StorageJobManager) RemoveJob(id TaskId)
- func (m *StorageJobManager) WaitJob(taskId TaskId)
- type StorageProtocolHandler
- type TaskId
Constants ¶
View Source
const ( STATE_WAIT_TASK_ID = iota STATE_WAIT_FILENAME STATE_WAIT_DATA STATE_RECEIVING STATE_END )
Storage connection states
View Source
const JOB_FINISH = "_@!_JOB_FINISH_!@_"
View Source
const STORAGE_AUTH_TIMEOUT = 30 // seconds
Waiting for client authentication
View Source
const STORAGE_FILENAME_LEN_LEN = 4
Length of filename length header
View Source
const STORAGE_READ_BUFSIZE = 4096
View Source
const STORAGE_TASK_ID_LEN = 36
Variables ¶
View Source
var JOB_TEMPLATE = template.Must(template.New("job").Parse(`
##
# Common header
##
set -e
TASK_NAME='{{.Job.Name}}'
_send_file(){
local name="$1"
exec 3<>/dev/tcp/{{.ToHost}}/{{.ToPort}}
echo -n {{.Job.TaskId}}$(printf "%0{{.FILENAME_LEN_LEN}}d" ${#name})${name} >&3
cat - >&3
exec 3>&-
}
_finish(){
echo > /dev/null
}
_fail(){
test ! -z "$1" && echo "command failed at line $1" >&2
exit 1
}
function error_exit {
if [ "$?" != "0" ]; then
echo "FAILED"
_fail
exit 1
fi
}
trap '_fail ${LINENO}' ERR
##
# Command
##
`))
View Source
var MAIL_TEMPLATE_JOB_FAILED = template.Must(template.New("mail").Parse(`From: {{ .From }}
To: {{.To}}
Subject: {{.Subject}}
Content-Type: text/plain;charset=utf8
Job {{.JobName}} failed:
{{.Message}}
Output:
-----------------------------
{{.Output}}
-----------------------------
Errput:
-----------------------------
{{.Errput}}
-----------------------------
`))
Functions ¶
func SendFailedJobNotification ¶
func SendFailedJobNotification(cfg SMTPConfig, meta *JobMetadata) error
func SetupLogging ¶
Types ¶
type BashExecutor ¶ added in v0.8.0
type BashExecutor struct { Args map[string]string Host string Port uint Sudo bool // contains filtered or unexported fields }
func NewBashExecutor ¶ added in v0.8.0
type Config ¶
type Config struct { IncludeJobs []string `yaml:"include_jobs"` Listen string StorageDir string `yaml:"storage_dir"` MetadataDir string `yaml:"metadata_dir"` CommandDir string `yaml:"command_dir"` SMTP SMTPConfig `yaml:"smtp"` Jobs map[string]*JobConfig }
func ParseConfig ¶
type Job ¶
type Job struct { Name string TaskId TaskId StorageAddr string CommandDir string // contains filtered or unexported fields }
func (*Job) Run ¶
func (job *Job) Run() *JobMetadata
type JobConfig ¶
type JobMetadata ¶
type JobMetadata struct { JobName string Gzip bool Namespace string TaskId TaskId Command string Success bool Message string TotalSize int64 StartTime time.Time EndTime time.Time ExpireTime time.Time Files []JobMetadataFile Pid int RetCode uint Script []byte Output []byte Errput []byte Config JobConfig }
func LoadJobMetadata ¶
func LoadJobMetadata(path string) (*JobMetadata, error)
func (*JobMetadata) AvgSpeed ¶
func (metadata *JobMetadata) AvgSpeed() int64
func (*JobMetadata) Duration ¶
func (metadata *JobMetadata) Duration() time.Duration
func (*JobMetadata) Save ¶
func (metadata *JobMetadata) Save(saveTo string) error
type JobMetadataFile ¶
type JobMetadataFile struct { Name string Size int64 SourceAddr string StartTime time.Time EndTime time.Time }
func (*JobMetadataFile) String ¶
func (m *JobMetadataFile) String() string
type JobTemplateContext ¶
func (*JobTemplateContext) ToHost ¶
func (jctx *JobTemplateContext) ToHost() string
func (*JobTemplateContext) ToPort ¶
func (jctx *JobTemplateContext) ToPort() string
type Jober ¶ added in v0.8.0
type Jober interface { AddJob(currentJob *StorageCurrentJob) RemoveJob(id TaskId) WaitJob(taskId TaskId) }
type RemoteReader ¶ added in v0.8.0
type RunAtSpec ¶
type RunAtSpec struct { Second string Minute string Hour string Day string Month string Weekday string }
func (*RunAtSpec) SchedulerString ¶
type SMTPConfig ¶
type Storage ¶
type Storage struct { *StorageJobManager RootDir string MetadataDir string // contains filtered or unexported fields }
func NewStorage ¶
func (*Storage) CleanupExpired ¶
func (*Storage) HandleConnection ¶ added in v0.8.0
func (stor *Storage) HandleConnection(conn StorageProtocolHandler) error
type StorageConn ¶
type StorageConn struct { RemoteReader State StorageConnState // contains filtered or unexported fields }
func NewStorageConn ¶
func NewStorageConn(rReader RemoteReader, logger *logging.Logger) *StorageConn
func (*StorageConn) ReadContent ¶ added in v0.8.0
func (sc *StorageConn) ReadContent(output io.Writer) (int64, error)
func (*StorageConn) ReadFilename ¶
func (sc *StorageConn) ReadFilename() (string, error)
func (*StorageConn) ReadTaskId ¶
func (sc *StorageConn) ReadTaskId() (TaskId, error)
type StorageConnState ¶
type StorageConnState uint8
type StorageCurrentJob ¶
type StorageCurrentJob struct { TaskId TaskId FileAddChan chan JobMetadataFile Namespace string Gzip bool }
type StorageJobManager ¶
type StorageJobManager struct {
// contains filtered or unexported fields
}
func NewStorageJobManager ¶
func NewStorageJobManager() *StorageJobManager
func (*StorageJobManager) AddConnection ¶
func (m *StorageJobManager) AddConnection(id TaskId)
func (*StorageJobManager) AddJob ¶
func (m *StorageJobManager) AddJob(job *StorageCurrentJob)
func (*StorageJobManager) GetJob ¶
func (m *StorageJobManager) GetJob(id TaskId) (StorageCurrentJob, bool)
func (*StorageJobManager) JobConnectionCount ¶
func (m *StorageJobManager) JobConnectionCount(taskId TaskId) int
func (*StorageJobManager) RemoveConnection ¶
func (m *StorageJobManager) RemoveConnection(id TaskId)
func (*StorageJobManager) RemoveJob ¶
func (m *StorageJobManager) RemoveJob(id TaskId)
func (*StorageJobManager) WaitJob ¶ added in v0.8.0
func (m *StorageJobManager) WaitJob(taskId TaskId)
type StorageProtocolHandler ¶ added in v0.8.0
Click to show internal directories.
Click to hide internal directories.