kfile

v0.0.4 Latest
Published: Jun 28, 2021 License: BSD-3-Clause Imports: 7 Imported by: 0

README

kfile

Use Kafka for distributed file storage, with support for append writes and automatic expiry.

User story

Our team uses PostgreSQL/MySQL for data storage and Kafka as a message queue. With only these components, it is difficult to share files between multiple instances.

For us, sharing files is rare; a typical case is exporting thousands of records. We don't want to introduce a distributed file system, so we use Kafka to store temporary files.

Usage

Write:

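// client is an existing sarama.Client; topic, numPartitions and
// replicationFactor are defined by the caller.
name := "fileA"
writer, err := kfile.NewWriter(
    topic, name,
    client,
    kfile.WithNumPartitions(int32(numPartitions)),
    kfile.WithReplicationFactor(int16(replicationFactor)),
    kfile.WithTopicRetention(time.Hour*24*31))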
if err != nil {
    fmt.Println(fmt.Errorf("new file: %w", err))
    return
}
for j := 0; j < 100; j++ {
    err := writer.Append(
        kfile.Line([]byte(fmt.Sprintf("Line %d of file %s\n", j, name))))
    if err != nil {
        fmt.Println(fmt.Errorf("write: %w", err))
        return
    }
}
uri, err := writer.Close(10 * time.Second)
if err != nil {
    fmt.Println(fmt.Errorf("close file: %w", err))
    return
}

fmt.Printf("write to %s\n", uri)
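For the export case from the user story, records have to be turned into lines before they can be appended. The following is a minimal sketch of one way to do that; the CSV layout, the recordsToLines helper, and the local Line type (which only mirrors kfile.Line so the sketch compiles on its own) are assumptions, not part of kfile.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

// Line mirrors kfile.Line so this sketch compiles without the kfile package.
type Line []byte

// recordsToLines encodes each record as one CSV row terminated by "\n".
// The CSV layout is an assumption made for this sketch.
func recordsToLines(records [][]string) ([]Line, error) {
	lines := make([]Line, 0, len(records))
	for _, rec := range records {
		var buf bytes.Buffer // fresh buffer per record, so slices are not shared
		w := csv.NewWriter(&buf)
		if err := w.Write(rec); err != nil {
			return nil, fmt.Errorf("encode record: %w", err)
		}
		w.Flush()
		if err := w.Error(); err != nil {
			return nil, fmt.Errorf("flush record: %w", err)
		}
		lines = append(lines, Line(buf.Bytes()))
	}
	return lines, nil
}

func main() {
	lines, err := recordsToLines([][]string{
		{"1", "alice"},
		{"2", "bob, jr."},
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, l := range lines {
		fmt.Print(string(l))
	}
	// With a real writer this would become: err = writer.Append(lines...)
}
```

Since Append is variadic, the resulting slice can be passed in one call with lines... once the element type is kfile.Line.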

Read:

reader, err := kfile.NewReader(uri, client)
if err != nil {
    fmt.Println(fmt.Errorf("new reader: %w", err))
    return
}
buf, err := reader.ReadAll(0)
if err != nil {
    fmt.Println(fmt.Errorf("read: %w", err))
    return
}
fmt.Printf("read from %s\n", uri)
fmt.Println(buf.String())
if _, err := reader.Close(0); err != nil {
    fmt.Println(err)
    return
}

See more in example/

Documentation

Index

Constants

This section is empty.

Variables

var ErrNotFound = fmt.Errorf("kfile not found")
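Because ErrNotFound is a sentinel error, callers would normally test for it with errors.Is, which also matches when the error has been wrapped with %w as in the usage examples. The sketch below uses a local errNotFound and a hypothetical openFile helper as stand-ins; which kfile functions actually return ErrNotFound is not stated here and is assumed only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for kfile.ErrNotFound in this sketch.
var errNotFound = fmt.Errorf("kfile not found")

// openFile is a hypothetical helper that fails the way a reader might
// when the requested file does not exist.
func openFile(name string) error {
	return fmt.Errorf("new reader %q: %w", name, errNotFound)
}

// isNotFound reports whether err is, or wraps, the sentinel error.
func isNotFound(err error) bool {
	return errors.Is(err, errNotFound)
}

func main() {
	err := openFile("fileA")
	if isNotFound(err) {
		fmt.Println("file not found:", err)
		return
	}
	fmt.Println("unexpected error:", err)
}
```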

Functions

This section is empty.

Types

type File

type File struct {
	Name  []byte
	Topic string
	// contains filtered or unexported fields
}

func NewReader

func NewReader(addr string, client sarama.Client, opts ...Option) (*File, error)

func NewWriter

func NewWriter(topic, name string, client sarama.Client, opts ...Option) (*File, error)

NewWriter creates a new Kafka producer for writing messages. Note: you may need to set sarama.MaxRequestSize to match Kafka's max.request.size.
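Independently of how sarama is configured, a caller can keep each write small by grouping lines by payload size before appending them. The sketch below illustrates that idea only; the batchBySize helper and the local Line type are hypothetical, protocol overhead is ignored, and whether one Append call corresponds to one Kafka request is an assumption.

```go
package main

import "fmt"

// Line mirrors kfile.Line so this sketch compiles without the kfile package.
type Line []byte

// batchBySize groups lines so that the payload bytes of each batch stay at
// or below limit. A single line larger than limit gets a batch of its own.
func batchBySize(lines []Line, limit int) [][]Line {
	var batches [][]Line
	var cur []Line
	size := 0
	for _, l := range lines {
		// Start a new batch when adding this line would exceed the limit.
		if len(cur) > 0 && size+len(l) > limit {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, l)
		size += len(l)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	lines := []Line{Line("aaaa"), Line("bbbb"), Line("cccc")}
	for i, b := range batchBySize(lines, 8) {
		fmt.Printf("batch %d: %d line(s)\n", i, len(b))
		// With a real writer this would become: err := writer.Append(b...)
	}
}
```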

func (*File) Append

func (f *File) Append(lines ...Line) error

func (*File) Close

func (f *File) Close(maxwait time.Duration) (string, error)

func (*File) ReadAll

func (f *File) ReadAll(maxwait time.Duration) (*bytes.Buffer, error)
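Since ReadAll returns a *bytes.Buffer, its contents can be processed line by line with bufio.Scanner instead of being converted to one string. The sketch below uses a hypothetical sampleBuffer function as a stand-in for the buffer a reader would return; splitLines is likewise an illustrative helper.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// sampleBuffer stands in for the buffer returned by reader.ReadAll.
func sampleBuffer() *bytes.Buffer {
	return bytes.NewBufferString("Line 0 of file fileA\nLine 1 of file fileA\n")
}

// splitLines drains buf and returns its newline-separated lines.
// bufio.Scanner rejects lines longer than its buffer (64 KiB by default);
// call Scanner.Buffer to raise the limit for longer lines.
func splitLines(buf *bytes.Buffer) ([]string, error) {
	var out []string
	sc := bufio.NewScanner(buf)
	for sc.Scan() {
		out = append(out, sc.Text())
	}
	return out, sc.Err()
}

func main() {
	lines, err := splitLines(sampleBuffer())
	if err != nil {
		fmt.Println(fmt.Errorf("split: %w", err))
		return
	}
	for i, l := range lines {
		fmt.Printf("%d: %s\n", i, l)
	}
}
```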

func (*File) ReadLine

func (f *File) ReadLine() (chan Line, error)
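ReadLine returns a channel of lines, which suggests consuming it with a range loop. The sketch below shows that pattern with a hypothetical emit function in place of a real reader; it assumes the channel is closed once the file has been fully read, which is not stated in the documentation above.

```go
package main

import "fmt"

// Line mirrors kfile.Line so this sketch compiles without the kfile package.
type Line []byte

// emit is a stand-in for ReadLine: it sends each line on a channel and
// closes the channel when done. That the real channel is closed at the end
// of the file is an assumption.
func emit(lines ...string) chan Line {
	ch := make(chan Line)
	go func() {
		defer close(ch)
		for _, l := range lines {
			ch <- Line(l)
		}
	}()
	return ch
}

// drain consumes ch until it is closed and returns the number of lines
// received and their total size in bytes.
func drain(ch chan Line) (n, size int) {
	for l := range ch {
		n++
		size += len(l)
	}
	return n, size
}

func main() {
	n, size := drain(emit("Line 0\n", "Line 1\n"))
	fmt.Printf("read %d lines, %d bytes\n", n, size)
}
```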

func (*File) URI

func (f *File) URI() string

type Line

type Line []byte

type Option

type Option func(*File)
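Option follows Go's functional options pattern: each With... function returns a closure that sets one field on the value being built. The sketch below shows the general pattern only; the file struct, its field names, and the default values are invented for illustration and are not taken from kfile's source.

```go
package main

import (
	"fmt"
	"time"
)

// file is a hypothetical stand-in for kfile.File; field names are invented.
type file struct {
	numPartitions     int32
	replicationFactor int16
	retention         time.Duration
}

// option mirrors the shape of kfile.Option.
type option func(*file)

func withNumPartitions(np int32) option {
	return func(f *file) { f.numPartitions = np }
}

func withTopicRetention(d time.Duration) option {
	return func(f *file) { f.retention = d }
}

// newFile starts from defaults (assumed values) and applies each option in order.
func newFile(opts ...option) *file {
	f := &file{numPartitions: 1, replicationFactor: 1, retention: 24 * time.Hour}
	for _, opt := range opts {
		opt(f)
	}
	return f
}

func main() {
	f := newFile(withNumPartitions(3), withTopicRetention(31*24*time.Hour))
	fmt.Println(f.numPartitions, f.replicationFactor, f.retention)
}
```

The pattern keeps the constructor signature stable: new settings can be added as further With... functions without breaking existing callers.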

func WithNumPartitions

func WithNumPartitions(np int32) Option

WithNumPartitions only takes effect if the topic does not exist yet and is created by kfile.

func WithReplicationFactor

func WithReplicationFactor(rf int16) Option

WithReplicationFactor only takes effect if the topic does not exist yet and is created by kfile.

func WithTopicRetention

func WithTopicRetention(d time.Duration) Option

WithTopicRetention only takes effect if the topic does not exist yet and is created by kfile.
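Kafka expresses topic retention through the retention.ms topic configuration, a value in milliseconds passed as a string. The sketch below shows how a time.Duration such as the 31 days from the usage example converts to that value; that kfile performs exactly this conversion is an assumption, and retentionMs is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// month is the retention used in the usage example (31 days).
const month = 31 * 24 * time.Hour

// retentionMs formats d as a value for Kafka's retention.ms topic
// configuration, which is expressed in milliseconds.
func retentionMs(d time.Duration) string {
	return strconv.FormatInt(d.Milliseconds(), 10)
}

func main() {
	fmt.Println("retention.ms =", retentionMs(month)) // prints "retention.ms = 2678400000"
}
```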
