kafka

package
v0.0.0-...-711078c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2019 License: BSD-2-Clause Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Mount

func Mount(mountPoint string) error

func NewConsumer

func NewConsumer(cc ClusterConfig) (*cluster.Consumer, error)

func Topics

func Topics(brokers []string) ([]string, error)

Types

type ClusterConfig

type ClusterConfig struct {
	KafkaConfig
	Name string `json:"name"`
}

type ClusterPipe

type ClusterPipe struct {
	Brokers  []string
	Topic    string
	Cluster  string
	FileName string
	Consumer *cluster.Consumer
	Producer *Producer
}

func (*ClusterPipe) Attr

func (kp *ClusterPipe) Attr(ctx context.Context, a *fuse.Attr) error

func (*ClusterPipe) Create

func (kp *ClusterPipe) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error)

func (*ClusterPipe) Open

func (kp *ClusterPipe) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error)

func (*ClusterPipe) Read

func (kp *ClusterPipe) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error

This provides 3 reading reading strategies for the cluster. reader - This is just writing raw messages out messages - provides binary provides a single bit errors - stream of error messages

func (*ClusterPipe) Release

func (kp *ClusterPipe) Release(ctx context.Context, req *fuse.ReleaseRequest) error

func (*ClusterPipe) Write

func (kp *ClusterPipe) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error

type KDir

type KDir struct {
	KFS  *KFS
	Path string
}

func (*KDir) Attr

func (kd *KDir) Attr(ctx context.Context, a *fuse.Attr) error

func (*KDir) IsRoot

func (kd *KDir) IsRoot() bool

func (*KDir) Lookup

func (kd *KDir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (fs.Node, error)

func (*KDir) ReadDirAll

func (kd *KDir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error)

type KFS

type KFS struct {
	Path    string
	Brokers []string
}

KFS ... FS root structure

func NewKFS

func NewKFS(path string, brokers []string) *KFS

NewKFS ... Create a new fs

func (*KFS) Root

func (kfs *KFS) Root() (fs.Node, error)

type KafkaConfig

type KafkaConfig struct {
	Brokers []string `json:"brokers"`
	Topics  []string `json:"topics"`
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(brokers []string, topic string) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close() error

func (*Producer) KeySend

func (p *Producer) KeySend(key string, bits []byte)

func (*Producer) Send

func (p *Producer) Send(bits []byte)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL