word_count

package
v0.89.0
Published: Sep 2, 2021 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package word_count is an example application which provides a gRPC API for publishing texts and querying running counts of NGrams extracted from previously published texts.

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthWordCount        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowWordCount          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupWordCount = fmt.Errorf("proto: unexpected end of group")
)

Functions

func RegisterNGramServer

func RegisterNGramServer(s *grpc.Server, srv NGramServer)

Types

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter consumes NGramCount messages and aggregates total counts of each NGram. It also provides gRPC APIs for publishing text and querying NGram counts. It implements the following interfaces:
- runconsumer.Application
- NGramServer (generated gRPC service stub)

func (Counter) ConsumeMessage

func (Counter) ConsumeMessage(_ consumer.Shard, store consumer.Store, env message.Envelope, _ *message.Publisher) error

ConsumeMessage folds an NGramCount into its respective running NGram count. Implements consumer.Application.
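The folding step can be pictured as a running sum keyed by NGram. The following is a minimal sketch, not the package's implementation: the ngramCount struct and the fold helper are hypothetical stand-ins, and a plain map takes the place of the consumer.Store.

```go
package main

import "fmt"

// NGram mirrors the package's NGram type for this sketch.
type NGram string

// ngramCount is a hypothetical stand-in for the NGramCount message,
// holding only the fields the fold needs.
type ngramCount struct {
	NGram NGram
	Count uint64
}

// fold adds the message's count to the running total for its NGram.
// A missing key reads as zero, so first sightings need no special case.
func fold(totals map[NGram]uint64, msg ngramCount) {
	totals[msg.NGram] += msg.Count
}

func main() {
	var totals = make(map[NGram]uint64)
	var msgs = []ngramCount{
		{NGram: "hello world", Count: 1},
		{NGram: "hello world", Count: 1},
		{NGram: "world peace", Count: 1},
	}
	for _, m := range msgs {
		fold(totals, m)
	}
	fmt.Println(totals["hello world"], totals["world peace"]) // prints "2 1"
}
```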

func (Counter) FinalizeTxn

func (Counter) FinalizeTxn(_ consumer.Shard, store consumer.Store, _ *message.Publisher) error

FinalizeTxn marshals in-memory NGram counts to the |store|, ensuring persistence across consumer transactions. Implements consumer.Application.
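One way to picture this flush is a loop that encodes each pending count and writes it to a key/value store before clearing the in-memory state. The sketch below rests on assumptions: kvStore, memStore, and flush are hypothetical names, and the 8-byte big-endian encoding is chosen for illustration only; the package's actual store type and encoding may differ.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// kvStore is a hypothetical, minimal stand-in for a persistent store.
type kvStore interface {
	Put(key, value []byte)
}

// memStore is an in-memory kvStore used only to keep the sketch runnable.
type memStore map[string][]byte

func (s memStore) Put(key, value []byte) {
	// Copy the value so later reuse of the caller's buffer cannot alias it.
	s[string(key)] = append([]byte(nil), value...)
}

// flush writes every pending count to the store and removes it from the
// pending map, so the next transaction starts from empty in-memory state.
func flush(pending map[string]uint64, store kvStore) {
	for gram, count := range pending {
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], count) // Assumed encoding.
		store.Put([]byte(gram), buf[:])
		delete(pending, gram) // Deleting during range is safe in Go.
	}
}

func main() {
	var store = memStore{}
	var pending = map[string]uint64{"hello world": 2}

	flush(pending, store)
	fmt.Println(len(pending), binary.BigEndian.Uint64(store["hello world"]))
}
```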

func (*Counter) InitApplication

func (counter *Counter) InitApplication(args runconsumer.InitArgs) error

InitApplication initializes the application to serve the NGram gRPC service.

func (Counter) NewConfig

func (Counter) NewConfig() runconsumer.Config

NewConfig returns a new configuration instance.

func (Counter) NewMessage

func (Counter) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns an NGramCount message. Implements consumer.Application.

func (Counter) NewStore

func (Counter) NewStore(shard consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore builds a RocksDB store for the Shard. Implements consumer.Application.

func (*Counter) Publish

func (counter *Counter) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error)

Publish extracts NGrams of the configured length from the PublishRequest, and publishes an NGramCount message for each. It returns after all published messages have committed to their respective journals.
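To illustrate what extracting NGrams of a fixed length means, here is a minimal sliding-window sketch. It assumes whitespace tokenization via strings.Fields and a hypothetical extractNGrams helper; the package's own tokenizer and any boundary handling are not documented here and may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// NGram mirrors the package's NGram type: N space-delimited tokens.
type NGram string

// extractNGrams is a hypothetical helper that slides a window of n tokens
// over the whitespace-separated words of text. It returns nil when the
// text holds fewer than n tokens or n is not positive.
func extractNGrams(text string, n int) []NGram {
	var tokens = strings.Fields(text)
	if n <= 0 || len(tokens) < n {
		return nil
	}
	var out = make([]NGram, 0, len(tokens)-n+1)
	for i := 0; i+n <= len(tokens); i++ {
		out = append(out, NGram(strings.Join(tokens[i:i+n], " ")))
	}
	return out
}

func main() {
	// Prints "the quick", "quick brown", and "brown fox", one per line.
	for _, g := range extractNGrams("the quick brown fox", 2) {
		fmt.Println(g)
	}
}
```

In a flow like the one described for Publish, each extracted NGram would then become one NGramCount message.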

func (*Counter) Query

func (counter *Counter) Query(ctx context.Context, req *QueryRequest) (resp *QueryResponse, err error)

Query returns the count for an NGram (or the counts for NGrams matching a prefix thereof). If the requested or imputed Shard does not resolve locally, Query will proxy the request to the responsible process.
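The prefix lookup can be sketched as a scan over sorted keys: seek to the first key that is not less than the prefix, then read forward while keys still share it. This is an illustration under assumptions: gramCount and queryPrefix are hypothetical names, a map stands in for the shard's store, and shard resolution and proxying are left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// gramCount is a hypothetical stand-in for one entry of a query result.
type gramCount struct {
	NGram string
	Count uint64
}

// queryPrefix returns all entries whose key starts with prefix, in key order.
func queryPrefix(counts map[string]uint64, prefix string) []gramCount {
	var keys = make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var out []gramCount
	// Keys sharing a prefix are contiguous in sorted order, so seek to the
	// first candidate and stop at the first key that no longer matches.
	for i := sort.SearchStrings(keys, prefix); i < len(keys) && strings.HasPrefix(keys[i], prefix); i++ {
		out = append(out, gramCount{NGram: keys[i], Count: counts[keys[i]]})
	}
	return out
}

func main() {
	var counts = map[string]uint64{
		"goodbye world": 4,
		"hello there":   1,
		"hello world":   2,
	}
	for _, g := range queryPrefix(counts, "hello") {
		fmt.Println(g.NGram, g.Count)
	}
}
```

An ordered store would typically let the seek happen without sorting on every call; the sort here only keeps the sketch self-contained.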

type NGram

type NGram string

NGram is a string of N space-delimited tokens, where N is fixed.

type NGramClient

type NGramClient interface {
	// Publish text to the word-count example. The published text is tokenized
	// into NGrams, indexed, and aggregated into total NGram counts.
	Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error)
	// Query for a specific NGram, or NGram prefixes.
	Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error)
}

NGramClient is the client API for NGram service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewNGramClient

func NewNGramClient(cc *grpc.ClientConn) NGramClient

type NGramCount

type NGramCount struct {
	Uuid                 []byte   `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"`
	NGram                NGram    `protobuf:"bytes,2,opt,name=n_gram,json=nGram,proto3,casttype=NGram" json:"n_gram,omitempty"`
	Count                uint64   `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*NGramCount) Descriptor

func (*NGramCount) Descriptor() ([]byte, []int)

func (*NGramCount) GetCount

func (m *NGramCount) GetCount() uint64

func (*NGramCount) GetNGram

func (m *NGramCount) GetNGram() NGram

func (*NGramCount) GetUUID added in v0.83.1

func (c *NGramCount) GetUUID() (uuid message.UUID)

func (*NGramCount) GetUuid added in v0.83.1

func (m *NGramCount) GetUuid() []byte

func (*NGramCount) Marshal

func (m *NGramCount) Marshal() (dAtA []byte, err error)

func (*NGramCount) MarshalTo

func (m *NGramCount) MarshalTo(dAtA []byte) (int, error)

func (*NGramCount) MarshalToSizedBuffer added in v0.86.1

func (m *NGramCount) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*NGramCount) NewAcknowledgement added in v0.83.1

func (c *NGramCount) NewAcknowledgement(pb.Journal) message.Message

func (*NGramCount) ProtoMessage

func (*NGramCount) ProtoMessage()

func (*NGramCount) ProtoSize

func (m *NGramCount) ProtoSize() (n int)

func (*NGramCount) Reset

func (m *NGramCount) Reset()

func (*NGramCount) SetUUID added in v0.83.1

func (c *NGramCount) SetUUID(uuid message.UUID)

func (*NGramCount) String

func (m *NGramCount) String() string

func (*NGramCount) Unmarshal

func (m *NGramCount) Unmarshal(dAtA []byte) error

func (*NGramCount) XXX_DiscardUnknown

func (m *NGramCount) XXX_DiscardUnknown()

func (*NGramCount) XXX_Marshal

func (m *NGramCount) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*NGramCount) XXX_Merge

func (m *NGramCount) XXX_Merge(src proto.Message)

func (*NGramCount) XXX_Size

func (m *NGramCount) XXX_Size() int

func (*NGramCount) XXX_Unmarshal

func (m *NGramCount) XXX_Unmarshal(b []byte) error

type NGramServer

type NGramServer interface {
	// Publish text to the word-count example. The published text is tokenized
	// into NGrams, indexed, and aggregated into total NGram counts.
	Publish(context.Context, *PublishRequest) (*PublishResponse, error)
	// Query for a specific NGram, or NGram prefixes.
	Query(context.Context, *QueryRequest) (*QueryResponse, error)
}

NGramServer is the server API for NGram service.

type PublishRequest

type PublishRequest struct {
	Text                 string   `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishRequest) Descriptor

func (*PublishRequest) Descriptor() ([]byte, []int)

func (*PublishRequest) GetText

func (m *PublishRequest) GetText() string

func (*PublishRequest) Marshal

func (m *PublishRequest) Marshal() (dAtA []byte, err error)

func (*PublishRequest) MarshalTo

func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error)

func (*PublishRequest) MarshalToSizedBuffer added in v0.86.1

func (m *PublishRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PublishRequest) ProtoMessage

func (*PublishRequest) ProtoMessage()

func (*PublishRequest) ProtoSize

func (m *PublishRequest) ProtoSize() (n int)

func (*PublishRequest) Reset

func (m *PublishRequest) Reset()

func (*PublishRequest) String

func (m *PublishRequest) String() string

func (*PublishRequest) Unmarshal

func (m *PublishRequest) Unmarshal(dAtA []byte) error

func (*PublishRequest) XXX_DiscardUnknown

func (m *PublishRequest) XXX_DiscardUnknown()

func (*PublishRequest) XXX_Marshal

func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishRequest) XXX_Merge

func (m *PublishRequest) XXX_Merge(src proto.Message)

func (*PublishRequest) XXX_Size

func (m *PublishRequest) XXX_Size() int

func (*PublishRequest) XXX_Unmarshal

func (m *PublishRequest) XXX_Unmarshal(b []byte) error

type PublishResponse

type PublishResponse struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishResponse) Descriptor

func (*PublishResponse) Descriptor() ([]byte, []int)

func (*PublishResponse) Marshal

func (m *PublishResponse) Marshal() (dAtA []byte, err error)

func (*PublishResponse) MarshalTo

func (m *PublishResponse) MarshalTo(dAtA []byte) (int, error)

func (*PublishResponse) MarshalToSizedBuffer added in v0.86.1

func (m *PublishResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*PublishResponse) ProtoMessage

func (*PublishResponse) ProtoMessage()

func (*PublishResponse) ProtoSize

func (m *PublishResponse) ProtoSize() (n int)

func (*PublishResponse) Reset

func (m *PublishResponse) Reset()

func (*PublishResponse) String

func (m *PublishResponse) String() string

func (*PublishResponse) Unmarshal

func (m *PublishResponse) Unmarshal(dAtA []byte) error

func (*PublishResponse) XXX_DiscardUnknown

func (m *PublishResponse) XXX_DiscardUnknown()

func (*PublishResponse) XXX_Marshal

func (m *PublishResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishResponse) XXX_Merge

func (m *PublishResponse) XXX_Merge(src proto.Message)

func (*PublishResponse) XXX_Size

func (m *PublishResponse) XXX_Size() int

func (*PublishResponse) XXX_Unmarshal

func (m *PublishResponse) XXX_Unmarshal(b []byte) error

type QueryRequest

type QueryRequest struct {
	// Header attached by a proxy-ing peer. Not directly set by clients.
	Header *protocol.Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
	// NGram prefix to query.
	Prefix NGram `protobuf:"bytes,2,opt,name=prefix,proto3,casttype=NGram" json:"prefix,omitempty"`
	// Shard to query. Optional; if not set, shard is inferred from |prefix|'s current mapping.
	Shard                go_gazette_dev_core_consumer_protocol.ShardID `protobuf:"bytes,3,opt,name=shard,proto3,casttype=go.gazette.dev/core/consumer/protocol.ShardID" json:"shard,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                                      `json:"-"`
	XXX_unrecognized     []byte                                        `json:"-"`
	XXX_sizecache        int32                                         `json:"-"`
}

func (*QueryRequest) Descriptor

func (*QueryRequest) Descriptor() ([]byte, []int)

func (*QueryRequest) GetHeader

func (m *QueryRequest) GetHeader() *protocol.Header

func (*QueryRequest) GetPrefix

func (m *QueryRequest) GetPrefix() NGram

func (*QueryRequest) GetShard

func (*QueryRequest) Marshal

func (m *QueryRequest) Marshal() (dAtA []byte, err error)

func (*QueryRequest) MarshalTo

func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error)

func (*QueryRequest) MarshalToSizedBuffer added in v0.86.1

func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*QueryRequest) ProtoMessage

func (*QueryRequest) ProtoMessage()

func (*QueryRequest) ProtoSize

func (m *QueryRequest) ProtoSize() (n int)

func (*QueryRequest) Reset

func (m *QueryRequest) Reset()

func (*QueryRequest) String

func (m *QueryRequest) String() string

func (*QueryRequest) Unmarshal

func (m *QueryRequest) Unmarshal(dAtA []byte) error

func (*QueryRequest) XXX_DiscardUnknown

func (m *QueryRequest) XXX_DiscardUnknown()

func (*QueryRequest) XXX_Marshal

func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueryRequest) XXX_Merge

func (m *QueryRequest) XXX_Merge(src proto.Message)

func (*QueryRequest) XXX_Size

func (m *QueryRequest) XXX_Size() int

func (*QueryRequest) XXX_Unmarshal

func (m *QueryRequest) XXX_Unmarshal(b []byte) error

type QueryResponse

type QueryResponse struct {
	Grams                []NGramCount `protobuf:"bytes,1,rep,name=grams,proto3" json:"grams"`
	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
	XXX_unrecognized     []byte       `json:"-"`
	XXX_sizecache        int32        `json:"-"`
}

func (*QueryResponse) Descriptor

func (*QueryResponse) Descriptor() ([]byte, []int)

func (*QueryResponse) GetGrams

func (m *QueryResponse) GetGrams() []NGramCount

func (*QueryResponse) Marshal

func (m *QueryResponse) Marshal() (dAtA []byte, err error)

func (*QueryResponse) MarshalTo

func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error)

func (*QueryResponse) MarshalToSizedBuffer added in v0.86.1

func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*QueryResponse) ProtoMessage

func (*QueryResponse) ProtoMessage()

func (*QueryResponse) ProtoSize

func (m *QueryResponse) ProtoSize() (n int)

func (*QueryResponse) Reset

func (m *QueryResponse) Reset()

func (*QueryResponse) String

func (m *QueryResponse) String() string

func (*QueryResponse) Unmarshal

func (m *QueryResponse) Unmarshal(dAtA []byte) error

func (*QueryResponse) XXX_DiscardUnknown

func (m *QueryResponse) XXX_DiscardUnknown()

func (*QueryResponse) XXX_Marshal

func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueryResponse) XXX_Merge

func (m *QueryResponse) XXX_Merge(src proto.Message)

func (*QueryResponse) XXX_Size

func (m *QueryResponse) XXX_Size() int

func (*QueryResponse) XXX_Unmarshal

func (m *QueryResponse) XXX_Unmarshal(b []byte) error

type UnimplementedNGramServer added in v0.86.1

type UnimplementedNGramServer struct {
}

UnimplementedNGramServer can be embedded to have forward compatible implementations.
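The embedding pattern this refers to can be shown with local stand-in types. In the sketch below, server, unimplementedServer, and queryOnly are hypothetical and use simplified signatures; the generated stub works with context.Context and the request and response messages, and its fallback error is not necessarily the one shown.

```go
package main

import (
	"errors"
	"fmt"
)

// server is a simplified stand-in for a generated service interface.
type server interface {
	Publish(text string) error
	Query(prefix string) (uint64, error)
}

var errUnimplemented = errors.New("method not implemented")

// unimplementedServer supplies a fallback for every method of server.
type unimplementedServer struct{}

func (unimplementedServer) Publish(string) error         { return errUnimplemented }
func (unimplementedServer) Query(string) (uint64, error) { return 0, errUnimplemented }

// queryOnly embeds the fallbacks and overrides only Query. If the interface
// later gains a method, queryOnly still compiles because the embedded type
// would provide it.
type queryOnly struct {
	unimplementedServer
	counts map[string]uint64
}

func (q queryOnly) Query(prefix string) (uint64, error) {
	return q.counts[prefix], nil
}

// Compile-time check that queryOnly satisfies the interface.
var _ server = queryOnly{}

func main() {
	var s server = queryOnly{counts: map[string]uint64{"hello world": 3}}

	fmt.Println(s.Query("hello world")) // prints "3 <nil>"
	fmt.Println(s.Publish("some text")) // prints "method not implemented"
}
```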

func (*UnimplementedNGramServer) Publish added in v0.86.1

func (*UnimplementedNGramServer) Query added in v0.86.1

Directories

Path Synopsis
Package counter runs the word_count.Counter consumer.
