dstore

package
v1.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2019 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TIMEINTERVAL = 30 * 1000 * 1000 * 1000 // proxy 链接 后端 超时时间 为 3 秒,清空 30s 之前的数据,30 * 1000 * 1000 * 1000
	QUEUECAP     = 60
)
View Source
const (
	FeedbackNonConnectErrSet     = -10
	FeedbackNonConnectErrDelete  = -10
	FeedbackConnectErrDefault    = -2
	FeedbackNonConnectErrDefault = -5
)
View Source
const CONSISTENTLEN = 100
View Source
const (
	// 最少保留 MINKEYS/count 的 key 在某一个节点上
	MINKEYS = 1
)
View Source
const RINGLEN = 60
View Source
const (
	WAIT_FOR_RETRY = "wait for retry"
)

Variables

View Source
var (
	ErrQueueFull  = errors.New("queue full")
	ErrQueueEmpty = errors.New("queue empty")
)
View Source
var (
	// ErrWriteFailed 表示成功写入的节点数小于 StorageClient.W
	ErrWriteFailed = errors.New("write failed")
)

Functions

func InitGlobalManualScheduler

func InitGlobalManualScheduler(route *dbcfg.RouteTable, n int)

Types

type Bucket

type Bucket struct {
	ID int
	// contains filtered or unexported fields
}

func (*Bucket) GetHosts

func (bucket *Bucket) GetHosts(key string) (hosts []*Host)

get host by key

func (*Bucket) ReBalance

func (bucket *Bucket) ReBalance()

type ByName

type ByName []*HostInBucket

func (ByName) Len

func (b ByName) Len() int

func (ByName) Less

func (b ByName) Less(i, j int) bool

func (ByName) Swap

func (b ByName) Swap(i, j int)

type Feedback

type Feedback struct {
	// contains filtered or unexported fields
}

type Host

type Host struct {
	// Addr is host:port pair
	Addr string

	// Index is the index of host in Scheduler.hosts
	Index int

	sync.Mutex
	// contains filtered or unexported fields
}

func NewHost

func NewHost(addr string) *Host

func (*Host) Append

func (host *Host) Append(key string, value []byte) (bool, error)

func (*Host) Close

func (host *Host) Close()

func (*Host) Delete

func (host *Host) Delete(key string) (bool, error)

func (*Host) Get

func (host *Host) Get(key string) (*mc.Item, error)

func (*Host) GetMulti

func (host *Host) GetMulti(keys []string) (map[string]*mc.Item, error)

func (*Host) Incr

func (host *Host) Incr(key string, value int) (int, error)

func (*Host) Len

func (host *Host) Len() int

func (*Host) Process

func (host *Host) Process(key string, args []string) (string, string)

func (*Host) Set

func (host *Host) Set(key string, item *mc.Item, noreply bool) (bool, error)

type HostInBucket

type HostInBucket struct {
	// contains filtered or unexported fields
}

type ManualScheduler

type ManualScheduler struct {
	N int
	// contains filtered or unexported fields
}

route request by configure

func NewManualScheduler

func NewManualScheduler(route *dbcfg.RouteTable, n int) *ManualScheduler

func (*ManualScheduler) Close

func (sch *ManualScheduler) Close()

func (*ManualScheduler) DivideKeysByBucket

func (sch *ManualScheduler) DivideKeysByBucket(keys []string) [][]string

func (*ManualScheduler) Feedback

func (sch *ManualScheduler) Feedback(host *Host, key string, startTime time.Time, data float64)

func (*ManualScheduler) FeedbackError

func (sch *ManualScheduler) FeedbackError(host *Host, key string, startTime time.Time, errorCode float64)

func (*ManualScheduler) FeedbackLatency

func (sch *ManualScheduler) FeedbackLatency(host *Host, key string, startTime time.Time, timeUsed time.Duration)

func (*ManualScheduler) GetBucketInfo

func (sch *ManualScheduler) GetBucketInfo(bucketID int64) map[string]map[string]map[string][]Response

return addr:score:offset:response

func (*ManualScheduler) GetHostsByKey

func (sch *ManualScheduler) GetHostsByKey(key string) (hosts []*Host)

func (*ManualScheduler) LatenciesStats

func (sch *ManualScheduler) LatenciesStats() map[string]map[string][QUEUECAP]Response

func (*ManualScheduler) Partition

func (sch *ManualScheduler) Partition() map[string]map[string]int

func (*ManualScheduler) Stats

func (sch *ManualScheduler) Stats() map[string]map[string]float64

Stats return the score of eache addr, it's used in web interface. Result structure is { bucket1: {host1: score1, host2: score2, ...}, ... }

type Partition

type Partition struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

一致性哈希变种

func NewPartition

func NewPartition(count int, nodesNum int) *Partition

--- Partition --------------------------------------------------------------

type Response

type Response struct {
	ReqTime time.Time
	Count   int
	Sum     float64
}

type RingQueue

type RingQueue struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewRingQueue

func NewRingQueue() *RingQueue

func (*RingQueue) Get

func (q *RingQueue) Get(num, dataType int) (responses []Response)

func (*RingQueue) Push

func (q *RingQueue) Push(start time.Time, ResTime float64, dataType int) error

type Scheduler

type Scheduler interface {
	// feedback for auto routing
	FeedbackError(host *Host, key string, startTime time.Time, errorCode float64)
	FeedbackLatency(host *Host, key string, startTime time.Time, timeUsed time.Duration)

	// route a key to hosts
	GetHostsByKey(key string) (hosts []*Host)

	// route some keys to group of hosts
	DivideKeysByBucket(keys []string) [][]string

	// internal status
	Stats() map[string]map[string]float64

	// get latencies of hosts in the bucket
	LatenciesStats() map[string]map[string][QUEUECAP]Response

	// get percentage of hosts in the bucket
	Partition() map[string]map[string]int

	// return average latency  and arc(percentage)
	GetBucketInfo(bucketID int64) map[string]map[string]map[string][]Response

	Close()
}

Scheduler: route request to nodes

func GetScheduler

func GetScheduler() Scheduler

type Storage

type Storage struct {
}

func (*Storage) Client

func (s *Storage) Client() mc.StorageClient

type StorageClient

type StorageClient struct {
	// SuccessedTargets is a list of addrs on which the client request was ran
	// successfully.
	SuccessedTargets []string

	// Dynamo NWR model, please refer to Dynamo paper for details.
	N, W, R int
	// contains filtered or unexported fields
}

client for gobeansdb

func NewStorageClient

func NewStorageClient(n int, w int, r int) (c *StorageClient)

func (*StorageClient) Append

func (c *StorageClient) Append(key string, value []byte) (ok bool, err error)

func (*StorageClient) Clean

func (c *StorageClient) Clean()

func (*StorageClient) Close

func (c *StorageClient) Close()

func (*StorageClient) Delete

func (c *StorageClient) Delete(key string) (flag bool, err error)

TODO: 弄清楚为什么 delete 不遵循 NWR 规则

func (*StorageClient) Get

func (c *StorageClient) Get(key string) (item *mc.Item, err error)

func (*StorageClient) GetMulti

func (c *StorageClient) GetMulti(keys []string) (rs map[string]*mc.Item, err error)

func (*StorageClient) GetSuccessedTargets

func (c *StorageClient) GetSuccessedTargets() []string

func (*StorageClient) Incr

func (c *StorageClient) Incr(key string, value int) (result int, err error)

NOTE: Incr command may has consistency problem link: http://github.com/douban/gobeansproxy/issues/7

func (*StorageClient) Len

func (c *StorageClient) Len() int

func (*StorageClient) Process

func (c *StorageClient) Process(key string, args []string) (string, string)

func (*StorageClient) Set

func (c *StorageClient) Set(key string, item *mc.Item, noreply bool) (ok bool, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL