srpc

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

README

go-srpc

A package that implements streaming RPC in Go, with highlights as:

  • No Dependency: srpc merely extends the built-in net/rpc.
  • Ease of Use: srpc supports pushing versatile events to the client, including ordinary values or logging entries.
  • Server-side Timeout Control: the server is able to disconnect a stream after the client has been silent for a certain while.
  • Client-side Cancellation: the client is able to cancel an ongoing stream.

An example for quick glance:

package main

import (
    "errors"
    "log"
    "net"
    "net/http"
    "net/rpc"
    "time"

    "github.com/hsfzxjy/go-srpc"
)

type Foo int

func (*Foo) Bar(n int, s *srpc.Session) error {
    return srpc.S(func() error {
        for i := 0; i < n; i++ {
            s.PushValue(i)
            s.Logf("Log from Server: i=%d\n", i)
        }
        return errors.New("example error")
    }, s, nil)
}

func main() {
    // start RPC server
    rpc.Register(new(Foo))
    rpc.HandleHTTP()
    listener, _ := net.Listen("tcp", ":1234")
    go http.Serve(listener, nil)

    time.Sleep(time.Millisecond * 10)

    // prepare RPC client
    cl, _ := rpc.DialHTTP("tcp", "localhost:1234")
    cli := srpc.WrapClient(cl)

    // invoke remote stream function
    h, _ := cli.CallStream("Foo.Bar", 6)
    // enumerate the result
    for x := range h.C() {
        log.Printf("receive value from remote: %+v\n", x)
    }
    // check potential returned error
    if err := h.Result(); err != nil {
        log.Printf("remote returns error: %+v", err)
    }

    listener.Close()
}

Usage

Server-side

To define a streaming method that would continuously push events to the client, one should use the following snippet

import "github.com/hsfzxjy/go-srpc"

func (*Foo) Bar(arg ArgType, s *srpc.Session) error {
    return srpc.S(func() error {
        // your code goes here

        // push arbitrary values to the client
        s.PushValue(42)
        s.PushValue("Hello world")

        // log something at the client-side
        s.Logf("Log from server! The arg is %v", arg)

        // wait for a client-side interrupt
        <-s.EndC()
        // which may be caused by
        switch s.EndCause {
        case srpc.EC_CLIENT_CANCELED:
            // a cancellation
        case srpc.EC_CLIENT_TIMEOUT:
            // a client timeout
        }

        // return an optional error to the client
        return nil
    }, s, nil)
}

The third argument of srpc.S can be used to configure the current session

srpc.S(func() error { ... }, s, &srpc.SessionConfig{
    // The capacity of pushing channel.
    // PushValue() or Logf() will block if the client does not receive in time.
    // Default: 10
    BufferCapacity: 0,
    // Timeout control for lazy clients.
    // If PushValue() or Logf() block for duration `ClientTimeout`,
    // they panic and abort the whole stream.
    // Default: 10 * time.Second
    ClientTimeout: 1 * time.Second,
    // Keep the session alive up to duration `KeepAlive` after it finished,
    // so that the client is able to receive the remaining events.
    // Default: 10 * time.Second
    KeepAlive: 1 * time.Second,
})
Client

To call a remote streaming method, one should first wrap an existing rpc.Client, for example:

cl, _ := rpc.DialHTTP("tcp", "localhost:1234")
cli := srpc.WrapClient(cl)

cli.CallStream() allows you to invoke a remote streaming method

h, err := cli.CallStream("Foo.Bar", arg)

err would be non-nil if the method does not exist. With h, you may receive values, perform cancellation, or inspect potential errors from the remote side:

// receive values
<- h.C()
for x := range h.C() {
    println(x)
}

// cancel the stream; no subsequent events will be pushed
h.Cancel()

// inspect potential error
var err error = h.Err
// inspect potential panic
var p any = h.Panic
// or simply use h.Result()
// if remote panics, h.Result() will panic with the same value
err := h.Result()

Check examples/ for more concrete examples.

License

The Apache License Version 2.0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func S

func S(f func() error, sess *Session, cfg *SessionConfig) error

func SetClientLogFunc

func SetClientLogFunc(logf func(string))

func SetDefaultSessionConfig

func SetDefaultSessionConfig(cfg SessionConfig) bool

func SetServerLogFunc

func SetServerLogFunc(logf func(string, ...any))

Types

type Client

type Client struct {
	*rpc.Client
}

func WrapClient

func WrapClient(c *rpc.Client) *Client

func (*Client) CallStream

func (c *Client) CallStream(name string, args any) (h *StreamHandle, err error)

type EndCause added in v0.2.0

type EndCause int
const (
	EC_UNKNOWN EndCause = iota
	EC_NORMAL
	EC_ERROR
	EC_PANIC
	EC_CLIENT_CANCELED
	EC_CLIENT_TIMEOUT
)

func (EndCause) Error added in v0.2.0

func (ec EndCause) Error() string

type Session

type Session struct {
	Sid uint64

	EndCause EndCause
	// contains filtered or unexported fields
}

A streaming RPC session

func (*Session) EndC added in v0.3.0

func (s *Session) EndC() <-chan struct{}

func (*Session) Logf

func (s *Session) Logf(format string, args ...any)

Log a formatted message at client side

func (*Session) Logv

func (s *Session) Logv(args ...any)

func (*Session) PushValue

func (s *Session) PushValue(data any)

Push an arbitrary value to the client

type SessionConfig

type SessionConfig struct {
	BufferCapacity int
	ClientTimeout  time.Duration
	KeepAlive      time.Duration
	PollTimeout    time.Duration
}

type SrpcError

type SrpcError struct {
	ErrorString string
	Typ         string
}

func (*SrpcError) Error

func (s *SrpcError) Error() string

func (*SrpcError) Format

func (s *SrpcError) Format(f fmt.State, verb rune)

type StreamEvent

type StreamEvent struct {
	Typ  streamEventType
	Data any
}

type StreamHandle added in v0.1.9

type StreamHandle struct {
	EndCause EndCause

	Err   error
	Panic *panicInfo
	// contains filtered or unexported fields
}

func (*StreamHandle) C added in v0.1.9

func (h *StreamHandle) C() <-chan any

func (*StreamHandle) Cancel added in v0.1.9

func (h *StreamHandle) Cancel() bool

Forcibly cancel the stream; no more events will come from the server

func (*StreamHandle) CancelAndResult added in v0.1.9

func (h *StreamHandle) CancelAndResult() error

func (*StreamHandle) EndC added in v0.3.0

func (h *StreamHandle) EndC() <-chan struct{}

func (*StreamHandle) GetError added in v0.1.9

func (h *StreamHandle) GetError() error

Wait for the stream to complete. If server throws a panic or error, return that value

func (*StreamHandle) IsEnded added in v0.2.0

func (h *StreamHandle) IsEnded() bool

func (*StreamHandle) Result added in v0.1.9

func (h *StreamHandle) Result() error

Wait for the stream to complete. If server-side throws a panic, PANIC with that value; otherwise, return the error that server throws

func (*StreamHandle) SoftCancel added in v0.2.3

func (h *StreamHandle) SoftCancel() bool

Advise the server that the stream should be canceled, but still wait for subsequent events

func (*StreamHandle) Success added in v0.1.9

func (h *StreamHandle) Success() bool

Wait for the stream to complete, and return whether a panic or error is thrown from the server-side

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

func (*StreamManager) Cancel

func (m *StreamManager) Cancel(sid uint64, loaded *bool) error

func (*StreamManager) Poll

func (m *StreamManager) Poll(sid uint64, reply *[]*StreamEvent) error

func (*StreamManager) SoftCancel added in v0.2.4

func (m *StreamManager) SoftCancel(sid uint64, loaded *bool) error

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL