message

package
v0.0.0-...-753ac85 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2021 License: BSD-2-Clause-Views Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Log = logrus.New()

Functions

func ForkChannel

func ForkChannel(in *MessageChannel, out ...*MessageChannel)

func FromJson

func FromJson(o Message) ([]byte, error)

func NewKazaam

func NewKazaam(spec string) *kazaam.Kazaam

Types

type ChannelDB

type ChannelDB map[uint64]*MessageChannel
var Channels ChannelDB

type Message

type Message map[string]interface{}

func Jsonf

func Jsonf(format string, args ...interface{}) (Message, error)

func KazaamHandler

func KazaamHandler(kz *kazaam.Kazaam, o Message) (Message, error)

func PostHandler

func PostHandler(url string, o Message) (Message, error)

func ScriptHandler

func ScriptHandler(cmdline []string, o Message) (Message, error)

func ToJson

func ToJson(data []byte) (Message, error)

func (*Message) GetPath

func (msg *Message) GetPath(jp string) (interface{}, error)

func (*Message) String

func (msg *Message) String() (string, error)

type MessageChannel

type MessageChannel struct {
	Quit chan bool
	C    chan Message
	// contains filtered or unexported fields
}

func AllFinalChannels

func AllFinalChannels() []*MessageChannel

func ConsumeChannels

func ConsumeChannels(h MessageChannelSource, name string, cs ...*MessageChannel) *MessageChannel

func FetchFeedHandler

func FetchFeedHandler(urlPath string, o Message) (*MessageChannel, error)

func FilterChannels

func FilterChannels(h MessageFilter, name string, cs ...*MessageChannel) *MessageChannel

func NewMessageChannel

func NewMessageChannel(name string, sz ...int) *MessageChannel

func ProcessChannels

func ProcessChannels(h MessageHandler, name string, cs ...*MessageChannel) *MessageChannel

func (*MessageChannel) AddInput

func (channel *MessageChannel) AddInput(in *MessageChannel)

func (*MessageChannel) Close

func (channel *MessageChannel) Close()

func (*MessageChannel) Consume

func (channel *MessageChannel) Consume(src *MessageChannel)

func (*MessageChannel) Done

func (channel *MessageChannel) Done()

func (*MessageChannel) ID

func (channel *MessageChannel) ID() uint64

func (*MessageChannel) Inputs

func (channel *MessageChannel) Inputs() []*MessageChannel

func (*MessageChannel) IsFinal

func (channel *MessageChannel) IsFinal() bool

func (*MessageChannel) MarshalJSON

func (channel *MessageChannel) MarshalJSON() ([]byte, error)

func (*MessageChannel) Process

func (channel *MessageChannel) Process(sendToChannel MessageSender, cs ...*MessageChannel) *MessageChannel

func (*MessageChannel) Recv

func (channel *MessageChannel) Recv() Message

func (*MessageChannel) RecvCount

func (channel *MessageChannel) RecvCount() int

func (*MessageChannel) Send

func (channel *MessageChannel) Send(o Message)

func (*MessageChannel) SentCount

func (channel *MessageChannel) SentCount() int

func (*MessageChannel) Sink

func (channel *MessageChannel) Sink()

func (*MessageChannel) String

func (channel *MessageChannel) String() string

func (*MessageChannel) Wait

func (channel *MessageChannel) Wait()

type MessageChannelSource

type MessageChannelSource func(Message) (*MessageChannel, error)

type MessageFilter

type MessageFilter func(Message) bool

type MessageHandler

type MessageHandler func(Message) (Message, error)

type MessageSender

type MessageSender func(*MessageChannel, Message)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL