cluster

package
v0.8.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2014 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PER_SERVER_BUFFER_SIZE  = 10
	LOCAL_WRITE_BUFFER_SIZE = 10
)
View Source
const (
	SEVEN_DAYS                        = time.Hour * 24 * 7
	DEFAULT_SPLIT                     = 1
	DEFAULT_REPLICATION_FACTOR        = 1
	DEFAULT_SHARD_DURATION            = SEVEN_DAYS
	DEFAULT_RETENTION_POLICY_DURATION = 0
)
View Source
const (
	DEFAULT_SHARD_SPACE_NAME = "default"
)
View Source
const (
	HEARTBEAT_TIMEOUT = 100 * time.Millisecond
)

Variables

View Source
var HEARTBEAT_TYPE = protocol.Request_HEARTBEAT

Functions

func HashDbAndSeriesToInt

func HashDbAndSeriesToInt(database, series string) int

func HashPassword

func HashPassword(password string) ([]byte, error)

func SortShardsByTimeAscending

func SortShardsByTimeAscending(shards []*ShardData)

func SortShardsByTimeDescending

func SortShardsByTimeDescending(shards []*ShardData)

Types

type ByShardTimeAsc

type ByShardTimeAsc struct{ ShardCollection }

func (ByShardTimeAsc) Less

func (s ByShardTimeAsc) Less(i, j int) bool

type ByShardTimeDesc

type ByShardTimeDesc struct{ ShardCollection }

func (ByShardTimeDesc) Less

func (s ByShardTimeDesc) Less(i, j int) bool

type ClusterAdmin

type ClusterAdmin struct {
	CommonUser `json:"common"`
}

func (*ClusterAdmin) GetReadPermission

func (self *ClusterAdmin) GetReadPermission() string

func (*ClusterAdmin) GetWritePermission

func (self *ClusterAdmin) GetWritePermission() string

func (*ClusterAdmin) HasReadAccess

func (self *ClusterAdmin) HasReadAccess(_ string) bool

func (*ClusterAdmin) HasWriteAccess

func (self *ClusterAdmin) HasWriteAccess(_ string) bool

func (*ClusterAdmin) IsClusterAdmin

func (self *ClusterAdmin) IsClusterAdmin() bool

func (*ClusterAdmin) IsDbAdmin

func (self *ClusterAdmin) IsDbAdmin(_ string) bool

type ClusterConfiguration

type ClusterConfiguration struct {
	DatabaseReplicationFactors map[string]struct{}

	ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery

	LocalServer *ClusterServer

	LocalRaftName string

	MetaStore *metastore.Store
	// contains filtered or unexported fields
}

This struct stores all the metadata configuration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running.

func NewClusterConfiguration

func NewClusterConfiguration(
	config *configuration.Configuration,
	wal WAL,
	shardStore LocalShardStore,
	connectionCreator func(string) ServerConnection,
	metaStore *metastore.Store) *ClusterConfiguration

func (*ClusterConfiguration) AddPotentialServer

func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer)

func (*ClusterConfiguration) AddShardSpace

func (self *ClusterConfiguration) AddShardSpace(space *ShardSpace) error

func (*ClusterConfiguration) AddShards

func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardData, error)

Add shards expects all shards to be of the same type (long term or short term) and have the same start and end times. This is called to add the shard set for a given duration. If existing shards have the same times, those are returned.

func (*ClusterConfiguration) AuthenticateClusterAdmin

func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error)

func (*ClusterConfiguration) AuthenticateDbUser

func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error)

func (*ClusterConfiguration) ChangeDbUserPassword

func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string) error

func (*ClusterConfiguration) ChangeDbUserPermissions

func (self *ClusterConfiguration) ChangeDbUserPermissions(db, username, readPermissions, writePermissions string) error

func (*ClusterConfiguration) ChangeProtobufConnectionString

func (self *ClusterConfiguration) ChangeProtobufConnectionString(server *ClusterServer)

func (*ClusterConfiguration) CreateCheckpoint

func (self *ClusterConfiguration) CreateCheckpoint() error

func (*ClusterConfiguration) CreateContinuousQuery

func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error

func (*ClusterConfiguration) CreateDatabase

func (self *ClusterConfiguration) CreateDatabase(name string) error

func (*ClusterConfiguration) DatabaseExists

func (self *ClusterConfiguration) DatabaseExists(name string) bool

func (*ClusterConfiguration) DatabasesExists

func (self *ClusterConfiguration) DatabasesExists(db string) bool

func (*ClusterConfiguration) DeleteContinuousQuery

func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error

func (*ClusterConfiguration) DropDatabase

func (self *ClusterConfiguration) DropDatabase(name string) error

func (*ClusterConfiguration) DropSeries

func (self *ClusterConfiguration) DropSeries(database, series string) error

func (*ClusterConfiguration) DropShard

func (self *ClusterConfiguration) DropShard(shardId uint32, serverIds []uint32) error

func (*ClusterConfiguration) GetClusterAdmin

func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin

func (*ClusterConfiguration) GetClusterAdmins

func (self *ClusterConfiguration) GetClusterAdmins() (names []string)

func (*ClusterConfiguration) GetContinuousQueries

func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery

func (*ClusterConfiguration) GetDatabases

func (self *ClusterConfiguration) GetDatabases() []*Database

func (*ClusterConfiguration) GetDbUser

func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser

func (*ClusterConfiguration) GetDbUsers

func (self *ClusterConfiguration) GetDbUsers(db string) []common.User

func (*ClusterConfiguration) GetExpiredShards

func (self *ClusterConfiguration) GetExpiredShards() []*ShardData

func (*ClusterConfiguration) GetLocalConfiguration

func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration

func (*ClusterConfiguration) GetLocalShardById

func (self *ClusterConfiguration) GetLocalShardById(id uint32) *ShardData

This function is for the request handler to get the shard to write a request to locally.

func (*ClusterConfiguration) GetServerById

func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer

func (*ClusterConfiguration) GetServerByProtobufConnectionString

func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connectionString string) *ClusterServer

func (*ClusterConfiguration) GetServerByRaftName

func (self *ClusterConfiguration) GetServerByRaftName(name string) *ClusterServer

func (*ClusterConfiguration) GetShard

func (self *ClusterConfiguration) GetShard(id uint32) *ShardData

func (*ClusterConfiguration) GetShardSpaces

func (self *ClusterConfiguration) GetShardSpaces() []*ShardSpace

func (*ClusterConfiguration) GetShardSpacesForDatabase added in v0.8.4

func (self *ClusterConfiguration) GetShardSpacesForDatabase(database string) []*ShardSpace

func (*ClusterConfiguration) GetShardToWriteToBySeriesAndTime

func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error)

func (*ClusterConfiguration) GetShards

func (self *ClusterConfiguration) GetShards() []*ShardData

func (*ClusterConfiguration) GetShardsForQuery

func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec) (Shards, error)

func (*ClusterConfiguration) HasContinuousQueries

func (self *ClusterConfiguration) HasContinuousQueries() bool

func (*ClusterConfiguration) HasUncommitedWrites

func (self *ClusterConfiguration) HasUncommitedWrites() bool

Return per shard request numbers for the local server and all remote servers

func (*ClusterConfiguration) IsSingleServer

func (self *ClusterConfiguration) IsSingleServer() bool

func (*ClusterConfiguration) LastContinuousQueryRunTime

func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time

func (*ClusterConfiguration) MarshalNewShardArrayToShards

func (self *ClusterConfiguration) MarshalNewShardArrayToShards(newShards []*NewShardData) ([]*ShardData, error)

func (*ClusterConfiguration) RecoverFromWAL

func (self *ClusterConfiguration) RecoverFromWAL() error

func (*ClusterConfiguration) Recovery

func (self *ClusterConfiguration) Recovery(b []byte) error

func (*ClusterConfiguration) RemoveServer

func (self *ClusterConfiguration) RemoveServer(server *ClusterServer) error

func (*ClusterConfiguration) RemoveShardSpace

func (self *ClusterConfiguration) RemoveShardSpace(database, name string) error

func (*ClusterConfiguration) Save

func (self *ClusterConfiguration) Save() ([]byte, error)

func (*ClusterConfiguration) SaveClusterAdmin

func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin)

func (*ClusterConfiguration) SaveDbUser

func (self *ClusterConfiguration) SaveDbUser(u *DbUser)

func (*ClusterConfiguration) SerializableConfiguration added in v0.8.3

func (self *ClusterConfiguration) SerializableConfiguration() *SavedConfiguration

func (*ClusterConfiguration) ServerId

func (self *ClusterConfiguration) ServerId() uint32

func (*ClusterConfiguration) Servers

func (self *ClusterConfiguration) Servers() []*ClusterServer

func (*ClusterConfiguration) SetContinuousQueryTimestamp

func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error

func (*ClusterConfiguration) SetLastContinuousQueryRunTime

func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time)

func (*ClusterConfiguration) SetShardCreator

func (self *ClusterConfiguration) SetShardCreator(shardCreator ShardCreator)

func (*ClusterConfiguration) ShardSpaceExists added in v0.8.2

func (self *ClusterConfiguration) ShardSpaceExists(space *ShardSpace) bool

func (*ClusterConfiguration) UpdateShardSpace added in v0.8.2

func (self *ClusterConfiguration) UpdateShardSpace(space *ShardSpace) error

func (*ClusterConfiguration) WaitForLocalServerLoaded

func (self *ClusterConfiguration) WaitForLocalServerLoaded()

This function will wait until the configuration has received an addPotentialServer command for this local server.

type ClusterServer

type ClusterServer struct {
	Id                       uint32
	RaftName                 string
	State                    ServerState
	RaftConnectionString     string
	ProtobufConnectionString string

	HeartbeatInterval time.Duration
	Backoff           time.Duration
	MinBackoff        time.Duration
	MaxBackoff        time.Duration
	// contains filtered or unexported fields
}

func NewClusterServer

func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection, config *c.Configuration) *ClusterServer

func (*ClusterServer) BufferWrite

func (self *ClusterServer) BufferWrite(request *protocol.Request)

func (*ClusterServer) Connect

func (self *ClusterServer) Connect()

func (*ClusterServer) GetId

func (self *ClusterServer) GetId() uint32

func (*ClusterServer) GetStateName

func (self *ClusterServer) GetStateName() (stateName string)

func (*ClusterServer) IsUp

func (self *ClusterServer) IsUp() bool

func (*ClusterServer) MakeRequest

func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan<- *protocol.Response)

func (*ClusterServer) SetWriteBuffer

func (self *ClusterServer) SetWriteBuffer(writeBuffer *WriteBuffer)

func (*ClusterServer) StartHeartbeat

func (self *ClusterServer) StartHeartbeat()

func (*ClusterServer) Write

func (self *ClusterServer) Write(request *protocol.Request) error

type CommonUser

type CommonUser struct {
	Name          string `json:"name"`
	Hash          string `json:"hash"`
	IsUserDeleted bool   `json:"is_deleted"`
	CacheKey      string `json:"cache_key"`
}

func (*CommonUser) ChangePassword

func (self *CommonUser) ChangePassword(hash string) error

func (*CommonUser) GetDb

func (self *CommonUser) GetDb() string

func (*CommonUser) GetName

func (self *CommonUser) GetName() string

func (*CommonUser) HasReadAccess

func (self *CommonUser) HasReadAccess(name string) bool

func (*CommonUser) HasWriteAccess

func (self *CommonUser) HasWriteAccess(name string) bool

func (*CommonUser) IsClusterAdmin

func (self *CommonUser) IsClusterAdmin() bool

func (*CommonUser) IsDbAdmin

func (self *CommonUser) IsDbAdmin(db string) bool

func (*CommonUser) IsDeleted

func (self *CommonUser) IsDeleted() bool

type ContinuousQuery

type ContinuousQuery struct {
	Id    uint32
	Query string
}

type Database

type Database struct {
	Name string `json:"name"`
}

type DbUser

type DbUser struct {
	CommonUser `json:"common"`
	Db         string     `json:"db"`
	ReadFrom   []*Matcher `json:"read_matchers"`
	WriteTo    []*Matcher `json:"write_matchers"`
	IsAdmin    bool       `json:"is_admin"`
}

func (*DbUser) ChangePermissions

func (self *DbUser) ChangePermissions(readPermissions, writePermissions string)

func (*DbUser) GetDb

func (self *DbUser) GetDb() string

func (*DbUser) GetReadPermission

func (self *DbUser) GetReadPermission() string

func (*DbUser) GetWritePermission

func (self *DbUser) GetWritePermission() string

func (*DbUser) HasReadAccess

func (self *DbUser) HasReadAccess(name string) bool

func (*DbUser) HasWriteAccess

func (self *DbUser) HasWriteAccess(name string) bool

func (*DbUser) IsDbAdmin

func (self *DbUser) IsDbAdmin(db string) bool

type LocalShardDb

type LocalShardDb interface {
	Write(database string, series []*p.Series) error
	Query(*parser.QuerySpec, engine.Processor) error
	DropFields(fields []*metastore.Field) error
	IsClosed() bool
}

type LocalShardStore

type LocalShardStore interface {
	Write(request *p.Request) error
	SetWriteBuffer(writeBuffer *WriteBuffer)
	BufferWrite(request *p.Request)
	GetOrCreateShard(id uint32) (LocalShardDb, error)
	ReturnShard(id uint32)
	DeleteShard(shardId uint32)
}

type Matcher

type Matcher struct {
	IsRegex bool
	Name    string
}

func (*Matcher) Matches

func (self *Matcher) Matches(name string) bool

type NewShardData

type NewShardData struct {
	Id        uint32 `json:",omitempty"`
	SpaceName string
	Database  string
	StartTime time.Time
	EndTime   time.Time
	ServerIds []uint32
}

type NilProcessor added in v0.8.4

type NilProcessor struct{}

func (NilProcessor) Close added in v0.8.4

func (np NilProcessor) Close() error

func (NilProcessor) Name added in v0.8.4

func (np NilProcessor) Name() string

func (NilProcessor) Yield added in v0.8.4

func (np NilProcessor) Yield(s *protocol.Series) (bool, error)

type QuerySpec

type QuerySpec interface {
	GetStartTime() time.Time
	GetEndTime() time.Time
	Database() string
	TableNames() []string
	TableNamesAndRegex() ([]string, *regexp.Regexp)
	GetGroupByInterval() *time.Duration
	AllShardsQuery() bool
	IsRegex() bool
}

defined by cluster config (in cluster package)

type ResponseChannel added in v0.8.4

type ResponseChannel interface {
	Yield(r *protocol.Response) bool
	Name() string
}

ResponseChannel is a processor for Responses, as opposed to Series like `engine.Processor`.

func NewResponseChannelWrapper added in v0.8.4

func NewResponseChannelWrapper(c chan<- *protocol.Response) ResponseChannel

type ResponseChannelProcessor added in v0.8.4

type ResponseChannelProcessor struct {
	// contains filtered or unexported fields
}

ResponseChannelProcessor converts Series to Responses. This is used to chain an `engine.Processor` with a `ResponseChannel`.

func NewResponseChannelProcessor added in v0.8.4

func NewResponseChannelProcessor(r ResponseChannel) *ResponseChannelProcessor

func (*ResponseChannelProcessor) Close added in v0.8.4

func (p *ResponseChannelProcessor) Close() error

func (*ResponseChannelProcessor) Name added in v0.8.4

func (p *ResponseChannelProcessor) Name() string

func (*ResponseChannelProcessor) Yield added in v0.8.4

type ResponseChannelWrapper added in v0.8.4

type ResponseChannelWrapper struct {
	// contains filtered or unexported fields
}

A `ResponseChannel` that wraps a Go channel.

func (*ResponseChannelWrapper) Name added in v0.8.4

func (w *ResponseChannelWrapper) Name() string

func (*ResponseChannelWrapper) Yield added in v0.8.4

type SavedConfiguration

type SavedConfiguration struct {
	Databases           map[string]uint8
	Admins              map[string]*ClusterAdmin
	DbUsers             map[string]map[string]*DbUser
	Servers             []*ClusterServer
	ContinuousQueries   map[string][]*ContinuousQuery
	MetaStore           *metastore.Store
	LastShardIdUsed     uint32
	LastServerIdUsed    uint32
	DatabaseShardSpaces map[string][]*ShardSpace
	Shards              []*NewShardData
}

type ServerConnection

type ServerConnection interface {
	Connect()
	Close()
	ClearRequests()
	MakeRequest(*protocol.Request, ResponseChannel) error
	CancelRequest(*protocol.Request)
}

type ServerState

type ServerState int
const (
	LoadingRingData ServerState = iota
	SendingRingData
	DeletingOldData
	Running
	Potential
)

type Shard

type Shard interface {
	Id() uint32
	StartTime() time.Time
	EndTime() time.Time
	Write(*p.Request) error
	SyncWrite(req *p.Request, assignSeqNum bool) error
	Query(querySpec *parser.QuerySpec, response chan<- *p.Response)
	ReplicationFactor() int
	IsMicrosecondInRange(t int64) bool
}

A shard implements an interface for writing and querying data. It can be copied to multiple servers or the local datastore. Shards contain data from [startTime, endTime]. Ids are unique across the cluster.

type ShardCollection

type ShardCollection []*ShardData

func (ShardCollection) Len

func (s ShardCollection) Len() int

func (ShardCollection) Swap

func (s ShardCollection) Swap(i, j int)

type ShardCreator

type ShardCreator interface {
	// the shard creator expects all shards to be of the same type (long term or short term) and have the same
	// start and end times. This is called to create the shard set for a given duration.
	CreateShards(shards []*NewShardData) ([]*ShardData, error)
	CreateShardSpace(shardSpace *ShardSpace) error
}

type ShardData

type ShardData struct {
	IsLocal   bool
	SpaceName string
	Database  string
	// contains filtered or unexported fields
}

func NewShard

func NewShard(id uint32, startTime, endTime time.Time, database, spaceName string, wal WAL) *ShardData

func (*ShardData) DropFields

func (self *ShardData) DropFields(fields []*metastore.Field) error

func (*ShardData) EndMicro

func (self *ShardData) EndMicro() int64

func (*ShardData) EndTime

func (self *ShardData) EndTime() time.Time

func (*ShardData) HandleDestructiveQuery

func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool)

func (*ShardData) Id

func (self *ShardData) Id() uint32

func (*ShardData) IsMicrosecondInRange

func (self *ShardData) IsMicrosecondInRange(t int64) bool

func (*ShardData) LogAndHandleDestructiveQuery

func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan<- *p.Response, runLocalOnly bool)

func (*ShardData) Query

func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Response)

func (*ShardData) QueryResponseBufferSize

func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int

func (*ShardData) ReplicationFactor

func (self *ShardData) ReplicationFactor() int

func (*ShardData) ServerIds

func (self *ShardData) ServerIds() []uint32

func (*ShardData) SetLocalStore

func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32) error

func (*ShardData) SetServers

func (self *ShardData) SetServers(servers []*ClusterServer)

func (*ShardData) ShouldAggregateLocally

func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool

Returns true if we can aggregate the data locally per shard, i.e. the group by interval lines up with the shard duration and there are no joins or merges

func (*ShardData) StartMicro

func (self *ShardData) StartMicro() int64

func (*ShardData) StartTime

func (self *ShardData) StartTime() time.Time

func (*ShardData) String

func (self *ShardData) String() string

func (*ShardData) SyncWrite

func (self *ShardData) SyncWrite(request *p.Request, assignSeqNum bool) error

func (*ShardData) ToNewShardData

func (self *ShardData) ToNewShardData() *NewShardData

Used to serialize shards when sending them around in raft or when snapshotting in the log.

func (*ShardData) Write

func (self *ShardData) Write(request *p.Request) error

func (*ShardData) WriteLocalOnly

func (self *ShardData) WriteLocalOnly(request *p.Request) error

type ShardIdInserterProcessor added in v0.8.4

type ShardIdInserterProcessor struct {
	// contains filtered or unexported fields
}

A processor to set the ShardId on the series to `id`

func NewShardIdInserterProcessor added in v0.8.4

func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor

func (ShardIdInserterProcessor) Close added in v0.8.4

func (sip ShardIdInserterProcessor) Close() error

func (ShardIdInserterProcessor) Name added in v0.8.4

func (sip ShardIdInserterProcessor) Name() string

func (ShardIdInserterProcessor) Yield added in v0.8.4

type ShardSpace

type ShardSpace struct {
	// required, must be unique within the database
	Name string `json:"name"`
	// required, a database has many shard spaces and a shard space belongs to a database
	Database string `json:"database"`
	// this is optional, if they don't set it, we'll set to /.*/
	Regex string `json:"regex"`

	// this is optional, if they don't set it, it will default to the storage.dir in the config
	RetentionPolicy   string `json:"retentionPolicy"`
	ShardDuration     string `json:"shardDuration"`
	ReplicationFactor uint32 `json:"replicationFactor"`
	Split             uint32 `json:"split"`
	// contains filtered or unexported fields
}

func NewShardSpace

func NewShardSpace(database, name string) *ShardSpace

func (*ShardSpace) MatchesSeries

func (s *ShardSpace) MatchesSeries(name string) bool

func (*ShardSpace) ParsedRetentionPeriod

func (s *ShardSpace) ParsedRetentionPeriod() time.Duration

func (*ShardSpace) ParsedShardDuration

func (s *ShardSpace) ParsedShardDuration() time.Duration

func (*ShardSpace) SecondsOfDuration

func (s *ShardSpace) SecondsOfDuration() float64

func (*ShardSpace) SetDefaults

func (s *ShardSpace) SetDefaults()

func (*ShardSpace) UpdateFromSpace added in v0.8.2

func (s *ShardSpace) UpdateFromSpace(space *ShardSpace) error

func (*ShardSpace) Validate

func (s *ShardSpace) Validate(clusterConfig *ClusterConfiguration, checkForDb bool) error

type ShardType

type ShardType int
const (
	LONG_TERM ShardType = iota
	SHORT_TERM
)

type Shards added in v0.8.4

type Shards []*ShardData

func (Shards) ShouldAggregateLocally added in v0.8.4

func (shards Shards) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool

Return true iff we can aggregate locally on all the given shards, false otherwise

type WAL

type WAL interface {
	AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
	AssignSequenceNumbers(request *protocol.Request) error
	Commit(requestNumber uint32, serverId uint32) error
	CreateCheckpoint() error
	RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
	RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
}

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

Acts as a buffer for writes

func NewWriteBuffer

func NewWriteBuffer(writerInfo string, writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer

func (*WriteBuffer) HasUncommitedWrites

func (self *WriteBuffer) HasUncommitedWrites() bool

func (*WriteBuffer) ShardsRequestNumber

func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32

func (*WriteBuffer) Write

func (self *WriteBuffer) Write(request *protocol.Request)

This method never blocks. It'll buffer writes until they fill the buffer, then drop them on the floor and let the background goroutine replay from the WAL.

type Writer

type Writer interface {
	Write(request *protocol.Request) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL