couchbase

package module
v0.0.0-...-9f5e76e
Published: Apr 24, 2014 License: MIT Imports: 21 Imported by: 1

README

A smart client for couchbase in go

This is an evolving package, but it does provide a useful interface to a Couchbase server, including all of the pool/bucket discovery features, key distribution compatible with other clients, and vbucket motion awareness so applications can continue to operate during rebalances.

It also supports view querying with source node randomization, so you don't bang on just one node to do all the work.

Install

go get github.com/couchbaselabs/go-couchbase

Example

c, err := couchbase.Connect("http://dev-couchbase.example.com:8091/")
if err != nil {
	log.Fatalf("Error connecting:  %v", err)
}

pool, err := c.GetPool("default")
if err != nil {
	log.Fatalf("Error getting pool:  %v", err)
}

bucket, err := pool.GetBucket("default")
if err != nil {
	log.Fatalf("Error getting bucket:  %v", err)
}

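// Set takes an expiry in its second argument; 0 is used here.
if err := bucket.Set("someKey", 0, []string{"an", "example", "list"}); err != nil {
	log.Fatalf("Error setting key:  %v", err)
}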

Documentation

Overview

Package couchbase provides a smart client for go.

Usage:

client, err := couchbase.Connect("http://myserver:8091/")
handleError(err)
pool, err := client.GetPool("default")
handleError(err)
bucket, err := pool.GetBucket("MyAwesomeBucket")
handleError(err)
...

or a shortcut for the bucket directly

bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")

in any case, you can specify authentication credentials using standard URL userinfo syntax:

b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
        "default", "bucket")

Constants

const (
	// Raw specifies that the value is raw []byte or nil; don't
	// JSON-encode it.
	Raw = WriteOptions(1 << iota)
	// AddOnly indicates an item should only be written if it
	// doesn't exist, otherwise ErrKeyExists is returned.
	AddOnly
	// Persist causes the operation to block until the server
	// confirms the item is persisted.
	Persist
	// Indexable causes the operation to block until it's available via the index.
	Indexable
	// Append indicates the given value should be appended to the
	// existing value for the given key.
	Append
)

const UpdateCancel = memcached.CASQuit

Return this as the error from an UpdateFunc to cancel the Update operation.

Variables

var ClientOpCallback func(opname, k string, start time.Time, err error)

ClientOpCallback is called for each invocation of Do.

var ConnPoolAvailWaitTime = time.Millisecond

ConnPoolAvailWaitTime is the amount of time to wait for an existing connection from the pool before considering the creation of a new one.

var ConnPoolCallback func(host string, source string, start time.Time, err error)

ConnPoolCallback is notified whenever connections are acquired from a pool.

var ConnPoolTimeout = time.Hour * 24 * 30

Default timeout for retrieving a connection from the pool.

var ErrKeyExists = errors.New("key exists")

Error returned from Write with AddOnly flag, when key already exists in the bucket.

var ErrOverwritten = errors.New("overwritten")

Returned from WaitForPersistence (or Write, if the Persist or Indexable flag is used) if the value has been overwritten by another write before being persisted.

var ErrTimeout = errors.New("timeout")

Returned from WaitForPersistence (or Write, if the Persist or Indexable flag is used) if the value hasn't been persisted within the timeout interval.

var HTTPClient = http.DefaultClient

HTTPClient to use for REST and view operations.

var MaxBulkRetries = 1000

Maximum number of times to retry a chunk of a bulk get on error.

var PoolOverflow = PoolSize

PoolOverflow is the number of overflow connections allowed in a pool.

var PoolSize = 4

PoolSize is the size of each connection pool (per host).

var SlowServerCallWarningThreshold time.Duration

If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call takes longer than that.

var ViewCallback func(ddoc, name string, start time.Time, err error)

ViewCallback is called for each view invocation.

Functions

func CalculateVector

func CalculateVector(lastSeq uint64, flog FailoverLog) (
	vuuid uint64, startseq uint64, highseq uint64)

CalculateVector returns a 3-element tuple of (vbucket-uuid, startSeq, highSeq) for a given failover log and last known sequence number.

func CleanupHost

func CleanupHost(h, commonSuffix string) string

CleanupHost returns the hostname with the given suffix removed.

func FindCommonSuffix

func FindCommonSuffix(input []string) string

FindCommonSuffix returns the longest common suffix from the given strings.
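To illustrate how a common suffix and host cleanup fit together, here is a minimal sketch using only the standard library. The names commonSuffix and trimHost and the example hostnames are hypothetical; this is not the package's own implementation, and edge cases may be handled differently there.

```go
package main

import (
	"fmt"
	"strings"
)

// commonSuffix returns the longest suffix shared by every input string.
// It slices by byte, which is adequate for ASCII host:port strings.
func commonSuffix(input []string) string {
	if len(input) == 0 {
		return ""
	}
	suffix := input[0]
	for _, s := range input[1:] {
		// Shrink the candidate from the left until s ends with it.
		// The empty string is a suffix of everything, so this terminates.
		for !strings.HasSuffix(s, suffix) {
			suffix = suffix[1:]
		}
	}
	return suffix
}

// trimHost removes suffix from host when host ends with it.
func trimHost(host, suffix string) string {
	return strings.TrimSuffix(host, suffix)
}

func main() {
	hosts := []string{"node1.example.com:11210", "node2.example.com:11210"}
	suffix := commonSuffix(hosts)
	fmt.Println("suffix:", suffix)
	fmt.Println("short name:", trimHost(hosts[0], suffix))
}
```

Stripping the shared suffix is mainly useful for producing short, readable node names in logs or stats output.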

func ParseURL

func ParseURL(urlStr string) (result *url.URL, err error)

ParseURL is a wrapper around url.Parse with some sanity-checking

Types

type AuthHandler

type AuthHandler interface {
	GetCredentials() (string, string)
}

AuthHandler is a callback that gets the auth username and password for the given bucket.
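As an illustration, a fixed username and password pair is probably the simplest thing that can satisfy this interface. The sketch below redeclares AuthHandler locally so it compiles on its own; the staticAuth type and the credentials are hypothetical.

```go
package main

import "fmt"

// AuthHandler is redeclared here, copied from the interface above,
// so the sketch is self-contained.
type AuthHandler interface {
	GetCredentials() (string, string)
}

// staticAuth is a hypothetical handler that always returns the same
// username and password.
type staticAuth struct {
	user, pass string
}

// GetCredentials returns the stored username and password.
func (a staticAuth) GetCredentials() (string, string) {
	return a.user, a.pass
}

func main() {
	var ah AuthHandler = staticAuth{user: "bucketname", pass: "bucketpass"}
	user, pass := ah.GetCredentials()
	fmt.Println(user, pass)
}
```

A value of this kind is what ConnectWithAuth accepts as its second argument, according to the signature listed for it.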

type Bucket

type Bucket struct {
	AuthType            string             `json:"authType"`
	Capabilities        []string           `json:"bucketCapabilities"`
	CapabilitiesVersion string             `json:"bucketCapabilitiesVer"`
	Type                string             `json:"bucketType"`
	Name                string             `json:"name"`
	NodeLocator         string             `json:"nodeLocator"`
	Quota               map[string]float64 `json:"quota,omitempty"`
	Replicas            int                `json:"replicaNumber"`
	Password            string             `json:"saslPassword"`
	URI                 string             `json:"uri"`
	StreamingURI        string             `json:"streamingUri"`
	LocalRandomKeyURI   string             `json:"localRandomKeyUri,omitempty"`
	UUID                string             `json:"uuid"`
	DDocs               struct {
		URI string `json:"uri"`
	} `json:"ddocs,omitempty"`
	BasicStats  map[string]interface{} `json:"basicStats,omitempty"`
	Controllers map[string]interface{} `json:"controllers,omitempty"`

	// These are used for JSON IO, but aren't used for processing
	// since it needs to be swapped out safely.
	VBSMJson  VBucketServerMap `json:"vBucketServerMap"`
	NodesJSON []Node           `json:"nodes"`
	// contains filtered or unexported fields
}

Bucket is the primary entry point for most data operations.

func GetBucket

func GetBucket(endpoint, poolname, bucketname string) (*Bucket, error)

GetBucket is a convenience function for getting a named bucket from a URL

func (*Bucket) Add

func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error)

Add adds a value to this bucket; like Set except that nothing happens if the key exists. The value will be serialized into a JSON document.

func (*Bucket) AddRaw

func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error)

AddRaw adds a value to this bucket; like SetRaw except that nothing happens if the key exists. The value will be stored as raw bytes.

func (*Bucket) Append

func (b *Bucket) Append(k string, data []byte) error

Append appends raw data to an existing item.

func (*Bucket) Cas

func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) error

Set a value in this bucket with Cas

func (*Bucket) CasRaw

func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) error

Set a value in this bucket with Cas without JSON-encoding it.

func (*Bucket) Close

func (b *Bucket) Close()

Close marks this bucket as no longer needed, closing connections it may have open.

func (Bucket) CommonAddressSuffix

func (b Bucket) CommonAddressSuffix() string

CommonAddressSuffix finds the longest common suffix of all host:port strings in the node list.

func (*Bucket) Delete

func (b *Bucket) Delete(k string) error

Delete a key from this bucket.

func (*Bucket) DeleteDDoc

func (b *Bucket) DeleteDDoc(docname string) error

DeleteDDoc removes a design document.

func (*Bucket) Do

func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error)

Do executes a function on a memcached connection to the node owning key "k"

Note that this automatically handles transient errors by replaying your function on a "not-my-vbucket" error, so don't assume your command will be executed only once.

func (*Bucket) Get

func (b *Bucket) Get(k string, rv interface{}) error

Get a value from this bucket. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetBulk

func (b *Bucket) GetBulk(keys []string) (map[string]*gomemcached.MCResponse, error)

GetBulk fetches multiple keys concurrently.

Unlike more convenient GETs, the entire response is returned in the map for each key. Keys that were not found will not be included in the map.

func (*Bucket) GetDDoc

func (b *Bucket) GetDDoc(docname string, into interface{}) error

GetDDoc retrieves a specific design doc.

func (*Bucket) GetDDocs

func (b *Bucket) GetDDocs() (DDocsResult, error)

GetDDocs lists all design documents

func (*Bucket) GetFailoverLogs

func (b *Bucket) GetFailoverLogs(name string) ([]FailoverLog, error)

GetFailoverLogs returns a list of vuuids and sequence numbers for all vbuckets.

func (*Bucket) GetPool

func (b *Bucket) GetPool() *Pool

GetPool gets the pool to which this bucket belongs.

func (*Bucket) GetRaw

func (b *Bucket) GetRaw(k string) ([]byte, error)

GetRaw gets a raw value from this bucket. No marshaling is performed.

func (*Bucket) GetStats

func (b *Bucket) GetStats(which string) map[string]map[string]string

GetStats gets a set of stats from all servers.

Returns a map of server ID -> map of stat key to stat value.

func (*Bucket) Gets

func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error

Gets gets a value from this bucket, including its CAS counter. The value is expected to be a JSON stream and will be deserialized into rv.

func (*Bucket) GetsRaw

func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
	cas uint64, err error)

GetsRaw gets a raw value from this bucket including its CAS counter and flags.

func (*Bucket) Incr

func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error)

Incr increments the value at a given key.

func (Bucket) NodeAddresses

func (b Bucket) NodeAddresses() []string

NodeAddresses gets the (sorted) list of memcached node addresses (hostname:port).

func (Bucket) Nodes

func (b Bucket) Nodes() []Node

Nodes returns the current list of nodes servicing this bucket.

func (*Bucket) Observe

func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error)

Observe observes the current state of a document.

func (*Bucket) PutDDoc

func (b *Bucket) PutDDoc(docname string, value interface{}) error

PutDDoc installs a design document.

func (*Bucket) Set

func (b *Bucket) Set(k string, exp int, v interface{}) error

Set a value in this bucket. The value will be serialized into a JSON document.

func (*Bucket) SetRaw

func (b *Bucket) SetRaw(k string, exp int, v []byte) error

SetRaw sets a value in this bucket without JSON encoding it.

func (*Bucket) StartTapFeed

func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error)

StartTapFeed creates and starts a new Tap feed

func (*Bucket) Update

func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error

Update performs a safe update of a document, avoiding conflicts by using CAS.

The callback function will be invoked with the current raw document contents (or nil if the document doesn't exist); it should return the updated raw contents (or nil to delete). If it decides not to change anything, it can return UpdateCancel as the error.

If another writer modifies the document between the get and the set, the callback will be invoked again with the newer value.
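The get/callback/set cycle described above can be sketched against an in-memory store. Everything here is a hypothetical stand-in (casStore, update, errCASMismatch, errCancel), and two simplifications are assumed: a cancelled update returns nil, and the nil-means-delete case is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errCASMismatch = errors.New("cas mismatch")   // value changed since it was read
	errCancel      = errors.New("update cancelled") // stand-in for UpdateCancel
)

// casStore is a single-document store with a CAS counter.
type casStore struct {
	mu   sync.Mutex
	data []byte
	cas  uint64
}

// gets returns a copy of the document and its current CAS value.
func (s *casStore) gets() ([]byte, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]byte(nil), s.data...), s.cas
}

// setIfUnchanged stores v only if the CAS value still matches.
func (s *casStore) setIfUnchanged(v []byte, cas uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cas != cas {
		return errCASMismatch
	}
	s.data = v
	s.cas++
	return nil
}

// update re-reads and re-invokes the callback until the write lands
// without a CAS conflict, the callback cancels, or an error occurs.
func update(s *casStore, callback func(current []byte) ([]byte, error)) error {
	for {
		current, cas := s.gets()
		updated, err := callback(current)
		if errors.Is(err, errCancel) {
			return nil // assumption: cancelling is not reported as an error
		}
		if err != nil {
			return err
		}
		if err := s.setIfUnchanged(updated, cas); !errors.Is(err, errCASMismatch) {
			return err
		}
		// Another writer won the race; loop and call back with the newer value.
	}
}

func main() {
	s := &casStore{data: []byte("1")}
	calls := 0
	err := update(s, func(current []byte) ([]byte, error) {
		calls++
		if calls == 1 {
			// Simulate a competing writer between the get and the set.
			_ = s.setIfUnchanged([]byte("2"), s.cas)
		}
		return append(current, '!'), nil
	})
	fmt.Println(string(s.data), calls, err)
}
```

In this run the first write is rejected because of the simulated competing writer, so the callback is invoked a second time with the newer value before the write succeeds.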

func (*Bucket) VBHash

func (b *Bucket) VBHash(key string) uint32

VBHash finds the vbucket for the given key.
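For intuition, Couchbase key distribution is commonly described as a CRC32 of the key reduced to the number of vbuckets. The sketch below follows that description using hash/crc32; whether it matches this package's VBHash bit for bit is an assumption, and vbHash and the vbucket count of 1024 are hypothetical.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// vbHash maps a key to a vbucket number in [0, numVBuckets).
// numVBuckets is assumed to be a power of two no larger than 32768.
func vbHash(key string, numVBuckets uint32) uint32 {
	crc := crc32.ChecksumIEEE([]byte(key))
	// Keep 15 bits from the upper half of the checksum, then mask
	// down to the number of vbuckets.
	return (crc >> 16) & 0x7fff & (numVBuckets - 1)
}

func main() {
	for _, k := range []string{"someKey", "anotherKey"} {
		fmt.Printf("%s -> vbucket %d\n", k, vbHash(k, 1024))
	}
}
```

The same key always maps to the same vbucket, which is what lets independent clients agree on where a document lives.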

func (Bucket) VBServerMap

func (b Bucket) VBServerMap() *VBucketServerMap

VBServerMap returns the current VBucketServerMap.

func (*Bucket) View

func (b *Bucket) View(ddoc, name string, params map[string]interface{}) (ViewResult, error)

View executes a view.

The ddoc parameter is just the bare name of your design doc without the "_design/" prefix.

Parameters are string keys with values that correspond to couchbase view parameters. Primitives should work fairly naturally (booleans, ints, strings, etc.) and other values will be JSON marshaled where possible (useful for array indexing on view keys, for example).

Example:

// View is a method on *Bucket, so call it on a bucket value.
res, err := bucket.View("myddoc", "myview", map[string]interface{}{
    "group_level": 2,
    "start_key":   []interface{}{"thing"},
    "end_key":     []interface{}{"thing", map[string]string{}},
    "stale":       false,
})

func (*Bucket) ViewCustom

func (b *Bucket) ViewCustom(ddoc, name string, params map[string]interface{},
	vres interface{}) (err error)

ViewCustom performs a view request that can map row values to a custom type.

See the source to View for an example usage.

func (*Bucket) ViewURL

func (b *Bucket) ViewURL(ddoc, name string,
	params map[string]interface{}) (string, error)

ViewURL constructs a URL for a view with the given ddoc, view name, and parameters.
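As a rough picture of what building such a URL involves, the sketch below turns a parameter map into a query string with the standard library. encodeViewParams is hypothetical, the path layout is an assumption, and string values are passed through unquoted here, which may differ from what the real client does.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// encodeViewParams converts view parameters to query values: strings,
// ints and bools are formatted directly, anything else is JSON-marshaled.
func encodeViewParams(params map[string]interface{}) (url.Values, error) {
	values := url.Values{}
	for k, v := range params {
		switch t := v.(type) {
		case string:
			values.Set(k, t)
		case int:
			values.Set(k, fmt.Sprint(t))
		case bool:
			values.Set(k, fmt.Sprint(t))
		default:
			b, err := json.Marshal(t)
			if err != nil {
				return nil, fmt.Errorf("param %q: %w", k, err)
			}
			values.Set(k, string(b))
		}
	}
	return values, nil
}

func main() {
	values, err := encodeViewParams(map[string]interface{}{
		"group_level": 2,
		"start_key":   []interface{}{"thing"},
		"stale":       false,
	})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	// Encode sorts by key and percent-escapes each value.
	// The path below is an assumed layout, used for illustration only.
	fmt.Println("/default/_design/myddoc/_view/myview?" + values.Encode())
}
```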

func (*Bucket) WaitForPersistence

func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error

WaitForPersistence waits for an item to be considered durable.

Besides transport errors, ErrOverwritten may be returned if the item is overwritten before it reaches durability. ErrTimeout may occur if the item isn't found durable in a reasonable amount of time.

func (*Bucket) Write

func (b *Bucket) Write(k string, flags, exp int, v interface{},
	opt WriteOptions) (err error)

General-purpose value setter.

The Set, Add and Delete methods are just wrappers around this. The interpretation of `v` depends on whether the `Raw` option is given. If it is, `v` must be a byte slice ([]byte) or nil. (A nil value causes a delete.) If `Raw` is not given, `v` will be marshaled as JSON before being written. It must be JSON-marshalable and it must not be nil.

func (*Bucket) WriteCas

func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
	opt WriteOptions) (err error)

func (*Bucket) WriteUpdate

func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error

WriteUpdate performs a safe update of a document, avoiding conflicts by using CAS. WriteUpdate is like Update, except that the callback can return a set of WriteOptions, of which Persist and Indexable are recognized: these cause the call to wait until the document update has been persisted to disk and/or become available via the index.

type Client

type Client struct {
	BaseURL *url.URL

	Info Pools
	// contains filtered or unexported fields
}

A Client is the starting point for all services across all buckets in a Couchbase cluster.

func Connect

func Connect(baseU string) (Client, error)

Connect to a couchbase cluster. An authentication handler will be created from the userinfo in the URL if provided.
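To show which part of the URL carries the credentials, here is a minimal sketch that pulls the userinfo out with net/url. credentialsFromURL is a hypothetical helper and the URL is the example host from the overview; it does not reproduce how Connect builds its handler.

```go
package main

import (
	"fmt"
	"net/url"
)

// credentialsFromURL extracts the username and password embedded in a
// URL of the form scheme://user:pass@host:port/. Both are empty when
// the URL has no userinfo.
func credentialsFromURL(raw string) (user, pass string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.User == nil {
		return "", "", nil // no credentials embedded in the URL
	}
	pass, _ = u.User.Password()
	return u.User.Username(), pass, nil
}

func main() {
	user, pass, err := credentialsFromURL("http://bucketname:bucketpass@myserver:8091/")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("user=%q pass=%q\n", user, pass)
}
```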

func ConnectWithAuth

func ConnectWithAuth(baseU string, ah AuthHandler) (c Client, err error)

ConnectWithAuth connects to a couchbase cluster with the given authentication handler.

func (*Client) GetPool

func (c *Client) GetPool(name string) (p Pool, err error)

GetPool gets a pool from within the couchbase cluster (usually "default").

type DDoc

type DDoc struct {
	Language string                    `json:"language,omitempty"`
	Views    map[string]ViewDefinition `json:"views"`
}

DDoc is the document body of a design document specifying a view.

type DDocsResult

type DDocsResult struct {
	Rows []struct {
		DDoc struct {
			Meta map[string]interface{}
			JSON DDoc
		} `json:"doc"`
	} `json:"rows"`
}

DDocsResult represents the result from listing the design documents.

type DocID

type DocID string

DocID is the document ID type for the startkey_docid parameter in views.

type FailoverLog

type FailoverLog [][2]uint64

FailoverLog is a slice of 2-element arrays, each holding a vuuid and a sequence number: [[vuuid, sequence-no], [vuuid, sequence-no] ...]

type Node

type Node struct {
	ClusterCompatibility int                `json:"clusterCompatibility"`
	ClusterMembership    string             `json:"clusterMembership"`
	CouchAPIBase         string             `json:"couchApiBase"`
	Hostname             string             `json:"hostname"`
	InterestingStats     map[string]float64 `json:"interestingStats,omitempty"`
	MCDMemoryAllocated   float64            `json:"mcdMemoryAllocated"`
	MCDMemoryReserved    float64            `json:"mcdMemoryReserved"`
	MemoryFree           float64            `json:"memoryFree"`
	MemoryTotal          float64            `json:"memoryTotal"`
	OS                   string             `json:"os"`
	Ports                map[string]int     `json:"ports"`
	Status               string             `json:"status"`
	Uptime               int                `json:"uptime,string"`
	Version              string             `json:"version"`
	ThisNode             bool               `json:"thisNode,omitempty"`
}

A Node is a computer in a cluster running the couchbase software.

type Pool

type Pool struct {
	BucketMap map[string]Bucket
	Nodes     []Node

	BucketURL map[string]string `json:"buckets"`
	// contains filtered or unexported fields
}

A Pool of nodes and buckets.

func (*Pool) GetBucket

func (p *Pool) GetBucket(name string) (*Bucket, error)

GetBucket gets a bucket from within this pool.

func (*Pool) GetClient

func (p *Pool) GetClient() *Client

GetClient gets the client from which we got this pool.

type Pools

type Pools struct {
	ComponentsVersion     map[string]string `json:"componentsVersion,omitempty"`
	ImplementationVersion string            `json:"implementationVersion"`
	IsAdmin               bool              `json:"isAdminCreds"`
	UUID                  string            `json:"uuid"`
	Pools                 []RestPool        `json:"pools"`
}

Pools represents the collection of pools as returned from the REST API.

type RestPool

type RestPool struct {
	Name         string `json:"name"`
	StreamingURI string `json:"streamingUri"`
	URI          string `json:"uri"`
}

RestPool represents a single pool returned from the pools REST API.

type TapFeed

type TapFeed struct {
	C <-chan memcached.TapEvent
	// contains filtered or unexported fields
}

A TapFeed streams mutation events from a bucket.

Events from the bucket can be read from the channel 'C'. Remember to call Close() on it when you're done, unless its channel has closed itself already.

func (*TapFeed) Close

func (feed *TapFeed) Close() error

Close a Tap feed.

type UpdateFunc

type UpdateFunc func(current []byte) (updated []byte, err error)

An UpdateFunc is a callback function to update a document

type UprEvent

type UprEvent struct {
	Bucket  string // bucket name for this event
	Opstr   string // TODO: Make this consistent with TAP
	Vbucket uint16 // vbucket number
	Seqno   uint64 // sequence number
	Key     []byte
	Value   []byte
}

UprEvent objects are created for stream mutations and deletions and published on the UprFeed.C channel.

type UprFeed

type UprFeed struct {
	// Exported channel where an aggregate of all UprEvent are sent to app.
	C <-chan UprEvent
	// contains filtered or unexported fields
}

UprFeed is a per-bucket structure managing connections to all nodes and vbucket streams.

func StartUprFeed

func StartUprFeed(
	b *Bucket, name string, streams map[uint16]*UprStream) (*UprFeed, error)

StartUprFeed creates a feed that aggregates all mutations for the bucket and publishes them as UprEvent values on the UprFeed.C channel.
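Aggregating several per-vbucket streams into one channel is a fan-in. The sketch below shows that pattern with plain channels; event, mergeStreams and produce are hypothetical stand-ins and say nothing about how the feed is actually wired to the cluster.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for UprEvent with only the fields the sketch needs.
type event struct {
	Vbucket uint16
	Seqno   uint64
}

// mergeStreams fans several input channels into one output channel.
// The output is closed once every input has been drained.
func mergeStreams(streams ...<-chan event) <-chan event {
	out := make(chan event)
	var wg sync.WaitGroup
	for _, s := range streams {
		wg.Add(1)
		go func(s <-chan event) {
			defer wg.Done()
			for ev := range s {
				out <- ev
			}
		}(s)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// produce emits n events for one vbucket and then closes its channel.
func produce(vb uint16, n int) <-chan event {
	ch := make(chan event)
	go func() {
		defer close(ch)
		for i := 1; i <= n; i++ {
			ch <- event{Vbucket: vb, Seqno: uint64(i)}
		}
	}()
	return ch
}

func main() {
	count := 0
	for ev := range mergeStreams(produce(0, 3), produce(1, 2)) {
		count++
		fmt.Printf("vb=%d seq=%d\n", ev.Vbucket, ev.Seqno)
	}
	fmt.Println("events received:", count)
}
```

Ordering is preserved within each input stream but not across streams, so a consumer that cares about order would track sequence numbers per vbucket.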

func (*UprFeed) Close

func (feed *UprFeed) Close()

Close UprFeed. Does the opposite of StartUprFeed()

func (*UprFeed) GetStream

func (feed *UprFeed) GetStream(vbno uint16) *UprStream

GetStream returns the UprStream structure for vbucket `vbno`. The caller can use the UprStream information to restart the stream later.

type UprStream

type UprStream struct {
	Vbucket  uint16 // vbucket id
	Vuuid    uint64 // vbucket uuid
	Opaque   uint32 // messages from producer to this stream have same value
	Highseq  uint64 // to be supplied by the application
	Startseq uint64 // to be supplied by the application
	Endseq   uint64 // to be supplied by the application
	Flog     FailoverLog
}

UprStream will maintain stream information per vbucket

type VBucketServerMap

type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

VBucketServerMap is a mapping of vbuckets to nodes.
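A short sketch of reading such a map follows. It assumes the usual Couchbase convention that each VBucketMap row holds indexes into ServerList, with the active node first and -1 meaning no node; that convention is an assumption here, and masterFor and the sample data are hypothetical.

```go
package main

import "fmt"

// VBucketServerMap is redeclared here, copied from the struct above,
// so the sketch is self-contained.
type VBucketServerMap struct {
	HashAlgorithm string   `json:"hashAlgorithm"`
	NumReplicas   int      `json:"numReplicas"`
	ServerList    []string `json:"serverList"`
	VBucketMap    [][]int  `json:"vBucketMap"`
}

// masterFor returns the address of the node assumed to own vbucket vb.
// The boolean is false when the vbucket or its node cannot be resolved.
func masterFor(m *VBucketServerMap, vb int) (string, bool) {
	if vb < 0 || vb >= len(m.VBucketMap) || len(m.VBucketMap[vb]) == 0 {
		return "", false
	}
	idx := m.VBucketMap[vb][0] // assumed: first entry is the active node
	if idx < 0 || idx >= len(m.ServerList) {
		return "", false
	}
	return m.ServerList[idx], true
}

func main() {
	m := &VBucketServerMap{
		HashAlgorithm: "CRC",
		NumReplicas:   1,
		ServerList:    []string{"node1:11210", "node2:11210"},
		VBucketMap:    [][]int{{0, 1}, {1, 0}, {-1, 0}},
	}
	for vb := range m.VBucketMap {
		addr, ok := masterFor(m, vb)
		fmt.Println(vb, addr, ok)
	}
}
```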

type ViewDefinition

type ViewDefinition struct {
	Map    string `json:"map"`
	Reduce string `json:"reduce,omitempty"`
}

ViewDefinition represents a single view within a design document.
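To see how DDoc and ViewDefinition serialize, the sketch below redeclares both structs with the tags listed above and marshals a design document holding one view. The view name and the map function text are hypothetical examples.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ViewDefinition and DDoc are redeclared here, copied from the structs
// above, so the sketch is self-contained.
type ViewDefinition struct {
	Map    string `json:"map"`
	Reduce string `json:"reduce,omitempty"`
}

type DDoc struct {
	Language string                    `json:"language,omitempty"`
	Views    map[string]ViewDefinition `json:"views"`
}

// marshalDDoc returns the JSON body for a design document.
func marshalDDoc(d DDoc) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

func main() {
	ddoc := DDoc{Views: map[string]ViewDefinition{
		"by_type": {Map: "function (doc, meta) { emit(doc.type, null); }"},
	}}
	body, err := marshalDDoc(ddoc)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(body)
}
```

Because of the omitempty tags, the empty Language and Reduce fields are left out of the output. PutDDoc takes its value as an interface{}, so a DDoc value like this one is a plausible argument to it.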

type ViewError

type ViewError struct {
	From   string
	Reason string
}

A ViewError is a node-specific error indicating a partial failure within a view result.

func (ViewError) Error

func (ve ViewError) Error() string

type ViewResult

type ViewResult struct {
	TotalRows int `json:"total_rows"`
	Rows      []ViewRow
	Errors    []ViewError
}

ViewResult holds the entire result set from a view request, including the rows and the errors.

type ViewRow

type ViewRow struct {
	ID    string
	Key   interface{}
	Value interface{}
	Doc   *interface{}
}

ViewRow represents a single result from a view.

Doc is present only if include_docs was set on the request.
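The shape of these result types can be illustrated by decoding a small response by hand. The structs below are copied from the definitions above; the JSON payload is a made-up sample, not captured server output, and decodeViewResult is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The three types are redeclared here, copied from the structs above.
type ViewRow struct {
	ID    string
	Key   interface{}
	Value interface{}
	Doc   *interface{}
}

type ViewError struct {
	From   string
	Reason string
}

type ViewResult struct {
	TotalRows int `json:"total_rows"`
	Rows      []ViewRow
	Errors    []ViewError
}

// sample is a hypothetical response with one row and one node error.
const sample = `{
  "total_rows": 2,
  "rows": [{"id": "doc1", "key": ["thing", 1], "value": 3}],
  "errors": [{"from": "node2:8092", "reason": "timeout"}]
}`

// decodeViewResult unmarshals a raw JSON response. encoding/json matches
// untagged fields to keys case-insensitively, so "rows" fills Rows.
func decodeViewResult(raw string) (ViewResult, error) {
	var res ViewResult
	err := json.Unmarshal([]byte(raw), &res)
	return res, err
}

func main() {
	res, err := decodeViewResult(sample)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	for _, row := range res.Rows {
		fmt.Println(row.ID, row.Key, row.Value, row.Doc == nil)
	}
	for _, e := range res.Errors {
		fmt.Println("partial failure from", e.From+":", e.Reason)
	}
}
```

A result can carry rows and errors at the same time, so checking only the returned error value would miss the partial failures listed in Errors.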

type WriteOptions

type WriteOptions int

WriteOptions is the set of option flags available for the Write method. They are ORed together to specify the desired request.
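Combining and testing the flags can be sketched as follows. The type and constants are redeclared locally, copied from the Constants section, so the example runs on its own; the has helper is hypothetical.

```go
package main

import "fmt"

// WriteOptions and its flags are redeclared here, copied from the
// constant block in this package's documentation.
type WriteOptions int

const (
	Raw = WriteOptions(1 << iota)
	AddOnly
	Persist
	Indexable
	Append
)

// has reports whether flag is set in opts.
func has(opts, flag WriteOptions) bool {
	return opts&flag != 0
}

func main() {
	// Request an add-only write that also waits for persistence.
	opts := AddOnly | Persist
	fmt.Println("AddOnly set:", has(opts, AddOnly))
	fmt.Println("Raw set:", has(opts, Raw))
	fmt.Println("numeric value:", int(opts))
}
```

Each flag occupies its own bit, so any combination can be passed as the single opt argument that Write and WriteCas accept.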

func (WriteOptions) String

func (w WriteOptions) String() string

String representation of WriteOptions

type WriteUpdateFunc

type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)

A WriteUpdateFunc is a callback function to update a document

Directories

Path Synopsis
examples
tools
Package couchbaseutil offers some convenience functions for apps that use couchbase.
