topic

package
v0.0.0-...-e2755d2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2022 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Registry key prefix and suffixes of the topics derived from a primary topic.
const (
	// Prefix is the prefix for all topic keys in the registry.
	Prefix = "topics/"

	// RetryTopicSuffix is the suffix of the retry topic that every primary
	// topic subscription has.
	RetryTopicSuffix = "-retry"

	// DeadLetterTopicSuffix is the suffix of the dlq topic that every primary
	// topic subscription has.
	DeadLetterTopicSuffix = "-dlq"

	// SubscriptionSuffix is the suffix appended to the subscription specific
	// topic.
	SubscriptionSuffix = "-subscription-internal"
)

// Partition limits.
const (
	// DefaultNumPartitions is the default number of partitions for a topic.
	DefaultNumPartitions = 1

	// MaxNumPartitions is the maximum number of partitions for a topic.
	MaxNumPartitions = 100
)

// Retention settings.
const (
	// RetentionPeriodConfig is the topic level retention period config key.
	RetentionPeriodConfig = "retention.ms"

	// RetentionPeriod is the time after which messages will be deleted from
	// the topic: 14 days, written out as days * hours * minutes * seconds * 1000.
	RetentionPeriod = 14 * 24 * 60 * 60 * 1000

	// RetentionSizeConfig is the partition retention size config key.
	RetentionSizeConfig = "retention.bytes"

	// RetentionSizePerPartition is the max number of bytes retained per
	// partition: 10000MB (10000 * 1000 * 1000 bytes). The value does not fit
	// in a 32-bit integer.
	RetentionSizePerPartition = 10000 * 1000 * 1000
)
View Source
// DelayConsumerGroupIDFormat is a format string that appends "-cg" to a single
// %v value; with the value "subs.delay.30.seconds" it yields
// "subs.delay.30.seconds-cg".
const DelayConsumerGroupIDFormat = "%v-cg"

DelayConsumerGroupIDFormat is the format of a delay consumer group ID, e.g. subs.delay.30.seconds-cg

View Source
// DelayConsumerGroupInstanceIDFormat is a format string that joins two %v
// values with a hyphen.
// NOTE(review): per the package documentation the values are the delay topic
// name and the subscriber ID — confirm against the callers.
const DelayConsumerGroupInstanceIDFormat = "%v-%v"

DelayConsumerGroupInstanceIDFormat is the format of a delay consumer group instance ID: delayTopicName-subscriberID

View Source
// DelayTopicNameFormat is a format string with two %v values separated by
// ".delay." and followed by ".seconds"; with the values "subs" and 30 it
// yields "subs.delay.30.seconds". The separators are dots, not hyphens.
const DelayTopicNameFormat = "%v.delay.%v.seconds"

DelayTopicNameFormat is the format of a delay topic name, e.g. subs.delay.30.seconds, subs.delay.60.seconds ... subs.delay.600.seconds

View Source
// DelayTopicSuffix is a format string with a single %v value between "delay."
// and ".seconds"; with the value 30 it yields "delay.30.seconds". The
// separators are dots, not hyphens.
const DelayTopicSuffix = "delay.%v.seconds"

DelayTopicSuffix is the format of a delay topic suffix, e.g. delay.30.seconds, delay.60.seconds ... delay.600.seconds

View Source
// DelayTopicWithProjectNameFormat is a format string with three %v values;
// with the values "p1", "subs" and 30 it yields
// "projects/p1/topics/subs.delay.30.seconds".
const DelayTopicWithProjectNameFormat = "projects/%v/topics/%v.delay.%v.seconds"

DelayTopicWithProjectNameFormat is the format of a project-qualified delay topic name, e.g. projects/p1/topics/subs.delay.30.seconds

View Source
const (
	// TopicNameFormat is the format of a public topic name:
	// "projects/{projectID}/topics/{topicName}". Both placeholders are %s verbs.
	TopicNameFormat = "projects/%s/topics/%s"
)

Variables

View Source
// MinDelay is initialised to Delay5sec. It is a variable, not a constant, so
// it can be reassigned at run time.
var MinDelay = Delay5sec

// MaxDelay is initialised to Delay3600sec. It is a variable, not a constant,
// so it can be reassigned at run time.
var MaxDelay = Delay3600sec

Intervals: during subscription creation, query the allowed intervals list and create all the topics needed for retry.

Functions

func ExtractTopicMetaAndValidate

func ExtractTopicMetaAndValidate(ctx context.Context, name string) (projectID string, topicName string, err error)

ExtractTopicMetaAndValidate extracts topic metadata from its fully qualified name

func ExtractTopicMetaAndValidateForCreate

func ExtractTopicMetaAndValidateForCreate(ctx context.Context, name string) (string, string, error)

ExtractTopicMetaAndValidateForCreate extracts and validates the topic details, additionally for topic create it checks if name can collide with dlq topics

func GetTopicName

func GetTopicName(projectID string, name string) string

GetTopicName is a helper that returns the public topic name, built from the project ID and the topic name using the topic name format

func GetTopicNameOnly

func GetTopicNameOnly(topicName string) string

GetTopicNameOnly returns only the topic name from the complete topic name

func IsDLQTopic

func IsDLQTopic(topicName string) bool

IsDLQTopic is a helper that checks if the topic is a dlq topic

func IsRetentionPolicyUnchanged

func IsRetentionPolicyUnchanged(existing, required map[string]string) bool

IsRetentionPolicyUnchanged checks whether the existing and the required retention policies are the same

Types

type Core

// Core implements all business logic for a topic.
type Core struct {
	// contains filtered or unexported fields
	// (the documentation renderer elides them; no exported fields are shown)
}

Core implements all business logic for a topic

func (*Core) CreateDeadLetterTopic

func (c *Core) CreateDeadLetterTopic(ctx context.Context, model *Model) error

CreateDeadLetterTopic creates a dead letter topic for the given primary topic and name

func (*Core) CreateRetryTopic

func (c *Core) CreateRetryTopic(ctx context.Context, model *Model) error

CreateRetryTopic creates a retry topic for the given primary topic and name

func (*Core) CreateSubscriptionTopic

func (c *Core) CreateSubscriptionTopic(ctx context.Context, model *Model) error

CreateSubscriptionTopic creates a subscription topic for the given primary topic and name

func (*Core) CreateTopic

func (c *Core) CreateTopic(ctx context.Context, m *Model) error

CreateTopic implements topic creation

func (*Core) DeleteProjectTopics

func (c *Core) DeleteProjectTopics(ctx context.Context, projectID string) error

DeleteProjectTopics deletes all topics for a given projectID

func (*Core) DeleteTopic

func (c *Core) DeleteTopic(ctx context.Context, m *Model) error

DeleteTopic deletes a topic and all resources associated with it

func (*Core) Exists

func (c *Core) Exists(ctx context.Context, key string) (bool, error)

Exists checks if the topic exists with a given key

func (*Core) ExistsWithName

func (c *Core) ExistsWithName(ctx context.Context, name string) (bool, error)

ExistsWithName checks if the topic exists with a given name

func (*Core) Get

func (c *Core) Get(ctx context.Context, key string) (*Model, error)

Get returns topic with the given key

func (*Core) List

func (c *Core) List(ctx context.Context, prefix string) ([]*Model, error)

List gets slice of topics starting with given prefix

func (*Core) SetupTopicRetentionConfigs

func (c *Core) SetupTopicRetentionConfigs(ctx context.Context, names []string) ([]string, error)

SetupTopicRetentionConfigs sets up the retention policy on dlq topics

func (*Core) UpdateTopic

func (c *Core) UpdateTopic(ctx context.Context, m *Model) error

UpdateTopic implements topic update

type ICore

// ICore is an interface over topic core. The methods are grouped by purpose;
// the per-method notes repeat what this package documents for the Core method
// of the same name.
type ICore interface {
	// Creation.

	// CreateTopic implements topic creation.
	CreateTopic(ctx context.Context, model *Model) error
	// CreateSubscriptionTopic creates a subscription topic for the given
	// primary topic and name.
	CreateSubscriptionTopic(ctx context.Context, model *Model) error
	// CreateRetryTopic creates a retry topic for the given primary topic and
	// name.
	CreateRetryTopic(ctx context.Context, model *Model) error
	// CreateDeadLetterTopic creates a dead letter topic for the given primary
	// topic and name.
	CreateDeadLetterTopic(ctx context.Context, model *Model) error

	// Modification.

	// UpdateTopic implements topic update.
	UpdateTopic(ctx context.Context, model *Model) error
	// SetupTopicRetentionConfigs sets up the retention policy on dlq topics.
	SetupTopicRetentionConfigs(ctx context.Context, names []string) ([]string, error)

	// Lookup.

	// Exists checks if the topic exists with a given key.
	Exists(ctx context.Context, key string) (bool, error)
	// ExistsWithName checks if the topic exists with a given name.
	ExistsWithName(ctx context.Context, name string) (bool, error)
	// Get returns the topic with the given key.
	Get(ctx context.Context, key string) (*Model, error)
	// List gets a slice of topics starting with the given prefix.
	List(ctx context.Context, prefix string) ([]*Model, error)

	// Removal.

	// DeleteTopic deletes a topic and all resources associated with it.
	DeleteTopic(ctx context.Context, m *Model) error
	// DeleteProjectTopics deletes all topics for a given projectID.
	DeleteProjectTopics(ctx context.Context, projectID string) error
}

ICore is an interface over topic core

func NewCore

func NewCore(repo IRepo, projectCore project.ICore, brokerStore brokerstore.IBrokerStore) ICore

NewCore returns an instance of Core

type IRepo

// IRepo is the interface over the database repository for topics.
type IRepo interface {
	// common.IRepo is embedded; its method set is declared in another package
	// and is not visible here.
	common.IRepo
	// List returns the models whose key matches prefix (Repo.List is
	// documented as returning "a slice of topics matching prefix").
	List(ctx context.Context, prefix string) ([]common.IModel, error)
}

IRepo interface over database repository

func NewRepo

func NewRepo(registry registry.IRegistry) IRepo

NewRepo returns IRepo

type Interval

// Interval is the internal delay type per allowed interval. The predefined
// values of this type (Delay5sec through Delay3600sec) are numbers of seconds.
type Interval uint

Interval is internal delay type per allowed interval

// Predefined delay intervals, in seconds.
var (
	// Delay5sec is a delay of 5 seconds.
	Delay5sec = Interval(5)
	// Delay30sec is a delay of 30 seconds.
	Delay30sec = Interval(30)
	// Delay60sec is a delay of 60 seconds (1 minute).
	Delay60sec = Interval(60)
	// Delay150sec is a delay of 150 seconds (2.5 minutes).
	Delay150sec = Interval(150)
	// Delay300sec is a delay of 300 seconds (5 minutes).
	Delay300sec = Interval(300)
	// Delay600sec is a delay of 600 seconds (10 minutes).
	Delay600sec = Interval(600)
	// Delay1800sec is a delay of 1800 seconds (30 minutes).
	Delay1800sec = Interval(1800)
	// Delay3600sec is a delay of 3600 seconds (60 minutes).
	Delay3600sec = Interval(3600)
)

type Model

// Model is the model for a topic.
//
// What the declaration itself shows:
//   - common.BaseModel is embedded; it is declared in another package.
//   - Name is serialised to JSON as "name".
//     NOTE(review): presumably the fully qualified name in the
//     "projects/{projectID}/topics/{topicName}" form — confirm.
//   - Labels is a string-to-string map serialised as "labels".
//   - ExtractedProjectID and ExtractedTopicName are serialised as
//     "extracted_project_id" and "extracted_topic_name".
//     NOTE(review): presumably the two parts that
//     ExtractTopicMetaAndValidate returns for Name — confirm.
//   - NumPartitions is serialised as "num_partitions".
//     NOTE(review): presumably bounded by DefaultNumPartitions and
//     MaxNumPartitions — confirm where validation happens.
type Model struct {
	common.BaseModel
	Name               string            `json:"name"`
	Labels             map[string]string `json:"labels"`
	ExtractedProjectID string            `json:"extracted_project_id"`
	ExtractedTopicName string            `json:"extracted_topic_name"`
	NumPartitions      int               `json:"num_partitions"`
}

Model for a topic

func GetValidatedModel

func GetValidatedModel(ctx context.Context, req *metrov1.Topic) (*Model, error)

GetValidatedModel validates an incoming proto request and returns the model

func GetValidatedTopicForAdminUpdate

func GetValidatedTopicForAdminUpdate(ctx context.Context, req *metrov1.AdminTopic) (*Model, error)

GetValidatedTopicForAdminUpdate validates an incoming proto request and returns the model

func (*Model) GetRetentionConfig

func (m *Model) GetRetentionConfig() map[string]string

GetRetentionConfig returns the retention policy for a given topic

func (*Model) IsDeadLetterTopic

func (m *Model) IsDeadLetterTopic() bool

IsDeadLetterTopic checks if the topic is a dead letter topic created for dlq support on subscription

func (*Model) IsDelayTopic

func (m *Model) IsDelayTopic() bool

IsDelayTopic checks if the topic is a delay topic created for delay support in subscription

func (*Model) IsPrimaryTopic

func (m *Model) IsPrimaryTopic() bool

IsPrimaryTopic checks if the topic is primary topic or not

func (*Model) IsRetryTopic

func (m *Model) IsRetryTopic() bool

IsRetryTopic checks if the topic is a retry topic created for retry support in subscription

func (*Model) IsSubscriptionInternalTopic

func (m *Model) IsSubscriptionInternalTopic() bool

IsSubscriptionInternalTopic checks if the topic is subscription's internal topic or not

func (*Model) Key

func (m *Model) Key() string

Key returns the key for storing in the registry

func (*Model) Prefix

func (m *Model) Prefix() string

Prefix returns the key prefix

type Repo

// Repo implements various repository methods. It embeds common.BaseRepo,
// which is declared in another package, and adds the List method.
type Repo struct {
	common.BaseRepo
}

Repo implements various repository methods

func (*Repo) List

func (r *Repo) List(ctx context.Context, prefix string) ([]common.IModel, error)

List returns a slice of topics matching prefix

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL