engine

package
v0.0.0-...-8f325c2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: MPL-2.0 Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
// Process exit status codes. The first group is numbered consecutively
// from 10, one constant per name: configuration, JetStream, gRPC server,
// HTTP server, telemetry and resource store.
// NOTE(review): presumably each is used when the named subsystem fails —
// confirm against the call sites that exit with them.
const (
	ConfigurationExitCode = 10
	JetStreamExitCode     = 11
	GRPCServerExitCode    = 12
	HTTPServerExitCode    = 13
	TelemetryExitCode     = 14
	ResourceStoreExitCode = 15

	// InterruptCode is 130, which matches the conventional shell status for
	// termination by SIGINT (128 + 2) — presumably used on interrupt; confirm.
	InterruptCode = 130
)

OS status codes

View Source
// Format templates for keys. Each takes exactly one %s argument and yields
// "component/<value>" or "adapter/<value>" respectively.
// NOTE(review): presumably used to build keys for registered components and
// adapters in a key/value store — confirm against Store.
const (
	TplComponentKey = "component/%s"
	TplAdapterKey   = "adapter/%s"
)

Variables

View Source
// Sentinel errors for this package.
var (
	// ErrKubeFox is the root sentinel. Its message is the empty string, so
	// wrapping it with "%w" directly in front of the text adds no visible
	// prefix to the derived errors below.
	ErrKubeFox = errors.New("")

	// Each error below wraps ErrKubeFox via %w. Consequently
	// errors.Is(err, ErrKubeFox) reports true for all of them, while
	// errors.Is against a specific sentinel (e.g. ErrRouteNotFound) still
	// distinguishes the individual condition. Error() returns only the text
	// following %w, e.g. "component gone".
	ErrComponentGone     = fmt.Errorf("%wcomponent gone", ErrKubeFox)
	ErrComponentMismatch = fmt.Errorf("%wcomponent mismatch", ErrKubeFox)
	ErrEventInvalid      = fmt.Errorf("%wevent invalid", ErrKubeFox)
	ErrEventRequestGone  = fmt.Errorf("%wevent request gone", ErrKubeFox)
	ErrEventTimeout      = fmt.Errorf("%wevent time out", ErrKubeFox)
	ErrRouteInvalid      = fmt.Errorf("%wroute invalid", ErrKubeFox)
	ErrRouteNotFound     = fmt.Errorf("%wroute not found", ErrKubeFox)
	ErrSubCanceled       = fmt.Errorf("%wsubscription canceled", ErrKubeFox)
	ErrUnexpected        = fmt.Errorf("%wunexpected error", ErrKubeFox)
)
View Source
// Time-to-live durations. These are package-level variables rather than
// constants, so they can be reassigned at runtime.
// NOTE(review): presumably applied to the event stream and to component
// registrations respectively — confirm where they are read.
var (
	EventStreamTTL = time.Hour * 24 * 3 // 3 days
	ComponentsTTL  = time.Hour * 12     // 12 hours
)
View Source
var (
	// NoopCancel is a func(error) that ignores its argument and does nothing.
	// It has the same signature as ReplicaSubscription.Cancel, so it can
	// stand in wherever such a cancel function is required but no action is
	// wanted.
	NoopCancel = func(err error) {}
)

Functions

This section is empty.

Types

type Broker

// Broker is the dependency handed to the constructors in this package:
// NewGRPCServer, NewHTTPServer, NewHTTPClient and NewJetStreamClient all
// take a Broker.
type Broker interface {
	// AuthorizeComponent takes a context, a component and a string and
	// returns an error.
	// NOTE(review): the string is presumably a credential/token and a nil
	// error presumably means "authorized" — confirm with the implementation.
	AuthorizeComponent(context.Context, *kubefox.Component, string) error
	// Subscribe creates a ReplicaSubscription from the given configuration.
	Subscribe(context.Context, *SubscriptionConf) (ReplicaSubscription, error)
	// RecvEvent accepts a LiveEvent; it has the same signature as the
	// RecvEvent function type.
	RecvEvent(*LiveEvent) error
	// Id returns a string — presumably the broker's identifier; confirm.
	Id() string
}

type Engine

// Engine exposes a single Start method; instances are obtained from New.
type Engine interface {
	// Start takes no arguments and returns nothing, so any failure is not
	// reported through a return value.
	Start()
}

func New

func New() Engine

type GRPCServer

// GRPCServer embeds grpc.UnimplementedBrokerServer and is constructed from a
// Broker via NewGRPCServer. It provides Start, Shutdown and Subscribe methods.
type GRPCServer struct {
	grpc.UnimplementedBrokerServer
	// contains filtered or unexported fields
}

func NewGRPCServer

func NewGRPCServer(brk Broker) *GRPCServer

func (*GRPCServer) Shutdown

func (srv *GRPCServer) Shutdown(timeout time.Duration)

func (*GRPCServer) Start

func (srv *GRPCServer) Start(ctx context.Context) error

func (*GRPCServer) Subscribe

func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error

type GroupSubscription

// GroupSubscription embeds Subscription and adds no methods of its own, so
// its method set is identical to Subscription's; it exists as a distinct
// named type (see SubscriptionMgr.GroupSubscription).
type GroupSubscription interface {
	Subscription
}

type HTTPClient

// HTTPClient is constructed from a Broker via NewHTTPClient and exposes
// SendEvent. All of its fields are unexported.
type HTTPClient struct {
	// contains filtered or unexported fields
}

func NewHTTPClient

func NewHTTPClient(brk Broker) *HTTPClient

func (*HTTPClient) SendEvent

func (c *HTTPClient) SendEvent(req *LiveEvent) error

type HTTPServer

// HTTPServer is constructed from a Broker via NewHTTPServer. It has a
// ServeHTTP(http.ResponseWriter, *http.Request) method, so it satisfies
// http.Handler, and also exposes Start, Shutdown, Component and
// Subscription. All of its fields are unexported.
type HTTPServer struct {
	// contains filtered or unexported fields
}

func NewHTTPServer

func NewHTTPServer(brk Broker) *HTTPServer

func (*HTTPServer) Component

func (srv *HTTPServer) Component() *kubefox.Component

func (*HTTPServer) ServeHTTP

func (srv *HTTPServer) ServeHTTP(resWriter http.ResponseWriter, httpReq *http.Request)

func (*HTTPServer) Shutdown

func (srv *HTTPServer) Shutdown(timeout time.Duration)

func (*HTTPServer) Start

func (srv *HTTPServer) Start() (err error)

func (*HTTPServer) Subscription

func (srv *HTTPServer) Subscription() Subscription

type JetStreamClient

// JetStreamClient embeds jetstream.JetStream, so that interface's methods
// are promoted onto the client. It is constructed from a Broker via
// NewJetStreamClient and adds Connect, Close, IsHealthy, Name, Publish,
// PullEvents and ComponentsKV.
type JetStreamClient struct {
	jetstream.JetStream
	// contains filtered or unexported fields
}

func NewJetStreamClient

func NewJetStreamClient(brk Broker) *JetStreamClient

func (*JetStreamClient) Close

func (c *JetStreamClient) Close()

func (*JetStreamClient) ComponentsKV

func (c *JetStreamClient) ComponentsKV() jetstream.KeyValue

func (*JetStreamClient) Connect

func (c *JetStreamClient) Connect(ctx context.Context) error

func (*JetStreamClient) IsHealthy

func (c *JetStreamClient) IsHealthy(ctx context.Context) bool

func (*JetStreamClient) Name

func (c *JetStreamClient) Name() string

func (*JetStreamClient) Publish

func (c *JetStreamClient) Publish(subject string, evt *kubefox.Event) error

func (*JetStreamClient) PullEvents

func (c *JetStreamClient) PullEvents(sub ReplicaSubscription) error

type LiveEvent

// LiveEvent wraps a *kubefox.Event (embedded, so its fields and methods are
// promoted) together with additional exported and unexported fields. It is
// the value passed to RecvEvent and SendEvent functions.
type LiveEvent struct {
	*kubefox.Event

	// MatchedEvent is presumably the result of route/event matching —
	// NOTE(review): confirm where it is populated.
	MatchedEvent *kubefox.MatchedEvent

	// Receiver is one of the Receiver* constants.
	Receiver     Receiver
	// ReceivedAt is a timestamp — presumably when the event was received.
	ReceivedAt   time.Time
	// Subscription is the ReplicaSubscription associated with the event.
	Subscription ReplicaSubscription
	// ErrCh carries errors; presumably written by the Err method — confirm.
	ErrCh        chan error
	// contains filtered or unexported fields
}

func (*LiveEvent) Err

func (evt *LiveEvent) Err(err error)

func (*LiveEvent) TTL

func (evt *LiveEvent) TTL() time.Duration

type Receiver

// Receiver is an integer enumeration with a String method. It is the type of
// LiveEvent.Receiver.
type Receiver int
// Receiver values, assigned sequentially from 0 by iota. Reordering or
// inserting entries changes the numeric value of those that follow.
const (
	ReceiverJetStream Receiver = iota
	ReceiverGRPCServer
	ReceiverHTTPServer
	ReceiverHTTPClient
)

func (Receiver) String

func (r Receiver) String() string

type RecvEvent

// RecvEvent is a function type that takes a LiveEvent and returns an error;
// Broker.RecvEvent has the same signature.
type RecvEvent func(*LiveEvent) error

type RecvMsg

// RecvMsg is a function type that takes a *nats.Msg and returns nothing.
type RecvMsg func(*nats.Msg)

type ReplicaSubscription

// ReplicaSubscription extends Subscription (SendEvent, IsActive) with access
// to the subscribed component, its registration, and a cancelable context.
// It is what Broker.Subscribe and SubscriptionMgr.Create return.
type ReplicaSubscription interface {
	Subscription
	// Component returns the *kubefox.Component of the subscription.
	Component() *kubefox.Component
	// ComponentReg returns the *kubefox.ComponentReg of the subscription.
	ComponentReg() *kubefox.ComponentReg
	// IsGroupEnabled reports a boolean — presumably mirroring
	// SubscriptionConf.EnableGroup; confirm.
	IsGroupEnabled() bool
	// Context returns the subscription's context.
	Context() context.Context
	// Cancel takes an error — presumably recorded as the cancellation cause
	// and later returned by Err; confirm with the implementation.
	Cancel(err error)
	// Err returns an error associated with the subscription.
	Err() error
}

type SendEvent

// SendEvent is a function type that takes a LiveEvent and returns an error.
// It is the type of SubscriptionConf.SendFunc.
type SendEvent func(*LiveEvent) error

type Store

// Store is constructed for a namespace via NewStore. Its methods include
// Open/Close, lookups (Component, Deployment, Environment, Release),
// registration (RegisterComponent, RegisterAdapter), matcher construction
// (DeploymentMatcher, ReleaseMatcher) and OnAdd/OnUpdate/OnDelete callbacks.
// NOTE(review): the OnAdd/OnUpdate/OnDelete signatures look like a Kubernetes
// informer event handler — confirm. All fields are unexported.
type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(namespace string) *Store

func (*Store) Adapter

func (str *Store) Adapter(ctx context.Context, comp *kubefox.Component) bool

func (*Store) BrokerMap

func (str *Store) BrokerMap() (map[string]string, error)

TODO: return a map of node names to broker pod IDs. This will allow running the broker without host networking; the broker just sends back the correct IP during subscribe.

func (*Store) Close

func (str *Store) Close()

func (*Store) Component

func (str *Store) Component(ctx context.Context, comp *kubefox.Component) (*kubefox.ComponentReg, error)

func (*Store) Deployment

func (str *Store) Deployment(name string) (*v1alpha1.Deployment, error)

func (*Store) DeploymentMatcher

func (str *Store) DeploymentMatcher(ctx context.Context, evtCtx *kubefox.EventContext) (*matcher.EventMatcher, error)

func (*Store) Environment

func (str *Store) Environment(name string) (*v1alpha1.Environment, error)

func (*Store) OnAdd

func (str *Store) OnAdd(obj interface{}, isInInitialList bool)

func (*Store) OnDelete

func (str *Store) OnDelete(obj interface{})

func (*Store) OnUpdate

func (str *Store) OnUpdate(oldObj, obj interface{})

func (*Store) Open

func (str *Store) Open(compRegKV jetstream.KeyValue) error

func (*Store) RegisterAdapter

func (str *Store) RegisterAdapter(ctx context.Context, comp *kubefox.Component) error

func (*Store) RegisterComponent

func (str *Store) RegisterComponent(ctx context.Context, comp *kubefox.Component, reg *kubefox.ComponentReg) error

func (*Store) Release

func (str *Store) Release(name string) (*v1alpha1.Release, error)

func (*Store) ReleaseMatcher

func (str *Store) ReleaseMatcher(ctx context.Context) (*matcher.EventMatcher, error)

type Subscription

// Subscription is the base subscription interface; it is embedded by both
// ReplicaSubscription and GroupSubscription.
type Subscription interface {
	// SendEvent takes a LiveEvent and returns an error.
	SendEvent(evt *LiveEvent) error
	// IsActive reports whether the subscription is active.
	IsActive() bool
}

type SubscriptionConf

// SubscriptionConf is the configuration passed to Broker.Subscribe and
// SubscriptionMgr.Create when creating a subscription.
type SubscriptionConf struct {
	// Component is the component the subscription is for.
	Component   *kubefox.Component
	// CompReg is the component's registration.
	CompReg     *kubefox.ComponentReg
	// SendFunc is a SendEvent callback — presumably invoked to deliver
	// events to the subscriber; confirm with the manager implementation.
	SendFunc    SendEvent
	// EnableGroup is a boolean flag — presumably controls whether the
	// subscription also participates in a group subscription; confirm.
	EnableGroup bool
}

type SubscriptionMgr

// SubscriptionMgr creates subscriptions and looks them up by component.
// Instances are obtained from NewManager.
type SubscriptionMgr interface {
	// Create builds a ReplicaSubscription from cfg. recvCh is a channel of
	// *LiveEvent — NOTE(review): direction of use (who sends, who receives)
	// is not visible here; confirm with the implementation.
	Create(ctx context.Context, cfg *SubscriptionConf, recvCh chan *LiveEvent) (ReplicaSubscription, error)
	// Subscription looks up a Subscription for comp; the boolean follows the
	// usual "found" convention — presumably false when none exists.
	Subscription(comp *kubefox.Component) (Subscription, bool)
	// ReplicaSubscription looks up a ReplicaSubscription for comp.
	ReplicaSubscription(comp *kubefox.Component) (ReplicaSubscription, bool)
	// GroupSubscription looks up a GroupSubscription for comp.
	GroupSubscription(comp *kubefox.Component) (GroupSubscription, bool)
	// Subscriptions returns a slice of ReplicaSubscription values.
	Subscriptions() []ReplicaSubscription
	// Close takes no arguments and returns nothing.
	Close()
}

func NewManager

func NewManager() SubscriptionMgr

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL