collectors

package
v0.0.0-...-c5d900d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CollectorService = fx.Provide(func(lc fx.Lifecycle, params CollectorParams) (Collector, error) {
	logger := params.Logger
	if logger == nil {
		logger = log15.New()
		logger.SetHandler(log15.DiscardHandler())
	}
	c, err := NewCollector(params.Args, params.Redis, logger)
	if err != nil {
		return nil, err
	}
	utils.Append(lc, c, logger)
	return c, nil
})

Functions

func CollectAndForward

func CollectAndForward(done <-chan struct{}, incoming *models.IncomingMail, c Collector, f forwarders.Forwarder) error

Types

type BaseCollector

type BaseCollector struct {
	Cur *sync.Map
	// TODO: replace Ch with an unbounded queue?
	Ch chan *models.IncomingMail
}

func (BaseCollector) ACK

func (c BaseCollector) ACK(uid ulid.ULID)

func (BaseCollector) Add

func (c BaseCollector) Add(uid ulid.ULID, m *models.IncomingMail)

func (BaseCollector) RePush

func (c BaseCollector) RePush(ctx context.Context)

func (BaseCollector) Start

func (c BaseCollector) Start(ctx context.Context)

type ChanCollector

type ChanCollector struct {
	BaseCollector
	// contains filtered or unexported fields
}

func NewChanCollector

func NewChanCollector(size int, logger log15.Logger) (*ChanCollector, error)

func (*ChanCollector) Close

func (c *ChanCollector) Close() error

func (*ChanCollector) Name

func (c *ChanCollector) Name() string

func (*ChanCollector) Pull

func (c *ChanCollector) Pull(stop <-chan struct{}) (*models.IncomingMail, error)

func (*ChanCollector) PullCtx

func (c *ChanCollector) PullCtx(ctx context.Context) (*models.IncomingMail, error)

func (*ChanCollector) Push

func (c *ChanCollector) Push(stop <-chan struct{}, m *models.IncomingMail) error

func (*ChanCollector) PushCtx

func (c *ChanCollector) PushCtx(ctx context.Context, info *models.IncomingMail) error

func (*ChanCollector) Start

func (c *ChanCollector) Start(ctx context.Context) error

type Collector

type Collector interface {
	utils.Service
	Push(stop <-chan struct{}, info *models.IncomingMail) error
	PushCtx(ctx context.Context, info *models.IncomingMail) error
	Pull(stop <-chan struct{}) (*models.IncomingMail, error)
	PullCtx(ctx context.Context) (*models.IncomingMail, error)
	ACK(uid ulid.ULID)
}

func NewCollector

func NewCollector(args *arguments.Args, redis utils.RedisConn, logger log15.Logger) (Collector, error)

type CollectorParams

type CollectorParams struct {
	fx.In
	Args   *arguments.Args
	Logger log15.Logger    `optional:"true"`
	Redis  utils.RedisConn `name:"collector" optional:"true"`
}

type FSCollector

type FSCollector struct {
	BaseCollector
	// contains filtered or unexported fields
}

func NewFSCollector

func NewFSCollector(root string, logger log15.Logger) (*FSCollector, error)

func (*FSCollector) Close

func (c *FSCollector) Close() error

func (*FSCollector) Name

func (c *FSCollector) Name() string

func (*FSCollector) Pull

func (c *FSCollector) Pull(stop <-chan struct{}) (*models.IncomingMail, error)

func (*FSCollector) PullCtx

func (c *FSCollector) PullCtx(ctx context.Context) (*models.IncomingMail, error)

func (*FSCollector) Push

func (c *FSCollector) Push(stop <-chan struct{}, m *models.IncomingMail) error

func (*FSCollector) PushCtx

func (c *FSCollector) PushCtx(ctx context.Context, info *models.IncomingMail) error

func (*FSCollector) Start

func (c *FSCollector) Start(ctx context.Context) error

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

func NewFileStore

func NewFileStore(root string, logger log15.Logger) (*FileStore, error)

func (*FileStore) Close

func (s *FileStore) Close() error

func (*FileStore) Get

func (s *FileStore) Get(stop <-chan struct{}, obj *models.IncomingMail) error

func (*FileStore) New

func (s *FileStore) New(uid ulid.ULID, obj *models.IncomingMail) error

type RabbitCollector

type RabbitCollector struct {
	// contains filtered or unexported fields
}

func NewRabbitCollector

func NewRabbitCollector(args arguments.RabbitArgs, logger log15.Logger) (*RabbitCollector, error)

func (*RabbitCollector) ACK

func (c *RabbitCollector) ACK(uid ulid.ULID)

func (*RabbitCollector) Close

func (c *RabbitCollector) Close() error

func (*RabbitCollector) Name

func (c *RabbitCollector) Name() string

func (*RabbitCollector) Pull

func (c *RabbitCollector) Pull(stop <-chan struct{}) (*models.IncomingMail, error)

func (*RabbitCollector) PullCtx

func (*RabbitCollector) Push

func (c *RabbitCollector) Push(stop <-chan struct{}, m *models.IncomingMail) error

func (*RabbitCollector) PushCtx

func (*RabbitCollector) Start

func (c *RabbitCollector) Start(ctx context.Context) error

type RedisCollector

type RedisCollector struct {
	// contains filtered or unexported fields
}

func NewRedisCollector

func NewRedisCollector(args arguments.RedisArgs, redis utils.RedisConn, logger log15.Logger) (*RedisCollector, error)

func (*RedisCollector) ACK

func (c *RedisCollector) ACK(uid ulid.ULID)

func (*RedisCollector) Name

func (c *RedisCollector) Name() string

func (*RedisCollector) Pull

func (c *RedisCollector) Pull(stop <-chan struct{}) (*models.IncomingMail, error)

func (*RedisCollector) PullCtx

func (c *RedisCollector) PullCtx(ctx context.Context) (*models.IncomingMail, error)

func (*RedisCollector) Push

func (c *RedisCollector) Push(stop <-chan struct{}, m *models.IncomingMail) error

func (*RedisCollector) PushCtx

func (c *RedisCollector) PushCtx(ctx context.Context, info *models.IncomingMail) error

func (*RedisCollector) Start

func (c *RedisCollector) Start(ctx context.Context) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL