meta

package
v0.0.0-...-8aeb8a1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DestinationNamespace = "destination"
	SourceNamespace      = "source"

	//536-issue DEPRECATED
	//instead of this name - all sources will be in SourceNamespace and for push/pull events different keys will be selected
	PushSourceNamespace = "push_source"

	PushEventType = "push"
	PullEventType = "pull"

	SuccessStatus = "success"
	ErrorStatus   = "errors"
	SkipStatus    = "skip"

	ConfigPrefix = "config#"
	SystemKey    = "system"

	EventsTokenNamespace       = "token"
	EventsDestinationNamespace = "destination"
	EventsErrorStatus          = "error"
	EventsPureStatus           = ""
)
View Source
const (
	DummyType = "Dummy"
	RedisType = "Redis"
)

Variables

View Source
var DefaultOptions = Options{
	DefaultDialConnectTimeout: 10 * time.Second,
	DefaultDialReadTimeout:    10 * time.Second,
	DefaultDialWriteTimeout:   10 * time.Second,

	MaxIdle:     10,
	MaxActive:   600,
	IdleTimeout: 240 * time.Second,
	PingTimeout: 30 * time.Second,
}

DefaultOptions for Redis Pool

View Source
var (
	ErrTaskNotFound = errors.New("Sync task wasn't found")
)

Functions

This section is empty.

Types

type Dummy

type Dummy struct {
}

func (*Dummy) AddEvent

func (d *Dummy) AddEvent(namespace, id, status string, entity *Event) error

func (*Dummy) AppendTaskLog

func (d *Dummy) AppendTaskLog(taskID string, now time.Time, system, message, level string) error

func (*Dummy) Close

func (d *Dummy) Close() error

func (*Dummy) CreateTask

func (d *Dummy) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error

func (*Dummy) DeleteSignature

func (d *Dummy) DeleteSignature(sourceID, collection string) error

func (*Dummy) GetAllTaskIDs

func (d *Dummy) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)

func (*Dummy) GetAllTasks

func (d *Dummy) GetAllTasks(sourceID, collection string, from, to time.Time, limit int) ([]Task, error)

func (*Dummy) GetAllTasksForInitialHeartbeat

func (d *Dummy) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)

func (*Dummy) GetAllTasksHeartBeat

func (d *Dummy) GetAllTasksHeartBeat() (map[string]string, error)

func (*Dummy) GetEvents

func (d *Dummy) GetEvents(namespace, id, status string, limit int) ([]Event, error)

func (*Dummy) GetEventsWithGranularity

func (d *Dummy) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)

func (*Dummy) GetLastTask

func (d *Dummy) GetLastTask(sourceID, collection string, offset int) (*Task, error)

func (*Dummy) GetOrCreateClusterID

func (d *Dummy) GetOrCreateClusterID() string

func (*Dummy) GetProjectDestinationIDs

func (d *Dummy) GetProjectDestinationIDs(projectID string) ([]string, error)

func (*Dummy) GetProjectPushSourceIDs

func (d *Dummy) GetProjectPushSourceIDs(projectID string) ([]string, error)

func (*Dummy) GetProjectSourceIDs

func (d *Dummy) GetProjectSourceIDs(projectID string) ([]string, error)

func (*Dummy) GetSignature

func (d *Dummy) GetSignature(sourceID, collection, interval string) (string, error)

func (*Dummy) GetTask

func (d *Dummy) GetTask(taskID string) (*Task, error)

func (*Dummy) GetTaskLogs

func (d *Dummy) GetTaskLogs(taskID string, from, to time.Time) ([]TaskLogRecord, error)

func (*Dummy) GetTotalEvents

func (d *Dummy) GetTotalEvents(namespace, id, status string) (int, error)

func (*Dummy) IncrementEventsCount

func (d *Dummy) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error

func (*Dummy) PollTask

func (d *Dummy) PollTask() (*Task, error)

func (*Dummy) PushTask

func (d *Dummy) PushTask(task *Task) error

task queue

func (*Dummy) RemoveTaskFromHeartBeat

func (d *Dummy) RemoveTaskFromHeartBeat(taskID string) error

func (*Dummy) RemoveTasks

func (d *Dummy) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)

func (*Dummy) SaveSignature

func (d *Dummy) SaveSignature(sourceID, collection, interval, signature string) error

func (*Dummy) TaskHeartBeat

func (d *Dummy) TaskHeartBeat(taskID string) error

func (*Dummy) TrimEvents

func (d *Dummy) TrimEvents(namespace, id, status string, capacity int) error

func (*Dummy) Type

func (d *Dummy) Type() string

func (*Dummy) UpdateFinishedTask

func (d *Dummy) UpdateFinishedTask(taskID, status string) error

func (*Dummy) UpdateStartedTask

func (d *Dummy) UpdateStartedTask(taskID, status string) error

type ErrorMetrics

type ErrorMetrics struct {
	// contains filtered or unexported fields
}

func NewErrorMetrics

func NewErrorMetrics(metricFunc func(string)) *ErrorMetrics

func (*ErrorMetrics) NoticeError

func (em *ErrorMetrics) NoticeError(err error)

type Event

type Event struct {
	Malformed     string `json:"malformed,omitempty" redis:"malformed"`
	Original      string `json:"original,omitempty" redis:"original"`
	Success       string `json:"success,omitempty" redis:"success"`
	Error         string `json:"error,omitempty" redis:"error"`
	Skip          string `json:"skip,omitempty" redis:"skip"`
	Timestamp     string `json:"timestamp,omitempty" redis:"timestamp"`
	UID           string `json:"uid,omitempty" redis:"uid"`
	DestinationID string `json:"destination_id,omitempty" redis:"destination_id"`
	TokenID       string `json:"token_id,omitempty" redis:"token_id"`
}

type EventsPerTime

type EventsPerTime struct {
	Key    string `json:"key"`
	Events int    `json:"events"`
}

type Granularity

type Granularity string

Granularity is used for gathering statistics

const (
	UNKNOWN Granularity = ""

	DAY  Granularity = "day"
	HOUR Granularity = "hour"
)

func GranularityFromString

func GranularityFromString(value string) (Granularity, error)

func (Granularity) String

func (g Granularity) String() string

type Options

type Options struct {
	DefaultDialConnectTimeout time.Duration
	DefaultDialReadTimeout    time.Duration
	DefaultDialWriteTimeout   time.Duration

	MaxIdle     int
	MaxActive   int
	IdleTimeout time.Duration
	PingTimeout time.Duration
}

Options for Redis Pool

type Redis

type Redis struct {
	// contains filtered or unexported fields
}

func NewRedis

func NewRedis(pool *RedisPool, ttlsMinutes *viper.Viper) *Redis

NewRedis returns configured Redis struct with connection pool

func (*Redis) AddEvent

func (r *Redis) AddEvent(namespace, id, status string, entity *Event) error

AddEvent saves event JSON string into Redis

func (*Redis) AppendTaskLog

func (r *Redis) AppendTaskLog(taskID string, now time.Time, system, message, level string) error

AppendTaskLog appends log record into task logs sorted set

func (*Redis) Close

func (r *Redis) Close() error

func (*Redis) CreateTask

func (r *Redis) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error

CreateTask saves task into Redis and add Task ID in index

func (*Redis) DeleteSignature

func (r *Redis) DeleteSignature(sourceID, collection string) error

DeleteSignature deletes source collection signature from Redis

func (*Redis) GetAllTaskIDs

func (r *Redis) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)

GetAllTaskIDs returns all source's tasks ids by collection

func (*Redis) GetAllTasks

func (r *Redis) GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)

GetAllTasks returns all source's tasks by collection and time criteria

func (*Redis) GetAllTasksForInitialHeartbeat

func (r *Redis) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)

GetAllTasksForInitialHeartbeat returns all task IDs where: 1. task is RUNNING and last log time more than last activity threshold 2. task is SCHEDULED and task creation time more than last activity threshold

func (*Redis) GetAllTasksHeartBeat

func (r *Redis) GetAllTasksHeartBeat() (map[string]string, error)

GetAllTasksHeartBeat returns map with taskID-last heartbeat timestamp pairs

func (*Redis) GetEvents

func (r *Redis) GetEvents(namespace, id, status string, limit int) ([]Event, error)

GetEvents returns last n events from namespace with id

func (*Redis) GetEventsWithGranularity

func (r *Redis) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)

GetEventsWithGranularity returns events amount with time criteria by granularity, status and sources/destination ids

func (*Redis) GetLastTask

func (r *Redis) GetLastTask(sourceID, collection string, offset int) (*Task, error)

GetLastTask returns last sync task

func (*Redis) GetOrCreateClusterID

func (r *Redis) GetOrCreateClusterID() string

GetOrCreateClusterID returns clusterID from Redis or save input one

func (*Redis) GetProjectDestinationIDs

func (r *Redis) GetProjectDestinationIDs(projectID string) ([]string, error)

GetProjectDestinationIDs returns project's destination ids

func (*Redis) GetProjectPushSourceIDs

func (r *Redis) GetProjectPushSourceIDs(projectID string) ([]string, error)

GetProjectPushSourceIDs returns project's pushed sources ids (api keys)

func (*Redis) GetProjectSourceIDs

func (r *Redis) GetProjectSourceIDs(projectID string) ([]string, error)

GetProjectSourceIDs returns project's sources ids

func (*Redis) GetSignature

func (r *Redis) GetSignature(sourceID, collection, interval string) (string, error)

GetSignature returns sync interval signature from Redis

func (*Redis) GetTask

func (r *Redis) GetTask(taskID string) (*Task, error)

GetTask opens connection and returns result of getTask

func (*Redis) GetTaskLogs

func (r *Redis) GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error)

GetTaskLogs returns task logs with time criteria

func (*Redis) GetTotalEvents

func (r *Redis) GetTotalEvents(namespace, id, status string) (int, error)

GetTotalEvents returns total of cached events

func (*Redis) IncrementEventsCount

func (r *Redis) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error

IncrementEventsCount increment events counter namespaces: [destination, source] eventType: [push, pull] status: [success, error, skip]

func (*Redis) PollTask

func (r *Redis) PollTask() (*Task, error)

PollTask return task from the Queue or nil if the queue is empty

func (*Redis) PushTask

func (r *Redis) PushTask(task *Task) error

PushTask saves task into priority queue

func (*Redis) RemoveTaskFromHeartBeat

func (r *Redis) RemoveTaskFromHeartBeat(taskID string) error

RemoveTaskFromHeartBeat removes taskID current timestamp from heartbeat key

func (*Redis) RemoveTasks

func (r *Redis) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)

RemoveTasks tasks with provided taskIds from specified source's collections. All task logs removed as well

func (*Redis) SaveSignature

func (r *Redis) SaveSignature(sourceID, collection, interval, signature string) error

SaveSignature saves sync interval signature in Redis

func (*Redis) TaskHeartBeat

func (r *Redis) TaskHeartBeat(taskID string) error

TaskHeartBeat sets current timestamp into heartbeat key

func (*Redis) TrimEvents

func (r *Redis) TrimEvents(namespace, id, status string, capacity int) error

TrimEvents keeps only last capacity events in Redis list key with trim function

func (*Redis) Type

func (r *Redis) Type() string

func (*Redis) UpdateFinishedTask

func (r *Redis) UpdateFinishedTask(taskID, status string) error

UpdateFinishedTask updates only status and finished_at field in the task

func (*Redis) UpdateStartedTask

func (r *Redis) UpdateStartedTask(taskID, status string) error

UpdateStartedTask updates only status and started_at field in the task

type RedisPool

type RedisPool struct {
	// contains filtered or unexported fields
}

RedisPool is a wrapper for keeping redis.Pool and sentinel.Sentinel and close them

func (*RedisPool) Close

func (rp *RedisPool) Close() (multiErr error)

Close closes redis pool and sentinel if configured

func (*RedisPool) Get

func (rp *RedisPool) Get() redis.Conn

Get returns a connection from the pool

func (*RedisPool) GetContext

func (rp *RedisPool) GetContext(ctx context.Context) (redis.Conn, error)

func (*RedisPool) GetPool

func (rp *RedisPool) GetPool() *redis.Pool

GetPool returns the underlying redigo pool

type RedisPoolFactory

type RedisPoolFactory struct {
	// contains filtered or unexported fields
}

RedisPoolFactory is a factory for creating RedisPool supports creating RedisPool from URLs: redis://, rediss://, sentinel:// and from config parameters like host,port, etc

func NewRedisPoolFactory

func NewRedisPoolFactory(host string, port int, password string, database int, tlsSkipVerify bool, sentinelMasterMame string) *RedisPoolFactory

NewRedisPoolFactory returns filled RedisPoolFactory and removes quotes in host

func (*RedisPoolFactory) CheckAndSetDefaultPort

func (rpf *RedisPoolFactory) CheckAndSetDefaultPort() (int, bool)

CheckAndSetDefaultPort checks if port isn't set - put defaultRedisPort, if sentinel mode put defaultSentinelPort

func (*RedisPoolFactory) Create

func (rpf *RedisPoolFactory) Create() (*RedisPool, error)

Create returns configured RedisPool or err if ping failed host might be URLS: 1. redis://:password@host:port 2. rediss://:password@host:port 3. sentinel://master_name:password@node1:port,node2:port 4. plain host

func (*RedisPoolFactory) Details

func (rpf *RedisPoolFactory) Details() string

Details returns host:port or host if host is a URL with sentinel information

func (*RedisPoolFactory) GetOptions

func (rpf *RedisPoolFactory) GetOptions() Options

func (*RedisPoolFactory) WithOptions

func (rpf *RedisPoolFactory) WithOptions(options Options) *RedisPoolFactory

WithOptions overrides options

type Storage

type Storage interface {
	io.Closer

	//** Sources **
	//signatures
	GetSignature(sourceID, collection, interval string) (string, error)
	SaveSignature(sourceID, collection, interval, signature string) error
	DeleteSignature(sourceID, collection string) error

	//** Counters **
	//events counters
	IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
	GetProjectSourceIDs(projectID string) ([]string, error)
	GetProjectDestinationIDs(projectID string) ([]string, error)
	//536-issue DEPRECATED instead of it all project sources will be selected
	GetProjectPushSourceIDs(projectID string) ([]string, error)
	GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)

	//** Events Cache **
	//events caching
	AddEvent(namespace, id, status string, entity *Event) error
	TrimEvents(namespace, id, status string, capacity int) error
	GetEvents(namespace, id, status string, limit int) ([]Event, error)
	GetTotalEvents(namespace, id, status string) (int, error)

	// ** Sync Tasks **
	CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
	GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
	GetLastTask(sourceID, collection string, offset int) (*Task, error)
	GetTask(taskID string) (*Task, error)
	GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
	RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
	UpdateStartedTask(taskID, status string) error
	UpdateFinishedTask(taskID, status string) error

	//heartbeat
	TaskHeartBeat(taskID string) error
	RemoveTaskFromHeartBeat(taskID string) error
	GetAllTasksHeartBeat() (map[string]string, error)
	GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)

	//task logs
	AppendTaskLog(taskID string, now time.Time, system, message, level string) error
	GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error)

	//task queue
	PushTask(task *Task) error
	PollTask() (*Task, error)

	//system
	GetOrCreateClusterID() string

	Type() string
}

func InitializeStorage

func InitializeStorage(metaStorageConfiguration *viper.Viper) (Storage, error)

type Task

type Task struct {
	ID         string `json:"id,omitempty" redis:"id"`
	SourceType string `json:"source_type" redis:"source_type"`
	Source     string `json:"source,omitempty" redis:"source"`
	Collection string `json:"collection,omitempty" redis:"collection"`
	Priority   int64  `json:"priority,omitempty" redis:"priority"`
	CreatedAt  string `json:"created_at,omitempty" redis:"created_at"`
	StartedAt  string `json:"started_at,omitempty" redis:"started_at"`
	FinishedAt string `json:"finished_at,omitempty" redis:"finished_at"`
	Status     string `json:"status,omitempty" redis:"status"`
}

Task is a Redis entity some fields are updated using names in Storage (like status updating)

type TaskLogRecord

type TaskLogRecord struct {
	Time    string `json:"time,omitempty" redis:"time"`
	System  string `json:"system,omitempty" redis:"system"`
	Message string `json:"message,omitempty" redis:"message"`
	Level   string `json:"level,omitempty" redis:"level"`
}

TaskLogRecord is a Redis entity

func (*TaskLogRecord) Marshal

func (tlr *TaskLogRecord) Marshal() string

Marshal returns serialized JSON object string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL