inlinekeepalive

package
v0.11.4
Warning

This package is not in the latest version of its module.
Published: Aug 9, 2023 | License: MPL-2.0 | Imports: 16 | Imported by: 0

README

inlinekeepalive

inlinekeepalive is a package that sends "keepalive" messages over existing gRPC streams. See doc.go for the full package description.

Diagrams

These diagrams demonstrate the intended usage of inline keepalives within Waypoint.

ClientStream
sequenceDiagram
autonumber

actor Waypoint Runner
participant Client
participant ClientInterceptor
participant ALB
participant ServerInterceptor
participant RunnerConfig Server Handler
participant Server


Waypoint Runner ->> +Client: RunnerConfig

%% Unnecessary detail
%% Client ->> ServerInterceptor: Open connection for RunnerConfig
%% ServerInterceptor -x ClientInterceptor: Not a ServerStream - DOES NOT send keepalives

Client ->> ClientInterceptor: Create the GRPC client handler

ClientInterceptor ->> +Server: GetVersionInfo

Server -->> -ClientInterceptor: Has feature inlinekeepalives

loop Async send inline keepalives for duration of RPC
    activate ClientInterceptor
    ClientInterceptor -) +ServerInterceptor: SendMsg(InlineKeepalive)

    %% Critical
    ServerInterceptor -x RunnerConfig Server Handler: Recognizes InlineKeepalive. DOES NOT forward.
   
    deactivate ServerInterceptor
    deactivate ClientInterceptor
end

Waypoint Runner ->> Client: Send(event)
Client ->> ClientInterceptor: Send(event) to interceptor
ClientInterceptor ->> ServerInterceptor: Passthrough event to server

%% Critical
ServerInterceptor ->> RunnerConfig Server Handler: Not a keepalive, passthrough

deactivate Client

ServerStream
sequenceDiagram
autonumber

actor User
participant Client
participant ClientInterceptor
participant ALB
participant ServerInterceptor
participant GetLogStream Server Handler
actor CEB


User ->> +Client: GetLogStream

Client ->> +ServerInterceptor: Open connection for GetLogStream, including inline-keepalive GRPC metadata

loop Async send inline keepalives for duration of RPC
    ServerInterceptor -) +ClientInterceptor: SendMsg(InlineKeepalive)
    
    %% Critical
    ClientInterceptor -x Client: Recognizes InlineKeepalive. DOES NOT forward.
    
    deactivate ClientInterceptor
end

Client ->> ClientInterceptor: Create the GRPC client handler

CEB ->> GetLogStream Server Handler: Send(Log)
GetLogStream Server Handler ->> ServerInterceptor: Send(Log)
ServerInterceptor ->> ClientInterceptor: Send(event) to interceptor

%% Critical
ClientInterceptor ->> Client: Not a keepalive, passthrough

Client ->> User: 

deactivate Client

Documentation

Index

Constants

const (
	KeepaliveProtoSignature     = "inline_keepalive"
	GrpcMetaSendKeepalivesKey   = "wp-inline-keepalives"
	GrpcMetaSendKeepalivesValue = "true"
)
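The ServerStream diagram above shows the client attaching inline-keepalive metadata when it opens the RPC. As a rough sketch of how a receiver might test for that opt-in, the example below checks a metadata map for the key and value defined in these constants. The `md` type and the `wantsKeepalives` helper are hypothetical names introduced for illustration, and the constants are redeclared locally so the example is self-contained. The package's actual check may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented constants, redeclared so this sketch
// compiles on its own.
const (
	grpcMetaSendKeepalivesKey   = "wp-inline-keepalives"
	grpcMetaSendKeepalivesValue = "true"
)

// md has the same underlying shape as grpc-go's metadata.MD,
// which is a map[string][]string with lowercase keys.
type md map[string][]string

// wantsKeepalives is a hypothetical helper: it reports whether the peer
// asked for inline keepalives via request metadata.
func wantsKeepalives(m md) bool {
	for _, v := range m[strings.ToLower(grpcMetaSendKeepalivesKey)] {
		if v == grpcMetaSendKeepalivesValue {
			return true
		}
	}
	return false
}

func main() {
	optedIn := md{grpcMetaSendKeepalivesKey: {grpcMetaSendKeepalivesValue}}
	fmt.Println(wantsKeepalives(optedIn)) // true
	fmt.Println(wantsKeepalives(md{}))    // false
}
```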

Variables

This section is empty.

Functions

func IsInlineKeepalive

func IsInlineKeepalive(log hclog.Logger, m protoreflect.ProtoMessage) bool

IsInlineKeepalive determines if a given proto message is an inline keepalive.

func KeepaliveClientStreamInterceptor

func KeepaliveClientStreamInterceptor(sendInterval time.Duration) grpc.StreamClientInterceptor

KeepaliveClientStreamInterceptor returns a stream interceptor that sends inline keepalive messages on client streams (if the server is compatible) and intercepts inline keepalives from the server. It is intended to be invoked once at the beginning of an RPC and may call the server's GetVersionInfo RPC. If the server is compatible and this is a ClientStream, it spawns a goroutine that runs for the duration of the stream and sends a keepalive every sendInterval.

func KeepaliveServerStreamInterceptor

func KeepaliveServerStreamInterceptor(sendInterval time.Duration) grpc.StreamServerInterceptor

KeepaliveServerStreamInterceptor returns a stream interceptor that sends inline keepalive messages on server streams (if the client is compatible) and intercepts inline keepalives from the client. It is intended to be invoked once at the beginning of an RPC. If the client is compatible and this is a ServerStream, it spawns a goroutine that runs for the duration of the stream and sends a keepalive every sendInterval.

func ServeKeepalives

func ServeKeepalives(
	ctx context.Context,
	log hclog.Logger,
	stream GrpcStream,
	sendInterval time.Duration,
	sendMx *sync.Mutex,
)

ServeKeepalives sends keepalive messages along the provided gRPC stream at the rate specified by sendInterval. It returns when the context is cancelled. NOTE: this function calls SendMsg, and concurrent calls to SendMsg are unsafe, so it only calls SendMsg while holding the sendMx lock.
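To illustrate the pattern described here (a ticker-driven loop that only sends while holding a shared mutex and stops on context cancellation), here is a minimal sketch. It is not the package's implementation: `serveKeepalives`, `countingStream`, `runDemo`, and the string placeholder payload are all hypothetical, the logger parameter is omitted, and returning on a send error is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// GrpcStream matches the interface documented in this package.
type GrpcStream interface {
	SendMsg(m interface{}) error
}

// countingStream is a hypothetical stand-in for a real gRPC stream.
type countingStream struct{ sent int }

func (c *countingStream) SendMsg(m interface{}) error {
	c.sent++
	return nil
}

// serveKeepalives sketches the loop: send one message per tick while
// holding sendMx, and return once ctx is done.
func serveKeepalives(ctx context.Context, stream GrpcStream, sendInterval time.Duration, sendMx *sync.Mutex) {
	ticker := time.NewTicker(sendInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sendMx.Lock()
			err := stream.SendMsg("keepalive") // placeholder payload, not the real proto
			sendMx.Unlock()
			if err != nil {
				return // assumption: stop sending once the stream errors
			}
		}
	}
}

// runDemo drives the loop for a short time and returns how many
// keepalives were sent.
func runDemo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	var sendMx sync.Mutex
	s := &countingStream{}
	serveKeepalives(ctx, s, 10*time.Millisecond, &sendMx)
	return s.sent
}

func main() {
	fmt.Println("keepalives sent:", runDemo())
}
```

Any other code that sends on the same stream would need to take the same mutex around its own SendMsg calls for the lock to be effective.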

func TestClientStream

func TestClientStream(t testing.T, recvMessages []proto.Message) grpc.ClientStream

TestClientStream returns a grpc.ClientStream that plays back the given messages when Recv is called.

func TestServerStream

func TestServerStream(t testing.T, recvMessages []proto.Message) grpc.ServerStream

TestServerStream returns a grpc.ServerStream that plays back the given messages when Recv is called.
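The idea behind these test helpers, a stream that hands out a fixed list of messages on successive receive calls, can be sketched with the standard library alone. The `replayStream` type below is hypothetical and uses strings instead of proto messages. Returning io.EOF once the list is exhausted mirrors how gRPC streams signal a normal end, but whether the real helpers do the same is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// replayStream is a hypothetical fake that replays canned messages.
type replayStream struct {
	msgs []string
}

// RecvMsg copies the next canned message into m, which must be a *string.
// It returns io.EOF once every message has been delivered.
func (r *replayStream) RecvMsg(m interface{}) error {
	if len(r.msgs) == 0 {
		return io.EOF
	}
	dst, ok := m.(*string)
	if !ok {
		return errors.New("replayStream: m must be *string")
	}
	*dst = r.msgs[0]
	r.msgs = r.msgs[1:]
	return nil
}

func main() {
	s := &replayStream{msgs: []string{"first", "second"}}
	for {
		var got string
		if err := s.RecvMsg(&got); err != nil {
			fmt.Println("done:", err)
			return
		}
		fmt.Println(got)
	}
}
```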

Types

type GrpcStream

type GrpcStream interface {
	SendMsg(m interface{}) error
}

GrpcStream can be either a grpc.ClientStream or a grpc.ServerStream.

type KeepaliveClientStream

type KeepaliveClientStream struct {
	// contains filtered or unexported fields
}

KeepaliveClientStream implements grpc.ClientStream.

func (*KeepaliveClientStream) CloseSend

func (k *KeepaliveClientStream) CloseSend() error

func (*KeepaliveClientStream) Context

func (k *KeepaliveClientStream) Context() context.Context

func (*KeepaliveClientStream) Header

func (k *KeepaliveClientStream) Header() (metadata.MD, error)

func (*KeepaliveClientStream) RecvMsg

func (k *KeepaliveClientStream) RecvMsg(m interface{}) error

RecvMsg intercepts keepalive messages and does not pass them along to the handler.
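The filtering behavior described here can be sketched as a wrapper whose RecvMsg keeps reading from the underlying stream until it sees something other than a keepalive. Everything in this example is hypothetical: the `message` type with its `keepalive` flag stands in for real proto messages and for IsInlineKeepalive, and `sliceStream`, `filteringStream`, and `drain` exist only for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// message is a hypothetical stand-in for a proto message; the keepalive
// flag plays the role of the IsInlineKeepalive check.
type message struct {
	keepalive bool
	body      string
}

type receiver interface {
	RecvMsg(m interface{}) error
}

// sliceStream replays canned messages, then returns io.EOF.
type sliceStream struct{ msgs []message }

func (s *sliceStream) RecvMsg(m interface{}) error {
	if len(s.msgs) == 0 {
		return io.EOF
	}
	dst, ok := m.(*message)
	if !ok {
		return errors.New("sliceStream: m must be *message")
	}
	*dst = s.msgs[0]
	s.msgs = s.msgs[1:]
	return nil
}

// filteringStream wraps a receiver and hides keepalives from the caller.
type filteringStream struct{ inner receiver }

func (f *filteringStream) RecvMsg(m interface{}) error {
	for {
		if err := f.inner.RecvMsg(m); err != nil {
			return err
		}
		if msg, ok := m.(*message); ok && msg.keepalive {
			continue // swallow the keepalive and read the next message
		}
		return nil
	}
}

// drain reads until the stream ends and collects the message bodies.
func drain(r receiver) []string {
	var bodies []string
	for {
		var m message
		if err := r.RecvMsg(&m); err != nil {
			return bodies // io.EOF or any other error ends the loop
		}
		bodies = append(bodies, m.body)
	}
}

func main() {
	inner := &sliceStream{msgs: []message{
		{keepalive: true}, {body: "event-1"}, {keepalive: true}, {body: "event-2"},
	}}
	fmt.Println(drain(&filteringStream{inner: inner})) // [event-1 event-2]
}
```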

func (*KeepaliveClientStream) SendMsg

func (k *KeepaliveClientStream) SendMsg(m interface{}) error

func (*KeepaliveClientStream) Trailer

func (k *KeepaliveClientStream) Trailer() metadata.MD

type KeepaliveServerStream

type KeepaliveServerStream struct {
	// contains filtered or unexported fields
}

KeepaliveServerStream implements grpc.ServerStream.

func (*KeepaliveServerStream) Context

func (k *KeepaliveServerStream) Context() context.Context

func (*KeepaliveServerStream) RecvMsg

func (k *KeepaliveServerStream) RecvMsg(m interface{}) error

RecvMsg intercepts keepalive messages and does not pass them along to the handler.

func (*KeepaliveServerStream) SendHeader

func (k *KeepaliveServerStream) SendHeader(md metadata.MD) error

func (*KeepaliveServerStream) SendMsg

func (k *KeepaliveServerStream) SendMsg(m interface{}) error

func (*KeepaliveServerStream) SetHeader

func (k *KeepaliveServerStream) SetHeader(md metadata.MD) error

func (*KeepaliveServerStream) SetTrailer

func (k *KeepaliveServerStream) SetTrailer(md metadata.MD)
