goutube

package module
v0.1.2
Published: Sep 9, 2022 License: Apache-2.0 Imports: 50 Imported by: 0

README

Goutube

Streaming utility to build highly scalable, highly available, fast, and secure distributed streaming APIs!

Why build this? What's new here?
  • The simple reason is that distributed systems are cool for many reasons, among them scalability, high availability, security, and speed.
  • With this project, I attempt to go in depth on how to build a system that grows in functionality, in users, and in the team developing it.
  • It's my attempt to broaden and strengthen my knowledge by developing a real-world, end-to-end product.
Why choose Go?
  • Simplicity
  • Strongly typed and compiled
  • Compiles to a single binary with no external dependencies
  • Fast and lightweight
  • Good coding practices
  • Excellent support for network programming and concurrency
  • Easy to deploy
Prerequisites
  • Go 1.16+

Configuration parameters

Parameter           Type                Usage
DataDir             string              Path to the directory where data is stored locally.
BindAddr            string              Address that Goutube binds to for communication with other members on the ring.
RPCPort             int                 Port that Goutube binds to for the member's RPC server.
ReplicationPort     int                 Port on which the leader and its followers communicate to stay in sync (eventually consistent). Used only if the ParticipationRule enables replication.
NodeName            string              Unique node name that identifies this member.
SeedAddresses       []string            Addresses of existing members to join on the ring at startup.
VirtualNodeCount    int                 Number of virtual nodes to create on the ring for this member.
ACLModelFile        string              Path to the Casbin model file used for authorization.
ACLPolicyFile       string              Path to the Casbin policy file used for authorization.
ServerTLSConfig     *tls.Config         TLS configuration served to clients, for authentication.
PeerTLSConfig       *tls.Config         TLS configuration used to connect to other servers (for replication, forwarding, etc.).
LeaderAddresses     []string            Addresses of the servers that will set this server as one of their load balancers (for replication).
Rule                ParticipationRule   Participation rule that defines the server's role: sharding on the ring, replication, and/or load balancing.
MaxChunkSize        uint64              Size of the chunk that can be processed by this server.
MultiStreamPercent  int                 Percentage of followers to return in response to a GetMetadata request.
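VirtualNodeCount controls how many positions each member occupies on the ring. The sketch below shows a generic consistent-hash ring with virtual nodes to illustrate the idea; it is not Goutube's ring package, and the hash function (CRC-32), the `name#i` virtual-node key format, and the names `hashRing`, `newHashRing`, and `owner` are illustrative assumptions.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// hashRing is a hypothetical consistent-hash ring; it is not Goutube's ring package.
type hashRing struct {
	positions []uint32          // sorted hash positions of all virtual nodes
	owners    map[uint32]string // position -> member NodeName
}

// newHashRing places virtualNodeCount virtual nodes on the ring for every member.
// Assumption: virtual-node keys are "<name>#<i>" hashed with CRC-32.
func newHashRing(members []string, virtualNodeCount int) *hashRing {
	r := &hashRing{owners: make(map[uint32]string)}
	for _, name := range members {
		for i := 0; i < virtualNodeCount; i++ {
			pos := crc32.ChecksumIEEE([]byte(name + "#" + strconv.Itoa(i)))
			if _, taken := r.owners[pos]; taken {
				continue // skip the rare hash collision
			}
			r.owners[pos] = name
			r.positions = append(r.positions, pos)
		}
	}
	sort.Slice(r.positions, func(a, b int) bool { return r.positions[a] < r.positions[b] })
	return r
}

// owner returns the member responsible for key: the first virtual node
// clockwise from the key's hash, wrapping around to the start of the ring.
func (r *hashRing) owner(key string) string {
	if len(r.positions) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.positions), func(i int) bool { return r.positions[i] >= h })
	if i == len(r.positions) {
		i = 0
	}
	return r.owners[r.positions[i]]
}

func main() {
	ring := newHashRing([]string{"node-a", "node-b", "node-c"}, 16)
	fmt.Println(ring.owner("point-42"))
}
```

More virtual nodes per member spread each member's share of keys more evenly, and when a member joins, only the keys that now land on its virtual nodes move to it.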
Can I contribute to this project?

Feel free to create a PR; I’m more than happy to review and merge it.

What's the long-term goal?
  • Onboard videos and documentation
  • Clean code, full test coverage and minimal tech debt

Thank you!

Feel free to create an issue if you have any problems running this distributed system or have any suggestions.

Documentation

Index

Constants

const DefaultMaxChunkSize = 256 // DefaultMaxChunkSize defines the default value of the size of the chunk that can be processed by this server.
const DefaultMultiStreamPercent = 100 // DefaultMultiStreamPercent defines the default value of the multi-stream percent.
const RingRPC = 1
const TestName = "goutube"

Variables

var (
	// ErrArcShutdown is returned when operations are requested against an
	// inactive Arc.
	ErrArcShutdown = errors.New("arc is already shutdown")

	// ErrEnqueueTimeout is returned when a command fails due to a timeout.
	ErrEnqueueTimeout = errors.New("timed out enqueuing operation")

	// ErrStoreNullPointer is returned when the provided ArcConfig has nil Log
	ErrStoreNullPointer = errors.New("store cannot be nil")

	// ErrFSMNullPointer is returned when the provided ArcConfig has nil FSM
	ErrFSMNullPointer = errors.New("FSM cannot be nil")
)
var (
	CAFile               = configFile("ca.pem")
	ServerCertFile       = configFile("server.pem")
	ServerKeyFile        = configFile("server-key.pem")
	ClientCertFile       = configFile("client.pem")
	ClientKeyFile        = configFile("client-key.pem")
	RootClientCertFile   = configFile("root-client.pem")
	RootClientKeyFile    = configFile("root-client-key.pem")
	NobodyClientCertFile = configFile("nobody-client.pem")
	NobodyClientKeyFile  = configFile("nobody-client-key.pem")
	ACLModelFile         = configFile("model.conf")
	ACLPolicyFile        = configFile("policy.csv")
)
var (
	ErrCannotHandleRequest          = errors.New("couldn't handle the request")
	ErrMultiStreamMetadataCorrupted = errors.New("metadata cannot be verified")
	ErrWorkersNotFound              = errors.New("workers not found")
)
var (
	// ErrTransportShutdown is returned when operations on a transport are
	// invoked after it's been terminated.
	ErrTransportShutdown = errors.New("transport shutdown")

	// ErrPipelineShutdown is returned when the pipeline is closed.
	ErrPipelineShutdown = errors.New("command pipeline closed")
)
var (
	ErrMaxChunkSizeInvalid = errors.New("configuration error: max chunk size cannot be zero")
)
var (
	ErrPointNotFoundInMemory = errors.New("point not found in the store")
)

Functions

func Check

func Check(err error)

func NewFollowerCache

func NewFollowerCache(followers []*streaming_api.Server, multiStreamPercent int) *followerCache

NewFollowerCache caches the provided followers. multiStreamPercent is used to calculate the percentage of followers that will be used to read a stream in parallel.
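The documentation does not say how the percentage is turned into a follower count. A minimal sketch, assuming the count is rounded up and kept at least one whenever followers exist; `followersForStream` is a hypothetical helper, and plain address strings stand in for `*streaming_api.Server`.

```go
package main

import "fmt"

// followersForStream returns the subset of followers used to read a stream in
// parallel. Hypothetical: percent is capped at 100, the count is rounded up,
// and at least one follower is returned when any exist.
func followersForStream(followers []string, multiStreamPercent int) []string {
	if len(followers) == 0 {
		return nil
	}
	if multiStreamPercent > 100 {
		multiStreamPercent = 100
	}
	n := (len(followers)*multiStreamPercent + 99) / 100 // ceil(len * percent / 100)
	if n < 1 {
		n = 1
	}
	return followers[:n]
}

func main() {
	followers := []string{"f1:8400", "f2:8400", "f3:8400", "f4:8400"}
	fmt.Println(followersForStream(followers, 50))  // [f1:8400 f2:8400]
	fmt.Println(followersForStream(followers, 100)) // all four followers
}
```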

func NewLoadBalancer

func NewLoadBalancer(config *loadbalancerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)

func NewServer

func NewServer(config *ServerConfig, opts ...grpc.ServerOption) (*grpc.Server, error)

func PanicValue

func PanicValue(fn func()) (recovered interface{})
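Neither `Check` nor `PanicValue` is documented. Judging only by their names and signatures, `Check` likely panics on a non-nil error, and `PanicValue` runs `fn` and returns whatever value it panicked with (or nil). A sketch under that assumption, not the package's actual code:

```go
package main

import (
	"errors"
	"fmt"
)

// Check panics if err is non-nil (assumed behavior).
func Check(err error) {
	if err != nil {
		panic(err)
	}
}

// PanicValue calls fn and returns the value it panicked with, or nil if fn
// returned normally (assumed behavior).
func PanicValue(fn func()) (recovered interface{}) {
	defer func() {
		recovered = recover()
	}()
	fn()
	return nil
}

func main() {
	err := errors.New("boom")
	fmt.Println(PanicValue(func() { Check(err) }) == err) // true
	fmt.Println(PanicValue(func() { Check(nil) }))        // <nil>
}
```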

func RunService

func RunService(ctx context.Context, targetDir string, service string)

RunService builds and runs a service in a target directory.

func SetupTLSConfig

func SetupTLSConfig(cfg TLSConfig) (*tls.Config, error)

func StopService

func StopService(ctx context.Context)

Types

type Agent

type Agent struct {
	AgentConfig
	// contains filtered or unexported fields
}

func NewAgent

func NewAgent(config AgentConfig) (*Agent, error)

func (*Agent) Shutdown

func (a *Agent) Shutdown() error

type AgentConfig

type AgentConfig struct {
	DataDir          string
	BindAddr         string
	RPCPort          int
	ReplicationPort  int
	NodeName         string
	SeedAddresses    []string
	VirtualNodeCount int

	ACLModelFile    string
	ACLPolicyFile   string
	ServerTLSConfig *tls.Config // Served to clients.
	PeerTLSConfig   *tls.Config // Servers so they can connect with and replicate each other.

	LeaderAddresses []string          // Addresses of the servers which will set this server as one of its loadbalancers (for replication).
	Rule            ParticipationRule // Determines whether this server takes part in the ring (peer-to-peer architecture) and/or replication.

	MaxChunkSize       uint64 // MaxChunkSize defines the size of the chunk that can be processed by this server.
	MultiStreamPercent int    // MultiStreamPercent is the percentage of followers to return in response to a GetMetadata request.
}

func (AgentConfig) RPCAddr

func (c AgentConfig) RPCAddr() (string, error)

func (AgentConfig) ReplicationRPCAddr

func (c AgentConfig) ReplicationRPCAddr() (string, error)
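Both methods return an error, which suggests they derive the host from `BindAddr` and pair it with the configured port. A sketch under that assumption; `agentConfig` is a trimmed, hypothetical stand-in for `AgentConfig`, and `hostWithPort` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// agentConfig is a trimmed, hypothetical stand-in for goutube's AgentConfig.
type agentConfig struct {
	BindAddr        string
	RPCPort         int
	ReplicationPort int
}

// hostWithPort keeps the host from BindAddr and replaces its port.
func (c agentConfig) hostWithPort(port int) (string, error) {
	host, _, err := net.SplitHostPort(c.BindAddr)
	if err != nil {
		return "", err
	}
	return net.JoinHostPort(host, strconv.Itoa(port)), nil
}

// RPCAddr is the assumed address of the member's RPC server.
func (c agentConfig) RPCAddr() (string, error) { return c.hostWithPort(c.RPCPort) }

// ReplicationRPCAddr is the assumed address used for leader/follower replication.
func (c agentConfig) ReplicationRPCAddr() (string, error) {
	return c.hostWithPort(c.ReplicationPort)
}

func main() {
	c := agentConfig{BindAddr: "127.0.0.1:8401", RPCPort: 8400, ReplicationPort: 8402}
	addr, err := c.RPCAddr()
	if err != nil {
		panic(err)
	}
	fmt.Println(addr) // 127.0.0.1:8400
}
```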

type Arc

type Arc struct {
	ArcConfig

	Dir string
	// contains filtered or unexported fields
}

func NewArc

func NewArc(config ArcConfig) (*Arc, error)

func (*Arc) Apply

func (arc *Arc) Apply(data []byte, timeout time.Duration) *RecordPromise

func (Arc) GetFollowers

func (state Arc) GetFollowers() []Server

GetFollowers gets the addresses of its loadbalancers.

func (*Arc) Shutdown

func (arc *Arc) Shutdown() Promise

Shutdown is used to stop the Arc background routines. This is not a graceful operation. It returns a Promise that can be used to block until all background routines have exited.

func (*Arc) Transfer added in v0.1.2

func (arc *Arc) Transfer(rt *ring.ShardResponsibility, keys []string)

Transfer hands off responsibility for the given points (keys) to the new responsible server after resharding.

type ArcConfig

type ArcConfig struct {

	// Dialer
	StreamLayer StreamLayer
	Logger      hclog.Logger
	// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
	// the timeout by (SnapshotSize / TimeoutScale).
	Timeout time.Duration

	MaxChunkSize uint64

	Bundler Bundler
	// contains filtered or unexported fields
}

type Bundler

type Bundler interface {
	Build(header interface{}, key interface{}, value interface{}) ([]byte, error)
}

type Config

type Config struct {
	Distributed struct {
		LocalID      string
		StreamLayer  *LocusStreamLayer
		StoreAddress string
		Logger       hclog.Logger
		Rule         ParticipationRule
		RPCAddress   string
		Ring         *ring.Ring
		MaxChunkSize uint64
	}
	Point struct {
		TickTime     time.Duration
		CloseTimeout time.Duration
		// contains filtered or unexported fields
	}
}

type DistributedLoci

type DistributedLoci struct {
	// contains filtered or unexported fields
}

func NewDistributedLoci

func NewDistributedLoci(dataDir string, config Config) (
	*DistributedLoci,
	error,
)

func (*DistributedLoci) Append

func (*DistributedLoci) ClosePoint

func (d *DistributedLoci) ClosePoint(pointId string) error

func (*DistributedLoci) GetFollowers

func (*DistributedLoci) GetMetadata added in v0.1.2

func (d *DistributedLoci) GetMetadata(pointId string) (PointMetadata, error)

func (*DistributedLoci) GetPoints

func (d *DistributedLoci) GetPoints() []string

func (*DistributedLoci) GetServers

func (*DistributedLoci) Join

func (d *DistributedLoci) Join(rpcAddr string, rule ParticipationRule) error

func (*DistributedLoci) Leave

func (d *DistributedLoci) Leave(rpcAddr string) error

func (*DistributedLoci) OnChange added in v0.1.2

func (d *DistributedLoci) OnChange(batch []ring.ShardResponsibility)

OnChange notifies this server when a new server joins the ring at the position next to it. This server then needs to hand off the points it should no longer handle.

func (*DistributedLoci) Read

func (d *DistributedLoci) Read(pointId string, pos uint64) (uint64, []byte, error)

func (*DistributedLoci) ReadAt

func (d *DistributedLoci) ReadAt(pointId string, b []byte, off uint64) (int, error)

func (*DistributedLoci) ReadWithLimit added in v0.1.2

func (d *DistributedLoci) ReadWithLimit(pointId string, pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)

func (*DistributedLoci) Remove

func (d *DistributedLoci) Remove() error

func (*DistributedLoci) Shutdown

func (d *DistributedLoci) Shutdown() error

type FSM

type FSM interface {
	Apply(command *RecordRequest) *FSMRecordResponse
	Read(key string, offset uint64) (uint64, []byte, error)
}

type FSMRecordResponse

type FSMRecordResponse struct {
	StoreKey   interface{}
	StoreValue interface{}
	Response   interface{}
}

type Follower

type Follower struct {
	// contains filtered or unexported fields
}

func NewFollower

func NewFollower(ServerAddress ServerAddress) (*Follower, error)

type GetFollowerer

type GetFollowerer interface {
	GetFollowers(*streaming_api.GetFollowersRequest) ([]*streaming_api.Server, error)
}

type GetServerer

type GetServerer interface {
	GetServers(*streaming_api.GetServersRequest) ([]*streaming_api.Server, error)
}

type GetServersPromise

type GetServersPromise struct {
	// contains filtered or unexported fields
}

func (*GetServersPromise) Error

func (p *GetServersPromise) Error() error

func (*GetServersPromise) Request

func (c *GetServersPromise) Request() *GetServersRequest

func (*GetServersPromise) Response

func (c *GetServersPromise) Response() interface{}

type GetServersRequest

type GetServersRequest struct {
}

type GetServersResponse

type GetServersResponse struct {
	Response interface{}
}

type InMemoryPointStore

type InMemoryPointStore struct {
	// contains filtered or unexported fields
}

InMemoryPointStore manages points and their last offsets.

func NewInMomoryPointStore

func NewInMomoryPointStore(dir string) (*InMemoryPointStore, error)

func (*InMemoryPointStore) AddPointEvent

func (store *InMemoryPointStore) AddPointEvent(pointId string, offset uint64) error

func (*InMemoryPointStore) GetPointEvent

func (store *InMemoryPointStore) GetPointEvent(pointId string) (uint64, error)

type Locus

type Locus struct {
	Config Config
	// contains filtered or unexported fields
}

func NewLocus

func NewLocus(locusDir string, config Config) (*Locus, error)

func (*Locus) Append

func (l *Locus) Append(pointId string, b []byte) (n uint64, pos uint64, err error)

func (*Locus) Close

func (l *Locus) Close(pointId string) error

func (*Locus) CloseAll

func (l *Locus) CloseAll() error

func (*Locus) GetMetadata added in v0.1.2

func (l *Locus) GetMetadata(pointId string) (PointMetadata, error)

func (*Locus) GetPoints

func (l *Locus) GetPoints() []string

func (*Locus) Read

func (l *Locus) Read(pointId string, pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)

func (*Locus) ReadAt

func (l *Locus) ReadAt(pointId string, b []byte, off uint64) (int, error)

func (*Locus) Remove

func (l *Locus) Remove(pointId string) error

func (*Locus) RemoveAll

func (l *Locus) RemoveAll() error

func (*Locus) Reset

func (l *Locus) Reset() error

type LocusHelper

type LocusHelper interface {
	Append(*streaming_api.ProduceRequest) (uint64, error)
	GetMetadata(string) (PointMetadata, error)
	ReadWithLimit(string, uint64, uint64, uint64) (uint64, []byte, error)
	ClosePoint(string) error
}

type LocusStreamLayer

type LocusStreamLayer struct {
	// contains filtered or unexported fields
}

func NewStreamLayer

func NewStreamLayer(
	ln net.Listener,
	serverTLSConfig,
	peerTLSConfig *tls.Config,
) *LocusStreamLayer

func (*LocusStreamLayer) Accept

func (s *LocusStreamLayer) Accept() (net.Conn, error)

func (*LocusStreamLayer) Addr

func (s *LocusStreamLayer) Addr() net.Addr

func (*LocusStreamLayer) Close

func (s *LocusStreamLayer) Close() error

func (*LocusStreamLayer) Dial

func (s *LocusStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)

type ParticipationRule

type ParticipationRule uint8
const (
	StandaloneLeaderRule ParticipationRule = iota // StandaloneLeaderRule server only participates in the sharding and doesn't need replication.
	LeaderRule                                    // LeaderRule server participates in the sharding, but also has followers for replication.
	FollowerRule                                  // FollowerRule server only participates in the replication.
	LoadBalancerRule                              // LoadBalancerRule server acts as an entry point to servers with other rules.
)
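Read together, the rule comments imply which roles each rule takes on: both leader rules sit on the sharding ring, leaders and followers replicate, and a load balancer does neither. A sketch encoding that reading; the constants mirror the documented ones, while `InRing`, `HasReplication`, and `String` are hypothetical helpers that are not part of the package.

```go
package main

import "fmt"

type ParticipationRule uint8

const (
	StandaloneLeaderRule ParticipationRule = iota
	LeaderRule
	FollowerRule
	LoadBalancerRule
)

// InRing reports whether a server with this rule takes a position on the
// sharding ring (hypothetical helper).
func (r ParticipationRule) InRing() bool {
	return r == StandaloneLeaderRule || r == LeaderRule
}

// HasReplication reports whether the server takes part in leader/follower
// replication (hypothetical helper).
func (r ParticipationRule) HasReplication() bool {
	return r == LeaderRule || r == FollowerRule
}

// String gives each rule a readable name (hypothetical helper).
func (r ParticipationRule) String() string {
	switch r {
	case StandaloneLeaderRule:
		return "StandaloneLeader"
	case LeaderRule:
		return "Leader"
	case FollowerRule:
		return "Follower"
	case LoadBalancerRule:
		return "LoadBalancer"
	}
	return fmt.Sprintf("ParticipationRule(%d)", uint8(r))
}

func main() {
	for r := StandaloneLeaderRule; r <= LoadBalancerRule; r++ {
		fmt.Printf("%-16s ring=%-5v replication=%v\n", r, r.InRing(), r.HasReplication())
	}
}
```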

type Picker

type Picker struct {
	// contains filtered or unexported fields
}

func (*Picker) Build

func (p *Picker) Build(buildInfo base.PickerBuildInfo) balancer.Picker

func (*Picker) Pick

func (p *Picker) Pick(info balancer.PickInfo) (
	balancer.PickResult,
	error,
)
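The documentation does not say how `Pick` chooses a connection. A common strategy for gRPC client-side load balancing is to rotate round-robin across ready sub-connections. The sketch below shows only that selection step in plain Go, without gRPC types; `roundRobin` and `errNoServers` are hypothetical, and whether Goutube's picker uses this strategy is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errNoServers = errors.New("no servers available")

// roundRobin is a hypothetical, concurrency-safe round-robin selector.
type roundRobin struct {
	next  uint64 // first field, so it is 64-bit aligned for atomic use on 32-bit platforms
	addrs []string
}

// pick returns the next address in rotation.
func (p *roundRobin) pick() (string, error) {
	if len(p.addrs) == 0 {
		return "", errNoServers
	}
	n := atomic.AddUint64(&p.next, 1)
	return p.addrs[(n-1)%uint64(len(p.addrs))], nil
}

func main() {
	p := &roundRobin{addrs: []string{"a:8400", "b:8400", "c:8400"}}
	for i := 0; i < 4; i++ {
		addr, _ := p.pick()
		fmt.Println(addr) // prints a:8400, b:8400, c:8400, then a:8400 again
	}
}
```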

type Point

type Point struct {
	File *os.File
	// contains filtered or unexported fields
}

func (*Point) Append

func (p *Point) Append(b []byte) (n uint64, pos uint64, err error)

func (*Point) Close

func (p *Point) Close() error

func (*Point) GetLastAccessed

func (p *Point) GetLastAccessed() time.Time

func (*Point) GetMetadata added in v0.1.2

func (p *Point) GetMetadata() PointMetadata

func (*Point) Open

func (p *Point) Open() error

func (*Point) Read

func (p *Point) Read(pos uint64, chunkSize uint64, limit uint64) (uint64, []byte, error)

func (*Point) ReadAt

func (p *Point) ReadAt(b []byte, off uint64) (int, error)

type PointMetadata added in v0.1.2

type PointMetadata struct {
	// contains filtered or unexported fields
}

type Promise

type Promise interface {
	Error() error
	Response() interface{}
}
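`Promise` behaves like a small future: `Error` and `Response` presumably block until the result is available. A minimal channel-backed sketch; `chanPromise`, `newChanPromise`, and `resolve` are hypothetical names, not the package's unexported implementation.

```go
package main

import (
	"fmt"
	"time"
)

type Promise interface {
	Error() error
	Response() interface{}
}

// chanPromise is a hypothetical Promise backed by a channel that is closed
// once the result is set.
type chanPromise struct {
	done chan struct{}
	resp interface{}
	err  error
}

func newChanPromise() *chanPromise { return &chanPromise{done: make(chan struct{})} }

// resolve records the outcome and wakes all waiters. It must be called exactly once.
func (p *chanPromise) resolve(resp interface{}, err error) {
	p.resp, p.err = resp, err
	close(p.done)
}

// Error blocks until the promise is resolved and returns its error.
func (p *chanPromise) Error() error {
	<-p.done
	return p.err
}

// Response blocks until the promise is resolved and returns its response.
func (p *chanPromise) Response() interface{} {
	<-p.done
	return p.resp
}

func main() {
	p := newChanPromise()
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate a replicated write
		p.resolve(uint64(7), nil)
	}()
	var f Promise = p
	if err := f.Error(); err != nil {
		fmt.Println("failed:", err)
		return
	}
	fmt.Println("offset:", f.Response()) // offset: 7
}
```

Closing the channel, rather than sending on it, lets any number of callers wait on the same promise.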

type RPC

type RPC struct {
	Command  interface{}
	RespChan chan<- RPCResponse
}

RPC has a command, and provides a response mechanism.

func (*RPC) Respond

func (r *RPC) Respond(resp interface{}, err error)

Respond is used to respond with a response, an error, or both.

type RPCResponse

type RPCResponse struct {
	Response interface{}
	Error    error
}

RPCResponse captures both a response and a potential error.
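An `RPC` pairs a command with a reply channel: the transport delivers RPCs on a consumer channel, and a handler answers through `Respond`. A sketch of that flow using the two types as documented; the body of `Respond`, the string command, and the handler loop are assumptions for illustration.

```go
package main

import "fmt"

type RPCResponse struct {
	Response interface{}
	Error    error
}

type RPC struct {
	Command  interface{}
	RespChan chan<- RPCResponse
}

// Respond sends the response and/or error back to the caller (assumed body).
func (r *RPC) Respond(resp interface{}, err error) {
	r.RespChan <- RPCResponse{Response: resp, Error: err}
}

func main() {
	consumer := make(chan RPC) // stands in for Transport.Consumer()

	// Handler goroutine: answers each string command with its length.
	go func() {
		for rpc := range consumer {
			cmd, ok := rpc.Command.(string)
			if !ok {
				rpc.Respond(nil, fmt.Errorf("unexpected command %T", rpc.Command))
				continue
			}
			rpc.Respond(len(cmd), nil)
		}
	}()

	respCh := make(chan RPCResponse, 1) // buffered so Respond never blocks
	consumer <- RPC{Command: "hello", RespChan: respCh}
	out := <-respCh
	fmt.Println(out.Response, out.Error) // 5 <nil>
	close(consumer)
}
```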

type RecordEntriesPipeline

type RecordEntriesPipeline interface {
	// SendRecordEntriesRequest is used to add another request to the pipeline.
	// Sending may block, which is an effective form of back-pressure.
	SendRecordEntriesRequest(req *RecordEntriesRequest, resp *RecordEntriesResponse) (Promise, error)

	// Consumer returns a channel that can be used to consume
	// response futures when they are ready.
	Consumer() <-chan Promise

	// Close closes the pipeline and cancels all inflight RPCs
	Close() error
}

RecordEntriesPipeline is used for pipelining RecordEntries requests. It increases replication throughput by masking latency and making better use of bandwidth.

type RecordEntriesPromise

type RecordEntriesPromise struct {
	// contains filtered or unexported fields
}

func (*RecordEntriesPromise) Error

func (p *RecordEntriesPromise) Error() error

func (*RecordEntriesPromise) Request

func (*RecordEntriesPromise) Response

func (c *RecordEntriesPromise) Response() interface{}

type RecordEntriesRequest

type RecordEntriesRequest struct {
	Entries []*RecordRequest
}

type RecordEntriesResponse

type RecordEntriesResponse struct {
	LastOff  uint64
	Response interface{}
}

type RecordPromise

type RecordPromise struct {
	// contains filtered or unexported fields
}

func (*RecordPromise) Error

func (p *RecordPromise) Error() error

func (*RecordPromise) Request

func (c *RecordPromise) Request() *RecordRequest

func (*RecordPromise) Response

func (c *RecordPromise) Response() interface{}

type RecordRequest

type RecordRequest struct {
	Data []byte
}

type RecordResponse

type RecordResponse struct {
	LastOff  uint64
	Response interface{}
}

type RedisPointStore

type RedisPointStore struct {
	// contains filtered or unexported fields
}

RedisPointStore manages points and their last offsets.

func NewRedisPointStore

func NewRedisPointStore(address string, dir string) (*RedisPointStore, error)

func (*RedisPointStore) AddPointEvent

func (rps *RedisPointStore) AddPointEvent(pointId string, offset uint64) error

func (*RedisPointStore) GetPointEvent

func (rps *RedisPointStore) GetPointEvent(pointId string) (uint64, error)

type ReplicateCommandPromise

type ReplicateCommandPromise struct {
	// contains filtered or unexported fields
}

func NewReplicateCommandPromise

func NewReplicateCommandPromise(req *RecordRequest, expected interface{}) *ReplicateCommandPromise

type ReplicationClusterConfig

type ReplicationClusterConfig struct {
	NodeName      string
	BindAddr      string
	Tags          map[string]string
	SeedAddresses []string
}

type ReplicationClusterHandler

type ReplicationClusterHandler interface {
	Join(rpcAddr string, rule ParticipationRule) error
	Leave(rpcAddr string) error
}

ReplicationClusterHandler is an interface for getting notified when a new member joins or an existing member leaves the replication cluster.

type RequestBundler

type RequestBundler struct {
}

func (*RequestBundler) Build

func (rb *RequestBundler) Build(header interface{}, key interface{}, value interface{}) (
	[]byte,
	error,
)

type RequestType

type RequestType uint8
const (
	AppendRequestType RequestType = 0
)

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

func (*Resolver) Build

func (r *Resolver) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	opts resolver.BuildOptions,
) (
	resolver.Resolver,
	error,
)

func (*Resolver) Close

func (r *Resolver) Close()

func (*Resolver) ResolveNow

func (r *Resolver) ResolveNow(resolver.ResolveNowOptions)

func (*Resolver) Scheme

func (r *Resolver) Scheme() string

type ResolverHelperConfig

type ResolverHelperConfig struct {
	GetServerer   GetServerer
	GetFollowerer GetFollowerer
}

type Server

type Server struct {
	// Address is its network address that a transport can contact.
	Address ServerAddress
}

Server tracks the information about a single server in a configuration.

type ServerAddress

type ServerAddress string

ServerAddress is a network address for a server that a transport can contact.

type ServerConfig

type ServerConfig struct {
	StreamingConfig      *StreamingConfig
	ResolverHelperConfig *ResolverHelperConfig
	Rule                 ParticipationRule
}

type ServerID

type ServerID string

ServerID is a unique string identifying a server for all time.

type Store

type Store interface {
	AddPointEvent(pointId string, offset uint64) error
	GetPointEvent(pointId string) (uint64, error)
}
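`InMemoryPointStore` and `RedisPointStore` both record the last offset per point. A map-backed sketch of that contract, assuming a lookup of an unknown point returns `ErrPointNotFoundInMemory`; `mapStore` and `newMapStore` are hypothetical, and unlike the real constructors this sketch takes no directory or Redis address.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrPointNotFoundInMemory = errors.New("point not found in the store")

type Store interface {
	AddPointEvent(pointId string, offset uint64) error
	GetPointEvent(pointId string) (uint64, error)
}

// mapStore is a hypothetical, concurrency-safe Store kept in a map.
type mapStore struct {
	mu      sync.RWMutex
	offsets map[string]uint64
}

func newMapStore() *mapStore { return &mapStore{offsets: make(map[string]uint64)} }

// AddPointEvent records the latest offset for pointId.
func (s *mapStore) AddPointEvent(pointId string, offset uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[pointId] = offset
	return nil
}

// GetPointEvent returns the last recorded offset for pointId.
func (s *mapStore) GetPointEvent(pointId string) (uint64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	off, ok := s.offsets[pointId]
	if !ok {
		return 0, ErrPointNotFoundInMemory
	}
	return off, nil
}

func main() {
	var store Store = newMapStore()
	if err := store.AddPointEvent("video-1", 1024); err != nil {
		panic(err)
	}
	off, err := store.GetPointEvent("video-1")
	fmt.Println(off, err) // 1024 <nil>
}
```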

type StreamLayer

type StreamLayer interface {
	net.Listener

	// Dial is used to create a new outgoing connection
	Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
}

StreamLayer is used with the Transport to provide the low-level stream abstraction.
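A `StreamLayer` is a `net.Listener` that can also dial peers. A minimal plain-TCP sketch in the spirit of `TCPStreamLayer`; `tcpLayer` and `newTCPLayer` are hypothetical, and the package's own types may differ (`LocusStreamLayer`, for instance, also takes server and peer TLS configs).

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

type ServerAddress string

type StreamLayer interface {
	net.Listener
	// Dial is used to create a new outgoing connection
	Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
}

// tcpLayer is a hypothetical StreamLayer over plain TCP.
type tcpLayer struct {
	net.Listener // provides Accept, Close and Addr
}

func newTCPLayer(bindAddr string) (*tcpLayer, error) {
	ln, err := net.Listen("tcp", bindAddr)
	if err != nil {
		return nil, err
	}
	return &tcpLayer{Listener: ln}, nil
}

// Dial opens an outgoing TCP connection, giving up after timeout.
func (t *tcpLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) {
	return net.DialTimeout("tcp", string(address), timeout)
}

var _ StreamLayer = (*tcpLayer)(nil) // compile-time interface check

func main() {
	layer, err := newTCPLayer("127.0.0.1:0") // port 0: pick any free port
	if err != nil {
		panic(err)
	}
	defer layer.Close()

	go func() {
		conn, err := layer.Accept()
		if err != nil {
			return
		}
		defer conn.Close()
		conn.Write([]byte("hi"))
	}()

	conn, err := layer.Dial(ServerAddress(layer.Addr().String()), time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	msg, _ := io.ReadAll(conn)
	fmt.Println(string(msg)) // hi
}
```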

type StreamingConfig

type StreamingConfig struct {
	Locus      LocusHelper
	Authorizer *authorizer
}

type StreamingManager

type StreamingManager struct {
	streaming_api.UnimplementedStreamingServer
	*StreamingConfig
}

func NewStreamingServer

func NewStreamingServer(config *StreamingConfig) (*StreamingManager, error)

func (*StreamingManager) ConsumeStream

func (*StreamingManager) GetMetadata added in v0.1.2

func (*StreamingManager) ProduceStream

type TCPStreamLayer

type TCPStreamLayer struct {
	// contains filtered or unexported fields
}

TCPStreamLayer implements StreamLayer interface for plain TCP.

func (*TCPStreamLayer) Accept

func (t *TCPStreamLayer) Accept() (c net.Conn, err error)

Accept implements the net.Listener interface.

func (*TCPStreamLayer) Addr

func (t *TCPStreamLayer) Addr() net.Addr

Addr implements the net.Listener interface.

func (*TCPStreamLayer) Close

func (t *TCPStreamLayer) Close() (err error)

Close implements the net.Listener interface.

func (*TCPStreamLayer) Dial

func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)

Dial implements the StreamLayer interface.

type TLSConfig

type TLSConfig struct {
	CertFile      string
	KeyFile       string
	CAFile        string
	ServerAddress string
	Server        bool
}
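`SetupTLSConfig` turns a `TLSConfig` into a `*tls.Config`. A sketch of one common shape for such a helper, assuming that the key pair is loaded when both `CertFile` and `KeyFile` are set, that `CAFile` becomes `ClientCAs` with mandatory client-certificate verification when `Server` is true (and `RootCAs` otherwise), and that `ServerAddress` becomes `ServerName`. The function is named `setupTLSConfig` here to mark it as an illustration, not the package's code.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

type TLSConfig struct {
	CertFile      string
	KeyFile       string
	CAFile        string
	ServerAddress string
	Server        bool
}

// setupTLSConfig is an illustrative version of SetupTLSConfig.
func setupTLSConfig(cfg TLSConfig) (*tls.Config, error) {
	tlsConfig := &tls.Config{}
	if cfg.CertFile != "" && cfg.KeyFile != "" {
		cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
		if err != nil {
			return nil, err
		}
		tlsConfig.Certificates = []tls.Certificate{cert}
	}
	if cfg.CAFile != "" {
		pemBytes, err := os.ReadFile(cfg.CAFile)
		if err != nil {
			return nil, err
		}
		ca := x509.NewCertPool()
		if !ca.AppendCertsFromPEM(pemBytes) {
			return nil, fmt.Errorf("failed to parse root certificate: %q", cfg.CAFile)
		}
		if cfg.Server {
			// Servers verify client certificates against the CA (mutual TLS).
			tlsConfig.ClientCAs = ca
			tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
		} else {
			// Clients verify the server's certificate against the CA.
			tlsConfig.RootCAs = ca
		}
		tlsConfig.ServerName = cfg.ServerAddress
	}
	return tlsConfig, nil
}

func main() {
	_, err := setupTLSConfig(TLSConfig{CAFile: "does-not-exist.pem"})
	fmt.Println(err != nil) // true
}
```

With this shape, one `TLSConfig` type serves both sides: the same CA file authenticates clients on the server and the server on each client.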

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport provides a network-based transport that can be used to communicate with other servers on remote machines. It requires an underlying stream layer to provide a stream abstraction, which can be simple TCP, TLS, etc.

This transport is very simple and lightweight. Each RPC request is framed by sending a byte that indicates the message type, followed by the MsgPack-encoded request.

The response is an error string followed by the response object, both encoded using MsgPack.

func NewTransportWithConfig

func NewTransportWithConfig(
	config *TransportConfig,
) *Transport

NewTransportWithConfig creates a new network transport with the given config struct

func (*Transport) Close

func (transport *Transport) Close() error

Close is used to stop the network transport.

func (*Transport) CloseStreams

func (transport *Transport) CloseStreams()

CloseStreams closes the current streams.

func (*Transport) Consumer

func (transport *Transport) Consumer() <-chan RPC

Consumer implements the Transport interface.

func (*Transport) IsShutdown

func (transport *Transport) IsShutdown() bool

IsShutdown is used to check if the transport is shutdown.

func (*Transport) LocalAddr

func (transport *Transport) LocalAddr() ServerAddress

LocalAddr implements the Transport interface.

func (*Transport) PrepareCommandTransport

func (transport *Transport) PrepareCommandTransport(target ServerAddress) (RecordEntriesPipeline, error)

PrepareCommandTransport returns an interface that can be used to pipeline SendRecordEntriesRequest requests.

func (*Transport) SendGetServersRequest

func (transport *Transport) SendGetServersRequest(target ServerAddress, req *GetServersRequest, resp *GetServersResponse) error

SendGetServersRequest requests the target to provide the list of its loadbalancers.

func (*Transport) SendRecordEntriesRequest

func (transport *Transport) SendRecordEntriesRequest(target ServerAddress, req *RecordEntriesRequest, resp *RecordEntriesResponse) error

SendRecordEntriesRequest implements the Transport interface.

type TransportConfig

type TransportConfig struct {
	Logger hclog.Logger

	// Dialer
	Stream StreamLayer

	// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
	// the timeout by (SnapshotSize / TimeoutScale).
	Timeout time.Duration

	MaxPool int
}

TransportConfig encapsulates configuration for the network transport layer.

Directories

Path Synopsis
api
START: intro
