cable

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2023 License: MIT Imports: 23 Imported by: 0

README

xk6-cable

A k6 extension for testing Action Cable and AnyCable functionality. Built for k6 using the xk6 system.

Comparing to the official WebSockets support, xk6-cable provides the following features:

  • Built-in Action Cable API support (no need to manually build or parse protocol messages).
  • Synchronous API to initialize connections and subscriptions.
  • AnyCable-specific extensions (e.g., binary encodings)

Read also "Real-time stress: AnyCable, k6, WebSockets, and Yabeda"

We also provide JS helpers to simplify writing benchmarks for Rails applications.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

  1. Install xk6 framework for extending k6:
go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the binary:
xk6 build --with github.com/anycable/xk6-cable@latest

# you can specify k6 version
xk6 build v0.42.0 --with github.com/anycable/xk6-cable@latest

# or if you want to build from the local source
xk6 build --with github.com/anycable/xk6-cable@latest=/path/to/source

Example

Consider a simple example using the EchoChannel:

// benchmark.js
import { check, sleep } from 'k6';
import cable from "k6/x/cable";

export default function () {
  // Initialize the connection
  const client = cable.connect("ws://localhost:8080/cable");
  // If connection were not sucessful, the return value is null
  // It's a good practice to add a check and configure a threshold (so, you can fail-fast if
  // configuration is incorrect)
  if (
    !check(client, {
      "successful connection": (obj) => obj,
    })
  ) {
    fail("connection failed");
  }

  // At this point, the client has been successfully connected
  // (e.g., welcome message has been received)

  // Send subscription request and wait for the confirmation.
  // Returns null if failed to subscribe (due to rejection or timeout).
  const channel = client.subscribe("EchoChannel");

  // Perform an action
  channel.perform("echo", { foo: 1 });

  // Retrieve a single message from the incoming inbox (FIFO).
  // Returns null if no messages have been received in the specified period of time (see below).
  const res = channel.receive();
  check(res, {
    "received res": (obj) => obj.foo === 1,
  });

  channel.perform("echo", { foobar: 3 });
  channel.perform("echo", { foobaz: 3 });

  // You can also retrieve multiple messages at a time.
  // Returns as many messages (but not more than expected) as have been received during
  // the specified period of time. If none, returns an empty array.
  const reses = channel.receiveN(2);
  check(reses, {
    "received 2 messages": (obj) => obj.length === 2,
  });

  sleep(1);

  // You can also subscribe to a channel asynchrounsly and wait for the confirmation later
  // That allows to send multiple subscribe commands at once in a non-blocking way
  const channelSubscribed = client.subscribeAsync("EchoChannel", { foo: 1 });
  const anotherChannelSubscribed = client.subscribeAsync("EchoChannel", { foo: 2 });

  // Wait for the confirmation
  channelSubscribed.await();
  anotherChannelSubscribed.await();

  // Terminate the WS connection
  client.disconnect()
}

Example run results:

$ ./k6 run benchmark.js


          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: benchmark.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)


running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 iters, 1 per VU

     ✓ received res
     ✓ received res2
     ✓ received 3 messages
     ✓ received 2 messages
     ✓ all messages with baz attr

     checks...............: 100.00% ✓ 5 ✗ 0
     data_received........: 995 B   83 kB/s
     data_sent............: 1.2 kB  104 kB/s
     iteration_duration...: avg=11.06ms  min=11.06ms  med=11.06ms  max=11.06ms  p(90)=11.06ms  p(95)=11.06ms
     iterations...........: 1       83.850411/s
     ws_connecting........: avg=904.62µs min=904.62µs med=904.62µs max=904.62µs p(90)=904.62µs p(95)=904.62µs
     ws_msgs_received.....: 9       754.653698/s
     ws_msgs_sent.........: 9       754.653698/s
     ws_sessions..........: 1       83.850411/s

You can pass the following options to the connect method as the second argument:

{
  headers: {}, // HTTP headers to use (e.g., { COOKIE: 'some=cookie;' })
  cookies: "", // HTTP cookies as string (overwrite the value passed in headers if present)
  tags: {}, // k6 tags
  handshakeTimeoutS: 60, // Max allowed time to initialize a connection
  receiveTimeoutMs: 1000, // Max time to wait for an incoming message
  logLevel: "info" // logging level (change to debug to see more information)
  codec: "json", // Codec (encoder) to use. Supported values are: json, msgpack, protobuf.
}

NOTE: msgpack and protobuf codecs are only supported by AnyCable PRO.

More examples could be found in the examples/ folder.

JS helpers for k6

We provide a collection of utils to simplify development of k6 scripts for Rails applications (w/Action Cable or AnyCable):

import {
  cableUrl, // reads the value of the action-cable-url (or cable-url) meta value
  turboStreamSource, // find and returns a stream name and a channel name from the <turbo-cable-stream-source> element
  csrfToken, // reads the value of the csrf-token meta value
  csrfParam, // reads the value of the csrf-param meta value
  readMeta, // reads the value of the meta tag with the given name
} from 'https://anycable.io/xk6-cable/jslib/k6-rails/0.1.0/index.js'

export default function () {
  let res = http.get("http://localhost:3000/home");

  if (
    !check(res, {
      "is status 200": (r) => r.status === 200,
    })
  ) {
    fail("couldn't open page");
  }

  const html = res.html();
  const wsUrl = cableUrl(html);

  if (!wsUrl) {
    fail("couldn't find cable url on the page");
  }

  let client = cable.connect(wsUrl);

  if (
    !check(client, {
      "successful connection": (obj) => obj,
    })
  ) {
    fail("connection failed");
  }

  let { streamName, channelName } = turboStreamSource(html);

  if (!streamName) {
    fail("couldn't find a turbo stream element");
  }

  let channel = client.subscribe(channelName, {
    signed_stream_name: streamName,
  });

  if (
    !check(channel, {
      "successful subscription": (obj) => obj,
    })
  ) {
    fail("failed to subscribe");
  }

  // ...
}

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/anycable/xk6-cable.

License

The gem is available as open source under the terms of the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var JSONCodec = &Codec{
	Receive: func(c *websocket.Conn, v interface{}) error {
		return c.ReadJSON(v)
	},
	Send: func(c *websocket.Conn, v interface{}) error {
		return c.WriteJSON(v)
	},
}
View Source
var MsgPackCodec = &Codec{
	Receive: func(c *websocket.Conn, v interface{}) error {
		_, r, err := c.NextReader()
		if err != nil {
			return err
		}
		enc := msgpack.NewDecoder(r)
		enc.SetCustomStructTag("json")
		return enc.Decode(v)
	},
	Send: func(c *websocket.Conn, v interface{}) error {
		w, err := c.NextWriter(websocket.BinaryMessage)
		if err != nil {
			return err
		}
		enc := msgpack.NewEncoder(w)
		enc.SetCustomStructTag("json")
		err1 := enc.Encode(v)
		err2 := w.Close()
		if err1 != nil {
			return err1
		}
		return err2
	},
}
View Source
var ProtobufCodec = &Codec{
	Receive: func(c *websocket.Conn, v interface{}) error {
		mtype, r, err := c.NextReader()
		if err != nil {
			return err
		}

		if mtype != websocket.BinaryMessage {
			return fmt.Errorf("Unexpected message type: %v", mtype)
		}

		raw, err := ioutil.ReadAll(r)

		if err != nil {
			return err
		}

		buf := &pb.Message{}
		if err := proto.Unmarshal(raw, buf); err != nil {
			return err
		}

		msg := (v).(*cableMsg)

		msg.Type = buf.Type.String()
		msg.Identifier = buf.Identifier

		if buf.Message != nil {
			var message interface{}

			err = msgpack.Unmarshal(buf.Message, &message)
			msg.Message = message
		}

		return nil
	},
	Send: func(c *websocket.Conn, v interface{}) error {
		msg := (v).(*cableMsg)

		buf := &pb.Message{}

		buf.Command = pb.Command(pb.Command_value[msg.Command])
		buf.Identifier = msg.Identifier
		buf.Data = msg.Data

		b, err := proto.Marshal(buf)
		if err != nil {
			return err
		}

		w, err := c.NextWriter(websocket.BinaryMessage)
		if err != nil {
			return err
		}

		w.Write(b)
		err = w.Close()
		return err
	},
}

Functions

This section is empty.

Types

type AttrMatcher

type AttrMatcher struct {
	// contains filtered or unexported fields
}

func (*AttrMatcher) Match

func (m *AttrMatcher) Match(msg interface{}) bool

type Cable

type Cable struct {
	// contains filtered or unexported fields
}

func (*Cable) Connect

func (c *Cable) Connect(cableUrl string, opts goja.Value) (*Client, error)

Connect connects to the websocket, creates and starts client, and returns it to the js.

type CableModule added in v0.3.0

type CableModule struct {
	*Cable
}

func (*CableModule) Exports added in v0.3.0

func (c *CableModule) Exports() modules.Exports

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

func NewChannel added in v0.7.0

func NewChannel(c *Client, identifier string) *Channel

func (*Channel) AckDuration added in v0.7.0

func (ch *Channel) AckDuration() int64

func (*Channel) IgnoreReads added in v0.0.2

func (ch *Channel) IgnoreReads()

IgnoreReads allows skipping collecting incoming messages (in case you only care about the subscription)

func (*Channel) OnMessage added in v0.4.0

func (ch *Channel) OnMessage(fn goja.Value)

Register callback to receive messages asynchronously

func (*Channel) Perform

func (ch *Channel) Perform(action string, attr goja.Value) error

Perform sends passed action with additional data to the channel

func (*Channel) Receive

func (ch *Channel) Receive(attr goja.Value) interface{}

Receive checks channels messages query for message, sugar for ReceiveN(1, attrs)

func (*Channel) ReceiveAll added in v0.5.0

func (ch *Channel) ReceiveAll(sec int, cond goja.Value) []interface{}

ReceiveAll fethes all messages for a given number of seconds.

func (*Channel) ReceiveN

func (ch *Channel) ReceiveN(n int, cond goja.Value) []interface{}

ReceiveN checks channels messages query for provided number of messages satisfying provided condition.

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Disconnect

func (c *Client) Disconnect()

func (*Client) Loop added in v0.5.0

func (c *Client) Loop(fn goja.Value)

Repeat function in a loop until it returns false

func (*Client) Subscribe

func (c *Client) Subscribe(channelName string, paramsIn goja.Value) (*Channel, error)

Subscribe creates and returns Channel

func (*Client) SubscribeAsync added in v0.7.0

func (c *Client) SubscribeAsync(channelName string, paramsIn goja.Value) (*SubscribePromise, error)

Subscribe creates and returns Channel

type Codec

type Codec struct {
	Receive func(*websocket.Conn, interface{}) error
	Send    func(*websocket.Conn, interface{}) error
}

type FuncMatcher

type FuncMatcher struct {
	// contains filtered or unexported fields
}

func (*FuncMatcher) Match

func (m *FuncMatcher) Match(msg interface{}) bool

type Matcher

type Matcher interface {
	Match(msg interface{}) bool
}

type PassthruMatcher

type PassthruMatcher struct{}

func (PassthruMatcher) Match

func (PassthruMatcher) Match(_ interface{}) bool

type RootModule added in v0.3.0

type RootModule struct{}

func New added in v0.3.0

func New() *RootModule

func (*RootModule) NewModuleInstance added in v0.3.0

func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance

type StringMatcher

type StringMatcher struct {
	// contains filtered or unexported fields
}

func (*StringMatcher) Match

func (m *StringMatcher) Match(msg interface{}) bool

type SubscribePromise added in v0.7.0

type SubscribePromise struct {
	// contains filtered or unexported fields
}

func (*SubscribePromise) Await added in v0.7.0

func (sp *SubscribePromise) Await(ms int) (*Channel, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL