etcdqueue

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

README

Etcd Queue

基于 https://github.com/etcd-io/etcd/tree/main/client/v3/experimental/recipes 代码修改。

Etcd Queue是基于Etcd集群做后端存储来实现任务队列的功能。

  • 写入任务
  • 读取任务

使用方法

package main

import (
	"fmt"

	"github.com/huweihuang/etcdqueue"
)

func main() {
	etcdConfig := &etcdqueue.EtcdConfig{
		Endpoints: "https://127.0.0.1:2379",
		CaFile:    "/etc/kubernetes/pki/etcd/ca.crt",
		KeyFile:   "/etc/kubernetes/pki/apiserver-etcd-client.key",
		CertFile:  "/etc/kubernetes/pki/apiserver-etcd-client.crt",
	}

	jobQueue, err := etcdqueue.NewEtcdQueue(etcdConfig, "/keyprefix")
	if err != nil {
		fmt.Errorf("faied to new etcd queue, error: %v", err)
	}

	err = jobQueue.Enqueue("{testjob}")
	if err != nil {
		fmt.Errorf("failed to enqueue")
	}

	job, err := jobQueue.Dequeue()
	if err != nil {
		fmt.Errorf("failed to dequeue")
	}
	fmt.Printf("queue: %v", job)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrKeyExists      = errors.New("key already exists")
	ErrWaitMismatch   = errors.New("unexpected wait result")
	ErrTooManyClients = errors.New("too many clients")
	ErrNoWatcher      = errors.New("no watcher channel")
	ErrKeyNotFound    = errors.New("key not found")
)

Functions

func NewETCDClient

func NewETCDClient(etcdConfig *EtcdConfig) (*v3.Client, error)

NewETCDClient new etcd client v3

func WaitEvents

func WaitEvents(c *clientv3.Client, key string, rev int64, evs []mvccpb.Event_EventType) (*clientv3.Event, error)

WaitEvents waits on a key until it observes the given events and returns the final one.

func WaitPrefixEvents

func WaitPrefixEvents(ctx context.Context, c *clientv3.Client, prefix string, rev int64,
	evs []mvccpb.Event_EventType) (*clientv3.Event, error)

wait prefix events

Types

type EphemeralKV

type EphemeralKV struct{ RemoteKV }

EphemeralKV is a new key associated with a session lease

type EtcdConfig

type EtcdConfig struct {
	Endpoints        string        // args for clientv3.Config
	DialTimeout      time.Duration // args for clientv3.Config
	AutoSyncInterval time.Duration // args for clientv3.Config
	CaFile           string        // args for clientv3.Config.TLS
	CertFile         string        // args for clientv3.Config.TLS
	KeyFile          string        // args for clientv3.Config.TLS
}

EtcdConfig etcd client arguments

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements a multi-reader, multi-writer distributed queue.

func NewEtcdQueue

func NewEtcdQueue(etcdConfig *EtcdConfig, keyPrefix string) (*Queue, error)

NewEtcdQueue new a etcd queue

func NewQueue

func NewQueue(client *v3.Client, keyPrefix string) *Queue

NewQueue create Queue

func (*Queue) CancelWatch

func (q *Queue) CancelWatch()

cancel watch

func (*Queue) DeleteKey

func (q *Queue) DeleteKey(key string) error

Delete key

func (*Queue) DeleteKeyWithRevision

func (q *Queue) DeleteKeyWithRevision(key string, revision int64) (bool, error)

Delete key with revision

func (*Queue) Dequeue

func (q *Queue) Dequeue() (string, error)

Dequeue returns Enqueue()'d elements in FIFO order. If the queue is empty, Dequeue blocks until elements are available.

func (*Queue) DequeueFirstKey

func (q *Queue) DequeueFirstKey() (string, error)

Dequeue first key

func (*Queue) Enqueue

func (q *Queue) Enqueue(val string) error

enqueue

func (*Queue) EnqueueReturnKey

func (q *Queue) EnqueueReturnKey(val string) (string, error)

Enqueue key

func (*Queue) GetAllKeys

func (q *Queue) GetAllKeys() (map[string]string, error)

Get all keys

func (*Queue) GetFirstKey

func (q *Queue) GetFirstKey() (string, string, error)

returns key string value string error

func (*Queue) GetKey

func (q *Queue) GetKey(key string) (string, error)

Get key

func (*Queue) GetKeyAndRevision

func (q *Queue) GetKeyAndRevision(key string) (string, int64, error)

Get key and revision

func (*Queue) GetLastKey

func (q *Queue) GetLastKey() (string, string, error)

returns key string value string error

func (*Queue) UpdateKey

func (q *Queue) UpdateKey(key, value string) error

Update Key

func (*Queue) UpdateKeyWithRevison

func (q *Queue) UpdateKeyWithRevison(key, value string, revision int64) (bool, error)

Update key with revision

type RemoteKV

type RemoteKV struct {
	// contains filtered or unexported fields
}

RemoteKV is a key/revision pair created by the client and stored on etcd

func (*RemoteKV) Delete

func (rk *RemoteKV) Delete() error

Delete

func (*RemoteKV) Key

func (rk *RemoteKV) Key() string

Get Key

func (*RemoteKV) Put

func (rk *RemoteKV) Put(val string) error

Put

func (*RemoteKV) Revision

func (rk *RemoteKV) Revision() int64

Get revision

func (*RemoteKV) Value

func (rk *RemoteKV) Value() string

Get value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL