pulsar

package module
v1.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2024 License: MIT Imports: 13 Imported by: 0

README

pulsar

Apache Pulsar 是 Apache 软件基金会的顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。

Pulsar 的关键特性如下:

  • 是下一代云原生分布式消息流平台。
  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  • 极低地发布延迟和端到端延迟。
  • 可无缝扩展到超过一百万个 topic。
  • 简单的客户端 API,支持 Java、Go、Python 和 C++。
  • 主题的多种订阅模式(独占、共享和故障转移)。
  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
  • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

核心概念

Messages(消息)
Producers(生产者)
Consumers(消费者)
Topics(主题)
Subscriptions(订阅模式)
Message retention and expiry(消息保留和过期)
Message deduplication(消息去重)

Docker部署开发环境

docker pull apachepulsar/pulsar:latest

docker run -itd \
    -p 6650:6650 \
    -p 8080:8080 \
    --name pulsar-standalone \
    apachepulsar/pulsar:latest bin/pulsar standalone
docker pull apachepulsar/pulsar-manager:latest

docker run -itd \
    -p 9527:9527 \
    -p 7750:7750 \
    --name pulsar-manager \
    -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
    apachepulsar/pulsar-manager:latest

设置管理员账号密码:

CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
   -H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
   -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
   -H "Content-Type: application/json" \
   -X PUT http://localhost:7750/pulsar-manager/users/superuser \
   -d '{"name": "admin", "password": "apachepulsar", "description": "test", 
        "email": "username@test.org"}'

管理后台 http://localhost:9527
登陆账号:admin,密码:apachepulsar

在后台创建一个环境,其中,服务地址为pulsar的地址:

参考资料

Documentation

Index

Constants

View Source
const (
	KindPulsar transport.Kind = "pulsar"
)

Variables

This section is empty.

Functions

func LogDebug

func LogDebug(args ...interface{})

func LogDebugf

func LogDebugf(format string, args ...interface{})

func LogError

func LogError(args ...interface{})

func LogErrorf

func LogErrorf(format string, args ...interface{})

func LogFatal

func LogFatal(args ...interface{})

func LogFatalf

func LogFatalf(format string, args ...interface{})

func LogInfo

func LogInfo(args ...interface{})

func LogInfof

func LogInfof(format string, args ...interface{})

func LogWarn

func LogWarn(args ...interface{})

func LogWarnf

func LogWarnf(format string, args ...interface{})

func RegisterSubscriber

func RegisterSubscriber[T any](srv *Server, ctx context.Context, topic string, handler func(context.Context, string, broker.Headers, *T) error, opts ...broker.SubscribeOption) error

Types

type Server

type Server struct {
	broker.Broker

	sync.RWMutex
	// contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerOption) *Server

func (*Server) Endpoint

func (s *Server) Endpoint() (*url.URL, error)

func (*Server) Name

func (s *Server) Name() string

func (*Server) RegisterSubscriber

func (s *Server) RegisterSubscriber(ctx context.Context, topic string, handler broker.Handler, binder broker.Binder, opts ...broker.SubscribeOption) error

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

func (*Server) Stop

func (s *Server) Stop(_ context.Context) error

type ServerOption

type ServerOption func(o *Server)

func WithAddress

func WithAddress(addrs []string) ServerOption

func WithBrokerOptions

func WithBrokerOptions(opts ...broker.Option) ServerOption

WithBrokerOptions MQ代理配置

func WithCodec

func WithCodec(c string) ServerOption

func WithEnableKeepAlive

func WithEnableKeepAlive(enable bool) ServerOption

WithEnableKeepAlive enable keep alive

func WithGlobalPropagator

func WithGlobalPropagator() ServerOption

func WithGlobalTracerProvider

func WithGlobalTracerProvider() ServerOption

func WithPropagator

func WithPropagator(propagators propagation.TextMapPropagator) ServerOption

func WithTLSConfig

func WithTLSConfig(c *tls.Config) ServerOption

func WithTracerProvider

func WithTracerProvider(provider trace.TracerProvider, tracerName string) ServerOption

type SubscribeOption

type SubscribeOption struct {
	// contains filtered or unexported fields
}

type SubscribeOptionMap

type SubscribeOptionMap map[string]*SubscribeOption

type SubscriberMap

type SubscriberMap map[string]broker.Subscriber

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is a Pulsar transport.

func (*Transport) Endpoint

func (tr *Transport) Endpoint() string

Endpoint returns the transport endpoint.

func (*Transport) Kind

func (tr *Transport) Kind() transport.Kind

Kind returns the transport kind.

func (*Transport) NodeFilters

func (tr *Transport) NodeFilters() []selector.NodeFilter

NodeFilters returns the client select filters.

func (*Transport) Operation

func (tr *Transport) Operation() string

Operation returns the transport operation.

func (*Transport) ReplyHeader

func (tr *Transport) ReplyHeader() transport.Header

ReplyHeader returns the reply header.

func (*Transport) RequestHeader

func (tr *Transport) RequestHeader() transport.Header

RequestHeader returns the request header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL