padmin

package
v0.0.0-...-90eaee1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2023 License: Apache-2.0 Imports: 15 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// UrlPath for the Admin API
	UrlPath             = "/admin/v2"
	UrlClusters         = UrlPath + "/clusters"
	UrlTenants          = UrlPath + "/tenants"
	UrlNamespacesFormat = UrlPath + "/namespaces/%s/%s"
)
View Source
const (
	UrlBookiesAll         = UrlPath + "/bookies/all"
	UrlBookiesRacksInfo   = UrlPath + "/bookies/racks-info"
	UrlBookiesRacksFormat = UrlPath + "/bookies/racks-info/%s"
)

bookies

View Source
const (
	UrlLookupBrokerFormat             = "/lookup/v2/topic/%s/%s/%s/%s"
	UrlLookupGetNamespaceBundleFormat = "/lookup/v2/topic/%s/%s/%s/%s/bundle"
)

lookup

View Source
const (
	UrlNamespaceRetentionFormat                         = UrlPath + "/namespaces/%s/%s/retention"
	UrlNamespaceGetBacklogQuotaMapFormat                = UrlPath + "/namespaces/%s/%s/backlogQuotaMap"
	UrlNamespaceOperateBacklogQuotaFormat               = UrlPath + "/namespaces/%s/%s/backlogQuota"
	UrlNamespaceClearAllTopicsBacklogFormat             = UrlPath + "/namespaces/%s/%s/clearBacklog"
	UrlNamespaceClearSubscriptionBacklogFormat          = UrlPath + "/namespaces/%s/%s/clearBacklog/%s"
	UrlNamespaceClearAllTopicsBacklogForBundleFormat    = UrlPath + "/namespaces/%s/%s/%s/clearBacklog"
	UrlNamespaceClearSubscriptionBacklogForBundleFormat = UrlPath + "/namespaces/%s/%s/%s/clearBacklog/%s"
	UrlNamespaceCompactionThresholdFormat               = UrlPath + "/namespaces/%s/%s/compactionThreshold"
	UrlNamespaceMessageTTLFormat                        = UrlPath + "/namespaces/%s/%s/messageTTL"
)

namespace

View Source
const (
	UrlPersistentNamespaceFormat                            = UrlPath + "/persistent/%s/%s"
	UrlPersistentTopicFormat                                = UrlPath + "/persistent/%s/%s/%s"
	UrlPersistentPartitionedNamespaceFormat                 = UrlPath + "/persistent/%s/%s/partitioned"
	UrlPersistentPartitionedTopicFormat                     = UrlPath + "/persistent/%s/%s/%s/partitions"
	UrlPersistentPartitionedRetentionFormat                 = UrlPath + "/persistent/%s/%s/%s/retention"
	UrlPersistentTopicGetBacklogQuotaMapFormat              = UrlPath + "/persistent/%s/%s/%s/backlogQuotaMap"
	UrlPersistentTopicOperateBacklogQuotaFormat             = UrlPath + "/persistent/%s/%s/%s/backlogQuota"
	UrlPersistentTopicEstimatedOfflineBacklogFormat         = UrlPath + "/persistent/%s%s%s/backlog"
	UrlPersistentTopicCalculateBacklogSizeByMessageIDFormat = UrlPath + "/persistent/%s/%s/%s/backlogSize"
	UrlPersistentTopicCompactionThresholdFormat             = UrlPath + "/persistent/%s/%s/%s/compactionThreshold"
	UrlPersistentTopicCompactionFormat                      = UrlPath + "/persistent/%s/%s/%s/compaction"
	UrlPersistentTopicMessageTTLFormat                      = UrlPath + "/persistent/%s/%s/%s/messageTTL"
	UrlPersistentTopicCreateMissedPartitionsFormat          = UrlPath + "/persistent/%s/%s/%s/createMissedPartitions"
	UrlPersistentGetLastMessageIdFormat                     = UrlPath + "/persistent/%s/%s/%s/lastMessageId"
	UrlPersistentGetInternalStatsForTopicFormat             = UrlPath + "/persistent/%s/%s/%s/internalStats"
	UrlPersistentGetInternalStatsForPartitionedTopicFormat  = UrlPath + "/persistent/%s/%s/%s/partitioned-internalStats"
	UrlPersistentGetStatsForTopicFormat                     = UrlPath + "/persistent/%s/%s/%s/stats"
	UrlPersistentGetStatsForPartitionedTopicFormat          = UrlPath + "/persistent/%s/%s/%s/partitioned-stats"
)

persistent

View Source
const (
	UrlNonPersistentNamespaceFormat                            = UrlPath + "/non-persistent/%s/%s"
	UrlNonPersistentTopicFormat                                = UrlPath + "/non-persistent/%s/%s/%s"
	UrlNonPersistentPartitionedTopicFormat                     = UrlPath + "/non-persistent/%s/%s/%s/partitions"
	UrlNonPersistentPartitionedNamespaceFormat                 = UrlPath + "/non-persistent/%s/%s/partitioned"
	UrlNonPersistentPartitionedRetentionFormat                 = UrlPath + "/non-persistent/%s/%s/%s/retention"
	UrlNonPersistentTopicGetBacklogQuotaMapFormat              = UrlPath + "/non-persistent/%s/%s/%s/backlogQuotaMap"
	UrlNonPersistentTopicOperateBacklogQuotaFormat             = UrlPath + "/non-persistent/%s/%s/%s/backlogQuota"
	UrlNonPersistentTopicEstimatedOfflineBacklogFormat         = UrlPath + "/non-persistent/%s%s%s/backlog"
	UrlNonPersistentTopicCalculateBacklogSizeByMessageIDFormat = UrlPath + "/non-persistent/%s/%s/%s/backlogSize"
	UrlNonPersistentTopicCompactionThresholdFormat             = UrlPath + "/non-persistent/%s/%s/%s/compactionThreshold"
	UrlNonPersistentTopicCompactionFormat                      = UrlPath + "/non-persistent/%s/%s/%s/compaction"
	UrlNonPersistentTopicMessageTTLFormat                      = UrlPath + "/non-persistent/%s/%s/%s/messageTTL"
	UrlNonPersistentTopicsCreateMissedPartitionsFormat         = UrlPath + "/non-persistent/%s/%s/%s/createMissedPartitions"
	UrlNonPersistentGetLastMessageIdFormat                     = UrlPath + "/non-persistent/%s/%s/%s/lastMessageId"
	UrlNonPersistentGetInternalStatsForTopicFormat             = UrlPath + "/non-persistent/%s/%s/%s/internalStats"
	UrlNonPersistentGetInternalStatsForPartitionedTopicFormat  = UrlPath + "/non-persistent/%s/%s/%s/partitioned-internalStats"
	UrlNonPersistentGetStatsForTopicFormat                     = UrlPath + "/non-persistent/%s/%s/%s/stats"
	UrlNonPersistentGetStatsForPartitionedTopicFormat          = UrlPath + "/non-persistent/%s/%s/%s/partitioned-stats"
)

non-persistent

Variables

This section is empty.

Functions

func EasyReader

func EasyReader(resp *http.Response, ptr interface{}) error

func HttpCheck

func HttpCheck(response *http.Response) error

func HttpCheckReadBytes

func HttpCheckReadBytes(response *http.Response) ([]byte, error)

func RandBytes

func RandBytes(n int64) []byte

func RandStr

func RandStr(n int64) string

func ReadAll

func ReadAll(r io.Reader) (string, error)

func StatusNok

func StatusNok(code int) bool

func StatusOk

func StatusOk(code int) bool

Types

type AllBookiesResp

type AllBookiesResp struct {
	Bookies []RawBookieInfo `json:"bookies"`
}

type BacklogQuota

type BacklogQuota struct {
	Limit     int64
	LimitSize int64
	LimitTime int64
	Policy    RetentionPolicy
}

type BacklogQuotaResp

type BacklogQuotaResp struct {
	DestinationStorage BacklogQuota `json:"destination_storage,omitempty"`
	MessageAge         BacklogQuota `json:"message_age,omitempty"`
}

type BacklogQuotaType

type BacklogQuotaType string
const (
	DestinationStorage BacklogQuotaType = "destination_storage"
	MessageAge         BacklogQuotaType = "message_age"
)

type BookieInfo

type BookieInfo struct {
	Rack     string `json:"rack"`
	Hostname string `json:"hostname"`
}

type Bookies

type Bookies interface {
	// AllBookies Gets raw information for all the bookies in the cluster
	AllBookies() (*AllBookiesResp, error)
	// ListRacksInfo Gets the rack placement information for all the bookies in the cluster
	ListRacksInfo() (*ListRacksInfoResp, error)
	// RemoveRacksInfo Removed the rack placement information for a specific bookie in the cluster
	RemoveRacksInfo(string) error
	// GetRacksInfo Gets the rack placement information for a specific bookie in the cluster
	GetRacksInfo(string) (*BookieInfo, error)
	// UpdateRacksInfo Updates the rack placement information for a specific bookie in the cluster
	// (note. bookie address format:`address:port`)
	UpdateRacksInfo(string, *BookieInfo) error
}

type BookiesImpl

type BookiesImpl struct {
	// contains filtered or unexported fields
}

func (*BookiesImpl) AllBookies

func (b *BookiesImpl) AllBookies() (*AllBookiesResp, error)

func (*BookiesImpl) GetRacksInfo

func (b *BookiesImpl) GetRacksInfo(bookie string) (*BookieInfo, error)

func (*BookiesImpl) ListRacksInfo

func (b *BookiesImpl) ListRacksInfo() (*ListRacksInfoResp, error)

func (*BookiesImpl) RemoveRacksInfo

func (b *BookiesImpl) RemoveRacksInfo(bookie string) error

RemoveRacksInfo bookie is address:port

func (*BookiesImpl) UpdateRacksInfo

func (b *BookiesImpl) UpdateRacksInfo(bookie string, info *BookieInfo) error

type Clusters

type Clusters interface {
	List() ([]string, error)
}

type ClustersImpl

type ClustersImpl struct {
	// contains filtered or unexported fields
}

func (*ClustersImpl) List

func (c *ClustersImpl) List() ([]string, error)

type Compaction

type Compaction struct {
	LastCompactionRemovedEventCount   uint64 `json:"lastCompactionRemovedEventCount,omitempty"`
	LastCompactionSucceedTimestamp    uint64 `json:"lastCompactionSucceedTimestamp,omitempty"`
	LastCompactionFailedTimestamp     uint64 `json:"lastCompactionFailedTimestamp,omitempty"`
	LastCompactionDurationTimeInMills uint64 `json:"lastCompactionDurationTimeInMills,omitempty"`
}

type CompactionStatus

type CompactionStatus string
const (
	NotRun  CompactionStatus = "NOT_RUN"
	Running CompactionStatus = "RUNNING"
	Success CompactionStatus = "SUCCESS"
	ERROR   CompactionStatus = "ERROR"
)

type Config

type Config struct {

	// Host pulsar service address, default localhost
	Host string
	// Port pulsar service port, default 8080
	Port int
	// TlsEnable enable tls, default false
	TlsEnable bool
	// TlsConfig tls config
	TlsConfig *tls.Config
	// ConnectionTimeout connect timeout, default 0, zero means no timeout
	ConnectionTimeout int64
	// contains filtered or unexported fields
}

type CursorDetail

type CursorDetail struct {
	CursorBacklog  int64
	CursorLedgerId int64
}

type CursorStats

type CursorStats struct {
	MarkDeletePosition                       string           `json:"markDeletePosition"`
	ReadPosition                             string           `json:"readPosition"`
	WaitingReadOp                            bool             `json:"waitingReadOp"`
	PendingReadOps                           int              `json:"pendingReadOps"`
	MessagesConsumedCounter                  int64            `json:"messagesConsumedCounter"`
	CursorLedger                             int64            `json:"cursorLedger"`
	CursorLedgerLastEntry                    int64            `json:"cursorLedgerLastEntry"`
	IndividuallyDeletedMessages              string           `json:"individuallyDeletedMessages"`
	LastLedgerSwitchTimestamp                string           `json:"lastLedgerSwitchTimestamp"`
	State                                    string           `json:"state"`
	Active                                   bool             `json:"active"`
	NumberOfEntriesSinceFirstNotAckedMessage int64            `json:"numberOfEntriesSinceFirstNotAckedMessage"`
	TotalNonContiguousDeletedMessagesRange   int              `json:"totalNonContiguousDeletedMessagesRange"`
	SubscriptionHavePendingRead              bool             `json:"subscriptionHavePendingRead"`
	SubscriptionHavePendingReplayRead        bool             `json:"subscriptionHavePendingReplayRead"`
	Properties                               map[string]int64 `json:"properties"`
}

type HttpClient

type HttpClient interface {
	Get(path string) (*http.Response, error)
	Put(path string, body any) (*http.Response, error)
	Post(path string, body any) (*http.Response, error)
	Delete(path string) (*http.Response, error)
	Do(*http.Request) (*http.Response, error)
}

type HttpClientImpl

type HttpClientImpl struct {
	// contains filtered or unexported fields
}

func (*HttpClientImpl) Delete

func (h *HttpClientImpl) Delete(path string) (*http.Response, error)

func (*HttpClientImpl) Do

func (h *HttpClientImpl) Do(req *http.Request) (*http.Response, error)

func (*HttpClientImpl) Get

func (h *HttpClientImpl) Get(path string) (*http.Response, error)

func (*HttpClientImpl) Post

func (h *HttpClientImpl) Post(path string, body any) (*http.Response, error)

func (*HttpClientImpl) Put

func (h *HttpClientImpl) Put(path string, body any) (*http.Response, error)

type LedgerDetail

type LedgerDetail struct {
	Entries   int64
	Timestamp int64
	Size      int64
	LedgerId  int64
}

type LedgerInfo

type LedgerInfo struct {
	LedgerId        int64  `json:"ledgerId"`
	Entries         int64  `json:"entries"`
	Size            int64  `json:"size"`
	Offloaded       bool   `json:"offloaded"`
	Metadata        string `json:"metadata"`
	UnderReplicated bool   `json:"underReplicated"`
}

type ListRacksInfoResp

type ListRacksInfoResp map[string]map[string]BookieInfo

type LongRunningProcessStatus

type LongRunningProcessStatus struct {
	Status    CompactionStatus `json:"status"`
	LastError string           `json:"lastError"`
}

type Lookup

type Lookup interface {
	// GetOwner get the owner broker of the given topic.
	// topicDomain: ref TopicDomain type, value persistent or non-persistent
	GetOwner(topicDomain TopicDomain, tenant, namespace, topic string) (*LookupData, error)
	// GetNamespaceBundle get the namespace bundle which the given topic belongs to.
	GetNamespaceBundle(topicDomain TopicDomain, tenant, namespace, topic string) (string, error)
}

type LookupData

type LookupData struct {
	BrokerUrl    string `json:"brokerUrl"`
	BrokerUrlTls string `json:"brokerUrlTls"`
	HttpUrl      string `json:"httpUrl"`    // web service http address
	HttpUrlTls   string `json:"httpUrlTls"` // web service https address
	NativeUrl    string `json:"nativeUrl"`
}

type LookupImpl

type LookupImpl struct {
	// contains filtered or unexported fields
}

func (*LookupImpl) GetNamespaceBundle

func (l *LookupImpl) GetNamespaceBundle(topicDomain TopicDomain, tenant, namespace, topic string) (string, error)

func (*LookupImpl) GetOwner

func (l *LookupImpl) GetOwner(topicDomain TopicDomain, tenant, namespace, topic string) (*LookupData, error)

type MessageId

type MessageId struct {
	LedgerId       int64 `json:"ledgerId"`
	EntryId        int64 `json:"entryId"`
	PartitionIndex int32 `json:"partitionIndex"`
	BatchIndex     int32 `json:"batchIndex"`
}

type NamespaceBacklog

type NamespaceBacklog interface {
	// GetNamespaceBacklogQuota Get backlog quota map on a namespace.
	GetNamespaceBacklogQuota(tenant, namespace string) (*BacklogQuotaResp, error)
	// SetNamespaceBacklogQuota  Set a backlog quota for all the topics on a namespace.
	SetNamespaceBacklogQuota(tenant, namespace string, cfg *BacklogQuota) error
	// RemoveNamespaceBacklogQuota Remove a backlog quota policy from a namespace.
	RemoveNamespaceBacklogQuota(tenant, namespace string, opts ...Option) error
	//ClearNamespaceAllTopicsBacklog Clear backlog for all topics on a namespace.
	ClearNamespaceAllTopicsBacklog(tenant, namespace string) error
	// ClearNamespaceSubscriptionBacklog Clear backlog for a given subscription on all topics on a namespace.
	ClearNamespaceSubscriptionBacklog(tenant, namespace, subscription string) error
	// ClearNamespaceAllTopicsBacklogForBundle Clear backlog for all topics on a namespace bundle.
	ClearNamespaceAllTopicsBacklogForBundle(tenant, namespace, bundle string) error
	// ClearNamespaceSubscriptionBacklogForBundle Clear backlog for a given subscription on all topics on a namespace bundle.
	ClearNamespaceSubscriptionBacklogForBundle(tenant, namespace, subscription, bundle string) error
}

type NamespaceCompaction

type NamespaceCompaction interface {
	// GetMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered.
	GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error)
	// SetMaximumUnCompactedBytes Set maximum number of uncompacted bytes in a topic before compaction is triggered.
	SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error
	// RemoveMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered.
	RemoveMaximumUnCompactedBytes(tenant, namespace string) error
}

type NamespaceMessageTTL

type NamespaceMessageTTL interface {
	// GetNamespaceMessageTTL Get the message TTL for the namespace
	GetNamespaceMessageTTL(tenant, namespace string) (int64, error)
	// SetNamespaceMessageTTL Set the message TTL for the namespace
	SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error
	// RemoveNamespaceMessageTTL Remove the message TTL for the namespace
	RemoveNamespaceMessageTTL(tenant, namespace string) error
}

NamespaceMessageTTL Discard data after some time (by automatically acknowledging)

type NamespaceRetention

type NamespaceRetention interface {
	// GetNamespaceRetention get retention configuration for namespace.
	GetNamespaceRetention(tenant, namespace string) (*RetentionConfiguration, error)
	// SetNamespaceRetention set retention configuration for namespace.
	SetNamespaceRetention(tenant, namespace string, cfg *RetentionConfiguration) error
	// RemoveNamespaceRetention remove retention configuration for namespace.
	RemoveNamespaceRetention(tenant, namespace string) error
}

NamespaceRetention operate interface about retention configuration for namespace.

type Namespaces

type Namespaces interface {
	Create(tenant, namespace string) error
	Delete(tenant, namespace string) error
	List(tenant string) ([]string, error)
	NamespaceRetention
	NamespaceBacklog
	NamespaceMessageTTL
	NamespaceCompaction
}

type NamespacesImpl

type NamespacesImpl struct {
	// contains filtered or unexported fields
}

func (*NamespacesImpl) ClearNamespaceAllTopicsBacklog

func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklog(tenant, namespace string) error

ClearNamespaceAllTopicsBacklog Clear backlog for all topics on a namespace. required params: tenant, namespace

func (*NamespacesImpl) ClearNamespaceAllTopicsBacklogForBundle

func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklogForBundle(tenant, namespace, bundle string) error

ClearNamespaceAllTopicsBacklogForBundle Clear backlog for all topics on a namespace bundle. required params: tenant, namespace, bundleName

func (*NamespacesImpl) ClearNamespaceSubscriptionBacklog

func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklog(tenant, namespace, subscription string) error

ClearNamespaceSubscriptionBacklog Clear backlog for a given subscription on all topics on a namespace. required params: tenant, namespace, subscriptionName

func (*NamespacesImpl) ClearNamespaceSubscriptionBacklogForBundle

func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklogForBundle(tenant, namespace, subscription, bundle string) error

ClearNamespaceSubscriptionBacklogForBundle Clear backlog for a given subscription on all topics on a namespace bundle. required params: tenant, namespace, subscriptionName, bundleName

func (*NamespacesImpl) Create

func (n *NamespacesImpl) Create(tenant, namespace string) error

func (*NamespacesImpl) Delete

func (n *NamespacesImpl) Delete(tenant, namespace string) error

func (*NamespacesImpl) GetMaximumUnCompactedBytes

func (n *NamespacesImpl) GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error)

func (*NamespacesImpl) GetNamespaceBacklogQuota

func (n *NamespacesImpl) GetNamespaceBacklogQuota(tenant, namespace string) (*BacklogQuotaResp, error)

GetNamespaceBacklogQuota Get backlog quota map on a namespace. required params: tenant, namespace

func (*NamespacesImpl) GetNamespaceMessageTTL

func (n *NamespacesImpl) GetNamespaceMessageTTL(tenant, namespace string) (int64, error)

func (*NamespacesImpl) GetNamespaceRetention

func (n *NamespacesImpl) GetNamespaceRetention(tenant, namespace string) (*RetentionConfiguration, error)

func (*NamespacesImpl) List

func (n *NamespacesImpl) List(tenant string) ([]string, error)

func (*NamespacesImpl) RemoveMaximumUnCompactedBytes

func (n *NamespacesImpl) RemoveMaximumUnCompactedBytes(tenant, namespace string) error

func (*NamespacesImpl) RemoveNamespaceBacklogQuota

func (n *NamespacesImpl) RemoveNamespaceBacklogQuota(tenant, namespace string, opts ...Option) error

RemoveNamespaceBacklogQuota Remove a backlog quota policy from a namespace. required params: tenant, namespace

func (*NamespacesImpl) RemoveNamespaceMessageTTL

func (n *NamespacesImpl) RemoveNamespaceMessageTTL(tenant, namespace string) error

func (*NamespacesImpl) RemoveNamespaceRetention

func (n *NamespacesImpl) RemoveNamespaceRetention(tenant, namespace string) error

func (*NamespacesImpl) SetMaximumUnCompactedBytes

func (n *NamespacesImpl) SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error

func (*NamespacesImpl) SetNamespaceBacklogQuota

func (n *NamespacesImpl) SetNamespaceBacklogQuota(tenant, namespace string, cfg *BacklogQuota) error

SetNamespaceBacklogQuota Set a backlog quota for all the topics on a namespace. required params: tenant, namespace

func (*NamespacesImpl) SetNamespaceMessageTTL

func (n *NamespacesImpl) SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error

func (*NamespacesImpl) SetNamespaceRetention

func (n *NamespacesImpl) SetNamespaceRetention(tenant, namespace string, cfg *RetentionConfiguration) error

func (*NamespacesImpl) WithOptions

func (n *NamespacesImpl) WithOptions(opts ...Option) Namespaces

type NonPersistentTopics

type NonPersistentTopics interface {
	WithOptions(opts ...Option) NonPersistentTopics
	Topics
	TopicBacklog
	TopicMessageTTL
	TopicCompaction
	TopicStats
}

type NonPersistentTopicsImpl

type NonPersistentTopicsImpl struct {
	// contains filtered or unexported fields
}

func (*NonPersistentTopicsImpl) CalculateBacklogSizeByMessageID

func (n *NonPersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error

CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes).

func (*NonPersistentTopicsImpl) CreateMissedPartitions

func (n *NonPersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error

func (*NonPersistentTopicsImpl) CreateNonPartitioned

func (n *NonPersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) CreatePartitioned

func (n *NonPersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error

func (*NonPersistentTopicsImpl) DeleteNonPartitioned

func (n *NonPersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) DeletePartitioned

func (n *NonPersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) EstimatedOfflineTopicBacklog

func (n *NonPersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)

EstimatedOfflineTopicBacklog Get estimated backlog for offline topic.

func (*NonPersistentTopicsImpl) GetLastMessageId

func (n *NonPersistentTopicsImpl) GetLastMessageId(tenant string, namespace string, topic string) (*MessageId, error)

func (*NonPersistentTopicsImpl) GetPartitionedMetadata

func (n *NonPersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)

func (*NonPersistentTopicsImpl) GetPartitionedStats

func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) (*TopicStatistics, error)

func (*NonPersistentTopicsImpl) GetPartitionedStatsInternal

func (n *NonPersistentTopicsImpl) GetPartitionedStatsInternal(tenant string, namespace string, topic string) (*PartitionedTopicInternalStats, error)

func (*NonPersistentTopicsImpl) GetStats

func (n *NonPersistentTopicsImpl) GetStats(tenant string, namespace string, topic string) (*TopicStatistics, error)

func (*NonPersistentTopicsImpl) GetStatsInternal

func (n *NonPersistentTopicsImpl) GetStatsInternal(tenant string, namespace string, topic string) (*TopicInternalStats, error)

func (*NonPersistentTopicsImpl) GetTopicBacklogQuota

func (n *NonPersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)

GetTopicBacklogQuota Get backlog quota map on a topic.

func (*NonPersistentTopicsImpl) GetTopicCompactionStatus

func (n *NonPersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)

func (*NonPersistentTopicsImpl) GetTopicCompactionThreshold

func (n *NonPersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)

func (*NonPersistentTopicsImpl) GetTopicMessageTTL

func (n *NonPersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)

func (*NonPersistentTopicsImpl) GetTopicRetention

func (n *NonPersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)

func (*NonPersistentTopicsImpl) ListNamespaceTopics

func (n *NonPersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)

func (*NonPersistentTopicsImpl) ListNonPartitioned

func (n *NonPersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)

func (*NonPersistentTopicsImpl) ListPartitioned

func (n *NonPersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)

func (*NonPersistentTopicsImpl) RemoveTopicBacklogQuota

func (n *NonPersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error

RemoveTopicBacklogQuota Remove a backlog quota policy from a topic.

func (*NonPersistentTopicsImpl) RemoveTopicCompactionThreshold

func (n *NonPersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) RemoveTopicMessageTTL

func (n *NonPersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) RemoveTopicRetention

func (n *NonPersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) SetTopicBacklogQuota

func (n *NonPersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error

SetTopicBacklogQuota Set a backlog quota for a topic.

func (*NonPersistentTopicsImpl) SetTopicCompactionThreshold

func (n *NonPersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error

func (*NonPersistentTopicsImpl) SetTopicMessageTTL

func (n *NonPersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error

func (*NonPersistentTopicsImpl) SetTopicRetention

func (n *NonPersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error

func (*NonPersistentTopicsImpl) TriggerTopicCompaction

func (n *NonPersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error

func (*NonPersistentTopicsImpl) WithOptions

func (n *NonPersistentTopicsImpl) WithOptions(opts ...Option) NonPersistentTopics

type OfflineTopicStats

type OfflineTopicStats struct {
	StorageSize       int64
	TotalMessages     int64
	MessageBacklog    int64
	BrokerName        string
	TopicName         string
	DataLedgerDetails []LedgerDetail
	CursorDetails     map[string]CursorDetail
	StatGeneratedAt   time.Time
}

type Option

type Option func(*options)

func WithBacklogQuotaType

func WithBacklogQuotaType(backlogQuotaType BacklogQuotaType) Option

type PartitionedMetadata

type PartitionedMetadata struct {
	Deleted    bool  `json:"deleted"`
	Partitions int64 `json:"partitions"`
}

PartitionedMetadata partitioned topic metadata

type PartitionedTopicInternalStats

type PartitionedTopicInternalStats struct {
	Metadata   PartitionedMetadata            `json:"metadata"`
	Partitions map[string]*TopicInternalStats `json:"partitions"`
}

type PartitionedTopicMetadata

type PartitionedTopicMetadata struct {
	Partitions int32             `json:"partitions"`
	Properties map[string]string `json:"properties"`
}

type PersistentTopics

type PersistentTopics interface {
	WithOptions(opts ...Option) PersistentTopics
	Topics
	TopicBacklog
	TopicMessageTTL
	TopicCompaction
	TopicStats
}

type PersistentTopicsImpl

type PersistentTopicsImpl struct {
	// contains filtered or unexported fields
}

func (*PersistentTopicsImpl) CalculateBacklogSizeByMessageID

func (p *PersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error

CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes).

func (*PersistentTopicsImpl) CreateMissedPartitions

func (p *PersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error

func (*PersistentTopicsImpl) CreateNonPartitioned

func (p *PersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) CreatePartitioned

func (p *PersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error

func (*PersistentTopicsImpl) DeleteNonPartitioned

func (p *PersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) DeletePartitioned

func (p *PersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) EstimatedOfflineTopicBacklog

func (p *PersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)

EstimatedOfflineTopicBacklog Get estimated backlog for offline topic.

func (*PersistentTopicsImpl) GetLastMessageId

func (p *PersistentTopicsImpl) GetLastMessageId(tenant string, namespace string, topic string) (*MessageId, error)

func (*PersistentTopicsImpl) GetPartitionedMetadata

func (p *PersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)

func (*PersistentTopicsImpl) GetPartitionedStats

func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) (*TopicStatistics, error)

func (*PersistentTopicsImpl) GetPartitionedStatsInternal

func (p *PersistentTopicsImpl) GetPartitionedStatsInternal(tenant, namespace, topic string) (*PartitionedTopicInternalStats, error)

func (*PersistentTopicsImpl) GetStats

func (p *PersistentTopicsImpl) GetStats(tenant, namespace, topic string) (*TopicStatistics, error)

func (*PersistentTopicsImpl) GetStatsInternal

func (p *PersistentTopicsImpl) GetStatsInternal(tenant, namespace, topic string) (*TopicInternalStats, error)

func (*PersistentTopicsImpl) GetTopicBacklogQuota

func (p *PersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)

GetTopicBacklogQuota Get backlog quota map on a topic.

func (*PersistentTopicsImpl) GetTopicCompactionStatus

func (p *PersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)

func (*PersistentTopicsImpl) GetTopicCompactionThreshold

func (p *PersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)

func (*PersistentTopicsImpl) GetTopicMessageTTL

func (p *PersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)

func (*PersistentTopicsImpl) GetTopicRetention

func (p *PersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)

func (*PersistentTopicsImpl) ListNamespaceTopics

func (p *PersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)

ListNamespaceTopics Get the list of topics under a namespace.

func (*PersistentTopicsImpl) ListNonPartitioned

func (p *PersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)

func (*PersistentTopicsImpl) ListPartitioned

func (p *PersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)

ListPartitioned Get the list of partitioned topics under a namespace.

func (*PersistentTopicsImpl) RemoveTopicBacklogQuota

func (p *PersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error

RemoveTopicBacklogQuota Remove a backlog quota policy from a topic.

func (*PersistentTopicsImpl) RemoveTopicCompactionThreshold

func (p *PersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) RemoveTopicMessageTTL

func (p *PersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) RemoveTopicRetention

func (p *PersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) SetTopicBacklogQuota

func (p *PersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error

SetTopicBacklogQuota Set a backlog quota for a topic.

func (*PersistentTopicsImpl) SetTopicCompactionThreshold

func (p *PersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error

func (*PersistentTopicsImpl) SetTopicMessageTTL

func (p *PersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error

func (*PersistentTopicsImpl) SetTopicRetention

func (p *PersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error

func (*PersistentTopicsImpl) TriggerTopicCompaction

func (p *PersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error

func (*PersistentTopicsImpl) WithOptions

func (p *PersistentTopicsImpl) WithOptions(opts ...Option) PersistentTopics

type PulsarAdmin

type PulsarAdmin struct {
	Clusters            Clusters
	Tenants             Tenants
	Namespaces          Namespaces
	PersistentTopics    PersistentTopics
	NonPersistentTopics NonPersistentTopics
	Bookies             Bookies
	Lookup              Lookup
}

func NewDefaultPulsarAdmin

func NewDefaultPulsarAdmin() (*PulsarAdmin, error)

func NewPulsarAdmin

func NewPulsarAdmin(config Config) (*PulsarAdmin, error)

func NewTestPulsarAdmin

func NewTestPulsarAdmin(t *testing.T, port int) *PulsarAdmin

type RawBookieInfo

type RawBookieInfo struct {
	BookieId string `json:"bookieId"`
}

type RetentionConfiguration

type RetentionConfiguration struct {
	RetentionSizeInMB      int64 `json:"retentionSizeInMB"`
	RetentionTimeInMinutes int64 `json:"retentionTimeInMinutes"`
}

RetentionConfiguration retention configuration

type RetentionPolicy

type RetentionPolicy string
const (
	ProducerRequestHold     RetentionPolicy = "producer_request_hold"
	ProducerException       RetentionPolicy = "producer_exception"
	ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction"
)

type SubscriptionStats

type SubscriptionStats struct {
	LastConsumedFlowTimestamp uint64 `json:"lastConsumedFlowTimestamp,omitempty"`
	MsgBacklog                int    `json:"msgBacklog"`
}

type TenantInfo

type TenantInfo struct {
	AdminRoles      []string `json:"adminRoles,omitempty"`
	AllowedClusters []string `json:"allowedClusters,omitempty"`
}

type Tenants

type Tenants interface {
	Create(tenantName string, info TenantInfo) error
	Delete(tenantName string) error
	List() ([]string, error)
}

type TenantsImpl

type TenantsImpl struct {
	// contains filtered or unexported fields
}

func (*TenantsImpl) Create

func (t *TenantsImpl) Create(tenantName string, info TenantInfo) error

func (*TenantsImpl) Delete

func (t *TenantsImpl) Delete(tenantName string) error

func (*TenantsImpl) List

func (t *TenantsImpl) List() ([]string, error)

type TestBroker

type TestBroker struct {
	// contains filtered or unexported fields
}

func (*TestBroker) Close

func (tb *TestBroker) Close() error

type TopicBacklog

type TopicBacklog interface {
	// GetTopicBacklogQuota Get backlog quota map on a topic.
	GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)
	// SetTopicBacklogQuota Set a backlog quota for a topic.
	SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error
	// RemoveTopicBacklogQuota Remove a backlog quota policy from a topic.
	RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error
	// EstimatedOfflineTopicBacklog Get estimated backlog for offline topic.
	EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)
	// CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes).
	CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error
}

type TopicCompaction

type TopicCompaction interface {
	// GetTopicCompactionThreshold Get compaction threshold configuration for specified topic.
	GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
	// SetTopicCompactionThreshold Set compaction threshold configuration for specified topic.
	SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
	// RemoveTopicCompactionThreshold Remove compaction threshold configuration for specified topic.
	RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
	// GetTopicCompactionStatus Get the status of a compaction operation for a topic.
	GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
	// TriggerTopicCompaction Trigger a compaction operation on a topic.
	TriggerTopicCompaction(tenant, namespace, topic string) error
}

type TopicDomain

type TopicDomain string
const (
	TopicDomainPersistent    TopicDomain = "persistent"
	TopicDomainNonPersistent TopicDomain = "non-persistent"
)

type TopicInternalStats

type TopicInternalStats struct {
	EntriesAddedCounter                int64                  `json:"entriesAddedCounter"`
	NumberOfEntries                    int64                  `json:"numberOfEntries"`
	TotalSize                          int64                  `json:"totalSize"`
	CurrentLedgerEntries               int64                  `json:"currentLedgerEntries"`
	CurrentLedgerSize                  int64                  `json:"currentLedgerSize"`
	LastLedgerCreatedTimestamp         string                 `json:"lastLedgerCreatedTimestamp"`
	LastLedgerCreationFailureTimestamp string                 `json:"lastLedgerCreationFailureTimestamp"`
	WaitingCursorsCount                int32                  `json:"waitingCursorsCount"`
	PendingAddEntriesCount             int32                  `json:"pendingAddEntriesCount"`
	LastConfirmedEntry                 string                 `json:"lastConfirmedEntry"`
	State                              string                 `json:"state"`
	Ledgers                            []*LedgerInfo          `json:"ledgers"`
	Cursors                            map[string]CursorStats `json:"cursors"`
	SchemaLedgers                      []*LedgerInfo          `json:"schemaLedgers"`
	CompactedLedger                    *LedgerInfo            `json:"compactedLedger"`
}

type TopicMessageTTL

type TopicMessageTTL interface {
	// GetTopicMessageTTL Get the message TTL for the topic
	GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
	// SetTopicMessageTTL Set the message TTL for the topic
	SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
	// RemoveTopicMessageTTL Remove the message TTL for the topic
	RemoveTopicMessageTTL(tenant, namespace, topic string) error
}

TopicMessageTTL Discard data after some time (by automatically acknowledging)

type TopicRetention

type TopicRetention interface {
	// GetTopicRetention get retention configuration for namespace.
	GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)
	// SetTopicRetention set retention configuration for namespace.
	SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error
	// RemoveTopicRetention remove retention configuration for namespace.
	RemoveTopicRetention(tenant, namespace, topic string) error
}

TopicRetention operate interface about retention configuration for specified topic.

type TopicStatistics

type TopicStatistics struct {
	MsgRateIn                                        float64                      `json:"msgRateIn,omitempty"`
	MsgThroughputIn                                  float64                      `json:"msgThroughputIn,omitempty"`
	MsgRateOut                                       float64                      `json:"msgRateOut,omitempty"`
	MsgThroughputOut                                 float64                      `json:"msgThroughputOut,omitempty"`
	BytesInCounter                                   uint64                       `json:"bytesInCounter,omitempty"`
	MsgInCounter                                     uint64                       `json:"msgInCounter,omitempty"`
	BytesOutCounter                                  uint64                       `json:"bytesOutCounter,omitempty"`
	MsgOutCounter                                    uint64                       `json:"msgOutCounter,omitempty"`
	AverageMsgSize                                   float64                      `json:"averageMsgSize,omitempty"`
	MsgChunkPublished                                bool                         `json:"msgChunkPublished,omitempty"`
	StorageSize                                      uint64                       `json:"storageSize,omitempty"`
	BacklogSize                                      uint64                       `json:"backlogSize,omitempty"`
	PublishRateLimitedTimes                          uint64                       `json:"publishRateLimitedTimes,omitempty"`
	EarliestMsgPublishTimeInBacklogs                 uint64                       `json:"earliestMsgPublishTimeInBacklogs,omitempty"`
	OffloadedStorageSize                             uint64                       `json:"offloadedStorageSize,omitempty"`
	LastOffloadLedgerId                              uint64                       `json:"lastOffloadLedgerId,omitempty"`
	LastOffloadSuccessTimeStamp                      uint64                       `json:"lastOffloadSuccessTimeStamp,omitempty"`
	LastOffloadFailureTimeStamp                      uint64                       `json:"lastOffloadFailureTimeStamp,omitempty"`
	OngoingTxnCount                                  uint64                       `json:"ongoingTxnCount,omitempty"`
	AbortedTxnCount                                  uint64                       `json:"abortedTxnCount,omitempty"`
	CommittedTxnCount                                uint64                       `json:"committedTxnCount,omitempty"`
	Publishers                                       []string                     `json:"publishers,omitempty"`
	WaitingPublishers                                uint64                       `json:"waitingPublishers,omitempty"`
	Subscriptions                                    map[string]SubscriptionStats `json:"subscriptions,omitempty"`
	Replication                                      map[string]string            `json:"replication,omitempty"`
	DeduplicationStatus                              string                       `json:"deduplicationStatus,omitempty"`
	NonContiguousDeletedMessagesRanges               uint64                       `json:"nonContiguousDeletedMessagesRanges,omitempty"`
	NonContiguousDeletedMessagesRangesSerializedSize uint64                       `json:"nonContiguousDeletedMessagesRangesSerializedSize,omitempty"`
	DelayedMessageIndexSizeInBytes                   uint64                       `json:"delayedMessageIndexSizeInBytes,omitempty"`
	Compaction                                       Compaction                   `json:"compaction"`
	OwnerBroker                                      string                       `json:"ownerBroker,omitempty"`
}

type TopicStats

type TopicStats interface {
	GetStats(string, string, string) (*TopicStatistics, error)
	GetPartitionedStats(string, string, string) (*TopicStatistics, error)
	GetStatsInternal(string, string, string) (*TopicInternalStats, error)
	GetPartitionedStatsInternal(string, string, string) (*PartitionedTopicInternalStats, error)
}

type Topics

type Topics interface {
	CreateNonPartitioned(tenant, namespace, topic string) error
	CreatePartitioned(tenant, namespace, topic string, numPartitions int) error
	DeleteNonPartitioned(tenant, namespace, topic string) error
	DeletePartitioned(tenant, namespace, topic string) error
	ListNonPartitioned(tenant, namespace string) ([]string, error)
	ListPartitioned(tenant, namespace string) ([]string, error)
	ListNamespaceTopics(tenant, namespace string) ([]string, error)
	GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)
	CreateMissedPartitions(tenant, namespace, topic string) error
	GetLastMessageId(tenant, namespace, topic string) (*MessageId, error)
	TopicRetention
}

Topics persistent topic and non-persistent topic interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL