Documentation ¶
Index ¶
- Constants
- func EasyReader(resp *http.Response, ptr interface{}) error
- func HttpCheck(response *http.Response) error
- func HttpCheckReadBytes(response *http.Response) ([]byte, error)
- func RandBytes(n int64) []byte
- func RandStr(n int64) string
- func ReadAll(r io.Reader) (string, error)
- func StatusNok(code int) bool
- func StatusOk(code int) bool
- type AllBookiesResp
- type BacklogQuota
- type BacklogQuotaResp
- type BacklogQuotaType
- type BookieInfo
- type Bookies
- type BookiesImpl
- func (b *BookiesImpl) AllBookies() (*AllBookiesResp, error)
- func (b *BookiesImpl) GetRacksInfo(bookie string) (*BookieInfo, error)
- func (b *BookiesImpl) ListRacksInfo() (*ListRacksInfoResp, error)
- func (b *BookiesImpl) RemoveRacksInfo(bookie string) error
- func (b *BookiesImpl) UpdateRacksInfo(bookie string, info *BookieInfo) error
- type Clusters
- type ClustersImpl
- type Compaction
- type CompactionStatus
- type Config
- type CursorDetail
- type CursorStats
- type HttpClient
- type HttpClientImpl
- func (h *HttpClientImpl) Delete(path string) (*http.Response, error)
- func (h *HttpClientImpl) Do(req *http.Request) (*http.Response, error)
- func (h *HttpClientImpl) Get(path string) (*http.Response, error)
- func (h *HttpClientImpl) Post(path string, body any) (*http.Response, error)
- func (h *HttpClientImpl) Put(path string, body any) (*http.Response, error)
- type LedgerDetail
- type LedgerInfo
- type ListRacksInfoResp
- type LongRunningProcessStatus
- type Lookup
- type LookupData
- type LookupImpl
- type MessageId
- type NamespaceBacklog
- type NamespaceCompaction
- type NamespaceMessageTTL
- type NamespaceRetention
- type Namespaces
- type NamespacesImpl
- func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklog(tenant, namespace string) error
- func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklogForBundle(tenant, namespace, bundle string) error
- func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklog(tenant, namespace, subscription string) error
- func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklogForBundle(tenant, namespace, subscription, bundle string) error
- func (n *NamespacesImpl) Create(tenant, namespace string) error
- func (n *NamespacesImpl) Delete(tenant, namespace string) error
- func (n *NamespacesImpl) GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error)
- func (n *NamespacesImpl) GetNamespaceBacklogQuota(tenant, namespace string) (*BacklogQuotaResp, error)
- func (n *NamespacesImpl) GetNamespaceMessageTTL(tenant, namespace string) (int64, error)
- func (n *NamespacesImpl) GetNamespaceRetention(tenant, namespace string) (*RetentionConfiguration, error)
- func (n *NamespacesImpl) List(tenant string) ([]string, error)
- func (n *NamespacesImpl) RemoveMaximumUnCompactedBytes(tenant, namespace string) error
- func (n *NamespacesImpl) RemoveNamespaceBacklogQuota(tenant, namespace string, opts ...Option) error
- func (n *NamespacesImpl) RemoveNamespaceMessageTTL(tenant, namespace string) error
- func (n *NamespacesImpl) RemoveNamespaceRetention(tenant, namespace string) error
- func (n *NamespacesImpl) SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error
- func (n *NamespacesImpl) SetNamespaceBacklogQuota(tenant, namespace string, cfg *BacklogQuota) error
- func (n *NamespacesImpl) SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error
- func (n *NamespacesImpl) SetNamespaceRetention(tenant, namespace string, cfg *RetentionConfiguration) error
- func (n *NamespacesImpl) WithOptions(opts ...Option) Namespaces
- type NonPersistentTopics
- type NonPersistentTopicsImpl
- func (n *NonPersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error
- func (n *NonPersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error
- func (n *NonPersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)
- func (n *NonPersistentTopicsImpl) GetLastMessageId(tenant string, namespace string, topic string) (*MessageId, error)
- func (n *NonPersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)
- func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) (*TopicStatistics, error)
- func (n *NonPersistentTopicsImpl) GetPartitionedStatsInternal(tenant string, namespace string, topic string) (*PartitionedTopicInternalStats, error)
- func (n *NonPersistentTopicsImpl) GetStats(tenant string, namespace string, topic string) (*TopicStatistics, error)
- func (n *NonPersistentTopicsImpl) GetStatsInternal(tenant string, namespace string, topic string) (*TopicInternalStats, error)
- func (n *NonPersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)
- func (n *NonPersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
- func (n *NonPersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
- func (n *NonPersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
- func (n *NonPersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)
- func (n *NonPersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)
- func (n *NonPersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)
- func (n *NonPersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)
- func (n *NonPersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error
- func (n *NonPersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error
- func (n *NonPersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
- func (n *NonPersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
- func (n *NonPersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error
- func (n *NonPersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error
- func (n *NonPersistentTopicsImpl) WithOptions(opts ...Option) NonPersistentTopics
- type OfflineTopicStats
- type Option
- type PartitionedMetadata
- type PartitionedTopicInternalStats
- type PartitionedTopicMetadata
- type PersistentTopics
- type PersistentTopicsImpl
- func (p *PersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error
- func (p *PersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error
- func (p *PersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)
- func (p *PersistentTopicsImpl) GetLastMessageId(tenant string, namespace string, topic string) (*MessageId, error)
- func (p *PersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)
- func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) (*TopicStatistics, error)
- func (p *PersistentTopicsImpl) GetPartitionedStatsInternal(tenant, namespace, topic string) (*PartitionedTopicInternalStats, error)
- func (p *PersistentTopicsImpl) GetStats(tenant, namespace, topic string) (*TopicStatistics, error)
- func (p *PersistentTopicsImpl) GetStatsInternal(tenant, namespace, topic string) (*TopicInternalStats, error)
- func (p *PersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)
- func (p *PersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
- func (p *PersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
- func (p *PersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
- func (p *PersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)
- func (p *PersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)
- func (p *PersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)
- func (p *PersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)
- func (p *PersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error
- func (p *PersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error
- func (p *PersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
- func (p *PersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
- func (p *PersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error
- func (p *PersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error
- func (p *PersistentTopicsImpl) WithOptions(opts ...Option) PersistentTopics
- type PulsarAdmin
- type RawBookieInfo
- type RetentionConfiguration
- type RetentionPolicy
- type SubscriptionStats
- type TenantInfo
- type Tenants
- type TenantsImpl
- type TestBroker
- type TopicBacklog
- type TopicCompaction
- type TopicDomain
- type TopicInternalStats
- type TopicMessageTTL
- type TopicRetention
- type TopicStatistics
- type TopicStats
- type Topics
Constants ¶
const ( // UrlPath for the Admin API UrlPath = "/admin/v2" UrlClusters = UrlPath + "/clusters" UrlTenants = UrlPath + "/tenants" UrlNamespacesFormat = UrlPath + "/namespaces/%s/%s" )
const ( UrlBookiesAll = UrlPath + "/bookies/all" UrlBookiesRacksInfo = UrlPath + "/bookies/racks-info" UrlBookiesRacksFormat = UrlPath + "/bookies/racks-info/%s" )
bookies
const ( UrlLookupBrokerFormat = "/lookup/v2/topic/%s/%s/%s/%s" UrlLookupGetNamespaceBundleFormat = "/lookup/v2/topic/%s/%s/%s/%s/bundle" )
lookup
const ( UrlNamespaceRetentionFormat = UrlPath + "/namespaces/%s/%s/retention" UrlNamespaceGetBacklogQuotaMapFormat = UrlPath + "/namespaces/%s/%s/backlogQuotaMap" UrlNamespaceOperateBacklogQuotaFormat = UrlPath + "/namespaces/%s/%s/backlogQuota" UrlNamespaceClearAllTopicsBacklogFormat = UrlPath + "/namespaces/%s/%s/clearBacklog" UrlNamespaceClearSubscriptionBacklogFormat = UrlPath + "/namespaces/%s/%s/clearBacklog/%s" UrlNamespaceClearAllTopicsBacklogForBundleFormat = UrlPath + "/namespaces/%s/%s/%s/clearBacklog" UrlNamespaceClearSubscriptionBacklogForBundleFormat = UrlPath + "/namespaces/%s/%s/%s/clearBacklog/%s" UrlNamespaceCompactionThresholdFormat = UrlPath + "/namespaces/%s/%s/compactionThreshold" UrlNamespaceMessageTTLFormat = UrlPath + "/namespaces/%s/%s/messageTTL" )
namespace
const ( UrlPersistentNamespaceFormat = UrlPath + "/persistent/%s/%s" UrlPersistentTopicFormat = UrlPath + "/persistent/%s/%s/%s" UrlPersistentPartitionedNamespaceFormat = UrlPath + "/persistent/%s/%s/partitioned" UrlPersistentPartitionedTopicFormat = UrlPath + "/persistent/%s/%s/%s/partitions" UrlPersistentPartitionedRetentionFormat = UrlPath + "/persistent/%s/%s/%s/retention" UrlPersistentTopicGetBacklogQuotaMapFormat = UrlPath + "/persistent/%s/%s/%s/backlogQuotaMap" UrlPersistentTopicOperateBacklogQuotaFormat = UrlPath + "/persistent/%s/%s/%s/backlogQuota" UrlPersistentTopicEstimatedOfflineBacklogFormat = UrlPath + "/persistent/%s%s%s/backlog" UrlPersistentTopicCalculateBacklogSizeByMessageIDFormat = UrlPath + "/persistent/%s/%s/%s/backlogSize" UrlPersistentTopicCompactionThresholdFormat = UrlPath + "/persistent/%s/%s/%s/compactionThreshold" UrlPersistentTopicCompactionFormat = UrlPath + "/persistent/%s/%s/%s/compaction" UrlPersistentTopicMessageTTLFormat = UrlPath + "/persistent/%s/%s/%s/messageTTL" UrlPersistentTopicCreateMissedPartitionsFormat = UrlPath + "/persistent/%s/%s/%s/createMissedPartitions" UrlPersistentGetLastMessageIdFormat = UrlPath + "/persistent/%s/%s/%s/lastMessageId" UrlPersistentGetInternalStatsForTopicFormat = UrlPath + "/persistent/%s/%s/%s/internalStats" UrlPersistentGetInternalStatsForPartitionedTopicFormat = UrlPath + "/persistent/%s/%s/%s/partitioned-internalStats" UrlPersistentGetStatsForTopicFormat = UrlPath + "/persistent/%s/%s/%s/stats" UrlPersistentGetStatsForPartitionedTopicFormat = UrlPath + "/persistent/%s/%s/%s/partitioned-stats" )
persistent
const ( UrlNonPersistentNamespaceFormat = UrlPath + "/non-persistent/%s/%s" UrlNonPersistentTopicFormat = UrlPath + "/non-persistent/%s/%s/%s" UrlNonPersistentPartitionedTopicFormat = UrlPath + "/non-persistent/%s/%s/%s/partitions" UrlNonPersistentPartitionedNamespaceFormat = UrlPath + "/non-persistent/%s/%s/partitioned" UrlNonPersistentPartitionedRetentionFormat = UrlPath + "/non-persistent/%s/%s/%s/retention" UrlNonPersistentTopicGetBacklogQuotaMapFormat = UrlPath + "/non-persistent/%s/%s/%s/backlogQuotaMap" UrlNonPersistentTopicOperateBacklogQuotaFormat = UrlPath + "/non-persistent/%s/%s/%s/backlogQuota" UrlNonPersistentTopicEstimatedOfflineBacklogFormat = UrlPath + "/non-persistent/%s%s%s/backlog" UrlNonPersistentTopicCalculateBacklogSizeByMessageIDFormat = UrlPath + "/non-persistent/%s/%s/%s/backlogSize" UrlNonPersistentTopicCompactionThresholdFormat = UrlPath + "/non-persistent/%s/%s/%s/compactionThreshold" UrlNonPersistentTopicCompactionFormat = UrlPath + "/non-persistent/%s/%s/%s/compaction" UrlNonPersistentTopicMessageTTLFormat = UrlPath + "/non-persistent/%s/%s/%s/messageTTL" UrlNonPersistentTopicsCreateMissedPartitionsFormat = UrlPath + "/non-persistent/%s/%s/%s/createMissedPartitions" UrlNonPersistentGetLastMessageIdFormat = UrlPath + "/non-persistent/%s/%s/%s/lastMessageId" UrlNonPersistentGetInternalStatsForTopicFormat = UrlPath + "/non-persistent/%s/%s/%s/internalStats" UrlNonPersistentGetInternalStatsForPartitionedTopicFormat = UrlPath + "/non-persistent/%s/%s/%s/partitioned-internalStats" UrlNonPersistentGetStatsForTopicFormat = UrlPath + "/non-persistent/%s/%s/%s/stats" UrlNonPersistentGetStatsForPartitionedTopicFormat = UrlPath + "/non-persistent/%s/%s/%s/partitioned-stats" )
non-persistent
Variables ¶
This section is empty.
Functions ¶
func EasyReader ¶
Types ¶
type AllBookiesResp ¶
type AllBookiesResp struct {
Bookies []RawBookieInfo `json:"bookies"`
}
type BacklogQuota ¶
type BacklogQuota struct { Limit int64 LimitSize int64 LimitTime int64 Policy RetentionPolicy }
type BacklogQuotaResp ¶
type BacklogQuotaResp struct { DestinationStorage BacklogQuota `json:"destination_storage,omitempty"` MessageAge BacklogQuota `json:"message_age,omitempty"` }
type BacklogQuotaType ¶
type BacklogQuotaType string
const ( DestinationStorage BacklogQuotaType = "destination_storage" MessageAge BacklogQuotaType = "message_age" )
type BookieInfo ¶
type Bookies ¶
type Bookies interface { // AllBookies Gets raw information for all the bookies in the cluster AllBookies() (*AllBookiesResp, error) // ListRacksInfo Gets the rack placement information for all the bookies in the cluster ListRacksInfo() (*ListRacksInfoResp, error) // RemoveRacksInfo Removed the rack placement information for a specific bookie in the cluster RemoveRacksInfo(string) error // GetRacksInfo Gets the rack placement information for a specific bookie in the cluster GetRacksInfo(string) (*BookieInfo, error) // UpdateRacksInfo Updates the rack placement information for a specific bookie in the cluster // (note. bookie address format:`address:port`) UpdateRacksInfo(string, *BookieInfo) error }
type BookiesImpl ¶
type BookiesImpl struct {
// contains filtered or unexported fields
}
func (*BookiesImpl) AllBookies ¶
func (b *BookiesImpl) AllBookies() (*AllBookiesResp, error)
func (*BookiesImpl) GetRacksInfo ¶
func (b *BookiesImpl) GetRacksInfo(bookie string) (*BookieInfo, error)
func (*BookiesImpl) ListRacksInfo ¶
func (b *BookiesImpl) ListRacksInfo() (*ListRacksInfoResp, error)
func (*BookiesImpl) RemoveRacksInfo ¶
func (b *BookiesImpl) RemoveRacksInfo(bookie string) error
RemoveRacksInfo bookie is address:port
func (*BookiesImpl) UpdateRacksInfo ¶
func (b *BookiesImpl) UpdateRacksInfo(bookie string, info *BookieInfo) error
type ClustersImpl ¶
type ClustersImpl struct {
// contains filtered or unexported fields
}
func (*ClustersImpl) List ¶
func (c *ClustersImpl) List() ([]string, error)
type Compaction ¶
type Compaction struct { LastCompactionRemovedEventCount uint64 `json:"lastCompactionRemovedEventCount,omitempty"` LastCompactionSucceedTimestamp uint64 `json:"lastCompactionSucceedTimestamp,omitempty"` LastCompactionFailedTimestamp uint64 `json:"lastCompactionFailedTimestamp,omitempty"` LastCompactionDurationTimeInMills uint64 `json:"lastCompactionDurationTimeInMills,omitempty"` }
type CompactionStatus ¶
type CompactionStatus string
const ( NotRun CompactionStatus = "NOT_RUN" Running CompactionStatus = "RUNNING" Success CompactionStatus = "SUCCESS" ERROR CompactionStatus = "ERROR" )
type Config ¶
type Config struct { // Host pulsar service address, default localhost Host string // Port pulsar service port, default 8080 Port int // TlsEnable enable tls, default false TlsEnable bool // TlsConfig tls config TlsConfig *tls.Config // ConnectionTimeout connect timeout, default 0, zero means no timeout ConnectionTimeout int64 // contains filtered or unexported fields }
type CursorDetail ¶
type CursorStats ¶
type CursorStats struct { MarkDeletePosition string `json:"markDeletePosition"` ReadPosition string `json:"readPosition"` WaitingReadOp bool `json:"waitingReadOp"` PendingReadOps int `json:"pendingReadOps"` MessagesConsumedCounter int64 `json:"messagesConsumedCounter"` CursorLedger int64 `json:"cursorLedger"` CursorLedgerLastEntry int64 `json:"cursorLedgerLastEntry"` IndividuallyDeletedMessages string `json:"individuallyDeletedMessages"` LastLedgerSwitchTimestamp string `json:"lastLedgerSwitchTimestamp"` State string `json:"state"` Active bool `json:"active"` NumberOfEntriesSinceFirstNotAckedMessage int64 `json:"numberOfEntriesSinceFirstNotAckedMessage"` TotalNonContiguousDeletedMessagesRange int `json:"totalNonContiguousDeletedMessagesRange"` SubscriptionHavePendingRead bool `json:"subscriptionHavePendingRead"` SubscriptionHavePendingReplayRead bool `json:"subscriptionHavePendingReplayRead"` Properties map[string]int64 `json:"properties"` }
type HttpClient ¶
type HttpClientImpl ¶
type HttpClientImpl struct {
// contains filtered or unexported fields
}
func (*HttpClientImpl) Delete ¶
func (h *HttpClientImpl) Delete(path string) (*http.Response, error)
type LedgerDetail ¶
type LedgerInfo ¶
type ListRacksInfoResp ¶
type ListRacksInfoResp map[string]map[string]BookieInfo
type LongRunningProcessStatus ¶
type LongRunningProcessStatus struct { Status CompactionStatus `json:"status"` LastError string `json:"lastError"` }
type Lookup ¶
type Lookup interface { // GetOwner get the owner broker of the given topic. // topicDomain: ref TopicDomain type, value persistent or non-persistent GetOwner(topicDomain TopicDomain, tenant, namespace, topic string) (*LookupData, error) // GetNamespaceBundle get the namespace bundle which the given topic belongs to. GetNamespaceBundle(topicDomain TopicDomain, tenant, namespace, topic string) (string, error) }
type LookupData ¶
type LookupImpl ¶
type LookupImpl struct {
// contains filtered or unexported fields
}
func (*LookupImpl) GetNamespaceBundle ¶
func (l *LookupImpl) GetNamespaceBundle(topicDomain TopicDomain, tenant, namespace, topic string) (string, error)
func (*LookupImpl) GetOwner ¶
func (l *LookupImpl) GetOwner(topicDomain TopicDomain, tenant, namespace, topic string) (*LookupData, error)
type NamespaceBacklog ¶
type NamespaceBacklog interface { // GetNamespaceBacklogQuota Get backlog quota map on a namespace. GetNamespaceBacklogQuota(tenant, namespace string) (*BacklogQuotaResp, error) // SetNamespaceBacklogQuota Set a backlog quota for all the topics on a namespace. SetNamespaceBacklogQuota(tenant, namespace string, cfg *BacklogQuota) error // RemoveNamespaceBacklogQuota Remove a backlog quota policy from a namespace. RemoveNamespaceBacklogQuota(tenant, namespace string, opts ...Option) error //ClearNamespaceAllTopicsBacklog Clear backlog for all topics on a namespace. ClearNamespaceAllTopicsBacklog(tenant, namespace string) error // ClearNamespaceSubscriptionBacklog Clear backlog for a given subscription on all topics on a namespace. ClearNamespaceSubscriptionBacklog(tenant, namespace, subscription string) error // ClearNamespaceAllTopicsBacklogForBundle Clear backlog for all topics on a namespace bundle. ClearNamespaceAllTopicsBacklogForBundle(tenant, namespace, bundle string) error // ClearNamespaceSubscriptionBacklogForBundle Clear backlog for a given subscription on all topics on a namespace bundle. ClearNamespaceSubscriptionBacklogForBundle(tenant, namespace, subscription, bundle string) error }
type NamespaceCompaction ¶
type NamespaceCompaction interface { // GetMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered. GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error) // SetMaximumUnCompactedBytes Set maximum number of uncompacted bytes in a topic before compaction is triggered. SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error // RemoveMaximumUnCompactedBytes Delete maximum number of uncompacted bytes in a topic before compaction is triggered. RemoveMaximumUnCompactedBytes(tenant, namespace string) error }
type NamespaceMessageTTL ¶
type NamespaceMessageTTL interface { // GetNamespaceMessageTTL Get the message TTL for the namespace GetNamespaceMessageTTL(tenant, namespace string) (int64, error) // SetNamespaceMessageTTL Set the message TTL for the namespace SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error // RemoveNamespaceMessageTTL Remove the message TTL for the namespace RemoveNamespaceMessageTTL(tenant, namespace string) error }
NamespaceMessageTTL Discard data after some time (by automatically acknowledging)
type NamespaceRetention ¶
type NamespaceRetention interface { // GetNamespaceRetention get retention configuration for namespace. GetNamespaceRetention(tenant, namespace string) (*RetentionConfiguration, error) // SetNamespaceRetention set retention configuration for namespace. SetNamespaceRetention(tenant, namespace string, cfg *RetentionConfiguration) error // RemoveNamespaceRetention remove retention configuration for namespace. RemoveNamespaceRetention(tenant, namespace string) error }
NamespaceRetention operate interface about retention configuration for namespace.
type Namespaces ¶
type Namespaces interface { Create(tenant, namespace string) error Delete(tenant, namespace string) error List(tenant string) ([]string, error) NamespaceRetention NamespaceBacklog NamespaceMessageTTL NamespaceCompaction }
type NamespacesImpl ¶
type NamespacesImpl struct {
// contains filtered or unexported fields
}
func (*NamespacesImpl) ClearNamespaceAllTopicsBacklog ¶
func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklog(tenant, namespace string) error
ClearNamespaceAllTopicsBacklog Clear backlog for all topics on a namespace. required params: tenant, namespace
func (*NamespacesImpl) ClearNamespaceAllTopicsBacklogForBundle ¶
func (n *NamespacesImpl) ClearNamespaceAllTopicsBacklogForBundle(tenant, namespace, bundle string) error
ClearNamespaceAllTopicsBacklogForBundle Clear backlog for all topics on a namespace bundle. required params: tenant, namespace, bundleName
func (*NamespacesImpl) ClearNamespaceSubscriptionBacklog ¶
func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklog(tenant, namespace, subscription string) error
ClearNamespaceSubscriptionBacklog Clear backlog for a given subscription on all topics on a namespace. required params: tenant, namespace, subscriptionName
func (*NamespacesImpl) ClearNamespaceSubscriptionBacklogForBundle ¶
func (n *NamespacesImpl) ClearNamespaceSubscriptionBacklogForBundle(tenant, namespace, subscription, bundle string) error
ClearNamespaceSubscriptionBacklogForBundle Clear backlog for a given subscription on all topics on a namespace bundle. required params: tenant, namespace, subscriptionName, bundleName
func (*NamespacesImpl) Create ¶
func (n *NamespacesImpl) Create(tenant, namespace string) error
func (*NamespacesImpl) Delete ¶
func (n *NamespacesImpl) Delete(tenant, namespace string) error
func (*NamespacesImpl) GetMaximumUnCompactedBytes ¶
func (n *NamespacesImpl) GetMaximumUnCompactedBytes(tenant, namespace string) (int64, error)
func (*NamespacesImpl) GetNamespaceBacklogQuota ¶
func (n *NamespacesImpl) GetNamespaceBacklogQuota(tenant, namespace string) (*BacklogQuotaResp, error)
GetNamespaceBacklogQuota Get backlog quota map on a namespace. required params: tenant, namespace
func (*NamespacesImpl) GetNamespaceMessageTTL ¶
func (n *NamespacesImpl) GetNamespaceMessageTTL(tenant, namespace string) (int64, error)
func (*NamespacesImpl) GetNamespaceRetention ¶
func (n *NamespacesImpl) GetNamespaceRetention(tenant, namespace string) (*RetentionConfiguration, error)
func (*NamespacesImpl) RemoveMaximumUnCompactedBytes ¶
func (n *NamespacesImpl) RemoveMaximumUnCompactedBytes(tenant, namespace string) error
func (*NamespacesImpl) RemoveNamespaceBacklogQuota ¶
func (n *NamespacesImpl) RemoveNamespaceBacklogQuota(tenant, namespace string, opts ...Option) error
RemoveNamespaceBacklogQuota Remove a backlog quota policy from a namespace. required params: tenant, namespace
func (*NamespacesImpl) RemoveNamespaceMessageTTL ¶
func (n *NamespacesImpl) RemoveNamespaceMessageTTL(tenant, namespace string) error
func (*NamespacesImpl) RemoveNamespaceRetention ¶
func (n *NamespacesImpl) RemoveNamespaceRetention(tenant, namespace string) error
func (*NamespacesImpl) SetMaximumUnCompactedBytes ¶
func (n *NamespacesImpl) SetMaximumUnCompactedBytes(tenant, namespace string, threshold int64) error
func (*NamespacesImpl) SetNamespaceBacklogQuota ¶
func (n *NamespacesImpl) SetNamespaceBacklogQuota(tenant, namespace string, cfg *BacklogQuota) error
SetNamespaceBacklogQuota Set a backlog quota for all the topics on a namespace. required params: tenant, namespace
func (*NamespacesImpl) SetNamespaceMessageTTL ¶
func (n *NamespacesImpl) SetNamespaceMessageTTL(tenant, namespace string, seconds int64) error
func (*NamespacesImpl) SetNamespaceRetention ¶
func (n *NamespacesImpl) SetNamespaceRetention(tenant, namespace string, cfg *RetentionConfiguration) error
func (*NamespacesImpl) WithOptions ¶
func (n *NamespacesImpl) WithOptions(opts ...Option) Namespaces
type NonPersistentTopics ¶
type NonPersistentTopics interface { WithOptions(opts ...Option) NonPersistentTopics Topics TopicBacklog TopicMessageTTL TopicCompaction TopicStats }
type NonPersistentTopicsImpl ¶
type NonPersistentTopicsImpl struct {
// contains filtered or unexported fields
}
func (*NonPersistentTopicsImpl) CalculateBacklogSizeByMessageID ¶
func (n *NonPersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error
CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes).
func (*NonPersistentTopicsImpl) CreateMissedPartitions ¶
func (n *NonPersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error
func (*NonPersistentTopicsImpl) CreateNonPartitioned ¶
func (n *NonPersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) CreatePartitioned ¶
func (n *NonPersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error
func (*NonPersistentTopicsImpl) DeleteNonPartitioned ¶
func (n *NonPersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) DeletePartitioned ¶
func (n *NonPersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) EstimatedOfflineTopicBacklog ¶
func (n *NonPersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)
EstimatedOfflineTopicBacklog Get estimated backlog for offline topic.
func (*NonPersistentTopicsImpl) GetLastMessageId ¶
func (*NonPersistentTopicsImpl) GetPartitionedMetadata ¶
func (n *NonPersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)
func (*NonPersistentTopicsImpl) GetPartitionedStats ¶
func (n *NonPersistentTopicsImpl) GetPartitionedStats(tenant string, namespace string, topic string) (*TopicStatistics, error)
func (*NonPersistentTopicsImpl) GetPartitionedStatsInternal ¶
func (n *NonPersistentTopicsImpl) GetPartitionedStatsInternal(tenant string, namespace string, topic string) (*PartitionedTopicInternalStats, error)
func (*NonPersistentTopicsImpl) GetStats ¶
func (n *NonPersistentTopicsImpl) GetStats(tenant string, namespace string, topic string) (*TopicStatistics, error)
func (*NonPersistentTopicsImpl) GetStatsInternal ¶
func (n *NonPersistentTopicsImpl) GetStatsInternal(tenant string, namespace string, topic string) (*TopicInternalStats, error)
func (*NonPersistentTopicsImpl) GetTopicBacklogQuota ¶
func (n *NonPersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)
GetTopicBacklogQuota Get backlog quota map on a topic.
func (*NonPersistentTopicsImpl) GetTopicCompactionStatus ¶
func (n *NonPersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
func (*NonPersistentTopicsImpl) GetTopicCompactionThreshold ¶
func (n *NonPersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
func (*NonPersistentTopicsImpl) GetTopicMessageTTL ¶
func (n *NonPersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
func (*NonPersistentTopicsImpl) GetTopicRetention ¶
func (n *NonPersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)
func (*NonPersistentTopicsImpl) ListNamespaceTopics ¶
func (n *NonPersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)
func (*NonPersistentTopicsImpl) ListNonPartitioned ¶
func (n *NonPersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)
func (*NonPersistentTopicsImpl) ListPartitioned ¶
func (n *NonPersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)
func (*NonPersistentTopicsImpl) RemoveTopicBacklogQuota ¶
func (n *NonPersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error
RemoveTopicBacklogQuota Remove a backlog quota policy from a topic.
func (*NonPersistentTopicsImpl) RemoveTopicCompactionThreshold ¶
func (n *NonPersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) RemoveTopicMessageTTL ¶
func (n *NonPersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) RemoveTopicRetention ¶
func (n *NonPersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) SetTopicBacklogQuota ¶
func (n *NonPersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error
SetTopicBacklogQuota Set a backlog quota for a topic.
func (*NonPersistentTopicsImpl) SetTopicCompactionThreshold ¶
func (n *NonPersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
func (*NonPersistentTopicsImpl) SetTopicMessageTTL ¶
func (n *NonPersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
func (*NonPersistentTopicsImpl) SetTopicRetention ¶
func (n *NonPersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error
func (*NonPersistentTopicsImpl) TriggerTopicCompaction ¶
func (n *NonPersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error
func (*NonPersistentTopicsImpl) WithOptions ¶
func (n *NonPersistentTopicsImpl) WithOptions(opts ...Option) NonPersistentTopics
type OfflineTopicStats ¶
type OfflineTopicStats struct { StorageSize int64 TotalMessages int64 MessageBacklog int64 BrokerName string TopicName string DataLedgerDetails []LedgerDetail CursorDetails map[string]CursorDetail StatGeneratedAt time.Time }
type Option ¶
type Option func(*options)
func WithBacklogQuotaType ¶
func WithBacklogQuotaType(backlogQuotaType BacklogQuotaType) Option
type PartitionedMetadata ¶
type PartitionedMetadata struct { Deleted bool `json:"deleted"` Partitions int64 `json:"partitions"` }
PartitionedMetadata partitioned topic metadata
type PartitionedTopicInternalStats ¶
type PartitionedTopicInternalStats struct { Metadata PartitionedMetadata `json:"metadata"` Partitions map[string]*TopicInternalStats `json:"partitions"` }
type PersistentTopics ¶
type PersistentTopics interface { WithOptions(opts ...Option) PersistentTopics Topics TopicBacklog TopicMessageTTL TopicCompaction TopicStats }
type PersistentTopicsImpl ¶
type PersistentTopicsImpl struct {
// contains filtered or unexported fields
}
func (*PersistentTopicsImpl) CalculateBacklogSizeByMessageID ¶
func (p *PersistentTopicsImpl) CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error
CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes).
func (*PersistentTopicsImpl) CreateMissedPartitions ¶
func (p *PersistentTopicsImpl) CreateMissedPartitions(tenant string, namespace string, topic string) error
func (*PersistentTopicsImpl) CreateNonPartitioned ¶
func (p *PersistentTopicsImpl) CreateNonPartitioned(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) CreatePartitioned ¶
func (p *PersistentTopicsImpl) CreatePartitioned(tenant, namespace, topic string, numPartitions int) error
func (*PersistentTopicsImpl) DeleteNonPartitioned ¶
func (p *PersistentTopicsImpl) DeleteNonPartitioned(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) DeletePartitioned ¶
func (p *PersistentTopicsImpl) DeletePartitioned(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) EstimatedOfflineTopicBacklog ¶
func (p *PersistentTopicsImpl) EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error)
EstimatedOfflineTopicBacklog Get estimated backlog for offline topic.
func (*PersistentTopicsImpl) GetLastMessageId ¶
func (*PersistentTopicsImpl) GetPartitionedMetadata ¶
func (p *PersistentTopicsImpl) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error)
func (*PersistentTopicsImpl) GetPartitionedStats ¶
func (p *PersistentTopicsImpl) GetPartitionedStats(tenant, namespace, topic string) (*TopicStatistics, error)
func (*PersistentTopicsImpl) GetPartitionedStatsInternal ¶
func (p *PersistentTopicsImpl) GetPartitionedStatsInternal(tenant, namespace, topic string) (*PartitionedTopicInternalStats, error)
func (*PersistentTopicsImpl) GetStats ¶
func (p *PersistentTopicsImpl) GetStats(tenant, namespace, topic string) (*TopicStatistics, error)
func (*PersistentTopicsImpl) GetStatsInternal ¶
func (p *PersistentTopicsImpl) GetStatsInternal(tenant, namespace, topic string) (*TopicInternalStats, error)
func (*PersistentTopicsImpl) GetTopicBacklogQuota ¶
func (p *PersistentTopicsImpl) GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error)
GetTopicBacklogQuota Get backlog quota map on a topic.
func (*PersistentTopicsImpl) GetTopicCompactionStatus ¶
func (p *PersistentTopicsImpl) GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error)
func (*PersistentTopicsImpl) GetTopicCompactionThreshold ¶
func (p *PersistentTopicsImpl) GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error)
func (*PersistentTopicsImpl) GetTopicMessageTTL ¶
func (p *PersistentTopicsImpl) GetTopicMessageTTL(tenant, namespace, topic string) (int64, error)
func (*PersistentTopicsImpl) GetTopicRetention ¶
func (p *PersistentTopicsImpl) GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error)
func (*PersistentTopicsImpl) ListNamespaceTopics ¶
func (p *PersistentTopicsImpl) ListNamespaceTopics(tenant, namespace string) ([]string, error)
ListNamespaceTopics Get the list of topics under a namespace.
func (*PersistentTopicsImpl) ListNonPartitioned ¶
func (p *PersistentTopicsImpl) ListNonPartitioned(tenant, namespace string) ([]string, error)
func (*PersistentTopicsImpl) ListPartitioned ¶
func (p *PersistentTopicsImpl) ListPartitioned(tenant, namespace string) ([]string, error)
ListPartitioned Get the list of partitioned topics under a namespace.
func (*PersistentTopicsImpl) RemoveTopicBacklogQuota ¶
func (p *PersistentTopicsImpl) RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error
RemoveTopicBacklogQuota Remove a backlog quota policy from a topic.
func (*PersistentTopicsImpl) RemoveTopicCompactionThreshold ¶
func (p *PersistentTopicsImpl) RemoveTopicCompactionThreshold(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) RemoveTopicMessageTTL ¶
func (p *PersistentTopicsImpl) RemoveTopicMessageTTL(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) RemoveTopicRetention ¶
func (p *PersistentTopicsImpl) RemoveTopicRetention(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) SetTopicBacklogQuota ¶
func (p *PersistentTopicsImpl) SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error
SetTopicBacklogQuota Set a backlog quota for a topic.
func (*PersistentTopicsImpl) SetTopicCompactionThreshold ¶
func (p *PersistentTopicsImpl) SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error
func (*PersistentTopicsImpl) SetTopicMessageTTL ¶
func (p *PersistentTopicsImpl) SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error
func (*PersistentTopicsImpl) SetTopicRetention ¶
func (p *PersistentTopicsImpl) SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error
func (*PersistentTopicsImpl) TriggerTopicCompaction ¶
func (p *PersistentTopicsImpl) TriggerTopicCompaction(tenant, namespace, topic string) error
func (*PersistentTopicsImpl) WithOptions ¶
func (p *PersistentTopicsImpl) WithOptions(opts ...Option) PersistentTopics
type PulsarAdmin ¶
type PulsarAdmin struct { Clusters Clusters Tenants Tenants Namespaces Namespaces PersistentTopics PersistentTopics NonPersistentTopics NonPersistentTopics Bookies Bookies Lookup Lookup }
func NewDefaultPulsarAdmin ¶
func NewDefaultPulsarAdmin() (*PulsarAdmin, error)
func NewPulsarAdmin ¶
func NewPulsarAdmin(config Config) (*PulsarAdmin, error)
func NewTestPulsarAdmin ¶
func NewTestPulsarAdmin(t *testing.T, port int) *PulsarAdmin
type RawBookieInfo ¶
type RawBookieInfo struct {
BookieId string `json:"bookieId"`
}
type RetentionConfiguration ¶
type RetentionConfiguration struct { RetentionSizeInMB int64 `json:"retentionSizeInMB"` RetentionTimeInMinutes int64 `json:"retentionTimeInMinutes"` }
RetentionConfiguration retention configuration
type RetentionPolicy ¶
type RetentionPolicy string
const ( ProducerRequestHold RetentionPolicy = "producer_request_hold" ProducerException RetentionPolicy = "producer_exception" ConsumerBacklogEviction RetentionPolicy = "consumer_backlog_eviction" )
type SubscriptionStats ¶
type TenantInfo ¶
type TenantsImpl ¶
type TenantsImpl struct {
// contains filtered or unexported fields
}
func (*TenantsImpl) Create ¶
func (t *TenantsImpl) Create(tenantName string, info TenantInfo) error
func (*TenantsImpl) Delete ¶
func (t *TenantsImpl) Delete(tenantName string) error
func (*TenantsImpl) List ¶
func (t *TenantsImpl) List() ([]string, error)
type TestBroker ¶
type TestBroker struct {
// contains filtered or unexported fields
}
func (*TestBroker) Close ¶
func (tb *TestBroker) Close() error
type TopicBacklog ¶
type TopicBacklog interface { // GetTopicBacklogQuota Get backlog quota map on a topic. GetTopicBacklogQuota(tenant, namespace, topic string) (*BacklogQuotaResp, error) // SetTopicBacklogQuota Set a backlog quota for a topic. SetTopicBacklogQuota(tenant, namespace, topic string, cfg *BacklogQuota) error // RemoveTopicBacklogQuota Remove a backlog quota policy from a topic. RemoveTopicBacklogQuota(tenant, namespace, topic string, opts ...Option) error // EstimatedOfflineTopicBacklog Get estimated backlog for offline topic. EstimatedOfflineTopicBacklog(tenant, namespace, topic string) (*OfflineTopicStats, error) // CalculateBacklogSizeByMessageID Calculate backlog size by a message ID (in bytes). CalculateBacklogSizeByMessageID(tenant, namespace, topic string) error }
type TopicCompaction ¶
type TopicCompaction interface { // GetTopicCompactionThreshold Get compaction threshold configuration for specified topic. GetTopicCompactionThreshold(tenant, namespace, topic string) (int64, error) // SetTopicCompactionThreshold Set compaction threshold configuration for specified topic. SetTopicCompactionThreshold(tenant, namespace, topic string, threshold int64) error // RemoveTopicCompactionThreshold Remove compaction threshold configuration for specified topic. RemoveTopicCompactionThreshold(tenant, namespace, topic string) error // GetTopicCompactionStatus Get the status of a compaction operation for a topic. GetTopicCompactionStatus(tenant, namespace, topic string) (*LongRunningProcessStatus, error) // TriggerTopicCompaction Trigger a compaction operation on a topic. TriggerTopicCompaction(tenant, namespace, topic string) error }
type TopicDomain ¶
type TopicDomain string
const ( TopicDomainPersistent TopicDomain = "persistent" TopicDomainNonPersistent TopicDomain = "non-persistent" )
type TopicInternalStats ¶
type TopicInternalStats struct { EntriesAddedCounter int64 `json:"entriesAddedCounter"` NumberOfEntries int64 `json:"numberOfEntries"` TotalSize int64 `json:"totalSize"` CurrentLedgerEntries int64 `json:"currentLedgerEntries"` CurrentLedgerSize int64 `json:"currentLedgerSize"` LastLedgerCreatedTimestamp string `json:"lastLedgerCreatedTimestamp"` LastLedgerCreationFailureTimestamp string `json:"lastLedgerCreationFailureTimestamp"` WaitingCursorsCount int32 `json:"waitingCursorsCount"` PendingAddEntriesCount int32 `json:"pendingAddEntriesCount"` LastConfirmedEntry string `json:"lastConfirmedEntry"` State string `json:"state"` Ledgers []*LedgerInfo `json:"ledgers"` Cursors map[string]CursorStats `json:"cursors"` SchemaLedgers []*LedgerInfo `json:"schemaLedgers"` CompactedLedger *LedgerInfo `json:"compactedLedger"` }
type TopicMessageTTL ¶
type TopicMessageTTL interface { // GetTopicMessageTTL Get the message TTL for the topic GetTopicMessageTTL(tenant, namespace, topic string) (int64, error) // SetTopicMessageTTL Set the message TTL for the topic SetTopicMessageTTL(tenant, namespace, topic string, seconds int64) error // RemoveTopicMessageTTL Remove the message TTL for the topic RemoveTopicMessageTTL(tenant, namespace, topic string) error }
TopicMessageTTL Discard data after some time (by automatically acknowledging)
type TopicRetention ¶
type TopicRetention interface { // GetTopicRetention get retention configuration for namespace. GetTopicRetention(tenant, namespace, topic string) (*RetentionConfiguration, error) // SetTopicRetention set retention configuration for namespace. SetTopicRetention(tenant, namespace, topic string, cfg *RetentionConfiguration) error // RemoveTopicRetention remove retention configuration for namespace. RemoveTopicRetention(tenant, namespace, topic string) error }
TopicRetention operate interface about retention configuration for specified topic.
type TopicStatistics ¶
type TopicStatistics struct { MsgRateIn float64 `json:"msgRateIn,omitempty"` MsgThroughputIn float64 `json:"msgThroughputIn,omitempty"` MsgRateOut float64 `json:"msgRateOut,omitempty"` MsgThroughputOut float64 `json:"msgThroughputOut,omitempty"` BytesInCounter uint64 `json:"bytesInCounter,omitempty"` MsgInCounter uint64 `json:"msgInCounter,omitempty"` BytesOutCounter uint64 `json:"bytesOutCounter,omitempty"` MsgOutCounter uint64 `json:"msgOutCounter,omitempty"` AverageMsgSize float64 `json:"averageMsgSize,omitempty"` MsgChunkPublished bool `json:"msgChunkPublished,omitempty"` StorageSize uint64 `json:"storageSize,omitempty"` BacklogSize uint64 `json:"backlogSize,omitempty"` PublishRateLimitedTimes uint64 `json:"publishRateLimitedTimes,omitempty"` EarliestMsgPublishTimeInBacklogs uint64 `json:"earliestMsgPublishTimeInBacklogs,omitempty"` OffloadedStorageSize uint64 `json:"offloadedStorageSize,omitempty"` LastOffloadLedgerId uint64 `json:"lastOffloadLedgerId,omitempty"` LastOffloadSuccessTimeStamp uint64 `json:"lastOffloadSuccessTimeStamp,omitempty"` LastOffloadFailureTimeStamp uint64 `json:"lastOffloadFailureTimeStamp,omitempty"` OngoingTxnCount uint64 `json:"ongoingTxnCount,omitempty"` AbortedTxnCount uint64 `json:"abortedTxnCount,omitempty"` CommittedTxnCount uint64 `json:"committedTxnCount,omitempty"` Publishers []string `json:"publishers,omitempty"` WaitingPublishers uint64 `json:"waitingPublishers,omitempty"` Subscriptions map[string]SubscriptionStats `json:"subscriptions,omitempty"` Replication map[string]string `json:"replication,omitempty"` DeduplicationStatus string `json:"deduplicationStatus,omitempty"` NonContiguousDeletedMessagesRanges uint64 `json:"nonContiguousDeletedMessagesRanges,omitempty"` NonContiguousDeletedMessagesRangesSerializedSize uint64 `json:"nonContiguousDeletedMessagesRangesSerializedSize,omitempty"` DelayedMessageIndexSizeInBytes uint64 `json:"delayedMessageIndexSizeInBytes,omitempty"` Compaction Compaction `json:"compaction"` OwnerBroker string `json:"ownerBroker,omitempty"` }
type TopicStats ¶
type TopicStats interface { GetStats(string, string, string) (*TopicStatistics, error) GetPartitionedStats(string, string, string) (*TopicStatistics, error) GetStatsInternal(string, string, string) (*TopicInternalStats, error) GetPartitionedStatsInternal(string, string, string) (*PartitionedTopicInternalStats, error) }
type Topics ¶
type Topics interface { CreateNonPartitioned(tenant, namespace, topic string) error CreatePartitioned(tenant, namespace, topic string, numPartitions int) error DeleteNonPartitioned(tenant, namespace, topic string) error DeletePartitioned(tenant, namespace, topic string) error ListNonPartitioned(tenant, namespace string) ([]string, error) ListPartitioned(tenant, namespace string) ([]string, error) ListNamespaceTopics(tenant, namespace string) ([]string, error) GetPartitionedMetadata(tenant, namespace, topic string) (*PartitionedMetadata, error) CreateMissedPartitions(tenant, namespace, topic string) error GetLastMessageId(tenant, namespace, topic string) (*MessageId, error) TopicRetention }
Topics persistent topic and non-persistent topic interface
Source Files ¶
- backlog_interface.go
- bookies_impl.go
- bookies_interface.go
- clusters.go
- compaction_interface.go
- config_interface.go
- http_client.go
- lookup_impl.go
- lookup_interface.go
- message.go
- message_ttl_interface.go
- namespaces_impl.go
- non_persistent_topics_impl.go
- persistent_topics_impl.go
- pulsar_admin.go
- pulsar_broker_test_util.go
- random_util.go
- retention_interface.go
- stats_interface.go
- tenant_impl.go
- tenants_interface.go
- test_util.go
- topic_interface.go
- url_const.go
- util.go