task

package
v2.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2023 | License: MIT | Imports: 39 | Imported by: 0

Documentation

Index

Constants

View Source
// Human-readable message texts for subscription-related outcomes.
// NOTE(review): the Tg prefix suggests these are replies sent to Telegram
// chats — confirm against the command handlers.
const (
	// Success messages for subscribing and unsubscribing.
	TgSuccessMsgSubscribed   = "Subscribed successfully."
	TgSuccessMsgUnsubscribed = "Unsubscribed successfully."

	// Error messages about the site (domain) argument.
	// TgErrMsgNotFoundSite is a format string with a single %s verb, so it
	// must be passed through a formatter (e.g. fmt.Sprintf) before use.
	TgErrMsgRequiredSite = "Site (domain) is required."
	TgErrMsgNotFoundSite = "Site `%s` not found."
)
View Source
// Operation labels, all shaped as "task[.<component>]: <action> ->".
// NOTE(review): the trailing "->" suggests each label is prepended to an
// error or log message to identify where it originated — confirm at call sites.
const (
	// Client operations.
	OpClientEnqueue = "task.client: enqueue ->"

	// Metrics operations.
	OpMetricsRegister = "task.metrics: register ->"
	OpMetricsClose    = "task.metrics: close ->"

	// Server operations, including parsing of feed, sitemap and article links.
	OpServerStart        = "task.server: start ->"
	OpServerProcessTask  = "task.server: process task ->"
	OpServerParseFeed    = "task.server: parse feed link ->"
	OpServerParseSitemap = "task.server: parse sitemap link ->"
	OpServerParseArticle = "task.server: parse article link ->"

	// Scheduler operations.
	OpSchedulerStart  = "task.scheduler: start ->"
	OpSchedulerSync   = "task.scheduler: sync ->"
	OpSchedulerAdd    = "task.scheduler: add ->"
	OpSchedulerRemove = "task.scheduler: remove ->"

	// Payload (de)serialization; these labels carry no component suffix.
	OpMarshal   = "task: marshal payload ->"
	OpUnmarshal = "task: unmarshal payload ->"
)
View Source
// Bare command names, without any prefix.
// TgCmdRumors, TgCmdSites, TgCmdSub, TgCmdOn and TgCmdOff are appended to
// TelegramCmd to form the TelegramCmd* constants; no TelegramCmd* counterpart
// for TgCmdStart is declared among the visible constants.
// NOTE(review): presumably these match Telegram bot commands (/start, /sub, ...)
// — confirm where incoming messages are parsed.
const (
	TgCmdStart  = "start"
	TgCmdRumors = "rumors"
	TgCmdSites  = "sites"
	TgCmdSub    = "sub"
	TgCmdOn     = "on"
	TgCmdOff    = "off"
)
View Source
// Colon-separated names built by concatenation from TelegramPrefix.
// NOTE(review): presumably used as asynq task type names for routing to the
// HandlerTg* handlers — confirm where the handlers are registered.
const (
	// TelegramPrefix is the common root of every name in this block.
	TelegramPrefix    = "telegram:"
	// TelegramCmd evaluates to "telegram:cmd:"; each TelegramCmd* constant
	// appends one TgCmd* name, e.g. TelegramCmdRumors is "telegram:cmd:rumors".
	TelegramCmd       = TelegramPrefix + "cmd:"
	TelegramCmdRumors = TelegramCmd + TgCmdRumors
	TelegramCmdSites  = TelegramCmd + TgCmdSites
	TelegramCmdSub    = TelegramCmd + TgCmdSub
	TelegramCmdOn     = TelegramCmd + TgCmdOn
	TelegramCmdOff    = TelegramCmd + TgCmdOff
	// TelegramChat evaluates to "telegram:chat:"; the two names below are
	// "telegram:chat:new" and "telegram:chat:edit".
	TelegramChat      = TelegramPrefix + "chat:"
	TelegramChatNew   = TelegramChat + "new"
	TelegramChatEdit  = TelegramChat + "edit"
)
View Source
// DefaultQueue is the queue name "default".
// NOTE(review): presumably the queue tasks are sent to when no other queue is
// specified — confirm at the enqueue call sites.
const DefaultQueue = "default"
View Source
const (
	// PluginName is the identifier "task".
	// NOTE(review): presumably the value returned by (*Plugin).Name — confirm
	// in the plugin implementation.
	PluginName = "task"
)

Variables

This section is empty.

Functions

func LoggingMiddleware

func LoggingMiddleware(log *slog.Logger) asynq.MiddlewareFunc

func TgCmdMiddleware

func TgCmdMiddleware(
	siteRepo repository.ReadRepository[*entity.Site],
	chatRepo repository.ReadWriteRepository[*entity.Chat],
	publisher common.Pub,
	logger *slog.Logger,
) asynq.MiddlewareFunc

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(redisConnOpt asynq.RedisConnOpt, logger *slog.Logger) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, name string, data any, opts ...asynq.Option) error

func (*Client) EnqueueTgCmd

func (c *Client) EnqueueTgCmd(ctx context.Context, message *tgbotapi.Message, updateID int)

func (*Client) EnqueueTgMemberEdit

func (c *Client) EnqueueTgMemberEdit(ctx context.Context, member *tgbotapi.ChatMemberUpdated, updateID int)

func (*Client) EnqueueTgMemberNew

func (c *Client) EnqueueTgMemberNew(ctx context.Context, member *tgbotapi.Chat, updateID int)

type HandlerJobFeed

type HandlerJobFeed struct {
	// contains filtered or unexported fields
}

func (*HandlerJobFeed) ProcessTask

func (h *HandlerJobFeed) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerJobSitemap

type HandlerJobSitemap struct {
	// contains filtered or unexported fields
}

func (*HandlerJobSitemap) ProcessTask

func (h *HandlerJobSitemap) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerTgChat

type HandlerTgChat struct {
	// contains filtered or unexported fields
}

func (*HandlerTgChat) ProcessTask

func (h *HandlerTgChat) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerTgCmdOff

type HandlerTgCmdOff struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdOff) ProcessTask

func (h *HandlerTgCmdOff) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdOn

type HandlerTgCmdOn struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdOn) ProcessTask

func (h *HandlerTgCmdOn) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdRumors

type HandlerTgCmdRumors struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdRumors) ProcessTask

func (h *HandlerTgCmdRumors) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdSites

type HandlerTgCmdSites struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdSites) ProcessTask

func (h *HandlerTgCmdSites) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdSub

type HandlerTgCmdSub struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdSub) ProcessTask

func (h *HandlerTgCmdSub) ProcessTask(ctx context.Context, _ *asynq.Task) error

type Meta

// Meta wraps an embedded *opengraph.Meta, so the embedded type's exported
// fields and methods are promoted onto Meta. The embedded pointer may be nil
// in a zero-value Meta.
type Meta struct {
	*opengraph.Meta
}

func MetaTag

func MetaTag(node *html.Node) *Meta

func (*Meta) Contribute

func (meta *Meta) Contribute(og *opengraph.OpenGraph) (err error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(redisConnOpt asynq.RedisConnOpt, logger *slog.Logger) *Metrics

func (*Metrics) Close

func (m *Metrics) Close() error

func (*Metrics) Register

func (m *Metrics) Register() error

func (*Metrics) Unregister

func (m *Metrics) Unregister()

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Client

func (p *Plugin) Client() common.Client

func (*Plugin) Init

func (p *Plugin) Init(
	cfg config.Configurer,
	uow common.UnitOfWork,
	redisConnOpt asynq.RedisConnOpt,
	pub common.Pub,
	log logger.Logger,
) error

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Provides

func (p *Plugin) Provides() []*dep.Out

func (*Plugin) Serve

func (p *Plugin) Serve() chan error

func (*Plugin) Stop

func (p *Plugin) Stop(ctx context.Context) (err error)

type PostEnqueueFunc

// PostEnqueueFunc is a callback receiving an *asynq.TaskInfo and an error.
// It is installed on a Scheduler through WithPostEnqueueFunc.
// NOTE(review): presumably invoked after the scheduler enqueues a task, with
// the enqueue result — confirm in the scheduler implementation.
type PostEnqueueFunc func(info *asynq.TaskInfo, err error)

type PreEnqueueFunc

// PreEnqueueFunc is a callback receiving an *asynq.Task and its option list.
// It is installed on a Scheduler through WithPreEnqueueFunc.
// NOTE(review): presumably invoked just before the scheduler enqueues a task
// — confirm in the scheduler implementation.
type PreEnqueueFunc func(task *asynq.Task, opts []asynq.Option)

type Scheduler

// Scheduler is created with NewScheduler and exposes Add, Remove, Start and
// Stop. It embeds sync.RWMutex, so the mutex's methods (Lock, RLock, ...) are
// promoted and callable on a Scheduler; because of the embedded mutex a
// Scheduler must not be copied after first use.
// Which state the lock guards is not visible in this listing.
type Scheduler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(repo repository.ReadRepository[*entity.Job], redisConnOpt asynq.RedisConnOpt, logger *slog.Logger, options ...SchedulerOption) *Scheduler

func (*Scheduler) Add

func (s *Scheduler) Add(job *entity.Job) error

func (*Scheduler) Remove

func (s *Scheduler) Remove(id uuid.UUID) error

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context, errCh chan<- error)

func (*Scheduler) Stop

func (s *Scheduler) Stop()

type SchedulerConfig

// SchedulerConfig holds scheduler settings decoded via mapstructure tags.
type SchedulerConfig struct {
	// SyncInterval is read from the "sync_interval" configuration key.
	// NOTE(review): presumably the period between scheduler sync runs and the
	// value handed to WithInterval — confirm in Plugin.Init.
	SyncInterval time.Duration `mapstructure:"sync_interval"`
}

func (*SchedulerConfig) Init

func (cfg *SchedulerConfig) Init()

type SchedulerOption

// SchedulerOption is a function that receives a *Scheduler. NewScheduler takes
// a variadic list of them; WithInterval, WithPostEnqueueFunc and
// WithPreEnqueueFunc each return one.
type SchedulerOption func(*Scheduler)

func WithInterval

func WithInterval(interval time.Duration) SchedulerOption

func WithPostEnqueueFunc

func WithPostEnqueueFunc(fn PostEnqueueFunc) SchedulerOption

func WithPreEnqueueFunc

func WithPreEnqueueFunc(fn PreEnqueueFunc) SchedulerOption

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(cfg *ServerConfig, redisConnOpt asynq.RedisConnOpt, logger *slog.Logger, options ...ServerOption) *Server

func (*Server) Start

func (s *Server) Start(handler asynq.Handler, errCh chan<- error)

func (*Server) Stop

func (s *Server) Stop()

type ServerConfig

// ServerConfig holds task-server settings; each field is decoded from the
// configuration key named in its mapstructure tag.
// NOTE(review): most field names mirror options of asynq.Config, so they are
// presumably passed through to the asynq server by NewServer — confirm there.
type ServerConfig struct {
	Concurrency              int            `mapstructure:"concurrency"`
	// Queues maps a queue name to an integer.
	// NOTE(review): presumably the queue priority, as in asynq — confirm.
	Queues                   map[string]int `mapstructure:"queues"`
	StrictPriority           bool           `mapstructure:"strict_priority"`
	HealthCheckInterval      time.Duration  `mapstructure:"health_check_interval"`
	DelayedTaskCheckInterval time.Duration  `mapstructure:"delayed_task_check_interval"`
	GroupGracePeriod         time.Duration  `mapstructure:"group_grace_period"`
	GroupMaxDelay            time.Duration  `mapstructure:"group_max_delay"`
	GroupMaxSize             int            `mapstructure:"group_max_size"`
	// NOTE(review): presumably the time allowed for in-flight tasks to finish
	// on shutdown — confirm how Server.Stop uses it.
	GracefulTimeout          time.Duration  `mapstructure:"graceful_timeout"`
}

func (*ServerConfig) Init

func (cfg *ServerConfig) Init()

type ServerOption

// ServerOption is a function that receives a *Server. NewServer takes a
// variadic list of them; WithErrorHandler and WithGroupAggregator each
// return one.
type ServerOption func(s *Server)

func WithErrorHandler

func WithErrorHandler(errorHandler asynq.ErrorHandler) ServerOption

func WithGroupAggregator

func WithGroupAggregator(groupAggregator asynq.GroupAggregator) ServerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL