producerConsumer

package
v0.0.0-...-da45c02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2017 License: Apache-2.0 Imports: 5 Imported by: 5

README

producerConsumer

生产/消费模式

快速开始


    package main
   
    import (
        "fmt"
        "runtime"
        "strconv"
        "time"
        "github.com/BPing/Golib/producer_consumer"
    )
    
    type Message struct {
    	Key string
    }
    
    func(msg *Message)Id()string{
    	return msg.Key
    }
    
    func NewMessage(id string)*Message{
        return &Message{id}
    }
    
    var container *producerConsumer.Container
    
    func init(){
        consumeFunc := func(msg producerConsumer.IMessage) {
            fmt.Println("消费:", msg.Id(), "协程数目:", runtime.NumGoroutine())
        }
    
        container, _ = producerConsumer.NewContainerPC(20, consumeFunc)
        container.Consume()
    }
    
    func main(){
    
        go func() {
            for i := 0; i < 50; i++ {
                msg:=NewMessage("goone-"+strconv.Itoa(i), nil)
                container.Produce(msg)
            }
        }()
    
        go func() {
            for i := 0; i < 50; i++ {
                msg:=NewMessage("goone-"+strconv.Itoa(i), nil)
                container.Produce(msg)
                time.Sleep(time.Millisecond * 20)
            }
        }()
    
        time.Sleep(time.Second * 3)
    }

描述

  • 通过调用Consume()可以产生一个主要消费协程。主协程将一直存在,在没有消息体处理的时候进入阻塞等待。 可以通过调用Consume()的次数来控制产生主协程的数目。
  • 当消息体队列的已满,则会产生协助协程消费消息体。协助协程在消息体猛涨时候出现,在没有消息体处理的时候阻塞等待一定时间后将被销毁。 协助协程数目不作上限控制。

类型

  • channel型:基于缓冲channel队列实现的。
  • cache型:基于cache(如:redis)队列实现的。

Documentation

Index

Constants

View Source
const (
	MasterRunner = true
	AssistRunner = false

	ErrTag   = "PC-ErrTag"
	DebugTag = "PC-DebugTag"
	PanicTag = "PC-PanicTag"

	CacheType   = ContainerType("cache")
	ChannelType = ContainerType("channel")
)
View Source
const (
	// (秒)
	DefaultReadTimeout = 3
)

Variables

View Source
var (
	// 错误信息
	MsgListNilErr     = errors.New("list of message is nil ")
	ChanLenErr        = errors.New("length of chan should be bigger than one")
	ConsumeFuncNilErr = errors.New("func of consumer should not be nil")

	YieldCountThreshold = 10
)
View Source
var (
	ErrMarshalNil       = errors.New("ContainerRedis: Marshal is nil")
	ErrConsumeFuncNil   = errors.New("ContainerRedis: consumeFunc is nil")
	ErrUnmarshalNil     = errors.New("ContainerRedis: unmarshal is nil")
	ErrCacheInstanceNil = errors.New("ContainerRedis: cacheInstance is nil")
)

Functions

This section is empty.

Types

type CBaseInfo

type CBaseInfo struct {

	// 记录内部日志信息
	Record func(tag string, msg interface{})
	// contains filtered or unexported fields
}

基本属性

type Config

type Config struct {

	// * channel型(ChannelType):基于缓冲channel队列实现的。
	// * cache型(CacheType):基于redis型队列实现的。
	Type ContainerType

	// 消费信息的函数
	// 信息体最终落到此函数处理
	// 由用户自定义函数实体内容
	ConsumeFunc func(IMessage)

	// 消息队列长度
	//  如果为channel型,此变量为int类型。请自行处理不一致。
	MsgLen int64

	// 空闲存活时间(针对AssistRunner),单位为秒(s)
	//   如果是redis型的,此值等同于ReadTimeout。
	AssistIdleKeepAlive int64

	// 记录内部日志信息
	Record func(tag string, msg interface{})

	// redis 队列名字(唯一标识)
	//  针对redis型
	CacheListKey string

	// 把redis队列中的字符串信息解析成相应的信息结构体
	//  针对redis型
	Unmarshal func([]byte) (IMessage, error)

	// 把信息结构体序列化成字符串,以便保存到redis队列中
	//  针对redis型
	Marshal func(IMessage) ([]byte, error)

	// redis 操作实例 实现接口IRedis
	//  针对redis型
	CacheInstance ICache
}

初始配置项

type Container

type Container struct {
	CBaseInfo
	// contains filtered or unexported fields
}

容器 实现基于缓冲channel队列的生产/消费模式

1、Produce(msg interface{}) 生产信息,把消息放入消息列表中。
2、Consume() 消费消息。

开启主线程一直消费消息,如果消息过多时(消息队列满),则会开启协助协程消费消息。 协助协程将会在消息队列持续为空一段的时间后关闭.

func NewContainerPC

func NewContainerPC(chanLen int, consumeFunc func(IMessage)) (*Container, error)

新建生产/消费模式容器

func (*Container) Consume

func (c *Container) Consume() error

消费 一般调用一次即可 因为每一次调用都会开启一个主消费协程

func (*Container) NumGoroutine

func (c *Container) NumGoroutine() (master, assistActive int64)

或者活跃消费协程数目 @return

master :主要
assistActive:协助

func (*Container) Produce

func (c *Container) Produce(msg IMessage) error

生产 如果队列已满,开启新协助协程消费消息

func (*Container) SetAssistIdleKeepAlive

func (c *Container) SetAssistIdleKeepAlive(timeout time.Duration)

设置空闲存活时间

type ContainerRedis

type ContainerRedis struct {
	CBaseInfo

	// 把Cache队列中的字符串信息解析成相应的信息结构体
	Unmarshal func([]byte) (IMessage, error)

	// 把信息结构体序列化成字符串,以便保存到Cache(如:redis)队列中
	Marshal func(IMessage) ([]byte, error)

	// 读取信息列表超时时间(秒)
	//  如果列表没有元素会阻塞列表直到等待超时
	ReadTimeout int64

	// 信息队列长度。
	// 如果为零,代表程序不限制(则以Cache(如:redis)的队列最大限制为准)
	// 但不保证真正Cache(如:redis)队列长度一定会小于此值。
	// 另外,此值关系到是否有协助协程产生。
	MsgLen int64

	// cache 队列名字(唯一标识)
	CacheListKey string
	// contains filtered or unexported fields
}

以Cache(如:redis)队列作为信息队列

func NewContainerCachePC

func NewContainerCachePC(cacheInstance ICache, consumeFunc func(IMessage), unmarshal func([]byte) (IMessage, error), marshal func(IMessage) ([]byte, error)) (*ContainerRedis, error)

新建生产/消费模式容器

func (*ContainerRedis) Consume

func (cr *ContainerRedis) Consume() error

消费

func (*ContainerRedis) NumGoroutine

func (cr *ContainerRedis) NumGoroutine() (master, assistActive int64)

func (*ContainerRedis) Produce

func (cr *ContainerRedis) Produce(msg IMessage) error

生产

type ContainerType

type ContainerType string

type ICache

type ICache interface {
	// BLPOP key1 timeout(秒)
	// 移出并获取列表的第一个元素,
	// 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
	BLPop(key string, timeout int64) (map[string]string, error)

	// 在列表尾部中添加一个或多个值
	RPush(key string, values ...interface{}) (int64, error)

	// 获取列表长度
	LLen(key string) (int64, error)
}

缓存Cache接口

type IContainer

type IContainer interface {
	// 生产消息
	Produce(msg IMessage) error
	// 消费消息
	Consume() error
	// 消费消息的协程数目
	NumGoroutine() (master, assistActive int64)
}

容器接口

生产/消费模式

 通过调用`Consume()`可以产生一个主要消费协程。主协程将一直存在,在没有消息体处理的时候进入阻塞等待。

可以通过调用`Consume()`的次数来控制产生主协程的数目。

当消息体队列的已满,则会产生协助协程消费消息体。协助协程在消息体猛涨时候出现,在没有消息体处理的时候

阻塞等待一定时间后将被销毁。

1、Produce(msg interface{}) 生产信息,把消息放入消息列表中。
2、Consume() 消费消息。

func NewContainer

func NewContainer(config Config) (IContainer, error)

新建

type IMessage

type IMessage interface {
	// 标识
	// 此字段不能为空,
	// 否则会被当做无效数据抛弃。
	// 容器不对此标识的唯一性感兴趣。
	// 用户可以自行确保此标识的唯一性
	Id() string
}

消息接口

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL