feedback

package
v3.7.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	APNSPlatform = "apns"
	GCMPlatform  = "gcm"
)

Constants

View Source
const (
	MetricsTokensDeleteSuccess     = "tokens_delete_success"
	MetricsTokensDeleteError       = "tokens_delete_error"
	MetricsTokensDeleteNonexistent = "tokens_delete_nonexistent"
)

Metrics name sent by the Handler

Variables

View Source
var (
	ErrAPNSUnmarshal = errors.New("error unmarshalling apns message")
	ErrGCMUnmarshal  = errors.New("error unmarshalling gcm message")
)

Errors

View Source
var AvailableStatsReporters = map[string]statsReporterInitializer{
	"statsd": func(config *viper.Viper, logger *logrus.Logger, clientOrNil interfaces.StatsDClient) (interfaces.StatsReporter, error) {
		return extensions.NewStatsD(config, logger, clientOrNil)
	},
}

AvailableStatsReporters contains functions to initialize all stats reporters

Functions

func WaitTimeout

func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool

WaitTimeout waits for the waitgroup for the specified max timeout. Returns true if waiting timed out. got from http://stackoverflow.com/a/32843750/3987733

Types

type Broker

type Broker struct {
	StatsReporters []interfaces.StatsReporter
	Logger         *log.Logger
	Config         *viper.Viper
	InChan         chan QueueMessage

	InvalidTokenOutChan chan *InvalidToken

	InvalidTokenEnabled bool
	// contains filtered or unexported fields
}

Broker receives kafka messages in its InChan, unmarshal them according to the platform and routes them to the correct out channel after examining their content.

func NewBroker

func NewBroker(
	logger *log.Logger, cfg *viper.Viper, statsReporters []interfaces.StatsReporter,
	inChan chan QueueMessage,
	pendingMessagesWG *sync.WaitGroup,
) (*Broker, error)

NewBroker creates a new Broker instance

func (*Broker) Start

func (b *Broker) Start()

Start starts a routine to process the Broker in channel.

func (*Broker) Stop

func (b *Broker) Stop()

Stop stops all routines from processing the in channel and closes all output channels.

type InvalidToken

type InvalidToken struct {
	Token    string
	Game     string
	Platform string
}

InvalidToken represents a token with the necessary information to be deleted

type InvalidTokenHandler

type InvalidTokenHandler struct {
	Logger        *log.Logger
	Config        *viper.Viper
	StatsReporter []interfaces.StatsReporter
	Client        *extensions.PGClient

	InChan chan *InvalidToken
	Buffer []*InvalidToken
	// contains filtered or unexported fields
}

InvalidTokenHandler takes the InvalidTokens from the InChannel and put them in a buffer. When the buffer is full or after a timeout, it is flushed, triggering the deletion of the tokens from the database

func NewInvalidTokenHandler

func NewInvalidTokenHandler(
	logger *log.Logger, cfg *viper.Viper, statsReporter []interfaces.StatsReporter,
	inChan chan *InvalidToken,
	dbOrNil ...interfaces.DB,
) (*InvalidTokenHandler, error)

NewInvalidTokenHandler returns a new InvalidTokenHandler instance

func (*InvalidTokenHandler) Start

func (i *InvalidTokenHandler) Start()

Start starts to process the InvalidTokens from the intake channel

func (*InvalidTokenHandler) Stop

func (i *InvalidTokenHandler) Stop()

Stop stops the Handler from consuming messages from the intake channel

type KafkaConsumer

type KafkaConsumer struct {
	Topics              []string
	Brokers             string
	Consumer            interfaces.KafkaConsumerClient
	ConsumerGroup       string
	OffsetResetStrategy string
	Config              *viper.Viper
	ChannelSize         int
	Logger              *logrus.Logger
	FetchMinBytes       int
	FetchWaitMaxMs      int

	SessionTimeout int

	HandleAllMessagesBeforeExiting bool
	AssignedPartition              bool
	// contains filtered or unexported fields
}

KafkaConsumer for getting pusher feedbacks

func NewKafkaConsumer

func NewKafkaConsumer(
	config *viper.Viper,
	logger *logrus.Logger,
	stopChannel *chan struct{},
	clientOrNil ...interfaces.KafkaConsumerClient,
) (*KafkaConsumer, error)

NewKafkaConsumer for creating a new KafkaConsumer instance

func (*KafkaConsumer) Cleanup

func (q *KafkaConsumer) Cleanup() error

Cleanup closes kafka consumer connection

func (*KafkaConsumer) ConsumeLoop

func (q *KafkaConsumer) ConsumeLoop() error

ConsumeLoop consume messages from the queue and put in messages to send channel

func (*KafkaConsumer) MessagesChannel

func (q *KafkaConsumer) MessagesChannel() chan QueueMessage

MessagesChannel returns the channel that will receive all messages got from kafka

func (*KafkaConsumer) PendingMessagesWaitGroup

func (q *KafkaConsumer) PendingMessagesWaitGroup() *sync.WaitGroup

PendingMessagesWaitGroup returns the waitGroup that is incremented every time a feedback is consumed

func (*KafkaConsumer) StopConsuming

func (q *KafkaConsumer) StopConsuming()

StopConsuming stops consuming messages from the queue

type KafkaMessage

type KafkaMessage struct {
	Game     string
	Platform string
	Value    []byte
}

KafkaMessage implements the FeedbackMessage interface

func (*KafkaMessage) GetGame

func (k *KafkaMessage) GetGame() string

GetGame returns the message's Game

func (*KafkaMessage) GetPlatform

func (k *KafkaMessage) GetPlatform() string

GetPlatform returns the message's Platform

func (*KafkaMessage) GetValue

func (k *KafkaMessage) GetValue() []byte

GetValue returns the message's Value

type Listener

type Listener struct {
	Config         *viper.Viper
	Logger         *log.Logger
	StatsReporters []interfaces.StatsReporter

	Queue                   Queue
	Broker                  *Broker
	InvalidTokenHandler     *InvalidTokenHandler
	GracefulShutdownTimeout int
	// contains filtered or unexported fields
}

Listener will consume push feedbacks from a queue and use a broker to route the messages to a convenient handler

func NewListener

func NewListener(
	config *viper.Viper, logger *log.Logger,
	statsdClientOrNil interfaces.StatsDClient,
) (*Listener, error)

NewListener creates and return a new Listener instance

func (*Listener) Cleanup

func (l *Listener) Cleanup()

Cleanup ends the Listener execution

func (*Listener) Start

func (l *Listener) Start()

Start starts the listener

type Message

type Message struct {
	From             string                 `json:"from"`
	MessageID        string                 `json:"message_id"`
	MessageType      string                 `json:"message_type"`
	Error            string                 `json:"error"`
	ErrorDescription string                 `json:"error_description"`
	DeviceToken      string                 `json:"DeviceToken"`
	ID               string                 `json:"id"`
	Err              map[string]interface{} `json:"Err"`
	Metadata         map[string]interface{} `json:"metadata"`
	Reason           string                 `json:"reason"`
}

Message is a struct that will decode an apns or gcm feedback message.

type Queue

type Queue interface {
	MessagesChannel() chan QueueMessage
	ConsumeLoop() error
	StopConsuming()
	Cleanup() error
	PendingMessagesWaitGroup() *sync.WaitGroup
}

Queue interface for making new queues pluggable easily

type QueueMessage

type QueueMessage interface {
	GetGame() string
	GetPlatform() string
	GetValue() []byte
}

QueueMessage defines the interface that should be implemented by the type produced by a Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL