Documentation ¶
Index ¶
- type Actor
- func (actor *Actor) GetStreamName() string
- func (actor *Actor) OnBootLoadConfig() error
- func (actor *Actor) PublishConfig(key string, data []byte) error
- func (actor *Actor) RunConfigListener(ctx context.Context)
- func (actor *Actor) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (actor *Actor) SetDownstreams(downstreams ...string)
- func (actor *Actor) SetOnConfigDelete(handler func(context.Context, *nats.Msg))
- func (actor *Actor) SetOnConfigUpdate(handler func(context.Context, *nats.Msg))
- type ActorConfig
- type ActorNatsConfig
- type CronActor
- type IActor
- type RaftActor
- type WSActorConfig
- type WorkerActor
- type WorkerWSActor
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Actor ¶
Actor is the base struct for all actors. It provides common helper functions and conforms to IActor.
func (*Actor) GetStreamName ¶
GetStreamName returns the stream name that the actor subscribed to.
func (*Actor) OnBootLoadConfig ¶
OnBootLoadConfig loads the configuration to setup the underlying object
func (*Actor) PublishConfig ¶
PublishConfig data into JetStream with a nats key. The nats key looks like this: stream-name.optional-key.command:UPDATE|DELETE.
func (*Actor) RunConfigListener ¶
RunConfigListener listens to config changes and execute hooks
func (*Actor) ServeHTTP ¶
func (actor *Actor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP supports updating and deleting object configuration via HTTP. Supported commands are POST, PUT, DELETE, and UNSUB HTTP GET should only be supported by the underlying object. Override this method if you want to do something custom.
func (*Actor) SetDownstreams ¶
SetOnConfigDelete
func (*Actor) SetOnConfigDelete ¶
SetOnConfigDelete
func (*Actor) SetOnConfigUpdate ¶
SetOnConfigUpdate
type ActorConfig ¶
type ActorConfig struct { // Workers is the number of workers for this actor Workers int // HTTPAddr is the address to bind the HTTP server HTTPAddr string // Configuration for Nats Nats ActorNatsConfig // ConfigKV is the KV store available for all actors. ConfigKV *configkv.ConfigKV }
ActorConfig is the config that all actors need
type ActorNatsConfig ¶
type ActorNatsConfig struct { // Addr is the address to connect to Addr string // Conn is the connection to a NATS cluster Conn *nats.Conn // JetStreamContext JetStreamContext nats.JetStreamContext // StreamConfig StreamConfig *nats.StreamConfig // StreamChanBuffer, min is 64000 StreamChanBuffer int }
type CronActor ¶
type CronActor struct { Actor CronCollection *cron.CronCollection IsLeader chan bool IsFollower chan bool }
func NewCronActor ¶
func NewCronActor(actorConfig ActorConfig) (*CronActor, error)
NewCronActor is the constructor for CronActors
func (*CronActor) OnBecomingFollowerBlocking ¶
OnBecomingFollowerBlocking turn off all cron schedulers.
func (*CronActor) OnBecomingLeaderBlocking ¶
OnBecomingLeaderBlocking turn on all cron schedulers.
func (*CronActor) OnBootLoadConfig ¶
OnBootLoadConfig loads cron config from KV store and notify the listener to setup cron schedulers.
func (*CronActor) ServeHTTP ¶
func (actor *CronActor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP supports updating and deleting object configuration via HTTP. Supported commands are POST, PUT, DELETE, and UNSUB HTTP GET should only be supported by the underlying object. Override this method if you want to do something custom.
type IActor ¶
type IActor interface { // GetStreamName GetStreamName() string // RunConfigListener RunConfigListener(context.Context) // PublishConfig PublishConfig(string, []byte) error // ServeHTTP ServeHTTP(http.ResponseWriter, *http.Request) // OnBootLoadConfig OnBootLoadConfig() error // SetOnConfigUpdate SetOnConfigUpdate(func(context.Context, *nats.Msg)) // SetOnConfigDelete SetOnConfigDelete(func(context.Context, *nats.Msg)) // SetDownstreams SetDownstreams(...string) // contains filtered or unexported methods }
IActor is the interface to conform to for all actors
type RaftActor ¶
type RaftActor struct { Actor Raft *raft.Raft OnBecomingLeader func(state graft.State) OnBecomingFollower func(state graft.State) OnBecomingCandidate func(state graft.State) OnClosed func(state graft.State) }
func NewRaftActor ¶
func NewRaftActor(actorConfig ActorConfig) (*RaftActor, error)
NewRaftActor is the constructor of RaftActor
Example ¶
httpAddr := ":3000" // Every Actor always subscribe to Nats JetStream nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() jetstreamContext, _ := nc.JetStream() // Every Actor always store its config on JetStream's KV store confkv, _ := configkv.NewConfigKV(jetstreamContext) raftActorConfig := actors.ActorConfig{ HTTPAddr: httpAddr, ConfigKV: confkv, Nats: actors.ActorNatsConfig{ Addr: nats.DefaultURL, Conn: nc, JetStreamContext: jetstreamContext, StreamConfig: &nats.StreamConfig{ MaxAge: 1 * time.Minute, }, }, } // Always setup cancellation context so that Actor can shutdown properly ctx, done := context.WithCancel(context.Background()) defer done() wg, ctx := errgroup.WithContext(ctx) // ConfigActor job is to receive config (from HTTP) raftActor, _ := actors.NewRaftActor(raftActorConfig) raftActor.OnBecomingLeader = func(state graft.State) { fmt.Printf("node is becoming a leader\n") } raftActor.OnBecomingFollower = func(state graft.State) { fmt.Printf("node is becoming a follower\n") } wg.Go(func() error { raftActor.RunConfigListener(ctx) return nil }) raftActor.OnBootLoadConfig() // Running HTTP Server to modify or display config r := chi.NewRouter() r.Use(middleware.Logger) r.Get("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"msg": "welcome"}`)) }) // POST /api/admin/raft // POST /api/admin/raft?command=UNSUB // sDELETE /api/admin/raft r.Method("POST", "/api/admin/raft", raftActor) r.Method("DELETE", "/api/admin/raft", raftActor) // GET method for raft metadata is handled by the underlying Raft struct r.Method("GET", "/api/admin/raft", raft.NewRaftHTTPGet(raftActor.Raft)) r.Method("GET", "/api/admin/configkv", configkv.NewConfigKVHTTPGetAll(confkv)) fmt.Printf("running an HTTP server...\n") httpServer := &http.Server{Addr: httpAddr, Handler: r} wg.Go(func() error { if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { return err } return nil }) // Listen to interrupts to cleanly kill Actors. wg.Go(func() error { signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) select { case <-signalChannel: fmt.Printf("signal received\n") httpServer.Shutdown(ctx) done() case <-ctx.Done(): fmt.Printf("closing signal goroutine\n") return ctx.Err() } return nil }) // wait for all errgroup goroutines wg.Wait()
Output:
func (*RaftActor) OnBootLoadConfig ¶
OnBootLoadConfig loads config from KV store and publish them so that we can build a consensus.
type WSActorConfig ¶
type WSActorConfig struct { // Workers is the number of workers for this actor Workers int // WSURL is the ws:// address to connect to WSURL string WSConfig recws.RecConn }
WSActorConfig
type WorkerActor ¶
type WorkerActor struct {
Actor
}
WorkerActor is a generic Actor. When it received an UPDATE command, it will execute the comand with the payload as parameters. DELETE is a no-op because WorkerActor doesn't store its config in the KV store.
func NewWorkerActor ¶
func NewWorkerActor(actorConfig ActorConfig, name string) (*WorkerActor, error)
NewWorkerActor is the constructor for WorkerActor
func (*WorkerActor) WSHandler ¶
func (actor *WorkerActor) WSHandler(w http.ResponseWriter, r *http.Request)
WSHandler is a websocket HTTP handler. It receives websocket connections and then pushes config data to websocket clients.
type WorkerWSActor ¶
type WorkerWSActor struct {
// contains filtered or unexported fields
}
WorkerWSActor receives parameters over websocket and execute work It does not inherit from Actor because it doesn't connect to a Nats.
func NewWorkerWSActor ¶
func NewWorkerWSActor(config WSActorConfig) (*WorkerWSActor, error)
func (*WorkerWSActor) RunConfigListener ¶
func (actor *WorkerWSActor) RunConfigListener(ctx context.Context)
RunConfigListener listens to config changes and update the storage
func (*WorkerWSActor) SetOnConfigDelete ¶
func (actor *WorkerWSActor) SetOnConfigDelete(handler func(context.Context, *nats.Msg))
SetOnConfigDelete
func (*WorkerWSActor) SetOnConfigUpdate ¶
func (actor *WorkerWSActor) SetOnConfigUpdate(handler func(context.Context, *nats.Msg))
SetOnConfigUpdate