data

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Delivery job statuses and their string representations.
// The JobStatus values are consecutive, starting at 1000 (iota + 1000).
const (

	// JobQueued is the job status during first attempt (value 1000).
	// NOTE(review): JobInflight is also described as "first attempt" — confirm which state precedes which.
	JobQueued JobStatus = iota + 1000
	// JobInflight is to signify that the DeliveryJob is in its first attempt (value 1001).
	JobInflight
	// JobDelivered signifies that the DeliveryJob received 2XX status from consumer (value 1002).
	JobDelivered
	// JobDead signifies that the maximum number of retries has been reached (value 1003).
	JobDead
	// JobQueuedStr is the string rep of JobQueued
	JobQueuedStr = "QUEUED"
	// JobInflightStr is the string rep of JobInflight
	JobInflightStr = "INFLIGHT"
	// JobDeliveredStr is the string rep of JobDelivered
	JobDeliveredStr = "DELIVERED"
	// JobDeadStr is the string rep of JobDead
	JobDeadStr = "DEAD"
)
View Source
// Message statuses and their string representations.
// The MsgStatus values are consecutive, starting at 100 (iota + 100).
const (

	// MsgStatusAcknowledged represents the state after receiving the message but before it is dispatched (value 100).
	MsgStatusAcknowledged MsgStatus = iota + 100
	// MsgStatusDispatched represents the fact that the dispatch jobs have been created for the message (value 101).
	MsgStatusDispatched
	// MsgStatusAcknowledgedStr is the string representation of message's acknowledged status
	MsgStatusAcknowledgedStr = "ACKNOWLEDGED"
	// MsgStatusDispatchedStr is the string representation of message's dispatched status
	MsgStatusDispatchedStr = "DISPATCHED"
)

Variables

View Source
var (
	// ErrInsufficientInformationForCreating is returned when NewProducer is called with insufficient information.
	// NOTE(review): NewDeliveryJob is documented as returning an "insufficient info error" too — presumably this
	// same sentinel; confirm. The message starts with a capital letter, which differs from the usual Go
	// convention for error strings; it is left unchanged here because callers may depend on the text.
	ErrInsufficientInformationForCreating = errors.New("Necessary information missing for persistence")
)
View Source
var (
	// ErrLockableNil is the error returned by NewLock when the lockable passed to it is nil.
	ErrLockableNil = errors.New("lockable can not be nil")
)

Functions

This section is empty.

Types

type App

// App represents this application state for cross cluster use.
// Its fields are not exported; the seed data and status are read through
// the GetSeedData and GetStatus methods.
type App struct {
	// contains filtered or unexported fields
}

App represents this application state for cross cluster use

func NewApp

func NewApp(seedData *config.SeedData, status AppStatus) *App

NewApp initializes a new App instance

func (*App) GetSeedData

func (app *App) GetSeedData() *config.SeedData

GetSeedData retrieves the current seed data config of the App. In NonInitialized status it can be nil

func (*App) GetStatus

func (app *App) GetStatus() AppStatus

GetStatus retrieves the current status of the App

type AppStatus

type AppStatus int

AppStatus represents the status of this App

// Recognized AppStatus values; they are consecutive, starting at 1 (iota + 1).
const (
	// NotInitialized is when the App is just started and no initialization ever happened (value 1).
	NotInitialized AppStatus = iota + 1
	// Initializing is when App has started to run the initializing process (value 2).
	Initializing
	// Initialized is when init process is completed for the App (value 3).
	Initialized
)

type BasePaginateable

// BasePaginateable provides common functionalities around paginateable objects.
// It carries the GetCursor, GetLastUpdatedHTTPTimeString and QuickFix methods,
// which types embedding it inherit.
type BasePaginateable struct {
	// ID is an xid.ID.
	// NOTE(review): presumably the value GetCursor encodes into Cursor.ID — confirm.
	ID        xid.ID
	// CreatedAt is the creation timestamp.
	CreatedAt time.Time
	// UpdatedAt is the last-update timestamp.
	// NOTE(review): presumably what GetLastUpdatedHTTPTimeString formats — confirm.
	UpdatedAt time.Time
}

BasePaginateable provides common functionalities around paginateable objects

func (*BasePaginateable) GetCursor

func (paginateable *BasePaginateable) GetCursor() (cursor *Cursor, err error)

GetCursor returns the cursor value for this object

func (*BasePaginateable) GetLastUpdatedHTTPTimeString

func (paginateable *BasePaginateable) GetLastUpdatedHTTPTimeString() string

GetLastUpdatedHTTPTimeString exposes the string rep of the last modified timestamp for the object

func (*BasePaginateable) QuickFix

func (paginateable *BasePaginateable) QuickFix() bool

QuickFix fixes the base paginateable model's attributes

type Channel

// Channel is the object that producer broadcasts to and consumer consumes from.
type Channel struct {
	// MessageStakeholder is embedded.
	MessageStakeholder
	// ChannelID is the channel's identifier. IsInValidState is documented as
	// returning false when the channel id, name or token is empty.
	ChannelID string
}

Channel is the object that producer broadcasts to and consumer consumes from

func NewChannel

func NewChannel(channelID string, token string) (*Channel, error)

NewChannel creates a new Channel

func (*Channel) IsInValidState

func (channel *Channel) IsInValidState() bool

IsInValidState returns false if any of channel id or name or token is empty

func (*Channel) QuickFix

func (channel *Channel) QuickFix() bool

QuickFix fixes the model to set default ID, name same as channel id, created and updated at to current time.

type Consumer

// Consumer is a message stakeholder that consumes from a Channel (ConsumingFrom)
// and has a callback URL.
type Consumer struct {
	// MessageStakeholder is embedded.
	MessageStakeholder
	// ConsumerID is the consumer's identifier.
	ConsumerID    string
	// CallbackURL is held as a string.
	// NOTE(review): presumably the string form of the *url.URL given to NewConsumer — confirm.
	CallbackURL   string
	// ConsumingFrom is the Channel this consumer consumes from. It may be nil;
	// GetChannelIDSafely exists to read the channel id with that in mind.
	ConsumingFrom *Channel
}

Consumer is the object that consumes messages from a Channel and is reached at its callback URL

func NewConsumer

func NewConsumer(channel *Channel, consumerID, token string, callbackURL *url.URL) (*Consumer, error)

NewConsumer creates new Consumer

func (*Consumer) GetChannelIDSafely

func (consumer *Consumer) GetChannelIDSafely() (channelID string)

GetChannelIDSafely retrieves the channel id, accounting for the fact that ConsumingFrom may be nil

func (*Consumer) IsInValidState

func (consumer *Consumer) IsInValidState() bool

IsInValidState returns false if any of consumer id or name or token is empty, channel is nil, or callback URL is not an absolute URL

func (*Consumer) QuickFix

func (consumer *Consumer) QuickFix() bool

QuickFix fixes the model to set default ID, name same as consumer id, created and updated at to current time.

type Cursor

// Cursor represents a position used for pagination. It can be rendered with
// String and rebuilt from its encoded string form with ParseCursor.
type Cursor struct {
	// ID is the identifier part of the cursor.
	// NOTE(review): presumably the string form of the paginated object's xid.ID — confirm.
	ID        string
	// Timestamp is the time part of the cursor.
	Timestamp time.Time
}

Cursor represents a string used for pagination

func ParseCursor

func ParseCursor(encodedCursorString string) (cursor *Cursor, err error)

ParseCursor creates Cursor from its string representation

func (*Cursor) String

func (c *Cursor) String() string

type DeliveryJob

// DeliveryJob represents the DTO object for delivering a Message to a consumer.
// It has a GetLockID method, as required by the Lockable interface, plus
// QuickFix and IsInValidState.
type DeliveryJob struct {
	// BasePaginateable is embedded.
	BasePaginateable
	// Message is the message to deliver.
	Message               *Message
	// Listener is the Consumer the message is delivered to.
	Listener              *Consumer
	// Status is the job's JobStatus (JobQueued, JobInflight, JobDelivered or JobDead).
	Status                JobStatus
	// StatusChangedAt is a timestamp.
	// NOTE(review): presumably set whenever Status changes — confirm.
	StatusChangedAt       time.Time
	// DispatchReceivedAt is a timestamp.
	// NOTE(review): presumably when the dispatch request for this job was received — confirm.
	DispatchReceivedAt    time.Time
	// EarliestNextAttemptAt is a timestamp.
	// NOTE(review): presumably the earliest time a retry may be attempted — confirm.
	EarliestNextAttemptAt time.Time
	// RetryAttemptCount is an unsigned counter.
	// NOTE(review): presumably the number of retries made so far — confirm.
	RetryAttemptCount     uint
}

DeliveryJob represents the DTO object for delivering a Message to a consumer

func NewDeliveryJob

func NewDeliveryJob(msg *Message, consumer *Consumer) (job *DeliveryJob, err error)

NewDeliveryJob creates a new instance of DeliveryJob; returns insufficient info error if parameters are not valid for a new DeliveryJob

func (*DeliveryJob) GetLockID

func (job *DeliveryJob) GetLockID() string

GetLockID retrieves the Lock ID representing this instance of DeliveryJob

func (*DeliveryJob) IsInValidState

func (job *DeliveryJob) IsInValidState() bool

IsInValidState returns false if any of message id or payload or content type is empty, channel is nil, callback URL is not url or not absolute URL, status not recognized, received at and outboxed at not set properly. Call QuickFix before IsInValidState is called.

func (*DeliveryJob) QuickFix

func (job *DeliveryJob) QuickFix() bool

QuickFix fixes the object state automatically as much as possible

type JobStatus

type JobStatus int

JobStatus represents the delivery job status

func (JobStatus) String

func (status JobStatus) String() string

type Lock

// Lock represents the construct for lock information. Instances are created
// from a Lockable with NewLock.
type Lock struct {
	// LockID identifies the lock.
	// NOTE(review): presumably the value of the lockable's GetLockID() — confirm.
	LockID     string
	// AttainedAt is a timestamp.
	// NOTE(review): presumably when the lock was attained — confirm.
	AttainedAt time.Time
}

Lock represents the construct for lock information

func NewLock

func NewLock(lockable Lockable) (lock *Lock, err error)

NewLock returns a new instance of lock from the lockable

type Lockable

// Lockable represents the API necessary to lock an object for distributed MUTEX operation.
// DeliveryJob and Message both declare a GetLockID method with this signature.
type Lockable interface {
	// GetLockID returns the lock ID of the object as a string.
	GetLockID() string
}

Lockable represents the API necessary to lock an object for distributed MUTEX operation

type Message

// Message represents the main payload of the application to be delivered.
// It has a GetLockID method, as required by the Lockable interface, plus
// QuickFix and IsInValidState.
type Message struct {
	// BasePaginateable is embedded.
	BasePaginateable
	// MessageID is the message's identifier.
	MessageID     string
	// Payload is the message body, held as a string.
	Payload       string
	// ContentType is the content type of Payload, held as a string.
	ContentType   string
	// Priority is an unsigned priority value.
	// NOTE(review): ordering semantics (higher vs. lower first) are not visible here — confirm.
	Priority      uint
	// Status is the message's MsgStatus (MsgStatusAcknowledged or MsgStatusDispatched).
	Status        MsgStatus
	// BroadcastedTo is the Channel the message was broadcast to. It may be nil;
	// GetChannelIDSafely exists to read the channel id with that in mind.
	BroadcastedTo *Channel
	// ProducedBy is the Producer of the message.
	ProducedBy    *Producer
	// ReceivedAt is a timestamp.
	// NOTE(review): presumably when the message was received — confirm.
	ReceivedAt    time.Time
	// OutboxedAt is a timestamp.
	// NOTE(review): presumably when the message was dispatched — confirm.
	OutboxedAt    time.Time
}

Message represents the main payload of the application to be delivered

func NewMessage

func NewMessage(channel *Channel, producedBy *Producer, payload, contentType string) (*Message, error)

NewMessage creates and returns new instance of message

func (*Message) GetChannelIDSafely

func (message *Message) GetChannelIDSafely() (channelID string)

GetChannelIDSafely retrieves the channel id, accounting for the fact that BroadcastedTo may be nil

func (*Message) GetLockID

func (message *Message) GetLockID() string

GetLockID retrieves lock ID for the current instance of message

func (*Message) IsInValidState

func (message *Message) IsInValidState() bool

IsInValidState returns false if any of message id or payload or content type is empty, channel is nil, callback URL is not url or not absolute URL, status not recognized, received at and outboxed at not set properly. Call QuickFix before IsInValidState is called.

func (*Message) QuickFix

func (message *Message) QuickFix() bool

QuickFix fixes the object state automatically as much as possible

type MessageStakeholder

// MessageStakeholder represents all objects around a message, for example,
// Producer, Channel, Consumer. Each of those types embeds it.
type MessageStakeholder struct {
	// BasePaginateable is embedded.
	BasePaginateable
	// Name is the stakeholder's name.
	Name  string
	// Token is the stakeholder's token.
	// NOTE(review): presumably an access/auth token — confirm.
	Token string
}

MessageStakeholder represents all objects around a message, for example, Producer, Channel, Consumer

type MsgStatus

type MsgStatus int

MsgStatus represents the state of a Msg.

func (MsgStatus) String

func (status MsgStatus) String() string

type Paginateable

// Paginateable should be implemented by objects having xid.ID as field ID in DB
// and helps get cursor object. BasePaginateable declares a GetCursor method
// with these parameter and result types.
type Paginateable interface {
	// GetCursor returns the Cursor for the object, or an error.
	GetCursor() (*Cursor, error)
}

Paginateable should be implemented by objects having xid.ID as field ID in DB and helps get cursor object

type Pagination

// Pagination represents a data structure to determine how to traverse a list.
// Instances are built by NewPagination from two Paginateable values.
type Pagination struct {
	// Next is a cursor.
	// NOTE(review): presumably derived from NewPagination's "after" argument — confirm.
	Next     *Cursor
	// Previous is a cursor.
	// NOTE(review): presumably derived from NewPagination's "before" argument — confirm.
	Previous *Cursor
}

Pagination represents a data structure to determine how to traverse a list

func NewPagination

func NewPagination(after Paginateable, before Paginateable) *Pagination

NewPagination returns a new pagination wrapper

type Producer

// Producer represents generator of messages.
type Producer struct {
	// MessageStakeholder is embedded.
	MessageStakeholder
	// ProducerID is the producer's identifier. IsInValidState is documented as
	// returning false when the producer id, name or token is empty.
	ProducerID string
}

Producer represents generator of messages

func NewProducer

func NewProducer(producerID string, token string) (*Producer, error)

NewProducer creates new Producer

func (*Producer) IsInValidState

func (prod *Producer) IsInValidState() bool

IsInValidState returns false if any of producer id or name or token is empty

func (*Producer) QuickFix

func (prod *Producer) QuickFix() bool

QuickFix fixes the model to set default ID, name same as producer id, created and updated at to current time.

type Updateable

// Updateable represents interface for objects that expose updated date.
// BasePaginateable declares a method with this signature.
type Updateable interface {
	// GetLastUpdatedHTTPTimeString returns the last-updated timestamp as a string.
	GetLastUpdatedHTTPTimeString() string
}

Updateable represents interface for objects that expose updated date

type ValidateableModel

// ValidateableModel is for models that can be checked for a valid state before
// write ops and that allow a quick fix to be applied. Channel, Consumer,
// DeliveryJob, Message and Producer each declare both methods.
type ValidateableModel interface {
	// QuickFix attempts to fix the model's state and returns a bool.
	// NOTE(review): the meaning of the returned bool is not visible here — confirm.
	QuickFix() bool
	// IsInValidState reports whether the model is in a valid state.
	IsInValidState() bool
}

ValidateableModel is implemented by models that can be checked for a valid state before write ops. It also allows a quick fix to be applied

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL