alpaca

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2020 License: MIT Imports: 20 Imported by: 0

README

Alpaca-mq

  • High availability based on kafka
  • Support flow control, overload protection
  • Quickly complete the construction of multiple applications and multiple services
  • Support message delay queue
  • Point-to-point consumption based on command points

Message queue for service decoupling, You can use it to decouple services, but it is worth noting that you need to ensure that your services are idempotent.

Installation

Install alpaca-mq using the "go get" command:

  • go get github.com/SheepGardener/alpaca-mq

Puller

The pusher of the message, produces and builds the message, and completes the push of the message, Puller startup is very simple, you can start it quickly, of course, you need to configure puller first

	package main

	import (
		alpaca "github.com/SheepGardener/alpaca-mq"
	)

	func main() {
		puller := alpaca.InitPuller("./log/puller.log","./config/puller.yml", "./config/apps/")
		puller.Pull()
	}

Pusher

The consumer of the message, passing the message to the service application,Here is only a simple example, in fact, you can customize a pusher service more flexibly


	package main

	import (
		"encoding/json"
		"fmt"
		"net/http"

		alpaca "github.com/SheepGardener/alpaca-mq"
	)

	type Response struct {
		Errno  int8   `json:"errno"`
		Errmsg string `json:"errmsg"`
		Logid  string `json:"log_id"`
	}

	var Pusher *alpaca.Pusher

	func init() {
		Pusher = alpaca.InitPusher("./log/pusher.log","./config/pusher.yml")
	}

	func sendMsg(w http.ResponseWriter, r *http.Request) {
		r.ParseForm()
		resp := Response{}
		resp.Errno = 0
		resp.Errmsg = "success"

		Logid := r.Form.Get("logid")
		Cmd := r.Form.Get("cmd")
		Hashkey := r.Form.Get("hash_key")
		Data := r.Form.Get("data")

		if Logid == "" {
			Logid = "test" //alpaca.GetLogId()
		}

		if Cmd == "" {
			w.Write([]byte("{\"errno\":-1,\"errmsg\":\"Command cannot be empty\"}"))
			return
		}

		resp.Logid = Logid

		kmsg := &alpaca.Kmessage{
			Cmd:     Cmd,
			Data:    Data,
			LogId:   Logid,
			HashKey: Hashkey,
			Delay: 12,
		}

		err := Pusher.Push(kmsg)

		if err != nil {
			resp.Errno = -1
			resp.Errmsg = fmt.Sprintf("%s", err)
		}

		respJson, err := json.Marshal(resp)

		if err != nil {
			w.Write([]byte("{\"errno\":-1,\"errmsg\":\"ResponData json marchal failed\"}"))
			return
		}
		w.Write(respJson)
	}

	func main() {

		http.HandleFunc("/sendmsg", sendMsg)
		http.ListenAndServe(":8009", nil)
	}

Kmessage

The message is transmitted in the form of alpaca.Kmessage

  	alpaca.Kmessage{
		Cmd:     Cmd, // Command point
		Data:    Data, // transfer data
		LogId:   Logid, // Log ID
		HashKey: Hashkey, // The same hashkey will be assigned to the same queue for sequential processing
		Delay: 12, //Message delay time
	}

Personalise

Alpace implements message processing and service load balancing strategy by default. If you want to customize your own message processing and load balancing strategy, you can implement it in the following custom ways

	// Custom message processing implementation
	type customizeHandleMessage struct {
		...Implement your own logic
	}

	func (c *customizeHandleMessage) MessageHandle(url stirng, msg *alpaca.Kmessage) {
		...Implement your own logic
	}

	//Register message handler
	handlemsg := &customizeHandleMessage{}
	pull.SetMessageHandle(handlemsg)

	**********************************************************************************


	// Implement a custom service load balancing strategy
	type customizeServerSelector struct {
		...Implement your own logic
	}

	func (c *customizeServerSelector) GetAppUrl(ap App) {
			...Implement your own logic
	}

	//Register service load balancing selector
	selector := &customizeServerSelector{}
	pull.SetServerSelect(selector)

Notes

  • Service request succeeded, The condition for the puller request service to be successful is that the requested service needs to return 200 and the return parameter exists errno and is 0
  • Puller deployment recommendations. If you want to deploy multiple pullers, it is recommended that you initialize the number of kafka partitions equal to the number of pullers. If you can, please ensure that one puller only handles one partition, which will make full use of the performance advantages of alpaca-mq
  • Puller and pusher will only handle one topic, so don’t expect it to handle multiple topics. If you want to implement multiple topics, it is recommended that you divide different topics according to different businesses.
  • In order to ensure the entire message link, please be sure to carry the LogId, which will bring great benefits, not just limited to the location of the problem
  • Of course, you have more suggestions and ideas, you can contact me

More

If you want to know more how to use it, you can refer to the examples, it can provide you with more help

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNil = redis.ErrNil
View Source
var ZkErrExists = "zk: node already exists"

Functions

func AssembleApUrl added in v1.0.1

func AssembleApUrl(pro string, host string, path string) string

Types

type AlpaceMsg

type AlpaceMsg struct {
	// contains filtered or unexported fields
}

type App

type App struct {
	Protocol   string   `yaml:"protocol"`
	Path       string   `yaml:path`
	Name       string   `yaml:"name"`
	ServerType string   `yaml:"server_type"`
	Cmd        string   `yaml:"cmd"`
	Servers    []string `yaml:"servers"`
}

type BtLimit added in v1.0.1

type BtLimit struct {
	// contains filtered or unexported fields
}

func NewRateLimit added in v1.0.1

func NewRateLimit(capacity int64) (*BtLimit, error)

func (*BtLimit) Take added in v1.0.1

func (b *BtLimit) Take() bool

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

func InitPool

func InitPool(server string) *Cache

func (*Cache) Exists

func (r *Cache) Exists(key string) (bool, error)

func (*Cache) GetInt64

func (r *Cache) GetInt64(key string) (int64, error)

func (*Cache) SAdd

func (r *Cache) SAdd(key string, value string) error

func (*Cache) SGet

func (r *Cache) SGet(key string) ([]interface{}, error)

func (*Cache) SetInt64

func (r *Cache) SetInt64(key string, value int64) error

func (*Cache) Srem

func (r *Cache) Srem(key string, value string) error

type CfgHandle added in v1.0.1

type CfgHandle struct {
	// contains filtered or unexported fields
}

func NewCfgHandle added in v1.0.1

func NewCfgHandle(lg *Logger) *CfgHandle

func (*CfgHandle) InitAppCfg added in v1.0.1

func (c *CfgHandle) InitAppCfg(apdir string, list []string) map[string]App

func (*CfgHandle) InitGPullerCfg added in v1.0.1

func (c *CfgHandle) InitGPullerCfg(cfgFile string) *GPullerConfig

func (*CfgHandle) InitGPusherCfg added in v1.0.1

func (c *CfgHandle) InitGPusherCfg(cfgFile string) *GPusherConfig

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func InitConsumer

func InitConsumer(topic string, gname string, conf *PullConfig) (*Consumer, error)

func (*Consumer) Close

func (cs *Consumer) Close() error

func (*Consumer) CommitOffsets

func (cs *Consumer) CommitOffsets() error

func (*Consumer) Errors

func (cs *Consumer) Errors() <-chan error

func (*Consumer) MarkOffset

func (cs *Consumer) MarkOffset(topic string, partition int32, offset int64, groupId string)

func (*Consumer) Notifications

func (cs *Consumer) Notifications() <-chan *sarama_cluster.Notification

func (*Consumer) Recv

func (cs *Consumer) Recv() <-chan *sarama.ConsumerMessage

func (*Consumer) ResetOffset

func (cs *Consumer) ResetOffset(topic string, partition int32, offset int64, groupId string)

func (*Consumer) Subscriptions

func (cs *Consumer) Subscriptions() map[string][]int32

type CurrMap added in v1.0.1

type CurrMap struct {
	// contains filtered or unexported fields
}

type Fields

type Fields map[string]interface{}

type GPullerConfig

type GPullerConfig struct {
	Kafka         []string `yaml:"kafka"`
	Zookeeper     []string `yaml:"zookeeper"`
	Redis         string   `yaml:"redis"`
	ZkRetryTimes  int32    `yaml:"zk_retry_times"`
	MsgCmtMode    int8     `yaml:"msg_commit_mode"`
	MsgDelayAble  int8     `yaml:"msg_delay_able"`
	OffsetCtTime  int32    `yaml:"offset_ct_time"`
	TimeWheelSize int32    `yaml:"time_wheel_size"`
	Wnd           int32    `yaml:"wnd"`
	GroupName     string   `yaml:"group_name"`
	Topic         string   `yaml:"topic"`
	Cmode         int8     `yaml:"msg_consum_mode"`
	Gpath         string   `yaml:"gpath"`
	Alist         []string `yaml:"services"`
	LoadBMode     int8     `yaml:"load_balance_mode"`
}

type GPusherCmd

type GPusherCmd struct {
	Cmd    string `yaml:"cmd"`
	Status bool   `yaml:"st"`
}

type GPusherConfig

type GPusherConfig struct {
	Kafka     []string     `yaml:"kafka"`
	Topic     string       `yaml:"topic"`
	RateLimit int64        `yaml:"rate_limit"`
	Cmds      []GPusherCmd `yaml:"cmds"`
}

type KaClient added in v1.0.1

type KaClient struct {
	// contains filtered or unexported fields
}

func NewKaClient added in v1.0.1

func NewKaClient(topic string, gname string, servers []string, cfg *PullConfig) (*KaClient, error)

type Kmessage

type Kmessage struct {
	Cmd     string
	LogId   string
	HashKey string
	Data    string
	Delay   int
}

type Logger

type Logger struct {
	// contains filtered or unexported fields
}

func NewLogger

func NewLogger(logFilePath string) *Logger

func (*Logger) Fatal

func (l *Logger) Fatal(args ...interface{})

func (*Logger) Fatalf

func (l *Logger) Fatalf(format string, args ...interface{})

func (*Logger) Info

func (l *Logger) Info(args ...interface{})

func (*Logger) Infof

func (l *Logger) Infof(format string, args ...interface{})

func (*Logger) Init

func (l *Logger) Init(timeForm time.Duration)

func (*Logger) Panic

func (l *Logger) Panic(args ...interface{})

func (*Logger) Panicf

func (l *Logger) Panicf(format string, args ...interface{})

func (*Logger) Warn

func (l *Logger) Warn(args ...interface{})

func (*Logger) Warnf

func (l *Logger) Warnf(format string, args ...interface{})

func (*Logger) WithFields

func (l *Logger) WithFields(fields map[string]interface{}) *logrus.Entry

type MessageHandle added in v1.0.1

type MessageHandle interface {
	HandleMessage(string, *Kmessage) error
}

Message core interface processing, through which the final consumption of messages, business logic, service address and message body Kmessage will be accepted

type MsgHandle added in v1.0.1

type MsgHandle struct {
	// contains filtered or unexported fields
}

func NewMsgHandle added in v1.0.1

func NewMsgHandle(klogger *Logger) *MsgHandle

Message processing, if the returned status is not equal to 200 or the returned errno is not equal to 0, it will be marked as a message processing failure

func (*MsgHandle) HandleMessage added in v1.0.1

func (m *MsgHandle) HandleMessage(url string, kmsg *Kmessage) error

type Producer

type Producer struct {
	Asp sarama.AsyncProducer
}

func InitProducer

func InitProducer(conf *PushConfig) (*Producer, error)

func (*Producer) Close

func (asp *Producer) Close() (err error)

func (*Producer) Errors

func (asp *Producer) Errors() <-chan *sarama.ProducerError

func (*Producer) Send

func (asp *Producer) Send() chan<- *sarama.ProducerMessage

func (*Producer) Successes

func (asp *Producer) Successes() <-chan *sarama.ProducerMessage

type PullConfig

type PullConfig struct {
	sarama_cluster.Config
	// contains filtered or unexported fields
}

func NewPullerConfig

func NewPullerConfig(gcf *GPullerConfig) *PullConfig

type Puller

type Puller struct {
	// contains filtered or unexported fields
}

func InitPuller

func InitPuller(logpath string, cfgfile string, apdir string) *Puller

func NewPuller

func NewPuller(lg *Logger, cg *GPullerConfig, aplist map[string]App) *Puller

func (*Puller) GetLogger added in v1.0.1

func (p *Puller) GetLogger() *Logger

If the default service selector and message processing do not meet your needs, you can implement your own way to meet your needs

func (*Puller) Pull

func (p *Puller) Pull()

func (*Puller) SetMessageHandle added in v1.0.1

func (p *Puller) SetMessageHandle(mhdle MessageHandle)

func (*Puller) SetServerSelect added in v1.0.1

func (p *Puller) SetServerSelect(ss ServerSelector)

type PushConfig

type PushConfig struct {
	Servers []string
	Sconf   *sarama.Config
}

func NewPusherConfig

func NewPusherConfig(cf *GPusherConfig) *PushConfig

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

func InitPusher

func InitPusher(logpath string, cfgfile string) *Pusher

func NewPusher

func NewPusher(lg *Logger, pcfg *GPusherConfig) *Pusher

func (*Pusher) GetLogger added in v1.0.1

func (p *Pusher) GetLogger() *Logger

func (*Pusher) Push

func (p *Pusher) Push(message *Kmessage) error

type RandomSelect added in v1.0.1

type RandomSelect struct {
}

Randomly select a service to return, sometimes its randomness is not always correct

func NewRandomSelect added in v1.0.1

func NewRandomSelect() *RandomSelect

func (*RandomSelect) GetAppUrl added in v1.0.1

func (r *RandomSelect) GetAppUrl(ap App) string

type RoundRobin added in v1.0.1

type RoundRobin struct {
	// contains filtered or unexported fields
}

Sequentially polling and returning services to ensure that every service has been called, but it cannot remove abnormal services

func NewRoundRobin added in v1.0.1

func NewRoundRobin(aplist map[string]App) *RoundRobin

func (*RoundRobin) GetAppUrl added in v1.0.1

func (r *RoundRobin) GetAppUrl(ap App) string

type ServerSelector added in v1.0.1

type ServerSelector interface {
	GetAppUrl(ap App) string
}

Service load balancing strategy is implemented, select service and return, App structure will be returned

type TimeWheel

type TimeWheel struct {
	// contains filtered or unexported fields
}

func NewTimeWheel

func NewTimeWheel(bucketNum int32) (*TimeWheel, error)

type Tmsg added in v1.0.1

type Tmsg struct {
	// contains filtered or unexported fields
}

type Zk

type Zk struct {
	ZkConn *zk.Conn
}

func NewZk

func NewZk(Servers []string, timeout time.Duration) (*Zk, error)

func (*Zk) Create

func (z *Zk) Create(path string, data []byte, nodeType int32, acl []zk.ACL) error

func (*Zk) Delete

func (z *Zk) Delete(path string, version int32) error

func (*Zk) Exists

func (z *Zk) Exists(path string) (bool, error)

func (*Zk) Get

func (z *Zk) Get(path string) ([]byte, *zk.Stat, error)

func (*Zk) GetChildren

func (z *Zk) GetChildren(path string) ([]string, error)

func (*Zk) Set

func (z *Zk) Set(path string, data []byte, version int32) error

func (*Zk) WorldACL

func (z *Zk) WorldACL() []zk.ACL

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL