posix_mq

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: MIT Imports: 6 Imported by: 5

README

posix_mq

a Go wrapper and utility for POSIX Message Queues

posix_mq is a Go wrapper for POSIX Message Queues. It's important that you read the manual pages for POSIX Message Queues, mq_send(2) and mq_receive(2), before using this library. posix_mq is a very light wrapper and will not hide any errors from you.

Example

Sender
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/joe-at-startupmedia/posix_mq"
)

const maxSendTickNum = 10

// main opens (creating it if necessary) the "posix_mq_example_simple" queue
// for writing and sends numbered greeting messages, pausing one second
// between successful sends. It stops after the first successful send whose
// attempt number has reached maxSendTickNum.
func main() {
	cfg := &posix_mq.QueueConfig{
		Name:  "posix_mq_example_simple",
		Flags: posix_mq.O_WRONLY | posix_mq.O_CREAT,
		Mode:  0666,
	}
	mq, err := posix_mq.NewMessageQueue(cfg)
	if err != nil {
		log.Fatalf("Sender: error initializing %s", err)
	}

	// The counter advances on every attempt, including failed ones, so a
	// failed send is retried immediately with the next message number.
	for count := 1; ; count++ {
		payload := []byte(fmt.Sprintf("Hello, World : %d\n", count))
		if sendErr := mq.Send(payload, 0); sendErr != nil {
			fmt.Printf("Sender: error sending message: %s\n", sendErr)
			continue
		}

		fmt.Println("Sender: Sent a new message")

		if count >= maxSendTickNum {
			break
		}

		time.Sleep(1 * time.Second)
	}
	fmt.Println("Sender: finished")
}
Receiver
package main

import (
	"fmt"
	"log"

	"github.com/joe-at-startupmedia/posix_mq"
)

const maxSendTickNum = 10

func main() {
	mq, err := posix_mq.NewMessageQueue(&posix_mq.QueueConfig{
		Name:  "posix_mq_example_simple",
		Flags: posix_mq.O_RDONLY
		Mode:  0666,
	})
	if err != nil {
		log.Fatalf("Receiver: error initializing %s", err)
	}
	defer func() {
		if err := mq.Unlink(); err != nil {
			log.Println(err)
		}
		fmt.Println("Receiver: finished")
	}()

	fmt.Println("Receiver: Start receiving messages")

	count := 0
	for {
		count++

		msg, _, err := mq.Receive()
		if err != nil {
			fmt.Printf("Receiver: error getting message: %s\n", err)
			continue
		}
		
		fmt.Printf("Receiver: got new message: %s\n", string(msg))

		if count >= maxSendTickNum {
			break
		}
	}
}

Acknowledgement

It's inspired by Shopify/sysv_mq

Documentation

Index

Constants

View Source
// Flag values for QueueConfig.Flags and message size limits. The O_* values
// are taken directly from the C constants of the same name and can be
// combined with bitwise OR (e.g. O_WRONLY | O_CREAT).
const (
	// Access modes.
	O_RDONLY = C.O_RDONLY
	O_WRONLY = C.O_WRONLY
	O_RDWR   = C.O_RDWR

	// Additional open flags.
	O_CLOEXEC  = C.O_CLOEXEC
	O_CREAT    = C.O_CREAT
	O_EXCL     = C.O_EXCL
	O_NONBLOCK = C.O_NONBLOCK

	// Message size limits in bytes (16777216 = 16 MiB).
	// Based on Linux 3.5+
	MSGSIZE_MAX     = 16777216
	MSGSIZE_DEFAULT = MSGSIZE_MAX
)
View Source
const POSIX_MQ_DIR = "/dev/mqueue/"

Variables

View Source
var (
	MemoryAllocationError = fmt.Errorf("Memory Allocation Error")
)

Functions

func ForceRemoveQueue added in v0.2.1

func ForceRemoveQueue(name string) error

ForceRemoveQueue deletes the POSIX queue by name. If one or more processes have the message queue open when mq_unlink() is called, destruction of the message queue is postponed until all references to the message queue have been closed.

Types

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

Represents the message queue

func NewMessageQueue

func NewMessageQueue(config *QueueConfig) (*MessageQueue, error)

NewMessageQueue returns an instance of the message queue given a QueueConfig.

func (*MessageQueue) Close

func (mq *MessageQueue) Close() error

Close closes the message queue.

func (*MessageQueue) Count added in v0.2.0

func (mq *MessageQueue) Count() (int, error)

Count gets the number of queued messages

func (*MessageQueue) GetAttr added in v0.2.1

func (mq *MessageQueue) GetAttr() (*MessageQueueAttribute, error)

GetAttr gets the queue attributes

func (*MessageQueue) Notify

func (mq *MessageQueue) Notify(sigNo syscall.Signal) error

Notify sets up signal notification so that the given signal is delivered when a new message arrives.

func (*MessageQueue) Receive

func (mq *MessageQueue) Receive() ([]byte, uint, error)

Receive receives message from the message queue.

func (*MessageQueue) Send

func (mq *MessageQueue) Send(data []byte, priority uint) error

Send sends message to the message queue.

func (*MessageQueue) TimedReceive

func (mq *MessageQueue) TimedReceive(duration time.Duration) ([]byte, uint, error)

TimedReceive receives message from the message queue with a ceiling on the time for which the call will block.

func (*MessageQueue) TimedSend

func (mq *MessageQueue) TimedSend(data []byte, priority uint, duration time.Duration) error

TimedSend sends message to the message queue with a ceiling on the time for which the call will block.

func (mq *MessageQueue) Unlink() error

Unlink deletes the message queue.

type MessageQueueAttribute

type MessageQueueAttribute struct {
	Flags   int // Flags (ignored for mq_open())
	MaxMsg  int // Max. # of messages on queue
	MsgSize int // Max. message size (bytes)
	MsgCnt  int // # of messages in the queue
}

type QueueConfig added in v0.2.0

type QueueConfig struct {
	Name  string
	Dir   string
	Flags int
	Mode  int // The mode of the message queue, e.g. 0600
	Attrs *MessageQueueAttribute
}

QueueConfig is used to configure an instance of the message queue.

func (*QueueConfig) GetFile added in v0.2.0

func (config *QueueConfig) GetFile() string

GetFile gets the file on the OS where the queues are stored

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL