Documentation ¶
Index ¶
- Constants
- Variables
- func GetNToken(privateKeyPath, domain, service, keyVersion string, expireTime time.Duration) (ntoken string, err error)
- func GetRoleToken(url, authHeader, ntoken, tenantDomain, providerDomain, service, role string) (roleToken *zts.RoleToken, err error)
- func InitOptions(opts *Options) (err error)
- func NewCommandConnect(c *Config, useCache bool) (connect *pulsar_proto.CommandConnect, err error)
- func NewConn(c *Config) (conn net.Conn, err error)
- func NewTcpConn(c *Config) (conn net.Conn, err error)
- func NewTlsConn(c *Config) (conn net.Conn, err error)
- type AsyncConn
- func (ac *AsyncConn) Close()
- func (ac *AsyncConn) Connect(msg *pulsar_proto.CommandConnect) (err error)
- func (ac *AsyncConn) GetCommandFromBroker() (cmd *pulsar_proto.BaseCommand)
- func (ac *AsyncConn) GetConfig() (c *Config)
- func (ac *AsyncConn) GetConnection() (conn Conn)
- func (ac *AsyncConn) GetID() (id string)
- func (ac *AsyncConn) LookupTopic(msg *pulsar_proto.CommandLookupTopic) (res *pulsar_proto.CommandLookupTopicResponse, err error)
- func (ac *AsyncConn) Receive() (response *Response, err error)
- func (ac *AsyncConn) Request(r *Request) (response *Response, err error)
- func (ac *AsyncConn) Run()
- func (ac *AsyncConn) Send(r *Request) (err error)
- type AsyncConns
- type AthenzConfig
- type AthenzKey
- type AthenzKeys
- type Authentication
- type AuthenticationAthenz
- type AuthenticationDataAthenz
- type AuthenticationDataProvider
- type Certificate
- type Config
- type Conn
- type ConnPool
- type ConnectionState
- type Consumer
- func (c *Consumer) CloseConsumer(consumerId, requestId uint64) (err error)
- func (c *Consumer) Flow(consumerId uint64, messagePermits uint32) (err error)
- func (c *Consumer) ReceiveMessage() (msg *Message, err error)
- func (c *Consumer) SendAck(consumerId uint64, ackType pulsar_proto.CommandAck_AckType, ...) (err error)
- func (c *Consumer) SendRedeliverUnacknowledgedMessages(subType pulsar_proto.CommandSubscribe_SubType, consumerId uint64, ...) (err error)
- func (c *Consumer) Subscribe(topic, subscription string, subType pulsar_proto.CommandSubscribe_SubType, ...) (err error)
- type IniConfig
- type KeyValue
- type KeyValues
- type Message
- type Options
- type PoolStatus
- type PrivateKey
- type Producer
- func (p *Producer) CloseProducer(producerId, requestId uint64) (err error)
- func (p *Producer) CreateProducer(topic string, producerId, requestId uint64) (err error)
- func (p *Producer) ReceiveProducerSuccess() (success *pulsar_proto.CommandProducerSuccess, err error)
- func (p *Producer) ReceiveSendReceipt() (receipt *pulsar_proto.CommandSendReceipt, err error)
- func (p *Producer) SendBatchSend(producerId, sequenceId uint64, producerName string, ...) (err error)
- func (p *Producer) SendSend(producerId, sequenceId uint64, producerName, payload string, ...) (err error)
- type PulsarClient
- func (c *PulsarClient) Close()
- func (c *PulsarClient) ConnectToBroker(response *pulsar_proto.CommandLookupTopicResponse) (ac *AsyncConn, err error)
- func (c *PulsarClient) GetPartitionedTopicMetadata(topic string, requestId uint64) (err error)
- func (c *PulsarClient) KeepAlive() (err error)
- func (c *PulsarClient) LookupTopicWithConnect(conn Conn, topic string, requestId uint64, authoritative bool) (ac *AsyncConn, err error)
- func (c *PulsarClient) ReceiveSuccess() (success *pulsar_proto.CommandSuccess, err error)
- func (c *PulsarClient) SetLookupTopicConnection(topic string, requestId uint64, authoritative bool) (err error)
- type Request
- type Response
Constants ¶
View Source
const ( ClientName = "go-pulsar" DefaultProtocolVersion = 9 )
View Source
const ( OptionsCommandConsume = "consume" OptionsCommandProduce = "produce" )
View Source
const (
DefaultDeadlineTimeout = time.Duration(40) * time.Second
)
View Source
const (
OptionsAuthMethodAthenz = "athenz"
)
View Source
const (
PROTO_TCP = "tcp"
)
Variables ¶
View Source
var ( ErrKeepAlive = errors.New("failed to receive pong command") ErrLookupTopicResponseFailed = errors.New( "got failed as response type from lookup topic", ) )
View Source
var ( ErrAppendTrustCerts = errors.New("failed to append trust certs file") ErrNoConnection = errors.New("need to establish a connection") ErrSentConnect = errors.New("connecting now, wait for a couple of seconds") ErrHasConnection = errors.New("connection has already established") ErrCloseReadChan = errors.New("read channel has closed") ErrCloseProducerByBroker = errors.New("producer has closed by broker") ErrCloseConsumerByBroker = errors.New("consumer has closed by broker") )
Functions ¶
func GetRoleToken ¶
func InitOptions ¶
func NewCommandConnect ¶
func NewCommandConnect( c *Config, useCache bool, ) (connect *pulsar_proto.CommandConnect, err error)
Types ¶
type AsyncConn ¶
type AsyncConn struct {
// contains filtered or unexported fields
}
func (*AsyncConn) Connect ¶
func (ac *AsyncConn) Connect(msg *pulsar_proto.CommandConnect) (err error)
func (*AsyncConn) GetCommandFromBroker ¶
func (ac *AsyncConn) GetCommandFromBroker() (cmd *pulsar_proto.BaseCommand)
func (*AsyncConn) GetConnection ¶
func (*AsyncConn) LookupTopic ¶
func (ac *AsyncConn) LookupTopic( msg *pulsar_proto.CommandLookupTopic, ) (res *pulsar_proto.CommandLookupTopicResponse, err error)
type AsyncConns ¶
type AsyncConns []*AsyncConn
type AthenzConfig ¶
type AthenzConfig struct { ZmsUrl string `json:"zmsUrl"` ZtsUrl string `json:"ztsUrl"` ZtsPublicKeys AthenzKeys `json:"ztsPublicKeys"` ZmsPublicKeys AthenzKeys `json:"zmsPublicKeys"` }
func GetAthenzConfig ¶
func GetAthenzConfig(path string) (athenzConfig *AthenzConfig, err error)
type AthenzKeys ¶
type AthenzKeys []AthenzKey
type Authentication ¶
type Authentication interface { GetAuthMethodName() string GetAuthData() (AuthenticationDataProvider, error) Configure(map[string]string) Start() error }
func NewAuthentication ¶
func NewAuthentication(name string, config *Config) (auth Authentication, err error)
type AuthenticationAthenz ¶
type AuthenticationAthenz struct {
// contains filtered or unexported fields
}
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(config *Config) (athenz *AuthenticationAthenz)
func (*AuthenticationAthenz) Configure ¶
func (a *AuthenticationAthenz) Configure(authParams map[string]string)
func (*AuthenticationAthenz) GetAuthData ¶
func (a *AuthenticationAthenz) GetAuthData() ( provider AuthenticationDataProvider, err error, )
func (*AuthenticationAthenz) GetAuthMethodName ¶
func (a *AuthenticationAthenz) GetAuthMethodName() (name string)
func (*AuthenticationAthenz) Start ¶
func (a *AuthenticationAthenz) Start() (err error)
type AuthenticationDataAthenz ¶
type AuthenticationDataAthenz struct { AuthenticationDataProvider // contains filtered or unexported fields }
func NewAuthenticationDataAthenz ¶
func NewAuthenticationDataAthenz( roleToken, httpHeaderName string, ) (provider *AuthenticationDataAthenz)
func (*AuthenticationDataAthenz) GetCommandData ¶
func (a *AuthenticationDataAthenz) GetCommandData() string
func (*AuthenticationDataAthenz) GetHttpHeaders ¶
func (a *AuthenticationDataAthenz) GetHttpHeaders() (headers http.Header)
func (*AuthenticationDataAthenz) HasDataForTls ¶
func (a *AuthenticationDataAthenz) HasDataForTls() bool
func (*AuthenticationDataAthenz) HasDataFromCommand ¶
func (a *AuthenticationDataAthenz) HasDataFromCommand() bool
type AuthenticationDataProvider ¶
type AuthenticationDataProvider interface { HasDataForTls() bool GetTlsCertificates() []Certificate GetTlsPrivateKey() PrivateKey HasDataForHttp() bool GetHttpAuthType() string GetHttpHeaders() http.Header HasDataFromCommand() bool GetCommandData() string }
type Certificate ¶
type Certificate struct { }
type Config ¶
type Config struct { Proto string LocalAddr *net.TCPAddr RemoteAddr *net.TCPAddr Timeout time.Duration MinConnectionNum int MaxConnectionNum int AuthMethod string AuthParams map[string]string UseTLS bool TLSAllowInsecureConnection bool TLSTrustCertsFilepath string AuthenticationDataProvider AuthenticationDataProvider AthenzConfig *AthenzConfig AthenzAuthHeader string ServiceURL *url.URL LogLevel log.Level }
func NewConfigFromIni ¶
func NewConfigFromOptions ¶
type Conn ¶
type Conn interface { GetID() string GetConfig() *Config GetConnection() Conn GetCommandFromBroker() *pulsar_proto.BaseCommand LookupTopic(*pulsar_proto.CommandLookupTopic, ) (*pulsar_proto.CommandLookupTopicResponse, error) Connect(*pulsar_proto.CommandConnect) error Send(*Request) error Receive() (*Response, error) Request(*Request) (*Response, error) Close() }
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
func NewConnPool ¶
func (*ConnPool) GetStatus ¶
func (p *ConnPool) GetStatus() (status *PoolStatus)
type ConnectionState ¶
type ConnectionState int
const ( ConnectionStateNone ConnectionState = iota + 1 ConnectionStateSentConnectFrame ConnectionStateReady )
type Consumer ¶
type Consumer struct {
*PulsarClient
}
func NewConsumer ¶
func NewConsumer(client *PulsarClient) (c *Consumer)
func (*Consumer) CloseConsumer ¶
func (*Consumer) ReceiveMessage ¶
func (*Consumer) SendAck ¶
func (c *Consumer) SendAck( consumerId uint64, ackType pulsar_proto.CommandAck_AckType, msgIdData *pulsar_proto.MessageIdData, validationError *pulsar_proto.CommandAck_ValidationError, ) (err error)
func (*Consumer) SendRedeliverUnacknowledgedMessages ¶
func (c *Consumer) SendRedeliverUnacknowledgedMessages( subType pulsar_proto.CommandSubscribe_SubType, consumerId uint64, idsList []*pulsar_proto.MessageIdData, ) (err error)
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe( topic, subscription string, subType pulsar_proto.CommandSubscribe_SubType, consumerId, requestId uint64, ) (err error)
type IniConfig ¶
type IniConfig struct { LogLevelString string `ini:"log_level"` ServiceURLString string `ini:"service_url"` Timeout time.Duration `ini:"timeout"` MinConnectionNum int `ini:"min_connection_num"` MaxConnectionNum int `ini:"max_connection_num"` AuthMethod string `ini:"auth_method"` AuthParams string `ini:"auth_params"` UseTLS bool `ini:"use_tls"` TLSAllowInsecureConnection bool `ini:"tls_allow_insecure_connection"` TLSTrustCertsFilepath string `ini:"tls_trust_certs_filepath"` AthenzConf string `ini:"athenz_conf"` AthenzAuthHeader string `ini:"athenz_auth_header"` // internal use ServiceURL *url.URL `ini:"-"` LogLevel log.Level `ini:"-"` AthenzConfig *AthenzConfig `ini:"-"` }
func LoadIniFile ¶
type KeyValues ¶
type KeyValues []KeyValue
func ConvertPropertiesToKeyValues ¶
func ConvertPropertiesToKeyValues( properties []*pulsar_proto.KeyValue, ) (keyValues KeyValues)
func (KeyValues) Convert ¶
func (kvs KeyValues) Convert() (properties []*pulsar_proto.KeyValue)
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func NewMessage ¶
func NewMessage( cmd *pulsar_proto.CommandMessage, meta *pulsar_proto.MessageMetadata, body string, batchMessage command.BatchMessage, ) (msg *Message)
func (Message) GetBatchMessage ¶
func (m Message) GetBatchMessage() (batchMessage command.BatchMessage)
func (Message) GetKeyValues ¶
func (Message) GetMessageId ¶
func (m Message) GetMessageId() (data *pulsar_proto.MessageIdData)
func (Message) HasBatchMessage ¶
type Options ¶
type Options struct { // common options ServiceURLString *string `long:"serviceUrl" env:"PULSAR_SERVICE_URL" description:"pulsar service url"` AuthMethod *string `long:"authMethod" env:"PULSAR_AUTH_METHOD" description:"authentication method"` AuthParams *string `long:"authParams" env:"PULSAR_AUTH_PARAMS" description:"authentication params"` UseTLS bool `long:"useTls" env:"USE_TLS" description:"use tls to connect"` TLSAllowInsecureConnection bool `long:"tlsAllowInsecureConnection" env:"TLS_ALLOW_INSECURE_CONNECTION" description:"allow insecure tls connection"` AthenzConf *string `long:"athenzConf" env:"PULSAR_ATHENZ_CONF" description:"path to athenz config file"` AthenzAuthHeader *string `long:"athenzAuthHeader" env:"PULSAR_ATHENZ_AUTH_HEADER" description:"athenz authentication header"` Conf *string `long:"conf" env:"PULSAR_CONF" description:"path to pulsar config file"` Verbose bool `long:"verbose" env:"VERBOSE" description:"use verbose mode"` Timeout *time.Duration `long:"timeout" env:"PULSAR_TIMEOUT" description:"timeout to communicate with pulsar broker"` Command *string `long:"command" env:"PULSAR_COMMAND" description:"produce or consume"` Topic string `long:"topic" env:"PULSAR_TOPIC" required:"true" description:"topic name"` // for producer Messages []string `long:"messages" env:"PULSAR_MESSAGES" description:"messages to produce"` Properties []string `long:"properties" env:"PULSAR_PROPERTIES" description:"properties to produce. e.g) key1:value1,key2:value2"` // for consumer NumMessages int `long:"numMessages" env:"PULSAR_NUM_MESSAGES" default:"1" description:"number of messages to consume"` SubscriptionName string `long:"subscriptionName" env:"PULSAR_SUBSCRIPTION_NAME" description:"subscription name"` SubscriptionType string `` /* 135-byte string literal not displayed */ // internal use ServiceURL *url.URL }
type PoolStatus ¶
type PoolStatus struct {
// contains filtered or unexported fields
}
func (*PoolStatus) String ¶
func (p *PoolStatus) String() (s string)
type PrivateKey ¶
type PrivateKey interface { }
type Producer ¶
type Producer struct {
*PulsarClient
}
func NewProducer ¶
func NewProducer(client *PulsarClient) (p *Producer)
func (*Producer) CloseProducer ¶
func (*Producer) CreateProducer ¶
func (*Producer) ReceiveProducerSuccess ¶
func (p *Producer) ReceiveProducerSuccess() ( success *pulsar_proto.CommandProducerSuccess, err error, )
func (*Producer) ReceiveSendReceipt ¶
func (p *Producer) ReceiveSendReceipt() ( receipt *pulsar_proto.CommandSendReceipt, err error, )
func (*Producer) SendBatchSend ¶
func (p *Producer) SendBatchSend( producerId, sequenceId uint64, producerName string, batchMessage command.BatchMessage, compression *pulsar_proto.CompressionType, ) (err error)
type PulsarClient ¶
type PulsarClient struct { Conn // contains filtered or unexported fields }
func NewClient ¶
func NewClient(ac *AsyncConn) (client *PulsarClient)
func (*PulsarClient) Close ¶
func (c *PulsarClient) Close()
func (*PulsarClient) ConnectToBroker ¶
func (c *PulsarClient) ConnectToBroker( response *pulsar_proto.CommandLookupTopicResponse, ) (ac *AsyncConn, err error)
func (*PulsarClient) GetPartitionedTopicMetadata ¶
func (c *PulsarClient) GetPartitionedTopicMetadata( topic string, requestId uint64, ) (err error)
func (*PulsarClient) KeepAlive ¶
func (c *PulsarClient) KeepAlive() (err error)
func (*PulsarClient) LookupTopicWithConnect ¶
func (*PulsarClient) ReceiveSuccess ¶
func (c *PulsarClient) ReceiveSuccess() ( success *pulsar_proto.CommandSuccess, err error, )
func (*PulsarClient) SetLookupTopicConnection ¶
func (c *PulsarClient) SetLookupTopicConnection( topic string, requestId uint64, authoritative bool, ) (err error)
SetLookupTopicConnection sets c.conn to the broker returned in the lookup topic response.
type Request ¶
type Request struct { Message proto.Message Meta *pulsar_proto.MessageMetadata Payload string BatchMessage command.BatchMessage }
type Response ¶
type Response struct { BaseCommand *command.Base Meta *pulsar_proto.MessageMetadata Payload string BatchMessage command.BatchMessage Error error }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.