aqdispatch

package module
v0.6.4

Warning: This package is not in the latest version of its module.
Published: Sep 28, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

AQDispatch

Dispatch RPC over Oracle's ("The Big Red") Advanced Queue (DBMS_AQ).

You pass the request type (which needs a Name and a Payload) and the response type (which needs an Errmsg and a Payload) to New. The Dispatcher then receives the messages and calls the given do function for each message.

The output of that function is sent back as answer.

This is a dispatcher: it splits the input by the NAME of the task and manages a separate disk queue for each NAME.
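The per-NAME split described above can be illustrated with a minimal sketch. It is not the package's implementation: the `message` type and the `splitByName` function are hypothetical, and in-memory slices stand in for the disk queues.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a dequeued request; only the
// Name and Payload attributes mentioned in the README are modeled.
type message struct {
	Name    string
	Payload string
}

// splitByName groups messages into one FIFO queue per distinct Name.
// In-memory slices stand in for the disk queues (an assumption made
// to keep the sketch self-contained).
func splitByName(msgs []message) map[string][]message {
	queues := make(map[string][]message)
	for _, m := range msgs {
		queues[m.Name] = append(queues[m.Name], m)
	}
	return queues
}

func main() {
	queues := splitByName([]message{
		{Name: "resize", Payload: "a.png"},
		{Name: "mail", Payload: "hello"},
		{Name: "resize", Payload: "b.png"},
	})
	fmt.Println(len(queues["resize"]), len(queues["mail"])) // prints "2 1"
}
```

Keeping one queue per name means a slow or backed-up task type does not block messages of other types.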

Documentation

Constants

This section is empty.

Variables

var (
	ErrUnknownCommand = errors.New("unknown command")
	ErrSkipResponse   = errors.New("skip response")
	ErrEmpty          = errors.New("empty")
	ErrExit           = errors.New("exit")
	ErrAnswer         = errors.New("answer send error")
)

Functions

This section is empty.

Types

type Config

type Config struct {

	// Enc is needed when DB sends/requires something other than UTF-8
	Enc encoding.Encoding

	*slog.Logger

	// RequestKeyName is the attribute name instead of NAME.
	RequestKeyName string

	// DisQPrefix is the diskqueue file's prefix.
	DisQPrefix string
	// DisQPath is the path for the diskqueue.
	DisQPath string

	// Correlation restricts dequeueing to messages with the same correlation string.
	Correlation string
	// ResponseKeyPayload is the attribute name instead of PAYLOAD
	ResponseKeyPayload string
	// ResponseKeyBlob is the attribute name instead of AUX
	ResponseKeyBlob string

	// ResponseKeyErrMsg is the attribute name instead of ERRMSG
	ResponseKeyErrMsg string
	// RequestKeyPayload is the attribute name instead of PAYLOAD
	RequestKeyPayload string
	// RequestKeyBlob is the attribute name instead of AUX
	RequestKeyBlob string

	DisQMaxFileSize, DisQSyncEvery int64
	DisQSyncTimeout                time.Duration

	Timeout, PipeTimeout time.Duration
	// QueueCount is the approximate number of queues dispatched over this AQ.
	QueueCount int
	// Concurrency is the number of concurrent RPCs.
	Concurrency int

	DisQMinMsgSize, DisQMaxMsgSize int32
}

Config of the Dispatcher.

type Dispatcher

type Dispatcher struct {
	Timezone *time.Location
	// contains filtered or unexported fields
}

Dispatcher dispatches tasks. After creating it with New, run it with Run.

It reads tasks and stores the messages in disk queues, one for each distinct NAME. If Concurrency allows, it calls the do function given to New and sends whatever that function writes as the PAYLOAD of the response.

func New

func New(
	db *sql.DB,
	conf Config,
	inQName, inQType string,
	do DoFunc,
	outQName, outQType string,
) (*Dispatcher, error)

New returns a new Dispatcher, which receives inQType typed messages on inQName.

Then it calls the do function with the task and sends its output as the response on the outQName queue in an outQType message.

When outQName and outQType are empty, no response is sent and no response queue is opened.

func (*Dispatcher) Close

func (di *Dispatcher) Close() error

Close the dispatcher.

func (*Dispatcher) Decode

func (di *Dispatcher) Decode(p []byte) string

Decode the string from DB's encoding to UTF-8.

func (*Dispatcher) Encode

func (di *Dispatcher) Encode(s string) string

Encode a string using the DB's encoding.

func (*Dispatcher) PurgeExpired added in v0.3.16

func (di *Dispatcher) PurgeExpired(ctx context.Context) error

PurgeExpired calls PurgeExpired on the underlying queues, purging expired messages.

func (*Dispatcher) Run added in v0.3.0

func (di *Dispatcher) Run(ctx context.Context, taskNames []string) error

Run the dispatcher, accepting tasks with names in taskNames.

type DoFunc added in v0.6.1

type DoFunc func(context.Context, io.Writer, *Task) (io.Reader, error)

DoFunc is the type of the function that processes the Task.

type Task

type Task = pb.Task

Task is a task.
