fluent

package module
v0.0.0-...-9ee7cf3
Published: Feb 20, 2018 License: MIT Imports: 12 Imported by: 0

README

go-fluent-client

A fluentd client

WARNING

This repository has been moved to github.com/lestrrat-go/fluent-client. This repository exists so that libraries pointing to this URL will keep functioning, but this repository will NOT be updated in the future. Please use the new import path.

SYNOPSIS

package fluent_test

import (
  "context"
  "log"
  "time"

  fluent "github.com/lestrrat/go-fluent-client"
)

func Example() {
  // Connects to fluentd at 127.0.0.1:24224. If you want to connect to
  // a different host, use the following:
  //
  //   client, err := fluent.New(fluent.WithAddress("fluent.example.com"))
  //
  client, err := fluent.New()
  if err != nil {
    // fluent.New may return an error if invalid values were
    // passed to the constructor
    log.Printf("failed to create client: %s", err)
    return
  }

  // do not forget to shutdown this client at the end. otherwise
  // we would not know if we were able to flush the pending
  // buffer or not.
  defer func() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := client.Shutdown(ctx); err != nil {
      // Failed to shutdown properly. force-close it
      client.Close()
    }
  }()

  var payload = map[string]string{
    "foo": "bar",
  }
  if err := client.Post("tag", payload); err != nil {
    log.Printf("failed to post: %s", err)
    return
  }
}

func ExamplePing() {
  client, err := fluent.New()
  if err != nil {
    log.Printf("failed to create client: %s", err)
    return
  }

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  // Goroutine to wait for errors
  errorCh := make(chan error, 1)
  go func() {
    // This is just an example to stop pinging on errors
    for {
      select {
      case <-ctx.Done():
        return
      case e := <-errorCh:
        log.Printf("got an error during ping: %s", e.Error())
        cancel()
        return
      }
    }
  }()

  go fluent.Ping(ctx, client, "ping", "hostname", fluent.WithPingResultChan(errorCh))

  // Do what you need with your main program...
}

DESCRIPTION

This is a client to the fluentd log collection daemon.

FEATURES

Performance

Please see the BENCHMARK section.

A well defined Shutdown() method

Because we expect to connect to remote daemons over the wire, fluentd clients generally buffer outgoing data locally and send it when they can. At the end of your program, you should wait for your logs to be sent to the server; otherwise you might have pending writes that haven't gone through yet.

Calling either Close() or Shutdown() triggers the flushing of pending logs, but the former does not wait for this operation to complete, while the latter does. With Shutdown() you can either wait indefinitely, or time out the operation after the desired period of time using a context.Context.
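
To make the difference concrete, here is a minimal, standard-library-only sketch of the idea. It is not this package's implementation: toyClient, newToyClient and flushAll are hypothetical names, and the "sending" is just an append to a slice. Close only signals the background writer, while Shutdown signals it and then waits for it to exit or for the context to end.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// toyClient is a hypothetical stand-in for a buffered client.
// It only illustrates the Close/Shutdown contract described above.
type toyClient struct {
	queue chan string   // payloads waiting to be written
	done  chan struct{} // closed once the background writer exits
	sent  []string      // payloads the writer has "sent"
}

func newToyClient() *toyClient {
	c := &toyClient{queue: make(chan string, 16), done: make(chan struct{})}
	go func() {
		defer close(c.done)
		for p := range c.queue { // runs until queue is closed and drained
			c.sent = append(c.sent, p)
		}
	}()
	return c
}

func (c *toyClient) Post(p string) { c.queue <- p }

// Close signals the writer to finish and returns without waiting.
func (c *toyClient) Close() { close(c.queue) }

// Shutdown signals the writer, then waits for it to exit or for ctx to end.
func (c *toyClient) Shutdown(ctx context.Context) error {
	c.Close()
	select {
	case <-c.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// flushAll posts n payloads and reports how many had been written
// by the time Shutdown returned successfully.
func flushAll(n int) (int, error) {
	c := newToyClient()
	for i := 0; i < n; i++ {
		c.Post(fmt.Sprintf("payload-%d", i))
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := c.Shutdown(ctx); err != nil {
		return 0, err
	}
	return len(c.sent), nil
}

func main() {
	n, err := flushAll(3)
	fmt.Println(n, err)
}
```

In this toy, only the Shutdown path may safely read what was sent, because it waits for the writer to exit first. A caller that used Close alone would have no such guarantee.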

A flexible Post() method

The Post() method provided by this module can either simply enqueue a new payload to be appended to the buffer mentioned in the previous section, and let it process asynchronously, or it can wait for confirmation that the payload has been properly enqueued. Other libraries usually only do one or the other, but we can handle either.

// "fire-and-forget"
client.Post(tagName, payload)

// make sure that we receive confirmation the payload has been appended
if err := client.Post(tagName, payload, fluent.WithSyncAppend(true)); err != nil {
  ...
}
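
One way such a dual-mode append could be structured is sketched below, using only the standard library. This is an assumption about the general technique, not a description of this package's internals: toyBuffer, appendReq, post and errBufferFull are hypothetical names. A request carries an optional reply channel, and the caller waits on it only in the synchronous mode.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a hypothetical error for this sketch.
var errBufferFull = errors.New("buffer full")

// appendReq is one append request sent to the goroutine owning the buffer.
type appendReq struct {
	data  []byte
	reply chan error // nil means fire-and-forget
}

// toyBuffer is a hypothetical pending buffer with a size limit.
type toyBuffer struct {
	reqs  chan appendReq
	limit int
}

func newToyBuffer(limit int) *toyBuffer {
	b := &toyBuffer{reqs: make(chan appendReq), limit: limit}
	go b.run()
	return b
}

// run owns the pending bytes, so no locking is needed.
func (b *toyBuffer) run() {
	var pending []byte
	for r := range b.reqs {
		var err error
		if len(pending)+len(r.data) > b.limit {
			err = errBufferFull
		} else {
			pending = append(pending, r.data...)
		}
		if r.reply != nil {
			r.reply <- err // only synchronous callers hear about failures
		}
	}
}

// post enqueues data; with syncAppend it also waits for the append result.
func (b *toyBuffer) post(data []byte, syncAppend bool) error {
	if !syncAppend {
		b.reqs <- appendReq{data: data}
		return nil
	}
	reply := make(chan error, 1)
	b.reqs <- appendReq{data: data, reply: reply}
	return <-reply
}

func main() {
	b := newToyBuffer(8)
	fmt.Println(b.post([]byte("12345"), false)) // fits in the buffer
	fmt.Println(b.post([]byte("12345"), true))  // would exceed the limit
}
```

In the fire-and-forget mode the second append would have been dropped silently, which is the trade-off the text above describes.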

Buffered/Unbuffered clients

By default, we create a "buffered" client. This means that we enqueue the data to be sent to the fluentd process locally until we can actually connect and send them. However, since this decouples the user from the actual timing when the message is sent to the server, it may not be a suitable solution in cases where immediate action must be taken in case a message could not be sent.

If you must detect immediately whether a message has been successfully sent to the fluentd server, use the unbuffered client by passing fluent.WithBuffered(false) to the constructor:

client, err := fluent.New(
  fluent.WithAddress("fluent.example.com"),
  fluent.WithBuffered(false), // Use the unbuffered client
)

The behavior will change as described above, but the interface is still the same.

OPTIONS (fluent.New)

Name | Short Description | Default Value | Buffered | Unbuffered
fluent.WithBuffered(bool) | Use buffered/unbuffered client | true | - | -
fluent.WithNetwork(string) | Network type of address | "tcp" | Y | Y
fluent.WithAddress(string) | Address to connect to | "127.0.0.1:24224" | Y | Y
fluent.WithJSONMarshaler() | Use JSON as serialization format | - | Y | Y
fluent.WithMsgpackMarshaler() | Use msgpack as serialization format | used by default | Y | Y
fluent.WithTagPrefix(string) | Tag prefix to prepend | - | Y | Y
fluent.WithDialTimeout(time.Duration) | Timeout value when connecting | 3 * time.Second | Y | Y
fluent.WithConnectOnStart(bool) | Attempt to connect immediately | false | Y | Y
fluent.WithSubsecond(bool) | Use EventTime | false | Y | Y
fluent.WithBufferLimit(int) | Max buffer size to store | 8 * 1024 * 1024 | Y | N
fluent.WithWriteThreshold(int) | Min buffer size before writes start | 8 * 1024 | Y | N
fluent.WithMaxConnAttempts(uint64) | Max attempts to make during close (buffered), or max attempts to make when connecting to the server (unbuffered) | 64 | Y | Y
fluent.WithWriteQueueSize(int) | Channel size for background reader | 64 | Y | N

OPTIONS ((fluent.Client).Post)

Name | Short Description | Default Value | Buffered | Unbuffered
fluent.WithTimestamp(time.Time) | Timestamp to use for message | current time | Y | Y
fluent.WithContext(context.Context) | Context to use | none | Y | N
fluent.WithSyncAppend(bool) | Return failure if appending fails | false | Y | N

OPTIONS (fluent.Ping)

Name | Short Description | Default Value
fluent.WithPingInterval(time.Duration) | Interval between pings | 5 * time.Minute
fluent.WithPingResultChan(chan error) | Where to send ping errors | none

BENCHMARKS

Instructions: make sure you have the required fluentd clients, start fluentd at 127.0.0.1:24224, and run:

go test -run=none -bench=. -benchmem -tags bench
BenchmarkK0kubun-4              	  500000	      3220 ns/op	     968 B/op	      12 allocs/op
BenchmarkLestrrat-4             	  500000	      4122 ns/op	     530 B/op	       7 allocs/op
BenchmarkLestrratJSON-4         	  300000	      6639 ns/op	     741 B/op	      11 allocs/op
BenchmarkLestrratUnbuffered-4   	  100000	     11719 ns/op	     512 B/op	       7 allocs/op
BenchmarkOfficial-4             	  100000	     10443 ns/op	     896 B/op	       9 allocs/op
BenchmarkOfficialJSON-4         	  100000	     17796 ns/op	    1760 B/op	      25 allocs/op
PASS
ok  	github.com/lestrrat/go-fluent-client	10.287s

Versions

Name | Version
fluentd (td-agent) | 0.12.19
github.com/lestrrat/go-fluent-client | f3fb05a2b7eb40bb426dd8e1ba43e9ff47b412ec
github.com/k0kubun/fluent-logger-go | e1cfc57bb12c99d7207d43b942527c9450d14382
github.com/fluent/fluent-logger-golang | 8bbc2356beaf021b04c9bd5cdc76ea5a7ccb40ec

Analysis

github.com/lestrrat/go-fluent-client
Pros
  • Lowest allocations per op
  • Proper Shutdown method to flush buffers at the end.
  • Tried very hard to avoid any race conditions.
  • Has buffered/unbuffered clients

While github.com/k0kubun/fluent-logger-go is fastest, it does not check errors and does not handle some synchronization edge cases. github.com/lestrrat/go-fluent-client goes to great pains to check these things, and still manages to come almost as fast as github.com/k0kubun/fluent-logger-go, which is already several times faster than the official library.

The buffered client is the default, but you may use the unbuffered client, which does not keep the payload in an internal buffer before sending to the server. This is slightly less efficient, but has the advantage of allowing you to handle errors more gracefully.

Cons
  • I'm biased (duh).

github.com/k0kubun/fluent-logger-go
Pros

This library is fast. I believe this is due to the fact that it does very little error handling and synchronization. If you use the msgpack serialization format and that's it, you probably will be fine using this library.

Cons

Do note that as of the version I tested above, the Logger.Log method has a glaring race condition that will probably corrupt your messages sooner than you can blink: DO NOT USE THE Logger.Log method.

Also, there is no way for the caller to check for serialization errors when using Logger.Post. You can get the error status using Logger.Log, but as previously stated, you do not want to use it.

Finally, there is no way to flush pending buffers: If you append a lot of buffers in succession, and abruptly quit your program, you're done for. You lose all your data.

Oh, and it supports Msgpack only, but this is a very minor issue: a casual user really shouldn't have to care which serialization format the data is sent in.

github.com/fluent/fluent-logger-golang
Pros

This official binding from the maintainers of fluentd is by far the most battle-tested library. It may be a bit slow, but it's sturdy, period.

Cons

The benchmark scores are pretty low. This could just be my benchmark, so please take this with a grain of salt.

Looking at the code, it looks non-gopher-ish. Use of panic is one such item. In Go you should avoid casual panics, which force long-running daemons to write code like this: https://github.com/moby/moby/blob/1325f667eeb42b717c2f9d369f2ee6d701a280e3/daemon/logger/fluentd/fluentd.go#L46-L49

Documentation

Overview

Package fluent implements a client for the fluentd data logging daemon.

Example
package main

import (
	"context"
	"log"
	"time"

	fluent "github.com/lestrrat/go-fluent-client"
)

func main() {
	// Connects to fluentd at 127.0.0.1:24224. If you want to connect to
	// a different host, use the following:
	//
	//   client, err := fluent.New(fluent.WithAddress("fluent.example.com"))
	//
	client, err := fluent.New()
	if err != nil {
		// fluent.New may return an error if invalid values were
		// passed to the constructor
		log.Printf("failed to create client: %s", err)
		return
	}

	// do not forget to shutdown this client at the end. otherwise
	// we would not know if we were able to flush the pending
	// buffer or not.
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if err := client.Shutdown(ctx); err != nil {
			log.Printf("Failed to shutdown properly. force-close it")
			client.Close()
		}
		log.Printf("shutdown complete")
	}()

	var payload = map[string]string{
		"foo": "bar",
	}
	log.Printf("Posting message")
	if err := client.Post("debug.test", payload); err != nil {
		log.Printf("failed to post: %s", err)
		return
	}

}

Functions

func IsBufferFull

func IsBufferFull(e error) bool

IsBufferFull returns true if the error is a BufferFull error
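
A predicate like this is commonly built on a small "behavior" interface rather than a concrete error type. The sketch below shows that general pattern with the standard library only. Whether this package does it the same way is an assumption; bufferFuller, bufferFullErr, isBufferFull and sampleErrors are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// bufferFuller is a hypothetical behavior interface: any error that
// implements it can declare itself a "buffer full" condition.
type bufferFuller interface {
	BufferFull() bool
}

// bufferFullErr is a hypothetical concrete error for this sketch.
type bufferFullErr struct{}

func (bufferFullErr) Error() string    { return "buffer full" }
func (bufferFullErr) BufferFull() bool { return true }

// isBufferFull reports whether err, or any error it wraps, describes a
// full buffer. A nil error is never a buffer-full error.
func isBufferFull(err error) bool {
	var bf bufferFuller
	if errors.As(err, &bf) {
		return bf.BufferFull()
	}
	return false
}

// sampleErrors returns a direct, a wrapped, and an unrelated error.
func sampleErrors() (full, wrapped, other error) {
	full = bufferFullErr{}
	wrapped = fmt.Errorf("post failed: %w", full)
	other = errors.New("connection refused")
	return full, wrapped, other
}

func main() {
	full, wrapped, other := sampleErrors()
	fmt.Println(isBufferFull(full), isBufferFull(wrapped), isBufferFull(other), isBufferFull(nil))
}
```

With the real package, the caller would pass the error returned by Post (with WithSyncAppend(true)) to fluent.IsBufferFull and, for example, back off before retrying.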

func Ping

func Ping(ctx context.Context, client Client, tag string, record interface{}, options ...Option)

Ping is a helper method that allows you to call client.Ping in a periodic manner.

By default a ping message will be sent every 5 minutes. You may change this using the WithPingInterval option.

If you need to capture ping failures, pass it a channel using WithPingResultChan.
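
For readers curious how a periodic helper of this shape can be written, here is a rough standard-library sketch. It is not the package's code: pingLoop, firstPingError and errPingFailed are hypothetical, and the choice to drop a failure report when nobody is ready to receive it is an assumption of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errPingFailed is a hypothetical failure used by the demo below.
var errPingFailed = errors.New("ping failed")

// pingLoop calls ping once per interval until ctx is done. Failures are
// forwarded to results without blocking (an assumption of this sketch).
func pingLoop(ctx context.Context, interval time.Duration, ping func() error, results chan<- error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			err := ping()
			if err == nil || results == nil {
				continue
			}
			select {
			case results <- err:
			default: // nobody is ready to receive; drop the report
			}
		}
	}
}

// firstPingError runs the loop with an always-failing ping and returns the
// first reported failure, or nil if none arrives within two seconds.
func firstPingError() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the loop when we return
	results := make(chan error, 1)
	go pingLoop(ctx, time.Millisecond, func() error { return errPingFailed }, results)
	select {
	case err := <-results:
		return err
	case <-time.After(2 * time.Second):
		return nil
	}
}

func main() {
	fmt.Println(firstPingError())
}
```

As in the package's own example, the caller is expected to run the loop in its own goroutine and stop it by canceling the context.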

Example
package main

import (
	"context"
	"log"

	fluent "github.com/lestrrat/go-fluent-client"
)

func main() {
	client, err := fluent.New()
	if err != nil {
		log.Printf("failed to create client: %s", err)
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Goroutine to wait for errors
	errorCh := make(chan error, 1)
	go func() {
		// This is just an example to stop pinging on errors
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-errorCh:
				log.Printf("got an error during ping: %s", e.Error())
				cancel()
				return
			}
		}
	}()

	go fluent.Ping(ctx, client, "ping", "hostname", fluent.WithPingResultChan(errorCh))

	// Do what you need with your main program...
}

Types

type Buffered

type Buffered struct {
	// contains filtered or unexported fields
}

Buffered is a Client that buffers incoming messages, and sends them asynchronously when it can.

func NewBuffered

func NewBuffered(options ...Option) (client *Buffered, err error)

NewBuffered creates a new Buffered client. Options may be one of the following:

  • fluent.WithAddress
  • fluent.WithBufferLimit
  • fluent.WithDialTimeout
  • fluent.WithJSONMarshaler
  • fluent.WithMaxConnAttempts
  • fluent.WithMsgpackMarshaler
  • fluent.WithNetwork
  • fluent.WithTagPrefix
  • fluent.WithWriteThreshold
  • fluent.WithWriteQueueSize

Please see their respective documentation for details.

func (*Buffered) Close

func (c *Buffered) Close() error

Close closes the connection, but does not wait for the pending buffers to be flushed. If you want to make sure that the background minion has properly exited, you should probably use the Shutdown() method.

func (*Buffered) Ping

func (c *Buffered) Ping(tag string, record interface{}, options ...Option) (err error)

Ping synchronously sends a ping message. This ping bypasses the underlying buffer of pending messages, and establishes a connection to the server entirely for this ping message.

func (*Buffered) Post

func (c *Buffered) Post(tag string, v interface{}, options ...Option) (err error)

Post posts the given structure after encoding it along with the given tag.

An error is returned if the client has already been closed.

If you would like to specify options to `Post()`, you may pass them at the end of the method. Currently you can use the following:

fluent.WithContext: specify context.Context to use
fluent.WithTimestamp: allows you to set arbitrary timestamp values
fluent.WithSyncAppend: allows you to verify if the append was successful

If fluent.WithSyncAppend is provided and is true, the following errors may be returned:

  1. If the current underlying pending buffer is not large enough to hold this new data, an error will be returned
  2. If the marshaling into msgpack/json failed, the marshaling error is returned

func (*Buffered) Shutdown

func (c *Buffered) Shutdown(ctx context.Context) error

Shutdown closes the connection, and notifies the background worker to flush all existing buffers. This method will block until the background minion exits, or the provided context object is canceled.

type Client

type Client interface {
	Post(string, interface{}, ...Option) error
	Ping(string, interface{}, ...Option) error
	Close() error
	Shutdown(context.Context) error
}

Client represents a fluentd client. The client receives data as we go, and proxies it to a background minion. The background minion attempts to write to the server as soon as possible.

func New

func New(options ...Option) (Client, error)

New creates a new client. By default a buffered client is created. The `WithBuffered` option switches which type of client is created: `WithBuffered(true)` (the default) creates a buffered client, and `WithBuffered(false)` creates an unbuffered client. All options are delegated to `NewBuffered` or `NewUnbuffered`, respectively.

type EventTime

type EventTime struct {
	time.Time
}

EventTime is used to represent the time in a msgpack Message

func (*EventTime) DecodeMsgpack

func (t *EventTime) DecodeMsgpack(d *msgpack.Decoder) error

DecodeMsgpack decodes from a msgpack stream and materializes an EventTime object

func (EventTime) EncodeMsgpack

func (t EventTime) EncodeMsgpack(e *msgpack.Encoder) error

EncodeMsgpack encodes the EventTime into msgpack format
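
For orientation, the Fluentd forward protocol describes EventTime as a msgpack extension value of type 0 holding 8 bytes: seconds since the epoch and nanoseconds, each as a 32-bit big-endian unsigned integer. The sketch below builds the fixext 8 form of that layout by hand with the standard library. It is an illustration of the wire format only: encodeEventTime, decodeEventTime and roundTrip are hypothetical helpers, and the package's own methods work through a msgpack encoder and may differ in detail.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"time"
)

// encodeEventTime renders t as a msgpack fixext 8 value of ext type 0.
// Seconds are truncated to 32 bits, as the layout requires.
func encodeEventTime(t time.Time) []byte {
	buf := make([]byte, 10)
	buf[0] = 0xd7 // msgpack fixext 8 marker
	buf[1] = 0x00 // ext type 0: EventTime
	binary.BigEndian.PutUint32(buf[2:6], uint32(t.Unix()))
	binary.BigEndian.PutUint32(buf[6:10], uint32(t.Nanosecond()))
	return buf
}

// decodeEventTime is the inverse of encodeEventTime.
func decodeEventTime(buf []byte) (time.Time, error) {
	if len(buf) != 10 || buf[0] != 0xd7 || buf[1] != 0x00 {
		return time.Time{}, errors.New("not a fixext 8 EventTime")
	}
	sec := binary.BigEndian.Uint32(buf[2:6])
	nsec := binary.BigEndian.Uint32(buf[6:10])
	return time.Unix(int64(sec), int64(nsec)).UTC(), nil
}

// roundTrip encodes and decodes a timestamp, returning the decoded
// seconds and nanoseconds together with the raw bytes.
func roundTrip(sec, nsec int64) (int64, int, []byte, error) {
	raw := encodeEventTime(time.Unix(sec, nsec))
	t, err := decodeEventTime(raw)
	if err != nil {
		return 0, 0, raw, err
	}
	return t.Unix(), t.Nanosecond(), raw, nil
}

func main() {
	sec, nsec, raw, err := roundTrip(1519084800, 123456789)
	fmt.Println(sec, nsec, len(raw), err)
}
```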

type Message

type Message struct {
	Tag    string      `msgpack:"tag"`
	Time   EventTime   `msgpack:"time"`
	Record interface{} `msgpack:"record"`
	Option interface{} `msgpack:"option"`
	// contains filtered or unexported fields
}

Message is a fluentd payload, which can be encoded in JSON or MessagePack format.

func (*Message) DecodeMsgpack

func (m *Message) DecodeMsgpack(d *msgpack.Decoder) error

DecodeMsgpack deserializes from a msgpack buffer and populates a Message struct appropriately

func (*Message) EncodeMsgpack

func (m *Message) EncodeMsgpack(e *msgpack.Encoder) error

EncodeMsgpack serializes a Message to msgpack format

func (*Message) MarshalJSON

func (m *Message) MarshalJSON() ([]byte, error)

MarshalJSON serializes a Message to JSON format
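
As a point of reference, the forward protocol's single-message form is an array of tag, time and record, optionally followed by an options map. The sketch below produces the three-element JSON form with encoding/json. It is an assumption that this mirrors what MarshalJSON emits; encodeForwardJSON is a hypothetical helper and the exact output of the package (for example, how it renders the Option field) may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeForwardJSON renders one event as a JSON array of
// [tag, unix seconds, record]. Hypothetical helper for illustration.
func encodeForwardJSON(tag string, unixSec int64, record interface{}) (string, error) {
	buf, err := json.Marshal([]interface{}{tag, unixSec, record})
	if err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	s, err := encodeForwardJSON("debug.test", 1519084800, map[string]string{"foo": "bar"})
	fmt.Println(s, err)
}
```

Records that encoding/json cannot represent (a channel, for instance) make the helper return an error, which is the kind of serialization failure a caller would want surfaced.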

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(buf []byte) error

UnmarshalJSON deserializes from a JSON buffer and populates a Message struct appropriately

type Option

type Option interface {
	Name() string
	Value() interface{}
}

Option is an interface used for providing options to the various methods
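
The name/value shape of this interface lends itself to a simple constructor loop. The following sketch shows one way a constructor could consume such options, using only the standard library. Everything except the two-method interface shape is hypothetical: the option struct, the withAddress and withBuffered helpers, the "address" and "buffered" keys, and newConfig are illustrative and not taken from the package.

```go
package main

import "fmt"

// Option mirrors the two-method interface documented above.
type Option interface {
	Name() string
	Value() interface{}
}

// option is a hypothetical concrete implementation.
type option struct {
	name  string
	value interface{}
}

func (o option) Name() string       { return o.name }
func (o option) Value() interface{} { return o.value }

// Hypothetical option constructors; the key strings are assumptions.
func withAddress(s string) Option { return option{name: "address", value: s} }
func withBuffered(b bool) Option  { return option{name: "buffered", value: b} }

// config holds the settings a constructor might derive from options.
type config struct {
	address  string
	buffered bool
}

// newConfig starts from defaults and applies options in order.
// Unknown names and values of the wrong type are ignored.
func newConfig(options ...Option) config {
	c := config{address: "127.0.0.1:24224", buffered: true}
	for _, o := range options {
		switch o.Name() {
		case "address":
			if s, ok := o.Value().(string); ok {
				c.address = s
			}
		case "buffered":
			if b, ok := o.Value().(bool); ok {
				c.buffered = b
			}
		}
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withAddress("fluent.example.com"), withBuffered(false)))
}
```

A design like this lets one Option type be shared by constructors and by per-call methods such as Post, since each consumer simply picks out the names it understands.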

func WithAddress

func WithAddress(s string) Option

WithAddress specifies the address to connect to for `fluent.New`. This may be a unix domain socket path, or a hostname/IP address.

func WithBufferLimit

func WithBufferLimit(v interface{}) Option

WithBufferLimit specifies the buffer limit to be used for the underlying pending buffer. If a `Client.Post` operation would exceed this size, an error is returned (note: you must use `WithSyncAppend` in `Client.Post` if you want this error to be reported). The default value is 8MB.

func WithBuffered

func WithBuffered(b bool) Option

WithBuffered specifies if we should create a buffered or unbuffered client

func WithConnectOnStart

func WithConnectOnStart(b bool) Option

WithConnectOnStart is specified when you would like a buffered client to make sure that it can connect to the specified fluentd server on startup.

func WithContext

func WithContext(ctx context.Context) Option

WithContext specifies the context.Context object to be used by Post(). Possible blocking operations are (1) writing to the background buffer, and (2) waiting for a reply when WithSyncAppend(true) is in use.

func WithDialTimeout

func WithDialTimeout(d time.Duration) Option

WithDialTimeout specifies the amount of time allowed for the client to establish connection with the server. If we are forced to wait for a duration that exceeds the specified timeout, we deem the connection to have failed. The default value is 3 seconds

func WithJSONMarshaler

func WithJSONMarshaler() Option

WithJSONMarshaler specifies JSON marshaling to be used when sending messages to fluentd. Used for `fluent.New`

func WithMaxConnAttempts

func WithMaxConnAttempts(n uint64) Option

WithMaxConnAttempts specifies the maximum number of attempts made by the client to connect to the fluentd server during final data flushing for buffered clients. For unbuffered clients, this controls the number of attempts made when calling `Post`.

For buffered clients: During normal operation, the client will indefinitely attempt to connect to the server (whilst being backed-off properly), as it should try as hard as possible to send the stored data.

This option controls the behavior when the client still has more data to send AFTER it has been told to Close() or Shutdown(). In this case we know the client wants to stop at some point, so we try to connect up to a finite number of attempts.

The default value is 64 for both buffered and unbuffered clients.
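
The "finite number of attempts with back-off" behavior can be pictured with the following standard-library sketch. It is only an illustration of the idea: connectWithRetry, errRefused and the doubling delay are assumptions, and the package's actual back-off strategy is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused is a hypothetical dial failure used in the demo.
var errRefused = errors.New("connection refused")

// connectWithRetry calls dial up to maxAttempts times, doubling the delay
// between attempts. It returns the number of attempts actually made.
func connectWithRetry(dial func() error, maxAttempts uint64, baseDelay time.Duration) (uint64, error) {
	if maxAttempts == 0 {
		return 0, errors.New("no connection attempts allowed")
	}
	var lastErr error
	delay := baseDelay
	for attempt := uint64(1); attempt <= maxAttempts; attempt++ {
		if lastErr = dial(); lastErr == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay) // back off before the next attempt
			delay *= 2
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errRefused // fail twice, then succeed
		}
		return nil
	}
	n, err := connectWithRetry(dial, 64, time.Millisecond)
	fmt.Println(n, err)
}
```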

func WithMsgpackMarshaler

func WithMsgpackMarshaler() Option

WithMsgpackMarshaler specifies msgpack marshaling to be used when sending messages to fluentd. Used in `fluent.New`

func WithNetwork

func WithNetwork(s string) Option

WithNetwork specifies the network type, i.e. "tcp" or "unix" for `fluent.New`

func WithPingInterval

func WithPingInterval(t time.Duration) Option

WithPingInterval is used in the fluent.Ping method to specify the time between pings. The default value is 5 minutes

func WithPingResultChan

func WithPingResultChan(ch chan error) Option

WithPingResultChan specifies the channel where you will receive ping failures

func WithSubsecond

func WithSubsecond(b bool) Option

WithSubsecond specifies if we should use EventTime for timestamps on fluentd messages. May be used on a per-client basis or per-call to Post(). By default this feature is turned OFF.

Note that this option will only work for fluentd v0.14 or above.

func WithSyncAppend

func WithSyncAppend(b bool) Option

WithSyncAppend specifies if we should synchronously check for success when appending to the underlying pending buffer. Used in `Client.Post`. If not specified, errors appending are not reported.

func WithTagPrefix

func WithTagPrefix(s string) Option

WithTagPrefix specifies the prefix to be prepended to tag names when sending messages to fluentd. Used in `fluent.New`

func WithTimestamp

func WithTimestamp(t time.Time) Option

WithTimestamp specifies the timestamp to be used for `Client.Post`

func WithWriteQueueSize

func WithWriteQueueSize(n int) Option

WithWriteQueueSize specifies the channel buffer size for the queue used to pass messages from the Client to the background writer goroutines. The default value is 64.

func WithWriteThreshold

func WithWriteThreshold(i int) Option

WithWriteThreshold specifies the minimum number of bytes that we should have pending before starting to attempt to write to the server. The default value is 8KB

type Unbuffered

type Unbuffered struct {
	// contains filtered or unexported fields
}

Unbuffered is a Client that synchronously sends messages.

func NewUnbuffered

func NewUnbuffered(options ...Option) (client *Unbuffered, err error)

NewUnbuffered creates an unbuffered client. Unlike the normal buffered client, an unbuffered client handles the Post() method synchronously, and does not attempt to buffer the payload. Options may be one of the following:

  • fluent.WithAddress
  • fluent.WithDialTimeout
  • fluent.WithJSONMarshaler
  • fluent.WithMaxConnAttempts
  • fluent.WithMsgpackMarshaler
  • fluent.WithNetwork
  • fluent.WithSubsecond
  • fluent.WithTagPrefix

Please see their respective documentation for details.

func (*Unbuffered) Close

func (c *Unbuffered) Close() error

Close closes the currently cached connection, if any.

func (*Unbuffered) Ping

func (c *Unbuffered) Ping(tag string, v interface{}, options ...Option) (err error)

Ping sends a ping message. A ping for an unbuffered client is completely analogous to sending a message with Post.

func (*Unbuffered) Post

func (c *Unbuffered) Post(tag string, v interface{}, options ...Option) (err error)

Post posts the given structure after encoding it along with the given tag.

If you would like to specify options to `Post()`, you may pass them at the end of the method. Currently you can use the following:

fluent.WithTimestamp: allows you to set arbitrary timestamp values

func (*Unbuffered) Shutdown

func (c *Unbuffered) Shutdown(_ context.Context) error

Shutdown is an alias to Close(). Since an unbuffered Client does not have any pending buffers at any given moment, we do not have to do anything other than close the connection.
