jocko

v0.0.0-...-d1cd037
Published: Nov 5, 2017 License: MIT Imports: 7 Imported by: 0

README

JOCKO

Kafka/distributed commit log service in Go.

Goals of this project:

  • Implement Kafka in Go
  • Protocol compatible with Kafka so Kafka clients and services work with Jocko
  • Make operating simpler
  • Distribute a single binary
  • Use Serf for discovery, Raft for consensus (and remove the need to run ZooKeeper)
  • Smarter configuration settings
    • Able to use percentages of disk space for retention policies rather than only bytes and time kept
    • Handling size configs when you change the number of partitions or add topics
  • Learn a lot and have fun

TODO

  • Producing
  • Fetching
  • Partition consensus and distribution
  • Protocol
    • Produce
    • Fetch
    • Metadata
    • Create Topics
    • Delete Topics
    • Consumer group
  • Discovery
  • API versioning
  • Replication [current task]
  • Tests [current task]

Reading

Project Layout

├── broker        broker subsystem
├── cmd           commands
│   └── jocko     command to run a Jocko broker and manage topics
├── commitlog     low-level commit log implementation
├── examples      examples running/using Jocko
│   ├── cluster   example booting up a 3-broker Jocko cluster
│   └── sarama    example producing/consuming with Sarama
├── protocol      golang implementation of Kafka's protocol
├── raft          wrapper around Hashicorp's Raft lib to handle consensus
├── serf          wrapper around Hashicorp's Serf lib to handle service discovery
├── prometheus    wrapper around Prometheus' client lib to handle metrics
├── server        API subsystem
└── testutil      test utils
    └── mock      mocks of the various subsystems

License

MIT


Documentation

Types

type Broker

type Broker interface {
	Run(context.Context, <-chan Request, chan<- Response)
	Join(addr ...string) protocol.Error
	Shutdown() error
}

Broker is the interface that wraps the Broker's methods.
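
As an illustration of the channel-based shape of Run, here is a minimal sketch of a loop that consumes requests and emits responses until its context is cancelled. The Request and Response types below are simplified stand-ins (the Header field is left out so the example has no dependency on the protocol package), and echoBroker and roundTrip are hypothetical names invented for this sketch. It is not how Jocko's broker handles requests.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Request and Response are simplified stand-ins for the package's types;
// the Header field is omitted (assumption made for this sketch).
type Request struct {
	Conn    io.ReadWriter
	Request interface{}
}

type Response struct {
	Conn     io.ReadWriter
	Response interface{}
}

// echoBroker is a hypothetical type whose Run matches the shape of
// Broker.Run: it reads requests and sends each payload back as a response.
type echoBroker struct{}

func (echoBroker) Run(ctx context.Context, reqs <-chan Request, resps chan<- Response) {
	for {
		select {
		case <-ctx.Done():
			return
		case req, ok := <-reqs:
			if !ok {
				return // request channel closed
			}
			select {
			case resps <- Response{Conn: req.Conn, Response: req.Request}:
			case <-ctx.Done():
				return
			}
		}
	}
}

// roundTrip starts the loop, sends one request, and returns the response payload.
func roundTrip(payload interface{}) interface{} {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the Run goroutine

	reqs := make(chan Request)
	resps := make(chan Response)
	go echoBroker{}.Run(ctx, reqs, resps)

	reqs <- Request{Request: payload}
	return (<-resps).Response
}

func main() {
	fmt.Println(roundTrip("ping"))
}
```

Passing the context into Run lets the caller stop the loop without closing either channel, which is presumably why the interface takes one.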

type Client

type Client interface {
	FetchMessages(clientID string, fetchRequest *protocol.FetchRequest) (*protocol.FetchResponses, error)
	CreateTopic(clientID string, createRequest *protocol.CreateTopicRequest) (*protocol.CreateTopicsResponse, error)
}

Client is used to send requests to other brokers.

type ClusterMember

type ClusterMember struct {
	ID   int32  `json:"id"`
	Port int    `json:"port"`
	IP   string `json:"addr"`

	SerfPort int          `json:"-"`
	RaftPort int          `json:"-"`
	Status   MemberStatus `json:"-"`
	// contains filtered or unexported fields
}

ClusterMember is used as a wrapper around a broker's info and a connection to it.
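
The struct tags above determine how a member is serialized: IP is emitted under the key "addr", and fields tagged "-" are skipped. The sketch below shows this with a stand-in type named member that copies only some of the exported fields (Status and the unexported fields are left out); the type name, the helper encodeMember, and the port values are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// member mirrors a subset of ClusterMember's exported fields and their tags.
// It is a stand-in for illustration, not the package's own type.
type member struct {
	ID   int32  `json:"id"`
	Port int    `json:"port"`
	IP   string `json:"addr"`

	SerfPort int `json:"-"` // excluded from JSON by the "-" tag
	RaftPort int `json:"-"` // excluded from JSON by the "-" tag
}

// encodeMember marshals m to a JSON string using the struct tags.
func encodeMember(m member) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// The port numbers here are arbitrary example values.
	s, err := encodeMember(member{ID: 1, Port: 9092, IP: "127.0.0.1", SerfPort: 7946, RaftPort: 7947})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"id":1,"port":9092,"addr":"127.0.0.1"}
}
```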

func (*ClusterMember) Addr

func (b *ClusterMember) Addr() *net.TCPAddr

Addr is used to get the address of the member.

func (*ClusterMember) Read

func (b *ClusterMember) Read(p []byte) (int, error)

Read is used to read from the member.

func (*ClusterMember) Write

func (b *ClusterMember) Write(p []byte) (int, error)

Write is used to write to the member.

type CommitLog

type CommitLog interface {
	Delete() error
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	Truncate(int64) error
	NewestOffset() int64
	OldestOffset() int64
	Append([]byte) (int64, error)
}

CommitLog is the interface that wraps the commit log's methods and is used to manage a partition's data.
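
To make the interface concrete, here is a minimal in-memory sketch that satisfies it, which could serve as a test double. The type memLog and the helper readAll are hypothetical. The semantics chosen here are assumptions, not taken from Jocko's commitlog package: offsets count entries starting at zero, NewestOffset returns the offset the next Append would receive, and Truncate drops entries before the given offset.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// CommitLog is copied from the interface listed above.
type CommitLog interface {
	Delete() error
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	Truncate(int64) error
	NewestOffset() int64
	OldestOffset() int64
	Append([]byte) (int64, error)
}

// memLog is a hypothetical in-memory CommitLog. Not safe for concurrent use.
type memLog struct {
	base    int64    // offset of entries[0]
	entries [][]byte // one element per appended message set
}

var _ CommitLog = (*memLog)(nil) // compile-time interface check

// Append stores a copy of b and returns the offset assigned to it.
func (l *memLog) Append(b []byte) (int64, error) {
	l.entries = append(l.entries, append([]byte(nil), b...))
	return l.base + int64(len(l.entries)) - 1, nil
}

// NewestOffset returns the offset the next Append would get (assumed semantics).
func (l *memLog) NewestOffset() int64 { return l.base + int64(len(l.entries)) }

// OldestOffset returns the offset of the oldest retained entry.
func (l *memLog) OldestOffset() int64 { return l.base }

// Truncate drops every entry before offset (assumed semantics).
func (l *memLog) Truncate(offset int64) error {
	if offset < l.base || offset > l.NewestOffset() {
		return errors.New("memLog: offset out of range")
	}
	l.entries = l.entries[offset-l.base:]
	l.base = offset
	return nil
}

// NewReader returns whole entries from offset onward, up to maxBytes in total.
func (l *memLog) NewReader(offset int64, maxBytes int32) (io.Reader, error) {
	if offset < l.base || offset > l.NewestOffset() {
		return nil, errors.New("memLog: offset out of range")
	}
	var buf bytes.Buffer
	for _, e := range l.entries[offset-l.base:] {
		if buf.Len()+len(e) > int(maxBytes) {
			break
		}
		buf.Write(e)
	}
	return &buf, nil
}

// Delete discards all data and resets the log.
func (l *memLog) Delete() error {
	l.entries, l.base = nil, 0
	return nil
}

// readAll is a small helper that drains a reader obtained from the log.
func readAll(l CommitLog, offset int64, maxBytes int32) (string, error) {
	r, err := l.NewReader(offset, maxBytes)
	if err != nil {
		return "", err
	}
	b, err := io.ReadAll(r)
	return string(b), err
}

func main() {
	log := &memLog{}
	log.Append([]byte("a"))
	off, _ := log.Append([]byte("b"))
	data, _ := readAll(log, 0, 1024)
	fmt.Println(off, data) // 1 ab
}
```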

type Counter

type Counter = prometheus.Counter

Counter aliases Prometheus' Counter; callers probably only need Inc().

type MemberStatus

type MemberStatus int

MemberStatus is the state that a member is in.

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
	StatusReap
)

The possible states of a Serf member.
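
Because the constants are declared with iota, they take the values 0 through 5 in declaration order. The listing shows no String method on MemberStatus, so the sketch below adds a hypothetical helper, statusName, for logging; the helper and the label strings are assumptions, not part of the package.

```go
package main

import "fmt"

// MemberStatus and its constants are copied from the listing above.
type MemberStatus int

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
	StatusReap
)

// statusName is a hypothetical helper that maps a status to a readable label.
func statusName(s MemberStatus) string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusLeaving:
		return "leaving"
	case StatusLeft:
		return "left"
	case StatusFailed:
		return "failed"
	case StatusReap:
		return "reap"
	default:
		return fmt.Sprintf("unknown(%d)", int(s))
	}
}

func main() {
	fmt.Println(int(StatusAlive), statusName(StatusAlive)) // 1 alive
}
```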

type Metrics

type Metrics struct {
	RequestsHandled Counter
}

Metrics is used for tracking metrics.

type Partition

type Partition struct {
	Topic           string  `json:"topic"`
	ID              int32   `json:"id"`
	Replicas        []int32 `json:"replicas"`
	ISR             []int32 `json:"isr"`
	Leader          int32   `json:"leader"`
	PreferredLeader int32   `json:"preferred_leader"`

	LeaderAndISRVersionInZK int32     `json:"-"`
	CommitLog               CommitLog `json:"-"`
	// Conn is a connection to the broker that is this partition's leader, used for replication.
	Conn io.ReadWriter `json:"-"`
}

Partition is the unit of storage in Jocko.

func NewPartition

func NewPartition(topic string, id int32) *Partition

NewPartition is used to create a new partition.

func (*Partition) Append

func (p *Partition) Append(ms []byte) (int64, error)

Append is used to append message sets to the partition.

func (*Partition) Delete

func (p *Partition) Delete() error

Delete is used to delete the partition's data/commitlog.

func (*Partition) HighWatermark

func (p *Partition) HighWatermark() int64

HighWatermark is used to get the newest offset of the partition.

func (*Partition) IsFollowing

func (r *Partition) IsFollowing(id int32) bool

IsFollowing is used to check if the broker with the given ID should follow/replicate the leader.

func (*Partition) IsLeader

func (r *Partition) IsLeader(id int32) bool

IsLeader is used to check if the given broker ID is the partition's leader.
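
One plausible reading of these two checks, sketched on a stand-in type: a broker leads a partition when its ID equals Leader, and it follows when it appears in Replicas without being the leader. The type partition and both method bodies are assumptions made for illustration; the package's own implementation may differ (for example, it might treat every replica, including the leader, as following).

```go
package main

import "fmt"

// partition is a stand-in holding only the fields these checks need.
type partition struct {
	Replicas []int32
	Leader   int32
}

// isLeader reports whether broker id is the partition's leader.
func (p *partition) isLeader(id int32) bool {
	return p.Leader == id
}

// isFollowing reports whether broker id is a replica other than the leader
// (assumed semantics, see the note above).
func (p *partition) isFollowing(id int32) bool {
	if p.isLeader(id) {
		return false
	}
	for _, r := range p.Replicas {
		if r == id {
			return true
		}
	}
	return false
}

func main() {
	p := &partition{Replicas: []int32{1, 2, 3}, Leader: 1}
	fmt.Println(p.isLeader(1), p.isFollowing(1)) // true false
	fmt.Println(p.isLeader(2), p.isFollowing(2)) // false true
	fmt.Println(p.isLeader(4), p.isFollowing(4)) // false false
}
```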

func (*Partition) IsOpen

func (r *Partition) IsOpen() bool

IsOpen is used to check whether the partition's commit log has been initialized.

func (*Partition) LeaderID

func (p *Partition) LeaderID() int32

LeaderID is used to get the partition's leader broker ID.

func (*Partition) LowWatermark

func (p *Partition) LowWatermark() int64

LowWatermark is used to get the oldest offset of the partition.

func (*Partition) NewReader

func (p *Partition) NewReader(offset int64, maxBytes int32) (io.Reader, error)

NewReader is used to create a reader at the given offset and will read up to maxBytes.

func (*Partition) Read

func (p *Partition) Read(b []byte) (int, error)

Read is used to directly read bytes from the partition's leader.

func (*Partition) String

func (r *Partition) String() string

String returns the topic/Partition as a string.

func (*Partition) Truncate

func (p *Partition) Truncate(offset int64) error

Truncate is used to truncate the partition's logs before the given offset.

func (*Partition) Write

func (p *Partition) Write(b []byte) (int, error)

Write is used to directly write the given bytes to the partition's leader.

type Raft

type Raft interface {
	Bootstrap(serf Serf, serfEventCh <-chan *ClusterMember, commandCh chan<- RaftCommand) error
	Apply(cmd RaftCommand) error
	IsLeader() bool
	LeaderID() string
	Shutdown() error
	Addr() string
}

Raft is the interface that wraps Raft's methods and is used to manage consensus for the Jocko cluster.

type RaftCmdType

type RaftCmdType int

type RaftCommand

type RaftCommand struct {
	Cmd  RaftCmdType      `json:"type"`
	Data *json.RawMessage `json:"data"`
}
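
RaftCommand carries its payload as a *json.RawMessage, so the payload's decoding can be deferred until the command type is known. The sketch below shows that pattern with the two types copied from the listing. The constant cmdAddPartition, the payload type partitionInfo, and the helpers newCommand and decodePartition are hypothetical, since the listing does not show the package's actual command types or payloads.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RaftCmdType and RaftCommand are copied from the listing above.
type RaftCmdType int

type RaftCommand struct {
	Cmd  RaftCmdType      `json:"type"`
	Data *json.RawMessage `json:"data"`
}

// cmdAddPartition is a hypothetical command type used only in this sketch.
const cmdAddPartition RaftCmdType = 1

// partitionInfo is a hypothetical payload.
type partitionInfo struct {
	Topic string `json:"topic"`
	ID    int32  `json:"id"`
}

// newCommand encodes payload and wraps it in a RaftCommand of type t.
func newCommand(t RaftCmdType, payload interface{}) (RaftCommand, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return RaftCommand{}, err
	}
	raw := json.RawMessage(b)
	return RaftCommand{Cmd: t, Data: &raw}, nil
}

// decodePartition parses an encoded command and, once the type is known,
// decodes the deferred payload.
func decodePartition(encoded []byte) (partitionInfo, error) {
	var cmd RaftCommand
	if err := json.Unmarshal(encoded, &cmd); err != nil {
		return partitionInfo{}, err
	}
	if cmd.Cmd != cmdAddPartition || cmd.Data == nil {
		return partitionInfo{}, fmt.Errorf("unexpected command type %d", cmd.Cmd)
	}
	var p partitionInfo
	err := json.Unmarshal(*cmd.Data, &p)
	return p, err
}

func main() {
	cmd, err := newCommand(cmdAddPartition, partitionInfo{Topic: "events", ID: 0})
	if err != nil {
		panic(err)
	}
	encoded, err := json.Marshal(cmd)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded)) // {"type":1,"data":{"topic":"events","id":0}}

	p, err := decodePartition(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Topic, p.ID) // events 0
}
```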

type Request

type Request struct {
	Conn    io.ReadWriter
	Header  *protocol.RequestHeader
	Request interface{}
}

Request represents an API request.

type Response

type Response struct {
	Conn     io.ReadWriter
	Header   *protocol.RequestHeader
	Response interface{}
}

Response represents an API response.

type Serf

type Serf interface {
	Bootstrap(node *ClusterMember, reconcileCh chan<- *ClusterMember) error
	Cluster() []*ClusterMember
	Member(memberID int32) *ClusterMember
	Join(addrs ...string) (int, error)
	Shutdown() error
	ID() int32
}

Serf is the interface that wraps Serf methods and is used to manage the cluster membership for Jocko nodes.

Directories

Path Synopsis
cmd
examples
