Documentation ¶
Index ¶
- Constants
- Variables
- func InitializeCallbackRegistry(clientCallbacks map[string]Factory)
- type ActionType
- type AirbusCallback
- func (h *AirbusCallback) GetDetails() (string, error)
- func (h *AirbusCallback) GetType() string
- func (h *AirbusCallback) Invoke(wrapper ScheduleWrapper) error
- func (h *AirbusCallback) Marshal(m map[string]interface{}) error
- func (h *AirbusCallback) UnmarshalJSON(data []byte) error
- func (h *AirbusCallback) Validate() error
- type App
- type AppErrorResponse
- type BulkActionTask
- type Callback
- type Configuration
- type CreateScheduleTask
- type Details
- type Factory
- type HTTPCallback
- func (h *HTTPCallback) GetDetails() (string, error)
- func (h *HTTPCallback) GetType() string
- func (h *HTTPCallback) Invoke(wrapper ScheduleWrapper) error
- func (h *HTTPCallback) Marshal(m map[string]interface{}) error
- func (h *HTTPCallback) UnmarshalJSON(data []byte) error
- func (h *HTTPCallback) Validate() error
- type HttpCallback
- func (h *HttpCallback) GetDetails() (string, error)
- func (h *HttpCallback) GetType() string
- func (h HttpCallback) Invoke(wrapper ScheduleWrapper) error
- func (h *HttpCallback) Marshal(m map[string]interface{}) error
- func (h *HttpCallback) UnmarshalJSON(data []byte) error
- func (h *HttpCallback) Validate() error
- type ReconciliationHistory
- type Schedule
- func (s Schedule) CheckUntriggeredCallback(flushPeriod int) bool
- func (s Schedule) CloneAsOneTime(at time.Time) Schedule
- func (s *Schedule) CreateScheduleFromCassandraMap(m map[string]interface{}) error
- func (s Schedule) GetCallBackType() string
- func (s Schedule) GetCallbackDetails() string
- func (s Schedule) GetTTL(app App, bufferTTL int) int
- func (s Schedule) IsRecurring() bool
- func (s *Schedule) SetFields(app App)
- func (s *Schedule) SetStatus(m map[string]interface{}) error
- func (s *Schedule) SetUnknownStatus(flushPeriod int)
- func (s *Schedule) UnmarshalJSON(data []byte) error
- func (s *Schedule) UpdateReconciliationHistory(status Status, errMsg string)
- func (s *Schedule) ValidateSchedule(app App, conf conf.AppLevelConfiguration) []string
- type ScheduleWrapper
- type Status
- type StatusTask
- type Task
Constants ¶
const ( Scheduled Status = "SCHEDULED" Deleted Status = "DELETED" Success Status = "SUCCESS" Failure Status = "FAILURE" Miss Status = "MISS" Error Status = "ERROR" Reconcile ActionType = "reconcile" Delete ActionType = "delete" )
const DefaultTimeLayout = "2006-01-02 15:04:05"
Variables ¶
var ( OldHttpTaskQueue chan ScheduleWrapper HttpTaskQueue chan ScheduleWrapper AirbusTaskQueue chan ScheduleWrapper // CronTaskQueue Channel sends the tasks to convert a recurring schedule to one time schedules CronTaskQueue chan CreateScheduleTask // AggregationTaskQueue Channel aggregates the schedules and forward to status update AggregationTaskQueue chan ScheduleWrapper // StatusTaskQueue Channel updates the status of schedules after callback is fired StatusTaskQueue chan StatusTask // BulkActionQueue Channel used to perform actions in bulk (Ex. Reconcile/Delete etc) BulkActionQueue chan BulkActionTask )
var Registry = map[string]Factory{}
Functions ¶
Types ¶
type ActionType ¶
type ActionType string
type AirbusCallback ¶
type AirbusCallback struct { EventName string `json:"eventName,omitempty"` AppName string `json:"appName,omitempty"` Headers map[string]string `json:"headers,omitempty"` }
func (*AirbusCallback) GetDetails ¶
func (h *AirbusCallback) GetDetails() (string, error)
func (*AirbusCallback) GetType ¶
func (h *AirbusCallback) GetType() string
func (*AirbusCallback) Invoke ¶
func (h *AirbusCallback) Invoke(wrapper ScheduleWrapper) error
func (*AirbusCallback) Marshal ¶
func (h *AirbusCallback) Marshal(m map[string]interface{}) error
func (*AirbusCallback) UnmarshalJSON ¶
func (h *AirbusCallback) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for AirbusCallback.
func (*AirbusCallback) Validate ¶
func (h *AirbusCallback) Validate() error
type App ¶
type App struct { AppId string `json:"appId"` Partitions uint32 `json:"partitions"` Active bool `json:"active"` Configuration Configuration `json:"configuration"` }
func (App) GetBufferTTL ¶
GetBufferTTL gets bufferCassandraTTL in seconds
type AppErrorResponse ¶
type AppErrorResponse struct {
Errors []string `json:"errors"`
}
type BulkActionTask ¶
type Callback ¶
type Callback interface { GetType() string GetDetails() (string, error) Marshal(map[string]interface{}) error Invoke(wrapper ScheduleWrapper) error Validate() error // Added json.Unmarshaler // Add this }
type Configuration ¶
type Configuration struct { FutureScheduleCreationPeriod int `json:"futureScheduleCreationPeriod,omitempty"` FiredScheduleRetentionPeriod int `json:"firedScheduleRetentionPeriod,omitempty"` PayloadSize int `json:"payloadSize,omitempty"` HttpRetries int `json:"httpRetries,omitempty"` HttpTimeout int `json:"httpTimeout,omitempty"` }
type CreateScheduleTask ¶
type HTTPCallback ¶
type HTTPCallback struct { Url string `json:"url,omitempty"` Headers map[string]string `json:"headers,omitempty"` }
func (*HTTPCallback) GetDetails ¶
func (h *HTTPCallback) GetDetails() (string, error)
func (*HTTPCallback) GetType ¶
func (h *HTTPCallback) GetType() string
func (*HTTPCallback) Invoke ¶
func (h *HTTPCallback) Invoke(wrapper ScheduleWrapper) error
func (*HTTPCallback) Marshal ¶
func (h *HTTPCallback) Marshal(m map[string]interface{}) error
func (*HTTPCallback) UnmarshalJSON ¶
func (h *HTTPCallback) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for HTTPCallback.
func (*HTTPCallback) Validate ¶
func (h *HTTPCallback) Validate() error
type HttpCallback ¶
func (*HttpCallback) GetDetails ¶
func (h *HttpCallback) GetDetails() (string, error)
func (*HttpCallback) GetType ¶
func (h *HttpCallback) GetType() string
func (HttpCallback) Invoke ¶
func (h HttpCallback) Invoke(wrapper ScheduleWrapper) error
func (*HttpCallback) Marshal ¶
func (h *HttpCallback) Marshal(m map[string]interface{}) error
func (*HttpCallback) UnmarshalJSON ¶
func (h *HttpCallback) UnmarshalJSON(data []byte) error
UnmarshalJSON implements json.Unmarshaler for HttpCallback.
func (*HttpCallback) Validate ¶
func (h *HttpCallback) Validate() error
type ReconciliationHistory ¶
type Schedule ¶
type Schedule struct { ScheduleId gocql.UUID `json:"scheduleId"` Payload string `json:"payload"` AppId string `json:"appId"` ScheduleTime int64 `json:"scheduleTime,omitempty"` PartitionId int `json:"partitionId"` ScheduleGroup int64 `json:"scheduleGroup,omitempty"` Callback Callback `json:"-"` CallbackRaw json.RawMessage `json:"callback,omitempty"` CronExpression string `json:"cronExpression,omitempty"` Status Status `json:"status,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` ParentScheduleId gocql.UUID `json:"-"` ReconciliationHistory []ReconciliationHistory `json:"reconciliationHistory,omitempty"` //Deprecated Ttl int `json:"-"` //Deprecated AirbusCallback AirbusCallback `json:"airbusCallback,omitempty"` //Deprecated HttpCallback HTTPCallback `json:"httpCallback,omitempty"` }
func (Schedule) CheckUntriggeredCallback ¶
CheckUntriggeredCallback checks whether the current time is already past the schedule's schedule time group by more than one minute plus the flush period.
func (Schedule) CloneAsOneTime ¶
CloneAsOneTime clones a given recurring schedule into a one-time schedule at the supplied time.
func (*Schedule) CreateScheduleFromCassandraMap ¶
func (Schedule) GetCallBackType ¶
func (Schedule) GetCallbackDetails ¶
func (Schedule) IsRecurring ¶
func (*Schedule) SetStatus ¶
SetStatus sets the status, error_msg and reconciliation_history of the schedule from the map.
func (*Schedule) SetUnknownStatus ¶
SetUnknownStatus sets the status to Scheduled if the callback time is yet to come, or to Miss if the callback time has already expired.
func (*Schedule) UnmarshalJSON ¶
func (*Schedule) UpdateReconciliationHistory ¶
UpdateReconciliationHistory updates the schedule's reconciliation history. If the reconciliation history contains more than "HistorySize" reconciliations, only the latest "HistorySize" reconciliations are considered.
func (*Schedule) ValidateSchedule ¶
func (s *Schedule) ValidateSchedule(app App, conf conf.AppLevelConfiguration) []string
type ScheduleWrapper ¶
type StatusTask ¶
type Task ¶
type Task struct {
Conf *conf.Configuration
}
func (*Task) InitTaskQueues ¶
func (t *Task) InitTaskQueues()