rkcy

package
v0.0.0-...-160e8be Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2022 License: MPL-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CREATE       = "Create"
	READ         = "Read"
	UPDATE       = "Update"
	UPDATE_ASYNC = "UpdateAsync"
	DELETE       = "Delete"

	VALIDATE_CREATE = "ValidateCreate"
	VALIDATE_UPDATE = "ValidateUpdate"

	REFRESH_INSTANCE = "RefreshInstance"
	FLUSH_INSTANCE   = "FlushInstance"

	REQUEST_RELATED = "RequestRelated"
	REFRESH_RELATED = "RefreshRelated"
)
View Source
const (
	RKCY string = "rkcy"

	DIRECTIVE_HEADER    string = "rkcy-directive"
	TRACE_PARENT_HEADER string = "traceparent"

	RKCY_TOPIC     string = "rkcy.topic"
	RKCY_PARTITION string = "rkcy.partition"

	MAX_PARTITION                  int32 = 1024
	PLATFORM_TOPIC_RETENTION_BYTES int32 = 10 * 1024 * 1024
)
View Source
const (
	ADMIN        StandardTopicName = "admin"
	PROCESS                        = "process"
	ERROR                          = "error"
	COMPLETE                       = "complete"
	STORAGE                        = "storage"
	STORAGE_SCND                   = "storage-scnd"
)

Variables

This section is empty.

Functions

func ApecsTxnCurrentStep

func ApecsTxnCurrentStep(txn *rkcypb.ApecsTxn) *rkcypb.ApecsTxn_Step

func BuildFullTopicName

func BuildFullTopicName(platformName string, environment string, concernName string, concernType rkcypb.Concern_Type, name string, generation int32) string

func BuildTopicName

func BuildTopicName(topicNamePrefix string, name string, generation int32) string

func BuildTopicNamePrefix

func BuildTopicNamePrefix(platformName string, environment string, concernName string, concernType rkcypb.Concern_Type) string

func BytesToInt

func BytesToInt(arr []byte) int

func ComplexConfigUnmarshal

func ComplexConfigUnmarshal(msgType string, b []byte) (proto.Message, error)

func ComplexConfigUnmarshalJson

func ComplexConfigUnmarshalJson(msgType string, b []byte) (proto.Message, error)

func ConfigToJson

func ConfigToJson(conf *Config) (string, error)

func ConfigTopic

func ConfigTopic(platformName string, environment string) string

func ConsumersTopic

func ConsumersTopic(platformName string, environment string) string

func Contains

func Contains(slice []string, item string) bool

func CreatePlatformTopics

func CreatePlatformTopics(
	ctx context.Context,
	strmprov StreamProvider,
	platform string,
	environment string,
	adminBrokers string,
) error

func DecodeInstance

func DecodeInstance(cncHdlrs ConcernHandlers, concern string, payload64 string)

func GetDirective

func GetDirective(msg *kafka.Message) rkcypb.Directive

func GetTraceId

func GetTraceId(msg *kafka.Message) string

func GetTraceParent

func GetTraceParent(msg *kafka.Message) string

func IntToBytes

func IntToBytes(num int) []byte

func IsACETopic

func IsACETopic(topic string) bool

func IsInstanceStoreStep

func IsInstanceStoreStep(step *rkcypb.ApecsTxn_Step) bool

func IsKeylessStep

func IsKeylessStep(step *rkcypb.ApecsTxn_Step) bool

func IsPackedPayload

func IsPackedPayload(payload []byte) bool

func IsPlatformCommand

func IsPlatformCommand(cmd string) bool

func IsReservedCommand

func IsReservedCommand(cmd string) bool

func IsStorageSystem

func IsStorageSystem(system rkcypb.System) bool

func IsTxnProhibitedCommand

func IsTxnProhibitedCommand(cmd string) bool

func IsValidName

func IsValidName(name string) bool

func LogProto

func LogProto(msg proto.Message)

func LogResult

func LogResult(rslt *rkcypb.ApecsTxn_Step_Result, sev rkcypb.Severity, format string, args ...interface{})

func LogResultDebug

func LogResultDebug(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})

func LogResultError

func LogResultError(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})

func LogResultInfo

func LogResultInfo(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})

func LogResultWarn

func LogResultWarn(rslt *rkcypb.ApecsTxn_Step_Result, format string, args ...interface{})

func Maxi

func Maxi(x, y int) int

func Maxi32

func Maxi32(x, y int32) int32

func Maxi64

func Maxi64(x, y int64) int64

func Mini

func Mini(x, y int) int

func Mini32

func Mini32(x, y int32) int32

func Mini64

func Mini64(x, y int64) int64

func NewApecsTxn

func NewApecsTxn(
	txnId string,
	assocTxn *rkcypb.AssocTxn,
	respTarget *rkcypb.TopicTarget,
	uponError rkcypb.UponError,
	steps []*rkcypb.ApecsTxn_Step,
) (*rkcypb.ApecsTxn, error)

func NewKafkaMessage

func NewKafkaMessage(
	topic *string,
	partition int32,
	value proto.Message,
	directive rkcypb.Directive,
	traceParent string,
) (*kafka.Message, error)

func NewSpanId

func NewSpanId() string

func NewTraceId

func NewTraceId() string

func OffsetGT

func OffsetGT(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool

func OffsetGTE

func OffsetGTE(lhs *rkcypb.CompoundOffset, rhs *rkcypb.CompoundOffset) bool

func PackPayloads

func PackPayloads(payload0 []byte, payload1 []byte) []byte

func ParsePayload

func ParsePayload(payload []byte) ([]byte, []byte, error)

func PlatformTopic

func PlatformTopic(platformName string, environment string) string

func PrepLogging

func PrepLogging()

func PrintKafkaLogs

func PrintKafkaLogs(ctx context.Context, kafkaLogCh <-chan kafka.LogEvent)

func ProducersTopic

func ProducersTopic(platformName string, environment string) string

func ProgKey

func ProgKey(prog *rkcypb.Program) string

func RegisterComplexConfigHandler

func RegisterComplexConfigHandler(msgType string, handler ComplexConfigHandler)

func RegisterGlobalConcernHandlerNewFunc

func RegisterGlobalConcernHandlerNewFunc(newCncHdlr func() ConcernHandler)

func RegisterHashFunc

func RegisterHashFunc(name string, hashFunc HashFunc)

func StandardHeaders

func StandardHeaders(directive rkcypb.Directive, traceParent string) []kafka.Header

func StepSystemName

func StepSystemName(step *rkcypb.ApecsTxn_Step) string

func StorageInitNoop

func StorageInitNoop(
	ctx context.Context,
	wg *sync.WaitGroup,
	config map[string]string,
) error

func TraceIdFromTraceParent

func TraceIdFromTraceParent(traceParent string) string

func TraceParentIsValid

func TraceParentIsValid(traceParent string) bool

func TraceParentParts

func TraceParentParts(traceParent string) []string

func TxnDirectionName

func TxnDirectionName(txn *rkcypb.ApecsTxn) string

func UncommittedGroupName

func UncommittedGroupName(topic string, partition int) string

func UncommittedGroupNameAllPartitions

func UncommittedGroupNameAllPartitions(topic string) string

func UnpackPayloads

func UnpackPayloads(packed []byte) ([]byte, []byte, error)

func UpdateTopics

func UpdateTopics(
	ctx context.Context,
	strmprov StreamProvider,
	platDef *rkcypb.PlatformDef,
) error

func ValidateTxn

func ValidateTxn(txn *Txn) error

Types

type AdminClient

type AdminClient interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	CreateTopics(
		ctx context.Context,
		topics []kafka.TopicSpecification,
		options ...kafka.CreateTopicsAdminOption,
	) ([]kafka.TopicResult, error)
	Close()
}

type ClientCode

type ClientCode struct {
	StorageInits    map[string]StorageInit
	ConcernHandlers ConcernHandlers
}

func NewClientCode

func NewClientCode() *ClientCode

func (*ClientCode) AddCrudHandler

func (clientCode *ClientCode) AddCrudHandler(concern string, storageType string, handler interface{})

func (*ClientCode) AddLogicHandler

func (clientCode *ClientCode) AddLogicHandler(concern string, handler interface{})

func (*ClientCode) AddStorageInit

func (clientCode *ClientCode) AddStorageInit(storageType string, storageInit StorageInit)

func (*ClientCode) UpdateStorageTargets

func (clientCode *ClientCode) UpdateStorageTargets(storageTargets map[string]*rkcypb.StorageTarget)

type ComplexConfigHandler

type ComplexConfigHandler interface {
	GetKey(proto.Message) string
	Unmarshal(b []byte) (proto.Message, error)
	UnmarshalJson(b []byte) (proto.Message, error)
}

type ConcernHandler

type ConcernHandler interface {
	ConcernName() string
	HandleLogicCommand(
		ctx context.Context,
		system rkcypb.System,
		command string,
		direction rkcypb.Direction,
		args *StepArgs,
		instanceStore *InstanceStore,
		confRdr ConfigRdr,
	) (*rkcypb.ApecsTxn_Step_Result, []*rkcypb.ApecsTxn_Step)
	HandleCrudCommand(
		ctx context.Context,
		wg *sync.WaitGroup,
		system rkcypb.System,
		command string,
		direction rkcypb.Direction,
		args *StepArgs,
		storageType string,
	) (*rkcypb.ApecsTxn_Step_Result, []*rkcypb.ApecsTxn_Step)
	DecodeInstance(ctx context.Context, buffer []byte) (*ResultProto, error)
	DecodeArg(ctx context.Context, system rkcypb.System, command string, buffer []byte) (*ResultProto, error)
	DecodeResult(ctx context.Context, system rkcypb.System, command string, buffer []byte) (*ResultProto, error)
	DecodeRelatedRequest(ctx context.Context, relReq *rkcypb.RelatedRequest) (*ResultProto, error)

	SetLogicHandler(commands interface{}) error
	SetCrudHandler(storageType string, commands interface{}) error
	ValidateHandlers() bool
	SetStorageTargets(storageTargets map[string]*StorageTargetInit)
}

type ConcernHandlers

type ConcernHandlers map[string]ConcernHandler

func NewConcernHandlers

func NewConcernHandlers() ConcernHandlers

func NewGlobalConcernHandlerRegistry

func NewGlobalConcernHandlerRegistry() ConcernHandlers

func (ConcernHandlers) DecodeArgPayload

func (concernHandlers ConcernHandlers) DecodeArgPayload(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) (*ResultProto, ConcernHandler, error)

func (ConcernHandlers) DecodeArgPayload64

func (concernHandlers ConcernHandlers) DecodeArgPayload64(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) (*ResultProto, ConcernHandler, error)

func (ConcernHandlers) DecodeArgPayload64Json

func (concernHandlers ConcernHandlers) DecodeArgPayload64Json(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) ([]byte, error)

func (ConcernHandlers) DecodeArgPayloadJson

func (concernHandlers ConcernHandlers) DecodeArgPayloadJson(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) ([]byte, error)

func (ConcernHandlers) DecodeInstance

func (concernHandlers ConcernHandlers) DecodeInstance(ctx context.Context, concern string, buffer []byte) (*ResultProto, error)

func (ConcernHandlers) DecodeInstance64

func (concernHandlers ConcernHandlers) DecodeInstance64(ctx context.Context, concern string, buffer64 string) (*ResultProto, error)

func (ConcernHandlers) DecodeInstance64Json

func (concernHandlers ConcernHandlers) DecodeInstance64Json(ctx context.Context, concern string, buffer64 string) ([]byte, error)

func (ConcernHandlers) DecodeInstanceJson

func (concernHandlers ConcernHandlers) DecodeInstanceJson(ctx context.Context, concern string, buffer []byte) ([]byte, error)

func (ConcernHandlers) DecodeResultPayload

func (concernHandlers ConcernHandlers) DecodeResultPayload(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) (*ResultProto, ConcernHandler, error)

func (ConcernHandlers) DecodeResultPayload64

func (concernHandlers ConcernHandlers) DecodeResultPayload64(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) (*ResultProto, ConcernHandler, error)

func (ConcernHandlers) DecodeResultPayload64Json

func (concernHandlers ConcernHandlers) DecodeResultPayload64Json(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer64 string,
) ([]byte, error)

func (ConcernHandlers) DecodeResultPayloadJson

func (concernHandlers ConcernHandlers) DecodeResultPayloadJson(
	ctx context.Context,
	concern string,
	system rkcypb.System,
	command string,
	buffer []byte,
) ([]byte, error)

func (ConcernHandlers) RegisterCrudHandler

func (concernHandlers ConcernHandlers) RegisterCrudHandler(concern string, storageType string, handler interface{})

func (ConcernHandlers) RegisterLogicHandler

func (concernHandlers ConcernHandlers) RegisterLogicHandler(concern string, handler interface{})

func (ConcernHandlers) ValidateConcernHandlers

func (concernHandlers ConcernHandlers) ValidateConcernHandlers() bool

type Config

type Config struct {
	rkcypb.Config
}

func JsonToConfig

func JsonToConfig(data []byte) (*Config, error)

func NewConfig

func NewConfig() *Config

func (*Config) GetBool

func (conf *Config) GetBool(key string) (bool, bool)

func (*Config) GetComplexBytes

func (conf *Config) GetComplexBytes(msgType string, key string) ([]byte, bool)

func (*Config) GetComplexMsg

func (conf *Config) GetComplexMsg(msgType string, key string) (proto.Message, bool)

func (*Config) GetFloat64

func (conf *Config) GetFloat64(key string) (float64, bool)

func (*Config) GetString

func (conf *Config) GetString(key string) (string, bool)

func (*Config) SetBool

func (conf *Config) SetBool(key string, val bool)

func (*Config) SetComplexBytes

func (conf *Config) SetComplexBytes(msgType string, key string, val []byte)

func (*Config) SetComplexMsg

func (conf *Config) SetComplexMsg(msgType string, key string, msg proto.Message)

func (*Config) SetFloat64

func (conf *Config) SetFloat64(key string, val float64)

func (*Config) SetString

func (conf *Config) SetString(key string, val string)

type ConfigRdr

type ConfigRdr interface {
	GetString(key string) (string, bool)
	GetBool(key string) (bool, bool)
	GetFloat64(key string) (float64, bool)
	GetComplexMsg(msgType string, key string) (proto.Message, bool)
	GetComplexBytes(msgType string, key string) ([]byte, bool)

	BuildConfigResponse() *rkcypb.ConfigReadResponse
}

type Consumer

type Consumer interface {
	Assign(partitions []kafka.TopicPartition) error
	Close() error
	Commit() ([]kafka.TopicPartition, error)
	CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (int64, int64, error)
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	StoreOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
}

type Error

type Error struct {
	Code rkcypb.Code
	Msg  string
}

func NewError

func NewError(code rkcypb.Code, msg string) *Error

func (*Error) Error

func (e *Error) Error() string

type Fnv64HashFunc

type Fnv64HashFunc struct {
	// contains filtered or unexported fields
}

func NewFnv64HashFunc

func NewFnv64HashFunc() *Fnv64HashFunc

func (*Fnv64HashFunc) Hash

func (fnv *Fnv64HashFunc) Hash(val []byte, count int32) int32

type HashFunc

type HashFunc interface {
	Hash(val []byte, count int32) int32
}

func GetHashFunc

func GetHashFunc(name string) HashFunc

type InstanceRecord

type InstanceRecord struct {
	Instance   []byte
	Related    []byte
	CmpdOffset *rkcypb.CompoundOffset
	LastAccess time.Time
}

type InstanceStore

type InstanceStore struct {
	// contains filtered or unexported fields
}

func NewInstanceStore

func NewInstanceStore() *InstanceStore

func (*InstanceStore) Get

func (instStore *InstanceStore) Get(key string) *InstanceRecord

func (*InstanceStore) GetInstance

func (instStore *InstanceStore) GetInstance(key string) []byte

func (*InstanceStore) GetPacked

func (instStore *InstanceStore) GetPacked(key string) []byte

func (*InstanceStore) GetRelated

func (instStore *InstanceStore) GetRelated(key string) []byte

func (*InstanceStore) Remove

func (instStore *InstanceStore) Remove(key string)

func (*InstanceStore) Set

func (instStore *InstanceStore) Set(key string, instance []byte, related []byte, cmpdOffset *rkcypb.CompoundOffset)

func (*InstanceStore) SetInstance

func (instStore *InstanceStore) SetInstance(key string, instance []byte, cmpdOffset *rkcypb.CompoundOffset)

func (*InstanceStore) SetRelated

func (instStore *InstanceStore) SetRelated(key string, related []byte, cmpdOffset *rkcypb.CompoundOffset) error

type PlatformArgs

type PlatformArgs struct {
	Id                string
	Platform          string
	Environment       string
	AdminBrokers      string
	AdminPingInterval time.Duration
}

type PlatformDiff

type PlatformDiff struct {
	ProgsToStop  []*rkcypb.Program
	ProgsToStart []*rkcypb.Program
}

type Producer

type Producer interface {
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	Close()
	Events() chan kafka.Event
	Flush(timeoutMs int) int
}

type ProducerCh

type ProducerCh chan *kafka.Message

type ResultProto

type ResultProto struct {
	Type     string
	Instance proto.Message
	Related  proto.Message
}

func ApecsTxnResult

func ApecsTxnResult(
	ctx context.Context,
	cncHdlrs ConcernHandlers,
	txn *rkcypb.ApecsTxn,
) (bool, *ResultProto, *rkcypb.ApecsTxn_Step_Result)

type RevertType

type RevertType int
const (
	REVERTABLE     RevertType = 0
	NON_REVERTABLE            = 1
)

type RtApecsTxn

type RtApecsTxn struct {
	Txn         *rkcypb.ApecsTxn
	TraceParent string
}

func NewRtApecsTxn

func NewRtApecsTxn(txn *rkcypb.ApecsTxn, traceParent string) (*RtApecsTxn, error)

func (*RtApecsTxn) AdvanceStepIdx

func (rtxn *RtApecsTxn) AdvanceStepIdx() bool

func (*RtApecsTxn) CanAdvance

func (rtxn *RtApecsTxn) CanAdvance() bool

func (*RtApecsTxn) CurrentStep

func (rtxn *RtApecsTxn) CurrentStep() *rkcypb.ApecsTxn_Step

func (*RtApecsTxn) FirstForwardStep

func (rtxn *RtApecsTxn) FirstForwardStep() *rkcypb.ApecsTxn_Step

func (*RtApecsTxn) InsertSteps

func (rtxn *RtApecsTxn) InsertSteps(idx int32, steps ...*rkcypb.ApecsTxn_Step) error

func (*RtApecsTxn) PreviousStep

func (rtxn *RtApecsTxn) PreviousStep() *rkcypb.ApecsTxn_Step

func (*RtApecsTxn) ReplaceStep

func (rtxn *RtApecsTxn) ReplaceStep(idx int32, step *rkcypb.ApecsTxn_Step) error

func (*RtApecsTxn) Validate

func (rtxn *RtApecsTxn) Validate() error

type RtConcern

type RtConcern struct {
	Concern *rkcypb.Concern
	Topics  map[string]*RtTopics
}

type RtPlatformDef

type RtPlatformDef struct {
	PlatformDef          *rkcypb.PlatformDef
	Hash                 string
	Concerns             map[string]*RtConcern
	DefaultResponseTopic *RtTopics
	Clusters             map[string]*rkcypb.Cluster
	AdminCluster         *rkcypb.Cluster
	StorageTargets       map[string]*rkcypb.StorageTarget
	PrimaryStorageTarget string
}

func NewRtPlatformDef

func NewRtPlatformDef(platDef *rkcypb.PlatformDef, platformName string, environment string) (*RtPlatformDef, error)

func NewRtPlatformDefFromJson

func NewRtPlatformDefFromJson(platDefJson []byte) (*RtPlatformDef, error)

func (*RtPlatformDef) Diff

func (lhs *RtPlatformDef) Diff(rhs *RtPlatformDef, streamType string, adminBrokers string, otelcolEndpoint string) *PlatformDiff

type RtTopics

type RtTopics struct {
	Topics                     *rkcypb.Concern_Topics
	CurrentTopic               string
	CurrentTopicPartitionCount int32
	CurrentCluster             *rkcypb.Cluster
	FutureTopic                string
	FutureTopicPartitionCount  int32
	FutureCluster              *rkcypb.Cluster
}

type StandardTopicName

type StandardTopicName string

type Step

type Step struct {
	Concern       string
	Command       string
	Key           string
	Payload       proto.Message
	EffectiveTime time.Time
}

type StepArgs

type StepArgs struct {
	TxnId         string
	Key           string
	Instance      []byte
	Payload       []byte
	EffectiveTime time.Time

	CmpdOffset    *rkcypb.CompoundOffset
	ForwardResult *rkcypb.ApecsTxn_Step_Result
}

type StorageInit

type StorageInit func(ctx context.Context, wg *sync.WaitGroup, config map[string]string) error

type StorageTargetInit

type StorageTargetInit struct {
	*rkcypb.StorageTarget
	Init StorageInit
}

type StreamProvider

type StreamProvider interface {
	Type() string
	NewConsumer(brokers string, groupName string, logCh chan kafka.LogEvent) (Consumer, error)
	NewProducer(brokers string, logCh chan kafka.LogEvent) (Producer, error)
	NewAdminClient(brokers string) (AdminClient, error)
}

type TopicParts

type TopicParts struct {
	FullTopic   string
	Platform    string
	Environment string
	Concern     string
	Topic       string
	System      rkcypb.System
	ConcernType rkcypb.Concern_Type
	Generation  int32
}

func ParseFullTopicName

func ParseFullTopicName(fullTopic string) (*TopicParts, error)

type Txn

type Txn struct {
	Revert RevertType
	Steps  []Step
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL