meta

package
v0.10.3-0...-f06509b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultResourceGroupName = "__default_resource_group"
)
View Source
var ErrNodeNotEnough = errors.New("nodes not enough")
View Source
var NilReplica = newReplica(&querypb.Replica{
	ID: -1,
})

NilReplica is used to represent a nil replica.

Functions

This section is empty.

Types

type Broker

type Broker interface {
	DescribeCollection(ctx context.Context, collectionID UniqueID) (*milvuspb.DescribeCollectionResponse, error)
	GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
	GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)
	ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
	GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) (*datapb.GetSegmentInfoResponse, error)
	GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)
	GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
}

type ChannelDistFilter

type ChannelDistFilter = func(ch *DmChannel) bool

func WithChannelName2Channel

func WithChannelName2Channel(channelName string) ChannelDistFilter

func WithCollectionID2Channel

func WithCollectionID2Channel(collectionID int64) ChannelDistFilter

func WithNodeID2Channel

func WithNodeID2Channel(nodeID int64) ChannelDistFilter

func WithReplica2Channel

func WithReplica2Channel(replica *Replica) ChannelDistFilter

type ChannelDistManager

type ChannelDistManager struct {
	// contains filtered or unexported fields
}

func NewChannelDistManager

func NewChannelDistManager() *ChannelDistManager

func (*ChannelDistManager) GetByFilter

func (m *ChannelDistManager) GetByFilter(filters ...ChannelDistFilter) []*DmChannel

GetByFilter returns the list of all channels that match every given filter.

func (*ChannelDistManager) GetShardLeader

func (m *ChannelDistManager) GetShardLeader(replica *Replica, shard string) (int64, bool)

TODO (liuwei): consider the case where duplicate leaders exist. GetShardLeader returns the node that is within the given replica's nodes and is subscribing to the given shard; it returns (0, false) if no such node is found.

func (*ChannelDistManager) GetShardLeadersByReplica

func (m *ChannelDistManager) GetShardLeadersByReplica(replica *Replica) map[string]int64

TODO (liuwei): consider the case where duplicate leaders exist.

func (*ChannelDistManager) Update

func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel)

type ChannelSegDistFilter

type ChannelSegDistFilter string

func (ChannelSegDistFilter) AddFilter

func (f ChannelSegDistFilter) AddFilter(filter *segDistCriterion)

func (ChannelSegDistFilter) Match

func (f ChannelSegDistFilter) Match(s *Segment) bool

type Collection

type Collection struct {
	*querypb.CollectionLoadInfo
	LoadPercentage int32
	CreatedAt      time.Time
	UpdatedAt      time.Time

	LoadSpan trace.Span
	// contains filtered or unexported fields
}

func (*Collection) Clone

func (collection *Collection) Clone() *Collection

func (*Collection) IsRefreshed

func (collection *Collection) IsRefreshed() bool

func (*Collection) SetRefreshNotifier

func (collection *Collection) SetRefreshNotifier(notifier chan struct{})

type CollectionManager

type CollectionManager struct {
	// contains filtered or unexported fields
}

func NewCollectionManager

func NewCollectionManager(catalog metastore.QueryCoordCatalog) *CollectionManager

func (*CollectionManager) CalculateLoadPercentage

func (m *CollectionManager) CalculateLoadPercentage(collectionID typeutil.UniqueID) int32

CalculateLoadPercentage checks if collection is currently fully loaded.

func (*CollectionManager) CalculateLoadStatus

func (m *CollectionManager) CalculateLoadStatus(collectionID typeutil.UniqueID) querypb.LoadStatus

func (*CollectionManager) Exist

func (m *CollectionManager) Exist(collectionID typeutil.UniqueID) bool

func (*CollectionManager) GetAll

func (m *CollectionManager) GetAll() []int64

GetAll returns the collection ID of all loaded collections

func (*CollectionManager) GetAllCollections

func (m *CollectionManager) GetAllCollections() []*Collection

func (*CollectionManager) GetAllPartitions

func (m *CollectionManager) GetAllPartitions() []*Partition

func (*CollectionManager) GetCollection

func (m *CollectionManager) GetCollection(collectionID typeutil.UniqueID) *Collection

func (*CollectionManager) GetFieldIndex

func (m *CollectionManager) GetFieldIndex(collectionID typeutil.UniqueID) map[int64]int64

func (*CollectionManager) GetLoadType

func (m *CollectionManager) GetLoadType(collectionID typeutil.UniqueID) querypb.LoadType

func (*CollectionManager) GetPartition

func (m *CollectionManager) GetPartition(partitionID typeutil.UniqueID) *Partition

func (*CollectionManager) GetPartitionLoadPercentage

func (m *CollectionManager) GetPartitionLoadPercentage(partitionID typeutil.UniqueID) int32

func (*CollectionManager) GetPartitionsByCollection

func (m *CollectionManager) GetPartitionsByCollection(collectionID typeutil.UniqueID) []*Partition

func (*CollectionManager) GetReplicaNumber

func (m *CollectionManager) GetReplicaNumber(collectionID typeutil.UniqueID) int32

func (*CollectionManager) PutCollection

func (m *CollectionManager) PutCollection(collection *Collection, partitions ...*Partition) error

func (*CollectionManager) PutCollectionWithoutSave

func (m *CollectionManager) PutCollectionWithoutSave(collection *Collection) error

func (*CollectionManager) PutPartition

func (m *CollectionManager) PutPartition(partitions ...*Partition) error

func (*CollectionManager) PutPartitionWithoutSave

func (m *CollectionManager) PutPartitionWithoutSave(partitions ...*Partition) error

func (*CollectionManager) Recover

func (m *CollectionManager) Recover(broker Broker) error

Recover recovers collections from kv store, panics if failed

func (*CollectionManager) RemoveCollection

func (m *CollectionManager) RemoveCollection(collectionID typeutil.UniqueID) error

RemoveCollection removes collection and its partitions.

func (*CollectionManager) RemovePartition

func (m *CollectionManager) RemovePartition(collectionID typeutil.UniqueID, partitionIDs ...typeutil.UniqueID) error

func (*CollectionManager) UpdateLoadPercent

func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int32) (int32, error)

type CollectionSegDistFilter

type CollectionSegDistFilter int64

func (CollectionSegDistFilter) AddFilter

func (f CollectionSegDistFilter) AddFilter(filter *segDistCriterion)

func (CollectionSegDistFilter) Match

func (f CollectionSegDistFilter) Match(s *Segment) bool

type CollectionTarget

type CollectionTarget struct {
	// contains filtered or unexported fields
}

CollectionTarget is a collection target; it is immutable.

func FromPbCollectionTarget

func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget

func NewCollectionTarget

func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel) *CollectionTarget

func (*CollectionTarget) GetAllDmChannelNames

func (p *CollectionTarget) GetAllDmChannelNames() []string

func (*CollectionTarget) GetAllDmChannels

func (p *CollectionTarget) GetAllDmChannels() map[string]*DmChannel

func (*CollectionTarget) GetAllSegmentIDs

func (p *CollectionTarget) GetAllSegmentIDs() []int64

func (*CollectionTarget) GetAllSegments

func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo

func (*CollectionTarget) GetTargetVersion

func (p *CollectionTarget) GetTargetVersion() int64

func (*CollectionTarget) IsEmpty

func (p *CollectionTarget) IsEmpty() bool

type CoordinatorBroker

type CoordinatorBroker struct {
	// contains filtered or unexported fields
}

func NewCoordinatorBroker

func NewCoordinatorBroker(
	dataCoord types.DataCoordClient,
	rootCoord types.RootCoordClient,
) *CoordinatorBroker

func (*CoordinatorBroker) DescribeCollection

func (broker *CoordinatorBroker) DescribeCollection(ctx context.Context, collectionID UniqueID) (*milvuspb.DescribeCollectionResponse, error)

func (*CoordinatorBroker) GetIndexInfo

func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)

func (*CoordinatorBroker) GetPartitions

func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)

func (*CoordinatorBroker) GetRecoveryInfo

func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)

func (*CoordinatorBroker) GetRecoveryInfoV2

func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)

func (*CoordinatorBroker) GetSegmentInfo

func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...UniqueID) (*datapb.GetSegmentInfoResponse, error)

func (*CoordinatorBroker) ListIndexes

func (broker *CoordinatorBroker) ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)

type DistributionManager

type DistributionManager struct {
	*SegmentDistManager
	*ChannelDistManager
	*LeaderViewManager
}

func NewDistributionManager

func NewDistributionManager() *DistributionManager

type DmChannel

type DmChannel struct {
	*datapb.VchannelInfo
	Node    int64
	Version int64
}

func DmChannelFromVChannel

func DmChannelFromVChannel(channel *datapb.VchannelInfo) *DmChannel

func (*DmChannel) Clone

func (channel *DmChannel) Clone() *DmChannel

type FailedLoadCache

type FailedLoadCache struct {
	// contains filtered or unexported fields
}
var GlobalFailedLoadCache *FailedLoadCache

func NewFailedLoadCache

func NewFailedLoadCache() *FailedLoadCache

func (*FailedLoadCache) Get

func (l *FailedLoadCache) Get(collectionID int64) error

func (*FailedLoadCache) Put

func (l *FailedLoadCache) Put(collectionID int64, err error)

func (*FailedLoadCache) Remove

func (l *FailedLoadCache) Remove(collectionID int64)

func (*FailedLoadCache) TryExpire

func (l *FailedLoadCache) TryExpire()

type LeaderView

type LeaderView struct {
	ID               int64
	CollectionID     int64
	Channel          string
	Version          int64
	Segments         map[int64]*querypb.SegmentDist
	GrowingSegments  map[int64]*Segment
	TargetVersion    int64
	NumOfGrowingRows int64
}

func (*LeaderView) Clone

func (view *LeaderView) Clone() *LeaderView

type LeaderViewFilter

type LeaderViewFilter interface {
	Match(*LeaderView) bool
	AddFilter(*lvCriterion)
}

func WithChannelName2LeaderView

func WithChannelName2LeaderView(channelName string) LeaderViewFilter

func WithCollectionID2LeaderView

func WithCollectionID2LeaderView(collectionID int64) LeaderViewFilter

func WithNodeID2LeaderView

func WithNodeID2LeaderView(nodeID int64) LeaderViewFilter

func WithReplica2LeaderView

func WithReplica2LeaderView(replica *Replica) LeaderViewFilter

func WithSegment2LeaderView

func WithSegment2LeaderView(segmentID int64, isGrowing bool) LeaderViewFilter

type LeaderViewManager

type LeaderViewManager struct {
	// contains filtered or unexported fields
}

func NewLeaderViewManager

func NewLeaderViewManager() *LeaderViewManager

func (*LeaderViewManager) GetByFilter

func (mgr *LeaderViewManager) GetByFilter(filters ...LeaderViewFilter) []*LeaderView

func (*LeaderViewManager) GetLatestShardLeaderByFilter

func (mgr *LeaderViewManager) GetLatestShardLeaderByFilter(filters ...LeaderViewFilter) *LeaderView

func (*LeaderViewManager) GetLeaderShardView

func (mgr *LeaderViewManager) GetLeaderShardView(id int64, shard string) *LeaderView

func (*LeaderViewManager) Update

func (mgr *LeaderViewManager) Update(leaderID int64, views ...*LeaderView)

Update updates the leader's views, all views have to be with the same leader ID

type Meta

func NewMeta

func NewMeta(
	idAllocator func() (int64, error),
	catalog metastore.QueryCoordCatalog,
	nodeMgr *session.NodeManager,
) *Meta

type MockBroker

type MockBroker struct {
	mock.Mock
}

MockBroker is an autogenerated mock type for the Broker type

func NewMockBroker

func NewMockBroker(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockBroker

NewMockBroker creates a new instance of MockBroker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockBroker) DescribeCollection

func (_m *MockBroker) DescribeCollection(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error)

DescribeCollection provides a mock function with given fields: ctx, collectionID

func (*MockBroker) EXPECT

func (_m *MockBroker) EXPECT() *MockBroker_Expecter

func (*MockBroker) GetIndexInfo

func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentID int64) ([]*querypb.FieldIndexInfo, error)

GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentID

func (*MockBroker) GetPartitions

func (_m *MockBroker) GetPartitions(ctx context.Context, collectionID int64) ([]int64, error)

GetPartitions provides a mock function with given fields: ctx, collectionID

func (*MockBroker) GetRecoveryInfo

func (_m *MockBroker) GetRecoveryInfo(ctx context.Context, collectionID int64, partitionID int64) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)

GetRecoveryInfo provides a mock function with given fields: ctx, collectionID, partitionID

func (*MockBroker) GetRecoveryInfoV2

func (_m *MockBroker) GetRecoveryInfoV2(ctx context.Context, collectionID int64, partitionIDs ...int64) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)

GetRecoveryInfoV2 provides a mock function with given fields: ctx, collectionID, partitionIDs

func (*MockBroker) GetSegmentInfo

func (_m *MockBroker) GetSegmentInfo(ctx context.Context, segmentID ...int64) (*datapb.GetSegmentInfoResponse, error)

GetSegmentInfo provides a mock function with given fields: ctx, segmentID

func (*MockBroker) ListIndexes

func (_m *MockBroker) ListIndexes(ctx context.Context, collectionID int64) ([]*indexpb.IndexInfo, error)

ListIndexes provides a mock function with given fields: ctx, collectionID

type MockBroker_DescribeCollection_Call

type MockBroker_DescribeCollection_Call struct {
	*mock.Call
}

MockBroker_DescribeCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DescribeCollection'

func (*MockBroker_DescribeCollection_Call) Return

func (*MockBroker_DescribeCollection_Call) Run

func (*MockBroker_DescribeCollection_Call) RunAndReturn

type MockBroker_Expecter

type MockBroker_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockBroker_Expecter) DescribeCollection

func (_e *MockBroker_Expecter) DescribeCollection(ctx interface{}, collectionID interface{}) *MockBroker_DescribeCollection_Call

DescribeCollection is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64

func (*MockBroker_Expecter) GetIndexInfo

func (_e *MockBroker_Expecter) GetIndexInfo(ctx interface{}, collectionID interface{}, segmentID interface{}) *MockBroker_GetIndexInfo_Call

GetIndexInfo is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64
  • segmentID int64

func (*MockBroker_Expecter) GetPartitions

func (_e *MockBroker_Expecter) GetPartitions(ctx interface{}, collectionID interface{}) *MockBroker_GetPartitions_Call

GetPartitions is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64

func (*MockBroker_Expecter) GetRecoveryInfo

func (_e *MockBroker_Expecter) GetRecoveryInfo(ctx interface{}, collectionID interface{}, partitionID interface{}) *MockBroker_GetRecoveryInfo_Call

GetRecoveryInfo is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64
  • partitionID int64

func (*MockBroker_Expecter) GetRecoveryInfoV2

func (_e *MockBroker_Expecter) GetRecoveryInfoV2(ctx interface{}, collectionID interface{}, partitionIDs ...interface{}) *MockBroker_GetRecoveryInfoV2_Call

GetRecoveryInfoV2 is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64
  • partitionIDs ...int64

func (*MockBroker_Expecter) GetSegmentInfo

func (_e *MockBroker_Expecter) GetSegmentInfo(ctx interface{}, segmentID ...interface{}) *MockBroker_GetSegmentInfo_Call

GetSegmentInfo is a helper method to define mock.On call

  • ctx context.Context
  • segmentID ...int64

func (*MockBroker_Expecter) ListIndexes

func (_e *MockBroker_Expecter) ListIndexes(ctx interface{}, collectionID interface{}) *MockBroker_ListIndexes_Call

ListIndexes is a helper method to define mock.On call

  • ctx context.Context
  • collectionID int64

type MockBroker_GetIndexInfo_Call

type MockBroker_GetIndexInfo_Call struct {
	*mock.Call
}

MockBroker_GetIndexInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexInfo'

func (*MockBroker_GetIndexInfo_Call) Return

func (*MockBroker_GetIndexInfo_Call) Run

func (_c *MockBroker_GetIndexInfo_Call) Run(run func(ctx context.Context, collectionID int64, segmentID int64)) *MockBroker_GetIndexInfo_Call

func (*MockBroker_GetIndexInfo_Call) RunAndReturn

type MockBroker_GetPartitions_Call

type MockBroker_GetPartitions_Call struct {
	*mock.Call
}

MockBroker_GetPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPartitions'

func (*MockBroker_GetPartitions_Call) Return

func (*MockBroker_GetPartitions_Call) Run

func (*MockBroker_GetPartitions_Call) RunAndReturn

type MockBroker_GetRecoveryInfoV2_Call

type MockBroker_GetRecoveryInfoV2_Call struct {
	*mock.Call
}

MockBroker_GetRecoveryInfoV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetRecoveryInfoV2'

func (*MockBroker_GetRecoveryInfoV2_Call) Return

func (*MockBroker_GetRecoveryInfoV2_Call) Run

func (_c *MockBroker_GetRecoveryInfoV2_Call) Run(run func(ctx context.Context, collectionID int64, partitionIDs ...int64)) *MockBroker_GetRecoveryInfoV2_Call

func (*MockBroker_GetRecoveryInfoV2_Call) RunAndReturn

type MockBroker_GetRecoveryInfo_Call

type MockBroker_GetRecoveryInfo_Call struct {
	*mock.Call
}

MockBroker_GetRecoveryInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetRecoveryInfo'

func (*MockBroker_GetRecoveryInfo_Call) Return

func (*MockBroker_GetRecoveryInfo_Call) Run

func (_c *MockBroker_GetRecoveryInfo_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64)) *MockBroker_GetRecoveryInfo_Call

func (*MockBroker_GetRecoveryInfo_Call) RunAndReturn

type MockBroker_GetSegmentInfo_Call

type MockBroker_GetSegmentInfo_Call struct {
	*mock.Call
}

MockBroker_GetSegmentInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentInfo'

func (*MockBroker_GetSegmentInfo_Call) Return

func (*MockBroker_GetSegmentInfo_Call) Run

func (*MockBroker_GetSegmentInfo_Call) RunAndReturn

type MockBroker_ListIndexes_Call

type MockBroker_ListIndexes_Call struct {
	*mock.Call
}

MockBroker_ListIndexes_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListIndexes'

func (*MockBroker_ListIndexes_Call) Return

func (*MockBroker_ListIndexes_Call) Run

func (_c *MockBroker_ListIndexes_Call) Run(run func(ctx context.Context, collectionID int64)) *MockBroker_ListIndexes_Call

func (*MockBroker_ListIndexes_Call) RunAndReturn

type NodeSegDistFilter

type NodeSegDistFilter int64

func (NodeSegDistFilter) AddFilter

func (f NodeSegDistFilter) AddFilter(filter *segDistCriterion)

func (NodeSegDistFilter) Match

func (f NodeSegDistFilter) Match(s *Segment) bool

type Partition

type Partition struct {
	*querypb.PartitionLoadInfo
	LoadPercentage int32
	CreatedAt      time.Time
	UpdatedAt      time.Time
}

func (*Partition) Clone

func (partition *Partition) Clone() *Partition

type Replica

type Replica struct {
	// contains filtered or unexported fields
}

Replica is an immutable type for manipulating replica meta info for the replica manager. It follows a copy-on-write strategy to keep the replica manager consistent, so only read-only operations are allowed on this type.

func NewReplica deprecated

func NewReplica(replica *querypb.Replica, nodes ...typeutil.UniqueSet) *Replica

Deprecated: may break the consistency of ReplicaManager, use `Spawn` of `ReplicaManager` or `newReplica` instead.

func (*Replica) AddRWNode deprecated

func (replica *Replica) AddRWNode(nodes ...int64)

Deprecated: Warning, this breaks the consistency of ReplicaManager; use `SetAvailableNodesInSameCollectionAndRG` in ReplicaManager instead. TODO: to be removed in the future; kept only for old unit tests for now.

func (*Replica) ContainRONode

func (replica *Replica) ContainRONode(node int64) bool

ContainRONode checks if the node is in ro nodes of the replica.

func (*Replica) Contains

func (replica *Replica) Contains(node int64) bool

Contains checks if the node is in rw nodes of the replica.

func (*Replica) GetCollectionID

func (replica *Replica) GetCollectionID() typeutil.UniqueID

GetCollectionID returns the collection id of the replica.

func (*Replica) GetID

func (replica *Replica) GetID() typeutil.UniqueID

GetID returns the id of the replica.

func (*Replica) GetNodes

func (replica *Replica) GetNodes() []int64

GetNodes returns the rw nodes of the replica. readonly, don't modify the returned slice.

func (*Replica) GetRONodes

func (replica *Replica) GetRONodes() []int64

GetRONodes returns the ro nodes of the replica. readonly, don't modify the returned slice.

func (*Replica) GetResourceGroup

func (replica *Replica) GetResourceGroup() string

GetResourceGroup returns the resource group name of the replica.

func (*Replica) NodesCount

func (replica *Replica) NodesCount() int

NodesCount returns the count of rw nodes and ro nodes of the replica.

func (*Replica) RONodesCount

func (replica *Replica) RONodesCount() int

RONodesCount returns the count of ro nodes of the replica.

func (*Replica) RWNodesCount

func (replica *Replica) RWNodesCount() int

RWNodesCount returns the count of rw nodes of the replica.

func (*Replica) RangeOverRONodes

func (replica *Replica) RangeOverRONodes(f func(node int64) bool)

RangeOverRONodes iterates over the ro nodes of the replica.

func (*Replica) RangeOverRWNodes

func (replica *Replica) RangeOverRWNodes(f func(node int64) bool)

RangeOverRWNodes iterates over the read and write nodes of the replica.

type ReplicaManager

type ReplicaManager struct {
	// contains filtered or unexported fields
}

func NewReplicaManager

func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager

func (*ReplicaManager) Get

Get returns the replica by id. Replica should be read-only, do not modify it.

func (*ReplicaManager) GetByCollection

func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Replica

func (*ReplicaManager) GetByCollectionAndNode

func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.UniqueID) *Replica

func (*ReplicaManager) GetByNode

func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica

func (*ReplicaManager) GetByResourceGroup

func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica

func (*ReplicaManager) GetResourceGroupByCollection

func (m *ReplicaManager) GetResourceGroupByCollection(collection typeutil.UniqueID) typeutil.Set[string]

func (*ReplicaManager) Put deprecated

func (m *ReplicaManager) Put(replicas ...*Replica) error

Deprecated: Warning, this breaks the consistency of ReplicaManager; never use it in non-test code, use Spawn instead.

func (*ReplicaManager) Recover

func (m *ReplicaManager) Recover(collections []int64) error

Recover recovers the replicas for given collections from meta store

func (*ReplicaManager) RecoverNodesInCollection

func (m *ReplicaManager) RecoverNodesInCollection(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) error

RecoverNodesInCollection recovers all nodes in the collection using the latest resource groups. It guarantees that a node is assigned to only one replica of the same collection at a time. 1. Move the rw nodes to ro nodes if they are not in the related resource group. 2. Add new incoming nodes to the replica if they are not in use by other replicas of the same collection. 3. Replicas in the same resource group share the nodes in that resource group fairly.

func (*ReplicaManager) RemoveCollection

func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error

RemoveCollection removes replicas of given collection, returns error if failed to remove replica from KV

func (*ReplicaManager) RemoveNode

func (m *ReplicaManager) RemoveNode(replicaID typeutil.UniqueID, nodes ...typeutil.UniqueID) error

RemoveNode removes the given nodes from the replica identified by replicaID.

func (*ReplicaManager) Spawn

func (m *ReplicaManager) Spawn(collection int64, replicaNumInRG map[string]int) ([]*Replica, error)

Spawn spawns N replicas at resource group for given collection in ReplicaManager.

func (*ReplicaManager) TransferReplica

func (m *ReplicaManager) TransferReplica(collectionID typeutil.UniqueID, srcRGName string, dstRGName string, replicaNum int) error

TransferReplica transfers N replicas from srcRGName to dstRGName.

type ReplicaSegDistFilter

type ReplicaSegDistFilter struct {
	*Replica
}

func (ReplicaSegDistFilter) AddFilter

func (f ReplicaSegDistFilter) AddFilter(filter *segDistCriterion)

func (*ReplicaSegDistFilter) Match

func (f *ReplicaSegDistFilter) Match(s *Segment) bool

type ResourceGroup

type ResourceGroup struct {
	// contains filtered or unexported fields
}

func NewResourceGroup

func NewResourceGroup(name string, cfg *rgpb.ResourceGroupConfig) *ResourceGroup

NewResourceGroup create resource group.

func NewResourceGroupFromMeta

func NewResourceGroupFromMeta(meta *querypb.ResourceGroup) *ResourceGroup

NewResourceGroupFromMeta create resource group from meta.

func (*ResourceGroup) ContainNode

func (rg *ResourceGroup) ContainNode(id int64) bool

ContainNode return whether resource group contain node.

func (*ResourceGroup) CopyForWrite

func (rg *ResourceGroup) CopyForWrite() *mutableResourceGroup

CopyForWrite return a mutable resource group.

func (*ResourceGroup) GetCapacity

func (rg *ResourceGroup) GetCapacity() int

Deprecated: GetCapacity returns the resource group capacity.

func (*ResourceGroup) GetConfig

func (rg *ResourceGroup) GetConfig() *rgpb.ResourceGroupConfig

GetConfig return resource group config. Do not change the config directly, use UpdateTxn to update config.

func (*ResourceGroup) GetConfigCloned

func (rg *ResourceGroup) GetConfigCloned() *rgpb.ResourceGroupConfig

GetConfigCloned return a cloned resource group config.

func (*ResourceGroup) GetMeta

func (rg *ResourceGroup) GetMeta() *querypb.ResourceGroup

GetMeta return resource group meta.

func (*ResourceGroup) GetName

func (rg *ResourceGroup) GetName() string

GetName return resource group name.

func (*ResourceGroup) GetNodes

func (rg *ResourceGroup) GetNodes() []int64

GetNodes return nodes of resource group.

func (*ResourceGroup) HasFrom

func (rg *ResourceGroup) HasFrom(rgName string) bool

HasFrom return whether given resource group is in `from` of rg.

func (*ResourceGroup) HasTo

func (rg *ResourceGroup) HasTo(rgName string) bool

HasTo return whether given resource group is in `to` of rg.

func (*ResourceGroup) MeetRequirement

func (rg *ResourceGroup) MeetRequirement() error

MeetRequirement return whether resource group meet requirement. Return error with reason if not meet requirement.

func (*ResourceGroup) MissingNumOfNodes

func (rg *ResourceGroup) MissingNumOfNodes() int

MissingNumOfNodes returns the number of nodes the resource group is lacking: `requests - len(node)`.

func (*ResourceGroup) NodeNum

func (rg *ResourceGroup) NodeNum() int

NodeNum return node count of resource group.

func (*ResourceGroup) OversizedNumOfNodes

func (rg *ResourceGroup) OversizedNumOfNodes() int

OversizedNumOfNodes return oversized nodes count. `len(node) - requests`

func (*ResourceGroup) ReachLimitNumOfNodes

func (rg *ResourceGroup) ReachLimitNumOfNodes() int

ReachLimitNumOfNodes return reach limit nodes count. `limits - len(node)`

func (*ResourceGroup) RedundantNumOfNodes

func (rg *ResourceGroup) RedundantNumOfNodes() int

RedundantNumOfNodes returns the number of redundant nodes: `len(node) - limits`.

func (*ResourceGroup) Snapshot

func (rg *ResourceGroup) Snapshot() *ResourceGroup

Snapshot return a snapshot of resource group.

type ResourceManager

type ResourceManager struct {
	// contains filtered or unexported fields
}

func NewResourceManager

func NewResourceManager(catalog metastore.QueryCoordCatalog, nodeMgr *session.NodeManager) *ResourceManager

NewResourceManager is used to create a ResourceManager instance.

func (*ResourceManager) AddResourceGroup

func (rm *ResourceManager) AddResourceGroup(rgName string, cfg *rgpb.ResourceGroupConfig) error

AddResourceGroup creates a new ResourceGroup. It does not change any node assignment; nodes will be reassigned to the new resource group by auto recover.

func (*ResourceManager) AssignPendingIncomingNode

func (rm *ResourceManager) AssignPendingIncomingNode()

AssignPendingIncomingNode assign incoming node to resource group.

func (*ResourceManager) AutoRecoverResourceGroup

func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) error

AutoRecoverResourceGroup automatically recovers the given resource group and returns an error if the recovery fails.

func (*ResourceManager) CheckIncomingNodeNum

func (rm *ResourceManager) CheckIncomingNodeNum() int

CheckIncomingNodeNum return incoming node num.

func (*ResourceManager) ContainResourceGroup

func (rm *ResourceManager) ContainResourceGroup(rgName string) bool

ContainResourceGroup returns whether the given resource group exists.

func (*ResourceManager) ContainsNode

func (rm *ResourceManager) ContainsNode(rgName string, node int64) bool

ContainsNode return whether given node is in given resource group.

func (*ResourceManager) GetNodes

func (rm *ResourceManager) GetNodes(rgName string) ([]int64, error)

GetNodes return nodes of given resource group.

func (*ResourceManager) GetNodesOfMultiRG

func (rm *ResourceManager) GetNodesOfMultiRG(rgName []string) (map[string]typeutil.UniqueSet, error)

GetNodesOfMultiRG return nodes of multi rg, it can be used to get a consistent view of nodes of multi rg.

func (*ResourceManager) GetOutgoingNodeNumByReplica

func (rm *ResourceManager) GetOutgoingNodeNumByReplica(replica *Replica) map[string]int32

GetOutgoingNodeNumByReplica return outgoing node num on each rg from this replica.

func (*ResourceManager) GetResourceGroup

func (rm *ResourceManager) GetResourceGroup(rgName string) *ResourceGroup

GetResourceGroup return resource group snapshot by name.

func (*ResourceManager) HandleNodeDown

func (rm *ResourceManager) HandleNodeDown(node int64)

HandleNodeDown handles a node when it leaves.

func (*ResourceManager) HandleNodeUp

func (rm *ResourceManager) HandleNodeUp(node int64)

HandleNodeUp handles a node when it newly comes in.

func (*ResourceManager) ListResourceGroups

func (rm *ResourceManager) ListResourceGroups() []string

ListResourceGroups return all resource groups names.

func (*ResourceManager) ListenNodeChanged

func (rm *ResourceManager) ListenNodeChanged() *syncutil.VersionedListener

ListenNodeChanged returns a listener for node changes.

func (*ResourceManager) ListenResourceGroupChanged

func (rm *ResourceManager) ListenResourceGroupChanged() *syncutil.VersionedListener

ListenResourceGroupChanged returns a listener for resource group changes.

func (*ResourceManager) MeetRequirement

func (rm *ResourceManager) MeetRequirement(rgName string) error

MeetRequirement returns whether the resource group meets its requirement. It returns an error containing the reason if the requirement is not met.

func (*ResourceManager) Recover

func (rm *ResourceManager) Recover() error

Recover recovers the resource groups from meta; the other interfaces of ResourceManager may only be called after recovery is done.

func (*ResourceManager) RemoveAllDownNode

func (rm *ResourceManager) RemoveAllDownNode()

RemoveAllDownNode removes all down nodes from the resource groups.

func (*ResourceManager) RemoveResourceGroup

func (rm *ResourceManager) RemoveResourceGroup(rgName string) error

RemoveResourceGroup removes the given resource group.

func (*ResourceManager) TransferNode

func (rm *ResourceManager) TransferNode(sourceRGName string, targetRGName string, nodeNum int) error

TransferNode transfers nodes from the source resource group to the target resource group. Deprecated: use the declarative API `UpdateResourceGroups` instead.

func (*ResourceManager) UpdateResourceGroups

func (rm *ResourceManager) UpdateResourceGroups(rgs map[string]*rgpb.ResourceGroupConfig) error

UpdateResourceGroups updates the resource group configurations. Only the configuration is changed; nodes are not moved by this call. All nodes will be reassigned by auto recovery.

type Segment

type Segment struct {
	*datapb.SegmentInfo
	Node               int64                             // Node the segment is in
	Version            int64                             // Version is the timestamp of loading segment
	LastDeltaTimestamp uint64                            // The timestamp of the last delta record
	IndexInfo          map[int64]*querypb.FieldIndexInfo // index info of loaded segment
}

func SegmentFromInfo

func SegmentFromInfo(info *datapb.SegmentInfo) *Segment

func (*Segment) Clone

func (segment *Segment) Clone() *Segment

type SegmentDistFilter

type SegmentDistFilter interface {
	Match(s *Segment) bool
	AddFilter(*segDistCriterion)
}

func WithChannel

func WithChannel(channelName string) SegmentDistFilter

func WithCollectionID

func WithCollectionID(collectionID typeutil.UniqueID) SegmentDistFilter

func WithNodeID

func WithNodeID(nodeID int64) SegmentDistFilter

func WithReplica

func WithReplica(replica *Replica) SegmentDistFilter

func WithSegmentID

func WithSegmentID(segmentID int64) SegmentDistFilter

type SegmentDistFilterFunc

type SegmentDistFilterFunc func(s *Segment) bool

func (SegmentDistFilterFunc) AddFilter

func (f SegmentDistFilterFunc) AddFilter(filter *segDistCriterion)

func (SegmentDistFilterFunc) Match

func (f SegmentDistFilterFunc) Match(s *Segment) bool

type SegmentDistManager

type SegmentDistManager struct {
	// contains filtered or unexported fields
}

func NewSegmentDistManager

func NewSegmentDistManager() *SegmentDistManager

func (*SegmentDistManager) GetByFilter

func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segment

GetByFilter returns the list of segments that match all the given filters.

func (*SegmentDistManager) GetSegmentDist

func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64

GetSegmentDist returns the list of nodes that contain the given segmentID.

func (*SegmentDistManager) Update

func (m *SegmentDistManager) Update(nodeID typeutil.UniqueID, segments ...*Segment)

type TargetManager

type TargetManager struct {
	// contains filtered or unexported fields
}

func NewTargetManager

func NewTargetManager(broker Broker, meta *Meta) *TargetManager

func (*TargetManager) GetCollectionTargetVersion

func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64

func (*TargetManager) GetDmChannel

func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel

func (*TargetManager) GetDmChannelsByCollection

func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel

func (*TargetManager) GetDroppedSegmentsByChannel

func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
	channelName string,
	scope TargetScope,
) []int64

func (*TargetManager) GetGrowingSegmentsByChannel

func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
	channelName string,
	scope TargetScope,
) typeutil.UniqueSet

func (*TargetManager) GetGrowingSegmentsByCollection

func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
	scope TargetScope,
) typeutil.UniqueSet

func (*TargetManager) GetSealedSegment

func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo

func (*TargetManager) GetSealedSegmentsByChannel

func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
	channelName string,
	scope TargetScope,
) map[int64]*datapb.SegmentInfo

func (*TargetManager) GetSealedSegmentsByCollection

func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
	scope TargetScope,
) map[int64]*datapb.SegmentInfo

func (*TargetManager) GetSealedSegmentsByPartition

func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
	partitionID int64,
	scope TargetScope,
) map[int64]*datapb.SegmentInfo

func (*TargetManager) IsCurrentTargetExist

func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool

func (*TargetManager) IsNextTargetExist

func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool

func (*TargetManager) PullNextTargetV1

func (mgr *TargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)

func (*TargetManager) PullNextTargetV2

func (mgr *TargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)

func (*TargetManager) Recover

func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error

func (*TargetManager) RemoveCollection

func (mgr *TargetManager) RemoveCollection(collectionID int64)

RemoveCollection removes all channels and segments in the given collection

func (*TargetManager) RemovePartition

func (mgr *TargetManager) RemovePartition(collectionID int64, partitionIDs ...int64)

RemovePartition removes all segments in the given partitions. NOTE: this doesn't remove any channel, even if the given partition is the only partition.

func (*TargetManager) SaveCurrentTarget

func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)

func (*TargetManager) UpdateCollectionCurrentTarget

func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool

UpdateCollectionCurrentTarget updates the current target to the next target. WARN: DO NOT call this method for an existing collection while the target observer is running, or it will lead to a double update, which may make the current target unavailable.

func (*TargetManager) UpdateCollectionNextTarget

func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error

UpdateCollectionNextTarget updates the next target with a new target pulled from DataCoord. WARN: DO NOT call this method for an existing collection while the target observer is running, or it will lead to a double update, which may make the current target unavailable.

type TargetScope

type TargetScope = int32
const (
	CurrentTarget TargetScope = iota + 1
	NextTarget
	CurrentTargetFirst
	NextTargetFirst
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL