riak

package module
v0.0.0-...-97ee455 Latest
Published: Jun 12, 2017 License: Apache-2.0 Imports: 17 Imported by: 13

README

Note:

This work has influenced the client that is maintained by Basho, the creators of Riak. You can contribute to the repository here or install with go get github.com/basho/riak-go-client.


riak (goriakpbc)

Package riak is a riak-client, inspired by the Ruby riak-client gem and the riakpbc go package from mrb (github.com/mrb/riakpbc). It implements a connection to Riak using Protocol Buffers.

A simple program using goriakpbc:

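package main

import (
	"fmt"

	riak "github.com/tpjg/goriakpbc"
)

func main() {
	// Create the default client with a single connection.
	err := riak.ConnectClient("127.0.0.1:8087")
	if err != nil {
		fmt.Println("Cannot connect, is Riak running?")
		return
	}
	defer riak.Close()

	bucket, err := riak.NewBucket("tstriak")
	if err != nil {
		fmt.Println("Cannot get bucket:", err)
		return
	}

	// Store a small JSON document under the key "tstobj".
	obj := bucket.NewObject("tstobj")
	obj.ContentType = "application/json"
	obj.Data = []byte(`{"field":"value"}`)
	if err := obj.Store(); err != nil {
		fmt.Println("Cannot store object:", err)
		return
	}

	fmt.Printf("Stored an object in Riak, vclock = %v\n", obj.Vclock)
}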

Parts of the library are specifically designed to facilitate projects that use both Ruby and Go. See the "Document Models" below. To install run go get github.com/tpjg/goriakpbc and use import as in the example above. If the Document Models (ORM) features are not needed simply run rm $GOPATH/src/github.com/tpjg/goriakpbc/model*.go after go get.

Documentation

More documentation is available in the Wiki (https://github.com/tpjg/goriakpbc/wiki). Below are some examples of the features implemented in this library. Full API documentation (automatically generated, including protobuf definitions) is available at http://godoc.org/github.com/tpjg/goriakpbc or through go doc.

Secondary indexes (2i)

WARNING: The API has slightly changed and this may break existing applications. The "Indexes" are changed to store multiple values now. Please see https://github.com/tpjg/goriakpbc/issues/71 for some history and a rationale for choosing to break the API in a very clear and predictable way.

Secondary indexes are supported and can be queried for equality using IndexQuery and for a range using IndexQueryRange. Indexes must be added as strings, even when adding a "_int" index. See the example below, taken from riak_test.go:


obj := bucket.NewObject("some_key")
obj.ContentType = "text/plain"
obj.Data = []byte("testing indexes")
obj.Indexes["test_int"] = []string{strconv.Itoa(123)}
err := obj.Store()
...
keys, err := bucket.IndexQuery("test_int", strconv.Itoa(123))
...
keys, err = bucket.IndexQueryRange("test_int", strconv.Itoa(120), strconv.Itoa(130))

Using Riak 1.4 and greater, you can paginate through keys in secondary indexes:

keys, continuation, err := bucket.IndexQueryPage("test_int", strconv.Itoa(123), 10, "")
...
keys, continuation, err = bucket.IndexQueryPage("test_int", strconv.Itoa(123), 10, continuation)
...
keys, continuation, err = bucket.IndexQueryRangePage("test_int", strconv.Itoa(120), strconv.Itoa(130), 10, "")
...
keys, continuation, err = bucket.IndexQueryRangePage("test_int", strconv.Itoa(120), strconv.Itoa(130), 10, continuation)
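The paging calls above are typically driven by a loop that feeds each returned continuation into the next request. The following is a minimal sketch of that loop. It does not talk to Riak: fakeIndex is a hypothetical in-memory stand-in whose function shape mirrors IndexQueryPage, and the sketch assumes that an empty continuation marks the last page.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// pageFunc mirrors the shape of Bucket.IndexQueryPage: one page of keys plus
// a continuation token for the next page.
type pageFunc func(index, key string, results uint32, continuation string) (keys []string, next string, err error)

// fakeIndex is a hypothetical in-memory stand-in for a bucket. Its
// continuation token is simply the offset of the next key.
func fakeIndex(all []string) pageFunc {
	return func(index, key string, results uint32, continuation string) ([]string, string, error) {
		start := 0
		if continuation != "" {
			n, err := strconv.Atoi(continuation)
			if err != nil || n < 0 || n > len(all) {
				return nil, "", errors.New("invalid continuation")
			}
			start = n
		}
		end := start + int(results)
		if end >= len(all) {
			return all[start:], "", nil // last page: no continuation (assumption)
		}
		return all[start:end], strconv.Itoa(end), nil
	}
}

// collectAll requests pages until the continuation comes back empty.
func collectAll(query pageFunc, index, key string, pageSize uint32) ([]string, error) {
	if pageSize == 0 {
		return nil, errors.New("page size must be positive")
	}
	var keys []string
	continuation := ""
	for {
		page, next, err := query(index, key, pageSize, continuation)
		if err != nil {
			return nil, err
		}
		keys = append(keys, page...)
		if next == "" {
			return keys, nil
		}
		continuation = next
	}
}

func main() {
	query := fakeIndex([]string{"k1", "k2", "k3", "k4", "k5"})
	keys, err := collectAll(query, "test_int", "123", 2)
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println(keys) // prints [k1 k2 k3 k4 k5]
}
```

With a real bucket, a method value such as bucket.IndexQueryPage could presumably be passed as the pageFunc, since its signature has the same shape.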

Map Reduce

There is a function to run a MapReduce directly:

func (c *Client) RunMapReduce(query string) (resp [][]byte, err error)

MapReduce queries can also be built similarly to how the MapReduce class from the Ruby riak-client works:

mr := riak.NewMapReduce()
mr.Add("bucket", "key")
mr.LinkBucket("otherbucket", false)
mr.Map("function(v) {return [JSON.parse(v.values[0].data)];}", true)
res, err := mr.Run()

Map functions written in Erlang instead of JavaScript must be added using "MapErlang" instead of "Map". There is also a predefined function "MapObjectValue" that uses the riak_kv_mapreduce module's map_object_value function. Reduce functions can be added similarly using "Reduce" and "ReduceErlang". To count objects efficiently, use "ReduceObjectCount", which uses the riak_kv_mapreduce module's reduce_count_inputs function.

If the backend supports secondary indexes a whole bucket can be added as input to a MapReduce query. Alternatively range queries and single key queries on 2i are also supported:

mr := riak.NewMapReduce()
mr.AddBucket("bucket")
// mr.AddBucketRange("bucket", "a", "k")
// mr.AddIndexRange("bucket", "key", "a", "k")
// mr.AddIndex("bucket", "key", "somekey1234")
mr.MapObjectValue(true)
res, err := mr.Run()

Counters

Example:

// Get counter from bucket, loads value
c, err := bucket.GetCounter("key")

// Get counter without existing bucket instance, loads value
c, err = cli.GetCounterFrom("bucket", "key")

c.Value                  // 0
c.Reload()
c.Increment(1)           // 1 on server, 0 in struct
c.IncrementAndReload(1)  // 2 on server and in struct
c.Decrement(1)           // 1 on server, 2 in struct
c.DecrementAndReload(1)  // 0 on server and in struct

The "AndReload" methods exist to take advantage of an option in update that returns the current value, thus saving a req/resp cycle.
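The difference between the plain and the "AndReload" methods can be illustrated without a Riak server. The sketch below uses hypothetical stand-ins (server, counter) rather than the library's types: the plain method sends the update and leaves the local value stale, while the "AndReload" variant copies the value returned by the update into the struct.

```go
package main

import "fmt"

// server is a hypothetical stand-in for the counter stored in Riak.
type server struct{ value int64 }

// update applies the amount and returns the new value, like an update
// request that asks for the current value in its response.
func (s *server) update(amount int64) int64 {
	s.value += amount
	return s.value
}

// counter is a hypothetical stand-in for the client-side counter struct.
type counter struct {
	srv   *server
	Value int64
}

// Increment updates the server but leaves the local Value untouched.
func (c *counter) Increment(amount int64) { c.srv.update(amount) }

// IncrementAndReload updates the server and stores the returned value locally.
func (c *counter) IncrementAndReload(amount int64) { c.Value = c.srv.update(amount) }

func main() {
	srv := &server{}
	c := &counter{srv: srv}

	c.Increment(1)
	fmt.Println(srv.value, c.Value) // prints 1 0

	c.IncrementAndReload(1)
	fmt.Println(srv.value, c.Value) // prints 2 2
}
```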

Search

Example:


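bucket, err := client.NewBucket("rocket_launchers")
if err != nil {
    // handle the error
}
bucket.SetSearch(true)

// Search returns the documents, two numeric result values (see the
// Client.Search signature) and an error.
if docs, _, _, err := client.Search(&riak.Search{Q: "quake", Index: "rocket_launchers"}); err == nil {
    for i, doc := range docs {
        fmt.Printf("Rocket launcher number: %d has key: %s\n", i, doc["key"])
    }
}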

The Search struct has fields for row count, start, sorting, etc. See http://godoc.org/github.com/tpjg/goriakpbc#Search for all of them.

Riak Document Models

Document Models, commonly referred to as ORM (Object-Relational Mapping) in other database drivers, map Go structs to objects in Riak and support links between objects. This is done by parsing the JSON data from an object in Riak and mapping it to a struct's fields.

The library allows for easy integration of a Go application into a project that also uses Ruby (on Rails) with the "ripple" gem (https://github.com/basho/ripple). To enable easy integration with Ruby/ripple projects the struct "tag" feature of Go is used to get around the naming convention differences between Go and Ruby (Uppercase starting letter required for export versus Uppercase being constants and typically CamelCase versus snake_case). Also it stores the model/struct name as _type in Riak just like ripple does.

For example the following Ruby/Ripple class:

    class Device
      include Ripple::Document
      property :ip, String
      property :description, String
      property :download_enabled, Boolean
    end

can be mapped to the following Go struct:

    type Device struct {
        DownloadEnabled  bool    `riak:"download_enabled"`
        Ip               string  `riak:"ip"`
        Description      string  `riak:"description"`
        riak.Model       `riak:"devices"`
    }

Note that it is required to have an (anonymous) riak.Model field. If the riak.Model field is an anonymous field this has the benefit that the functions like "Save" or "SaveAs" can be called directly as in the example below.
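To make the role of the struct tags more concrete, the following sketch reads them back with the standard reflect package. It is only an illustration of how such tags can be inspected, not the library's implementation: Model here is a hypothetical empty stand-in for riak.Model, and describe is a made-up helper.

```go
package main

import (
	"fmt"
	"reflect"
)

// Model is a hypothetical stand-in for riak.Model so the sketch compiles
// without the library.
type Model struct{}

type Device struct {
	DownloadEnabled bool   `riak:"download_enabled"`
	Ip              string `riak:"ip"`
	Description     string `riak:"description"`
	Model           `riak:"devices"`
}

// describe returns a map from Go field name to the name used in the stored
// JSON, plus the bucket name taken from the tag on the embedded Model field.
func describe(v interface{}) (fields map[string]string, bucket string) {
	t := reflect.TypeOf(v)
	fields = make(map[string]string)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		tag := f.Tag.Get("riak")
		if f.Anonymous && f.Type == reflect.TypeOf(Model{}) {
			bucket = tag
			continue
		}
		name := f.Name // without a tag the Go field name is used as-is
		if tag != "" {
			name = tag
		}
		fields[f.Name] = name
	}
	return fields, bucket
}

func main() {
	fields, bucket := describe(Device{})
	fmt.Println("bucket:", bucket)
	fmt.Println("DownloadEnabled is stored as:", fields["DownloadEnabled"])
}
```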

Getting an instantiated struct from Riak then requires only a call to "LoadModel"; to store it, call "Save" or "SaveAs":

err := riak.ConnectClient("127.0.0.1:8087")
var dev Device
err = riak.LoadModel("abcdefghijklm", &dev)
dev.Description = "something else"
err = dev.SaveAs("newkey")

Large object support

Storing really large values (over 10 MB) in Riak is not efficient and is not recommended. If you care about worst-case latencies it is recommended to keep values under 100 KB (see http://lists.basho.com/pipermail/riak-users_lists.basho.com/2014-March/014938.html). Changing small parts of a large value is also not efficient because the complete value must be PUT on every change (e.g. when storing files that grow over time like daily log files).

For storing these a value can be split into multiple segments, goriakpbc provides an RFile object for this. This object abstracts the chunking of data and exposes the familiar io.Reader, io.Writer and io.Seeker interfaces similar to os.File.

src, err := os.Open("file.mp4")
// Create a file in Riak and split the data into 100 KB chunks
dst, err := riak.CreateFile("bucket", "key", "video/mp4", 102400)

size, err := io.Copy(dst, src)

Some metadata about the "RFile", like the segment size and number of segments, is stored in the value at "key" using the Meta tag feature of Riak. The actual data is stored in segments with keys named "key-00000", "key-000001", et cetera.
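The chunking arithmetic behind this can be sketched in a few lines. The helpers below are hypothetical and not part of goriakpbc: segmentCount gives the number of segments needed for a value of a given size, and locate maps a byte offset (as used when seeking) to a segment index and a position inside that segment.

```go
package main

import "fmt"

// segmentCount returns how many chunks are needed to hold size bytes when
// each chunk holds at most chunkSize bytes (rounding up).
func segmentCount(size, chunkSize int64) int64 {
	if size <= 0 || chunkSize <= 0 {
		return 0
	}
	return (size + chunkSize - 1) / chunkSize
}

// locate maps an absolute byte offset to the index of the segment that holds
// it and the position within that segment.
func locate(offset, chunkSize int64) (segment, within int64) {
	return offset / chunkSize, offset % chunkSize
}

func main() {
	const chunkSize = 102400 // same chunk size as in the CreateFile example

	fmt.Println(segmentCount(250000, chunkSize)) // prints 3

	segment, within := locate(150000, chunkSize)
	fmt.Println(segment, within) // prints 1 47600
}
```

Updating a few bytes then only requires rewriting the affected segment instead of the complete value.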

Licensing

goriakpbc is distributed under the Apache license, see LICENSE.txt file or http://www.apache.org/licenses/LICENSE-2.0 for details. The model_json_*.go files are a copy from the original Go distribution with minor changes and are governed by a BSD-style license, see LICENSE.go.txt.

Documentation

Overview

Package riak is a riak-client, inspired by the Ruby riak-client gem and the riakpbc go package from mrb. It implements a connection to Riak using protobuf.

Constants

const (
	QuorumOne      = uint32(math.MaxUint32 - 1)
	QuorumMajority = uint32(math.MaxUint32 - 2)
	QuorumAll      = uint32(math.MaxUint32 - 3)
	QuorumDefault  = uint32(math.MaxUint32 - 4)
)

Protobuf symbolic quorum integer values
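The symbolic quorum values sit at the very top of the uint32 range, so they cannot collide with realistic literal quorum counts such as 1, 2 or 3. The sketch below redeclares the four constants locally (lower-case names, so it runs without the library) and uses a hypothetical describeQuorum helper to tell symbolic values apart from literal ones.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// Local copies of the symbolic quorum values listed above.
const (
	quorumOne      = uint32(math.MaxUint32 - 1)
	quorumMajority = uint32(math.MaxUint32 - 2)
	quorumAll      = uint32(math.MaxUint32 - 3)
	quorumDefault  = uint32(math.MaxUint32 - 4)
)

// describeQuorum returns the symbolic name for a quorum value, or the
// decimal number when the value is a literal replica count.
func describeQuorum(v uint32) string {
	switch v {
	case quorumOne:
		return "one"
	case quorumMajority:
		return "majority"
	case quorumAll:
		return "all"
	case quorumDefault:
		return "default"
	default:
		return strconv.FormatUint(uint64(v), 10)
	}
}

func main() {
	for _, v := range []uint32{2, quorumOne, quorumMajority, quorumAll, quorumDefault} {
		fmt.Printf("%d => %s\n", v, describeQuorum(v))
	}
}
```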

const (
	TYPE_COUNTER = 1
	TYPE_SET     = 2
	TYPE_MAP     = 3
)

const (
	COUNTER  = 1
	SET      = 2
	REGISTER = 3
	FLAG     = 4
	MAP      = 5
)

Variables

var (
	R1  = map[string]uint32{"r": 1}
	PR1 = map[string]uint32{"pr": 1}
	W1  = map[string]uint32{"w": 1}
	DW1 = map[string]uint32{"dw": 1}
	PW1 = map[string]uint32{"pw": 1}
)

Options for storing and retrieving data. Only a few are defined; other values can be supplied by creating a map in the application, for example:

bucket.Get("key", map[string]uint32{"r": 2})
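Because the option parameters are variadic (options ...map[string]uint32), several maps can be passed in one call. How the library combines them internally is not shown here; the sketch below assumes a simple merge in which later maps override earlier ones. The mergeOptions helper and the lower-case r1 and pw1 variables are hypothetical stand-ins.

```go
package main

import "fmt"

// Local stand-ins for the predefined option maps.
var (
	r1  = map[string]uint32{"r": 1}
	pw1 = map[string]uint32{"pw": 1}
)

// mergeOptions flattens any number of option maps into one. When the same
// key occurs more than once, the last value wins (an assumption made for
// this sketch).
func mergeOptions(options ...map[string]uint32) map[string]uint32 {
	merged := make(map[string]uint32)
	for _, opt := range options {
		for k, v := range opt {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	opts := mergeOptions(r1, pw1, map[string]uint32{"r": 2})
	fmt.Println(opts["r"], opts["pw"]) // prints 2 1
}
```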

var (
	BadNumberOfConnections = errors.New("Connection count <= 0")
	BadResponseLength      = errors.New("Response length too short")
	NoBucketName           = errors.New("No bucket name")
	BadMapReduceInputs     = errors.New("MapReduce inputs should be either a (single) index or bucket,key pairs - not both at")
	ChanWaitTimeout        = errors.New("Waiting for an available connection timed out")
)

Error definitions

var (
	ResolveNotImplemented     = errors.New("Resolve not implemented")
	DestinationError          = errors.New("Destination is not a pointer (to a struct)")
	DestinationIsNotModel     = errors.New("Destination has no riak.Model field")
	DestinationIsNotSlice     = errors.New("Must supply a slice to GetSiblings")
	DestinationLengthError    = errors.New("Length of slice does not match number of siblings")
	DestinationNotInitialized = errors.New("Destination struct is not initialized (correctly) using riak.New or riak.Load")
	ModelDoesNotMatch         = errors.New("Warning: struct name does not match _type in Riak")
	ModelNotNew               = errors.New("Destination struct already has an instantiated riak.Model (this struct is probably not new)")
	NoSiblingData             = errors.New("No non-empty sibling data")
)

Error definitions

var (
	NotFile     = errors.New("Not suitable to use as RFile")
	ErrorInFile = errors.New("Error in RFile")
)

var (
	NoDefaultClientConnection = errors.New("No (default) client connection")
)

var (
	NotFound = errors.New("Object not found")
)

Error definitions

Functions

func Close

func Close()

Closes the connection of the default client.

func ConnectClient

func ConnectClient(addr string) (err error)

Create the default client with a single connection to Riak.

func ConnectClientPool

func ConnectClientPool(addr string, count int) (err error)

Create the default client, using a pool of connections. This is the recommended method to connect to Riak in an application. A single client instance can be used by multiple threads/goroutines and will not block operations if there are enough connections in the pool. NOTE: If an application needs connections to different Riak clusters it can use riak.NewClientPool or riak.NewClient.

func DeleteFrom

func DeleteFrom(bucketname string, key string, options ...map[string]uint32) (err error)

Delete directly from a bucket, without creating a bucket object first

func ExistsIn

func ExistsIn(bucketname string, key string, options ...map[string]uint32) (exists bool, err error)

Test if an object exists in a bucket directly, without creating a bucket object first

func Id

func Id() (id string, err error)

Get the client Id

func IsWarning

func IsWarning(err error) bool

Return whether an error is really a warning, e.g. a common JSON error, or ModelDoesNotMatch.

func LoadModel

func LoadModel(key string, dest Resolver, options ...map[string]uint32) (err error)

Load data into the model using the default bucket (from the Model's struct definition)

func LoadModelFrom

func LoadModelFrom(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

The LoadModelFrom function retrieves the data from Riak using the default client and stores it in the struct that is passed as destination.

func NewModel

func NewModel(key string, dest Resolver, options ...map[string]uint32) (err error)

Instantiate a new model (using the default client), setting the necessary fields, like the client. If key is not empty that key will be used, otherwise Riak will choose a key.

func NewModelIn

func NewModelIn(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

Create a new Document Model (using the default client), passing in the bucketname and key.

func Ping

func Ping() (err error)

Ping the server

func RunMapReduce

func RunMapReduce(query string) (resp [][]byte, err error)

Run a MapReduce query directly

func ServerVersion

func ServerVersion() (node string, version string, err error)

Get the server version for the default client

func SetId

func SetId(id string) (err error)

Set the client Id

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Implements access to a bucket and its properties

func NewBucket

func NewBucket(name string) (*Bucket, error)

Return a new bucket using the client connection

func NewBucketType

func NewBucketType(btype, name string) (*Bucket, error)

Return a new bucket using the client connection

func (*Bucket) AllowMult

func (b *Bucket) AllowMult() bool

Return the allowMult property of a bucket

func (*Bucket) Delete

func (b *Bucket) Delete(key string, options ...map[string]uint32) (err error)

Delete a key/value from the bucket

func (*Bucket) Exists

func (b *Bucket) Exists(key string, options ...map[string]uint32) (exists bool, err error)

Test if an object exists

func (*Bucket) FetchCounter

func (b *Bucket) FetchCounter(key string, options ...map[string]uint32) (obj *RDtCounter, err error)

func (*Bucket) FetchMap

func (b *Bucket) FetchMap(key string, options ...map[string]uint32) (obj *RDtMap, err error)

func (*Bucket) FetchSet

func (b *Bucket) FetchSet(key string, options ...map[string]uint32) (obj *RDtSet, err error)

func (*Bucket) Get

func (b *Bucket) Get(key string, options ...map[string]uint32) (obj *RObject, err error)

Get an object

func (*Bucket) GetCounter

func (b *Bucket) GetCounter(key string, options ...map[string]uint32) (c *Counter, err error)

Get a counter

func (*Bucket) GetCounterWithoutLoad

func (b *Bucket) GetCounterWithoutLoad(key string, options ...map[string]uint32) (c *Counter, err error)

func (*Bucket) IndexQuery

func (b *Bucket) IndexQuery(index string, key string) (keys []string, err error)

Return a list of keys using the index for a single key

func (*Bucket) IndexQueryPage

func (b *Bucket) IndexQueryPage(index string, key string, results uint32, continuation string) (keys []string, next string, err error)

Return a page of keys using the index for a single key

func (*Bucket) IndexQueryRange

func (b *Bucket) IndexQueryRange(index string, min string, max string) (keys []string, err error)

Return a list of keys using the index range query

func (*Bucket) IndexQueryRangePage

func (b *Bucket) IndexQueryRangePage(index string, min string, max string, results uint32, continuation string) (keys []string, next string, err error)

Return a page of keys using the index range query

func (*Bucket) LastWriteWins

func (b *Bucket) LastWriteWins() bool

Return the lastWriteWins property of a bucket

func (*Bucket) ListKeys

func (b *Bucket) ListKeys() (response [][]byte, err error)

List all keys from bucket

func (*Bucket) NVal

func (b *Bucket) NVal() uint32

Return the nval property of a bucket

func (*Bucket) Name

func (b *Bucket) Name() string

Return the bucket name

func (*Bucket) New

func (b *Bucket) New(key string, options ...map[string]uint32) *RObject

Create a new RObject. DEPRECATED, use NewObject instead

func (*Bucket) NewObject

func (b *Bucket) NewObject(key string, options ...map[string]uint32) *RObject

Create a new RObject

func (*Bucket) Search

func (b *Bucket) Search() bool

Return the search property of a bucket

func (*Bucket) SearchIndex

func (b *Bucket) SearchIndex() string

Return the search_index property of a bucket

func (*Bucket) SetAllowMult

func (b *Bucket) SetAllowMult(allowMult bool) (err error)

Set the allowMult property of a bucket

func (*Bucket) SetLastWriteWins

func (b *Bucket) SetLastWriteWins(lastWriteWins bool) (err error)

Set the lastWriteWins property of a bucket

func (*Bucket) SetNVal

func (b *Bucket) SetNVal(nval uint32) (err error)

Set the nval property of a bucket

func (*Bucket) SetSearch

func (b *Bucket) SetSearch(search bool) (err error)

Set the search property of a bucket

func (*Bucket) SetSearchIndex

func (b *Bucket) SetSearchIndex(searchIndex string) (err error)

Set the search_index property of a bucket

type Client

type Client struct {
	// contains filtered or unexported fields
}

riak.Client the client interface

func New

func New(addr string) *Client

Returns a new Client connection. DEPRECATED, use NewClient instead

func NewClient

func NewClient(addr string) *Client

Returns a new Client connection

func NewClientPool

func NewClientPool(addr string, count int) *Client

Returns a new Client with multiple connections to Riak

func NewPool

func NewPool(addr string, count int) *Client

Returns a new Client with multiple connections to Riak. DEPRECATED, use NewClientPool instead

func (*Client) Bucket

func (c *Client) Bucket(name string) (*Bucket, error)

Return a new bucket object. DEPRECATED, use NewBucket instead.

func (*Client) Close

func (c *Client) Close()

Close the connection

func (*Client) Connect

func (c *Client) Connect() error

Connects to a Riak server.

func (*Client) CreateFile

func (c *Client) CreateFile(bucketname string, key string, contentType string, chunk_size int, options ...map[string]uint32) (*RFile, error)

Create a new RFile. Will overwrite/truncate existing data.

func (*Client) DeleteFrom

func (c *Client) DeleteFrom(bucketname string, key string, options ...map[string]uint32) (err error)

Delete directly from a bucket, without creating a bucket object first

func (*Client) ExistsIn

func (c *Client) ExistsIn(bucketname string, key string, options ...map[string]uint32) (exists bool, err error)

Test if an object exists in a bucket directly, without creating a bucket object first

func (*Client) FetchSchema

func (c *Client) FetchSchema(schemaName string) (*Schema, error)

func (*Client) GetCounterFrom

func (c *Client) GetCounterFrom(bucketname string, key string, options ...map[string]uint32) (counter *Counter, err error)

Get counter directly from a bucket, without creating a bucket first

func (*Client) GetFrom

func (c *Client) GetFrom(bucketname string, key string, options ...map[string]uint32) (obj *RObject, err error)

Get directly from a bucket, without creating a bucket first

func (*Client) Id

func (c *Client) Id() (id string, err error)

Get the client Id

func (*Client) Key

func (c *Client) Key(dest interface{}) (key string, err error)

Get a model's Key, e.g. needed when Riak has picked it

func (*Client) Load

func (c *Client) Load(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

Load data into model. DEPRECATED, use LoadModelFrom instead.

func (*Client) LoadModel

func (c *Client) LoadModel(key string, dest Resolver, options ...map[string]uint32) (err error)

Load data into the model using the default bucket (from the Model's struct definition)

func (*Client) LoadModelFrom

func (c *Client) LoadModelFrom(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

The LoadModelFrom function retrieves the data from Riak and stores it in the struct that is passed as destination. It stores some necessary information in the riak.Model field so it can be used later in other (Save) operations.

If the bucketname is empty ("") it'll be the default bucket, based on the riak.Model tag.

Using the "Device" struct as an example:

dev := &Device{}
err := client.LoadModelFrom("devices", "12345", dev)

func (*Client) MapReduce

func (c *Client) MapReduce() *MapReduce

func (*Client) New

func (c *Client) New(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

Create a new model. DEPRECATED, use NewModelIn instead.

func (*Client) NewBucket

func (c *Client) NewBucket(name string) (*Bucket, error)

Return a new bucket object

func (*Client) NewBucketType

func (c *Client) NewBucketType(btype, name string) (*Bucket, error)

func (*Client) NewModel

func (c *Client) NewModel(key string, dest Resolver, options ...map[string]uint32) (err error)

Instantiate a new model, setting the necessary fields, like the client. If key is not empty that key will be used, otherwise Riak will choose a key.

func (*Client) NewModelIn

func (c *Client) NewModelIn(bucketname string, key string, dest Resolver, options ...map[string]uint32) (err error)

Create a new Document Model, passing in the bucketname and key. The key can be empty in which case Riak will pick a key. The destination must be a pointer to a struct that has the riak.Model field. If the bucketname is empty the default bucketname, based on the riak.Model tag will be used.

func (*Client) NewObjectIn

func (c *Client) NewObjectIn(bucketname string, key string, options ...map[string]uint32) (*RObject, error)

Create a new RObject in a bucket directly, without creating a bucket object first

func (*Client) NewSchema

func (c *Client) NewSchema(name string, content string) (*Schema, error)

func (*Client) NewSearchIndex

func (c *Client) NewSearchIndex(indexName string, schemaName string, nval uint32) (*SearchIndex, error)

func (*Client) OpenFile

func (c *Client) OpenFile(bucketname string, key string, options ...map[string]uint32) (*RFile, error)

Open a File. Will return an error if it does not exist in Riak yet or does not have the correct meta-tags to support File-like operations.

func (*Client) Ping

func (c *Client) Ping() (err error)

Ping the server

func (*Client) RunMapReduce

func (c *Client) RunMapReduce(query string) (resp [][]byte, err error)

Run a MapReduce query

func (*Client) Save

func (c *Client) Save(dest Resolver) (err error)

Save a Document Model to Riak

func (*Client) SaveAs

func (c *Client) SaveAs(newKey string, dest Resolver) (err error)

Save a Document Model to Riak under a new key; if empty, a key will be chosen by Riak

func (*Client) Search

func (c *Client) Search(s *Search) ([]map[string][]byte, float32, uint32, error)

func (*Client) ServerVersion

func (c *Client) ServerVersion() (node string, version string, err error)

Get the server version

func (*Client) SetChanWaitTimeout

func (c *Client) SetChanWaitTimeout(waitTimeout time.Duration)

Set the maximum time to wait for a connection to be available in the pool. By default getConn() will wait forever.

func (*Client) SetConnectTimeout

func (c *Client) SetConnectTimeout(timeout time.Duration)

Set the maximum time to wait for a connection to complete. By default Connect() will wait around 3 minutes.

func (*Client) SetId

func (c *Client) SetId(id string) (err error)

Set the client Id

func (*Client) SetKey

func (c *Client) SetKey(newKey string, dest interface{}) (err error)

Set the Key value, note that this does not save the model, it only changes the data structure

type Counter

type Counter struct {
	Bucket  *Bucket
	Key     string
	Value   int64
	Options []map[string]uint32
}

func (*Counter) Decrement

func (c *Counter) Decrement(amount int64) (err error)

Decrement a counter by a given amount

func (*Counter) DecrementAndReload

func (c *Counter) DecrementAndReload(amount int64) (err error)

Decrement a counter by a given amount and reload its value

func (*Counter) Destroy

func (c *Counter) Destroy() (err error)

Destroy the counter

func (*Counter) Increment

func (c *Counter) Increment(amount int64) (err error)

Increment a counter by a given amount

func (*Counter) IncrementAndReload

func (c *Counter) IncrementAndReload(amount int64) (err error)

Increment a counter by a given amount and reload its value

func (*Counter) Reload

func (c *Counter) Reload() (err error)

Reload the value of a counter

type Link

type Link struct {
	Bucket string
	Key    string
	Tag    string
}

A Riak link

type Many

type Many []One

Link to many other models

func (*Many) Add

func (m *Many) Add(dest Resolver) (err error)

Add a Link to the given Model (dest)

func (*Many) AddLink

func (m *Many) AddLink(o One)

Add a given Link (One) directly

func (*Many) Contains

func (m *Many) Contains(o One) bool

Return if a given Link is in the riak.Many slice

func (*Many) Len

func (m *Many) Len() int

Return the number of Links

func (*Many) Remove

func (m *Many) Remove(dest Resolver) (err error)

Remove a Link to the given Model (dest)

func (*Many) RemoveLink

func (m *Many) RemoveLink(o One) (err error)

Remove a given Link (One) directly, e.g. so it can be used when iterating over a riak.Many slice

type MapKey

type MapKey struct {
	Key  string
	Type pb.MapField_MapFieldType
}

type MapReduce

type MapReduce struct {
	// contains filtered or unexported fields
}

An object to build a MapReduce job similar to how the Ruby client can build it by adding different stages.

func NewMapReduce

func NewMapReduce() *MapReduce

Create a MapReduce object that can be used to build a MR query

func (*MapReduce) Add

func (mr *MapReduce) Add(bucket string, key string) (err error)

func (*MapReduce) AddBucket

func (mr *MapReduce) AddBucket(bucket string) (err error)

Add a whole bucket as input. Note that this ONLY works on buckets that have secondary indexes (2i) enabled since listing keys on a bucket without using indexes is dangerous on production clusters.

func (*MapReduce) AddBucketRange

func (mr *MapReduce) AddBucketRange(bucket string, start string, end string) (err error)

Add a range of keys from one bucket using secondary indexes.

func (*MapReduce) AddIndex

func (mr *MapReduce) AddIndex(bucket string, index string, key string) (err error)

Add keys using a secondary index.

func (*MapReduce) AddIndexRange

func (mr *MapReduce) AddIndexRange(bucket string, index string, start string, end string) (err error)

Add a range of keys using a secondary index.

func (*MapReduce) LinkBucket

func (mr *MapReduce) LinkBucket(name string, keep bool)

func (*MapReduce) Map

func (mr *MapReduce) Map(fun string, keep bool)

func (*MapReduce) MapErlang

func (mr *MapReduce) MapErlang(module string, fun string, keep bool)

func (*MapReduce) MapObjectValue

func (mr *MapReduce) MapObjectValue(keep bool)

func (*MapReduce) Query

func (mr *MapReduce) Query() (query []byte, err error)

Generate the Query string

func (*MapReduce) Reduce

func (mr *MapReduce) Reduce(fun string, keep bool)

func (*MapReduce) ReduceErlang

func (mr *MapReduce) ReduceErlang(module string, fun string, arg string, keep bool)

func (*MapReduce) ReduceObjectCount

func (mr *MapReduce) ReduceObjectCount(keep bool)

func (*MapReduce) Run

func (mr *MapReduce) Run() (resp [][]byte, err error)

type Model

type Model struct {
	// contains filtered or unexported fields
}

Make structs work like a Document Model, similar to how the Ruby based "ripple" gem works. This is done by parsing the JSON data and mapping it to the struct's fields. To enable easy integration with Ruby/ripple projects the struct "tag" feature of Go is used to possibly get around the naming convention differences between Go and Ruby (Uppercase starting letter required for export and typically CamelCase versus underscores). Also it stores the model/struct name as _type in Riak.

For example the following Ruby/Ripple class:

class Device
  include Ripple::Document
  property :ip, String
  property :description, String
  property :download_enabled, Boolean
end

can be mapped to the following Go struct:

type Device struct {
	riak.Model       `riak:"devices"`
	Download_enabled bool    `riak:"download_enabled"`
	Ip               string  `riak:"ip"`
	Description      string  `riak:"description"`
}

Note that it is required to have a riak.Model field. Also, if the field name in Ripple is equal to the Go field name, the extra tag is not needed (e.g. if the Ripple class above had a "property :Ip, String").

func (*Model) CRC32

func (m *Model) CRC32() uint32

Return crc32 of the underlying robject data for easy comparison to other models
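Comparing checksums is a cheap way to test whether two objects hold the same data without comparing the full payloads. The sketch below uses the standard hash/crc32 package with the IEEE polynomial. Which polynomial the library uses is an assumption here, and sameData is a hypothetical helper. Because CRC32 values can collide, a matching checksum suggests equal data but does not prove it.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// sameData reports whether two payloads have the same CRC32 checksum
// (IEEE polynomial, an assumption for this sketch).
func sameData(a, b []byte) bool {
	return crc32.ChecksumIEEE(a) == crc32.ChecksumIEEE(b)
}

func main() {
	original := []byte(`{"ip":"10.0.0.1","description":"router"}`)
	reloaded := []byte(`{"ip":"10.0.0.1","description":"router"}`)
	changed := []byte(`{"ip":"10.0.0.1","description":"something else"}`)

	fmt.Println(sameData(original, reloaded)) // prints true
	fmt.Println(sameData(original, changed))
}
```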

func (*Model) Delete

func (m *Model) Delete() (err error)

Delete a Document Model

func (*Model) GetSiblings

func (m *Model) GetSiblings(dest interface{}) (err error)

func (*Model) Indexes

func (m *Model) Indexes() map[string][]string

Return the object's indexes. This allows an application to set custom secondary indexes on the object for later querying.

func (Model) Key

func (m Model) Key() (key string)

Get a model's Key, e.g. needed when Riak has picked it

func (*Model) Reload

func (m *Model) Reload() (err error)

Reload a Document Model

func (*Model) Resolve

func (*Model) Resolve(count int) (err error)

func (*Model) Save

func (m *Model) Save() (err error)

Save a Document Model to Riak

func (*Model) SaveAs

func (m *Model) SaveAs(newKey string) (err error)

Save a Document Model to Riak under a new key; if empty, a key will be chosen by Riak

func (Model) SetKey

func (m Model) SetKey(newKey string) (err error)

Set the Key value, note that this does not save the model, it only changes the data structure

func (*Model) Siblings

func (m *Model) Siblings(dest Resolver) (result interface{}, err error)

Allocates a slice of models for the result and populates it with the data from the siblings held in Riak.

func (*Model) Vclock

func (m *Model) Vclock() (vclock []byte)

Return the object Vclock - this allows an application to detect whether Reload() loaded a newer version of the object

type One

type One struct {
	// contains filtered or unexported fields
}

Link to one other model

func (*One) Empty

func (o *One) Empty() bool

Test if the link is empty (not set)

func (*One) Equal

func (o *One) Equal(e One) bool

Test for equality (bucket and key are equal)

func (*One) Get

func (o *One) Get(dest Resolver) (err error)

func (One) Link

func (o One) Link() (link Link)

func (*One) Set

func (o *One) Set(dest Resolver) (err error)

Set the link to a given Model (dest)

func (*One) SetLink

func (o *One) SetLink(a One)

Set a link directly

type RDataType

type RDataType interface {
	Store() error
	Destroy() error
}

type RDataTypeObject

type RDataTypeObject struct {
	Bucket  *Bucket
	Key     string
	Options []map[string]uint32
	Context []uint8
}

func (*RDataTypeObject) Destroy

func (obj *RDataTypeObject) Destroy() (err error)

type RDtCounter

type RDtCounter struct {
	RDataTypeObject
	Value *int64
	Incr  int64
}

func (*RDtCounter) GetValue

func (counter *RDtCounter) GetValue() int64

func (*RDtCounter) Increment

func (counter *RDtCounter) Increment(value int64)

func (*RDtCounter) Store

func (counter *RDtCounter) Store() (err error)

func (*RDtCounter) ToOp

func (counter *RDtCounter) ToOp() *pb.DtOp

type RDtFlag

type RDtFlag struct {
	Value    bool
	Enabled  bool
	Disabled bool
}

func (*RDtFlag) Disable

func (f *RDtFlag) Disable()

func (*RDtFlag) Enable

func (f *RDtFlag) Enable()

func (*RDtFlag) GetValue

func (f *RDtFlag) GetValue() bool

type RDtMap

type RDtMap struct {
	RDataTypeObject
	Values   map[MapKey]interface{}
	ToAdd    []*pb.MapUpdate
	ToRemove []*pb.MapField
}

func (*RDtMap) AddCounter

func (m *RDtMap) AddCounter(key string) (e *RDtCounter)

func (*RDtMap) AddFlag

func (m *RDtMap) AddFlag(key string) (e *RDtFlag)

func (*RDtMap) AddMap

func (m *RDtMap) AddMap(key string) (e *RDtMap)

func (*RDtMap) AddRegister

func (m *RDtMap) AddRegister(key string) (e *RDtRegister)

func (*RDtMap) AddSet

func (m *RDtMap) AddSet(key string) (e *RDtSet)

func (*RDtMap) FetchCounter

func (m *RDtMap) FetchCounter(key string) (e *RDtCounter)

func (*RDtMap) FetchFlag

func (m *RDtMap) FetchFlag(key string) (e *RDtFlag)

func (*RDtMap) FetchMap

func (m *RDtMap) FetchMap(key string) (e *RDtMap)

func (*RDtMap) FetchRegister

func (m *RDtMap) FetchRegister(key string) (e *RDtRegister)

func (*RDtMap) FetchSet

func (m *RDtMap) FetchSet(key string) (e *RDtSet)

func (*RDtMap) Init

func (m *RDtMap) Init(mapvalues []*pb.MapEntry)

func (*RDtMap) Print

func (m *RDtMap) Print()

func (*RDtMap) PrintInt

func (m *RDtMap) PrintInt(indent int)

func (*RDtMap) RemoveCounter

func (m *RDtMap) RemoveCounter(key string)

func (*RDtMap) RemoveFlag

func (m *RDtMap) RemoveFlag(key string)

func (*RDtMap) RemoveMap

func (m *RDtMap) RemoveMap(key string)

func (*RDtMap) RemoveRegister

func (m *RDtMap) RemoveRegister(key string)

func (*RDtMap) RemoveSet

func (m *RDtMap) RemoveSet(key string)

func (*RDtMap) Size

func (m *RDtMap) Size() int

func (*RDtMap) Store

func (m *RDtMap) Store() (err error)

func (*RDtMap) ToOp

func (m *RDtMap) ToOp() *pb.DtOp

type RDtRegister

type RDtRegister struct {
	Value    []byte
	NewValue []byte
}

func (*RDtRegister) GetValue

func (f *RDtRegister) GetValue() []byte

func (*RDtRegister) Update

func (r *RDtRegister) Update(value []byte)

type RDtSet

type RDtSet struct {
	RDataTypeObject
	Value    [][]byte
	ToAdd    [][]byte
	ToRemove [][]byte
}

func (*RDtSet) Add

func (set *RDtSet) Add(value []byte)

func (*RDtSet) GetValue

func (set *RDtSet) GetValue() [][]byte

func (*RDtSet) Remove

func (set *RDtSet) Remove(value []byte)

func (*RDtSet) Store

func (set *RDtSet) Store() (err error)

func (*RDtSet) ToOp

func (set *RDtSet) ToOp() *pb.DtOp

type RFile

type RFile struct {
	// contains filtered or unexported fields
}

The RFile struct stores (large) values in Riak and behaves very similarly to a regular os.File object. It implements the io.Reader, io.Writer and io.Seeker interfaces. The value is split into chunks because really large values (>10 MB) can't be stored efficiently in Riak and also because growing or changing large values is inefficient (changing a single byte would require a PUT of the entire, possibly large, value).
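The chunking idea can be illustrated with a small in-memory sketch. This is not the RFile implementation: chunkStore, locate and writeAt are hypothetical names, and the map stands in for the separate Riak objects that would hold each chunk. It only shows how a byte offset maps to a chunk, and that a one-byte change rewrites a single chunk instead of the whole value.

```go
package main

import "fmt"

// chunkStore is a hypothetical in-memory stand-in for a chunked value.
type chunkStore struct {
	chunkSize int
	chunks    map[int][]byte // chunk index -> chunk data
	writes    int            // number of chunk writes ("PUTs") performed
}

func newChunkStore(chunkSize int) *chunkStore {
	return &chunkStore{chunkSize: chunkSize, chunks: make(map[int][]byte)}
}

// locate maps an absolute byte offset to a chunk index and an
// offset inside that chunk.
func (s *chunkStore) locate(offset int) (index, inner int) {
	return offset / s.chunkSize, offset % s.chunkSize
}

// writeAt writes p starting at offset and touches only the chunks
// that overlap the written range.
func (s *chunkStore) writeAt(p []byte, offset int) {
	for len(p) > 0 {
		index, inner := s.locate(offset)
		chunk, ok := s.chunks[index]
		if !ok {
			chunk = make([]byte, s.chunkSize)
			s.chunks[index] = chunk
		}
		n := copy(chunk[inner:], p) // copies at most up to the chunk boundary
		s.writes++
		p = p[n:]
		offset += n
	}
}

func main() {
	s := newChunkStore(4)
	s.writeAt([]byte("hello world"), 0)
	fmt.Println("chunk writes after initial write:", s.writes)

	s.writeAt([]byte("W"), 6) // change a single byte
	fmt.Println("chunk writes after one-byte change:", s.writes)
}
```

With a chunk size of 4, the 11-byte value spans three chunks, and the later single-byte change costs one additional chunk write.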

func CreateFile

func CreateFile(bucketname string, key string, contentType string, chunk_size int, options ...map[string]uint32) (*RFile, error)

func OpenFile

func OpenFile(bucketname string, key string, options ...map[string]uint32) (*RFile, error)

func (*RFile) Flush

func (r *RFile) Flush() error

Force a write of the underlying root RObject (e.g. after changing Meta and/or Indexes)

func (*RFile) Indexes

func (r *RFile) Indexes() map[string][]string

Expose Indexes of the underlying root RObject

func (*RFile) Meta

func (r *RFile) Meta() map[string]string

Expose Meta information of the underlying root RObject

func (*RFile) Read

func (r *RFile) Read(p []byte) (n int, err error)

Implements the io.Reader interface

func (*RFile) Seek

func (r *RFile) Seek(offset int64, whence int) (int64, error)

Implements the io.Seeker interface

func (*RFile) Size

func (r *RFile) Size() int

func (*RFile) Write

func (r *RFile) Write(p []byte) (n int, err error)

Implements the io.Writer interface

type RObject

type RObject struct {
	Bucket       *Bucket
	Vclock       []byte
	Key          string
	ContentType  string
	Data         []byte
	Links        []Link
	Meta         map[string]string
	Indexes      map[string][]string
	Vtag         string
	LastMod      uint32
	LastModUsecs uint32

	Siblings []Sibling
	Options  []map[string]uint32
	// contains filtered or unexported fields
}

An RObject is an object or document that is or can be stored in Riak

func GetFrom

func GetFrom(bucketname string, key string, options ...map[string]uint32) (obj *RObject, err error)

Get directly from a bucket, without creating a bucket first

func NewObjectIn

func NewObjectIn(bucketname string, key string, options ...map[string]uint32) (*RObject, error)

Create a new RObject in a bucket directly, without creating a bucket object first

func (*RObject) AddLink

func (obj *RObject) AddLink(link Link) bool

Add a link if it is not already in the Links slice; returns false if it is already present
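The add-if-absent behaviour described here can be sketched as follows. The link struct and the addLink function are hypothetical stand-ins (the field names Bucket, Key and Tag are an assumption about the Link type); this is not the package's own code.

```go
package main

import "fmt"

// link is a hypothetical stand-in for the package's Link type.
type link struct {
	Bucket, Key, Tag string
}

// addLink appends l to links unless an identical link is already present.
// It returns the (possibly extended) slice and whether the link was added.
func addLink(links []link, l link) ([]link, bool) {
	for _, existing := range links {
		if existing == l { // struct equality compares all three fields
			return links, false
		}
	}
	return append(links, l), true
}

func main() {
	var links []link
	var added bool

	links, added = addLink(links, link{"people", "alice", "friend"})
	fmt.Println(added, len(links))

	// Adding the same link again leaves the slice unchanged.
	links, added = addLink(links, link{"people", "alice", "friend"})
	fmt.Println(added, len(links))
}
```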

func (*RObject) Conflict

func (obj *RObject) Conflict() bool

Returns true if the object was fetched with multiple siblings (AllowMult=true on the bucket)

func (*RObject) Destroy

func (obj *RObject) Destroy() (err error)

Delete the object from Riak

func (*RObject) LinkTo

func (obj *RObject) LinkTo(target *RObject, tag string)

Add a link to another object (does not store the object, must explicitly call "Store()")

func (*RObject) Reload

func (obj *RObject) Reload() (err error)

Reload an object if it has changed (new Vclock)

func (*RObject) Store

func (obj *RObject) Store() (err error)

Store an RObject

type Resolver

type Resolver interface {
	Resolve(int) error
}

type Schema

type Schema struct {
	Name    string
	Content string
	// contains filtered or unexported fields
}

func (*Schema) Store

func (s *Schema) Store() error

Store validates the schema and sends it to Riak

type Search

type Search struct {
	Q       string
	Index   string
	Rows    uint32
	Start   uint32
	Sort    string
	Filter  string
	Df      string
	Op      string
	PreSort string
	Fields  []string
}

type SearchIndex

type SearchIndex struct {
	Name   string
	Schema string
	NVal   uint32
	// contains filtered or unexported fields
}

func (*SearchIndex) Store

func (s *SearchIndex) Store() error

Note: wait a few (about 5) seconds for Riak to store the index before you start using it
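Instead of a fixed sleep, an application could poll until the index is usable. The sketch below shows only the generic retry loop: waitUntil is a hypothetical helper, and how readiness is actually probed (the ready function) is an assumption left to the caller, since this page does not describe such a check.

```go
package main

import (
	"fmt"
	"time"
)

// waitUntil calls ready up to attempts times, sleeping delay between
// calls, and reports whether ready ever returned true.
// Hypothetical helper, not part of this package.
func waitUntil(ready func() bool, attempts int, delay time.Duration) bool {
	for i := 0; i < attempts; i++ {
		if ready() {
			return true
		}
		if i < attempts-1 {
			time.Sleep(delay) // no sleep after the final attempt
		}
	}
	return false
}

func main() {
	calls := 0
	// Fake probe that succeeds on the third call; a real probe would
	// query Riak in whatever way the application considers sufficient.
	probe := func() bool {
		calls++
		return calls >= 3
	}

	ok := waitUntil(probe, 10, 10*time.Millisecond)
	fmt.Println("ready:", ok, "after", calls, "probes")
}
```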

type Sibling

type Sibling struct {
	ContentType  string
	Data         []byte
	Links        []Link
	Meta         map[string]string
	Indexes      map[string][]string
	Vtag         string
	LastMod      uint32
	LastModUsecs uint32
}

An object can have siblings that can each have their own content
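When an object is in conflict, the application has to choose or merge among the siblings. One possible strategy, sketched below, is last-write-wins based on the LastMod and LastModUsecs fields. The sibling struct is a trimmed, hypothetical stand-in for the Sibling type above, and newest is not a function of this package; whether last-write-wins is appropriate depends on the application.

```go
package main

import "fmt"

// sibling is a trimmed, hypothetical stand-in for the Sibling type.
type sibling struct {
	Data         []byte
	LastMod      uint32 // seconds
	LastModUsecs uint32 // microseconds within the second
}

// newest returns the index of the most recently modified sibling,
// or false if the slice is empty. Ties keep the earliest index.
func newest(siblings []sibling) (int, bool) {
	if len(siblings) == 0 {
		return -1, false
	}
	best := 0
	for i := 1; i < len(siblings); i++ {
		s, b := siblings[i], siblings[best]
		if s.LastMod > b.LastMod ||
			(s.LastMod == b.LastMod && s.LastModUsecs > b.LastModUsecs) {
			best = i
		}
	}
	return best, true
}

func main() {
	siblings := []sibling{
		{Data: []byte("a"), LastMod: 100, LastModUsecs: 5},
		{Data: []byte("b"), LastMod: 101, LastModUsecs: 1},
		{Data: []byte("c"), LastMod: 101, LastModUsecs: 9},
	}
	if i, ok := newest(siblings); ok {
		fmt.Printf("winner: %s\n", siblings[i].Data)
	}
}
```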

Directories

Path Synopsis
Package riak is a generated protocol buffer package.
Package riak is a generated protocol buffer package.
