mcorpc-agent-provider: github.com/choria-io/mcorpc-agent-provider/mcorpc/client Index | Files

package client

import "github.com/choria-io/mcorpc-agent-provider/mcorpc/client"

Index

Package Files

client.go nodelist.go options.go stats.go util.go

func InGroups Uses

func InGroups(set []string, size int, f func([]string) error) error

InGroups calls f for sub slices of a slice where every slice is at most `size` big

func InterruptableSleep Uses

func InterruptableSleep(ctx context.Context, d time.Duration) error

InterruptableSleep sleeps for the duration d in a way that can be interrupted by the context. An error is returned if the context cancels the sleep.

type ChoriaClient Uses

type ChoriaClient interface {
    Request(ctx context.Context, msg *choria.Message, handler cclient.Handler) (err error)
}

ChoriaClient implements the connection to the Choria network

type ChoriaFramework Uses

type ChoriaFramework interface {
    Logger(string) *logrus.Entry
    Configuration() *config.Config
    NewMessage(payload string, agent string, collective string, msgType string, request *choria.Message) (msg *choria.Message, err error)
    NewReplyFromTransportJSON(payload []byte, skipvalidate bool) (msg protocol.Reply, err error)
    NewTransportFromJSON(data string) (message protocol.TransportMessage, err error)
    MiddlewareServers() (servers srvcache.Servers, err error)
    NewConnector(ctx context.Context, servers func() (srvcache.Servers, error), name string, logger *logrus.Entry) (conn choria.Connector, err error)
    NewRequestID() (string, error)
    Certname() string
}

type Connector Uses

type Connector interface {
    QueueSubscribe(ctx context.Context, name string, subject string, group string, output chan *choria.ConnectorMessage) error
    Publish(msg *choria.Message) error
}

Connector is a connection to the choria network

type DiscoveryEndFunc Uses

type DiscoveryEndFunc func(discovered int, limited int) error

DiscoveryEndFunc gets called after discovery ends and includes the discovered node count and the count of nodes that will be targeted after limits were applied. Should this return an error, the RPC call will terminate.

type DiscoveryStartFunc Uses

type DiscoveryStartFunc func()

DiscoveryStartFunc gets called before discovery starts

type Handler Uses

type Handler func(protocol.Reply, *RPCReply)

Handler is a function that should handle each reply synchronously

type NodeList Uses

type NodeList struct {
    sync.RWMutex
    // contains filtered or unexported fields
}

NodeList is a list of nodes the client is interacting with and is used to keep track of things like which nodes have responded, which are still to respond, etc.

func NewNodeList Uses

func NewNodeList() *NodeList

NewNodeList creates a new initialized NodeList

func (*NodeList) AddHosts Uses

func (n *NodeList) AddHosts(hosts ...string)

AddHosts appends the given nodes to the list of known nodes

func (*NodeList) Clear Uses

func (n *NodeList) Clear()

Clear removes all nodes from the NodeList

func (*NodeList) Count Uses

func (n *NodeList) Count() int

Count returns the number of nodes on the list

func (*NodeList) DeleteIfKnown Uses

func (n *NodeList) DeleteIfKnown(host string) bool

DeleteIfKnown removes a node from the list if it's known, boolean result indicates if it was known

func (*NodeList) Have Uses

func (n *NodeList) Have(host string) bool

Have determines if a node is known

func (*NodeList) HaveAny Uses

func (n *NodeList) HaveAny(hosts ...string) bool

HaveAny determines if any of the given nodes are known in a boolean OR fashion

func (*NodeList) Hosts Uses

func (n *NodeList) Hosts() []string

Hosts returns the individual nodes on the list

type Option Uses

type Option func(r *RPC)

Option configures the RPC client

func DDL Uses

func DDL(d *addl.DDL) Option

DDL supplies a DDL when creating the client thus avoiding a disk search

type RPC Uses

type RPC struct {
    // contains filtered or unexported fields
}

RPC is a MCollective compatible RPC client

func New Uses

func New(fw ChoriaFramework, agent string, opts ...Option) (rpc *RPC, err error)

New creates a new RPC request

A DDL is required. When one is not given using the DDL() option as an argument, attempts will be made to find it on the file system; should this fail, an error will be returned.

func (*RPC) Do Uses

func (r *RPC) Do(ctx context.Context, action string, payload interface{}, opts ...RequestOption) (RequestResult, error)

Do performs a RPC request and optionally processes replies

If a filter is supplied using the Filter() option and Targets() are not, then discovery will be done for you using the broadcast method. Should no nodes be discovered, an error will be returned.

func (*RPC) Reset Uses

func (r *RPC) Reset()

Reset removes the cached options, any further Do() calls need to specify full options

type RPCReply Uses

type RPCReply struct {
    Statuscode mcorpc.StatusCode `json:"statuscode"`
    Statusmsg  string            `json:"statusmsg"`
    Data       json.RawMessage   `json:"data"`
}

RPCReply is a basic RPC reply

func ParseReplyData Uses

func ParseReplyData(source []byte) (*RPCReply, error)

ParseReplyData parses reply data and populates a Reply and custom Data

type RPCRequest Uses

type RPCRequest struct {
    Agent  string          `json:"agent"`
    Action string          `json:"action"`
    Data   json.RawMessage `json:"data"`
}

RPCRequest is a basic RPC request

type RequestOption Uses

type RequestOption func(*RequestOptions)

RequestOption is a function capable of setting an option

func BroadcastRequest Uses

func BroadcastRequest() RequestOption

BroadcastRequest forces the request to be a broadcast mode request

**NOTE:** You need to ensure you have filters etc done

func Collective Uses

func Collective(c string) RequestOption

Collective sets the collective to target a message at

func ConnectionName Uses

func ConnectionName(n string) RequestOption

ConnectionName sets the prefix used for various connection names

Setting this when making many clients will minimize prometheus metrics being created - 2 or 3 per client which with random generated names will snowball over time

func DirectRequest Uses

func DirectRequest() RequestOption

DirectRequest forces the request to be a direct request

func DiscoveryEndCB Uses

func DiscoveryEndCB(h DiscoveryEndFunc) RequestOption

DiscoveryEndCB sets the function to be called after discovery and node limiting

func DiscoveryStartCB Uses

func DiscoveryStartCB(h DiscoveryStartFunc) RequestOption

DiscoveryStartCB sets the function to be called before discovery starts

func DiscoveryTimeout Uses

func DiscoveryTimeout(t time.Duration) RequestOption

DiscoveryTimeout configures the request discovery timeout, defaults to configured discovery timeout

func Filter Uses

func Filter(f *protocol.Filter) RequestOption

Filter sets the filter; if it's set, discovery will be done prior to performing requests

func InBatches Uses

func InBatches(size int, sleep int) RequestOption

InBatches performs requests in batches

func LimitMethod Uses

func LimitMethod(m string) RequestOption

LimitMethod configures the method to use when limiting targets - "random" or "first"

func LimitSeed Uses

func LimitSeed(s int64) RequestOption

LimitSeed sets the random seed used to select targets when limiting and limit method is "random"

func LimitSize Uses

func LimitSize(s string) RequestOption

LimitSize sets limits on the targets, either a number or a percentage like "10%"

func Protocol Uses

func Protocol(v string) RequestOption

Protocol sets the protocol version to use

func Replies Uses

func Replies(r chan *choria.ConnectorMessage) RequestOption

Replies creates a custom channel for replies and will avoid processing them

func ReplyHandler Uses

func ReplyHandler(f Handler) RequestOption

ReplyHandler configures a callback to be called for each message received

func ReplyTo Uses

func ReplyTo(r string) RequestOption

ReplyTo sets a custom reply to, else the connector will determine it

func Targets Uses

func Targets(t []string) RequestOption

Targets configures targets for a RPC request

func Timeout Uses

func Timeout(t time.Duration) RequestOption

Timeout configures the request timeout

func Workers Uses

func Workers(w int) RequestOption

Workers configures the number of workers used to process responses. This is ignored during batched mode, as that is always done with a single worker.

type RequestOptions Uses

type RequestOptions struct {
    BatchSize        int
    BatchSleep       time.Duration
    Collective       string
    ConnectionName   string
    DiscoveryTimeout time.Duration
    Filter           *protocol.Filter
    Handler          Handler
    ProcessReplies   bool
    ProtocolVersion  string
    Replies          chan *choria.ConnectorMessage
    ReplyTo          string
    RequestID        string
    RequestType      string
    Targets          []string
    Timeout          time.Duration
    Workers          int
    LimitSeed        int64
    LimitMethod      string
    LimitSize        string
    DiscoveryStartCB DiscoveryStartFunc
    DiscoveryEndCB   DiscoveryEndFunc
    // contains filtered or unexported fields
}

RequestOptions are options for a RPC request

func NewRequestOptions Uses

func NewRequestOptions(fw ChoriaFramework, ddl *agent.DDL) (*RequestOptions, error)

NewRequestOptions creates initialized request options

func (*RequestOptions) ConfigureMessage Uses

func (o *RequestOptions) ConfigureMessage(msg *choria.Message) (err error)

ConfigureMessage configures a pre-made message object based on the settings contained

func (*RequestOptions) Stats Uses

func (o *RequestOptions) Stats() *Stats

Stats retrieves the stats for the completed request

type RequestResult Uses

type RequestResult interface {
    Stats() *Stats
}

RequestResult is the result of a request

type Stats Uses

type Stats struct {
    RequestID string
    // contains filtered or unexported fields
}

Stats represent stats for a request

func NewStats Uses

func NewStats() *Stats

NewStats initializes a new stats instance

func (*Stats) Action Uses

func (s *Stats) Action() string

Action returns the action the stat is for if it was set

func (*Stats) Agent Uses

func (s *Stats) Agent() string

Agent returns the agent the stat is for if it was set

func (*Stats) All Uses

func (s *Stats) All() bool

All determines if all expected nodes replied already

func (*Stats) DiscoveredCount Uses

func (s *Stats) DiscoveredCount() int

DiscoveredCount is how many nodes were discovered

func (*Stats) DiscoveredNodes Uses

func (s *Stats) DiscoveredNodes() *[]string

DiscoveredNodes are the nodes that were discovered for this request

func (*Stats) DiscoveryDuration Uses

func (s *Stats) DiscoveryDuration() (time.Duration, error)

DiscoveryDuration determines how long discovery took, 0 and error when discovery was not done

func (*Stats) End Uses

func (s *Stats) End()

End records the end time of a request

func (*Stats) EndDiscover Uses

func (s *Stats) EndDiscover()

EndDiscover records the end time of the discovery process

func (*Stats) EndPublish Uses

func (s *Stats) EndPublish()

EndPublish records the publish process ended

func (*Stats) FailCount Uses

func (s *Stats) FailCount() int

FailCount is the number of responses that were failures

func (*Stats) FailedRequestInc Uses

func (s *Stats) FailedRequestInc()

FailedRequestInc increments the failed request counter by one

func (*Stats) Merge Uses

func (s *Stats) Merge(other *Stats) error

Merge merges the stats from a specific batch into this

func (*Stats) NoResponseFrom Uses

func (s *Stats) NoResponseFrom() []string

NoResponseFrom calculates which discovered hosts did not respond

func (*Stats) OKCount Uses

func (s *Stats) OKCount() int

OKCount is the number of responses that were ok

func (*Stats) PassedRequestInc Uses

func (s *Stats) PassedRequestInc()

PassedRequestInc increments the passed request counter by one

func (*Stats) PublishDuration Uses

func (s *Stats) PublishDuration() (time.Duration, error)

PublishDuration calculates how long publishing took

func (*Stats) RecordReceived Uses

func (s *Stats) RecordReceived(sender string)

RecordReceived records the fact that one message was received

func (*Stats) RequestDuration Uses

func (s *Stats) RequestDuration() (time.Duration, error)

RequestDuration calculates the total duration

func (*Stats) ResponsesCount Uses

func (s *Stats) ResponsesCount() int

ResponsesCount is the total number of nodes that responded so far

func (*Stats) SetAction Uses

func (s *Stats) SetAction(a string)

SetAction stores the action the stats is for

func (*Stats) SetAgent Uses

func (s *Stats) SetAgent(a string)

SetAgent stores the agent the stats is for

func (*Stats) SetDiscoveredNodes Uses

func (s *Stats) SetDiscoveredNodes(nodes []string)

SetDiscoveredNodes records the node names we expect to communicate with

func (*Stats) Start Uses

func (s *Stats) Start()

Start records the start time of a request

func (*Stats) StartDiscover Uses

func (s *Stats) StartDiscover()

StartDiscover records the start time of the discovery process

func (*Stats) StartPublish Uses

func (s *Stats) StartPublish()

StartPublish records the publish process started

func (*Stats) UnexpectedResponseFrom Uses

func (s *Stats) UnexpectedResponseFrom() []string

UnexpectedResponseFrom calculates which hosts responded that we did not expect responses from

func (*Stats) WaitingFor Uses

func (s *Stats) WaitingFor(nodes []string) bool

WaitingFor checks if any of the given nodes are still outstanding

Package client imports 20 packages (graph) and is imported by 7 packages. Updated 2020-01-11. Refresh now. Tools for package owners.