streamer

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2019 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Static = iota
	Dynamic
	Increment
	DynInc
)

Variables

This section is empty.

Functions

func BSearch

func BSearch(records []*IncRecord, progress int64) int

Types

type BaseInfo

type BaseInfo struct {
	Name        string
	Progress    int64
	DataVersion int
	Data        map[container.MapKey]interface{}
}

type BaseReq

type BaseReq struct {
	Name     string
	Space    string
	Progress int64
}

type BaseRes

type BaseRes struct {
	Status   Status // streamer name
	BaseInfo *BaseInfo
}

type BiFrostStreamerCfg

type BiFrostStreamerCfg struct {
	Name         string      // streamer名字
	NameSpace    string      // streamer命名空间
	Version      int         // 数据格式的版本
	URI          string      //
	BaseFilePath string      // 基准文件路径
	Interval     int         // 增量更新时间间隔
	IsSync       bool        // 是否同步加载
	IsOnline     bool        // 离线模式生效
	WriteFile    bool        // 离线模式生效
	UserData     interface{} // 用户自定义数据
	Logger       log.BiLogger
}

type BifrostService

type BifrostService struct {
	StreamerManager *StreamerProviderManager
}

func NewBifrostServer

func NewBifrostServer(streamerManager *StreamerProviderManager) *BifrostService

func (*BifrostService) GetBase

func (bs *BifrostService) GetBase(ctx context.Context, req *BaseReq, res *BaseRes) error

func (*BifrostService) GetInc

func (bs *BifrostService) GetInc(ctx context.Context, req *IncReq, res *IncRes) error

type BifrostStreamer

type BifrostStreamer struct {
	// contains filtered or unexported fields
}

func NewBiFrostStreamer

func NewBiFrostStreamer(cfg *BiFrostStreamerCfg) *BifrostStreamer

func (*BifrostStreamer) GetContainer

func (bs *BifrostStreamer) GetContainer() container.Container

func (*BifrostStreamer) GetSchedInfo

func (bs *BifrostStreamer) GetSchedInfo() *SchedInfo

func (*BifrostStreamer) HasNext

func (bs *BifrostStreamer) HasNext() bool

func (*BifrostStreamer) InfoStatus

func (fs *BifrostStreamer) InfoStatus(s string)

func (*BifrostStreamer) Next

func (bs *BifrostStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)

func (*BifrostStreamer) SetContainer

func (bs *BifrostStreamer) SetContainer(container container.Container)

func (*BifrostStreamer) UpdateData

func (bs *BifrostStreamer) UpdateData(ctx context.Context) error

func (*BifrostStreamer) WarnStatus

func (fs *BifrostStreamer) WarnStatus(s string)

type DataParser

type DataParser interface {
	Parse([]byte, interface{}) []ParserResult
}

type DefaultTextParser

type DefaultTextParser struct {
}

func (*DefaultTextParser) Parse

func (*DefaultTextParser) Parse(data []byte, userData interface{}) []ParserResult

type GobCodec

type GobCodec struct {
}

func (*GobCodec) Decode

func (c *GobCodec) Decode(data []byte, i interface{}) error

func (*GobCodec) Encode

func (c *GobCodec) Encode(i interface{}) ([]byte, error)

type IncRecord

type IncRecord struct {
	DataMode container.DataMode
	MapKey   container.MapKey
	Progress int64 // 更新时间
	Value    interface{}
}

type IncReq

type IncReq struct {
	Name     string
	Space    string
	Batch    int
	Progress int64
}

type IncRes

type IncRes struct {
	Status     Status // streamer name
	IncRecords []*IncRecord
}

type LocalFileStreamer

type LocalFileStreamer struct {
	// contains filtered or unexported fields
}

func NewFileStreamer

func NewFileStreamer(cfg *LocalFileStreamerCfg) *LocalFileStreamer

func (*LocalFileStreamer) GetContainer

func (fs *LocalFileStreamer) GetContainer() container.Container

func (*LocalFileStreamer) GetSchedInfo

func (fs *LocalFileStreamer) GetSchedInfo() *SchedInfo

func (*LocalFileStreamer) HasNext

func (fs *LocalFileStreamer) HasNext() bool

func (*LocalFileStreamer) InfoStatus

func (ms *LocalFileStreamer) InfoStatus(s string)

func (*LocalFileStreamer) Next

func (fs *LocalFileStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)

func (*LocalFileStreamer) SetContainer

func (fs *LocalFileStreamer) SetContainer(container container.Container)

func (*LocalFileStreamer) UpdateData

func (fs *LocalFileStreamer) UpdateData(ctx context.Context) error

func (*LocalFileStreamer) WarnStatus

func (ms *LocalFileStreamer) WarnStatus(s string)

type LocalFileStreamerCfg

type LocalFileStreamerCfg struct {
	Name       string
	Path       string
	UpdatMode  UpdatMode
	Interval   int
	IsSync     bool
	DataParser DataParser
	UserData   interface{}
	Logger     log.BiLogger
}

type MongoStreamer

type MongoStreamer struct {
	// contains filtered or unexported fields
}

func NewMongoStreamer

func NewMongoStreamer(mongoConfig *MongoStreamerCfg) (*MongoStreamer, error)

func (*MongoStreamer) GetContainer

func (ms *MongoStreamer) GetContainer() container.Container

func (*MongoStreamer) GetSchedInfo

func (ms *MongoStreamer) GetSchedInfo() *SchedInfo

func (*MongoStreamer) HasNext

func (ms *MongoStreamer) HasNext() bool

func (*MongoStreamer) InfoStatus

func (ms *MongoStreamer) InfoStatus(s string)

func (*MongoStreamer) Next

func (ms *MongoStreamer) Next() (container.DataMode, container.MapKey, interface{}, error)

func (*MongoStreamer) SetContainer

func (ms *MongoStreamer) SetContainer(container container.Container)

func (*MongoStreamer) UpdateData

func (ms *MongoStreamer) UpdateData(ctx context.Context) error

func (*MongoStreamer) WarnStatus

func (ms *MongoStreamer) WarnStatus(s string)

type MongoStreamerCfg

type MongoStreamerCfg struct {
	Name           string
	UpdatMode      UpdatMode
	IncInterval    int
	IsSync         bool
	URI            string
	DB             string
	Collection     string
	ConnectTimeout int
	ReadTimeout    int
	BaseParser     DataParser
	IncParser      DataParser
	BaseQuery      interface{}
	IncQuery       interface{}
	UserData       interface{}
	FindOpt        *options.FindOptions
	OnBeforeBase   func(interface{}) interface{}
	OnBeforeInc    func(interface{}) interface{}
	Logger         log.BiLogger
}

type ParserResult

type ParserResult struct {
	DataMode container.DataMode
	Key      container.MapKey
	Value    interface{}
	Err      error
}

type Sched

type Sched []*SchedUnit

func (*Sched) AddStreamer

func (s *Sched) AddStreamer(name string, dataStreamer Streamer)

func (Sched) Len

func (s Sched) Len() int

func (Sched) Less

func (s Sched) Less(i, j int) bool

func (*Sched) Pop

func (s *Sched) Pop() interface{}

func (*Sched) Push

func (s *Sched) Push(x interface{})

func (*Sched) Schedule

func (s *Sched) Schedule(ctx context.Context)

func (Sched) Swap

func (s Sched) Swap(i, j int)

func (*Sched) Top

func (s *Sched) Top() *SchedUnit

type SchedInfo

type SchedInfo struct {
	TimeInterval int
}

type SchedUnit

type SchedUnit struct {
	// contains filtered or unexported fields
}

type Status

type Status int
const (
	Ok Status = iota
	Error
)

type Streamer

type Streamer interface {
	SetContainer(container.Container)
	GetContainer() container.Container
	GetSchedInfo() *SchedInfo
	UpdateData(ctx context.Context) error
}

type StreamerProvider

type StreamerProvider struct {
	Cfg      *StreamerProviderCfg
	BaseInfo *BaseInfo
	Cached   []*IncRecord
	// contains filtered or unexported fields
}

func NewStreamerProvider

func NewStreamerProvider(cfg *StreamerProviderCfg) *StreamerProvider

func (*StreamerProvider) AddInc

func (sp *StreamerProvider) AddInc(incs []*IncRecord)

func (*StreamerProvider) GetBase

func (sp *StreamerProvider) GetBase() *BaseInfo

func (*StreamerProvider) GetInc

func (sp *StreamerProvider) GetInc(progress int64, size int) ([]*IncRecord, error)

func (*StreamerProvider) SetBase

func (sp *StreamerProvider) SetBase(baseInfo *BaseInfo)

type StreamerProviderCfg

type StreamerProviderCfg struct {
	Name       string
	ExpireTime int64
	Logger     log.BiLogger
}

type StreamerProviderManager

type StreamerProviderManager struct {
	StreamerProviders map[string]*StreamerProvider
}

func NewStreamerProviderManager

func NewStreamerProviderManager() *StreamerProviderManager

func (*StreamerProviderManager) GetProvider

func (spm *StreamerProviderManager) GetProvider(name string, progress int64) *StreamerProvider

func (*StreamerProviderManager) RegiterProvider

func (spm *StreamerProviderManager) RegiterProvider(name string, provider *StreamerProvider) error

type UpdatMode

type UpdatMode int64

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL