coherence

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2024 License: UPL-1.0 Imports: 35 Imported by: 1

Documentation

Overview

Package coherence provides a set of functions and interfaces for Go programs to act as cache clients to a Coherence Cluster using gRPC for the network transport.

Your cluster must be running Coherence Community Edition (CE) 22.06.4+ or Coherence commercial version 14.1.1.2206.4+ and must be running a gRPC Proxy.

Two interfaces, NamedMap and NamedCache, are available to access Coherence caches. NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry operation.

Introduction

The Coherence Go client provides the following features:

  • Familiar Map-like interface for manipulating cache entries including but not limited to Put, PutWithExpiry, PutIfAbsent, PutAll, Get, GetAll, Remove, Clear, GetOrDefault, Replace, ReplaceMapping, Size, IsEmpty, ContainsKey, ContainsValue, ContainsEntry
  • Cluster-side querying, aggregation and filtering of map entries
  • Cluster-side manipulation of map entries using EntryProcessors
  • Registration of listeners to be notified of mutations such as
  • insert, update and delete on Maps, map lifecycle events such as truncated, released or destroyed and session lifecycle events such as connected, disconnected, reconnected and closed
  • Support for storing Go structs as JSON as well as the ability to serialize to Java objects on the server for access from other Coherence language API's
  • Near cache support to cache frequently accessed data in the Go client to avoid sending requests across the network
  • Full support for Go generics in all Coherence API's

For more information on Coherence caches, please see the Coherence Documentation.

Supported Go versions

This API fully supports Go Generics and is only supported for use with Go versions 1.19 and above.

Obtaining a Session

Example:

import (
    coherence "github.com/oracle/coherence-go-client/coherence"
)

...

session, err := coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}
defer session.Close()

The NewSession function creates a new session that will connect to a gRPC proxy server on "localhost:1408" by default.

You can specify the host and port to connect to by specifying the environment variable COHERENCE_SERVER_ADDRESS. See gRPC Naming for information on values for this. You can also pass coherence.WithAddress("host:port") to specify the gRPC host and port to connect to. The default connection mode is with SSL enabled, but you can use plan-text via using coherence.WithPlainText().

To Configure SSL, you must first enable SSL on the gRPC Proxy, see gRPC Proxy documentation for details. Refer to the section on NewSession for more information on setting up a SSL connection on the client.

See SessionOptions which lists all the options supported by the Session API.

Controlling timeouts

Most operations you call require you to supply a context.Context. If your context does not contain a deadline, the operation will wrap your context in a new context.WithTimeout using either the default timeout of 30,000 millis or the value you set using option coherence.WithRequestTimeout when you called NewSession.

For example, to override the default request timeout of 30,000 millis with one of 5 seconds for a Session you can do the following:

session, err = coherence.NewSession(ctx, coherence.WithRequestTimeout(time.Duration(5) * time.Second))

You can also override the default request timeout using the environment variable COHERENCE_CLIENT_REQUEST_TIMEOUT.

By default, if an endpoint is not ready, the Go client will fail-fast. You can change this behaviour by setting the option coherence.WithReadyTimeout to a value millis value greater than zero which will cause the Go client to wait until up to the timeout specified until it fails if no endpoint is available. You can also use the environment variable COHERENCE_READY_TIMEOUT.

You also have the ability to control maximum amount of time, in milliseconds, a Session may remain in a disconnected state without successfully reconnecting. For this you use the option coherence.WithDisconnectTimeout or the environment variable COHERENCE_SESSION_DISCONNECT_TIMEOUT.

Obtaining a NamedMap or NamedCache

Once a session has been created, the GetNamedMap(session, name, ...options) or GetNamedCache(session, name, ...options) can be used to obtain an instance of a NamedMap or NamedCache. The key and value types must be provided as generic type arguments. This identifier may be shared across clients. It's also possible to have many [NamedMap]s or [NamedCache]s defined and in use simultaneously.

Example:

session, err := coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}
defer session.Close()

namedMap, err := coherence.GetNamedMap[int, string](session, "customers")
if err != nil {
    log.Fatal(err)
}

If you wish to create a NamedCache, which supports expiry, you can use the GetNamedCache function and then use the PutWithExpiry function call.

namedCache, err := coherence.GetNamedCache[int, string](session, "customers")
if err != nil {
    log.Fatal(err)
}

_, err = namedCache.PutWithExpiry(ctx, person1.ID, person1, time.Duration(5)*time.Second)

If your NamedCache requires the same expiry for every entry, you can use the coherence.WithExpiry cache option. Each call to Put will use the default expiry you have specified. If you use PutWithExpiry, this will override the default expiry for that key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "cache-expiry", coherence.WithExpiry(time.Duration(5)*time.Second))

See CacheOptions which lists all the options supported by the GetNamedCache or GetNamedMap API.

Basic CRUD operations

Note: See the examples on GitHub for detailed examples.

Assuming a very trivial NamedMap with integer keys and string values.

session, err := coherence.NewSession(coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedMap[int, string](session, "my-map")
if err != nil {
    log.Fatal(err)
}

ctx := context.Background()

// put a new key / value
if _, err = namedMap.Put(ctx, 1, "one"); err != nil {
    log.Fatal(err)
}

// get the value for the given key
if value, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Value for key 1 is", *value)

// update the value for key 1
if _, err = namedMap.Put(ctx, 1, "ONE"); err != nil {
    log.Fatal(err)
}

// retrieve the updated value for the given key
if value, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Updated value is", *value)

if _, err = namedMap.Remove(ctx, 1); err != nil {
    log.Fatal(err)
}

Note: Keys and values are serialized to JSON and stored in Coherence as a com.oracle.coherence.io.json.JsonObject. if you wish to store structs as native Java objects, then please see the section further down on "Serializing to Java Objects on the Server".

Working with structs

type Person struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
    Age  int    `json:"age"`
}

// create a new NamedMap of Person with key int
namedMap, err := coherence.GetNamedMap[int, Person](session, "test")
if err != nil {
    log.Fatal(err)
}

// clear the Map
if err = namedMap.Clear(ctx); err != nil {
    log.Fatal(err)
}

newPerson := Person{ID: 1, Name: "Tim", Age: 21}
fmt.Println("Add new Person", newPerson)
if _, err = namedMap.Put(ctx, newPerson.Id, newPerson); err != nil {
    log.Fatal(err)
}

// retrieve the Person
if person, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Person from Get() is", *person)

// Update the age using and entry processor for in-place processing
_, err = coherence.Invoke[int, Person, bool](ctx, namedMap, 1, processors.Update("age", 56))
if err != nil {
    log.Fatal(err)
}

// retrieve the updatedPerson
if person, err = namedMap.Get(ctx, 1); err != nil {
    log.Fatal(err)
}
fmt.Println("Person is", *person)

Querying and filtering using channels

Channels are used to deal with individual keys, values or entries streamed from the backend using a filter or an open query. Depending upon the operation, each result element is wrapped in one of the structs StreamedEntry, StreamedValue or StreamedKey which wraps an error and a Key and/or a Value. As always, the Err object must be checked for errors before accessing the Key or Value fields. All functions that return channels are EntrySetFilter, KeySetFilter, ValuesFilter, EntrySet, KeySet, Values, InvokeAll and InvokeAllFilter.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// extractors
age := extractors.Extract[int]("age")
name := extractors.Extract[string]("name")

// retrieve all people aged > 30
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20))
for result := range ch {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
    fmt.Println("Key:", result.Key, "Value:", result.Value)
}

// we can also do more complex filtering such as looking for people > 30 and where there name begins with 'T'
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20).And(filters.Like(name, "T%", true)))

Using entry processors for in-place processing

A Processor is an object that allows you to process (update) one or more NamedMap entries on the NamedMap itself, instead of moving the entries to the client across the network. In other words, using processors we send the processing to where the data resides thus avoiding massive data movement across the network. Processors can be executed against all entries, a single key or against a set of entries that match a Filter.

To demonstrate this, lets assume we have a NamedMap populated with Person struct below, and we want to run various scenarios to increase peoples salary by using a processors.Multiply processor.

type Person struct {
    Id     int     `json:"id"`
    Name   string  `json:"name"`
    Salary float32 `json:"salary"`
    Age    int     `json:"age"`
    City   string  `json:"city"`
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")

// 1. Increase the salary of the person with Id = 1
newSalary, err = coherence.Invoke[int, Person, float32](ctx, namedMap, 1, processors.Multiply("salary", 1.1, true))

city := extractors.Extract[string]("city")

// 2. Increase the salary of all people in Perth
ch2 := coherence.InvokeAllFilter[int, Person, float32](ctx, namedMap, filters.Equal(city, "Perth"), processors.Multiply("salary", 1.1, true)
for result := range ch2 {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
}

// 3. Increase the salary of people with Id 1 and 5
ch2 := coherence.InvokeAllKeys[int, Person, float32](ctx, namedMap, []int{1, 5}, processors.Multiply("salary", 1.1, true)
for result := range ch2 {
    if result.Err != nil {
        log.Fatal(result.Err)
    }
}

Aggregating cache data

Aggregators can be used to perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data.

To demonstrate this, lets assume we have a NamedMap populated with Person struct as per the previous example, and we want to run various scenarios to perform aggregations.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Retrieve the distinct cities from all people
citiesValues, err := coherence.Aggregate(ctx, namedMap, extractors.Extract[string]("city"))
if err != nil {
    log.Fatal(err)
}
fmt.Println(*citiesValues)
// output: [Perth, Melbourne, Brisbane]

age := extractors.Extract[int]("age")

// minimum age across keys 3 and 4
ageResult, err = coherence.AggregateKeys(ctx, namedMap, []int{3, 4}, aggregators.Min(age))

// top 2 people by salary using filter
var salaryResult *[]Person
salaryResult, err = coherence.AggregateFilter[int, Person, []Person](ctx, namedMap, filters.Greater(age, 40),
    aggregators.TopN[float32, Person](extractors.Extract[float32]("salary"), false, 2))

Responding to cache events

he Coherence Go Client provides the ability to add a MapListener that will receive events (inserts, updates, deletes) that occur against a NamedMap or NamedCache. You can listen for all events, events based upon a filter or vents based upon a key.

// in your main code, create a new NamedMap and register the listener
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

listener := coherence.NewMapListener[int, Person]().OnUpdated(
func(e coherence.MapEvent[int, Person]) {
    key, err := e.Key()
    if err != nil {
        panic("unable to deserialize key")
    }

    newValue, err := e.NewValue()
    if err != nil {
        panic("unable to deserialize new value")
    }

    oldValue, err := e.OldValue()
    if err != nil {
        panic("unable to deserialize old value")
    }

    fmt.Printf("**EVENT=Updated: key=%v, oldValue=%v, newValue=%v\n", *key, *oldValue, *newValue)
})

if err = namedMap.AddListener(ctx, listener); err != nil {
    panic(err)
}

// ensure we unregister the listener
defer func(ctx context.Context, namedMap coherence.NamedMap[int, Person], listener coherence.MapListener[int, Person]) {
    _ = namedMap.RemoveListener(ctx, listener)
}(ctx, namedMap, listener)

// As you carry out operations that will mutate the cache entries, update the age to 56, you will see the events printed
_, err = coherence.Invoke[int, Person, bool](ctx, namedMap, 1, processors.Update("age", 56))
if err != nil {
    log.Fatal(err)
}

// output
// **EVENT=Updated: key=1, oldValue={1 Tim 53}, newValue={1 Tim 53}

// you can also listen based upon filters, for example the following would create a
// listener for all entries where the salary is > 17000
if err = namedMap.AddFilterListener(ctx, listener,
    filters.Greater(extractors.Extract[int]("salary"), 17000)); err != nil {
    log.Fatal("unable to add listener", listener, err)
}

// You can also listen on a specific key, e.g. list on key 1.
listener := NewUpdateEventsListener[int, Person]()
if err = namedMap.AddKeyListener(ctx, listener, 1); err != nil {
    log.Fatal("unable to add listener", listener, err)
}

Responding to cache lifecycle events

The Coherence Go Client provides the ability to add a MapLifecycleListener that will receive events (truncated and destroyed) that occur against a NamedMap or NamedCache.

// consider the example below where we want to listen for all 'truncate' events for a NamedMap.
// in your main code, create a new NamedMap and register the listener
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Create a listener and add to the cache
listener := coherence.NewMapLifecycleListener[int, Person]().
    OnTruncated(func(e coherence.MapLifecycleEvent[int, Person]) {
        fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
    })

namedMap.AddLifecycleListener(listener)
defer namedMap.RemoveLifecycleListener(listener)

newPerson := Person{ID: 1, Name: "Tim", Age: 21}
fmt.Println("Add new Person", newPerson)
if _, err = namedMap.Put(ctx, newPerson.Id, newPerson); err != nil {
    log.Fatal(err)
}

if size, err = namedMap.Size(ctx); err != nil {
    log.Fatal(err)
}
fmt.Println("Cache size is", size, "truncating cache")

if err = namedMap.Truncate(ctx); err != nil {
    log.Fatal(err)
}

time.Sleep(time.Duration(5) * time.Second)

// output
// Add new Person {1 Tim 53}
// Cache size is 1 truncating cache
// **EVENT=Truncated: value=NamedMap{name=people, format=json}

Responding to session lifecycle events

The Coherence Go Client provides the ability to add a SessionLifecycleListener that will receive events (connected, closed, disconnected or reconnected) that occur against the Session. Note: These events use and experimental gRPC API so may not be reliable or may change in the future. This is due to the experimental nature of the underlying gRPC API.

Consider the example below where we want to listen for all 'All' events for a Session. in your main code, create a new Session and register the listener

// create a new Session
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

// Create a listener to listen for session events
listener := coherence.NewSessionLifecycleListener().
    OnAny(func(e coherence.SessionLifecycleEvent) {
        fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
})

session.AddSessionLifecycleListener(listener)
defer session.RemoveSessionLifecycleListener(listener)

// create a new NamedMap of Person with key int
namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// clear the Map
if err = namedMap.Clear(ctx); err != nil {
    log.Fatal(err)
}

session.Close()

time.Sleep(time.Duration(5) * time.Second)

// output
// 2023/01/31 11:15:37 connected session 59f3ec81-dda1-41b7-92de-70aad3d26615 to address localhost:1408
// 2023/01/31 11:15:38 closed session 59f3ec81-dda1-41b7-92de-70aad3d26615
// **EVENT=session_closed: source=SessionID=59f3ec81-dda1-41b7-92de-70aad3d26615, closed=true, caches=0, maps=0

Serializing to Java objects on the server

By default, the Coherence Go client serializes any keys and values to JSON and then stores them as JsonObjects in Coherence. This is usually sufficient for most applications where you are only accessing your data via the Go Client.

If you wish to access your data via other clients such as Java, JavaScript, C++, .NET or Python, it's best to use Java classes, known to Coherence server, representing the data model. The following describes how to achieve interoperability with Java.

Step 1. Create your Java Classes

Firstly you must define your data model for all Java classes and configure for JSON serialization. You do not need to annotate all the attributes with @JsonbProperty, but it is a good practice so that you have consistent names with Go. Below is a shorted version of a Customer class without all the extras such as getters, setters, hashCode, etc, that you know you need. In the example below I am using standard Java serialization, but you can use POF serialization if you have that configured.

package com.oracle.demo;

public class Customer implements Serializable {
	public Customer() {} // required

	@JsonbProperty("id")
	private int id;

	@JsonbProperty("customerName")
	private String  customerName;

	@JsonbProperty("outstandingBalance")
	private double outstandingBalance;

	...

Step 2. Define your type alias.

In the code deployed to your Coherence storage-nodes, you need to create a file in your resources root called META-INF/type-aliases.properties which contains an alias and fully qualified class name for each of your classes.

# Example META-INF/type-aliases.properties file
customer=com.oracle.demo.Customer
order=com.oracle.demo.Order

Step 3. Define your Go structs

Next you need to define your Go structs with JSON names matching your Java objects. You also need to include a Class attribute with the JSON attribute name of "@class". We will set this in our object to the value "customer" matching the value in the type-aliases.properties on the server.

type Customer struct {
    Class              string   `json:"@class"`
    ID                 int      `json:"id"`
    CustomerName       string   `json:"customerName"`
    OutstandingBalance float32  `json:"outstandingBalance"`
}

Step 4. Create and put the value

Lastly, when you create a Customer object you must set the Class value matching the alias above.

customer := Customer{
    Class:              "customer",
    ID:                 1,
    CustomerName:       "Tim",
    OutstandingBalance: 10000,
}

// store the entry in Coherence, it will be stored as a com.oracle.demo.Customer POJO!

_, err = namedMap.Put(ctx, customer.ID, customer)
if err != nil {
    log.Fatal(err)
}

Using Near Caches

The Coherence Go client allows you to specify a near cache to cache frequently accessed data in your Go application. When you access data using Get() or GetAll() operations, returned entries are stored in the near cache and subsequent data access for keys in the near cache is almost instant where without a near cache each operation above always results in a network call.

On creating a near cache, Coherence automatically adds a MapListener to your NamedMap or NamedCache which listens on all cache events and updates or invalidates entries in the near cache that have been changed or removed on the server.

To manage the amount of memory used by the near cache, the following options are supported when creating one:

  • time-to-live (TTL) – objects expired after time in near cache, e.g. 5 minutes
  • High-Units – maximum number of cache entries in the near cache
  • Memory – maximum amount of memory used by cache entries

Note: You can specify either High-Units or Memory and in either case, optionally, a TTL.

The above can be specified by passing NearCacheOptions within WithNearCache when creating a NamedMap or NamedCache. See below for various ways of creating near caches.

You can ask a NamedMap or NamedCache for its near cache statistics by calling GetNearCacheStats(). Various statistics are recorded with regards to the near cache and can be seen via the CacheStats interface. If the NamedMap or NamedCache does not have a near cache, nil will be returned.

1. Creating a Near Cache specifying time-to-live (TTL)

The following example shows how to get a named cache that will cache entries from Get() or GetAll() for up to 30 seconds.

// specify a TTL of 30 seconds
nearCacheOptions := coherence.NearCacheOptions{TTL: time.Duration(30) * time.Second}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// issue first Get for data in the cache on the storage-nodes. Entries found will be stored in near cache
value, err = namedMap.Get(ctx, 1)
if err != nil {
    panic(err)
}

// subsequent access will be almost instant from near cache
value, err = namedMap.Get(ctx, 1)

// you can check the near cache stats
fmt.Println("Near cache size is", namedMap.GetNearCacheStats().Size())

// output: "Near cache size is 1"

2. Creating a Near Cache specifying maximum number of entries to store

The following example shows how to get a named cache that will cache up to 100 entries from Get() or GetAll(). When the threshold of HighUnits is reached, the near cache is pruned to 80% of its size and evicts least recently accessed and created entries.

// specify HighUnits of 1000
nearCacheOptions := coherence.NearCacheOptions{HighUnits: 1000}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// assume we have 2000 entries in the coherence cache, issue 1000 gets and the near cache will have 100 entries
for i := 1; i <= 1000; i++ {
	_, err = namedMap.Get(ctx, i)
	if err != nil {
		panic(err)
	}
}

fmt.Println("Near cache size is", namedMap.GetNearCacheStats().Size())
// output: "Near cache size is 1000"

// issue a subsequent Get() for an entry not in the near cache and the cache will be pruned to 80%
customer, err = namedMap.Get(ctx, 1)

fmt.Println("Near cache size is", namedCache.GetNearCacheStats().Size())
// output: "Near cache size is 800"

3. Creating a Near Cache specifying maximum memory to use

The following example shows how to get a named cache that will cache up to 10KB of entries from Get() or GetAll(). When the threshold of HighUnits is reached, the near cache is pruned to 80% of its size and evicts least recently accessed and created entries.

// specify HighUnits of 1000
nearCacheOptions := coherence.NearCacheOptions{HighUnitsMemory: 10 * 1024}

namedMap, err := coherence.GetNamedMap[int, string](session, "customers", coherence.WithNearCache(&nearCacheOptions))
if err != nil {
    log.Fatal(err)
}

// assume we have 5000 entries in the coherence cache, issue 5000 gets and the near cache will be pruned and
// not have the full 5000 entries as it does not fit within 10KB.
for i := 1; i <= 5000; i++ {
	_, err = namedMap.Get(ctx, i)
	if err != nil {
		panic(err)
	}
}

// print the near cache stats via String()
fmt.Println(namedMap.GetNearCacheStats())
// localCache{name=customers options=localCacheOptions{ttl=0s, highUnits=0, highUnitsMemory=10.0KB, invalidation=ListenAll},
// stats=CacheStats{puts=5000, gets=5000, hits=0, misses=5000, missesDuration=4.95257111s, hitRate=0, prunes=7, prunesDuration=196.498µs, size=398, memoryUsed=9.3KB}}

Index

Constants

View Source
const (
	// EntryInserted this event indicates that an entry has been added to the cache.
	EntryInserted MapEventType = "insert"

	// EntryUpdated this event indicates that an entry has been updated in the cache.
	EntryUpdated MapEventType = "update"

	// EntryDeleted this event indicates that an entry has been removed from the cache.
	EntryDeleted MapEventType = "delete"

	// Destroyed raised when a storage for a given cache is destroyed
	// usually as a result of a call to NamedMap.Destroy().
	Destroyed MapLifecycleEventType = "map_destroyed"

	// Truncated raised when a storage for a given cache is truncated
	// as a result of a call to NamedMap.Truncate().
	Truncated MapLifecycleEventType = "map_truncated"

	// Released raised when the local resources for a cache has been released
	// as a result of a call to NamedMap.Release().
	Released MapLifecycleEventType = "map_released"

	// Connected raised when the session has connected.
	Connected SessionLifecycleEventType = "session_connected"

	// Disconnected raised when the session has disconnected.
	Disconnected SessionLifecycleEventType = "session_disconnected"

	// Reconnected raised when the session has re-connected.
	Reconnected SessionLifecycleEventType = "session_reconnected"

	// Closed raised when the session has been closed.
	Closed SessionLifecycleEventType = "session_closed"
)
View Source
const (
	KB = 1024
	MB = KB * KB
	GB = MB * KB
)

Variables

View Source
var (
	// ErrDestroyed indicates that the NamedMap or NamedCache has been destroyed and can no-longer be used.
	ErrDestroyed = errors.New("the NamedMap or NamedCache has been destroyed and is not usable")

	// ErrReleased indicates that the NamedMap or NamedCache has been released and can no-longer be used.
	ErrReleased = errors.New("the NamedMap or NamedCache has been released and is not usable")

	// ErrClosed indicates that the session has been closed.
	ErrClosed = errors.New("the session is closed and is not usable")

	// ErrShutdown indicates the gRPC channel has been shutdown.
	ErrShutdown = errors.New("gRPC channel has been shutdown")
)
View Source
var (
	ErrInvalidFormat             = errors.New("format can only be 'json'")
	ErrInvalidNearCache          = errors.New("you must specify at least one near cache option")
	ErrInvalidNearCacheWithTTL   = errors.New("when using TTL for near cache you can only specify highUnits or highUnitsMemory")
	ErrInvalidNearCacheWithNoTTL = errors.New("you can only specify highUnits or highUnitsMemory, not both")
	ErrNegativeNearCacheOptions  = errors.New("you cannot specify negative values for near cache options")
)

ErrInvalidFormat indicates that the serialization format can only be JSON.

View Source
var (

	// ErrDone indicates that there are no more entries to return.
	ErrDone = errors.New("iterator done")
)

Functions

func AddIndex

func AddIndex[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E], sorted bool) error

AddIndex adds the index based upon the supplied extractors.ValueExtractor. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to add a sorted index (on age) on the age attribute.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = coherence.AddIndex(ctx, namedMap, extractors.Extract[int]("age"), true); err != nil {
    log.Fatal(err)
}

func AddIndexWithComparator

func AddIndexWithComparator[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E], comparator extractors.ValueExtractor[T, E]) error

AddIndexWithComparator adds the index based upon the supplied extractors.ValueExtractor and comparator. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to add an index on the age attribute sorted by name.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

err = coherence.AddIndexWithComparator(ctx, namedMap, extractors.Extract[int]("age"), extractors.Extract[int]("name"))
if err != nil {
    log.Fatal(err)
}

func Aggregate

func Aggregate[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], aggr aggregators.Aggregator[R]) (*R, error)

Aggregate performs an aggregating operation (identified by aggregator) against all the entries in a NamedMap or NamedCache. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the average age of all people.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

// Note: the Average aggregator returns a big.Rat
bigRat, err = coherence.Aggregate(ctx, namedMap aggregators.Average(extractors.Extract[int]("age")))
if err != nil {
    log.Fatal(err)
}
value, _ := bigRat.Float32()
fmt.Printf("Average age of people is %.2f\n", value)

func AggregateFilter

func AggregateFilter[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], filter filters.Filter, aggr aggregators.Aggregator[R]) (*R, error)

AggregateFilter performs an aggregating operation (identified by aggregator) against the set of entries selected by the specified filter. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the count of people ages older than 19.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

count, err = coherence.AggregateFilter(ctx, namedMap, filters.Greater(extractors.Extract[int]("age"), 19), aggregators.Count())
if err != nil {
    log.Fatal(err)
}
fmt.Println("Number of people aged greater than 19 is", *count)

func AggregateKeys

func AggregateKeys[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], keys []K, aggr aggregators.Aggregator[R]) (*R, error)

AggregateKeys performs an aggregating operation (identified by aggregator) against the set of entries selected by the specified keys. The type parameter is R = type of the result of the aggregation.

The example below shows how to get the minimum age across the people with keys 3, 4, and 5.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

minAge, err = coherence.AggregateKeys(ctx, namedMap, []int{3, 4, 5}, aggregators.Min(extractors.Extract[int]("age")))
if err != nil {
    log.Fatal(err)
}
fmt.Println("Minimum age of people with keys 3, 4 and 5 is", *minAge)

func Invoke

func Invoke[K comparable, V, R any](ctx context.Context, nm NamedMap[K, V], key K, proc processors.Processor) (*R, error)

Invoke the specified processor against the entry mapped to the specified key. Processors are invoked atomically against a specific entry as the process may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of person identified by the key 1.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

newAge, err := coherence.Invoke[int, Person, int](ctx, namedMap, 1, processors.Increment("age", 1))
fmt.Println("New age is", *newAge)

func InvokeAll

func InvokeAll[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], proc processors.Processor) <-chan *StreamedValue[R]

InvokeAll invokes the specified function against all entries in a NamedMap. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of all people. This function returns a stream of StreamedValue[R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := coherence.InvokeAll[int, Person, int](ctx, namedMap, processors.Increment("age", 1))
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the key of the person changed
        fmt.Println(se.Value)
    }
}

func InvokeAllFilter

func InvokeAllFilter[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], fltr filters.Filter, proc processors.Processor) <-chan *StreamedValue[R]

InvokeAllFilter invokes the specified function against the entries matching the specified filter. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of any people older than 1. This function returns a stream of StreamedValue[R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

age := extractors.Extract[int]("age")

ch := coherence.InvokeAllFilter[int, Person, int](ctx, namedMap, filters.Greater(age, 1), processors.Increment("age", 1)
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the person changed
        fmt.Println(se.Value)
    }
}

func InvokeAllKeys

func InvokeAllKeys[K comparable, V any, R any](ctx context.Context, nm NamedMap[K, V], keys []K, proc processors.Processor) <-chan *StreamedValue[R]

InvokeAllKeys invokes the specified function against the entries matching the specified keys. Functions are invoked atomically against a specific entry as the function may mutate the entry. The type parameter is R = type of the result of the invocation.

The example below shows how to run an entry processor to increment the age of any people with keys 1 and 2. This function returns a stream of StreamedValue[R] of the values changed.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := coherence.InvokeAllKeys[int, Person, int](ctx, namedMap, []int{1, 2}, processors.Increment("age", 1))
for se := range ch {
    // Check the error
    if se.Err != nil {
        // process the error
        log.Println(se.Err)
    } else {
        // process the result which will be the key of the person changed
        fmt.Println(se.Value)
    }
}

func RemoveIndex

func RemoveIndex[K comparable, V, T, E any](ctx context.Context, nm NamedMap[K, V], extractor extractors.ValueExtractor[T, E]) error

RemoveIndex removes index based upon the supplied extractors.ValueExtractor. The type parameters are T = type to extract from and E = type of the extracted value.

The example below shows how to remove and index on the age attribute.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = coherence.RemoveIndex(ctx, namedMap, extractors.Extract[int]("age")); err != nil {
    log.Fatal(err)
}

func WithAddress

func WithAddress(host string) func(sessionOptions *SessionOptions)

WithAddress returns a function to set the address for session.

func WithDisconnectTimeout

func WithDisconnectTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithDisconnectTimeout returns a function to set the maximum amount of time, in millis, a Session may remain in a disconnected state without successfully reconnecting.

func WithExpiry

func WithExpiry(ttl time.Duration) func(cacheOptions *CacheOptions)

WithExpiry returns a function to set the default expiry for a NamedCache. This option is not valid on NamedMap.

func WithFormat

func WithFormat(format string) func(sessionOptions *SessionOptions)

WithFormat returns a function to set the format for a session. Currently, only "json" is supported.

func WithIgnoreInvalidCerts added in v1.0.1

func WithIgnoreInvalidCerts() func(sessionOptions *SessionOptions)

WithIgnoreInvalidCerts returns a function to set the connection to ignore invalid certificates for a session.

func WithNearCache added in v1.1.0

func WithNearCache(options *NearCacheOptions) func(cacheOptions *CacheOptions)

WithNearCache returns a function to set NearCacheOptions.

func WithPlainText

func WithPlainText() func(sessionOptions *SessionOptions)

WithPlainText returns a function to set the connection to plan text (insecure) for a session.

func WithReadyTimeout

func WithReadyTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithReadyTimeout returns a function to set the maximum amount of time an NamedMap or NamedCache operations may wait for the underlying gRPC channel to be ready. This is independent of the request timeout which sets a deadline on how long the call may take after being dispatched.

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) func(sessionOptions *SessionOptions)

WithRequestTimeout returns a function to set the request timeout in millis.

func WithScope

func WithScope(scope string) func(sessionOptions *SessionOptions)

WithScope returns a function to set the scope for a session. This will prefix all caches with the provided scope to make them unique within a scope.

func WithTLSCertsPath added in v1.0.1

func WithTLSCertsPath(path string) func(sessionOptions *SessionOptions)

WithTLSCertsPath returns a function to set the (CA) certificates to be added for a session.

func WithTLSClientCert added in v1.0.1

func WithTLSClientCert(path string) func(sessionOptions *SessionOptions)

WithTLSClientCert returns a function to set the client certificate to be added for a session.

func WithTLSClientKey added in v1.0.1

func WithTLSClientKey(path string) func(sessionOptions *SessionOptions)

WithTLSClientKey returns a function to set the client key to be added for a session.

func WithTLSConfig added in v1.0.1

func WithTLSConfig(tlsConfig *tls.Config) func(sessionOptions *SessionOptions)

WithTLSConfig returns a function to set the tls.Config directly. This is typically used when you require fine-grained control over these options.

Types

type CacheOptions

type CacheOptions struct {
	DefaultExpiry    time.Duration
	NearCacheOptions *NearCacheOptions
}

CacheOptions holds various cache options.

type CacheStats added in v1.1.0

type CacheStats interface {
	GetCacheHits() int64                   // the number of entries served from the near cache
	GetCacheMisses() int64                 // the number of entries that had to be retrieved from the cluster
	GetCacheMissesDuration() time.Duration // the total duration of all misses
	GetHitRate() float32                   // the hit rate of the near cache
	GetCachePuts() int64                   // the number of entries put in the near cache
	GetTotalGets() int64                   // the number of gets against the near cache
	GetCachePrunes() int64                 // the number of times the near cache was pruned
	GetCachePrunesDuration() time.Duration // the duration of all prunes
	Size() int                             // the number of entries in the near cache
	SizeBytes() int64                      // the number of bytes used by the entries (keys and values) in the near cache
	ResetStats()                           // reset the stats for the near cache, not including Size() or SizeBytes()
}

CacheStats defines various statics for near caches.

type Entry

type Entry[K comparable, V any] struct {
	Key   K
	Value V
}

Entry represents a returned entry from entryPageIterator.

type InvalidationStrategyType added in v1.1.0

type InvalidationStrategyType int

InvalidationStrategyType described the type if invalidation strategies for near cache.

const (
	ListenAll InvalidationStrategyType = 0
)

type JSONSerializer

type JSONSerializer[T any] struct {
	// contains filtered or unexported fields
}

JSONSerializer serializes data using JSON.

func (JSONSerializer[T]) Deserialize

func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error)

Deserialize deserialized an object and returns the correct type of T.

func (JSONSerializer[T]) Format

func (s JSONSerializer[T]) Format() string

func (JSONSerializer[T]) Serialize

func (s JSONSerializer[T]) Serialize(object T) ([]byte, error)

Serialize serializes an object of type T and returns the []byte representation.

type MapEvent

type MapEvent[K comparable, V any] interface {
	Source() NamedMap[K, V]
	Key() (*K, error)
	OldValue() (*V, error)
	NewValue() (*V, error)
	Type() MapEventType
}

MapEvent an event which indicates that the content of the NamedMap or NamedCache has changed (i.e., an entry has been added, updated, and/or removed).

type MapEventType

type MapEventType string

MapEventType describes an event raised by a cache mutation.

type MapLifecycleEvent

type MapLifecycleEvent[K comparable, V any] interface {
	Source() NamedMap[K, V]
	Type() MapLifecycleEventType
}

type MapLifecycleEventType

type MapLifecycleEventType string

MapLifecycleEventType describes an event that may be raised during the lifecycle of cache.

type MapLifecycleListener

type MapLifecycleListener[K comparable, V any] interface {
	OnAny(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnDestroyed(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnTruncated(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	OnReleased(callback func(MapLifecycleEvent[K, V])) MapLifecycleListener[K, V]
	// contains filtered or unexported methods
}

MapLifecycleListener allows registering callbacks to be notified when lifecycle events (truncated or released) occur against a NamedMap or NamedCache.

func NewMapLifecycleListener

func NewMapLifecycleListener[K comparable, V any]() MapLifecycleListener[K, V]

NewMapLifecycleListener creates and returns a pointer to a new MapLifecycleListener instance.

type MapListener

type MapListener[K comparable, V any] interface {
	OnInserted(callback func(MapEvent[K, V])) MapListener[K, V]
	OnUpdated(callback func(MapEvent[K, V])) MapListener[K, V]
	OnDeleted(callback func(MapEvent[K, V])) MapListener[K, V]
	OnAny(callback func(MapEvent[K, V])) MapListener[K, V]
	// contains filtered or unexported methods
}

MapListener allows registering callbacks to be notified when mutations events occur within a NamedMap or NamedCache.

func NewMapListener

func NewMapListener[K comparable, V any]() MapListener[K, V]

NewMapListener creates and returns a pointer to a new MapListener instance.

type NamedCache

type NamedCache[K comparable, V any] interface {
	NamedMap[K, V]

	// PutWithExpiry associates the specified value with the specified key. If the cache
	// previously contained a value for this key, the old value is replaced.
	// This variation of the Put(ctx context.Context, key K, value V)
	// function allows the caller to specify an expiry (or "time to live")
	// for the cache entry.  If coherence.ExpiryNever < ttl < 1 millisecond,
	// ttl is set to 1 millisecond.
	// V will be nil if there was no previous value.
	PutWithExpiry(ctx context.Context, key K, value V, ttl time.Duration) (*V, error)
}

NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry operation. The type parameters are K = type of the key and V = type of the value.

func GetNamedCache

func GetNamedCache[K comparable, V any](session *Session, cacheName string, options ...func(session *CacheOptions)) (NamedCache[K, V], error)

GetNamedCache returns a NamedCache from a session. NamedCache is syntactically identical in behaviour to a NamedMap, but additionally implements the PutWithExpiry function. If there is an existing NamedCache defined with the same type parameters and name it will be returned, otherwise a new instance will be returned. An error will be returned if there already exists a NamedCache with the same name and different type parameters.

// connect to the default address localhost:1408
session, err = coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

If you wish to create a NamedCache that has the same expiry for each entry you can use the coherence.WithExpiry cache option. Each call to Put will use the default expiry you have specified. If you use PutWithExpiry, this will override the default expiry for that key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "cache-expiry", coherence.WithExpiry(time.Duration(5)*time.Second))

type NamedCacheClient

type NamedCacheClient[K comparable, V any] struct {
	NamedCache[K, V]
	// contains filtered or unexported fields
}

NamedCacheClient is the implementation of the NamedCache interface. The type parameters are K = type of the key and V= type of the value.

func (*NamedCacheClient[K, V]) AddFilterListener

func (nc *NamedCacheClient[K, V]) AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map where entries satisfy the specified filters.Filter, with the key, and optionally, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddFilterListenerLite

func (nc *NamedCacheClient[K, V]) AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListenerLite Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map where entries satisfy the specified filters.Filter, with the key, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddKeyListener

func (nc *NamedCacheClient[K, V]) AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the map, with the key, old-value and new-value included.

func (*NamedCacheClient[K, V]) AddKeyListenerLite

func (nc *NamedCacheClient[K, V]) AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListenerLite Adds a ]MapListener] that will receive events (inserts, updates, deletes) that occur against the specified key within the map, with the key, and optionally, the old-value and new-value included.

func (*NamedCacheClient[K, V]) AddLifecycleListener

func (nc *NamedCacheClient[K, V]) AddLifecycleListener(listener MapLifecycleListener[K, V])

AddLifecycleListener Adds a MapLifecycleListener that will receive events (truncated or released) that occur against the NamedCache.

func (*NamedCacheClient[K, V]) AddListener

func (nc *NamedCacheClient[K, V]) AddListener(ctx context.Context, listener MapListener[K, V]) error

AddListener Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, old-value and new-value included. This call is equivalent to calling AddFilterListener with filters.Always as the filter.

func (*NamedCacheClient[K, V]) AddListenerLite

func (nc *NamedCacheClient[K, V]) AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

AddListenerLite Adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, and optionally, the old-value and new-value included. This call is equivalent to calling [AddFilterListenerLite] with filters.Always as the filter.

func (*NamedCacheClient[K, V]) Clear

func (nc *NamedCacheClient[K, V]) Clear(ctx context.Context) error

Clear removes all mappings from this NamedCache. This operation is observable and will trigger any registered events.

func (*NamedCacheClient[K, V]) ContainsEntry

func (nc *NamedCacheClient[K, V]) ContainsEntry(ctx context.Context, key K, value V) (bool, error)

ContainsEntry returns true if this NamedCache contains a mapping for the specified key and value.

The example below shows how to check if a NamedCache contains a mapping for the key 1 and person.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedCache.ContainsEntry(ctx, person.ID, person); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) ContainsKey

func (nc *NamedCacheClient[K, V]) ContainsKey(ctx context.Context, key K) (bool, error)

ContainsKey returns true if this NamedCache contains a mapping for the specified key.

The example below shows how to check if a NamedCache contains a mapping for the key 1.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if found, err = namedCache.ContainsKey(ctx, 1); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) ContainsValue

func (nc *NamedCacheClient[K, V]) ContainsValue(ctx context.Context, value V) (bool, error)

ContainsValue returns true if this NamedCache contains a mapping for the specified value.

The example below shows how to check if a NamedCache contains a mapping for the person.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedCache.ContainsValue(ctx, person); err != nil {
   log.Fatal(err)
}

func (*NamedCacheClient[K, V]) Destroy

func (nc *NamedCacheClient[K, V]) Destroy(ctx context.Context) error

Destroy releases and destroys this instance of NamedCache. Warning This method is used to completely destroy the specified NamedCache across the cluster. All references in the entire cluster to this cache will be invalidated, the data will be cleared, and all internal resources will be released. Note: the removal of entries caused by this operation will not be observable.

func (*NamedCacheClient[K, V]) EntrySet

func (nc *NamedCacheClient[K, V]) EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

EntrySet returns a channel from which all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against NamedCaches with large number of entries.

The example below shows how to iterate the entries in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.EntrySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) EntrySetFilter

func (nc *NamedCacheClient[K, V]) EntrySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedEntry[K, V]

EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the entries in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.EntrySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) Get

func (nc *NamedCacheClient[K, V]) Get(ctx context.Context, key K) (*V, error)

Get returns the value to which the specified key is mapped. V will be nil if this NamedCache contains no mapping for the key.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

person, err = namedCache.Get(1)
if err != nil {
    log.Fatal(err)
}
if person != nil {
    fmt.Println("Person is", *value)
} else {
    fmt.Println("No person found")
}

func (*NamedCacheClient[K, V]) GetAll

func (nc *NamedCacheClient[K, V]) GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

GetAll returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to get all the entries for keys 1, 3 and 4.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.GetAll(ctx, []int{1, 3, 4})
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) GetCacheName added in v1.1.1

func (nc *NamedCacheClient[K, V]) GetCacheName() string

GetCacheName returns the cache name of the NamedCache.

func (*NamedCacheClient[K, V]) GetNearCacheStats added in v1.1.0

func (nc *NamedCacheClient[K, V]) GetNearCacheStats() CacheStats

GetNearCacheStats returns the CacheStats for a near cache for a NamedMap. If no near cache is defined, nil is returned.

func (*NamedCacheClient[K, V]) GetOrDefault

func (nc *NamedCacheClient[K, V]) GetOrDefault(ctx context.Context, key K, def V) (*V, error)

GetOrDefault will return the value mapped to the specified key, or if there is no mapping, it will return the specified default.

func (*NamedCacheClient[K, V]) GetSession

func (nc *NamedCacheClient[K, V]) GetSession() *Session

GetSession returns the session.

func (*NamedCacheClient[K, V]) IsEmpty

func (nc *NamedCacheClient[K, V]) IsEmpty(ctx context.Context) (bool, error)

IsEmpty returns true if this NamedCache contains no mappings.

func (*NamedCacheClient[K, V]) IsReady added in v1.0.1

func (nc *NamedCacheClient[K, V]) IsReady(ctx context.Context) (bool, error)

IsReady returns whether this NamedCache is ready to be used. An example of when this method would return false would be where a partitioned cache service that owns this cache has no storage-enabled members. If it is not supported by the gRPC proxy, an error will be returned.

func (*NamedCacheClient[K, V]) KeySet

func (nc *NamedCacheClient[K, V]) KeySet(ctx context.Context) <-chan *StreamedKey[K]

KeySet returns a channel from which keys of all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedCache]s with large number of entries.

The example below shows how to iterate the keys in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.KeySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedCacheClient[K, V]) KeySetFilter

func (nc *NamedCacheClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedKey[K]

KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the key. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the keys in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session,"people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.KeySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedCacheClient[K, V]) Name

func (nc *NamedCacheClient[K, V]) Name() string

Name returns the name of the NamedCache].

func (*NamedCacheClient[K, V]) Put

func (nc *NamedCacheClient[K, V]) Put(ctx context.Context, key K, value V) (*V, error)

Put associates the specified value with the specified key returning the previously mapped value, if any. V will be nil if there was no previous value.

func (*NamedCacheClient[K, V]) PutAll

func (nc *NamedCacheClient[K, V]) PutAll(ctx context.Context, entries map[K]V) error

PutAll copies all the mappings from the specified map to this NamedCache. This is the most efficient way to add multiple entries into a NamedCache as it is carried out in parallel and no previous values are returned.

var peopleData = map[int]Person{
    1: {ID: 1, Name: "Tim", Age: 21},
    2: {ID: 2, Name: "Andrew", Age: 44},
    3: {ID: 3, Name: "Helen", Age: 20},
    4: {ID: 4, Name: "Alexa", Age: 12},
}

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = namedCache.PutAll(ctx, peopleData); err != nil {
    log.Fatal(err)
}

func (*NamedCacheClient[K, V]) PutIfAbsent

func (nc *NamedCacheClient[K, V]) PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedCache and returns nil, else returns the current value.

func (*NamedCacheClient[K, V]) PutWithExpiry

func (nc *NamedCacheClient[K, V]) PutWithExpiry(ctx context.Context, key K, value V, ttl time.Duration) (*V, error)

PutWithExpiry associates the specified value with the specified key. If the NamedCache previously contained a value for this key, the old value is replaced. This variation of the Put(ctx context.Context, key K, value V) function allows the caller to specify an expiry (or "time to live") for the cache entry. If coherence.ExpiryNever < ttl < 1 millisecond, ttl is set to 1 millisecond. V will be nil if there was no previous value.

func (*NamedCacheClient[K, V]) Release

func (nc *NamedCacheClient[K, V]) Release()

Release releases the instance of NamedCache. This operation does not affect the contents of the NamedCache, but only releases the client resources. To access the NamedCache, you must get a new instance.

func (*NamedCacheClient[K, V]) Remove

func (nc *NamedCacheClient[K, V]) Remove(ctx context.Context, key K) (*V, error)

Remove removes the mapping for a key from this NamedCache if it is present and returns the previous value or nil if there wasn't one.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

oldValue, err = namedCache.Remove(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if oldValue == nil {
    fmt.Println("No previous person was found")
} else {
    fmt.Println("Previous person was", *oldValue)
}

func (*NamedCacheClient[K, V]) RemoveFilterListener

func (nc *NamedCacheClient[K, V]) RemoveFilterListener(ctx context.Context, listener MapListener[K, V], f filters.Filter) error

RemoveFilterListener removes the listener that was previously registered to receive events where entries satisfy the specified filters.Filter.

func (*NamedCacheClient[K, V]) RemoveKeyListener

func (nc *NamedCacheClient[K, V]) RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

RemoveKeyListener removes the listener that was previously registered to receive events against the specified key.

func (*NamedCacheClient[K, V]) RemoveLifecycleListener

func (nc *NamedCacheClient[K, V]) RemoveLifecycleListener(listener MapLifecycleListener[K, V])

RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.

func (*NamedCacheClient[K, V]) RemoveListener

func (nc *NamedCacheClient[K, V]) RemoveListener(ctx context.Context, listener MapListener[K, V]) error

RemoveListener removes the listener that was previously registered to receive events.

func (*NamedCacheClient[K, V]) RemoveMapping

func (nc *NamedCacheClient[K, V]) RemoveMapping(ctx context.Context, key K, value V) (bool, error)

RemoveMapping removes the entry for the specified key only if it is currently mapped to the specified value. Returns true if the entry was removed.

func (*NamedCacheClient[K, V]) Replace

func (nc *NamedCacheClient[K, V]) Replace(ctx context.Context, key K, value V) (*V, error)

Replace replaces the entry for the specified key only if it is currently mapped to some value.

func (*NamedCacheClient[K, V]) ReplaceMapping

func (nc *NamedCacheClient[K, V]) ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

ReplaceMapping replaces the entry for the specified key only if it is currently mapped to some value. Returns true if the value was replaced.

func (*NamedCacheClient[K, V]) Size

func (nc *NamedCacheClient[K, V]) Size(ctx context.Context) (int, error)

Size returns the number of mappings contained within this NamedCache.

func (*NamedCacheClient[K, V]) String

func (nc *NamedCacheClient[K, V]) String() string

String returns a string representation of a NamedCacheClient.

func (*NamedCacheClient[K, V]) Truncate

func (nc *NamedCacheClient[K, V]) Truncate(ctx context.Context) error

Truncate removes all mappings from this NamedCache. Note: the removal of entries caused by this truncate operation will not be observable.

func (*NamedCacheClient[K, V]) Values

func (nc *NamedCacheClient[K, V]) Values(ctx context.Context) <-chan *StreamedValue[V]

Values returns a view of all values contained in this NamedCache.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against NamedCaches with large number of entries.

The example below shows how to iterate the values in a NamedCache.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.Values(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

func (*NamedCacheClient[K, V]) ValuesFilter

func (nc *NamedCacheClient[K, V]) ValuesFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedValue[V]

ValuesFilter returns a view of filtered values contained in this NamedCache. The returned channel will be asynchronously filled with values in the NamedCache that satisfy the filter.

The example below shows how to iterate the values in a NamedCache where the age > 20.

namedCache, err := coherence.GetNamedCache[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedCache.ValuesFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

type NamedMap

type NamedMap[K comparable, V any] interface {
	// AddLifecycleListener Adds a MapLifecycleListener that will receive events (truncated, released) that occur
	// against the NamedMap.
	AddLifecycleListener(listener MapLifecycleListener[K, V])

	// AddFilterListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap where entries satisfy the specified filters.Filter, with the key, and optionally,
	// the old-value and new-value included.
	AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// AddFilterListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap where entries satisfy the specified filters.Filter, with the key,
	// the old-value and new-value included.
	AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// AddKeyListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the specified key within the NamedMap, with the key, old-value and new-value included.
	AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

	// AddKeyListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the specified key within the NamedMap, with the key, and optionally, the old-value and new-value included.
	AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

	// AddListener adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap, with the key, old-value and new-value included.
	// This call is equivalent to calling AddFilterListener with filters.Always as the filter.
	AddListener(ctx context.Context, listener MapListener[K, V]) error

	// AddListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur
	// against the NamedMap, with the key, and optionally, the old-value and new-value included.
	// This call is equivalent to calling AddFilterListenerLite with filters.Always as the filter.
	AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

	// Clear removes all mappings from the NamedMap.
	Clear(ctx context.Context) error

	// Truncate removes all mappings from the NamedMap.
	// Note: the removal of entries caused by this truncate operation will not be observable.
	Truncate(ctx context.Context) error

	// Destroy releases and destroys this instance of NamedMap.
	// Warning This method is used to completely destroy the specified
	// NamedMap across the cluster. All references in the entire cluster to this
	// cache will be invalidated, the data will be cleared, and all
	// internal resources will be released.
	// Note: the removal of entries caused by this truncate operation will not be observable.
	Destroy(ctx context.Context) error

	// Release releases the instance of NamedMap.
	// This operation does not affect the contents of the NamedMap, but only releases the client
	// resources. To access the NamedMap, you must get a new instance.
	Release()

	// ContainsKey returns true if the NamedMap contains a mapping for the specified key.
	ContainsKey(ctx context.Context, key K) (bool, error)

	// ContainsValue returns true if the NamedMap maps one or more keys to the specified value
	ContainsValue(ctx context.Context, value V) (bool, error)

	// ContainsEntry returns true if the NamedMap contains a mapping for the specified key and value.
	ContainsEntry(ctx context.Context, key K, value V) (bool, error)

	// IsEmpty returns true if the NamedMap contains no mappings.
	IsEmpty(ctx context.Context) (bool, error)

	// EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
	// As always, the result must be accessed (and will be valid) only if the error is nil.
	EntrySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedEntry[K, V]

	// EntrySet returns a channel from which all entries can be obtained.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

	// Get returns the value to which the specified key is mapped. V will be nil if there was no previous value.
	Get(ctx context.Context, key K) (*V, error)

	// GetAll returns a channel from which entries satisfying the specified filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the result.
	// As always, the result must be accessed (and will be valid) only of the error is nil.
	GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

	// GetOrDefault will return the value mapped to the specified key,
	// or if there is no mapping, it will return the specified default.
	GetOrDefault(ctx context.Context, key K, def V) (*V, error)

	// InvokeAll invokes the specified processor against the entries matching the specified keys or filter.  If no
	// keys or filter are specified, then the function will be run against all entries.
	// Functions are invoked atomically against a specific entry as the function may mutate the entry.
	InvokeAll(ctx context.Context, keysOrFilter any, proc processors.Processor) <-chan *StreamedValue[V]

	// KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained.
	// Each entry in the channel is of type *StreamEntry which basically wraps an error and the key.
	// As always, the result must be accessed (and will be valid) only of the error is nil.
	KeySetFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedKey[K]

	// KeySet returns a channel from which keys of all entries can be obtained.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	KeySet(ctx context.Context) <-chan *StreamedKey[K]

	// Name returns the name of the NamedMap.
	Name() string

	// Put associates the specified value with the specified key returning the previously
	// mapped value. V will be nil if there was no previous value.
	Put(ctx context.Context, key K, value V) (*V, error)

	// PutAll copies all the mappings from the specified map to the NamedMap.
	PutAll(ctx context.Context, entries map[K]V) error

	// PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedMap.
	// Error will be equal to coherence. V will be nil if there was no previous value.
	PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

	// Remove removes the mapping for a key from the NamedMap if it is present and returns the previously
	// mapped value, if any. V will be nil if there was no previous value.
	Remove(ctx context.Context, key K) (*V, error)

	// RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.
	RemoveLifecycleListener(listener MapLifecycleListener[K, V])

	// RemoveFilterListener removes the listener that was previously registered to receive events.
	RemoveFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

	// RemoveKeyListener removes the listener that was previously registered to receive events.
	RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

	// RemoveListener removes the listener that was previously registered to receive events.
	RemoveListener(ctx context.Context, listener MapListener[K, V]) error

	// RemoveMapping removes the entry for the specified key only if it is currently
	// mapped to the specified value. Returns true if the value was removed.
	RemoveMapping(ctx context.Context, key K, value V) (bool, error)

	// Replace replaces the entry for the specified key only if it is
	// currently mapped to some value.
	Replace(ctx context.Context, key K, value V) (*V, error)

	// ReplaceMapping replaces the entry for the specified key only if it is
	// currently mapped to the value. Returns true if the value was replaced.
	ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

	// Size returns the number of mappings contained within the NamedMap.
	Size(ctx context.Context) (int, error)

	// GetSession returns the Session associated with the NamedMap.
	GetSession() *Session

	// ValuesFilter returns a view of filtered values contained in the NamedMap.
	// The returned channel will be asynchronously filled with values in the
	// NamedMap that satisfy the filter.
	ValuesFilter(ctx context.Context, filter filters.Filter) <-chan *StreamedValue[V]

	// Values return a view of all values contained in the NamedMap.
	// Note: the entries are paged internally to avoid excessive memory usage, but you need to be
	// careful when running this operation against NamedMaps with large number of entries.
	Values(ctx context.Context) <-chan *StreamedValue[V]

	// IsReady returns whether this NamedMap is ready to be used.
	// An example of when this method would return false would
	// be where a partitioned cache service that owns this cache has no
	// storage-enabled members.
	// If it is not supported by the gRPC proxy, an error will be returned.
	IsReady(ctx context.Context) (bool, error)

	// GetNearCacheStats returns the [CacheStats] for a near cache for a [NamedMap] or [NamedCache].
	// If no near cache is defined, nil is returned.
	GetNearCacheStats() CacheStats

	// GetCacheName returns the cache name of the [NamedMap] or [NamedCache].
	GetCacheName() string
	// contains filtered or unexported methods
}

NamedMap defines the APIs to cache data, mapping keys to values, supporting full concurrency of retrievals and high expected concurrency for updates. Like traditional maps, this object cannot contain duplicate keys; each key can map to at most one value.

As keys and values must be serializable in some manner. The current supported serialization method is JSON.

Instances of this interface are typically acquired via a coherence.Session.

Although all operations are thread-safe, retrieval operations do not entail locking, and there is no support for locking an entire map in a way to prevent all access. Retrievals reflect the results of the most recently completed update operations holding upon their onset.

The type parameters are K = type of the key and V = type of the value.

func GetNamedMap

func GetNamedMap[K comparable, V any](session *Session, cacheName string, options ...func(session *CacheOptions)) (NamedMap[K, V], error)

GetNamedMap returns a NamedMap from a session. If there is an existing NamedMap defined with the same type parameters and name it will be returned, otherwise a new instance will be returned. An error will be returned if there already exists a NamedMap with the same name and different type parameters.

// connect to the default address localhost:1408
session, err = coherence.NewSession(ctx)
if err != nil {
    log.Fatal(err)
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

type NamedMapClient

type NamedMapClient[K comparable, V any] struct {
	NamedMap[K, V]
	// contains filtered or unexported fields
}

NamedMapClient is the implementation of the NamedMap interface. The type parameters are K = type of the key and V = type of the value.

func (*NamedMapClient[K, V]) AddFilterListener

func (nm *NamedMapClient[K, V]) AddFilterListener(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the NamedMap where entries satisfy the specified filters.Filter, with the key, and optionally, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddFilterListenerLite

func (nm *NamedMapClient[K, V]) AddFilterListenerLite(ctx context.Context, listener MapListener[K, V], filter filters.Filter) error

AddFilterListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the NamedMap where entries satisfy the specified filters.Filter, with the key, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddKeyListener

func (nm *NamedMapClient[K, V]) AddKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the NamedMap, with the key, old-value and new-value included.

func (*NamedMapClient[K, V]) AddKeyListenerLite

func (nm *NamedMapClient[K, V]) AddKeyListenerLite(ctx context.Context, listener MapListener[K, V], key K) error

AddKeyListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the specified key within the NamedMap, with the key, and optionally, the old-value and new-value included.

func (*NamedMapClient[K, V]) AddLifecycleListener

func (nm *NamedMapClient[K, V]) AddLifecycleListener(listener MapLifecycleListener[K, V])

AddLifecycleListener adds a MapLifecycleListener that will receive events (truncated or released) that occur against the NamedMap.

func (*NamedMapClient[K, V]) AddListener

func (nm *NamedMapClient[K, V]) AddListener(ctx context.Context, listener MapListener[K, V]) error

AddListener adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, old-value and new-value included. This call is equivalent to calling [AddFilterListener] with filters.Always as the filter.

func (*NamedMapClient[K, V]) AddListenerLite

func (nm *NamedMapClient[K, V]) AddListenerLite(ctx context.Context, listener MapListener[K, V]) error

AddListenerLite adds a MapListener that will receive events (inserts, updates, deletes) that occur against the map, with the key, and optionally, the old-value and new-value included. This call is equivalent to calling [AddFilterListenerLite] with filters.Always as the filter.

func (*NamedMapClient[K, V]) Clear

func (nm *NamedMapClient[K, V]) Clear(ctx context.Context) error

Clear removes all mappings from the NamedMap. This operation is observable and will trigger any registered events.

func (*NamedMapClient[K, V]) ContainsEntry

func (nm *NamedMapClient[K, V]) ContainsEntry(ctx context.Context, key K, value V) (bool, error)

ContainsEntry returns true if the NamedMap contains a mapping for the specified key and value.

The example below shows how to check if a NamedMap contains a mapping for the key 1 and person.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedMap.ContainsEntry(ctx, person.ID, person); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) ContainsKey

func (nm *NamedMapClient[K, V]) ContainsKey(ctx context.Context, key K) (bool, error)

ContainsKey returns true if the NamedMap contains a mapping for the specified key.

The example below shows how to check if a NamedMap contains a mapping for the key 1.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if found, err = namedMap.ContainsKey(ctx, 1); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) ContainsValue

func (nm *NamedMapClient[K, V]) ContainsValue(ctx context.Context, value V) (bool, error)

ContainsValue returns true if the NamedMap contains a mapping for the specified value.

The example below shows how to check if a NamedMap contains a mapping for the person.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}
person := Person{ID: 1, Name: "Tim"}

if found, err = namedMap.ContainsValue(ctx, person); err != nil {
   log.Fatal(err)
}

func (*NamedMapClient[K, V]) Destroy

func (nm *NamedMapClient[K, V]) Destroy(ctx context.Context) error

Destroy releases and destroys this instance of NamedMap. Warning This method is used to completely destroy the specified NamedMap across the cluster. All references in the entire cluster to this cache will be invalidated, the data will be cleared, and all internal resources will be released. Note: the removal of entries caused by this operation will not be observable.

func (*NamedMapClient[K, V]) EntrySet

func (nm *NamedMapClient[K, V]) EntrySet(ctx context.Context) <-chan *StreamedEntry[K, V]

EntrySet returns a channel from which all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the entries in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.EntrySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) EntrySetFilter

func (nm *NamedMapClient[K, V]) EntrySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedEntry[K, V]

EntrySetFilter returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the entries in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.EntrySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) Get

func (nm *NamedMapClient[K, V]) Get(ctx context.Context, key K) (*V, error)

Get returns the value to which the specified key is mapped. V will be nil if the NamedMap contains no mapping for the key.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

person, err = namedMap.Get(1)
if err != nil {
    log.Fatal(err)
}
if person != nil {
    fmt.Println("Person is", *value)
} else {
    fmt.Println("No person found")
}

func (*NamedMapClient[K, V]) GetAll

func (nm *NamedMapClient[K, V]) GetAll(ctx context.Context, keys []K) <-chan *StreamedEntry[K, V]

GetAll returns a channel from which entries satisfying the specified filter can be obtained. Each entry in the channel is of type *StreamedEntry which wraps an error and the result. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to get all the entries for keys 1, 3 and 4.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.GetAll(ctx, []int{1, 3, 4})
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key, "Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) GetCacheName added in v1.1.1

func (nm *NamedMapClient[K, V]) GetCacheName() string

GetCacheName returns the cache name of the NamedMap.

func (*NamedMapClient[K, V]) GetNearCacheStats added in v1.1.0

func (nm *NamedMapClient[K, V]) GetNearCacheStats() CacheStats

GetNearCacheStats returns the CacheStats for a near cache for a NamedMap. If no near cache is defined, nil is returned.

func (*NamedMapClient[K, V]) GetOrDefault

func (nm *NamedMapClient[K, V]) GetOrDefault(ctx context.Context, key K, def V) (*V, error)

GetOrDefault will return the value mapped to the specified key, or if there is no mapping, it will return the specified default.

func (*NamedMapClient[K, V]) GetSession

func (nm *NamedMapClient[K, V]) GetSession() *Session

GetSession returns the session.

func (*NamedMapClient[K, V]) IsEmpty

func (nm *NamedMapClient[K, V]) IsEmpty(ctx context.Context) (bool, error)

IsEmpty returns true if the NamedMap contains no mappings.

func (*NamedMapClient[K, V]) IsReady added in v1.0.1

func (nm *NamedMapClient[K, V]) IsReady(ctx context.Context) (bool, error)

IsReady returns whether this NamedMap is ready to be used. An example of when this method would return false would be where a partitioned cache service that owns this cache has no storage-enabled members. If it is not supported by the gRPC proxy, an error will be returned.

func (*NamedMapClient[K, V]) KeySet

func (nm *NamedMapClient[K, V]) KeySet(ctx context.Context) <-chan *StreamedKey[K]

KeySet returns a channel from which keys of all entries can be obtained.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the keys in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.KeySet(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedMapClient[K, V]) KeySetFilter

func (nm *NamedMapClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedKey[K]

KeySetFilter returns a channel from which keys of the entries that satisfy the filter can be obtained. Each entry in the channel is of type *StreamEntry which wraps an error and the key. As always, the result must be accessed (and will be valid) only if the error is nil.

The example below shows how to iterate the keys in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.KeySetFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Key:", result.Key)
    }
}

func (*NamedMapClient[K, V]) Name

func (nm *NamedMapClient[K, V]) Name() string

Name returns the name of the NamedMap.

func (*NamedMapClient[K, V]) Put

func (nm *NamedMapClient[K, V]) Put(ctx context.Context, key K, value V) (*V, error)

Put associates the specified value with the specified key returning the previously mapped value, if any. V will be nil if there was no previous value.

func (*NamedMapClient[K, V]) PutAll

func (nm *NamedMapClient[K, V]) PutAll(ctx context.Context, entries map[K]V) error

PutAll copies all the mappings from the specified map to the NamedMap. This is the most efficient way to add multiple entries into a NamedMap as it is carried out in parallel and no previous values are returned.

var peopleData = map[int]Person{
    1: {ID: 1, Name: "Tim", Age: 21},
    2: {ID: 2, Name: "Andrew", Age: 44},
    3: {ID: 3, Name: "Helen", Age: 20},
    4: {ID: 4, Name: "Alexa", Age: 12},
}

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

if err = namedMap.PutAll(ctx, peopleData); err != nil {
    log.Fatal(err)
}

func (*NamedMapClient[K, V]) PutIfAbsent

func (nm *NamedMapClient[K, V]) PutIfAbsent(ctx context.Context, key K, value V) (*V, error)

PutIfAbsent adds the specified mapping if the key is not already associated with a value in the NamedMap and returns nil, else returns the current value.

func (*NamedMapClient[K, V]) Release

func (nm *NamedMapClient[K, V]) Release()

Release releases the instance of NamedMap. This operation does not affect the contents of the NamedMap, but only releases the client resources. To access the NamedMap, you must get a new instance.

func (*NamedMapClient[K, V]) Remove

func (nm *NamedMapClient[K, V]) Remove(ctx context.Context, key K) (*V, error)

Remove removes the mapping for a key from the NamedMap if it is present and returns the previous value or nil if there wasn't one.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

oldValue, err = namedMap.Remove(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if oldValue == nil {
    fmt.Println("No previous person was found")
} else {
    fmt.Println("Previous person was", *oldValue)
}

func (*NamedMapClient[K, V]) RemoveFilterListener

func (nm *NamedMapClient[K, V]) RemoveFilterListener(ctx context.Context, listener MapListener[K, V], f filters.Filter) error

RemoveFilterListener removes the listener that was previously registered to receive events where entries satisfy the specified filters.Filter.

func (*NamedMapClient[K, V]) RemoveKeyListener

func (nm *NamedMapClient[K, V]) RemoveKeyListener(ctx context.Context, listener MapListener[K, V], key K) error

RemoveKeyListener removes the listener that was previously registered to receive events against the specified key.

func (*NamedMapClient[K, V]) RemoveLifecycleListener

func (nm *NamedMapClient[K, V]) RemoveLifecycleListener(listener MapLifecycleListener[K, V])

RemoveLifecycleListener removes the lifecycle listener that was previously registered to receive events.

func (*NamedMapClient[K, V]) RemoveListener

func (nm *NamedMapClient[K, V]) RemoveListener(ctx context.Context, listener MapListener[K, V]) error

RemoveListener removes the listener that was previously registered to receive events.

func (*NamedMapClient[K, V]) RemoveMapping

func (nm *NamedMapClient[K, V]) RemoveMapping(ctx context.Context, key K, value V) (bool, error)

RemoveMapping removes the entry for the specified key only if it is currently mapped to the specified value.

func (*NamedMapClient[K, V]) Replace

func (nm *NamedMapClient[K, V]) Replace(ctx context.Context, key K, value V) (*V, error)

Replace replaces the entry for the specified key only if it is currently mapped to some value.

func (*NamedMapClient[K, V]) ReplaceMapping

func (nm *NamedMapClient[K, V]) ReplaceMapping(ctx context.Context, key K, prevValue V, newValue V) (bool, error)

ReplaceMapping replaces the entry for the specified key only if it is currently mapped to some value. Returns true if the value was replaced.

func (*NamedMapClient[K, V]) Size

func (nm *NamedMapClient[K, V]) Size(ctx context.Context) (int, error)

Size returns the number of mappings contained within the NamedMap.

func (*NamedMapClient[K, V]) String

func (nm *NamedMapClient[K, V]) String() string

String returns a string representation of a NamedMapClient.

func (*NamedMapClient[K, V]) Truncate

func (nm *NamedMapClient[K, V]) Truncate(ctx context.Context) error

Truncate removes all mappings from the NamedMap. Note: the removal of entries caused by this truncate operation will not be observable.

func (*NamedMapClient[K, V]) Values

func (nm *NamedMapClient[K, V]) Values(ctx context.Context) <-chan *StreamedValue[V]

Values returns a view of all values contained in the NamedMap.

Note: the entries are paged internally to avoid excessive memory usage, but you need to be careful when running this operation against [NamedMap]s with large number of entries.

The example below shows how to iterate the values in a NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.Values(ctx)
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

func (*NamedMapClient[K, V]) ValuesFilter

func (nm *NamedMapClient[K, V]) ValuesFilter(ctx context.Context, fltr filters.Filter) <-chan *StreamedValue[V]

ValuesFilter returns a view of filtered values contained in the NamedMap. The returned channel will be asynchronously filled with values in the NamedMap that satisfy the filter.

The example below shows how to iterate the values in a NamedMap where the age > 20.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
    log.Fatal(err)
}

ch := namedMap.ValuesFilter(ctx, filters.GreaterEqual(extractors.Extract[int]("age"), 20))
for result := range ch {
    if result.Err != nil {
        // process, handle the error
    } else {
        fmt.Println("Value:", result.Value)
    }
}

type NearCacheOptions added in v1.1.0

type NearCacheOptions struct {
	TTL                  time.Duration
	HighUnits            int64
	HighUnitsMemory      int64
	InvalidationStrategy InvalidationStrategyType // currently only supports ListenAll
}

NearCacheOptions defines options when creating a near cache.

func (NearCacheOptions) String added in v1.1.0

func (n NearCacheOptions) String() string

type Serializer

type Serializer[T any] interface {
	Serialize(object T) ([]byte, error)
	Deserialize(data []byte) (*T, error)
	Format() string
}

Serializer defines how to serialize/ de-serialize objects.

func NewSerializer

func NewSerializer[T any](format string) Serializer[T]

NewSerializer returns a new Serializer based upon the format and the type.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session provides APIs to create NamedCaches. The NewSession method creates a new instance of a Session. This method also takes a variable number of arguments, called options, that can be passed to configure the Session.

func NewSession

func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (*Session, error)

NewSession creates a new Session with the specified sessionOptions.

Example 1: Create a Session that will eventually connect to host "localhost" and gRPC port: 1408 using an insecure connection.

ctx := context.Background()
session, err = coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
    log.Fatal(err)
}

Example 2: Create a Session that will eventually connect to host "acme.com" and gRPC port: 1408

session, err := coherence.NewSession(ctx, coherence.WithAddress("acme.com:1408"), coherence.WithPlainText())

You can also set the environment variable COHERENCE_SERVER_ADDRESS to specify the address.

Example 3: Create a Session that will eventually connect to default "localhost:1408" using a secured connection

session, err := coherence.NewSession(ctx)

A Session can also be configured using environment variable COHERENCE_SERVER_ADDRESS. See gRPC Naming for information on values for this.

To Configure SSL, you must first enable SSL on the gRPC Proxy, see gRPC Proxy Server for details.

There are a number of ways to set the TLS options when creating a session. You can use WithTLSConfig to specify a custom tls.Config or specify the client certificate, key and trust certificate using additional session options or using environment variables. See below for more details.

myTlSConfig = &tls.Config{....}
session, err := coherence.NewSession(ctx, coherence.WithTLSConfig(myTLSConfig))

You can also use the following to set the required TLS options when creating a session:

session, err := coherence.NewSession(ctx, coherence.WithTLSClientCert("/path/to/client/certificate"),
                                          coherence.WithTLSClientKey("/path/path/to/client/key"),
                                          coherence.WithTLSCertsPath("/path/to/cert/to/be/added/for/trust"))

You can also use coherence.WithIgnoreInvalidCerts() to ignore self-signed certificates for testing only, not to be used in production.

The following environment variables can also be set for the client, and will override any options.

export COHERENCE_TLS_CLIENT_CERT=/path/to/client/certificate
export COHERENCE_TLS_CLIENT_KEY=/path/path/to/client/key
export COHERENCE_TLS_CERTS_PATH=/path/to/cert/to/be/added/for/trust
export COHERENCE_IGNORE_INVALID_CERTS=true    // option to ignore self-signed certificates for testing only, not to be used in production

Finally, the Close() method can be used to close the Session. Once a Session is closed, no APIs on the NamedMap instances should be invoked. If invoked they will return an error.

func (*Session) AddSessionLifecycleListener

func (s *Session) AddSessionLifecycleListener(listener SessionLifecycleListener)

AddSessionLifecycleListener adds a SessionLifecycleListener that will receive events (connected, closed, disconnected or reconnected) that occur against the session.

func (*Session) Close

func (s *Session) Close()

Close closes a connection.

func (*Session) GetDisconnectTimeout

func (s *Session) GetDisconnectTimeout() time.Duration

GetDisconnectTimeout returns the session disconnect timeout in millis.

func (*Session) GetOptions

func (s *Session) GetOptions() *SessionOptions

GetOptions returns the options that were passed during this session creation.

func (*Session) GetReadyTimeout

func (s *Session) GetReadyTimeout() time.Duration

GetReadyTimeout returns the session disconnect timeout in millis.

func (*Session) GetRequestTimeout

func (s *Session) GetRequestTimeout() time.Duration

GetRequestTimeout returns the session timeout in millis.

func (*Session) ID

func (s *Session) ID() string

ID returns the identifier of a session.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed returns true if the Session is closed. Returns false otherwise.

func (*Session) RemoveSessionLifecycleListener

func (s *Session) RemoveSessionLifecycleListener(listener SessionLifecycleListener)

RemoveSessionLifecycleListener removes SessionLifecycleListener for a session.

func (*Session) String

func (s *Session) String() string

type SessionLifecycleEvent

type SessionLifecycleEvent interface {
	Type() SessionLifecycleEventType
	Source() *Session
}

SessionLifecycleEvent defines a session lifecycle event

type SessionLifecycleEventType

type SessionLifecycleEventType string

SessionLifecycleEventType describes an event that may be raised during the lifecycle of a Session.

type SessionLifecycleListener

type SessionLifecycleListener interface {
	OnAny(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnConnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnClosed(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnDisconnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	OnReconnected(callback func(SessionLifecycleEvent)) SessionLifecycleListener
	// contains filtered or unexported methods
}

func NewSessionLifecycleListener

func NewSessionLifecycleListener() SessionLifecycleListener

NewSessionLifecycleListener creates and returns a pointer to a new SessionLifecycleListener instance.

type SessionOptions

type SessionOptions struct {
	Address            string
	Scope              string
	Format             string
	ClientCertPath     string
	ClientKeyPath      string
	CaCertPath         string
	PlainText          bool
	IgnoreInvalidCerts bool
	RequestTimeout     time.Duration
	DisconnectTimeout  time.Duration
	ReadyTimeout       time.Duration
	TlSConfig          *tls.Config
}

SessionOptions holds the session attributes like host, port, tls attributes etc.

func (*SessionOptions) IsPlainText

func (s *SessionOptions) IsPlainText() bool

IsPlainText returns true if plain text, e.g. Non TLS. Returns false otherwise.

func (*SessionOptions) String

func (s *SessionOptions) String() string

String returns a string representation of SessionOptions.

type StreamedEntry

type StreamedEntry[K comparable, V any] struct {
	// Err contains the error (if any) while obtaining the value.
	Err error
	// Key contains the key of the entry.
	Key K
	// Value contains the value of the entry.
	Value V
}

StreamedEntry is wrapper object that wraps an error and a Key and a Value . As always, the Err object must be checked for errors before accessing the Key or the Value fields.

type StreamedKey

type StreamedKey[K comparable] struct {
	// Err contains the error (if any) while obtaining the key.
	Err error
	// Key contains the key of the entry.
	Key K
}

StreamedKey is wrapper object that wraps an error and a key. The Err object must be checked for errors before accessing the Key field.

type StreamedValue

type StreamedValue[V any] struct {
	// Err contains the error (if any) while obtaining the value.
	Err error
	// Value contains the value of the entry.
	Value V
}

StreamedValue is wrapper object that wraps an error and a value. The Err object must be checked for errors before accessing the Value field.

Directories

Path Synopsis
Package aggregators provides various aggregator functions and types.
Package aggregators provides various aggregator functions and types.
Package discovery provides an implementation of Coherence NSLookup.
Package discovery provides an implementation of Coherence NSLookup.
Package extractors provides various extractor functions and types.
Package extractors provides various extractor functions and types.
Package filters provides various filter functions and types.
Package filters provides various filter functions and types.
Package processors provides various entry processor functions and types.
Package processors provides various entry processor functions and types.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL