amqp

package module
v0.0.0-...-b4f76ec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2021 License: Apache-2.0 Imports: 2 Imported by: 0

README

⚠️ This is a proof of concept

As this is a proof of concept, it won't be supported by the k6 team. It may also break in the future as xk6 evolves. USE AT YOUR OWN RISK! Any issues with the tool should be raised in the project's issue tracker (github.com/lxkuz/xk6-amqp).



xk6-amqp

AMQP xk6 plugin. Built for k6 using xk6.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

  - Go toolchain
  - Git

Then:

  1. Download xk6:
$ go get github.com/k6io/xk6@latest
  2. Build the binary:
$ xk6 build --with github.com/lxkuz/xk6-amqp@latest

Development

To make development a little smoother, you may run the build script provided in the root folder. It will create a k6 binary with your local code rather than from GitHub.

$ ./build.sh && ./k6 run my-test-script.js

Example

import Amqp from 'k6/x/amqp';
import Queues from 'k6/x/amqp/queues';

// k6 entry point for the example: opens an AMQP connection, declares a queue,
// publishes one message to it, and registers a listener that logs what it
// receives.
export default function () {
  console.log("K6 amqp extension enabled, version: " + Amqp.version)
  const url = "amqp://guest:guest@localhost:5672/"
  // Open the connection; connection_url is the only option passed here.
  Amqp.start({
    connection_url: url
  })
  console.log("Connection opened: " + url)
  
  const queueName = 'K6 general'
  
  // Declare the queue by name. The commented-out keys are the other options
  // the declare call accepts (presumably shown with their defaults — confirm).
  Queues.declare({
    name: queueName,
    // durable: false,
    // delete_when_unused: false,
    // exclusive: false,
    // no_wait: false,
    // args: null
  })

  console.log(queueName + " queue is ready")

  // Publish a single text message to the queue declared above.
  Amqp.publish({
    queue_name: queueName,
    body: "Ping from k6"
    // exchange: '',
    // mandatory: false,
    // immediate: false,
  })

  // Callback invoked with the received message data; it only logs it.
  const listener = function(data) { console.log('received data: ' + data) }
  // Start consuming from the same queue, handing messages to the listener.
  Amqp.listen({
    queue_name: queueName,
    listener: listener,
    // consumer: '',
    // auto_ack: true,
    // exclusive: false,
    // no_local: false,
    // no_wait: false,
    // args: null
  })
}

Result output:

$ ./k6 run script.js

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: ../xk6-amqp/examples/test.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] K6 amqp extension enabled, version: v0.0.1    source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/  source=console
INFO[0000] K6 general queue is ready                     source=console
INFO[0000] received data: Ping from k6                   source=console

running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 iters, 1 per VU

     data_received........: 0 B 0 B/s
     data_sent............: 0 B 0 B/s
     iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
     iterations...........: 1   30.855627/s

See the examples folder for more details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Amqp

// Amqp is the type on which Start, Publish and Listen are defined. It carries
// a version string (the example script reads it as Amqp.version), a driver
// connection, and pointers to the Queues and Exchanges helpers.
// NOTE(review): Connection is presumably populated by Start — confirm against
// the implementation.
type Amqp struct {
	Version    string
	Connection *amqpDriver.Connection
	Queues     *Queues
	Exchanges  *Exchanges
}

func (*Amqp) Listen

func (amqp *Amqp) Listen(options ListenOptions) error

func (*Amqp) Publish

func (amqp *Amqp) Publish(options PublishOptions) error

func (*Amqp) Start

func (amqp *Amqp) Start(options AmqpOptions) error

type AmqpOptions

// AmqpOptions is the argument accepted by Amqp.Start. ConnectionUrl is the
// broker URL; the example script supplies it as connection_url, e.g.
// "amqp://guest:guest@localhost:5672/".
type AmqpOptions struct {
	ConnectionUrl string
}

type ConsumeOptions

// ConsumeOptions groups consumer settings. Its fields match those of
// ListenOptions other than Listener and QueueName. No method signature in
// this listing takes it as a parameter.
// NOTE(review): field semantics presumably follow the amqpDriver consume
// call — confirm.
type ConsumeOptions struct {
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

type DeclareOptions

// DeclareOptions is the argument accepted by Queues.Declare. The example
// script passes these as name, durable, delete_when_unused, exclusive,
// no_wait and args; only name is set there.
type DeclareOptions struct {
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Exclusive        bool
	NoWait           bool
	Args             amqpDriver.Table
}

type EchangeDeclareOptions

// EchangeDeclareOptions is the argument accepted by Exchanges.Declare.
// NOTE(review): the name is spelled "Echange" (missing "x"); it is part of
// the exported API, so renaming it would break callers.
type EchangeDeclareOptions struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqpDriver.Table
}

type ExchangeBindOptions

// ExchangeBindOptions is the argument accepted by Exchanges.Bind. It names a
// destination exchange, a source exchange and a routing key, plus NoWait and
// driver-level Args.
type ExchangeBindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

type ExchangeOptions

// ExchangeOptions holds a single connection URL. No method signature in this
// listing takes it as a parameter.
type ExchangeOptions struct {
	ConnectionUrl string
}

type ExchangeUnindOptions

// ExchangeUnindOptions is the argument accepted by Exchanges.Unbind. It has
// the same fields as ExchangeBindOptions.
// NOTE(review): the name is spelled "Unind" (missing "b"); it is part of the
// exported API, so renaming it would break callers.
type ExchangeUnindOptions struct {
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

type Exchanges

// Exchanges is the type on which the exchange operations Bind, Declare,
// Delete and Unbind are defined. It holds a version string and a driver
// connection.
type Exchanges struct {
	Version    string
	Connection *amqpDriver.Connection
}

func (*Exchanges) Bind

func (exchanges *Exchanges) Bind(options ExchangeBindOptions) error

func (*Exchanges) Declare

func (exchanges *Exchanges) Declare(options EchangeDeclareOptions) error

func (*Exchanges) Delete

func (exchanges *Exchanges) Delete(name string) error

func (*Exchanges) Unbind

func (exchanges *Exchanges) Unbind(options ExchangeUnindOptions) error

type ListenOptions

// ListenOptions is the argument accepted by Amqp.Listen. Listener is the
// callback and QueueName the queue to consume from; the example script sets
// only these two (as listener and queue_name) and leaves consumer, auto_ack,
// exclusive, no_local, no_wait and args commented out.
type ListenOptions struct {
	Listener  ListenerType
	QueueName string
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

type ListenerType

// ListenerType is the callback type stored in ListenOptions.Listener: it
// takes a string and returns an error. In the example script the callback is
// invoked with the published body ("Ping from k6").
type ListenerType func(string) error

type PublishOptions

// PublishOptions is the argument accepted by Amqp.Publish. The example script
// sets queue_name and body and leaves exchange, mandatory and immediate
// commented out.
type PublishOptions struct {
	QueueName string
	Body      string
	Exchange  string
	Mandatory bool
	Immediate bool
}

type QueueBindOptions

// QueueBindOptions is the argument accepted by Queues.Bind. It names a queue,
// an exchange and a routing key, plus NoWait and driver-level Args.
type QueueBindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Args         amqpDriver.Table
}

type QueueOptions

// QueueOptions holds a single connection URL. No method signature in this
// listing takes it as a parameter.
type QueueOptions struct {
	ConnectionUrl string
}

type QueueUnindOptions

// QueueUnindOptions is the argument accepted by Queues.Unbind. It has the
// same fields as QueueBindOptions except that there is no NoWait field.
// NOTE(review): the name is spelled "Unind" (missing "b"); it is part of the
// exported API, so renaming it would break callers.
type QueueUnindOptions struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Args         amqpDriver.Table
}

type Queues

// Queues is the type on which the queue operations Bind, Declare, Delete,
// Inspect, Purge and Unbind are defined. It holds a version string and a
// driver connection.
type Queues struct {
	Version    string
	Connection *amqpDriver.Connection
}

func (*Queues) Bind

func (queues *Queues) Bind(options QueueBindOptions) error

func (*Queues) Declare

func (queues *Queues) Declare(options DeclareOptions) (amqpDriver.Queue, error)

func (*Queues) Delete

func (queues *Queues) Delete(name string) error

func (*Queues) Inspect

func (queues *Queues) Inspect(name string) (amqpDriver.Queue, error)

func (*Queues) Purge

func (queues *Queues) Purge(name string, noWait bool) (int, error)

func (*Queues) Unbind

func (queues *Queues) Unbind(options QueueUnindOptions) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL