ddp

package module
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2023 License: ISC Imports: 11 Imported by: 27

README

DDP in Go

Meteor DDP library for Go. This library allows Go applications to connect to Meteor applications, subscribe to Meteor publications, read from a cached Collection (similar to minimongo), and call Meteor methods on the server.

See ddp/_examples for some tips and an example app that walks through all the features of the library.

Documentation

Overview

Package ddp implements the MeteorJS DDP protocol over websockets. Fallback to long polling is NOT supported (and is not planned on ever being supported by this library). We will try to model the library after `net/http` - right now the library is bare bones and doesn't provide the plug-ability of http. However, that's the goal for the package eventually.

Index

Constants

View Source
const (
	DISCONNECTED = iota
	DISCONNECTING
	CONNECTED
	DIALING
	CONNECTING
	RECONNECTING
)

Variables

This section is empty.

Functions

func IgnoreErr added in v0.0.3

func IgnoreErr(err error, msg string, l *log.Entry)

IgnoreErr logs an error if it occurs and ignores it.

Types

type Call

type Call struct {
	ID            string      // The uuid for this method call
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete.
	Owner         *Client     // Client that owns the method call
	// contains filtered or unexported fields
}

Call represents an active RPC call.

type Client

type Client struct {
	// HeartbeatInterval is the time between heartbeats to send
	HeartbeatInterval time.Duration
	// HeartbeatTimeout is the time for a heartbeat ping to timeout
	HeartbeatTimeout time.Duration
	// ReconnectInterval is the time between reconnections on bad connections
	ReconnectInterval time.Duration

	// KeyManager tracks IDs for ddp messages
	KeyManager
	// contains filtered or unexported fields
}

Client represents a DDP client connection. The DDP client establish a DDP session and acts as a message pump for other tools.

func NewClient

func NewClient(url, origin string) *Client

NewClient creates a client with NewClientWithLogger and the default apex logger.

func NewClientWithLogger added in v0.0.6

func NewClientWithLogger(url, origin string, logger *log.Entry) *Client

NewClientWithLogger creates a default client (using an internal websocket) to the provided URL using the origin for the connection. The client will automatically connect, upgrade to a websocket, and establish a DDP connection session before returning the client. The client will automatically and internally handle heartbeats and reconnects.

TBD create an option to use an external websocket (aka htt.Transport) TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport) TBD create an option to hijack the connection (aka http.Hijacker) TBD create profiling features (aka net/http/pprof)

func (*Client) AddConnectionListener

func (c *Client) AddConnectionListener(listener ConnectionListener)

AddConnectionListener in order to receive connection updates.

func (*Client) AddStatusListener

func (c *Client) AddStatusListener(listener StatusListener)

AddStatusListener in order to receive status change updates.

func (*Client) Call

func (c *Client) Call(serviceMethod string, args ...interface{}) (interface{}, error)

Call invokes the named function, waits for it to complete, and returns its error status.

func (*Client) Close

func (c *Client) Close()

Close implements the io.Closer interface.

func (*Client) CollectionByName

func (c *Client) CollectionByName(name string) Collection

CollectionByName retrieves a collection by its name.

func (*Client) CollectionStats

func (c *Client) CollectionStats() []CollectionStats

CollectionStats returns a snapshot of statistics for the currently known collections.

func (*Client) Connect

func (c *Client) Connect() error

Connect attempts to connect the client to the server.

func (*Client) Go

func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{}) *Call

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.

Go and Call are modeled after the standard `net/rpc` package versions.

func (*Client) Ping

func (c *Client) Ping()

Ping sends a heartbeat signal to the server. The Ping doesn't look for a response but may trigger the connection to reconnect if the ping times out. This is primarily useful for reviving an unresponsive Client connection.

func (*Client) PingPong

func (c *Client) PingPong(id string, timeout time.Duration, handler func(error))

PingPong sends a heartbeat signal to the server and calls the provided function when a pong is received. An optional id can be sent to help track the responses - or an empty string can be used. It is the responsibility of the caller to respond to any errors that may occur.

func (*Client) Reconnect

func (c *Client) Reconnect()

Reconnect attempts to reconnect the client to the server on the existing DDP session.

TODO needs a reconnect backoff so we don't trash a down server TODO reconnect should not allow more reconnects while a reconnection is already in progress.

func (*Client) ResetStats

func (c *Client) ResetStats()

ResetStats resets the statistics for the client.

func (*Client) Send

func (c *Client) Send(msg interface{}) error

Send transmits messages to the server. The msg parameter must be json encoder compatible.

func (*Client) Session

func (c *Client) Session() string

Session returns the negotiated session token for the connection.

func (*Client) Stats

func (c *Client) Stats() *ClientStats

Stats returns the read and write statistics of the client.

func (*Client) Sub

func (c *Client) Sub(subName string, args ...interface{}) error

Sub sends a synchronous subscription request to the server.

func (*Client) Subscribe

func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{}) *Call

Subscribe to data updates.

func (*Client) Version

func (c *Client) Version() string

Version returns the negotiated protocol version in use by the client.

type ClientStats

type ClientStats struct {
	// Reads provides statistics on the raw i/o network reads for the current connection.
	Reads *Stats
	// Reads provides statistics on the raw i/o network reads for the all client connections.
	TotalReads *Stats
	// Writes provides statistics on the raw i/o network writes for the current connection.
	Writes *Stats
	// Writes provides statistics on the raw i/o network writes for all the client connections.
	TotalWrites *Stats
	// Reconnects is the number of reconnections the client has made.
	Reconnects int64
	// PingsSent is the number of pings sent by the client
	PingsSent int64
	// PingsRecv is the number of pings received by the client
	PingsRecv int64
}

ClientStats displays combined statistics for the Client.

func (*ClientStats) String

func (stats *ClientStats) String() string

String produces a compact string representation of the client stats.

type Collection

type Collection interface {

	// FindOne queries objects and returns the first match.
	FindOne(id string) Update
	// FindAll returns a map of all items in the cache - this is a hack
	// until we have time to build out a real minimongo interface.
	FindAll() map[string]Update
	// AddUpdateListener adds a channel that receives update messages.
	AddUpdateListener(listener UpdateListener)
	// contains filtered or unexported methods
}

Collection managed cached collection data sent from the server in a livedata subscription.

It would be great to build an entire mongo compatible local store (minimongo)

func NewCollection

func NewCollection(name string) Collection

NewCollection creates a new collection - always KeyCache.

func NewMockCollection

func NewMockCollection() Collection

NewMockCollection creates an empty collection that does nothing.

type CollectionStats

type CollectionStats struct {
	Name  string // Name of the collection
	Count int    // Count is the total number of documents in the collection
}

CollectionStats combines statistics about a collection.

func (*CollectionStats) String

func (s *CollectionStats) String() string

String produces a compact string representation of the collection stat.

type Connect

type Connect struct {
	Message
	Version string   `json:"version"`
	Support []string `json:"support"`
	Session string   `json:"session,omitempty"`
}

Connect represents a DDP connect message.

func NewConnect

func NewConnect() *Connect

NewConnect creates a new connect message

func NewReconnect

func NewReconnect(session string) *Connect

NewReconnect creates a new connect message with a session ID to resume.

type ConnectionListener

type ConnectionListener interface {
	Connected()
}

type ConnectionNotifier

type ConnectionNotifier interface {
	AddConnectionListener(listener ConnectionListener)
}

type KeyCache

type KeyCache struct {
	// The name of the collection
	Name string
	// contains filtered or unexported fields
}

KeyCache caches items keyed on unique ID.

func (*KeyCache) AddUpdateListener

func (c *KeyCache) AddUpdateListener(listener UpdateListener)

AddUpdateListener adds a listener for changes on a collection.

func (*KeyCache) FindAll

func (c *KeyCache) FindAll() map[string]Update

FindAll returns a dump of all items in the collection

func (*KeyCache) FindOne

func (c *KeyCache) FindOne(id string) Update

FindOne returns the item with matching id.

type KeyManager added in v0.0.3

type KeyManager struct {
	// contains filtered or unexported fields
}

KeyManager provides simple incrementing IDs for ddp messages.

func NewKeyManager added in v0.0.3

func NewKeyManager() *KeyManager

NewKeyManager creates a new instance and sets up resources.

func (*KeyManager) Next added in v0.0.3

func (id *KeyManager) Next() string

Next issues a new ID for use in calls.

type Login

type Login struct {
	User     *User     `json:"user"`
	Password *Password `json:"password"`
}

Login provides a Meteor.Accounts password login support

func NewEmailLogin

func NewEmailLogin(email, pass string) *Login

func NewUsernameLogin

func NewUsernameLogin(user, pass string) *Login

type LoginResume

type LoginResume struct {
	Token string `json:"resume"`
}

func NewLoginResume

func NewLoginResume(token string) *LoginResume

type Message

type Message struct {
	Type string `json:"msg"`
	ID   string `json:"id,omitempty"`
}

Message contains the common fields that all DDP messages use.

type Method

type Method struct {
	Message
	ServiceMethod string        `json:"method"`
	Args          []interface{} `json:"params"`
}

Method is used to send a remote procedure call to the server.

func NewMethod

func NewMethod(id, serviceMethod string, args []interface{}) *Method

NewMethod creates a new method invocation object.

type MockCache

type MockCache struct {
}

MockCache implements the Collection interface but does nothing with the data.

func (*MockCache) AddUpdateListener

func (c *MockCache) AddUpdateListener(ch UpdateListener)

AddUpdateListener does nothing.

func (*MockCache) FindAll

func (c *MockCache) FindAll() map[string]Update

FindAll returns a dump of all items in the collection

func (*MockCache) FindOne

func (c *MockCache) FindOne(id string) Update

FindOne returns the item with matching id.

type OrderedCache

type OrderedCache struct {
	// contains filtered or unexported fields
}

OrderedCache caches items based on list order. This is a placeholder, currently not implemented as the Meteor server does not transmit ordered collections over DDP yet.

func (*OrderedCache) AddUpdateListener

func (c *OrderedCache) AddUpdateListener(ch UpdateListener)

AddUpdateListener does nothing.

func (*OrderedCache) FindAll

func (c *OrderedCache) FindAll() map[string]Update

FindAll returns a dump of all items in the collection

func (*OrderedCache) FindOne

func (c *OrderedCache) FindOne(id string) Update

FindOne returns the item with matching id.

type Password

type Password struct {
	Digest    string `json:"digest"`
	Algorithm string `json:"algorithm"`
}

func NewPassword

func NewPassword(pass string) *Password

type Ping

type Ping Message

Ping represents a DDP ping message.

func NewPing

func NewPing(id string) *Ping

NewPing creates a new ping message with optional ID.

type PingTracker added in v0.0.3

type PingTracker struct {
	// contains filtered or unexported fields
}

PingTracker tracks in-flight pings.

type Pong

type Pong Message

Pong represents a DDP pong message.

func NewPong

func NewPong(id string) *Pong

NewPong creates a new pong message with optional ID.

type ReaderStats

type ReaderStats struct {
	StatsTracker
	Reader io.Reader
}

ReaderStats tracks statistics on any io.Reader. ReaderStats wraps a Reader and passes data to the actual data consumer.

func NewReaderStats

func NewReaderStats(reader io.Reader) *ReaderStats

NewReaderStats creates a ReaderStats object for the provided Reader.

func (*ReaderStats) Read

func (r *ReaderStats) Read(p []byte) (int, error)

Read passes through a read collecting statistics and logging activity.

type Stats

type Stats struct {
	// Bytes is the total number of bytes transferred.
	Bytes int64
	// Ops is the total number of i/o operations performed.
	Ops int64
	// Errors is the total number of i/o errors encountered.
	Errors int64
	// Runtime is the duration that stats have been gathered.
	Runtime time.Duration
}

Stats tracks statistics for i/o operations.

type StatsTracker

type StatsTracker struct {
	// contains filtered or unexported fields
}

StatsTracker provides the basic tooling for tracking i/o stats.

func NewStatsTracker

func NewStatsTracker() *StatsTracker

NewStatsTracker create a new tracker with start time set to now.

func (*StatsTracker) Op

func (t *StatsTracker) Op(n int, err error) (int, error)

Op records an i/o operation. The parameters are passed through to allow easy chaining.

func (*StatsTracker) Reset

func (t *StatsTracker) Reset() *Stats

Reset all stats to initial values.

func (*StatsTracker) Snapshot

func (t *StatsTracker) Snapshot() *Stats

Snapshot takes a snapshot of the current Reader statistics.

type StatusListener

type StatusListener interface {
	Status(status int)
}

type StatusNotifier

type StatusNotifier interface {
	AddStatusListener(listener StatusListener)
}

type Sub

type Sub struct {
	Message
	SubName string        `json:"name"`
	Args    []interface{} `json:"params"`
}

Sub is used to send a subscription request to the server.

func NewSub

func NewSub(id, subName string, args []interface{}) *Sub

NewSub creates a new sub object.

type Time added in v0.0.3

type Time struct {
	time.Time
}

Time is an alias for time.Time with custom json marshalling implementations to support ejson.

func UnixMilli added in v0.0.3

func UnixMilli(i int64) Time

UnixMilli creates a new Time from the given unix millis but in UTC (as opposed to time.UnixMilli which returns time in the local time zone). This supports the proper loading of times from EJSON $date objects.

func (Time) MarshalJSON added in v0.0.3

func (t Time) MarshalJSON() ([]byte, error)

func (*Time) UnmarshalJSON added in v0.0.3

func (t *Time) UnmarshalJSON(b []byte) error

type Update

type Update map[string]interface{}

type UpdateListener

type UpdateListener interface {
	CollectionUpdate(collection, operation, id string, doc Update)
}

type User

type User struct {
	Email    string `json:"email,omitempty"`
	Username string `json:"username,omitempty"`
}

type WriterStats

type WriterStats struct {
	StatsTracker
	Writer io.Writer
}

WriterStats tracks statistics on any io.Writer. WriterStats wraps a Writer and passes data to the actual data producer.

func NewWriterStats

func NewWriterStats(writer io.Writer) *WriterStats

NewWriterStats creates a WriterStats object for the provided Writer.

func (*WriterStats) Write

func (w *WriterStats) Write(p []byte) (int, error)

Write collects Writer statistics.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL