zapappender

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2022 License: MIT Imports: 10 Imported by: 0

README

zapappender Build Status Coverage Status

Composable appender for uber-go/zap enabling:

  • Async logging
  • Fallback
  • Message Enveloping (like syslog formatting)

This project was created to allow logging to syslog over TCP.

Quick start

Firstly, compose the appender chain:

primaryOut := zapappender.NewWriter(zapcore.Lock(someTcpWriter))
consoleWriter := zapappender.NewWriter(zapcore.Lock(os.Stdout))
secondaryOut := zapappender.NewEnvelopingPreSuffix(consoleWriter, "FALLBACK: ", "")
fallback := zapappender.NewFallback(primaryOut, secondaryOut)
async, _ := zapappender.NewAsync(fallback,
    zapappender.AsyncOnQueueNearlyFullForwardTo(secondaryOut),
    zapappender.AsyncMaxQueueLength(10),
    zapappender.AsyncQueueMinFreePercent(0.2),
    zapappender.AsyncQueueMonitorPeriod(time.Millisecond),
)
appenderChain := async

Secondly, use that chain to create a zapcore.Core and finally to construct a zap.Logger.

encoder := zapcore.NewConsoleEncoder(encoderConfig)
core := zapappender.NewAppenderCore(encoder, appenderChain, zapcore.DebugLevel)
logger := zap.New(core)

logger.Info("this logs async")

See example_test.go for more details.

Documentation

Overview

Example (Core)
package main

import (
	"os"

	"github.com/delixfe/zapappender"

	"github.com/delixfe/zapappender/chaos"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// encoderConfig is the encoder configuration shared by the example.
// It sets the message, level and logger-name keys, lowercase level names and
// " ** " as the console separator. No TimeKey is set, and the expected output
// of the example accordingly contains no timestamps.
var encoderConfig = zapcore.EncoderConfig{
	MessageKey:       "msg",
	LevelKey:         "level",
	NameKey:          "logger",
	EncodeLevel:      zapcore.LowercaseLevelEncoder,
	EncodeTime:       zapcore.ISO8601TimeEncoder,
	EncodeDuration:   zapcore.StringDurationEncoder,
	ConsoleSeparator: " ** ",
}

// main demonstrates a Fallback appender chain: entries go to the primary
// appender until it is broken, after which they are written through the
// secondary appender with a "FALLBACK: " prefix.
func main() {
	// Both appenders of this example end up writing to stdout.
	stdout := zapappender.NewWriter(zapcore.Lock(os.Stdout))

	// The primary appender (this could be a TcpWriter) is wrapped so that the
	// example can break it on demand.
	breakable := chaos.NewFailingSwitchable(stdout)
	var primaryOut zapappender.Appender = breakable

	// The secondary would normally be os.Stdout or Stderr without further
	// wrapping; the prefix makes fallback entries recognizable in the output.
	secondaryOut := zapappender.NewEnvelopingPreSuffix(stdout, "FALLBACK: ", "")

	chain := zapappender.NewFallback(primaryOut, secondaryOut)
	encoder := zapcore.NewConsoleEncoder(encoderConfig)
	logger := zap.New(zapappender.NewAppenderCore(encoder, chain, zapcore.DebugLevel))

	// Written while the primary appender is still working.
	logger.Info("zappig")

	breakable.Break()

	// Written after the primary was broken.
	logger.Info("on the fallback")
}
Output:

info ** zappig
FALLBACK: info ** on the fallback

Index

Examples

Constants

This section is empty.

Variables

View Source
// ErrAppenderShutdown is a sentinel error with the message "appender shut down".
// NOTE(review): presumably returned by an appender that is used after it was
// shut down — confirm against the implementation.
var ErrAppenderShutdown = errors.New("appender shut down")

Functions

func Synchronized

func Synchronized(s interface{}) bool

Types

type Appender

type Appender interface {

	// Write receives the text buffer p together with the zapcore.Entry ent,
	// which gives appenders access to entry fields like Time.
	// Implementations must not retain p.
	Write(p []byte, ent zapcore.Entry) (n int, err error)

	// Sync flushes buffered logs (if any).
	Sync() error
}

Appender is the interface for composable appenders.

The Write method receives the zapcore.Entry in addition to the text buffer. This allows appenders access to the fields like Time.

Several variants of the interface were analyzed:

  1. Write with p, ent, fields
  2. Write with p, ent
  3. Write with p and a subset of ent
  A. Append with enc, ent, fields

Decision: variant 2 (variant 3 would also be an option).

  • We cannot keep the fields in an async process: they might hold references that might already be mutated or hinder GC.
  • Without fields, we cannot use the Encoder to encode the message.

func NewSynchronizing

func NewSynchronizing(inner Appender) Appender

type AppenderCore

type AppenderCore struct {
	zapcore.LevelEnabler
	// contains filtered or unexported fields
}

AppenderCore bridges between zapcore and zapappender.

func NewAppenderCore

func NewAppenderCore(enc zapcore.Encoder, appender Appender, enab zapcore.LevelEnabler) *AppenderCore

func (*AppenderCore) Check

func (*AppenderCore) Sync

func (c *AppenderCore) Sync() error

func (*AppenderCore) With

func (c *AppenderCore) With(fields []zapcore.Field) zapcore.Core

func (*AppenderCore) Write

func (c *AppenderCore) Write(ent zapcore.Entry, fields []zapcore.Field) error

type Async

type Async struct {
	// contains filtered or unexported fields
}

Async enables asynchronous logging so that the application is not affected by logging back pressure or errors.

The queuing is implemented by a buffered channel. A monitoring goroutine watches that channel. If the queue nears its capacity, the oldest log entries are discarded or sent to a fallback.

Example
package main

import (
	"context"
	"os"
	"time"

	"github.com/delixfe/zapappender"

	"github.com/delixfe/zapappender/chaos"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// encoderConfig is the encoder configuration shared by the example.
// It sets the message, level and logger-name keys, lowercase level names and
// " ** " as the console separator. No TimeKey is set, and the expected output
// of the example accordingly contains no timestamps.
var encoderConfig = zapcore.EncoderConfig{
	MessageKey:       "msg",
	LevelKey:         "level",
	NameKey:          "logger",
	EncodeLevel:      zapcore.LowercaseLevelEncoder,
	EncodeTime:       zapcore.ISO8601TimeEncoder,
	EncodeDuration:   zapcore.StringDurationEncoder,
	ConsoleSeparator: " ** ",
}

// main demonstrates the Async appender in front of a Fallback chain.
//
// One entry is logged while the primary appender works, then the primary is
// switched to blocking and more entries are logged than the queue (max length
// 10) can hold. As the expected output shows, some of those entries are
// written through the "QFALLBACK: " appender, and the rest reach the primary
// once it is fixed and the queue is drained.
func main() {
	// The context bounds the blocking appender and the final Drain.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	writer := zapappender.NewWriter(zapcore.Lock(os.Stdout))

	// chaos wrappers let the example switch the primary between working and
	// blocking.
	failing := chaos.NewFailingSwitchable(writer)
	blocking := chaos.NewBlockingSwitchableCtx(ctx, failing)

	// this could be a TcpWriter
	var primaryOut zapappender.Appender = zapappender.NewEnvelopingPreSuffix(blocking, "PRIMARY:  ", "")

	// this would normally be os.Stdout or Stderr without further wrapping
	secondaryOut := zapappender.NewEnvelopingPreSuffix(writer, "FALLBACK: ", "")

	fallback := zapappender.NewFallback(primaryOut, secondaryOut)
	async, err := zapappender.NewAsync(fallback,
		zapappender.AsyncOnQueueNearlyFullForwardTo(zapappender.NewEnvelopingPreSuffix(writer, "QFALLBACK: ", "")),
		zapappender.AsyncMaxQueueLength(10),
		zapappender.AsyncQueueMinFreePercent(0.2),
		zapappender.AsyncQueueMonitorPeriod(time.Millisecond),
	)
	if err != nil {
		// Do not continue with an appender whose construction failed; async
		// is used below for logging and draining.
		panic(err)
	}

	core := zapappender.NewAppenderCore(zapcore.NewConsoleEncoder(encoderConfig), async, zapcore.DebugLevel)
	logger := zap.New(core)

	logger.Info("this logs async")

	// Give the asynchronous writer time to emit the first entry before the
	// primary is switched to blocking.
	time.Sleep(time.Millisecond * 10)

	blocking.Break()

	logger.Info("primary blocks while trying to send this", zap.Int("i", 1))
	// Log more entries than the queue can hold while the primary is blocked.
	for i := 2; i <= 15; i++ {
		logger.Info("while broken", zap.Int("i", i))
	}

	blocking.Fix()
	time.Sleep(time.Millisecond * 10)
	// Block until the buffered entries are written or ctx is cancelled.
	async.Drain(ctx)

}
Output:

PRIMARY:  info ** this logs async
QFALLBACK: info ** while broken ** {"i": 2}
QFALLBACK: info ** while broken ** {"i": 3}
QFALLBACK: info ** while broken ** {"i": 4}
QFALLBACK: info ** while broken ** {"i": 5}
PRIMARY:  info ** primary blocks while trying to send this ** {"i": 1}
PRIMARY:  info ** while broken ** {"i": 6}
PRIMARY:  info ** while broken ** {"i": 7}
PRIMARY:  info ** while broken ** {"i": 8}
PRIMARY:  info ** while broken ** {"i": 9}
PRIMARY:  info ** while broken ** {"i": 10}
PRIMARY:  info ** while broken ** {"i": 11}
PRIMARY:  info ** while broken ** {"i": 12}
PRIMARY:  info ** while broken ** {"i": 13}
PRIMARY:  info ** while broken ** {"i": 14}
PRIMARY:  info ** while broken ** {"i": 15}

func NewAsync

func NewAsync(primary Appender, options ...AsyncOption) (a *Async, err error)

func (*Async) Drain

func (a *Async) Drain(ctx context.Context)

Drain tries to gracefully drain the remaining buffered messages, blocking until the buffer is empty or the provided context is cancelled.

func (*Async) Shutdown

func (a *Async) Shutdown(ctx context.Context)

func (*Async) Sync

func (a *Async) Sync() error

func (*Async) Synchronized

func (a *Async) Synchronized() bool

func (*Async) Write

func (a *Async) Write(p []byte, ent zapcore.Entry) (n int, err error)

The return value n is not meaningful in an async context.

type AsyncOption

type AsyncOption interface {
	// contains filtered or unexported methods
}

func AsyncMaxQueueLength

func AsyncMaxQueueLength(length int) AsyncOption

func AsyncOnQueueNearlyFullDropMessages

func AsyncOnQueueNearlyFullDropMessages() AsyncOption

func AsyncOnQueueNearlyFullForwardTo

func AsyncOnQueueNearlyFullForwardTo(fallback Appender) AsyncOption

AsyncOnQueueNearlyFullForwardTo sends log entries to fallback when the queue nears its capacity. fallback is wrapped in a Synchronizing appender.

func AsyncQueueMinFreeItems

func AsyncQueueMinFreeItems(minFree int) AsyncOption

func AsyncQueueMinFreePercent

func AsyncQueueMinFreePercent(minFreePercent float32) AsyncOption

func AsyncQueueMonitorPeriod

func AsyncQueueMonitorPeriod(period time.Duration) AsyncOption

func AsyncSyncTimeout

func AsyncSyncTimeout(timeout time.Duration) AsyncOption

type Delegating

type Delegating struct {
	// WriteFn is the function Write is delegated to.
	WriteFn           func(p []byte, ent zapcore.Entry) (n int, err error)
	// SyncFn is the function Sync is delegated to.
	SyncFn            func() error
	// SynchronizedValue — NOTE(review): presumably the value reported by
	// Synchronized(); confirm against the implementation.
	SynchronizedValue bool
}

Delegating delegates Write and Sync to functions

func NewDelegating

func NewDelegating(writeFn func(p []byte, ent zapcore.Entry) (n int, err error), syncFn func() error, synchronized bool) *Delegating

func (*Delegating) Sync

func (a *Delegating) Sync() error

func (*Delegating) Synchronized

func (a *Delegating) Synchronized() bool

func (*Delegating) Write

func (a *Delegating) Write(p []byte, ent zapcore.Entry) (int, error)

type Discard

// Discard is an appender that silently drops all messages. It has no fields.
type Discard struct {
}

Discard silently drops all messages

func NewDiscard

func NewDiscard() *Discard

func (*Discard) Sync

func (a *Discard) Sync() error

func (*Discard) Synchronized

func (a *Discard) Synchronized() bool

func (*Discard) Write

func (a *Discard) Write(p []byte, _ zapcore.Entry) (int, error)

type Enveloping

type Enveloping struct {
	// contains filtered or unexported fields
}

Enveloping allows adapting the log message. This can be used to format the message output. That is especially useful when a format should only be applied to a primary appender but not to a fallback one.

func NewEnveloping

func NewEnveloping(inner Appender, envFn EnvelopingFn) *Enveloping

func NewEnvelopingPreSuffix

func NewEnvelopingPreSuffix(inner Appender, prefix, suffix string) *Enveloping

func (*Enveloping) Sync

func (a *Enveloping) Sync() error

func (*Enveloping) Synchronized

func (a *Enveloping) Synchronized() bool

func (*Enveloping) Write

func (a *Enveloping) Write(p []byte, ent zapcore.Entry) (n int, err error)

type EnvelopingFn

type EnvelopingFn func(p []byte, ent zapcore.Entry, output *buffer.Buffer) error

EnvelopingFn is the function that creates the enveloped output. p contains the original content; the enveloped content must be written into output.

Should the entry be passed by value or by pointer? Benchmarks show that using a pointer creates one alloc (for the pointer), but passing by value does not.

type Fallback

type Fallback struct {
	// contains filtered or unexported fields
}

Fallback forwards the message to secondary, if writing to primary returned an error. secondary is wrapped in a Synchronizing appender.

func NewFallback

func NewFallback(primary, secondary Appender) *Fallback

func (*Fallback) Sync

func (a *Fallback) Sync() error

func (*Fallback) Synchronized

func (a *Fallback) Synchronized() bool

func (*Fallback) Write

func (a *Fallback) Write(p []byte, ent zapcore.Entry) (n int, err error)

type SynchronizationAware

// SynchronizationAware is implemented by types that can report a
// synchronization state through Synchronized.
type SynchronizationAware interface {
	// Synchronized reports the synchronization state as a bool.
	// NOTE(review): presumably true means the implementation is safe for
	// concurrent use — confirm against the implementation.
	Synchronized() bool
}

type SynchronizationAwareAppender

// SynchronizationAwareAppender combines the Appender and SynchronizationAware
// interfaces.
type SynchronizationAwareAppender interface {
	Appender
	SynchronizationAware
}

type Synchronizing

type Synchronizing struct {
	// contains filtered or unexported fields
}

func (*Synchronizing) Sync

func (s *Synchronizing) Sync() error

func (*Synchronizing) Synchronized

func (s *Synchronizing) Synchronized() bool

func (*Synchronizing) Write

func (s *Synchronizing) Write(p []byte, ent zapcore.Entry) (n int, err error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer outputs the message to a zapcore.WriteSyncer

func NewWriter

func NewWriter(out zapcore.WriteSyncer) *Writer

func (*Writer) Sync

func (a *Writer) Sync() error

func (*Writer) Synchronized

func (a *Writer) Synchronized() bool

func (*Writer) Write

func (a *Writer) Write(p []byte, _ zapcore.Entry) (n int, err error)

Directories

Path Synopsis
bufferpool
Package bufferpool houses zap's shared internal buffer pool.
Package bufferpool houses zap's shared internal buffer pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL