kafka_queue

package
v0.0.0-...-b5d444f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskRetries = 2

	MaxMessageSizeBytes = 256 * 1024 * 1024 // MiB
)
View Source
const ConsumerGroupName = "group-default"
View Source
const KafkaOperationTimeout = 25 * time.Second

KafkaOperationTimeout The timeout for all kafka send/receive operations.

Variables

View Source
var (
	EnvironmentPrefix = func() string {
		prefix := os.Getenv("KAFKA_ENV_PREFIX")
		if len(prefix) > 0 {
			return prefix
		} else {
			return os.Getenv("DOPPLER_CONFIG")
		}
	}()
)

Functions

func GetTopic

func GetTopic(options GetTopicOptions) string

Types

type AddSessionFeedbackArgs

type AddSessionFeedbackArgs struct {
	SessionSecureID string
	UserName        *string
	UserEmail       *string
	Verbatim        string
	Timestamp       time.Time
}

type AddSessionPropertiesArgs

type AddSessionPropertiesArgs struct {
	SessionSecureID  string
	PropertiesObject interface{}
}

type AddTrackPropertiesArgs

type AddTrackPropertiesArgs struct {
	SessionSecureID  string
	PropertiesObject interface{}
}

type BalancerWrapper

type BalancerWrapper struct {
	// contains filtered or unexported fields
}

func (*BalancerWrapper) AssignGroups

func (b *BalancerWrapper) AssignGroups(members []kafka.GroupMember, partitions []kafka.Partition) kafka.GroupMemberAssignments

func (*BalancerWrapper) ProtocolName

func (b *BalancerWrapper) ProtocolName() string

func (*BalancerWrapper) UserData

func (b *BalancerWrapper) UserData() ([]byte, error)

type ConfigOverride

type ConfigOverride struct {
	Async            *bool
	QueueCapacity    *int
	MinBytes         *int
	MaxWait          *time.Duration
	MessageSizeBytes *int64
	OnAssignGroups   func()
}

type ErrorGroupDataSyncArgs

type ErrorGroupDataSyncArgs struct {
	ErrorGroupID int
}

type ErrorObjectDataSyncArgs

type ErrorObjectDataSyncArgs struct {
	ErrorObjectID int
}

type GetTopicOptions

type GetTopicOptions struct {
	Type TopicType
}

type IdentifySessionArgs

type IdentifySessionArgs struct {
	SessionSecureID string
	UserIdentifier  string
	UserObject      interface{}
}

type InitializeSessionArgs

type InitializeSessionArgs struct {
	SessionSecureID                string
	CreatedAt                      time.Time
	ProjectVerboseID               string
	EnableStrictPrivacy            bool
	PrivacySetting                 *string
	EnableRecordingNetworkContents bool
	ClientVersion                  string
	FirstloadVersion               string
	ClientConfig                   string
	Environment                    string
	AppVersion                     *string
	Fingerprint                    string
	UserAgent                      string
	AcceptLanguage                 string
	IP                             string
	ClientID                       string
	NetworkRecordingDomains        []string
	DisableSessionRecording        *bool
	ServiceName                    string
}

type LogRowMessage

type LogRowMessage struct {
	Type         PayloadType
	Failures     int
	MaxRetries   int
	KafkaMessage *kafka.Message `json:",omitempty"`
	*clickhouse.LogRow
}

func (*LogRowMessage) GetFailures

func (m *LogRowMessage) GetFailures() int

func (*LogRowMessage) GetKafkaMessage

func (m *LogRowMessage) GetKafkaMessage() *kafka.Message

func (*LogRowMessage) GetMaxRetries

func (m *LogRowMessage) GetMaxRetries() int

func (*LogRowMessage) GetType

func (m *LogRowMessage) GetType() PayloadType

func (*LogRowMessage) SetFailures

func (m *LogRowMessage) SetFailures(value int)

func (*LogRowMessage) SetKafkaMessage

func (m *LogRowMessage) SetKafkaMessage(value *kafka.Message)

func (*LogRowMessage) SetMaxRetries

func (m *LogRowMessage) SetMaxRetries(value int)

type Message

type Message struct {
	Type                  PayloadType
	Failures              int
	MaxRetries            int
	KafkaMessage          *kafka.Message             `json:",omitempty"`
	PushPayload           *PushPayloadArgs           `json:",omitempty"`
	InitializeSession     *InitializeSessionArgs     `json:",omitempty"`
	IdentifySession       *IdentifySessionArgs       `json:",omitempty"`
	AddTrackProperties    *AddTrackPropertiesArgs    `json:",omitempty"`
	AddSessionProperties  *AddSessionPropertiesArgs  `json:",omitempty"`
	PushBackendPayload    *PushBackendPayloadArgs    `json:",omitempty"`
	PushMetrics           *PushMetricsArgs           `json:",omitempty"`
	AddSessionFeedback    *AddSessionFeedbackArgs    `json:",omitempty"`
	PushLogs              *PushLogsArgs              `json:",omitempty"`
	PushTraces            *PushTracesArgs            `json:",omitempty"`
	SessionDataSync       *SessionDataSyncArgs       `json:",omitempty"`
	ErrorGroupDataSync    *ErrorGroupDataSyncArgs    `json:",omitempty"`
	ErrorObjectDataSync   *ErrorObjectDataSyncArgs   `json:",omitempty"`
	PushCompressedPayload *PushCompressedPayloadArgs `json:",omitempty"`
}

func (*Message) GetFailures

func (m *Message) GetFailures() int

func (*Message) GetKafkaMessage

func (m *Message) GetKafkaMessage() *kafka.Message

func (*Message) GetMaxRetries

func (m *Message) GetMaxRetries() int

func (*Message) GetType

func (m *Message) GetType() PayloadType

func (*Message) SetFailures

func (m *Message) SetFailures(value int)

func (*Message) SetKafkaMessage

func (m *Message) SetKafkaMessage(value *kafka.Message)

func (*Message) SetMaxRetries

func (m *Message) SetMaxRetries(value int)

type MessageQueue

type MessageQueue interface {
	Stop(context.Context)
	Receive(context.Context) RetryableMessage
	Submit(context.Context, string, ...RetryableMessage) error
	LogStats()
}

type MockMessageQueue

type MockMessageQueue struct{}

func (*MockMessageQueue) LogStats

func (k *MockMessageQueue) LogStats()

func (*MockMessageQueue) Receive

func (*MockMessageQueue) Stop

func (k *MockMessageQueue) Stop(context.Context)

func (*MockMessageQueue) Submit

type Mode

type Mode int
const (
	Producer Mode = 1 << iota
	Consumer Mode = 1 << iota
)

type PayloadType

type PayloadType = int
const (
	PushPayload                            PayloadType = iota
	InitializeSession                      PayloadType = iota
	IdentifySession                        PayloadType = iota
	AddTrackProperties                     PayloadType = iota // Deprecated: track events are now processed in pushPayload
	AddSessionProperties                   PayloadType = iota
	PushBackendPayload                     PayloadType = iota
	PushMetrics                            PayloadType = iota
	MarkBackendSetup                       PayloadType = iota // Deprecated: setup events are written from other payload processing
	AddSessionFeedback                     PayloadType = iota
	PushLogs                               PayloadType = iota // Deprecated: use a LogRowMessage with payload type PushLogsFlattened
	PushTraces                             PayloadType = iota
	HubSpotCreateContactForAdmin           PayloadType = iota // Deprecated: noop
	HubSpotCreateCompanyForWorkspace       PayloadType = iota // Deprecated: noop
	HubSpotUpdateContactProperty           PayloadType = iota // Deprecated: noop
	HubSpotUpdateCompanyProperty           PayloadType = iota // Deprecated: noop
	HubSpotCreateContactCompanyAssociation PayloadType = iota // Deprecated: noop
	SessionDataSync                        PayloadType = iota
	ErrorGroupDataSync                     PayloadType = iota
	ErrorObjectDataSync                    PayloadType = iota
	PushCompressedPayload                  PayloadType = iota
	PushLogsFlattened                      PayloadType = iota
	PushTracesFlattened                    PayloadType = iota
	HealthCheck                            PayloadType = math.MaxInt
)

type PushBackendPayloadArgs

type PushBackendPayloadArgs struct {
	ProjectVerboseID *string
	SessionSecureID  *string
	Errors           []*customModels.BackendErrorObjectInput
}

type PushCompressedPayloadArgs

type PushCompressedPayloadArgs struct {
	SessionSecureID string
	PayloadID       int
	Data            string `json:"data"`
}

type PushLogsArgs

type PushLogsArgs struct {
	LogRow *clickhouse.LogRow
}

type PushMetricsArgs

type PushMetricsArgs struct {
	ProjectVerboseID *string
	SessionSecureID  *string
	Metrics          []*customModels.MetricInput
}

type PushPayloadArgs

type PushPayloadArgs struct {
	SessionSecureID    string
	PayloadID          *int
	Events             customModels.ReplayEventsInput   `json:"events"`
	Messages           string                           `json:"messages"`
	Resources          string                           `json:"resources"`
	WebSocketEvents    *string                          `json:"web_socket_events"`
	Errors             []*customModels.ErrorObjectInput `json:"errors"`
	IsBeacon           *bool                            `json:"is_beacon"`
	HasSessionUnloaded *bool                            `json:"has_session_unloaded"`
	HighlightLogs      *string                          `json:"highlight_logs"`
}

type PushTracesArgs

type PushTracesArgs struct {
	TraceRow *clickhouse.TraceRow
}

type Queue

type Queue struct {
	Topic            string
	ConsumerGroup    string
	MessageSizeBytes int64
	Client           *kafka.Client
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, topic string, mode Mode, configOverride *ConfigOverride) *Queue

func (*Queue) Commit

func (p *Queue) Commit(ctx context.Context, msg *kafka.Message)

func (*Queue) LogStats

func (p *Queue) LogStats()

func (*Queue) Receive

func (p *Queue) Receive(ctx context.Context) (msg RetryableMessage)

func (*Queue) Rewind

func (p *Queue) Rewind(ctx context.Context, dur time.Duration) error

func (*Queue) Stop

func (p *Queue) Stop(ctx context.Context)

func (*Queue) Submit

func (p *Queue) Submit(ctx context.Context, partitionKey string, messages ...RetryableMessage) error

type RetryableMessage

type RetryableMessage interface {
	GetType() PayloadType
	GetFailures() int
	SetFailures(value int)
	GetMaxRetries() int
	SetMaxRetries(value int)
	GetKafkaMessage() *kafka.Message
	SetKafkaMessage(value *kafka.Message)
}

type SessionDataSyncArgs

type SessionDataSyncArgs struct {
	SessionID int
}

type TopicType

type TopicType string
const (
	TopicTypeDefault  TopicType = "default"
	TopicTypeBatched  TopicType = "batched"
	TopicTypeDataSync TopicType = "datasync"
	TopicTypeTraces   TopicType = "traces"
)

type TraceRowMessage

type TraceRowMessage struct {
	Type         PayloadType
	Failures     int
	MaxRetries   int
	KafkaMessage *kafka.Message `json:",omitempty"`
	*clickhouse.TraceRow
}

func (*TraceRowMessage) GetFailures

func (m *TraceRowMessage) GetFailures() int

func (*TraceRowMessage) GetKafkaMessage

func (m *TraceRowMessage) GetKafkaMessage() *kafka.Message

func (*TraceRowMessage) GetMaxRetries

func (m *TraceRowMessage) GetMaxRetries() int

func (*TraceRowMessage) GetType

func (m *TraceRowMessage) GetType() PayloadType

func (*TraceRowMessage) SetFailures

func (m *TraceRowMessage) SetFailures(value int)

func (*TraceRowMessage) SetKafkaMessage

func (m *TraceRowMessage) SetKafkaMessage(value *kafka.Message)

func (*TraceRowMessage) SetMaxRetries

func (m *TraceRowMessage) SetMaxRetries(value int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL