heartfelt

package module
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2022 License: MIT Imports: 8 Imported by: 0

README

heartfelt

License Go Go Report Card Go Reference

A high performance heartbeats watcher.

Algorithm

1. Fixed timeout heartbeats watcher (LRU by queue)

Algorithm

2. Dynamic timeout heartbeats watcher (LRU by priority queue)

Algorithm

Usage

Core Api

// HeartHub is the api entrance of this package.
type HeartHub interface

// DynamicTimeoutHeartHub is a heartHub with dynamic timeout supported.
type DynamicTimeoutHeartHub interface

// NewFixedTimeoutHeartHub will make a fixed timeout heartHub.
func NewFixedTimeoutHeartHub(timeout time.Duration, options ...heartHubOption) HeartHub

// NewDynamicTimeoutHeartHub will make a dynamic timeout heartHub.
func NewDynamicTimeoutHeartHub(options ...heartHubOption) DynamicTimeoutHeartHub
Example 1: Fixed timeout heartbeats watcher
package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"github.com/bunnier/heartfelt"
)

func main() {
	// FixedTimeoutHeartHub is a heartbeats watcher of fixed timeout service.
	heartHub := heartfelt.NewFixedTimeoutHeartHub(
		time.Second, // Timeout duration is 1s.
		heartfelt.WithDegreeOfParallelismOption(2),
	)
	eventCh := heartHub.GetEventChannel() // Events will be sent to this channel later.

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) // For exit this example.
	defer cancel()

	// startFakeServices will start 10000 fake services, each service make heartbeat in 500ms regularly.
	// But these services: index in {67, 120, 100, 3456, 4000, 5221, 7899, 9999} will stop work after {its_id} ms.
	// Fortunately, heartHub will catch them all ^_^
	go startFakeServices(ctx, heartHub, 10000, []int{67, 120, 100, 3456, 4000, 5221, 7899, 9999})

	for {
		select {
		case event := <-eventCh:
			// The special service checking will be stop after timeout or heartHub.Remove(key) be called manually.
			log.Default().Printf("received an event: heartKey=%s eventName=%s, timeout duration=%d, timeoutTime=%d, eventTime=%d, offset=%dms",
				event.HeartKey, event.EventName, event.Timeout/time.Millisecond, event.TimeoutTime.UnixMilli(), event.EventTime.UnixMilli(), event.EventTime.Sub(event.TimeoutTime)/time.Millisecond)
		case <-ctx.Done():
			heartHub.Close()
			return
		}
	}
}

// startFakeServices will start fake services.
// Each service will make heartbeat in 500ms regularly.
func startFakeServices(ctx context.Context, heartHub heartfelt.HeartHub, serviceNum int, stuckIds []int) {
	// These ids will stuck later.
	stuckIdsMap := make(map[int]struct{})
	for _, v := range stuckIds {
		stuckIdsMap[v] = struct{}{}
	}

	for i := 1; i <= serviceNum; i++ {
		ctx := ctx
		if _, ok := stuckIdsMap[i]; ok {
			ctx, _ = context.WithTimeout(ctx, time.Duration(i)*time.Millisecond)
		}

		// Each goroutine below represents a service.
		key := strconv.Itoa(i)
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				default:
					// Send heartbeat..
					heartHub.DisposableHeartbeat(key, nil)
					time.Sleep(500 * time.Millisecond)
				}
			}
		}()
	}
}

Output

2022/03/13 14:35:19 received an event: heartKey=67 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153319675, eventTime=1647153319675, offset=0ms
2022/03/13 14:35:19 received an event: heartKey=100 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153319675, eventTime=1647153319675, offset=0ms
2022/03/13 14:35:19 received an event: heartKey=120 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153319675, eventTime=1647153319675, offset=0ms
2022/03/13 14:35:22 received an event: heartKey=3456 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153322684, eventTime=1647153322684, offset=0ms
2022/03/13 14:35:23 received an event: heartKey=4000 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153323184, eventTime=1647153323184, offset=0ms
2022/03/13 14:35:24 received an event: heartKey=5221 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153324686, eventTime=1647153324687, offset=0ms
2022/03/13 14:35:27 received an event: heartKey=7899 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153327196, eventTime=1647153327196, offset=0ms
2022/03/13 14:35:29 received an event: heartKey=9999 eventName=TIME_OUT, timeout duration=1000, timeoutTime=1647153329198, eventTime=1647153329198, offset=0ms
Example 2: Dynamic timeout heartbeats watcher
package main

import (
	"context"
	"log"
	"math/rand"
	"reflect"
	"sort"
	"strconv"
	"time"

	"github.com/bunnier/heartfelt"
)

func main() {
	// DynamicTimeoutHeartHub is a heartbeats watcher of dynamic timeout service.
	heartHub := heartfelt.NewDynamicTimeoutHeartHub(
		heartfelt.WithDegreeOfParallelismOption(1),
	)
	eventCh := heartHub.GetEventChannel() // Events will be sent to this channel later.

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()

	serviceNum := 20 // The number of fake services.

	// Make random timeout list.
	rand := rand.New(rand.NewSource(time.Now().Unix()))
	var timeoutList []int
	for i := 0; i < serviceNum; i++ {
		timeout := rand.Int()%400 + 100
		timeoutList = append(timeoutList, timeout)
	}

	// Start fake services.
	go func() {
		for index, timeout := range timeoutList {
			key := strconv.Itoa(index)
			timeout := timeout
			go func() {
				heartHub.DisposableHeartbeatWithTimeout(key, time.Duration(timeout)*time.Millisecond, nil)
			}()
		}
	}()

	receivedTimeoutList := make([]int, 0, len(timeoutList))
OVER:
	for {
		select {
		case event := <-eventCh:
			log.Default().Printf("received an event: eventName=%s, timeout duration=%d, timeoutTime=%d, eventTime=%d, offset=%dms",
				event.EventName, event.Timeout/time.Millisecond, event.TimeoutTime.UnixMilli(), event.EventTime.UnixMilli(), event.EventTime.Sub(event.TimeoutTime)/time.Millisecond)
			receivedTimeoutList = append(receivedTimeoutList, int(event.Timeout/time.Millisecond)) // Record receive timeout order.
		case <-ctx.Done():
			heartHub.Close()
			break OVER
		}
	}

	sort.IntSlice(timeoutList).Sort() // Received order must equal with sort result.
	if reflect.DeepEqual(timeoutList, receivedTimeoutList) {
		log.Default().Println("Exactly equal!")
	}
}

Output

2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=122, timeoutTime=1647153151748, eventTime=1647153151748, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=148, timeoutTime=1647153151774, eventTime=1647153151775, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=198, timeoutTime=1647153151824, eventTime=1647153151825, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=219, timeoutTime=1647153151845, eventTime=1647153151845, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=226, timeoutTime=1647153151852, eventTime=1647153151852, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=247, timeoutTime=1647153151873, eventTime=1647153151873, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=253, timeoutTime=1647153151879, eventTime=1647153151879, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=261, timeoutTime=1647153151887, eventTime=1647153151888, offset=1ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=270, timeoutTime=1647153151896, eventTime=1647153151896, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=288, timeoutTime=1647153151914, eventTime=1647153151914, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=290, timeoutTime=1647153151916, eventTime=1647153151916, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=293, timeoutTime=1647153151919, eventTime=1647153151919, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=297, timeoutTime=1647153151923, eventTime=1647153151923, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=299, timeoutTime=1647153151925, eventTime=1647153151925, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=359, timeoutTime=1647153151985, eventTime=1647153151985, offset=0ms
2022/03/13 14:32:31 received an event: eventName=TIME_OUT, timeout duration=369, timeoutTime=1647153151995, eventTime=1647153151995, offset=0ms
2022/03/13 14:32:32 received an event: eventName=TIME_OUT, timeout duration=417, timeoutTime=1647153152043, eventTime=1647153152043, offset=0ms
2022/03/13 14:32:32 received an event: eventName=TIME_OUT, timeout duration=443, timeoutTime=1647153152069, eventTime=1647153152070, offset=1ms
2022/03/13 14:32:32 received an event: eventName=TIME_OUT, timeout duration=444, timeoutTime=1647153152070, eventTime=1647153152070, offset=0ms
2022/03/13 14:32:32 received an event: eventName=TIME_OUT, timeout duration=472, timeoutTime=1647153152098, eventTime=1647153152098, offset=0ms
2022/03/13 14:32:32 Exactly equal!

Documentation

Index

Constants

View Source
const (
	// EventTimeout event will trigger when a heart meet timeout.
	EventTimeout = "TIME_OUT"

	// EventHeartBeat event will be triggered when a heart receives a heartbeat.
	EventHeartBeat = "HEART_BEAT"

	// EventRemoveKey event will be triggered when a heartbeat key be removed.
	EventRemoveKey = "REMOVE_KEY"
)

Variables

View Source
var ErrDynamicNotSupported error = errors.New("heartbeat: this HeartHub has been closed")

ErrDynamicNotSupported will be return from heartbeatWithTimeout method when the HeartHub do not support dynamic timeout.

View Source
var ErrHubClosed error = errors.New("heartbeat: this HeartHub has been closed")

ErrHubClosed will be return from heartbeat method when the HeartHub be closed.

View Source
var ErrNoDefaultTimeout error = errors.New("heartbeat: this HeartHub has been closed")

ErrNoDefaultTimeout will be return from heartbeat method when the default timeout do not be set.

Functions

func WithDefaultTimeoutOption added in v0.9.1

func WithDefaultTimeoutOption(timeout time.Duration) heartHubOption

WithDefaultTimeoutOption can set timeout to the hearthub.

func WithDegreeOfParallelismOption

func WithDegreeOfParallelismOption(degree int) heartHubOption

WithDegreeOfParallelismOption can control degree of parallelism.

func WithEventBufferSizeOption

func WithEventBufferSizeOption(bufferSize int) heartHubOption

WithEventBufferSizeOption can set event buffer size.

func WithHeartbeatBufferSizeOption

func WithHeartbeatBufferSizeOption(bufferSize int) heartHubOption

WithHeartbeatBufferSizeOption can set heartbeat buffer size.

func WithLoggerOption

func WithLoggerOption(logger Logger) heartHubOption

WithLoggerOption can set logger to the hearthub.

func WithSubscribeEventNamesOption

func WithSubscribeEventNamesOption(eventNames ...string) heartHubOption

WithSubscribeEventNamesOption can set watch events to hearthub.

func WithVerboseInfoOption

func WithVerboseInfoOption() heartHubOption

WithVerboseInfoOption can set log level.

Types

type DynamicTimeoutHeartHub added in v0.9.9

type DynamicTimeoutHeartHub interface {
	HeartHub

	// Heartbeat will beat the heart of specified key.
	// This method will auto re-watch the key from heartHub after timeout.
	//   @key: The unique key of target service.
	//   @timeout: The timeout duration after this heartbeat.
	//   @extra: It will be carried back by event data.
	HeartbeatWithTimeout(key string, timeout time.Duration, extra interface{}) error

	// Heartbeat will beat the heart of specified key.
	// This method will auto remove the key from heartHub after timeout.
	//   @key: The unique key of target service.
	//   @timeout: The timeout duration after this heartbeat.
	//   @extra: It will be carried back by event data.
	DisposableHeartbeatWithTimeout(key string, timeout time.Duration, extra interface{}) error
}

DynamicTimeoutHeartHub is a dynamic timeout heartHub.

func NewDynamicTimeoutHeartHub added in v0.9.9

func NewDynamicTimeoutHeartHub(options ...heartHubOption) DynamicTimeoutHeartHub

NewDynamicTimeoutHeartHub will make a dynamic timeout heartHub.

type Event

type Event struct {
	EventName   string        `json:"event_name"`
	HeartKey    string        `json:"heart_key"`
	JoinTime    time.Time     `json:"join_time"`  // JoinTime is register time of the key.
	EventTime   time.Time     `json:"event_time"` // Event trigger time.
	Timeout     time.Duration `json:"timeout"`    // Timeout duration.
	TimeoutTime time.Time     `json:"beat_time"`
	Disposable  bool          `json:"disposable"`
	Extra       interface{}   // Extra is the extra data you pass by calling heartbeat method.
}

Event just means an event, you can use GetEventChannel method to receive subscribed events.

type HeartHub

type HeartHub interface {
	// GetEventChannel return a channel for receiving subscribed events.
	GetEventChannel() <-chan *Event

	// Heartbeat will beat the heart of specified key.
	// This method will auto re-watch the key from heartHub after timeout.
	//   @key: The unique key of target service.
	//   @extra: It will be carried back by event data.
	Heartbeat(key string, extra interface{}) error

	// Heartbeat will beat the heart of specified key.
	// This method will auto remove the key from heartHub after timeout.
	//   @key: The unique key of target service.
	//   @extra: It will be carried back by event data.
	DisposableHeartbeat(key string, extra interface{}) error

	// Remove will stop watching the service of key from the heartHub.
	//   @key: The unique key of target service.
	Remove(key string) error

	// Close will stop watch all service keys and release all goroutines.
	Close()
}

HeartHub is the api entrance of this package.

func NewFixedTimeoutHeartHub added in v0.9.1

func NewFixedTimeoutHeartHub(timeout time.Duration, options ...heartHubOption) HeartHub

NewFixedTimeoutHeartHub will make a fixed timeout heartHub.

type Logger

type Logger interface {
	Info(v ...interface{})
	Error(v ...interface{})
}

Logger is a logger.

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL