pulsar

package module
v0.0.0-...-54a44df Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2018 License: Apache-2.0 Imports: 23 Imported by: 0

README

go-pulsar

go-pulsar is a pulsar client library.

Development Status

Alpha.

go-pulsar is still under heavy development. Some functionality are known to be broken, missing or incomplete. The interface may also change.

TODO
  • partitioned topics functions
  • payload compression
  • unimplemented commands
  • error handling

How to build

pulsar-client

pulsar-client is a cli tool to use go-pulsar library. It's like Java's Pulsar client tool.

$ mkdir work && cd work
$ export GOPATH="$(pwd)"
$ mkdir -p src/github.com/t2y/go-pulsar
$ cd src/github.com/t2y/go-pulsar
$ git clone git@github.com:t2y/go-pulsar.git .

Get dependency libraries and build it.

$ make install-glide
$ make deps
$ make
$ ./bin/pulsar-client --help
Usage:
  pulsar-client [OPTIONS]

Application Options:
      --serviceUrl=                 pulsar service url [$PULSAR_SERVICE_URL]
      --authMethod=                 authentication method [$PULSAR_AUTH_METHOD]
      --authParams=                 authentication params [$PULSAR_AUTH_PARAMS]
      --useTls                      use tls to connect [$USE_TLS]
      --tlsAllowInsecureConnection  allow insecure tls connection [$TLS_ALLOW_INSECURE_CONNECTION]
      --athenzConf=                 path to athenz config file [$PULSAR_ATHENZ_CONF]
      --athenzAuthHeader=           athenz authentication header [$PULSAR_ATHENZ_AUTH_HEADER]
      --conf=                       path to pulsar config file [$PULSAR_CONF]
      --verbose                     use verbose mode [$VERBOSE]
      --timeout=                    timeout to communicate with pulsar broker [$PULSAR_TIMEOUT]
      --command=                    produce or consume [$PULSAR_COMMAND]
      --topic=                      topic name [$PULSAR_TOPIC]
      --messages=                   messages to produce [$PULSAR_MESSAGES]
      --properties=                 properties to produce. e.g) key1:value1,key2:value2 [$PULSAR_PROPERTIES]
      --numMessages=                number of messages to consume (default: 1) [$PULSAR_NUM_MESSAGES]
      --subscriptionName=           subscription name [$PULSAR_SUBSCRIPTION_NAME]
      --subscriptionType=           subscription type: exclusive, shared, failover (default: exclusive) [$PULSAR_SUBSCRIPTION_TYPE]

Help Options:
  -h, --help                        Show this help message

Some options can be set by ini file. There're a sample file in example directory.

$ cat example/default.ini
log_level = info
service_url = pulsar://localhost:6650/
timeout = 40s
min_connection_num = 2
max_connection_num = 20
Code generation for pulsar protocol

pulsar protocol is defined using Protocol Buffers.

go-pulsar also uses PulsarApi.proto and generates go source code: PulsarApi.pb.go.

First of all, install protoc command for your platform. For example, use Homebrew on macOS.

$ brew install protobuf

To get latest proto file and re-generate go source code, make as follows.

$ make install-pb
$ make gen-pb

The following files are updated

  • proto/PulsarApi.proto
  • proto/pb/PulsarApi.pb.go

Getting started

Startup Pulsar server

Build and install pulsar server.

$ git clone https://github.com/apache/incubator-pulsar.git
$ cd pulsar
$ mvn install -DskipTests

Start up a standalone pulsar server for development.

$ ./bin/pulsar standalone
Producer
$ ./bin/pulsar-client --conf example/default.ini --command produce --topic "persistent://sample/standalone/ns1/my-topic" --messages "Hello Pulsar"
INFO[2017-06-15T08:43:47.887709192+09:00] read and parse ini file                       iniConf=&{info pulsar://localhost:6650/ 40s 2 20 pulsar://localhost:6650/ info} path="example/default.ini"
INFO[2017-06-15T08:43:49.222165203+09:00] messages successfully produced                messages=[Hello Pulsar] properties=[]

The --verbose option makes debug easy. It shows communications between producers/consumers and brokers.

Consumer
$ ./bin/pulsar-client --conf example/default.ini --command consume --topic "persistent://sample/standalone/ns1/my-topic" --subscriptionName sub
INFO[2017-06-15T08:50:33.467806336+09:00] read and parse ini file                       iniConf=&{info pulsar://localhost:6650/ 40s 2 20 pulsar://localhost:6650/ info} path="example/default.ini"
INFO[2017-06-15T08:50:34.515306354+09:00] messages successfully consumed                key-value=[] message="Hello Pulsar"

Authentication

go-pulsar supports Athenz authentication.

Read above documentation for each athenz parameters. Then, you can set some or all parameters on your needs into ini file. There're a sample file for athenz in example directory.

$ cat example/athenz.ini 
log_level = info
service_url = pulsar://localhost:6650/
timeout = 40s
min_connection_num = 2
max_connection_num = 20

auth_method=athenz
auth_params=tenantDomain:${yourDomain},tenantService:${yourService},providerDomain:${yourProviderDomain},privateKeyPath:${pathToPrivateKeyFile},keyId:0
use_tls=true
tls_allow_insecure_connection=false
tls_trust_certs_filepath=${pathToRootCaCertsFileIfNeeded}

athenz_conf=${pathToAthenzConfFile}
athenz_auth_header=${yourAthenzAuthenticationHeader}

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Documentation

Index

Constants

View Source
const (
	ClientName             = "go-pulsar"
	DefaultProtocolVersion = 9
)
View Source
const (
	OptionsCommandConsume = "consume"
	OptionsCommandProduce = "produce"
)
View Source
const (
	DefaultDeadlineTimeout = time.Duration(40) * time.Second
)
View Source
const (
	OptionsAuthMethodAthenz = "athenz"
)
View Source
const (
	PROTO_TCP = "tcp"
)

Variables

View Source
var (
	ErrKeepAlive                 = errors.New("failed to receive pong command")
	ErrLookupTopicResponseFailed = errors.New(
		"got failed as response type from lookup topic",
	)
)
View Source
var (
	ErrAppendTrustCerts = errors.New("failed to append trust certs file")

	ErrNoConnection  = errors.New("need to establish a connection")
	ErrSentConnect   = errors.New("connecting now, wait for a couple of seconds")
	ErrHasConnection = errors.New("connection has already established")
	ErrCloseReadChan = errors.New("read channel has closed")

	ErrCloseProducerByBroker = errors.New("producer has closed by broker")
	ErrCloseConsumerByBroker = errors.New("consumer has closed by broker")
)
View Source
var (
	ErrExistsConnInPool = errors.New("same connection exists in pool")
	ErrNoAvailablePool  = errors.New("no available connection in pool")
	ErrReachMaxConn     = errors.New("connections in pool reach a maximum number")
)

Functions

func GetNToken

func GetNToken(
	privateKeyPath, domain, service, keyVersion string, expireTime time.Duration,
) (ntoken string, err error)

func GetRoleToken

func GetRoleToken(
	url, authHeader, ntoken, tenantDomain, providerDomain, service, role string,
) (roleToken *zts.RoleToken, err error)

func InitOptions

func InitOptions(opts *Options) (err error)

func NewCommandConnect

func NewCommandConnect(
	c *Config, useCache bool,
) (connect *pulsar_proto.CommandConnect, err error)

func NewConn

func NewConn(c *Config) (conn net.Conn, err error)

func NewTcpConn

func NewTcpConn(c *Config) (conn net.Conn, err error)

func NewTlsConn

func NewTlsConn(c *Config) (conn net.Conn, err error)

Types

type AsyncConn

type AsyncConn struct {
	// contains filtered or unexported fields
}

func NewAsyncConn

func NewAsyncConn(c *Config, conn net.Conn) (ac *AsyncConn)

func (*AsyncConn) Close

func (ac *AsyncConn) Close()

func (*AsyncConn) Connect

func (ac *AsyncConn) Connect(msg *pulsar_proto.CommandConnect) (err error)

func (*AsyncConn) GetCommandFromBroker

func (ac *AsyncConn) GetCommandFromBroker() (cmd *pulsar_proto.BaseCommand)

func (*AsyncConn) GetConfig

func (ac *AsyncConn) GetConfig() (c *Config)

func (*AsyncConn) GetConnection

func (ac *AsyncConn) GetConnection() (conn Conn)

func (*AsyncConn) GetID

func (ac *AsyncConn) GetID() (id string)

func (*AsyncConn) LookupTopic

func (*AsyncConn) Receive

func (ac *AsyncConn) Receive() (response *Response, err error)

func (*AsyncConn) Request

func (ac *AsyncConn) Request(r *Request) (response *Response, err error)

func (*AsyncConn) Run

func (ac *AsyncConn) Run()

func (*AsyncConn) Send

func (ac *AsyncConn) Send(r *Request) (err error)

type AsyncConns

type AsyncConns []*AsyncConn

type AthenzConfig

type AthenzConfig struct {
	ZmsUrl        string     `json:"zmsUrl"`
	ZtsUrl        string     `json:"ztsUrl"`
	ZtsPublicKeys AthenzKeys `json:"ztsPublicKeys"`
	ZmsPublicKeys AthenzKeys `json:"zmsPublicKeys"`
}

func GetAthenzConfig

func GetAthenzConfig(path string) (athenzConfig *AthenzConfig, err error)

type AthenzKey

type AthenzKey struct {
	ID  string `json:"id"`
	Key string `json:"key"`
}

type AthenzKeys

type AthenzKeys []AthenzKey

type Authentication

type Authentication interface {
	GetAuthMethodName() string
	GetAuthData() (AuthenticationDataProvider, error)
	Configure(map[string]string)
	Start() error
}

func NewAuthentication

func NewAuthentication(name string, config *Config) (auth Authentication, err error)

type AuthenticationAthenz

type AuthenticationAthenz struct {
	// contains filtered or unexported fields
}

func NewAuthenticationAthenz

func NewAuthenticationAthenz(config *Config) (athenz *AuthenticationAthenz)

func (*AuthenticationAthenz) Configure

func (a *AuthenticationAthenz) Configure(authParams map[string]string)

func (*AuthenticationAthenz) GetAuthData

func (a *AuthenticationAthenz) GetAuthData() (
	provider AuthenticationDataProvider, err error,
)

func (*AuthenticationAthenz) GetAuthMethodName

func (a *AuthenticationAthenz) GetAuthMethodName() (name string)

func (*AuthenticationAthenz) Start

func (a *AuthenticationAthenz) Start() (err error)

type AuthenticationDataAthenz

type AuthenticationDataAthenz struct {
	AuthenticationDataProvider
	// contains filtered or unexported fields
}

func NewAuthenticationDataAthenz

func NewAuthenticationDataAthenz(
	roleToken, httpHeaderName string,
) (provider *AuthenticationDataAthenz)

func (*AuthenticationDataAthenz) GetCommandData

func (a *AuthenticationDataAthenz) GetCommandData() string

func (*AuthenticationDataAthenz) GetHttpHeaders

func (a *AuthenticationDataAthenz) GetHttpHeaders() (headers http.Header)

func (*AuthenticationDataAthenz) HasDataForTls

func (a *AuthenticationDataAthenz) HasDataForTls() bool

func (*AuthenticationDataAthenz) HasDataFromCommand

func (a *AuthenticationDataAthenz) HasDataFromCommand() bool

type AuthenticationDataProvider

type AuthenticationDataProvider interface {
	HasDataForTls() bool
	GetTlsCertificates() []Certificate
	GetTlsPrivateKey() PrivateKey
	HasDataForHttp() bool
	GetHttpAuthType() string
	GetHttpHeaders() http.Header
	HasDataFromCommand() bool
	GetCommandData() string
}

type Certificate

type Certificate struct {
}

type Config

type Config struct {
	Proto            string
	LocalAddr        *net.TCPAddr
	RemoteAddr       *net.TCPAddr
	Timeout          time.Duration
	MinConnectionNum int
	MaxConnectionNum int

	AuthMethod                 string
	AuthParams                 map[string]string
	UseTLS                     bool
	TLSAllowInsecureConnection bool
	TLSTrustCertsFilepath      string

	AuthenticationDataProvider AuthenticationDataProvider

	AthenzConfig     *AthenzConfig
	AthenzAuthHeader string

	ServiceURL *url.URL
	LogLevel   log.Level
}

func NewConfigFromIni

func NewConfigFromIni(iniConf *IniConfig) (c *Config, err error)

func NewConfigFromOptions

func NewConfigFromOptions(opts *Options) (c *Config, err error)

func (*Config) Copy

func (c *Config) Copy() (config *Config)

type Conn

type Conn interface {
	GetID() string
	GetConfig() *Config
	GetConnection() Conn
	GetCommandFromBroker() *pulsar_proto.BaseCommand
	LookupTopic(*pulsar_proto.CommandLookupTopic,
	) (*pulsar_proto.CommandLookupTopicResponse, error)
	Connect(*pulsar_proto.CommandConnect) error
	Send(*Request) error
	Receive() (*Response, error)
	Request(*Request) (*Response, error)
	Close()
}

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

func NewConnPool

func NewConnPool(c *Config) (p *ConnPool, err error)

func (*ConnPool) Close

func (p *ConnPool) Close()

func (*ConnPool) Delete

func (p *ConnPool) Delete(c *AsyncConn)

func (*ConnPool) Get

func (p *ConnPool) Get() (c *AsyncConn, err error)

func (*ConnPool) GetStatus

func (p *ConnPool) GetStatus() (status *PoolStatus)

func (*ConnPool) Put

func (p *ConnPool) Put(c *AsyncConn) (err error)

type ConnectionState

type ConnectionState int
const (
	ConnectionStateNone ConnectionState = iota + 1
	ConnectionStateSentConnectFrame
	ConnectionStateReady
)

type Consumer

type Consumer struct {
	*PulsarClient
}

func NewConsumer

func NewConsumer(client *PulsarClient) (c *Consumer)

func (*Consumer) CloseConsumer

func (c *Consumer) CloseConsumer(
	consumerId, requestId uint64,
) (err error)

func (*Consumer) Flow

func (c *Consumer) Flow(
	consumerId uint64, messagePermits uint32,
) (err error)

func (*Consumer) ReceiveMessage

func (c *Consumer) ReceiveMessage() (msg *Message, err error)

func (*Consumer) SendAck

func (c *Consumer) SendAck(
	consumerId uint64, ackType pulsar_proto.CommandAck_AckType,
	msgIdData *pulsar_proto.MessageIdData,
	validationError *pulsar_proto.CommandAck_ValidationError,
) (err error)

func (*Consumer) SendRedeliverUnacknowledgedMessages

func (c *Consumer) SendRedeliverUnacknowledgedMessages(
	subType pulsar_proto.CommandSubscribe_SubType,
	consumerId uint64,
	idsList []*pulsar_proto.MessageIdData,
) (err error)

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(
	topic, subscription string,
	subType pulsar_proto.CommandSubscribe_SubType,
	consumerId, requestId uint64,
) (err error)

type IniConfig

type IniConfig struct {
	LogLevelString string `ini:"log_level"`

	ServiceURLString string        `ini:"service_url"`
	Timeout          time.Duration `ini:"timeout"`
	MinConnectionNum int           `ini:"min_connection_num"`
	MaxConnectionNum int           `ini:"max_connection_num"`

	AuthMethod                 string `ini:"auth_method"`
	AuthParams                 string `ini:"auth_params"`
	UseTLS                     bool   `ini:"use_tls"`
	TLSAllowInsecureConnection bool   `ini:"tls_allow_insecure_connection"`
	TLSTrustCertsFilepath      string `ini:"tls_trust_certs_filepath"`
	AthenzConf                 string `ini:"athenz_conf"`
	AthenzAuthHeader           string `ini:"athenz_auth_header"`

	// internal use
	ServiceURL   *url.URL      `ini:"-"`
	LogLevel     log.Level     `ini:"-"`
	AthenzConfig *AthenzConfig `ini:"-"`
}

func LoadIniFile

func LoadIniFile(path string) (iniConf *IniConfig, err error)

type KeyValue

type KeyValue struct {
	Key   string
	Value string
}

type KeyValues

type KeyValues []KeyValue

func ConvertPropertiesToKeyValues

func ConvertPropertiesToKeyValues(
	properties []*pulsar_proto.KeyValue,
) (keyValues KeyValues)

func (KeyValues) Convert

func (kvs KeyValues) Convert() (properties []*pulsar_proto.KeyValue)

type Message

type Message struct {
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(
	cmd *pulsar_proto.CommandMessage,
	meta *pulsar_proto.MessageMetadata,
	body string,
	batchMessage command.BatchMessage,
) (msg *Message)

func (Message) GetBatchMessage

func (m Message) GetBatchMessage() (batchMessage command.BatchMessage)

func (Message) GetBody

func (m Message) GetBody() (body string)

func (Message) GetKeyValues

func (m Message) GetKeyValues() (keyValues KeyValues)

func (Message) GetMessageId

func (m Message) GetMessageId() (data *pulsar_proto.MessageIdData)

func (Message) HasBatchMessage

func (m Message) HasBatchMessage() (result bool)

type Options

type Options struct {
	// common options
	ServiceURLString           *string        `long:"serviceUrl" env:"PULSAR_SERVICE_URL" description:"pulsar service url"`
	AuthMethod                 *string        `long:"authMethod" env:"PULSAR_AUTH_METHOD" description:"authentication method"`
	AuthParams                 *string        `long:"authParams" env:"PULSAR_AUTH_PARAMS" description:"authentication params"`
	UseTLS                     bool           `long:"useTls" env:"USE_TLS" description:"use tls to connect"`
	TLSAllowInsecureConnection bool           `long:"tlsAllowInsecureConnection" env:"TLS_ALLOW_INSECURE_CONNECTION" description:"allow insecure tls connection"`
	AthenzConf                 *string        `long:"athenzConf" env:"PULSAR_ATHENZ_CONF" description:"path to athenz config file"`
	AthenzAuthHeader           *string        `long:"athenzAuthHeader" env:"PULSAR_ATHENZ_AUTH_HEADER" description:"athenz authentication header"`
	Conf                       *string        `long:"conf" env:"PULSAR_CONF" description:"path to pulsar config file"`
	Verbose                    bool           `long:"verbose" env:"VERBOSE" description:"use verbose mode"`
	Timeout                    *time.Duration `long:"timeout" env:"PULSAR_TIMEOUT" description:"timeout to communicate with pulsar broker"`

	Command *string `long:"command" env:"PULSAR_COMMAND" description:"produce or consume"`
	Topic   string  `long:"topic" env:"PULSAR_TOPIC" required:"true" description:"topic name"`

	// for producer
	Messages   []string `long:"messages" env:"PULSAR_MESSAGES" description:"messages to produce"`
	Properties []string `long:"properties" env:"PULSAR_PROPERTIES" description:"properties to produce. e.g) key1:value1,key2:value2"`

	// for consumer
	NumMessages      int    `long:"numMessages" env:"PULSAR_NUM_MESSAGES" default:"1" description:"number of messages to consume"`
	SubscriptionName string `long:"subscriptionName" env:"PULSAR_SUBSCRIPTION_NAME" description:"subscription name"`
	SubscriptionType string `` /* 135-byte string literal not displayed */

	// internal use
	ServiceURL *url.URL
}

type PoolStatus

type PoolStatus struct {
	// contains filtered or unexported fields
}

func (*PoolStatus) String

func (p *PoolStatus) String() (s string)

type PrivateKey

type PrivateKey interface {
}

type Producer

type Producer struct {
	*PulsarClient
}

func NewProducer

func NewProducer(client *PulsarClient) (p *Producer)

func (*Producer) CloseProducer

func (p *Producer) CloseProducer(
	producerId, requestId uint64,
) (err error)

func (*Producer) CreateProducer

func (p *Producer) CreateProducer(
	topic string, producerId, requestId uint64,
) (err error)

func (*Producer) ReceiveProducerSuccess

func (p *Producer) ReceiveProducerSuccess() (
	success *pulsar_proto.CommandProducerSuccess, err error,
)

func (*Producer) ReceiveSendReceipt

func (p *Producer) ReceiveSendReceipt() (
	receipt *pulsar_proto.CommandSendReceipt, err error,
)

func (*Producer) SendBatchSend

func (p *Producer) SendBatchSend(
	producerId, sequenceId uint64,
	producerName string, batchMessage command.BatchMessage,
	compression *pulsar_proto.CompressionType,
) (err error)

func (*Producer) SendSend

func (p *Producer) SendSend(
	producerId, sequenceId uint64, producerName, payload string,
	keyValues KeyValues,
) (err error)

type PulsarClient

type PulsarClient struct {
	Conn
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ac *AsyncConn) (client *PulsarClient)

func (*PulsarClient) Close

func (c *PulsarClient) Close()

func (*PulsarClient) ConnectToBroker

func (c *PulsarClient) ConnectToBroker(
	response *pulsar_proto.CommandLookupTopicResponse,
) (ac *AsyncConn, err error)

func (*PulsarClient) GetPartitionedTopicMetadata

func (c *PulsarClient) GetPartitionedTopicMetadata(
	topic string, requestId uint64,
) (err error)

func (*PulsarClient) KeepAlive

func (c *PulsarClient) KeepAlive() (err error)

func (*PulsarClient) LookupTopicWithConnect

func (c *PulsarClient) LookupTopicWithConnect(
	conn Conn, topic string, requestId uint64, authoritative bool,
) (ac *AsyncConn, err error)

func (*PulsarClient) ReceiveSuccess

func (c *PulsarClient) ReceiveSuccess() (
	success *pulsar_proto.CommandSuccess, err error,
)

func (*PulsarClient) SetLookupTopicConnection

func (c *PulsarClient) SetLookupTopicConnection(
	topic string, requestId uint64, authoritative bool,
) (err error)

Set c.conn to a broker received by lookup topic response

type Request

type Request struct {
	Message      proto.Message
	Meta         *pulsar_proto.MessageMetadata
	Payload      string
	BatchMessage command.BatchMessage
}

type Response

type Response struct {
	BaseCommand  *command.Base
	Meta         *pulsar_proto.MessageMetadata
	Payload      string
	BatchMessage command.BatchMessage
	Error        error
}

Directories

Path Synopsis
cli
internal
proto
pb
Package pulsar_proto is a generated protocol buffer package.
Package pulsar_proto is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL