broker

package
v0.0.0-...-60b9055 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2019 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	//RUNNING status
	RUNNING = iota
	//STOP status
	STOP
)
View Source
const (
	//LEVELON status
	LEVELON = 0
	//LEVELOFF status
	LEVELOFF = 1
)
View Source
const (
	//SAVE action
	SAVE = 1
	//DELETE action
	DELETE = 0

	//NEVERSYNC strategy
	NEVERSYNC = 0
	//EVERYSECOND strategy
	EVERYSECOND = 1
	//ALWAYSSYNC strategy
	ALWAYSSYNC = 2
)

Variables

View Source
var (
	//ErrEmptyMsgList occurs when the msgs on disk is scanned over.
	ErrEmptyMsgList = errors.New("empty Msg list")
)
View Source
var OneBroker sync.Once

OneBroker Only one broker

Functions

func ArbitrateConfigs

func ArbitrateConfigs(c *Configure)

ArbitrateConfigs will check the config file

func DisableDebug

func DisableDebug()

DisableDebug disable Debug logs

func EnableDebug

func EnableDebug()

EnableDebug enable Debug logs

func GetLocalIP

func GetLocalIP() string

GetLocalIP will return local IP address

func LoadConfig

func LoadConfig()

LoadConfig load default config and cmd-line flags

func NewBroker

func NewBroker()

NewBroker be invoked by singleton

func NewMsgChan

func NewMsgChan() msgChan

NewMsgChan creates a new msgChan

func PanicIfErr

func PanicIfErr(err error)

PanicIfErr will panic if err is not nil

func PortToLocalAddr

func PortToLocalAddr(port int) string

PortToLocalAddr convert the port to "host:port"

func ServeAPI

func ServeAPI(w http.ResponseWriter, r *http.Request)

ServeAPI handle the message service by rest api.

func ServeHTTP

func ServeHTTP(l net.Listener)

ServeHTTP serves the rest requests

func ServeHome

func ServeHome(w http.ResponseWriter, r *http.Request)

ServeHome will export metrics, which maybe used by prometheus etc.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker instance

var DefaultBroker *Broker

DefaultBroker used by singleton

func GetInstance

func GetInstance() *Broker

GetInstance is the singleton interface

func (*Broker) Delete

func (b *Broker) Delete(topic string)

Delete topic

func (*Broker) Get

func (b *Broker) Get(topic string) *TopicQueue

Get topic

func (*Broker) Replay

func (b *Broker) Replay()

Replay will recover the msgs in db when the broker startup

func (*Broker) Start

func (b *Broker) Start()

Start the broker

func (*Broker) Stat

func (b *Broker) Stat() *Stat

Stat returns a Stat replication

func (*Broker) Stop

func (b *Broker) Stop()

Stop the broker

func (*Broker) Storage

func (b *Broker) Storage() Storage

Storage return the Storage

func (*Broker) TotalTopic

func (b *Broker) TotalTopic() int

TotalTopic returns the number of topics currently.

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client that broker based on, to handle per request

func (*Client) Ack

func (c *Client) Ack() error

Ack send ack

func (*Client) Close

func (c *Client) Close()

Close the client

func (*Client) Run

func (c *Client) Run()

Run loop of a client, until a error occur.

type Configure

type Configure struct {
	HttpPort int
	MsgPort  int

	Auth     bool
	UserName string
	Token    string

	Retry int

	Aof       string
	SyncType  int
	Threshold int
}

Configure schema

var Config *Configure = new(Configure)

Config is the default config file

type Exporter

type Exporter struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Exporter instance

func NewExporter

func NewExporter(ns string, addr string) *Exporter

NewExporter creates a new Exporter

func (*Exporter) Collect

func (e *Exporter) Collect(ch chan<- prometheus.Metric)

Collect fetches and updates the appropriate metrics.

func (*Exporter) Describe

func (e *Exporter) Describe(ch chan<- *prometheus.Desc)

Describe outputs the metric descriptions.

func (*Exporter) Scrape

func (e *Exporter) Scrape(stat *Stat)

Scrape set the metrics value to gauge

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

Logger instance

var (
	//Debug level
	Debug *Logger
	//Log level
	Log *Logger
	//Error level
	Error *Logger
)

func (*Logger) Flags

func (l *Logger) Flags() int

Flags wrapper the log.Flags

func (*Logger) Level

func (l *Logger) Level() int32

Level shows level

func (*Logger) Panic

func (l *Logger) Panic(v ...interface{})

Panic wrapper the log.Panic

func (*Logger) Panicf

func (l *Logger) Panicf(format string, v ...interface{})

Panicf wrapper the log.Panicf

func (*Logger) Panicln

func (l *Logger) Panicln(v ...interface{})

Panicln wrapper the log.Panicln

func (*Logger) Prefix

func (l *Logger) Prefix() string

Prefix wrapper the log.Prefix

func (*Logger) Print

func (l *Logger) Print(v ...interface{})

Print wrapper the log.Print

func (*Logger) Printf

func (l *Logger) Printf(format string, v ...interface{})

Printf wrapper the log.Printf

func (*Logger) Println

func (l *Logger) Println(v ...interface{})

Println wrapper the log.Println

func (*Logger) SetFlags

func (l *Logger) SetFlags(flag int)

SetFlags wrapper the log.SetFlags

func (*Logger) SetLevel

func (l *Logger) SetLevel(level int32)

SetLevel set switch on/off

func (*Logger) SetPrefix

func (l *Logger) SetPrefix(prefix string)

SetPrefix wrapper the log.SetPrefix

type StableStorage

type StableStorage struct {
	// contains filtered or unexported fields
}

StableStorage instance

func NewStable

func NewStable() *StableStorage

NewStable creates a StableStorage

func (*StableStorage) Close

func (s *StableStorage) Close() error

Close the storage

func (*StableStorage) Delete

func (s *StableStorage) Delete(msgs ...*msg.Message) error

Delete remove the msg from disk

func (*StableStorage) Get

func (s *StableStorage) Get() (*msg.Message, error)

Get read the msg from disk

func (*StableStorage) Save

func (s *StableStorage) Save(m ...*msg.Message) error

Save store the msg into disk

func (*StableStorage) Truncate

func (s *StableStorage) Truncate()

Truncate just for test

type Stat

type Stat struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Stat instance

func NewStat

func NewStat() *Stat

NewStat creates a stat object

func (*Stat) Add

func (s *Stat) Add(topic string, num int64)

Add increase the msg number of total and topic specified

func (*Stat) Get

func (s *Stat) Get() *Stat

Get will create a replication of Stat at current time point

func (*Stat) Subscribe

func (s *Stat) Subscribe(topic string)

Subscribe increase the subscribers number of per topic

func (*Stat) Success

func (s *Stat) Success(topic string, num int64)

Success increase the msg number that has been received by consumers.

type Storage

type Storage interface {
	//Save store the msg into disk
	Save(m ...*msg.Message) error
	//Get read the msg from disk
	Get() (*msg.Message, error)
	//Delete remove the msg from disk
	Delete(m ...*msg.Message) error
	//Close shutdown the storage
	Close() error
	//Truncate will remove the db file, that is only used in dev.
	Truncate()
}

Storage interface

type StorageAOF

type StorageAOF struct {
	MaxID uint64
	// contains filtered or unexported fields
}

StorageAOF instance

func NewStorageAOF

func NewStorageAOF(filename string, syncType int32, threshold int) *StorageAOF

NewStorageAOF creates a AOF storage

func (*StorageAOF) Close

func (s *StorageAOF) Close() error

Close aof

func (*StorageAOF) Delete

func (s *StorageAOF) Delete(m ...*msg.Message) error

Delete msg

func (*StorageAOF) DeleteOps

func (s *StorageAOF) DeleteOps() uint64

DeleteOps returns the deleteops of the AOF, to determine when to rewrite.

func (*StorageAOF) Get

func (s *StorageAOF) Get() (*msg.Message, error)

Get msg

func (*StorageAOF) Rewrite

func (s *StorageAOF) Rewrite(curMaxID uint64, startup bool) bool

Rewrite with param curMaxID, it won't rewrite messages that written into aof file after rewrite started. I achieve this by compare the MaxID, when background rewrite is invoked, pass s.MaxID as params curMaxID. when discover a Message ID > curMaxID in the progress of reading old file, then break. however, messages should never be duplicated. this takes effect on SAVE cmd. and DELETE cmd won't need de-duplicated, because msgID is never duplicated, and each can only be delete once.

func (*StorageAOF) Save

func (s *StorageAOF) Save(m ...*msg.Message) error

Save msg

func (*StorageAOF) Stat

func (s *StorageAOF) Stat() os.FileInfo

Stat returns the aof file's Stat

func (*StorageAOF) Truncate

func (s *StorageAOF) Truncate()

Truncate aof file

type TopicQueue

type TopicQueue struct {
	// contains filtered or unexported fields
}

TopicQueue instance, represent a topic.

func NewTopicQueue

func NewTopicQueue(broker *Broker, topic string) *TopicQueue

NewTopicQueue creates a TopicQueue

func (*TopicQueue) Bind

func (t *TopicQueue) Bind(s *subscribe)

Bind the subscriber

func (*TopicQueue) Close

func (t *TopicQueue) Close()

Close the topic queue

func (*TopicQueue) NumberOfSubscribers

func (t *TopicQueue) NumberOfSubscribers() int64

NumberOfSubscribers returns the current number of subscribers

func (*TopicQueue) Push

func (t *TopicQueue) Push(m *msg.Message)

Push msg

func (*TopicQueue) Run

func (t *TopicQueue) Run()

Run loop

func (*TopicQueue) Status

func (t *TopicQueue) Status() int

Status returns the runtime status of the topic

func (*TopicQueue) Unbind

func (t *TopicQueue) Unbind(s *subscribe)

Unbind the subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL