avroipc

Version: v0.0.0-...-5e83f8a
Published: Jan 18, 2023 License: MIT Imports: 6 Imported by: 0

README

avroipc

avroipc is a pure-Go client for Apache Flume's Avro source.

I wrote avroipc to learn the Avro RPC protocol. It is not production ready; use it at your own risk.

Thanks to LinkedIn's goavro!

Usage

package main

import (
    "log"

    "github.com/myzhan/avroipc"
)

func main() {
    // Create a new client with default parameters
    client, err := avroipc.NewClient("localhost:20200")
    if err != nil {
        log.Fatal(err)
    }
    
    event := &avroipc.Event{
        Body: []byte("hello from go"),
        Headers: map[string]string{
            "topic":     "myzhan",
            "timestamp": "1508740315478",
        },
    }
    status, err := client.Append(event)
    if err != nil {
        log.Fatal(err)
    }
    if status != "OK" {
        log.Fatalf("Bad status: %s", status)
    }
}

To set specific client parameters, use the config builder:

package main

import (
    "log"
    "time"

    "github.com/myzhan/avroipc"
)

func main() {
    config := avroipc.NewConfig()
    config.WithTimeout(3*time.Second)

    client, err := avroipc.NewClientWithConfig("localhost:20200", config)
    if err != nil {
        log.Fatal(err)
    }
    
    // Use the client as before
    _ = client
}

Development

Clone the repository and run the following commands:

go get
go test ./...

To run a test against a real Flume server, run the following command:

FLUME_SERVER_ADDRESS=127.0.0.1:20201 go test -count=1 -run TestSend

where 127.0.0.1:20201 is the address of a real Apache Flume server and -count=1 bypasses Go's cached test results.

To run the same test with data compression enabled, run the following command:

FLUME_SERVER_ADDRESS=127.0.0.1:20201 FLUME_COMPRESSION_LEVEL=1 go test -count=1 -run TestSend

where FLUME_COMPRESSION_LEVEL is an environment variable that sets the desired compression level. Supported values are 1 to 9.
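
As an illustration of the range stated above, here is a minimal sketch of parsing and validating such a value. The function compressionLevel is hypothetical; treating an empty value as "compression off" is an assumption, and none of this is taken from the avroipc test suite.

```go
package main

import (
	"fmt"
	"strconv"
)

// compressionLevel parses a FLUME_COMPRESSION_LEVEL-style value.
// Assumption: an empty string means compression is off (level 0); any other
// value must be an integer from 1 to 9.
func compressionLevel(raw string) (int, error) {
	if raw == "" {
		return 0, nil
	}
	level, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("compression level %q is not an integer: %w", raw, err)
	}
	if level < 1 || level > 9 {
		return 0, fmt.Errorf("compression level %d is outside 1..9", level)
	}
	return level, nil
}

func main() {
	// In a real test the value would come from os.Getenv("FLUME_COMPRESSION_LEVEL").
	for _, raw := range []string{"", "1", "9", "10", "fast"} {
		level, err := compressionLevel(raw)
		fmt.Printf("%q -> level=%d err=%v\n", raw, level, err)
	}
}
```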

License

Open source licensed under the MIT license (see LICENSE file for details).

Documentation

Types

type Client

type Client interface {
	Close() error
	SendMessage(method string, datum interface{}) (string, error)
}

Client is the interface of an Avro client.
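
Because Client is an interface, application code can depend on it and be exercised in unit tests without a running Flume server. The sketch below redeclares the interface locally so that it compiles on its own. The fakeClient type, the send helper, and the "append" method name are hypothetical illustrations; only the "OK" status string comes from the usage example in the README.

```go
package main

import (
	"errors"
	"fmt"
)

// Client mirrors the interface documented above so that this sketch compiles
// without importing avroipc.
type Client interface {
	Close() error
	SendMessage(method string, datum interface{}) (string, error)
}

// fakeClient is a hypothetical test double that records method names and
// always answers "OK" until it is closed.
type fakeClient struct {
	calls  []string
	closed bool
}

func (f *fakeClient) SendMessage(method string, datum interface{}) (string, error) {
	if f.closed {
		return "", errors.New("client is closed")
	}
	f.calls = append(f.calls, method)
	return "OK", nil
}

func (f *fakeClient) Close() error {
	f.closed = true
	return nil
}

// send is example application code that depends only on the interface.
func send(c Client, method string, datum interface{}) error {
	status, err := c.SendMessage(method, datum)
	if err != nil {
		return err
	}
	if status != "OK" {
		return fmt.Errorf("bad status: %s", status)
	}
	return nil
}

func main() {
	fake := &fakeClient{}
	// "append" is a placeholder method name chosen for this sketch.
	fmt.Println(send(fake, "append", map[string]interface{}{"body": []byte("hello")}))
	fake.Close()
	fmt.Println(send(fake, "append", nil))
}
```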

func NewClientWithConfig

func NewClientWithConfig(addr string, proto protocols.MessageProtocol, config *Config) (Client, error)

NewClientWithConfig creates an Avro client using the option values from the passed configuration object and immediately connects to the specified remote Flume endpoint.

This constructor is intended for use in production environments.

type Config

type Config struct {
	// Connection timeout for the built-in socket transport. It limits how
	// long connecting to a Flume server may take.
	//
	// It is wise to always set this timeout before creating a new client
	// instance because it protects against hanging clients.
	//
	// Defaults to zero, which disables the connection timeout.
	Timeout time.Duration

	// Sets the read and write deadlines of the built-in transports
	// (in practice, only the socket transport is affected). Both deadlines
	// are set together; there is no way to set them separately.
	// The deadlines limit the execution time of read and write operations.
	//
	// This timeout should always be set to a value appropriate for the
	// situation, except perhaps in test and example configurations.
	//
	// Defaults to zero, which disables the read/write timeouts.
	SendTimeout time.Duration

	// The buffer size of the built-in buffered transport.
	//
	// Defaults to zero, which means the buffered transport is not used.
	BufferSize int

	// The compression level of the built-in zlib transport.
	//
	// Defaults to zero, which means compression is disabled.
	CompressionLevel int

	// TLS configuration for the connection.
	//
	// Defaults to nil.
	TLSConfig *tls.Config
}

Config provides the configuration for the client. Use the NewConfig function to create a Config instance, then set the parameters you need.

func NewConfig

func NewConfig() *Config

NewConfig returns a pointer to a new Config instance that is used to configure the client at creation time. Config methods can be chained to set all necessary options in a single expression, and a NewConfig call can itself be chained with those methods to create the config inline.

// addr and proto are assumed to be defined elsewhere.
config := NewConfig()
config.WithTimeout(3*time.Second)
client, err := NewClientWithConfig(addr, proto, config)

or just

client, err := NewClientWithConfig(addr, proto, NewConfig().WithTimeout(3*time.Second))

func (*Config) WithBufferSize

func (c *Config) WithBufferSize(s int) *Config

Sets the size of the internal buffer of the buffered transport.

func (*Config) WithCompressionLevel

func (c *Config) WithCompressionLevel(l int) *Config

Sets the compression level of the zlib transport.

func (*Config) WithSendTimeout

func (c *Config) WithSendTimeout(t time.Duration) *Config

Sets the read/write timeouts together.

func (*Config) WithTLSConfig

func (c *Config) WithTLSConfig(cfg *tls.Config) *Config

func (*Config) WithTimeout

func (c *Config) WithTimeout(t time.Duration) *Config

Sets the connection timeout.
