discovery

package
v0.0.0-...-d989b54 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 15, 2020 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrCodeInternal = 500 //内部错误

)

Variables

View Source
var (
	ErrServiceNotFound  = errors.New("service not found")
	ErrZoneNotFound     = errors.New("zone not found")
	ErrInstanceNotFound = errors.New("instance not found")
	ErrInvalidEventData = errors.New("event data is invalid")
)
View Source
var (
	ErrEmptyService = errors.New("service is empty")
)

Functions

func Deregister

func Deregister(instance *dm.Instance) error

解注册

func GetService

func GetService(serviceName string) (*dm.Service, error)

获取某个服务

func GetServiceNames

func GetServiceNames() ([]string, error)

获取服务名列表

func InitServiceBook

func InitServiceBook(opts ...OpOption) error

初始化services 如果discovery server重启后,service的version字段也需要初始化,目前etcd版本中,读取key的最大modVersion作为service version

func InitSessionBook

func InitSessionBook()

func KeepAlive

func KeepAlive(instance *dm.Instance, registryTTL time.Duration) error

保活

func Polls

func Polls(stream pb.Discovery_PollsServer) error

轮询获取service配置

func Register

func Register(instance *dm.Instance, registryTTL time.Duration) error

注册

func SetProperty

func SetProperty(property *dm.Property) error

设置属性

Types

type OpOption

type OpOption func(option *option)

func WithRegistry

func WithRegistry(registry dr.Registry) OpOption

type Server

type Server struct{}

func (*Server) Deregister

func (s *Server) Deregister(ctx context.Context, req *pb.DeregisterReq) (*empty.Empty, error)

服务解除注册接口,服务下线时调用

func (*Server) KeepAlive

func (s *Server) KeepAlive(ctx context.Context, req *pb.KeepAliveReq) (*empty.Empty, error)

服务心跳接口

func (*Server) Polls

func (s *Server) Polls(stream pb.Discovery_PollsServer) error

批量获取服务的节点信息,如果没有相关数据更新,则阻塞一段时间 @todo 是否需要client携带业务标识,方便后台查看哪些服务订阅了某个服务?

func (*Server) Register

func (s *Server) Register(ctx context.Context, req *pb.RegisterReq) (*empty.Empty, error)

服务注册接口,服务就绪后调用

type ServiceBook

type ServiceBook struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

所有服务列表

func (*ServiceBook) FindInstance

func (book *ServiceBook) FindInstance(serviceName, zone, id string) (*dm.Instance, error)

从ServiceBook中查找对应的instance

func (*ServiceBook) GetUpgradedServices

func (book *ServiceBook) GetUpgradedServices(serviceVersions map[string]int64, reconnect bool) map[string]*dm.Service

根据版本号获取已更新的service,reconnect为true时,代表重新建立连接,此时推全量数据 TODO reconnect

func (*ServiceBook) Watch

func (book *ServiceBook) Watch(discoverySessionBook SessionBooker)

监听服务配置

type ServiceBooker

type ServiceBooker interface {
	FindInstance(serviceName, zone, id string) (*dm.Instance, error)
	GetUpgradedServices(serviceVersions map[string]int64, reconnect bool) map[string]*dm.Service
}

type Session

type Session struct {
	//唯一id,server端接收到请求时生成
	Id string
	//stream流信息
	Stream pb.Discovery_PollsServer
	//订阅者
	Subscriber string
	//订阅的服务列表
	ServiceNames []string
	//变动的服务列表
	Services chan map[string]*dm.Service
	//session.Loop消费
	CloseCh chan struct{}
	//error,主要由stream Recv、Send产生,在rpc接口中消费,一旦出错,则关闭当前session
	ErrCh chan error
}

会话信息

func NewSession

func NewSession(subscriber string, stream pb.Discovery_PollsServer, serviceNames []string, serviceBook ServiceBooker) *Session

创建session,生成唯一的id

func (*Session) CheckUpgradedService

func (session *Session) CheckUpgradedService(serviceBook ServiceBooker, req *pb.PollsReq, reconnect bool)

根据client的请求参数,检查需要更新的service,如果reconnect时,推全量数据

func (*Session) Close

func (session *Session) Close()

关闭session

type SessionBook

type SessionBook struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

会话列表信息

func (*SessionBook) Add

func (book *SessionBook) Add(session *Session)

添加session会话

func (*SessionBook) Delete

func (book *SessionBook) Delete(session *Session)

删除session会话

func (*SessionBook) GetSubscribers

func (book *SessionBook) GetSubscribers(serviceName string) ([]string, error)

获取某个服务的订阅列表,如果后台需要改数据,需要考虑分布式多节点情况,最好周期存库,方便处理

func (*SessionBook) Push

func (book *SessionBook) Push(upgradeServices map[string]*dm.Service) error

推送service变更

func (book *SessionBook) Push(serviceName string) error {
	book.RLock()
	defer book.RUnlock()

	sessions, ok := book.sessions[serviceName]
	if !ok {
		return nil
	}

	//获取已经更新的服务配置项
	upgradeServices := registryServiceBook.GetUpgradedServices(map[string]int64{serviceName: 0}, false)
	if len(upgradeServices) == 0 {
		logger.Logex.Error("SessionBook Push GetUpgradedSe"+
			"rvices error:", serviceName)
		return ErrEmptyService
	}

	for _, session := range sessions {
		session.Services <- upgradeServices
	}
	return nil
}

减少外部依赖,便于添加单元测试,同时支持批量push

type SessionBooker

type SessionBooker interface {
	Add(session *Session)
	Delete(session *Session)
	GetSubscribers(serviceName string) ([]string, error)
	Push(upgradeServices map[string]*dm.Service) error
	sync.Locker
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL