grpcserver

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

README

Conduit Connector for gRPC Server

The gRPC Server connector is one of Conduit plugins. It provides a source gRPC Server connector.

This connector should be paired with another Conduit instance or pipeline, that provides a gRPC client destination. Where the client will initiate the connection with this server, and start sending records to it.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit tests.

Source

This source connector creates a server on the url provided as a parameter. When a client initiates connection, a bidirectional gRPC stream is created between the server and the client, the server keeps listening on this stream to receive records sent from the client, when a record is received, an acknowledgment is sent to the client on the same stream.

Configuration
name description required default value
url url to gRPC server. true
mtls.disabled option to disable mTLS secure connection, set it to true for an insecure connection. false false
mtls.server.certPath the server certificate path. required if mtls.disabled is false
mtls.server.keyPath the server private key path. required if mtls.disabled is false
mtls.ca.certPath the root CA certificate path. required if mtls.disabled is false

Mutual TLS (mTLS)

Mutual TLS is used by default to connect to the server, to disable mTLS you can set the parameter mtls.disabled to true, this will result in an insecure connection to the server.

This repo contains self-signed certificates that can be used for local testing purposes, you can find them under ./test/certs, note that these certificates are not meant to be used in production environment.

To generate your own secure mTLS certificates, check this tutorial.

Planned work

  • Add a destination for gRPC server.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        NewSource,
	NewDestination:   nil,
}

Connector combines all constructors for each plugin in one struct.

Functions

func AttachPositionIndex

func AttachPositionIndex(p sdk.Position, index uint32) sdk.Position

func NewSource

func NewSource() sdk.Source

func NewSourceWithListener

func NewSourceWithListener(lis net.Listener) sdk.Source

NewSourceWithListener for testing purposes.

func Specification

func Specification() sdk.Specification

Specification returns the connector's specification.

Types

type Config

type Config struct {
	// url to gRPC server
	URL string `json:"url" validate:"required"`
	// mTLS configurations.
	MTLS MTLSConfig `json:"mtls"`
}

Config has the generic parameters needed for a gRPC server

type MTLSConfig

type MTLSConfig struct {
	// the server certificate path.
	ServerCertPath string `json:"server.certPath"`
	// the server private key path.
	ServerKeyPath string `json:"server.keyPath"`
	// the root CA certificate path.
	CACertPath string `json:"ca.certPath"`
	// option to disable mTLS secure connection, set it to `true` for an insecure connection.
	Disabled bool `json:"disabled" default:"false"`
}

func (*MTLSConfig) ParseMTLSFiles

func (mc *MTLSConfig) ParseMTLSFiles() (tls.Certificate, *x509.CertPool, error)

ParseMTLSFiles parses and validates mTLS params values, returns the parsed server certificate, and CA certificate pool, and an error if the parsing fails

type Position

type Position struct {
	Index    uint32
	Original []byte
}

func ToRecordPosition

func ToRecordPosition(p sdk.Position) Position

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a simple FIFO queue

func (*Queue) Dequeue

func (q *Queue) Dequeue() (uint32, error)

Dequeue removes and returns the first item from the queue

func (*Queue) Enqueue

func (q *Queue) Enqueue(item uint32)

Enqueue adds an item to the end of the queue

type Source

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, position sdk.Position) error

func (*Source) Configure

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error

func (*Source) Open

func (s *Source) Open(ctx context.Context, _ sdk.Position) error

func (*Source) Parameters

func (s *Source) Parameters() map[string]sdk.Parameter

func (*Source) Read

func (s *Source) Read(ctx context.Context) (sdk.Record, error)

func (*Source) Teardown

func (s *Source) Teardown(ctx context.Context) error

type SourceConfig

type SourceConfig struct {
	Config
}

func (SourceConfig) Parameters

func (SourceConfig) Parameters() map[string]sdk.Parameter

Directories

Path Synopsis
cmd
proto
v1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL