cofire

v0.0.0-...-2c1810d

This package is not in the latest version of its module.

Published: Aug 10, 2018 License: BSD-3-Clause Imports: 7 Imported by: 0

README

Cofire is a stream-based collaborative filtering implementation for recommendation engines that runs solely on Apache Kafka. Leveraging the Goka stream processing library, Cofire continuously learns from user-product ratings arriving in Kafka event streams and keeps its model up to date in Kafka tables. Cofire implements streaming matrix factorization using stochastic gradient descent (SGD).

We assume you understand the SGD algorithm, so this README focuses on how Cofire works on top of Goka. See this blog post for a great introduction to SGD for streams.

Components

Cofire has two types of stream processors:

  • The learner consumes ratings from an input topic and applies SGD to learn the latent features of users and products. Here is where machine learning happens. Learners are stateful and store the latent features in Kafka in a log-compacted topic.
  • The refeeder re-emits already-learnt ratings into the learner's input topic after a predefined delay, which effectively implements training iterations. By default, the number of iterations is 1, so the refeeder is optional.

Besides these processors, two other components are necessary to get the system running:

  • At least one producer that writes the ratings into the learner's input topic.
  • At least one predictor that performs predictions using the learnt model. The predictor does not need to be co-located with the learner; it can simply keep a local view of the model using Goka.

Preparation

Before starting any processors, one has to do the following:

  1. Choose a group name, e.g., "cofire-app".
  2. Create topics for the processors (if they are not auto-created):
    • <group>-input as input for the learner, e.g., "cofire-app-input"
    • <group>-loop to loop messages back among learner instances, e.g., "cofire-app-loop"
    • <group>-table to store the learnt model, e.g., "cofire-app-table"
    • <group>-update to overwrite the learner model if desired, e.g., "cofire-app-update"
    • <group>-refeed to send ratings from the learner to the refeeder, e.g., "cofire-app-refeed"
  3. Ensure all topics have the same number of partitions.
  4. Ensure <group>-table is configured with log compaction.
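The naming convention above can be summarized in a few lines of Go. This is only an illustrative sketch: topicNames is a hypothetical helper, not part of cofire, and the suffixes are copied from the list above.

```go
package main

import "fmt"

// topicNames returns the Kafka topics a cofire deployment expects for a
// group, following the "<group>-<suffix>" convention.
// Hypothetical helper for illustration only; it is not part of cofire.
func topicNames(group string) map[string]string {
	suffixes := []string{"input", "loop", "table", "update", "refeed"}
	names := make(map[string]string, len(suffixes))
	for _, s := range suffixes {
		names[s] = group + "-" + s
	}
	return names
}

func main() {
	// Map iteration order is random, so the lines may print in any order.
	for role, topic := range topicNames("cofire-app") {
		fmt.Println(role, "->", topic)
	}
}
```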

See the examples directory for detailed examples.

How it works

Learners (like any Goka processor) have processing partitions, which match the number of partitions of the inputs and state. These processing partitions may be distributed over multiple processor instances, i.e., multiple program instances running on multiple hosts.

Ratings and other messages are assigned to partitions via the key used when emitting the message.
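To make the key-to-partition mapping concrete, here is a minimal Go sketch. It assumes an FNV-1a hash with the sign-corrected modulo used by Sarama's hash partitioner; the exact hash Goka uses is not stated in this document, so treat partitionFor as hypothetical. The property that matters is that a given key always maps to the same partition.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of n partitions.
// Assumption: 32-bit FNV-1a, as in Sarama's hash partitioner.
func partitionFor(key string, n int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // writes to a hash.Hash never fail
	p := int32(h.Sum32()) % n
	if p < 0 {
		p = -p // the remainder is negative when the hash overflows int32
	}
	return p
}

func main() {
	for _, key := range []string{"user-1", "user-2", "product-7"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 10))
	}
}
```

This determinism is also why all topics must have the same number of partitions (step 3 of Preparation): with a different partition count, the same key would map to a different partition number in each topic.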

To simplify the explanation, we refer to keys as if they were active entities. For example, "we send a message to a key k" means that we send a message using key k and the learner partition responsible for key k receives that message; likewise, "a key k processes a message" means that the learner partition responsible for key k processes the message.

Producing ratings

A rating is defined as follows.

message Rating {
  string user_id    = 1;
  string product_id = 2;
  double score      = 3;
}

A producer sends ratings to the learner instances via the <group>-input topic. The key of each rating message is the user_id.
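A minimal sketch of the producer side, keyed by user_id. To keep it runnable without Kafka, Emitter is a hypothetical interface with the shape of the EmitSync(key string, msg interface{}) error method of goka's emitter, and memEmitter is an in-memory stand-in. In a real deployment one would presumably emit to the <group>-input topic using cofire.RatingCodec.

```go
package main

import "fmt"

// Rating mirrors the fields of cofire.Rating (user_id, product_id, score).
type Rating struct {
	UserID    string
	ProductID string
	Score     float64
}

// Emitter is the one method this sketch needs from an emitter.
type Emitter interface {
	EmitSync(key string, msg interface{}) error
}

// memEmitter records emitted messages in memory instead of writing to Kafka.
type memEmitter struct {
	keys []string
	msgs []interface{}
}

func (m *memEmitter) EmitSync(key string, msg interface{}) error {
	m.keys = append(m.keys, key)
	m.msgs = append(m.msgs, msg)
	return nil
}

// produce sends a rating keyed by its user ID, so the learner partition
// responsible for that user receives it first.
func produce(e Emitter, r Rating) error {
	return e.EmitSync(r.UserID, &r)
}

func main() {
	em := &memEmitter{}
	if err := produce(em, Rating{UserID: "alice", ProductID: "book-42", Score: 4.5}); err != nil {
		panic(err)
	}
	fmt.Println(em.keys) // [alice]
}
```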

Learning

Cofire can be configured with these parameters:

type Parameters struct {
  // Rank is the number of latent factors (features).
  Rank int
  // Gamma is the learning step of SGD.
  Gamma float64
  // Lambda is the regularization parameter of SGD.
  Lambda float64
  // Iterations is the number of times the data will be used for training.
  Iterations int
}
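For orientation, a textbook form of the matrix-factorization SGD step that these parameters control is sketched below, with γ = Gamma, λ = Lambda and k = Rank. It omits the bias terms that cofire's Features also carry, and it is an assumption that cofire applies exactly these rules. For a rating r of a user with features u and a product with features p (both vectors in ℝ^k):

```latex
\begin{aligned}
\hat{r} &= \mathbf{u}^{\top}\mathbf{p}, \qquad e = r - \hat{r} \\
\mathbf{p} &\leftarrow \mathbf{p} + \gamma\,\bigl(e\,\mathbf{u} - \lambda\,\mathbf{p}\bigr) \\
\mathbf{u} &\leftarrow \mathbf{u} + \gamma\,\bigl(e\,\mathbf{p} - \lambda\,\mathbf{u}\bigr)
\end{aligned}
```

In the three-step flow described below, P is updated first and U afterwards with the freshly updated P, so e would be recomputed before each update; Iterations controls how many times each rating goes through this.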

Learners have state, which is partitioned and stored in Kafka in the <group>-table topic. Each key has an entry in the learner state, defined as follows:

message Entry {
  Features u = 1;
  Features p = 2;
}

The algorithm for one rating has 3 steps:

  1. When user_id receives a rating via the input topic, it retrieves the U features and sends (rating,U) to the product_id via the <group>-loop topic.
  2. When product_id receives (rating,U), it retrieves the P features, applies SGD, and sends (rating,P) back to user_id.
  3. When user_id receives (rating,P), it retrieves the U features, applies SGD, and sends the rating to the refeeder via <group>-refeed.

This completes the iteration for this rating.

In ASCII art, one iteration looks like this:

    Rating
      |
      v
     USER
      |
      * Entry                     PRODUCT
      |        (Rating, U)           |
      +----------------------------->|
      |                              |
      |                              * Update P
      |        (Rating, P)           |
      |<-----------------------------+
      |
      * Update U
      |
      v
   REFEEDER

Iterating

If the algorithm is configured to run multiple iterations, the refeeder sends the rating back to the user_id so that it is trained again, until the configured number of iterations is reached. Since the stream never ends, the refeeder creates iterations by delaying the messages of the <group>-refeed topic by a configurable duration. Note that the retention time configured for that topic has to be longer than the refeeder's delay; otherwise, ratings will be lost.

In ASCII-art, the complete flow is as follows. Here we see the three components: producer, learner and refeeder. The producer sends a rating to the learner. The user key receives the rating and sends the rating plus its U vector to the product key in the learner (by using Goka's loopback). The product updates P, sends it back to the user, which updates U and sends the rating to the refeeder. The refeeder sends the rating back to the learner/user-key after a delay, and the number of remaining iterations is decremented. The user receives the rating and the next iteration starts.

    PRODUCER             ,..LEARNER..,               REFEEDER
      |             USER'             'PRODUCT          .
      |   Rating     .                   .              .
      +------------->+                   .              .
      .              |                   .              .
      .              |                   .              .
      .              * Entry             .              .
      .              |                   .              .
      .              |    (Rating, U)    .              .
      .              +------------------>+              .
      .              .                   |              .
      .              .                   |              .
      .              .                   * Update P     .
      .              .                   |              .
      .              .    (Rating, P)    |              .
      .              +<------------------+              .
      .              |                   .              .
      .              |                   .              .
      .              * Update U          .              .
      .              |                   .              .
      .              |      (Rating, #iterations)       .
      .              +--------------------------------->+
      .              .                   .              |
      .              .                   .              |
      .              .                   .              * Delay
      .              .                   .              |
      .              .      (Rating, #iterations--)     |
      .              +<---------------------------------+
      .              |                   .              .
      .              |                   .              .
      .              * Entry             .              .
      .              |                   .              .
      .              |   next iteration  .              .
      .              +------------------>+              .

Predicting

Every update of U or P in a learner produces an update of <group>-table. To perform predictions, one simply creates a Goka view of <group>-table and gets the entries for the desired user and product. For example:

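view, err := goka.NewView(brokers, goka.GroupTable(group), new(cofire.EntryCodec))
if err != nil {
	log.Fatal(err)
}
// start the view and wait until it has recovered before calling Get

user, _ := view.Get("user") // Get returns nil if the key is unknown
u := user.(*cofire.Entry).U
product, _ := view.Get("product")
p := product.(*cofire.Entry).P

prediction := u.Predict(p, bias) // bias: see "Global bias" below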

Global bias

The global bias of SGD is not stored anywhere in the state, only in memory. So to compute absolute predictions, one needs to compute the global bias manually. However, if one is simply creating product recommendations for a user, the bias can be set to 0, since adding the same constant to every score does not affect the sorted order of the scored products.
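The sketch below illustrates both points with hypothetical helpers: a running mean of observed scores as one plausible estimate of the global bias (how cofire computes it in memory is not described here), and a check that adding the same bias to every score leaves the ranking unchanged.

```go
package main

import (
	"fmt"
	"sort"
)

// meanTracker keeps a running mean of observed scores.
type meanTracker struct {
	n    int
	mean float64
}

func (m *meanTracker) Add(score float64) {
	m.n++
	m.mean += (score - m.mean) / float64(m.n)
}

// rank returns product IDs sorted by descending score.
func rank(scores map[string]float64) []string {
	ids := make([]string, 0, len(scores))
	for id := range scores {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return scores[ids[i]] > scores[ids[j]] })
	return ids
}

// withBias adds the same bias to every score.
func withBias(scores map[string]float64, bias float64) map[string]float64 {
	out := make(map[string]float64, len(scores))
	for id, s := range scores {
		out[id] = s + bias
	}
	return out
}

func main() {
	var g meanTracker
	for _, s := range []float64{4, 5, 3} {
		g.Add(s)
	}
	raw := map[string]float64{"a": 0.2, "b": -0.1, "c": 0.7} // predictions with bias 0
	fmt.Println(g.mean)                      // 4
	fmt.Println(rank(raw))                   // [c a b]
	fmt.Println(rank(withBias(raw, g.mean))) // [c a b]
}
```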

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

Documentation

Overview

Package cofire is a generated protocol buffer package.

It is generated from these files:

cofire.proto

It has these top-level messages:

Features
Entry
Rating
Message
Update

Constants

This section is empty.

Variables

var Stage_name = map[int32]string{
	0: "ENTRY",
	1: "PRODUCT",
	2: "USER",
}
var Stage_value = map[string]int32{
	"ENTRY":   0,
	"PRODUCT": 1,
	"USER":    2,
}

Functions

func NewLearner

func NewLearner(group goka.Group, validator Validator, params Parameters) *goka.GroupGraph

NewLearner returns the GroupGraph for a learner processor.

func NewRefeeder

func NewRefeeder(cofireGroup goka.Group, delay time.Duration) *goka.GroupGraph

NewRefeeder returns the GroupGraph for a processor that refeeds the input of the learner after a specified delay.

Types

type Entry

type Entry struct {
	U *Features `protobuf:"bytes,1,opt,name=u" json:"u,omitempty"`
	P *Features `protobuf:"bytes,2,opt,name=p" json:"p,omitempty"`
}

Entry holds the factors (U for a user, P for a product) stored under a key.

func (*Entry) Descriptor

func (*Entry) Descriptor() ([]byte, []int)

func (*Entry) GetP

func (m *Entry) GetP() *Features

func (*Entry) GetU

func (m *Entry) GetU() *Features

func (*Entry) ProtoMessage

func (*Entry) ProtoMessage()

func (*Entry) Reset

func (m *Entry) Reset()

func (*Entry) String

func (m *Entry) String() string

type EntryCodec

type EntryCodec struct{}

func (*EntryCodec) Decode

func (c *EntryCodec) Decode(b []byte) (interface{}, error)

func (*EntryCodec) Encode

func (c *EntryCodec) Encode(v interface{}) ([]byte, error)

type ErrorValidator

type ErrorValidator struct {
	// contains filtered or unexported fields
}

ErrorValidator validates each prediction by calculating the root mean square error.

func NewErrorValidator

func NewErrorValidator() *ErrorValidator

NewErrorValidator creates a new ErrorValidator.

func (*ErrorValidator) Count

func (v *ErrorValidator) Count() int

Count returns the number of values validated.

func (*ErrorValidator) RMSE

func (v *ErrorValidator) RMSE() float64

RMSE returns the current root mean square error.

func (*ErrorValidator) Reset

func (v *ErrorValidator) Reset() float64

Reset resets the state of the ErrorValidator and returns the last value.

func (*ErrorValidator) Validate

func (v *ErrorValidator) Validate(prediction, score float64)

Validate validates the prediction given a score.

type Features

type Features struct {
	V    []float64 `protobuf:"fixed64,1,rep,packed,name=v" json:"v,omitempty"`
	Bias float64   `protobuf:"fixed64,2,opt,name=bias" json:"bias,omitempty"`
}

Features holds the latent factors and a bias for a user or product.

func NewFeatures

func NewFeatures(rank int) *Features

NewFeatures creates a feature vector with rank features.

func (*Features) Descriptor

func (*Features) Descriptor() ([]byte, []int)

func (*Features) GetBias

func (m *Features) GetBias() float64

func (*Features) GetV

func (m *Features) GetV() []float64

func (*Features) Predict

func (f *Features) Predict(o *Features, bias float64) float64

Predict predicts the rating r̂ for a given user and product.

func (*Features) ProtoMessage

func (*Features) ProtoMessage()

func (*Features) Randomize

func (f *Features) Randomize() *Features

Randomize initializes the feature vector with random numbers.

func (*Features) Rank

func (f *Features) Rank() int

Rank returns the number of features in the vector.

func (*Features) Reset

func (m *Features) Reset()

func (*Features) String

func (m *Features) String() string

type Learner

type Learner struct {
	// contains filtered or unexported fields
}

Learner factorizes a user-product rating matrix by learning latent features for users and products.

type Message

type Message struct {
	Stage  Stage     `protobuf:"varint,1,opt,name=stage,enum=cofire.Stage" json:"stage,omitempty"`
	Rating *Rating   `protobuf:"bytes,2,opt,name=rating" json:"rating,omitempty"`
	F      *Features `protobuf:"bytes,3,opt,name=f" json:"f,omitempty"`
	Iters  uint32    `protobuf:"varint,4,opt,name=iters" json:"iters,omitempty"`
}

Message is an internal message of the Cofire Learner.

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) GetF

func (m *Message) GetF() *Features

func (*Message) GetIters

func (m *Message) GetIters() uint32

func (*Message) GetRating

func (m *Message) GetRating() *Rating

func (*Message) GetStage

func (m *Message) GetStage() Stage

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) String

func (m *Message) String() string

type Parameters

type Parameters struct {
	// Rank is the number of latent factors (features).
	Rank int
	// Gamma is the learning step.
	Gamma float64
	// Lambda is the regularization parameter.
	Lambda float64
	// Iterations is the number of times the data will be used for training.
	Iterations int
}

Parameters configure the SGD algorithm.

func DefaultParams

func DefaultParams() Parameters

DefaultParams returns the default parameters of SGD.

type Rating

type Rating struct {
	UserId    string  `protobuf:"bytes,1,opt,name=user_id,json=userId" json:"user_id,omitempty"`
	ProductId string  `protobuf:"bytes,2,opt,name=product_id,json=productId" json:"product_id,omitempty"`
	Score     float64 `protobuf:"fixed64,3,opt,name=score" json:"score,omitempty"`
}

Rating represents the score that a user gives to a product. Cofire Learner accepts Rating messages to factorize the rating matrix.

func (*Rating) Descriptor

func (*Rating) Descriptor() ([]byte, []int)

func (*Rating) GetProductId

func (m *Rating) GetProductId() string

func (*Rating) GetScore

func (m *Rating) GetScore() float64

func (*Rating) GetUserId

func (m *Rating) GetUserId() string

func (*Rating) ProtoMessage

func (*Rating) ProtoMessage()

func (*Rating) Reset

func (m *Rating) Reset()

func (*Rating) String

func (m *Rating) String() string

type RatingCodec

type RatingCodec struct{}

func (*RatingCodec) Decode

func (c *RatingCodec) Decode(b []byte) (interface{}, error)

func (*RatingCodec) Encode

func (c *RatingCodec) Encode(v interface{}) ([]byte, error)

type SGD

type SGD struct {
	// Gamma constant for learning speed
	Gamma float64

	// Lambda constant for regularization
	Lambda float64
	// contains filtered or unexported fields
}

SGD is a helper to apply stochastic gradient descent.

func NewSGD

func NewSGD(gamma, lambda float64) *SGD

NewSGD returns a configured SGD helper.

func (*SGD) Add

func (s *SGD) Add(bias float64)

Add adds bias to the prediction error. If Add is called multiple times, the average bias is computed.

func (*SGD) Apply

func (s *SGD) Apply(f, o *Features, score float64)

Apply applies the stochastic gradient descent on features f with o and a score. Apply also adds the score to the bias.

func (*SGD) ApplyError

func (s *SGD) ApplyError(f, o *Features, e float64)

ApplyError applies the stochastic gradient descent on features f with o and error e.

func (*SGD) Bias

func (s *SGD) Bias() float64

Bias returns the average bias added to the SGD object.

func (*SGD) Error

func (s *SGD) Error(f, o *Features, score float64) float64

Error computes the error between the score prediction (with f and o) and the real score.

type Stage

type Stage int32

Stage enumerates the internal stages of the Cofire learner.

const (
	Stage_ENTRY   Stage = 0
	Stage_PRODUCT Stage = 1
	Stage_USER    Stage = 2
)

func (Stage) EnumDescriptor

func (Stage) EnumDescriptor() ([]byte, []int)

func (Stage) String

func (x Stage) String() string

type Update

type Update struct {
	U *Features `protobuf:"bytes,1,opt,name=u" json:"u,omitempty"`
	P *Features `protobuf:"bytes,2,opt,name=p" json:"p,omitempty"`
}

Update messages overwrite the U or P features in a user's or product's entry.
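Update messages, presumably delivered to the learner via the <group>-update topic listed under Preparation, could be applied as in this sketch. The handling of a nil U or P (leave that side of the entry untouched) is an assumption, and applyUpdate is a hypothetical helper.

```go
package main

import "fmt"

type Features struct {
	V    []float64
	Bias float64
}

// Entry and Update mirror cofire's messages: both carry optional U and P.
type Entry struct{ U, P *Features }
type Update struct{ U, P *Features }

// applyUpdate overwrites whichever of U and P the update sets and leaves
// the other side of the entry as it was (assumed semantics).
func applyUpdate(e *Entry, u *Update) *Entry {
	if e == nil {
		e = &Entry{}
	}
	if u.U != nil {
		e.U = u.U
	}
	if u.P != nil {
		e.P = u.P
	}
	return e
}

func main() {
	e := &Entry{U: &Features{V: []float64{0.1, 0.2}}, P: &Features{V: []float64{0.3, 0.4}}}
	e = applyUpdate(e, &Update{P: &Features{V: []float64{1, 1}}})
	fmt.Println(e.U.V, e.P.V) // [0.1 0.2] [1 1]
}
```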

func (*Update) Descriptor

func (*Update) Descriptor() ([]byte, []int)

func (*Update) GetP

func (m *Update) GetP() *Features

func (*Update) GetU

func (m *Update) GetU() *Features

func (*Update) ProtoMessage

func (*Update) ProtoMessage()

func (*Update) Reset

func (m *Update) Reset()

func (*Update) String

func (m *Update) String() string

type UpdateCodec

type UpdateCodec struct{}

func (*UpdateCodec) Decode

func (c *UpdateCodec) Decode(b []byte) (interface{}, error)

func (*UpdateCodec) Encode

func (c *UpdateCodec) Encode(v interface{}) ([]byte, error)

type Validator

type Validator interface {
	// Validate validates the prediction given a score.
	Validate(prediction, score float64)
}

Validator is used by the processor to validate each incoming rating.
