Documentation ¶
Index ¶
- Variables
- func AnyToString(v any) string
- func CtxWithLogger(ctx context.Context, dep any, v Logger) context.Context
- func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context
- func ErrorExtractCode(err error) int
- func ErrorJoin3rdParty(myErr error, Err3rd error) error
- func ErrorJoin3rdPartyWithMsg(myErr error, Err3rd error, msg string, args ...any) error
- func ErrorWrapWithMessage(myErr error, msg string, args ...any) error
- func GenerateRandomCode(length int) string
- func GenerateUlid() string
- func LookupLogLevel(level LogLevel) string
- func PutMessage(message *Message)
- func ReliableTask(task func() error, allowStop func() bool, retryMaxSecond int, ...) error
- func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, ...) error
- func SetDefaultLogger(l Logger)
- func UseSkipMessage() func(message *Message, dep any) error
- func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, ...) error
- type Adapter
- func (adp *Adapter) Identifier() string
- func (adp *Adapter) IsStopped() bool
- func (adp *Adapter) Listen() (err error)
- func (adp *Adapter) Log() Logger
- func (adp *Adapter) OnDisconnect(terminates ...func(adp IAdapter))
- func (adp *Adapter) RawInfra() any
- func (adp *Adapter) RawSend(messages ...*Message) error
- func (adp *Adapter) Send(messages ...*Message) error
- func (adp *Adapter) SetLog(logger Logger)
- func (adp *Adapter) Stop() error
- func (adp *Adapter) WaitStop() chan struct{}
- type AdapterHub
- type AdapterOption
- func (opt *AdapterOption) AdapterFixup(maxRetrySecond int, adapterFixup func(IAdapter) error) *AdapterOption
- func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption
- func (opt *AdapterOption) AdapterRecv(adapterRecv func(logger Logger) (message *Message, err error)) *AdapterOption
- func (opt *AdapterOption) AdapterSend(adapterSend func(logger Logger, message *Message) error) *AdapterOption
- func (opt *AdapterOption) AdapterStop(adapterStop func(logger Logger) error) *AdapterOption
- func (opt *AdapterOption) Build() (adp IAdapter, err error)
- func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption
- func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption
- func (opt *AdapterOption) Identifier(identifier string) *AdapterOption
- func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption
- func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption
- func (opt *AdapterOption) Logger(logger Logger) *AdapterOption
- func (opt *AdapterOption) RawInfra(infra any) *AdapterOption
- func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption
- func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption
- type Consumer
- type CustomError
- type HandleFunc
- type Hub
- func (hub *Hub) Count(filter func(IAdapter) bool) int
- func (hub *Hub) DoAsync(action func(IAdapter))
- func (hub *Hub) DoSync(action func(IAdapter) (stop bool))
- func (hub *Hub) FindByKey(key string) (adp IAdapter, found bool)
- func (hub *Hub) FindMulti(filter func(IAdapter) bool) (all []IAdapter, found bool)
- func (hub *Hub) FindMultiByKey(keys []string) (all []IAdapter, found bool)
- func (hub *Hub) FindOne(filter func(IAdapter) bool) (adp IAdapter, found bool)
- func (hub *Hub) IsShutdown() bool
- func (hub *Hub) Join(key string, adp IAdapter) error
- func (hub *Hub) RemoveByKey(key string)
- func (hub *Hub) RemoveMulti(filter func(IAdapter) bool)
- func (hub *Hub) RemoveMultiByKey(keys []string)
- func (hub *Hub) RemoveOne(filter func(IAdapter) bool)
- func (hub *Hub) SetConcurrencyQty(concurrencyQty int)
- func (hub *Hub) Shutdown()
- func (hub *Hub) Total() int
- func (hub *Hub) UpdateByOldKey(oldKey string, update func(IAdapter) (freshKey string)) error
- func (hub *Hub) WaitShutdown() chan struct{}
- type IAdapter
- type Lifecycle
- type LogLevel
- type Logger
- type Message
- type Middleware
- func UseAsync() Middleware
- func UseExclude(subjects []string) Middleware
- func UseHowMuchTime() Middleware
- func UseInclude(subjects []string) Middleware
- func UseLogger(withMsgId bool, safeConcurrency SafeConcurrencyKind) Middleware
- func UsePrintResult(isEgress bool, ignoreOkSubjects []string) Middleware
- func UseRecover() Middleware
- func UseRetry(retryMaxSecond int) Middleware
- type Mux
- func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) EnableMessagePool() *Mux
- func (mux *Mux) Endpoints(action func(subject, handler string))
- func (mux *Mux) ErrorHandler(errHandlers ...Middleware) *Mux
- func (mux *Mux) Group(groupName string) *Mux
- func (mux *Mux) GroupByNumber(groupName int) *Mux
- func (mux *Mux) HandleMessage(message *Message, dependency any) (err error)
- func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
- func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
- func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
- func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
- func (mux *Mux) Transform(transform HandleFunc) *Mux
- type Producer
- type Prosumer
- type SafeConcurrencyKind
- type Shutdown
- type WaitPingPong
Constants ¶
This section is empty.
Variables ¶
var ( ErrClosed = NewCustomError(2001, "service has been closed") ErrNotFound = NewCustomError(2100, "not found") ErrNotFoundSubject = NewCustomError(2101, "not found subject mux") )
Functions ¶
func AnyToString ¶ added in v0.44.0
func CtxWithLogger ¶ added in v0.44.0
func CtxWithPingPong ¶ added in v0.44.0
func CtxWithPingPong(ctx context.Context, v WaitPingPong) context.Context
func ErrorExtractCode ¶
func ErrorJoin3rdParty ¶
func GenerateRandomCode ¶
func GenerateUlid ¶ added in v0.31.0
func GenerateUlid() string
func LookupLogLevel ¶
func PutMessage ¶ added in v0.44.0
func PutMessage(message *Message)
func ReliableTask ¶
func SendPingWaitPong ¶
func SendPingWaitPong(sendPingSeconds int, sendPing func() error, waitPong WaitPingPong, isStopped func() bool) error
func SetDefaultLogger ¶
func SetDefaultLogger(l Logger)
func UseSkipMessage ¶ added in v0.43.0
func WaitPingSendPong ¶
func WaitPingSendPong(waitPingSeconds int, waitPing WaitPingPong, sendPong func() error, isStop func() bool) error
Types ¶
type Adapter ¶ added in v0.16.0
type Adapter struct {
// contains filtered or unexported fields
}
func (*Adapter) Identifier ¶ added in v0.16.0
func (*Adapter) OnDisconnect ¶ added in v0.46.0
type AdapterHub ¶ added in v0.7.0
type AdapterOption ¶ added in v0.16.0
type AdapterOption struct {
// contains filtered or unexported fields
}
func NewAdapterOption ¶ added in v0.43.0
func NewAdapterOption() (opt *AdapterOption)
func (*AdapterOption) AdapterFixup ¶ added in v0.16.0
func (opt *AdapterOption) AdapterFixup(maxRetrySecond int, adapterFixup func(IAdapter) error) *AdapterOption
func (*AdapterOption) AdapterHub ¶ added in v0.36.0
func (opt *AdapterOption) AdapterHub(hub AdapterHub) *AdapterOption
func (*AdapterOption) AdapterRecv ¶ added in v0.16.0
func (opt *AdapterOption) AdapterRecv(adapterRecv func(logger Logger) (message *Message, err error)) *AdapterOption
func (*AdapterOption) AdapterSend ¶ added in v0.16.0
func (opt *AdapterOption) AdapterSend(adapterSend func(logger Logger, message *Message) error) *AdapterOption
func (*AdapterOption) AdapterStop ¶ added in v0.16.0
func (opt *AdapterOption) AdapterStop(adapterStop func(logger Logger) error) *AdapterOption
func (*AdapterOption) Build ¶ added in v0.33.0
func (opt *AdapterOption) Build() (adp IAdapter, err error)
func (*AdapterOption) DecorateAdapter ¶ added in v0.35.0
func (opt *AdapterOption) DecorateAdapter(wrap func(adapter IAdapter) (application IAdapter)) *AdapterOption
func (*AdapterOption) EgressMux ¶ added in v0.39.0
func (opt *AdapterOption) EgressMux(mux *Mux) *AdapterOption
func (*AdapterOption) Identifier ¶ added in v0.16.0
func (opt *AdapterOption) Identifier(identifier string) *AdapterOption
func (*AdapterOption) IngressMux ¶ added in v0.39.0
func (opt *AdapterOption) IngressMux(mux *Mux) *AdapterOption
func (*AdapterOption) Lifecycle ¶ added in v0.18.0
func (opt *AdapterOption) Lifecycle(setup func(life *Lifecycle)) *AdapterOption
func (*AdapterOption) Logger ¶ added in v0.37.0
func (opt *AdapterOption) Logger(logger Logger) *AdapterOption
func (*AdapterOption) RawInfra ¶ added in v0.43.0
func (opt *AdapterOption) RawInfra(infra any) *AdapterOption
func (*AdapterOption) SendPing ¶ added in v0.16.0
func (opt *AdapterOption) SendPing(sendPingSeconds int, waitPong WaitPingPong, sendPing func(IAdapter) error) *AdapterOption
SendPing
SendPingWaitPong sends a ping message and waits for the corresponding pong message. SendPeriod = WaitSecond / 2
func (*AdapterOption) WaitPing ¶ added in v0.16.0
func (opt *AdapterOption) WaitPing(waitPingSeconds int, waitPing WaitPingPong, sendPong func(IAdapter) error) *AdapterOption
WaitPing
WaitPingSendPong waits for a ping message and responds with the corresponding pong message. SendPeriod = WaitSecond
type CustomError ¶
type CustomError struct {
// contains filtered or unexported fields
}
func NewCustomError ¶
func NewCustomError(myCode int, title string) *CustomError
func (*CustomError) CustomError ¶
func (c *CustomError) CustomError()
func (*CustomError) Error ¶
func (c *CustomError) Error() string
func (*CustomError) MyCode ¶
func (c *CustomError) MyCode() int
type HandleFunc ¶
func Link ¶ added in v0.44.0
func Link(handler HandleFunc, middlewares ...Middleware) HandleFunc
func UseAdHocFunc ¶ added in v0.45.0
func UseAdHocFunc(AdHoc func(message *Message, dep any) error) HandleFunc
func UsePrintDetail ¶ added in v0.44.0
func UsePrintDetail() HandleFunc
func (HandleFunc) Link ¶ added in v0.44.0
func (h HandleFunc) Link(middlewares ...Middleware) HandleFunc
func (HandleFunc) PostMiddleware ¶
func (h HandleFunc) PostMiddleware() Middleware
func (HandleFunc) PreMiddleware ¶
func (h HandleFunc) PreMiddleware() Middleware
type Hub ¶ added in v0.9.0
type Hub struct {
// contains filtered or unexported fields
}
func (*Hub) FindMultiByKey ¶ added in v0.15.0
func (*Hub) IsShutdown ¶ added in v0.39.0
func (*Hub) RemoveByKey ¶ added in v0.15.0
func (*Hub) RemoveMulti ¶ added in v0.15.0
If filter returns true, the target is removed.
func (*Hub) RemoveMultiByKey ¶ added in v0.15.0
func (*Hub) SetConcurrencyQty ¶ added in v0.9.0
SetConcurrencyQty sets concurrencyQty, which controls how many tasks can run simultaneously, preventing excessive resource usage and avoiding frequent context switches.
func (*Hub) UpdateByOldKey ¶ added in v0.15.0
func (*Hub) WaitShutdown ¶ added in v0.39.0
func (hub *Hub) WaitShutdown() chan struct{}
type Lifecycle ¶
type Lifecycle struct {
// contains filtered or unexported fields
}
Lifecycle defines a management mechanism for initializing and terminating an object.
func (*Lifecycle) OnDisconnect ¶ added in v0.46.0
type Logger ¶
type Logger interface { Debug(format string, a ...any) Info(format string, a ...any) Warn(format string, a ...any) Error(format string, a ...any) Fatal(format string, a ...any) SetLogLevel(level LogLevel) LogLevel() LogLevel WithCallDepth(externalDepth uint) Logger WithKeyValue(key string, v any) Logger }
func DefaultLogger ¶
func DefaultLogger() Logger
func NewWriterLogger ¶ added in v0.31.6
func SilentLogger ¶
func SilentLogger() Logger
type Message ¶ added in v0.43.0
type Message struct { Subject string Bytes []byte Body any Mutex sync.Mutex // RouteParam are used to capture values from subject. // These parameters represent resources or identifiers. // // Example: // // define mux subject = "/users/{id}" // send or recv subject = "/users/1017" // // get route param: // key : value => id : 1017 RouteParam maputil.Data Metadata maputil.Data RawInfra any Ctx context.Context // contains filtered or unexported fields }
func GetMessage ¶ added in v0.44.0
func GetMessage() *Message
type Middleware ¶
type Middleware func(next HandleFunc) HandleFunc
func UseAsync ¶ added in v0.45.0
func UseAsync() Middleware
func UseExclude ¶ added in v0.44.0
func UseExclude(subjects []string) Middleware
func UseHowMuchTime ¶ added in v0.44.0
func UseHowMuchTime() Middleware
func UseInclude ¶ added in v0.44.0
func UseInclude(subjects []string) Middleware
func UseLogger ¶ added in v0.43.0
func UseLogger(withMsgId bool, safeConcurrency SafeConcurrencyKind) Middleware
func UsePrintResult ¶ added in v0.44.0
func UsePrintResult(isEgress bool, ignoreOkSubjects []string) Middleware
func UseRecover ¶ added in v0.44.0
func UseRecover() Middleware
func UseRetry ¶ added in v0.43.0
func UseRetry(retryMaxSecond int) Middleware
func (Middleware) Link ¶ added in v0.44.0
func (mw Middleware) Link(handler HandleFunc) HandleFunc
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux refers to a router or multiplexer, which can be used to handle different messages. It is itself also a HandleFunc, but with added routing capabilities.
Message represents a high-level abstract data structure containing metadata (e.g. a header) and a body.
func NewMux ¶
NewMux: if routeDelimiter is an empty string, Message.RouteParam cannot be used. A non-empty routeDelimiter can only be a string of length 1. This parameter separates the different parts of Message.Subject.
func (*Mux) DefaultHandler ¶ added in v0.40.0
func (mux *Mux) DefaultHandler(h HandleFunc, mw ...Middleware) *Mux
DefaultHandler: when a subject cannot be found, the 'Default' handler is executed.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) EnableMessagePool ¶ added in v0.44.0
func (*Mux) ErrorHandler ¶ added in v0.40.0
func (mux *Mux) ErrorHandler(errHandlers ...Middleware) *Mux
func (*Mux) GroupByNumber ¶ added in v0.13.0
func (*Mux) Handler ¶
func (mux *Mux) Handler(subject string, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) HandlerByNumber ¶ added in v0.13.0
func (mux *Mux) HandlerByNumber(subject int, h HandleFunc, mw ...Middleware) *Mux
func (*Mux) Middleware ¶
func (mux *Mux) Middleware(middlewares ...Middleware) *Mux
Middleware must be defined before registering a handler; otherwise, the handler won't be able to use the middleware.
func (*Mux) NotFoundHandler ¶ added in v0.40.0
func (mux *Mux) NotFoundHandler(h HandleFunc) *Mux
NotFoundHandler: when a subject cannot be found, the 'NotFound' handler is executed.
"The difference between 'Default' and 'NotFound' is that the 'Default' handler will utilize middleware, whereas 'NotFound' won't use middleware."
func (*Mux) PostMiddleware ¶
func (mux *Mux) PostMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) PreMiddleware ¶
func (mux *Mux) PreMiddleware(handleFuncs ...HandleFunc) *Mux
func (*Mux) Transform ¶
func (mux *Mux) Transform(transform HandleFunc) *Mux
Transform: normally, a message passing through the mux calls 'getSubject' only once. However, if a Transform function is defined, 'getSubject' is called again when the message passes through it.
type SafeConcurrencyKind ¶ added in v0.45.0
type SafeConcurrencyKind int
const ( SafeConcurrency_Skip SafeConcurrencyKind = iota SafeConcurrency_Mutex SafeConcurrency_Copy )
type Shutdown ¶
type Shutdown struct { Logger Logger // contains filtered or unexported fields }
func NewShutdown ¶ added in v0.9.2
func NewShutdown() *Shutdown
func (*Shutdown) StopService ¶ added in v0.40.0
func (*Shutdown) WaitFinish ¶ added in v0.40.0
func (s *Shutdown) WaitFinish() chan struct{}
type WaitPingPong ¶ added in v0.44.0
type WaitPingPong chan struct{}
func CtxGetPingPong ¶ added in v0.44.0
func CtxGetPingPong(ctx context.Context) WaitPingPong
func NewWaitPingPong ¶ added in v0.44.0
func NewWaitPingPong() WaitPingPong
func (WaitPingPong) Ack ¶ added in v0.44.0
func (wait WaitPingPong) Ack()