queue

package
v0.0.0-...-23ae872 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RedisTimelineTag is the literal tag "timeline-update".
	// NOTE(review): where the tag is applied (queue name, consumer tag, Redis
	// key prefix, ...) is not visible in this listing — confirm against usage.
	RedisTimelineTag = "timeline-update"
)

Variables

View Source
// Sentinel errors exported by this package. Callers should compare against
// them with errors.Is rather than matching on the message text.
var (
	// ErrUnknownQueue carries the message "unknown queue".
	ErrUnknownQueue   = errors.New("unknown queue")
	// ErrNoOrEmptyQueue carries the message "empty or unknown queue".
	ErrNoOrEmptyQueue = errors.New("empty or unknown queue")
)

Functions

This section is empty.

Types

type Config

// Config holds the Redis connection and queue-tuning settings for the package.
//
// Each field carries an `env` struct tag naming the environment variable it is
// bound to and, where given, a default value. The loader that interprets these
// tags is not visible here.
type Config struct {
	// Address is bound to REDIS_ADDRESS; default "localhost:6379".
	Address                 string        `env:"REDIS_ADDRESS,default=localhost:6379"`
	// UserName is bound to REDIS_USERNAME; no default is declared.
	UserName                string        `env:"REDIS_USERNAME"`
	// Password is bound to REDIS_PASSWORD; default "pass".
	// NOTE(review): a hard-coded default password is a security smell —
	// consider requiring an explicit value outside local development.
	Password                string        `env:"REDIS_PASSWORD,default=pass"`
	// Database is bound to REDIS_DATABASE; default 0.
	Database                int           `env:"REDIS_DATABASE,default=0"`
	// PoolSize is bound to REDIS_POOL_SIZE; default 5.
	PoolSize                int           `env:"REDIS_POOL_SIZE,default=5"`
	// CleanPeriod is bound to QUEUE_CLEANUP_PERIOD; default 300s.
	// Presumably the interval used by Service.QueueCleaner — TODO confirm.
	CleanPeriod             time.Duration `env:"QUEUE_CLEANUP_PERIOD,default=300s"`
	// NumberConsumersForQueue is bound to CONSUMERS_PER_QUEUE; default 5.
	NumberConsumersForQueue int           `env:"CONSUMERS_PER_QUEUE,default=5"`
}

type ConsumerPost

// ConsumerPost is a queue consumer built by NewConsumerPost; its Consume
// method accepts an rmq.Delivery.
// NOTE(review): judging by the name and the publish callback passed to the
// constructor, it presumably processes post tasks — confirm in the source.
type ConsumerPost struct {
	// contains filtered or unexported fields
}

func NewConsumerPost

func NewConsumerPost(tag string, logger *logrus.Logger, service *storage.UserService, queue *TaskQueue, publish func(ctx context.Context, userId int64, shardId string, event model.Event) error) *ConsumerPost

func (*ConsumerPost) Consume

func (c *ConsumerPost) Consume(delivery rmq.Delivery)

type ConsumerUserFeed

// ConsumerUserFeed is a queue consumer built by NewConsumerUserFeed; its
// Consume method accepts an rmq.Delivery.
// NOTE(review): presumably handles user-feed update tasks — confirm in the
// source.
type ConsumerUserFeed struct {
	// contains filtered or unexported fields
}

func NewConsumerUserFeed

func NewConsumerUserFeed(tag string, logger *logrus.Logger, service *storage.UserService, redis *redis.Client) *ConsumerUserFeed

func (*ConsumerUserFeed) Consume

func (c *ConsumerUserFeed) Consume(delivery rmq.Delivery)

type Service

// Service is the package's main type, created with New. Its exported methods
// cover lifecycle (Run, Shutdown, StartConsumers, QueueCleaner), task
// submission (NewPost, UpdateFeed) and access to the underlying Redis client
// (GetRedisClient).
type Service struct {
	// contains filtered or unexported fields
}

func New

func New(config Config, storage *storage.UserService, logger *logrus.Logger, publish func(ctx context.Context, userId int64, shardId string, event model.Event) error) (*Service, error)

New initializes the service.

func (*Service) GetRedisClient

func (s *Service) GetRedisClient() *redis.Client

func (*Service) NewPost

func (s *Service) NewPost(ctx context.Context, post model.Post) error

NewPost updates users' feeds.

func (*Service) QueueCleaner

func (s *Service) QueueCleaner(config Config, log *logrus.Entry)

func (*Service) Run

func (s *Service) Run(ctx context.Context) error

Run starts the service.

func (*Service) Shutdown

func (s *Service) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the service.

func (*Service) StartConsumers

func (s *Service) StartConsumers(_ context.Context) error

StartConsumers starts the consumers.

func (*Service) UpdateFeed

func (s *Service) UpdateFeed(ctx context.Context, userId int64) error

type ServiceDataResponse

// ServiceDataResponse is a JSON-serialisable set of four int64 counters.
// NOTE(review): the fields look like per-queue statistics (rejected, ready,
// consumer count, unacked) — confirm against the code that populates them.
type ServiceDataResponse struct {
	// Rejected is encoded as "rejected".
	Rejected  int64 `json:"rejected"`
	// New is encoded as "new".
	New       int64 `json:"new"`
	// Consumers is encoded as "consumers".
	Consumers int64 `json:"consumers"`
	// InWork is encoded as "in_work".
	InWork    int64 `json:"in_work"`
}

type TaskPost

// TaskPost is a task for updating a post: the post itself plus a timestamp.
type TaskPost struct {
	// Post is the post payload. It has no JSON tag, so encoding/json would
	// emit it under the key "Post" (unlike the snake_case "queue_date" below).
	// NOTE(review): adding a tag would change the wire format of tasks that
	// are already queued, so the inconsistency is left as is.
	Post      model.Post
	// QueueDate is encoded as "queue_date".
	// Presumably the time the task was enqueued — TODO confirm.
	QueueDate time.Time `json:"queue_date"`
}

TaskPost is a task for updating a post.

type TaskQueue

// TaskQueue is a task queue that embeds rmq.Queue, so the embedded queue's
// methods are promoted onto TaskQueue. It adds the helpers AddTaskPost,
// AddTaskUpdateUserIdFeed and TaskDone.
type TaskQueue struct {
	rmq.Queue
	// contains filtered or unexported fields
}

TaskQueue is a task queue.

func (*TaskQueue) AddTaskPost

func (t *TaskQueue) AddTaskPost(post model.Post) error

func (*TaskQueue) AddTaskUpdateUserIdFeed

func (t *TaskQueue) AddTaskUpdateUserIdFeed(userId int64) error

func (*TaskQueue) TaskDone

func (t *TaskQueue) TaskDone()

type TaskUpdateUserIdFeed

// TaskUpdateUserIdFeed is a task payload carrying a single user identifier.
// NOTE(review): presumably the payload enqueued by
// TaskQueue.AddTaskUpdateUserIdFeed — confirm in the source.
type TaskUpdateUserIdFeed struct {
	// UserId is the identifier of the user the task refers to.
	UserId int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL