opa: github.com/open-policy-agent/opa/watch

package watch

import "github.com/open-policy-agent/opa/watch"

Package watch provides the ability to set a watch on a Rego query. A watch monitors the query and determines when any of its dependencies change, notifying the client of the new results of the query evaluation whenever this occurs.

Package Files

doc.go watch.go

type Event

type Event struct {
    Query string         `json:"query"`
    Value rego.ResultSet `json:"value"`
    Error error          `json:"error,omitempty"`

    Metrics metrics.Metrics      `json:"-"`
    Tracer  topdown.BufferTracer `json:"-"`
}

Event represents a change to a query. Query is the query in question and Value is the JSON encoded result of the new query evaluation. Error is populated if the new evaluation fails for any reason. If Error is not nil, the contents of Value are undefined.

Metrics and Tracer represent the metrics and trace from the evaluation of the query.

type Handle

type Handle struct {
    C <-chan Event
    // contains filtered or unexported fields
}

Handle allows a user to listen to and end a watch on a query.

func (*Handle) Start

func (h *Handle) Start() error

Start registers and starts the watch.

func (*Handle) Stop

func (h *Handle) Stop()

Stop ends the watch on the query associated with the Handle. It will close the channel that was delivering notifications through the Handle. This may happen before or after Stop returns.
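Because the channel may close before or after Stop returns, a consumer should wait for the close itself rather than assume it has already happened. The sketch below illustrates that pattern under stated assumptions: `fakeHandle` is a hypothetical stand-in for Handle that delivers strings instead of Events and closes its channel asynchronously, and `consume` is an illustrative helper. Neither is part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeHandle is a hypothetical stand-in for *watch.Handle. It exposes a
// receive-only channel C, like the real Handle, but carries plain strings.
type fakeHandle struct {
	C    <-chan string
	c    chan string
	once sync.Once
}

// newFakeHandle returns a handle with the given events already queued.
func newFakeHandle(events ...string) *fakeHandle {
	c := make(chan string, len(events))
	for _, e := range events {
		c <- e
	}
	return &fakeHandle{C: c, c: c}
}

// Stop closes the channel from another goroutine, so the close may land
// before or after Stop returns.
func (h *fakeHandle) Stop() {
	h.once.Do(func() { go close(h.c) })
}

// consume stops the handle and then waits until the reader goroutine has
// seen the channel close, returning every event that was delivered.
func consume(h *fakeHandle) []string {
	var seen []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		for e := range h.C { // the loop exits only once the channel is closed
			seen = append(seen, e)
		}
	}()
	h.Stop()
	<-done // do not assume Stop has already closed the channel
	return seen
}

func main() {
	fmt.Println(consume(newFakeHandle("a", "b"))) // prints [a b]
}
```

Waiting on `done` is what makes the shutdown deterministic. The examples for Query and Migrate in this package use the same idea with their own `done` channels.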

func (*Handle) WithInstrumentation

func (h *Handle) WithInstrumentation(yes bool) *Handle

WithInstrumentation enables instrumentation on the query to diagnose performance issues.

func (*Handle) WithRuntime

func (h *Handle) WithRuntime(term *ast.Term) *Handle

WithRuntime sets the runtime data to provide to the evaluation engine.

type Watcher

type Watcher struct {
    // contains filtered or unexported fields
}

Watcher allows for watches to be registered on queries.

func New

func New(ctx context.Context, s storage.Store, c *ast.Compiler, txn storage.Transaction) (w *Watcher, err error)

New creates and returns a new Watcher on the store using the compiler provided. Once a compiler is provided to create a Watcher, it must not be modified, or else the results produced by the Watcher are undefined.

func (*Watcher) Close

func (w *Watcher) Close(txn storage.Transaction)

Close ends the watches on all queries this Watcher has.

Further attempts to register or end watches will result in an error after Close() is called.

func (*Watcher) Migrate

func (w *Watcher) Migrate(c *ast.Compiler, txn storage.Transaction) (*Watcher, error)

Migrate creates a new Watcher with the same watches as w, but using the new compiler. As with New, the provided compiler must not be modified after being passed to Migrate, or else behavior is undefined.

After Migrate returns, the old watcher will be closed, and the new one will be ready for use. All Handles from the old watcher will still be active, via the returned Watcher, with the exception of those Handles whose query is no longer valid with the new compiler. Such Handles will be shut down and a final Event sent along their channel indicating the cause of the error.

If an error occurs creating the new Watcher, the state of the old Watcher will not be changed.

Code:

ctx := context.Background()
store := inmem.NewFromObject(loadSmallTestData())

// This example syncs the reader and writer to make the output deterministic.
var notifyAlert chan struct{}
done := make(chan struct{})
gotNotification1 := make(chan struct{})
gotNotification2 := make(chan struct{})

txn, err := store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
    // Handle error
}

// Create a new Watcher that uses the given store and compiler to monitor
// queries. The watcher must be created inside a transaction so that it can
// properly hook into the store.
w, err := watch.New(ctx, store, ast.NewCompiler(), txn)
if err != nil {
    // Handle error
}

if err := store.Commit(ctx, txn); err != nil {
    // Handle error
}

handle1, err := w.Query("x = data.y")
if err != nil {
    // Handle error
}

go func() {
    for e := range handle1.C {
        value := fmt.Sprint(e.Value)
        if len(e.Value) > 0 {
            value = fmt.Sprint(e.Value[0].Bindings)
        }
        fmt.Printf("%s: %s (%v)\n", e.Query, value, e.Error)

        if notifyAlert != nil {
            notifyAlert <- struct{}{}
        }
        gotNotification1 <- struct{}{}
    }
    done <- struct{}{}
}()

// One notification will be sent on watch creation with the initial query
// value. It will be empty since the document we are watching is not yet defined.
<-gotNotification1

mod, err := ast.ParseModule("example", "package y\nr = s { s = data.a }")
if err != nil {
    // Handle error
}

compiler := ast.NewCompiler()
if compiler.Compile(map[string]*ast.Module{"example": mod}); compiler.Failed() {
    // Handle error
}

if txn, err = store.NewTransaction(ctx, storage.WriteParams); err != nil {
    // Handle error
}

// The handle from before will still be active after we migrate to the
// new compiler. Changes to data.a will now cause notifications since data.y now
// exists.
m, err := w.Migrate(compiler, txn)
if err != nil {
    // Handle error
}

if err := store.Commit(ctx, txn); err != nil {
    // Handle error
}

// After migrating, all existing watches will get a notification, as if they had
// just started.
<-gotNotification1

// The old watcher will be closed. Watches can no longer be registered on it.
_, err = w.Query("foo")
fmt.Println(err)

handle2, err := m.Query("y = data.a")
if err != nil {
    // Handle error
}

go func() {
    for e := range handle2.C {
        if notifyAlert != nil {
            <-notifyAlert
        } else {
            notifyAlert = make(chan struct{})
        }

        value := fmt.Sprint(e.Value)
        if len(e.Value) > 0 {
            value = fmt.Sprint(e.Value[0].Bindings)
        }
        fmt.Printf("%s: %s (%v)\n", e.Query, value, e.Error)
        gotNotification2 <- struct{}{}
    }
    done <- struct{}{}
}()
<-gotNotification2

for i := 0; i < 4; i++ {
    path, _ := storage.ParsePath(fmt.Sprintf("/a/%d", i))
    if err := storage.WriteOne(ctx, store, storage.ReplaceOp, path, json.Number(fmt.Sprint(i))); err != nil {
        // Handle error
    }

    <-gotNotification1
    <-gotNotification2
}

// Ending the queries will close the notification channels.
handle1.Stop()
handle2.Stop()
<-done
<-done

Output:

x = data.y: [] (<nil>)
x = data.y: map[x:map[r:[1 2 3 4]]] (<nil>)
cannot start query watch with closed Watcher
y = data.a: map[y:[1 2 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 2 3 4]]] (<nil>)
y = data.a: map[y:[0 2 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 3 4]]] (<nil>)
y = data.a: map[y:[0 1 3 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 4]]] (<nil>)
y = data.a: map[y:[0 1 2 4]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 3]]] (<nil>)
y = data.a: map[y:[0 1 2 3]] (<nil>)

func (*Watcher) NewQuery

func (w *Watcher) NewQuery(query string) *Handle

NewQuery returns a new watch Handle that has not yet been started. Callers must invoke the Start function on the handle to start the watch.

func (*Watcher) Query

func (w *Watcher) Query(query string) (*Handle, error)

Query registers a watch on the provided Rego query. Whenever changes are made to a base or virtual document that the query depends on, an Event describing the new result of the query will be sent through the Handle.

Query will return an error if registering the watch fails for any reason.

Code:

ctx := context.Background()
store := inmem.NewFromObject(loadSmallTestData())

// This example syncs the reader and writer to make the output deterministic.
done := make(chan struct{})
gotNotification := make(chan struct{})

mod, err := ast.ParseModule("example", "package y\nr = s { s = data.a }")
if err != nil {
    // Handle error
}

compiler := ast.NewCompiler()
if compiler.Compile(map[string]*ast.Module{"example": mod}); compiler.Failed() {
    // Handle error
}

txn, err := store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
    // Handle error
}

// Create a new Watcher that uses the given store and compiler to monitor
// queries. The watcher must be created inside a transaction so that it can
// properly hook into the store.
w, err := watch.New(ctx, store, compiler, txn)
if err != nil {
    // Handle error
}
if err := store.Commit(ctx, txn); err != nil {
    // Handle error
}

// Create a new watch on the query. Whenever its result changes, the result of
// the change will be sent to `handle.C`.
handle, err := w.Query("x = data.y")
if err != nil {
    // Handle error
}

go func() {
    for e := range handle.C {
        fmt.Printf("%s: %v (%v)\n", e.Query, e.Value[0].Bindings, e.Error)

        gotNotification <- struct{}{}
    }
    close(done)
}()
<-gotNotification // One notification will be sent on watch creation with the initial query value.

for i := 0; i < 4; i++ {
    path, _ := storage.ParsePath(fmt.Sprintf("/a/%d", i))
    if err := storage.WriteOne(ctx, store, storage.ReplaceOp, path, json.Number(fmt.Sprint(i))); err != nil {
        // Handle error
    }

    <-gotNotification
}

// Ending the query will close `handle.C`.
handle.Stop()
<-done

Output:

x = data.y: map[x:map[r:[1 2 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 2 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 3 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 4]]] (<nil>)
x = data.y: map[x:map[r:[0 1 2 3]]] (<nil>)

Package watch imports 10 packages and is imported by 6 packages. Updated 2018-10-28.