sparki

v0.0.0-...-0193d4f
Published: Sep 3, 2022 License: MPL-2.0 Imports: 13 Imported by: 0

README

Sparki

Sparki provides a client for sending logs to Loki from Go programs, with or without Zap.

Loki Logs

Sparki provides a core for use with Zap:

package main

import "go.uber.org/zap"
import "go.uber.org/zap/zapcore"
import "github.com/aurowora/sparki"

func main() {
    // Create a PushClient
    pushClient := sparki.NewPushClient("https://loki.example.com/loki/api/v1/push")
    defer pushClient.Close() // Closing the push client will flush all logs
    
    // Open a stream
    streamClient := pushClient.NewStream([]sparki.Label{{Name: "source", Value: "sparki"}})
    // defer streamClient.Close() is redundant because pushClient.Close will close all streams
    
    // Create a core for use with Zap, create a zap logger, and attach the core
    sparkiCore := sparki.NewCore(streamClient)
    logger := zap.NewExample()
    defer logger.Sync()
    logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
        return zapcore.NewTee(core, sparkiCore)
    }))
    
    // Log some messages
    logger.Info("Hello, world! ⚡")
    logger.Error("Crashing and burning 🔥", zap.String("scream", "aaaah!"))
}

Sparki's PushClient and StreamClient can also be used directly, without Zap.

package main

import "time"
import "github.com/aurowora/sparki"

func main() {
    pushClient := sparki.NewPushClient("https://loki.example.com/loki/api/v1/push")
    defer pushClient.Close()
    
    streamClient := pushClient.NewStream([]sparki.Label{{Name: "source", Value: "sparki"}})
    streamClient.Log(time.Now(), "This will be sent to Loki.")
}

Configuring

Sparki provides configuration options for its various types.

ClientOption

One or more ClientOptions can be passed to sparki.NewPushClient to configure it. ClientOptions are returned by functions beginning with WithClient. The following options are available:

  • WithClientMaxPayloadLength(int) - Specifies the maximum number of logs to store in memory (per stream) before the logs must be sent to Loki. Defaults to 8192.
  • WithClientMaxPayloadAge(time.Duration) - Specifies the maximum amount of time that a log can remain in memory before it must be sent to Loki. Defaults to 5 minutes.
  • WithClientHeaders(http.Header) - Specifies any additional headers to be sent to Loki. Can be used to add headers for authentication.
  • WithClientTransport(http.RoundTripper) - Specifies the Transport that should be used for the HTTP client.
  • WithClientRetryLimit(int) - Specifies how many times PushClient is allowed to retry failures when sending logs. By default, Sparki may retry up to 7 times before the logs are lost for good.
  • WithClientPushWorkers(int) - Specifies how many goroutines are created to send HTTP requests to Loki. By default, 6 workers are used.

Authentication

It is common for Loki to reside behind a reverse proxy that requires authentication of some sort.

An example of using WithClientHeaders to authenticate with a reverse proxy that uses a header-based auth scheme:

package main

import "net/http"
import "github.com/aurowora/sparki"

func main() {
    extraHeaders := make(http.Header, 1)
    extraHeaders.Set("Authorization", "Basic YWxhZGRpbjpvcGVuc2VzYW1l")
    
    pushClient := sparki.NewPushClient("https://loki.example.com/loki/api/v1/push", sparki.WithClientHeaders(extraHeaders))
    defer pushClient.Close()
    
    // other setup code...
}
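
The Authorization value in the example above follows the HTTP Basic scheme, which is the base64 encoding of "user:password". As a minimal sketch using only the standard library, such a header could be built from credentials as follows. The helper name basicAuthHeader and the credentials are placeholders and are not part of sparki.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
)

// basicAuthHeader is a hypothetical helper (not part of sparki) that builds
// an http.Header carrying HTTP Basic credentials.
func basicAuthHeader(user, password string) http.Header {
	token := base64.StdEncoding.EncodeToString([]byte(user + ":" + password))
	h := make(http.Header, 1)
	h.Set("Authorization", "Basic "+token)
	return h
}

func main() {
	h := basicAuthHeader("aladdin", "opensesame")
	fmt.Println(h.Get("Authorization")) // Basic YWxhZGRpbjpvcGVuc2VzYW1l
}
```

The resulting http.Header is the kind of value that sparki.WithClientHeaders accepts.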

An example of using WithClientTransport to authenticate with a reverse proxy that uses an mTLS-based auth scheme:

package main

import "crypto/tls"
import "net/http"
import "github.com/aurowora/sparki"

func main() {
    cert, err := tls.LoadX509KeyPair("./client.pem", "./client.key")
    if err != nil {
        panic(err)
    }
    
    // Use a pointer: only *http.Transport implements http.RoundTripper.
    transport := &http.Transport{
        TLSClientConfig: &tls.Config{
            Certificates: []tls.Certificate{cert},
        },
    }
    
    pushClient := sparki.NewPushClient("https://loki.example.com/loki/api/v1/push", sparki.WithClientTransport(transport))
    defer pushClient.Close()
    
    // other setup code...
}

StreamOption

One or more StreamOptions can be passed to PushClient's NewStream method to configure the stream. StreamOptions are returned by functions beginning with WithStream. The following options are available:

  • WithStreamMaxPayloadLength(int) - Like WithClientMaxPayloadLength, but for this stream only. Defaults to the client's max payload length.
  • WithStreamMaxPayloadAge(time.Duration) - Like WithClientMaxPayloadAge, but for this stream only. Defaults to the client's max payload age.

CoreOption

One or more CoreOptions can be passed to sparki.NewCore to configure it. CoreOptions are returned by functions beginning with WithCore. The following options are available:

  • WithCoreFieldFormatter(sparki.FieldFormatter) - Specifies a function used to format log messages and fields into the message string.
  • WithCoreLevel(zapcore.Level) - Specifies the log level that this core should use. Defaults to zapcore.InfoLevel.
  • WithCoreCaller(bool) - Specifies whether to inject a caller field containing information about the function making the log call. Defaults to true.

Labels

Loki performs poorly with high-cardinality label sets (many labels with many potential values) and works best with a small set of static labels. High label cardinality results in many small streams being stored, which can bloat Loki's index size and slow down queries. As such, our API favors the use of static labels. Please read the Loki best practices for more information.

One effect of this is that the implementation of sparki.Core appends any fields to the log line itself, using the function provided via WithCoreFieldFormatter (or the default implementation if none is provided). For similar reasons, instead of using a label for the log level, we add a field called level to each message. This means that to filter for a specific log level, you need a query like the following:

{app="myApp"} |= `level="error"`

Zap provides the following levels: debug, info, warn, error, dpanic, panic, and fatal. Any other field can be queried in a similar manner.

Currently, neither the Zap core nor the PushClient itself supports the use of dynamic labels, though this may change if I ever find a use for them in my own projects.

Tests

Tests cover most things in the package other than the Zap core, which I've been testing by hand until I have time to write proper tests for it. Tests can be run using go test.

License

sparki Copyright (C) 2022 Aurora McGinnis

This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at https://mozilla.org/MPL/2.0/.

The full terms of the Mozilla Public License 2.0 can also be found in the LICENSE.txt file within this repository.

Documentation

Overview

Package sparki provides a client for sending logs to Loki with or without the use of the Zap logging framework.

For basic information on using this package, see README.md.

Types

type ClientOption

type ClientOption func(*opts)

ClientOption is used to configure a PushClient in NewPushClient. See the WithClient* functions for the available options.
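
Because ClientOption is a function that mutates an unexported options struct, it follows what is commonly called the functional options pattern. The sketch below illustrates the pattern in isolation. The clientOpts struct, its field names, and the lowercase option constructors are hypothetical stand-ins rather than sparki's actual internals; only the default values are taken from the README.

```go
package main

import (
	"fmt"
	"time"
)

// clientOpts is a hypothetical stand-in for sparki's unexported opts struct.
type clientOpts struct {
	maxPayloadLength int
	maxPayloadAge    time.Duration
	retryLimit       int
	pushWorkers      int
}

// option mirrors the shape of ClientOption: a function that mutates the options.
type option func(*clientOpts)

func withMaxPayloadLength(n int) option {
	return func(o *clientOpts) { o.maxPayloadLength = n }
}

func withRetryLimit(n int) option {
	return func(o *clientOpts) { o.retryLimit = n }
}

// newClientOpts starts from the defaults listed in the README and applies
// each option in order, so later options override earlier ones.
func newClientOpts(options ...option) clientOpts {
	o := clientOpts{
		maxPayloadLength: 8192,
		maxPayloadAge:    5 * time.Minute,
		retryLimit:       7,
		pushWorkers:      6,
	}
	for _, apply := range options {
		apply(&o)
	}
	return o
}

func main() {
	o := newClientOpts(withMaxPayloadLength(1024), withRetryLimit(3))
	fmt.Println(o.maxPayloadLength, o.maxPayloadAge, o.retryLimit, o.pushWorkers)
}
```

Options that are not passed keep their defaults, which is why a constructor built this way can be called with no options at all.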

func WithClientHeaders

func WithClientHeaders(h http.Header) ClientOption

WithClientHeaders allows you to specify HTTP headers that should be added to any requests to Loki. This is potentially useful for Authorization purposes.

func WithClientMaxPayloadAge

func WithClientMaxPayloadAge(d time.Duration) ClientOption

WithClientMaxPayloadAge specifies the maximum amount of time that may pass before a given stream must be committed to Loki. Note that logs may be sent to Loki more frequently (after a requested sync or when maxPayloadLength is reached).

func WithClientMaxPayloadLength

func WithClientMaxPayloadLength(logLines int) ClientOption

WithClientMaxPayloadLength specifies the number of log lines that can be added to a stream before it must be committed to Loki. Note that logs may be sent to Loki before this threshold is hit (after a requested sync or when maxPayloadAge is reached).
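
The length and age limits act together: a stream is flushed as soon as either one is reached. The following is a rough sketch of that rule, not sparki's implementation. The batch type and its methods are hypothetical, and the current time is passed in explicitly so the example is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical in-memory buffer for one stream.
type batch struct {
	maxLen int
	maxAge time.Duration
	lines  []string
	oldest time.Time // arrival time of the first buffered line
}

// add buffers a line that arrived at time now.
func (b *batch) add(now time.Time, line string) {
	if len(b.lines) == 0 {
		b.oldest = now
	}
	b.lines = append(b.lines, line)
}

// shouldFlush reports whether either threshold has been reached at time now.
func (b *batch) shouldFlush(now time.Time) bool {
	if len(b.lines) == 0 {
		return false
	}
	return len(b.lines) >= b.maxLen || now.Sub(b.oldest) >= b.maxAge
}

// flush returns the buffered lines and resets the buffer.
func (b *batch) flush() []string {
	out := b.lines
	b.lines = nil
	return out
}

func main() {
	start := time.Date(2022, time.September, 3, 0, 0, 0, 0, time.UTC)
	b := &batch{maxLen: 3, maxAge: 5 * time.Minute}

	b.add(start, "first")
	fmt.Println(b.shouldFlush(start.Add(time.Minute)))     // false: one line, one minute old
	fmt.Println(b.shouldFlush(start.Add(5 * time.Minute))) // true: age limit reached

	b.add(start, "second")
	b.add(start, "third")
	fmt.Println(b.shouldFlush(start)) // true: length limit reached
	fmt.Println(len(b.flush()))       // 3
}
```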

func WithClientPushWorkers

func WithClientPushWorkers(numWorkers int) ClientOption

WithClientPushWorkers sets the number of goroutines created to send HTTP requests to Loki. The default is 6 workers.

func WithClientRetryLimit

func WithClientRetryLimit(limit int) ClientOption

WithClientRetryLimit controls how many times a failed push will be retried. The backoff period after a failure grows exponentially. Once the limit is passed, PushClient gives up and the affected logs are lost.
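
To illustrate retrying with exponential backoff up to a limit, here is a small sketch. The base delay, the doubling factor, and the names backoff, pushWithRetry, and errUnavailable are assumptions made for the example; the documentation above only states that the backoff is exponential and that logs are dropped once the limit is passed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical error standing in for a failed push.
var errUnavailable = errors.New("loki unavailable")

// backoff returns the delay before retry number attempt (starting at 0),
// doubling an assumed base delay each time.
func backoff(base time.Duration, attempt int) time.Duration {
	return base << uint(attempt)
}

// pushWithRetry calls push once, then retries up to retryLimit times while
// it keeps failing. sleep can be injected so examples run instantly; nil
// means time.Sleep.
func pushWithRetry(push func() error, retryLimit int, base time.Duration, sleep func(time.Duration)) error {
	if sleep == nil {
		sleep = time.Sleep
	}
	err := push()
	for attempt := 0; err != nil && attempt < retryLimit; attempt++ {
		sleep(backoff(base, attempt))
		err = push()
	}
	return err // non-nil here means the logs would be given up on
}

func main() {
	calls := 0
	alwaysFail := func() error {
		calls++
		return errUnavailable
	}
	var waited []time.Duration
	record := func(d time.Duration) { waited = append(waited, d) }

	err := pushWithRetry(alwaysFail, 3, time.Second, record)
	fmt.Println(calls, waited, err)
}
```

With a limit of 3, the push is attempted four times in total and the recorded delays are one, two, and four seconds.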

func WithClientTransport

func WithClientTransport(t http.RoundTripper) ClientOption

WithClientTransport allows you to specify a custom HTTP transport that should be used when communicating with Loki.

type Core

type Core struct {
	zapcore.LevelEnabler
	// contains filtered or unexported fields
}

func NewCore

func NewCore(sc *StreamClient, opts ...CoreOption) *Core

NewCore creates a zapcore.Core that logs to a given StreamClient. Use any number of CoreOptions (from WithCore*) to configure the Core.

func (*Core) Check

func (s *Core) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry

func (*Core) Sync

func (s *Core) Sync() error

func (*Core) With

func (s *Core) With(fields []zapcore.Field) zapcore.Core

func (*Core) Write

func (s *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error

type CoreOption

type CoreOption func(*coreOpts)

CoreOption can be passed to NewCore to configure the zap core.

func WithCoreCaller

func WithCoreCaller(includeCaller bool) CoreOption

WithCoreCaller specifies whether to add caller information to the log message as a field if zap provides the information.

func WithCoreFieldFormatter

func WithCoreFieldFormatter(formatter FieldFormatter) CoreOption

WithCoreFieldFormatter allows a function that is responsible for combining the log message and the fields into a single string to be specified. The default field formatter separates the message and the field list with "::" and lists the fields and their values, separated by spaces. E.g. My message :: level="error" field1="a" field2="b"

func WithCoreLevel

func WithCoreLevel(level zapcore.Level) CoreOption

WithCoreLevel specifies the minimum log level. The default is zapcore.InfoLevel, so messages at INFO or higher are logged.

type FieldFormatter

type FieldFormatter func(message string, fields map[string]interface{}, stack string) string

FieldFormatter takes a log message, its corresponding fields, and a stack string, and returns a single string that combines them.
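
As an illustration, a custom formatter only needs to match the signature above. The sketch below imitates the documented default layout (message, then "::", then key="value" pairs) but sorts the keys so the output is stable. The name sortedFormatter is hypothetical, and appending a non-empty stack on its own line is an assumption, since the documentation does not say how the default formatter treats the stack argument.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sortedFormatter has the same signature as sparki.FieldFormatter.
func sortedFormatter(message string, fields map[string]interface{}, stack string) string {
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output

	var sb strings.Builder
	sb.WriteString(message)
	if len(keys) > 0 {
		sb.WriteString(" ::")
	}
	for _, k := range keys {
		// Render every value as a quoted string, e.g. count="3".
		fmt.Fprintf(&sb, " %s=%q", k, fmt.Sprint(fields[k]))
	}
	if stack != "" {
		// Assumption: put the stack trace on its own line after the fields.
		sb.WriteString("\n")
		sb.WriteString(stack)
	}
	return sb.String()
}

func main() {
	line := sortedFormatter("My message", map[string]interface{}{
		"level":  "error",
		"field1": "a",
		"count":  3,
	}, "")
	fmt.Println(line) // My message :: count="3" field1="a" level="error"
}
```

A function like this could then be supplied through sparki.WithCoreFieldFormatter(sortedFormatter) when calling sparki.NewCore.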

type Label

type Label struct {
	Name  string
	Value string
}

Label associates a label name with a value. One or more labels uniquely identify a stream of messages in Loki. For best practices regarding labels in Loki, see: https://grafana.com/docs/loki/latest/best-practices/

type PushClient

type PushClient struct {
	// contains filtered or unexported fields
}

PushClient is used for sending log lines to Loki. Use NewPushClient to create a new PushClient. Before the application terminates, call Close to ensure all logs are sent. PushClient is safe for concurrent use.

func NewPushClient

func NewPushClient(endpoint string, opts ...ClientOption) *PushClient

NewPushClient creates a new PushClient to send logs to endpoint with the specified options. The endpoint should be the Loki API route /loki/api/v1/push.
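
For orientation, Loki's /loki/api/v1/push route accepts a JSON body in which each stream carries its label set and a list of [timestamp in Unix nanoseconds, line] pairs. The sketch below builds such a body with the standard library. The stream and pushRequest types and the encodePush helper are hypothetical, and whether sparki itself sends JSON or another encoding is not stated in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"
)

// stream and pushRequest are hypothetical types that mirror the JSON body
// accepted by Loki's /loki/api/v1/push route.
type stream struct {
	Stream map[string]string `json:"stream"` // label set identifying the stream
	Values [][2]string       `json:"values"` // [unix nanoseconds, log line] pairs
}

type pushRequest struct {
	Streams []stream `json:"streams"`
}

// encodePush builds the JSON body for a single log line.
func encodePush(labels map[string]string, unixNano int64, line string) (string, error) {
	req := pushRequest{Streams: []stream{{
		Stream: labels,
		Values: [][2]string{{strconv.FormatInt(unixNano, 10), line}},
	}}}
	body, err := json.Marshal(req)
	return string(body), err
}

func main() {
	body, err := encodePush(map[string]string{"source": "sparki"}, time.Now().UnixNano(), "Hello, world!")
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```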

func (*PushClient) Close

func (l *PushClient) Close() error

Close will shut down all open streams, sending their logs to Loki in the process. This will block until all streams have terminated cleanly. The PushClient cannot be used again.

func (*PushClient) NewStream

func (l *PushClient) NewStream(labels []Label, opts ...StreamOption) *StreamClient

NewStream creates a new stream with a given set of labels and returns a new StreamClient to write to the stream.

type StreamClient

type StreamClient struct {
	// contains filtered or unexported fields
}

StreamClient can write to a stream of a PushClient. Create one using the PushClient.NewStream method.

func (*StreamClient) Close

func (sc *StreamClient) Close() error

Close instructs the stream to close and then flush itself. This does not block, so messages will be flushed in the background.

func (*StreamClient) Log

func (sc *StreamClient) Log(t time.Time, message string) error

Log writes a message and its timestamp to the stream. Logs are buffered, so it may be some time (depending on the configuration) before it appears in Loki. This will block only if the underlying channel's buffer is full.
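
The blocking behavior described above is that of a send on a buffered Go channel: sends succeed immediately until the buffer is full, and then wait for a receiver. The sketch below makes the "full" state visible without hanging by using a non-blocking send. The entry type, the tryEnqueue helper, and the capacity of 2 are hypothetical and chosen only for the example.

```go
package main

import "fmt"

// entry is a hypothetical buffered log line.
type entry struct{ line string }

// tryEnqueue attempts a non-blocking send and reports whether it succeeded.
// A plain `ch <- e` would block instead of returning false when ch is full.
func tryEnqueue(ch chan entry, e entry) bool {
	select {
	case ch <- e:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan entry, 2) // capacity chosen arbitrarily for the example

	fmt.Println(tryEnqueue(ch, entry{"a"})) // true
	fmt.Println(tryEnqueue(ch, entry{"b"})) // true
	fmt.Println(tryEnqueue(ch, entry{"c"})) // false: the buffer is full

	<-ch // a consumer drains one entry

	fmt.Println(tryEnqueue(ch, entry{"c"})) // true: there is room again
}
```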

func (*StreamClient) Sync

func (sc *StreamClient) Sync() error

Sync instructs PushClient to flush the stream to Loki. This will block until the logs have either been sent to Loki or the push request has failed (in which case the chunk will be retried in the background).

func (*StreamClient) SyncNoWait

func (sc *StreamClient) SyncNoWait() error

SyncNoWait is similar to Sync, except that it will not block at all.
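
One common way to offer both a waiting and a non-waiting flush is to send a request to a worker goroutine and optionally wait on a completion channel. The sketch below shows that idea only. The streamSketch type and its methods are hypothetical and are not how sparki is known to be implemented; in particular, this syncNoWait could still block briefly if the request queue were full.

```go
package main

import "fmt"

// flushRequest asks the worker to flush; done is nil when the caller
// does not want to wait for completion.
type flushRequest struct {
	done chan struct{}
}

// streamSketch is a hypothetical stand-in for a stream with one worker.
type streamSketch struct {
	reqs    chan flushRequest
	flushes int // written by the worker goroutine
}

func newStreamSketch() *streamSketch {
	s := &streamSketch{reqs: make(chan flushRequest, 8)}
	go func() {
		for r := range s.reqs {
			s.flushes++ // stand-in for sending the buffered logs
			if r.done != nil {
				close(r.done) // wake up the waiting caller
			}
		}
	}()
	return s
}

// syncNoWait queues a flush and returns without waiting for it to finish.
func (s *streamSketch) syncNoWait() { s.reqs <- flushRequest{} }

// sync queues a flush and blocks until the worker has handled it.
func (s *streamSketch) sync() {
	done := make(chan struct{})
	s.reqs <- flushRequest{done: done}
	<-done
}

func main() {
	s := newStreamSketch()
	s.syncNoWait() // returns as soon as the request is queued
	s.sync()       // returns only after the worker has handled both requests
	fmt.Println(s.flushes) // 2
}
```

Because the single worker handles requests in order, reading the counter after sync returns is safe here: closing the done channel happens after both increments.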

type StreamOption

type StreamOption func(o *streamOpts)

StreamOption can be passed to NewStream to configure an individual stream.

func WithStreamMaxPayloadAge

func WithStreamMaxPayloadAge(age time.Duration) StreamOption

WithStreamMaxPayloadAge is like WithClientMaxPayloadAge, except that it only applies to a single stream.

func WithStreamMaxPayloadLength

func WithStreamMaxPayloadLength(limit int) StreamOption

WithStreamMaxPayloadLength is like WithClientMaxPayloadLength, except that it only applies to a single stream.
