store

package
v0.2.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Enumerated string values for the Status and ActionType types.
const (
	// Status values for a schedule.
	// Scheduled and Miss are the two values assigned by SetUnknownStatus:
	// Scheduled when the callback time is yet to come, Miss when it has expired.
	Scheduled Status     = "SCHEDULED"
	Deleted   Status     = "DELETED"
	Success   Status     = "SUCCESS"
	Failure   Status     = "FAILURE"
	Miss      Status     = "MISS"
	Error     Status     = "ERROR"
	// ActionType values, used in BulkActionTask.ActionType to select the bulk action.
	Reconcile ActionType = "reconcile"
	Delete    ActionType = "delete"
)
View Source
// DefaultTimeLayout is a layout string in the reference-time notation of the
// standard time package: year-month-day hour:minute:second, 24-hour clock,
// with no time zone component.
const DefaultTimeLayout = "2006-01-02 15:04:05"

Variables

View Source
// Package-level task queues. They are declared without an initializer, so each
// channel is nil until something assigns it.
// NOTE(review): presumably Task.InitTaskQueues creates them — confirm.
var (
	// OldHttpTaskQueue, HttpTaskQueue and AirbusTaskQueue carry ScheduleWrapper values.
	// NOTE(review): presumably one queue per callback kind — confirm against the consumers.
	OldHttpTaskQueue chan ScheduleWrapper
	HttpTaskQueue    chan ScheduleWrapper
	AirbusTaskQueue  chan ScheduleWrapper
	// CronTaskQueue carries the tasks that convert a recurring schedule into one-time schedules.
	CronTaskQueue chan CreateScheduleTask
	// AggregationTaskQueue aggregates schedules and forwards them to the status update.
	AggregationTaskQueue chan ScheduleWrapper
	// StatusTaskQueue carries the status updates of schedules after their callback is fired.
	StatusTaskQueue chan StatusTask
	// BulkActionQueue is used to perform actions in bulk (e.g. Reconcile, Delete).
	BulkActionQueue chan BulkActionTask
)
View Source
// Registry maps a string key to a Factory that produces a Callback. It starts out empty.
// NOTE(review): presumably keyed by callback type and filled by
// InitializeCallbackRegistry — confirm.
var Registry = map[string]Factory{}

Functions

func InitializeCallbackRegistry

func InitializeCallbackRegistry(clientCallbacks map[string]Factory)

Types

type ActionType

// ActionType is the string-typed kind of a bulk action; the declared values
// are Reconcile and Delete.
type ActionType string

type AirbusCallback

// AirbusCallback is a callback described by an event name, an app name and
// optional headers. *AirbusCallback declares every method of the Callback
// interface. Each field is omitted from the JSON output when empty.
type AirbusCallback struct {
	EventName string            `json:"eventName,omitempty"`
	AppName   string            `json:"appName,omitempty"`
	Headers   map[string]string `json:"headers,omitempty"`
}

func (*AirbusCallback) GetDetails

func (h *AirbusCallback) GetDetails() (string, error)

func (*AirbusCallback) GetType

func (h *AirbusCallback) GetType() string

func (*AirbusCallback) Invoke

func (h *AirbusCallback) Invoke(wrapper ScheduleWrapper) error

func (*AirbusCallback) Marshal

func (h *AirbusCallback) Marshal(m map[string]interface{}) error

func (*AirbusCallback) UnmarshalJSON

func (h *AirbusCallback) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for AirbusCallback.

func (*AirbusCallback) Validate

func (h *AirbusCallback) Validate() error

type App

// App describes a registered application together with its Configuration.
type App struct {
	AppId         string        `json:"appId"`
	// NOTE(review): presumably the number of partitions this app's schedules are spread over — confirm.
	Partitions    uint32        `json:"partitions"`
	Active        bool          `json:"active"`
	Configuration Configuration `json:"configuration"`
}

func (App) GetBufferTTL

func (a App) GetBufferTTL(bufferTTL int) int

GetBufferTTL gets bufferCassandraTTL in seconds

func (App) GetMaxTTL

func (a App) GetMaxTTL(maxTTL int) int

GetMaxTTL gets maxCassandraTTL in seconds

type AppErrorResponse

// AppErrorResponse carries a list of error messages, serialized under the
// "errors" JSON key.
type AppErrorResponse struct {
	Errors []string `json:"errors"`
}

type BulkActionTask

// BulkActionTask is the element type of BulkActionQueue and describes one bulk
// action (ActionType) for an App.
// NOTE(review): PartitionId, ScheduleTimeGroup and Status presumably select the
// schedules the action applies to — confirm against the consumer.
type BulkActionTask struct {
	// encoding/json's omitempty never omits struct values, so it has no effect
	// on App here or on ScheduleTimeGroup below.
	App               App        `json:"app,omitempty"`
	PartitionId       int        `json:"partitionId,omitempty"`
	ScheduleTimeGroup time.Time  `json:"scheduleTimeGroup,omitempty"`
	Status            Status     `json:"status,omitempty"`
	ActionType        ActionType `json:"actionType"`
}

type Callback

// Callback is the interface every callback kind implements. Factory values
// return a Callback, and Schedule.Callback holds one.
type Callback interface {
	// GetType returns the callback's type as a string.
	GetType() string
	// GetDetails returns the callback's details as a string, or an error.
	GetDetails() (string, error)
	// NOTE(review): direction is not visible here — presumably fills the callback
	// from a map (cf. Schedule.CreateScheduleFromCassandraMap) — confirm.
	Marshal(map[string]interface{}) error
	// Invoke is called with the ScheduleWrapper the callback belongs to.
	Invoke(wrapper ScheduleWrapper) error
	Validate() error // NOTE(review): presumably a non-nil error means the callback is invalid — confirm
	json.Unmarshaler // implementations must also provide UnmarshalJSON([]byte) error
}

type Configuration

// Configuration holds the per-app settings embedded in App. Every field is an
// int and is omitted from the JSON output when zero.
// NOTE(review): units (seconds, days, bytes, ...) are not visible here — confirm
// before relying on them.
type Configuration struct {
	FutureScheduleCreationPeriod int `json:"futureScheduleCreationPeriod,omitempty"`
	FiredScheduleRetentionPeriod int `json:"firedScheduleRetentionPeriod,omitempty"`
	PayloadSize                  int `json:"payloadSize,omitempty"`
	HttpRetries                  int `json:"httpRetries,omitempty"`
	HttpTimeout                  int `json:"httpTimeout,omitempty"`
}

type CreateScheduleTask

// CreateScheduleTask is the element type of CronTaskQueue, the queue whose
// tasks convert a recurring schedule into one-time schedules.
// NOTE(review): From and Duration presumably bound the window in which one-time
// schedules are created from Cron — confirm.
type CreateScheduleTask struct {
	Cron     Schedule
	From     time.Time
	Duration time.Duration
}

type Details

// Details holds the URL, method and headers of an HttpCallback (it is the type
// of HttpCallback.Details). No field uses omitempty, so all three are always
// present in the JSON output.
type Details struct {
	Url     string            `json:"url"`
	Method  string            `json:"method"`
	Headers map[string]string `json:"headers"`
}

type Factory

// Factory is a function that returns a Callback. Registry stores one Factory
// per string key.
type Factory func() Callback

type HTTPCallback

// HTTPCallback is a callback described by a URL and optional headers. It is
// the type of the deprecated Schedule.HttpCallback field, and *HTTPCallback
// declares every method of the Callback interface.
// It is a different type from HttpCallback; the two names differ only in case.
type HTTPCallback struct {
	Url     string            `json:"url,omitempty"`
	Headers map[string]string `json:"headers,omitempty"`
}

func (*HTTPCallback) GetDetails

func (h *HTTPCallback) GetDetails() (string, error)

func (*HTTPCallback) GetType

func (h *HTTPCallback) GetType() string

func (*HTTPCallback) Invoke

func (h *HTTPCallback) Invoke(wrapper ScheduleWrapper) error

func (*HTTPCallback) Marshal

func (h *HTTPCallback) Marshal(m map[string]interface{}) error

func (*HTTPCallback) UnmarshalJSON

func (h *HTTPCallback) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for HTTPCallback.

func (*HTTPCallback) Validate

func (h *HTTPCallback) Validate() error

type HttpCallback

// HttpCallback is a callback made of a type string and its Details (URL,
// method, headers). Its methods use a pointer receiver except Invoke, which
// uses a value receiver, so *HttpCallback has every method of Callback.
// It is a different type from HTTPCallback; the two names differ only in case.
type HttpCallback struct {
	Type    string  `json:"type"`
	Details Details `json:"details"`
}

func (*HttpCallback) GetDetails

func (h *HttpCallback) GetDetails() (string, error)

func (*HttpCallback) GetType

func (h *HttpCallback) GetType() string

func (HttpCallback) Invoke

func (h HttpCallback) Invoke(wrapper ScheduleWrapper) error

func (*HttpCallback) Marshal

func (h *HttpCallback) Marshal(m map[string]interface{}) error

func (*HttpCallback) UnmarshalJSON

func (h *HttpCallback) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler for HttpCallback.

func (*HttpCallback) Validate

func (h *HttpCallback) Validate() error

type ReconciliationHistory

// ReconciliationHistory is one entry of Schedule.ReconciliationHistory: the
// status and error message recorded for a reconciliation.
type ReconciliationHistory struct {
	Status       Status `json:"status,omitempty"`
	ErrorMessage string `json:"errorMessage,omitempty"`
	// NOTE(review): presumably a formatted timestamp of the callback — format not visible here, confirm.
	CallbackOn   string `json:"callbackOn,omitempty"`
}

type Schedule

// Schedule is a single schedule belonging to an app (AppId), with its payload,
// callback, status and reconciliation history.
type Schedule struct {
	ScheduleId            gocql.UUID              `json:"scheduleId"`
	Payload               string                  `json:"payload"`
	AppId                 string                  `json:"appId"`
	// NOTE(review): presumably an epoch timestamp; the unit is not visible here — confirm.
	ScheduleTime          int64                   `json:"scheduleTime,omitempty"`
	PartitionId           int                     `json:"partitionId"`
	ScheduleGroup         int64                   `json:"scheduleGroup,omitempty"`
	// Callback is excluded from JSON; the "callback" JSON value is held in CallbackRaw.
	// NOTE(review): presumably UnmarshalJSON decodes CallbackRaw into Callback — confirm.
	Callback              Callback                `json:"-"`
	CallbackRaw           json.RawMessage         `json:"callback,omitempty"`
	// NOTE(review): presumably non-empty only for recurring schedules (see IsRecurring) — confirm.
	CronExpression        string                  `json:"cronExpression,omitempty"`
	Status                Status                  `json:"status,omitempty"`
	ErrorMessage          string                  `json:"errorMessage,omitempty"`
	// ParentScheduleId is excluded from JSON.
	// NOTE(review): presumably the recurring schedule this one was cloned from (see CloneAsOneTime) — confirm.
	ParentScheduleId      gocql.UUID              `json:"-"`
	ReconciliationHistory []ReconciliationHistory `json:"reconciliationHistory,omitempty"`
	// Deprecated: marked deprecated by the original author; excluded from JSON.
	Ttl int `json:"-"`
	// Deprecated: marked deprecated by the original author.
	// NOTE(review): presumably superseded by Callback/CallbackRaw — confirm.
	// omitempty has no effect on a struct-typed field in encoding/json.
	AirbusCallback AirbusCallback `json:"airbusCallback,omitempty"`
	// Deprecated: marked deprecated by the original author.
	// NOTE(review): presumably superseded by Callback/CallbackRaw — confirm.
	// omitempty has no effect on a struct-typed field in encoding/json.
	HttpCallback HTTPCallback `json:"httpCallback,omitempty"`
}

func (Schedule) CheckUntriggeredCallback

func (s Schedule) CheckUntriggeredCallback(flushPeriod int) bool

CheckUntriggeredCallback reports whether the current time is past the schedule's time group by more than one minute plus the flush period.

func (Schedule) CloneAsOneTime

func (s Schedule) CloneAsOneTime(at time.Time) Schedule

CloneAsOneTime clones a given recurring schedule into a one-time schedule at the supplied time.

func (*Schedule) CreateScheduleFromCassandraMap

func (s *Schedule) CreateScheduleFromCassandraMap(m map[string]interface{}) error

func (Schedule) GetCallBackType

func (s Schedule) GetCallBackType() string

func (Schedule) GetCallbackDetails

func (s Schedule) GetCallbackDetails() string

func (Schedule) GetTTL

func (s Schedule) GetTTL(app App, bufferTTL int) int

GetTTL computes the TTL at the schedule level: ttl = scheduleTime - now.

func (Schedule) IsRecurring

func (s Schedule) IsRecurring() bool

func (*Schedule) SetFields

func (s *Schedule) SetFields(app App)

func (*Schedule) SetStatus

func (s *Schedule) SetStatus(m map[string]interface{}) error

SetStatus sets the status, error_msg and reconciliation_history of the schedule from the map.

func (*Schedule) SetUnknownStatus

func (s *Schedule) SetUnknownStatus(flushPeriod int)

SetUnknownStatus sets the status to Scheduled if the callback time is yet to come, or to Miss if the callback time has already expired.

func (*Schedule) UnmarshalJSON

func (s *Schedule) UnmarshalJSON(data []byte) error

func (*Schedule) UpdateReconciliationHistory

func (s *Schedule) UpdateReconciliationHistory(status Status, errMsg string)

UpdateReconciliationHistory updates the schedule's reconciliation history. If the history contains more than "HistorySize" reconciliations, only the latest "HistorySize" reconciliations are considered.

func (*Schedule) ValidateSchedule

func (s *Schedule) ValidateSchedule(app App, conf conf.AppLevelConfiguration) []string

type ScheduleWrapper

// ScheduleWrapper bundles a Schedule with its App and a flag marking whether
// it is part of a reconciliation. It is the element type of the HTTP, Airbus
// and aggregation task queues and the argument of Callback.Invoke.
type ScheduleWrapper struct {
	Schedule         Schedule
	App              App
	IsReconciliation bool
}

type Status

// Status is the string-typed state of a schedule; the declared values are
// Scheduled, Deleted, Success, Failure, Miss and Error.
type Status string

Status for schedules in the system

type StatusTask

// StatusTask is the element type of StatusTaskQueue: a batch of schedules of
// one App whose status is to be updated after their callback is fired.
type StatusTask struct {
	Schedules []Schedule
	App       App
}

type Task

// Task holds the configuration used by InitTaskQueues.
// NOTE(review): which Conf settings are read (e.g. queue sizes) is not visible
// here — confirm in InitTaskQueues.
type Task struct {
	Conf *conf.Configuration
}

func (*Task) InitTaskQueues

func (t *Task) InitTaskQueues()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL