sgload

package
v0.0.0-...-a137165 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2020 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CHANGES_LIMIT = 100
)
View Source
const FEED_TYPE_LONGPOLL = ChangesFeedType("longpoll")
View Source
const FEED_TYPE_NORMAL = ChangesFeedType("normal")
View Source
const (
	USER_PREFIX_READER = "reader"
)
View Source
const (
	USER_PREFIX_WRITER = "writer"
)

Variables

This section is empty.

Functions

func Logger

func Logger() log15.Logger

func NewUuid

func NewUuid() string

func RetryLoop

func RetryLoop(description string, worker RetryWorker, sleeper RetrySleeper) (error, interface{})

func SetLogLevel

func SetLogLevel(level log15.Lvl)

Types

type Agent

type Agent struct {
	AgentSpec
	StatsdClient        g2s.Statter          // The statsd client instance to use to push stats to statdsd
	ExpVarStats         ExpVarStatsCollector // The expvar progress stats map for this agent
	CreateUserSemaphore *semaphore.Semaphore // Semaphore to ensure max # of concrrent createuser requests
	CreatedSGUser       bool                 // State to track whether SG user has already been created
}

Contains common fields and functionality between readers and writers

func (*Agent) SetCreateUserSemaphore

func (a *Agent) SetCreateUserSemaphore(sem *semaphore.Semaphore)

func (*Agent) SetStatsdClient

func (a *Agent) SetStatsdClient(statsdClient g2s.Statter)

type AgentSpec

type AgentSpec struct {
	FinishedWg *sync.WaitGroup // Allows interested party to block until agent is done
	UserCred
	ID                      int             // The numeric ID of the writer (ephemeral, only stored in memory)
	CreateDataStoreUser     bool            // Whether this writer must first create a user on the DataStore service, ot just assume it already exists
	DataStore               DataStore       // The target data store where docs will be written
	BatchSize               int             // bulk_get or bulk_docs batch size
	AttachSizeBytes         int             // If > 0, and BatchSize == 1, then it will add attachments of this size during doc creates/updates.
	ExpvarProgressEnabled   bool            // Whether to publish reader/writer/updater progress to expvars
	MaxConcurrentCreateUser int             // The maximum number of concurrent outstanding createuser requests.  0 means no maximum
	AllSGUsersCreated       *sync.WaitGroup // Wait Group to allow waiting until all SG users created before applying load
}

type AttachmentMeta

type AttachmentMeta struct {
	Follows     bool   `json:"follows"`
	ContentType string `json:"content_type"`
	Length      int    `json:"length"`
	Digest      string `json:"digest"`
}

type BulkDocs

type BulkDocs struct {
	NewEdits  bool       `json:"new_edits"`
	Documents []Document `json:"docs"`
}

type Change

type Change interface{} // TODO: spec this out further

type ChangesFeedParams

type ChangesFeedParams struct {
	// contains filtered or unexported fields
}

func NewChangesFeedParams

func NewChangesFeedParams(sinceVal Sincer, limit int, feedType ChangesFeedType) *ChangesFeedParams

func (ChangesFeedParams) String

func (p ChangesFeedParams) String() string

type ChangesFeedType

type ChangesFeedType string

type DataStore

type DataStore interface {

	// Creates a new user in the data store (admin port)
	CreateUser(u UserCred, channelNames []string) error

	// Create a single document, possibly with attachment if attachSizeBytes > 0
	CreateDocument(doc Document, attachSizeBytes int, newEdits bool) (DocumentMetadata, error)

	// Bulk creates a set of documents in the data store
	BulkCreateDocuments(d []Document, newEdits bool) ([]DocumentMetadata, error)

	// Same as BulkCreateDocuments, but built-in retry for temporary errors
	BulkCreateDocumentsRetry(d []Document, newEdits bool) ([]DocumentMetadata, error)

	// Sets the user credentials to use for all subsequent requests
	SetUserCreds(u UserCred)

	// Get all the changes since the since value
	Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error)

	// Does a bulk get on docs in bulk get request, discards actual docs
	BulkGetDocuments(sgreplicate.BulkGetRequest) ([]sgreplicate.Document, error)
}

type DocUpdateStatus

type DocUpdateStatus struct {
	NumUpdates       int
	DocumentMetadata DocumentMetadata
}

type Document

type Document map[string]interface{}

func DocumentFromSGReplicateDocument

func DocumentFromSGReplicateDocument(sgrDoc sgreplicate.Document) Document

func (Document) Copy

func (d Document) Copy() Document

func (Document) GenerateAndAddAttachmentMeta

func (d Document) GenerateAndAddAttachmentMeta(name, contentType string, attachmentContent []byte)

func (Document) GenerateHtmlAttachmentContent

func (d Document) GenerateHtmlAttachmentContent(approxAttachSizeBytes int) []byte

Generate an attachment approximately with size specified in approxAttachSizeBytes

func (Document) GetBodySizeBytes

func (d Document) GetBodySizeBytes() (docSizeBytes int)

func (Document) Id

func (d Document) Id() string

func (Document) Revision

func (d Document) Revision() string

func (Document) SetChannels

func (d Document) SetChannels(channels []string)

func (Document) SetId

func (d Document) SetId(id string)

func (Document) SetRevision

func (d Document) SetRevision(revision string)

func (Document) SetRevisions

func (d Document) SetRevisions(start int, digests []string)

Standard CouchDB encoding of a revision list: digests without numeric generation prefixes go in the "ids" property, and the first (largest) generation number in the "start" property.

type DocumentMetadata

type DocumentMetadata struct {
	sgreplicate.DocumentRevisionPair
	Channels []string
}

type DocumentRevisionPair

type DocumentRevisionPair struct {
	Id       string `json:"id"`
	Revision string `json:"rev"`
	Error    string `json:"error,omitempty"`
	Reason   string `json:"reason,omitempty"`
}

Copy-pasted from sg-replicate -- needs to be refactored into common code per DRY principle

type ExpVarStatsCollector

type ExpVarStatsCollector interface {
	Set(key string, av expvar.Var)
	Add(key string, delta int64)
}

Since sometimes we want to just ignore any calls to update expvarstats depending on CLI args, wrap up expvar.Map behind an interface and offer a "no-op" version

type GateLoadRunner

type GateLoadRunner struct {
	LoadRunner
	WriteLoadRunner
	ReadLoadRunner
	UpdateLoadRunner
	GateLoadSpec GateLoadSpec
	PushedDocs   chan []DocumentMetadata
}

func NewGateLoadRunner

func NewGateLoadRunner(gls GateLoadSpec) *GateLoadRunner

func (GateLoadRunner) CreateAllSGUsersWaitGroup

func (glr GateLoadRunner) CreateAllSGUsersWaitGroup() *sync.WaitGroup

func (GateLoadRunner) Run

func (glr GateLoadRunner) Run() error

type GateLoadSpec

type GateLoadSpec struct {
	LoadSpec
	WriteLoadSpec
	ReadLoadSpec
	UpdateLoadSpec
}

func (GateLoadSpec) MustValidate

func (gls GateLoadSpec) MustValidate()

Validate this spec or panic

func (GateLoadSpec) Validate

func (gls GateLoadSpec) Validate() error

type LoadRunner

type LoadRunner struct {
	LoadSpec     LoadSpec
	StatsdClient g2s.Statter
}

func (*LoadRunner) CreateStatsdClient

func (lr *LoadRunner) CreateStatsdClient()

type LoadSpec

type LoadSpec struct {
	SyncGatewayUrl        string    // The Sync Gateway public URL with port and DB, eg "http://localhost:4984/db"
	SyncGatewayAdminPort  int       // The Sync Gateway admin port, eg, 4985
	MockDataStore         bool      // If true, will use a MockDataStore instead of a real sync gateway
	StatsdEnabled         bool      // If true, will push stats to StatsdEndpoint
	StatsdEndpoint        string    // The endpoint of the statds server, eg localhost:8125
	StatsdPrefix          string    // The metrics prefix to use (for example, some hosted statsd services require a token)
	TestSessionID         string    // A unique identifier for this test session.  It's used for creating channel names and possibly more
	AttachSizeBytes       int       // If > 0, and BatchSize == 1, then it will add attachments of this size during doc creates/updates.
	BatchSize             int       // How many docs to read (bulk_get) or write (bulk_docs) in bulk
	NumChannels           int       // How many channels to create/use during this test
	DocSizeBytes          int       // Doc size in bytes to create during this test
	NumDocs               int       // Number of docs to read/write during this test
	CompressionEnabled    bool      // Whether requests and responses should be compressed (when supported)
	ExpvarProgressEnabled bool      // Whether to publish reader/writer/updater progress to expvars (disabled by default to not bloat expvar json)
	LogLevel              log15.Lvl // The log level.  Defaults to LvlWarn

}

This is the specification for this load test scenario. The values contained here are common to all load test scenarios.

func (LoadSpec) Validate

func (ls LoadSpec) Validate() error

type MockDataStore

type MockDataStore struct{}

func NewMockDataStore

func NewMockDataStore() *MockDataStore

func (MockDataStore) BulkCreateDocuments

func (m MockDataStore) BulkCreateDocuments(docs []Document, newEdits bool) ([]DocumentMetadata, error)

func (MockDataStore) BulkCreateDocumentsRetry

func (m MockDataStore) BulkCreateDocumentsRetry(docs []Document, newEdits bool) ([]DocumentMetadata, error)

func (MockDataStore) BulkGetDocuments

func (MockDataStore) Changes

func (m MockDataStore) Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error)

func (MockDataStore) CreateDocument

func (m MockDataStore) CreateDocument(doc Document, attachSizeBytes int, newEdits bool) (DocumentMetadata, error)

func (MockDataStore) CreateUser

func (m MockDataStore) CreateUser(u UserCred, channelNames []string) error

func (*MockDataStore) SetUserCreds

func (m *MockDataStore) SetUserCreds(u UserCred)

type MockStatter

type MockStatter struct{}

func (MockStatter) Counter

func (ms MockStatter) Counter(sampleRate float32, bucket string, n ...int)

func (MockStatter) Gauge

func (ms MockStatter) Gauge(sampleRate float32, bucket string, value ...string)

func (MockStatter) Timing

func (ms MockStatter) Timing(sampleRate float32, bucket string, d ...time.Duration)

type NoOpExpvarStatsCollector

type NoOpExpvarStatsCollector struct{}

An impl of ExpVarStatsCollector which ignores every operation

func (NoOpExpvarStatsCollector) Add

func (e NoOpExpvarStatsCollector) Add(key string, delta int64)

func (NoOpExpvarStatsCollector) Set

func (e NoOpExpvarStatsCollector) Set(key string, av expvar.Var)

type ReadLoadRunner

type ReadLoadRunner struct {
	LoadRunner
	ReadLoadSpec ReadLoadSpec
}

func NewReadLoadRunner

func NewReadLoadRunner(rls ReadLoadSpec) *ReadLoadRunner

func (ReadLoadRunner) Run

func (rlr ReadLoadRunner) Run() error

type ReadLoadSpec

type ReadLoadSpec struct {
	LoadSpec
	CreateReaders             bool // Whether or not to create users for readers
	NumReaders                int
	NumChansPerReader         int
	NumRevGenerationsExpected int
	SkipWriteLoadSetup        bool            // By default the readload scenario runs the writeload scenario first.  If this is true, it will skip the writeload scenario.
	FeedType                  ChangesFeedType // "Normal" or "Longpoll"

}

func (ReadLoadSpec) MustValidate

func (rls ReadLoadSpec) MustValidate()

Validate this spec or panic

func (ReadLoadSpec) Validate

func (rls ReadLoadSpec) Validate() error

type Reader

type Reader struct {
	Agent
	SGChannels                []string // The Sync Gateway channels this reader is assigned to pull from
	NumDocsExpected           int      // The total number of docs this reader is expected to pull'
	NumRevGenerationsExpected int      // The expected generate that each doc is expected to reach
	BatchSize                 int      // The number of docs to pull in batch (_changes feed and bulk_get)
	// contains filtered or unexported fields
}

func NewReader

func NewReader(agentSpec AgentSpec) *Reader

func (*Reader) Run

func (r *Reader) Run()

Main loop of reader goroutine

func (*Reader) SetBatchSize

func (r *Reader) SetBatchSize(batchSize int)

func (*Reader) SetChannels

func (r *Reader) SetChannels(sgChannels []string)

func (*Reader) SetFeedType

func (r *Reader) SetFeedType(feedType ChangesFeedType)

func (*Reader) SetNumDocsExpected

func (r *Reader) SetNumDocsExpected(n int)

func (*Reader) SetNumRevGenerationsExpected

func (r *Reader) SetNumRevGenerationsExpected(n int)

type RetrySleeper

type RetrySleeper func(retryCount int) (shouldContinue bool, timeTosleepMs int)

A retry sleeper is called back by the retry loop and passed the current retryCount, and should return the amount of milliseconds that the retry should sleep.

func CreateDoublingSleeperFunc

func CreateDoublingSleeperFunc(maxNumAttempts, initialTimeToSleepMs int) RetrySleeper

Create a RetrySleeper that will double the retry time on every iteration and use the given parameters

type RetryWorker

type RetryWorker func() (shouldRetry bool, err error, value interface{})

A RetryWorker encapsulates the work being done in a Retry Loop. The shouldRetry return value determines whether the worker will retry, regardless of the err value. If the worker has exceeded it's retry attempts, then it will not be called again even if it returns shouldRetry = true.

type SGDataStore

type SGDataStore struct {
	SyncGatewayUrl       string
	SyncGatewayAdminPort int
	UserCreds            UserCred
	StatsdClient         g2s.Statter
	CompressionEnabled   bool
}

func NewSGDataStore

func NewSGDataStore(sgUrl string, sgAdminPort int, statsdClient g2s.Statter, compressionEnabled bool) *SGDataStore

func (SGDataStore) BulkCreateDocuments

func (s SGDataStore) BulkCreateDocuments(docs []Document, newEdits bool) ([]DocumentMetadata, error)

Bulk create/update a set of documents in Sync Gateway

func (SGDataStore) BulkCreateDocumentsRetry

func (s SGDataStore) BulkCreateDocumentsRetry(docs []Document, newEdits bool) ([]DocumentMetadata, error)

func (SGDataStore) BulkGetDocuments

func (s SGDataStore) BulkGetDocuments(r sgreplicate.BulkGetRequest) ([]sgreplicate.Document, error)

func (SGDataStore) Changes

func (s SGDataStore) Changes(sinceVal Sincer, limit int, feedType ChangesFeedType) (changes sgreplicate.Changes, newSinceVal Sincer, err error)

func (SGDataStore) CreateDocument

func (s SGDataStore) CreateDocument(doc Document, attachSizeBytes int, newEdits bool) (DocumentMetadata, error)

Create or update a single document with attachment data

func (SGDataStore) CreateDocumentNoAttachment

func (s SGDataStore) CreateDocumentNoAttachment(doc Document, newEdits bool) (DocumentMetadata, error)

Create or update a single document

func (SGDataStore) CreateUser

func (s SGDataStore) CreateUser(u UserCred, channelNames []string) error

func (*SGDataStore) SetUserCreds

func (s *SGDataStore) SetUserCreds(u UserCred)

type Sincer

type Sincer interface {
	Empty() bool
	String() string
	Equals(other Sincer) bool
}

type StringSincer

type StringSincer struct {
	Since string
}

func (StringSincer) Empty

func (s StringSincer) Empty() bool

func (StringSincer) Equals

func (s StringSincer) Equals(otherSincer Sincer) bool

func (StringSincer) String

func (s StringSincer) String() string

type UpdateLoadRunner

type UpdateLoadRunner struct {
	LoadRunner
	UpdateLoadSpec UpdateLoadSpec
}

type UpdateLoadSpec

type UpdateLoadSpec struct {
	LoadSpec
	NumUpdatesPerDoc    int           // The total number of revisions to add per doc
	NumRevsPerUpdate    int           // The number of revisions to add per update
	NumUpdaters         int           // The number of updater goroutines
	DelayBetweenUpdates time.Duration // Delay between updates (subtracting out the time they are blocked during write)

}

func (UpdateLoadSpec) MustValidate

func (uls UpdateLoadSpec) MustValidate()

Validate this spec or panic

func (UpdateLoadSpec) Validate

func (uls UpdateLoadSpec) Validate() error

type Updater

type Updater struct {
	Agent
	UpdaterSpec

	DocsToUpdate <-chan []DocumentMetadata // This is a channel that this updater listens to for docs that are ready to be updated

	DocUpdateStatuses map[string]DocUpdateStatus // The number of updates and latest rev that have been done per doc id.  Key = doc id, value = number of updates and latest rev

}

func NewUpdater

func NewUpdater(agentSpec AgentSpec, numUniqueDocsPerUpdater, numUpdatesPerDoc, batchsize, docSizeBytes int, revsPerUpdate int, docsToUpdate <-chan []DocumentMetadata, delayBetweenUpdates time.Duration) *Updater

func (Updater) LookupCurrentRevisions

func (u Updater) LookupCurrentRevisions(docsToLookup []Document) ([]sgreplicate.DocumentRevisionPair, error)

func (*Updater) Run

func (u *Updater) Run()

type UpdaterSpec

type UpdaterSpec struct {
	NumUpdatesPerDocRequired int           // The number of updates this updater is supposed to do for each doc.
	NumUniqueDocsPerUpdater  int           // The number of unique docs this updater is tasked to update.
	BatchSize                int           // How many docs to update per bulk_docs request
	RevsPerUpdate            int           // How many revisions to include in each document update
	DocSizeBytes             int           // The doc size in bytes to use when generating update docs
	DelayBetweenUpdates      time.Duration // Delay between updates (subtracting out the time they are blocked during write)
}

type UserCred

type UserCred struct {
	Username string `json:"username"` // Username part of basicauth credentials for this writer to use
	Password string `json:"password"` // Password part of basicauth credentials for this writer to use
}

func (UserCred) Empty

func (u UserCred) Empty() bool

type WriteLoadRunner

type WriteLoadRunner struct {
	LoadRunner
	WriteLoadSpec WriteLoadSpec
}

func NewWriteLoadRunner

func NewWriteLoadRunner(wls WriteLoadSpec) *WriteLoadRunner

func (WriteLoadRunner) Run

func (wlr WriteLoadRunner) Run() error

type WriteLoadSpec

type WriteLoadSpec struct {
	LoadSpec

	CreateWriters bool // Whether or not to create users for writers
	NumWriters    int

	// How long writers should try to delay between writes
	// (subtracting out the time they are blocked during actual write)
	DelayBetweenWrites time.Duration
}

func (WriteLoadSpec) MustValidate

func (wls WriteLoadSpec) MustValidate()

Validate this spec or panic

func (WriteLoadSpec) Validate

func (wls WriteLoadSpec) Validate() error

type Writer

type Writer struct {
	Agent

	WriterSpec

	OutboundDocs chan []Document           // The Docfeeder pushes outbound docs to the writer
	PushedDocs   chan<- []DocumentMetadata // After docs are sent, push to this channel

}

func NewWriter

func NewWriter(agentSpec AgentSpec, spec WriterSpec) *Writer

func (*Writer) AddToDataStore

func (w *Writer) AddToDataStore(docs []Document)

func (*Writer) Run

func (w *Writer) Run()

func (*Writer) SetApproxExpectedDocsWritten

func (w *Writer) SetApproxExpectedDocsWritten(numdocs int)

type WriterSpec

type WriterSpec struct {

	// How long writers should try to delay between writes
	// (subtracting out the time they are blocked during actual write)
	DelayBetweenWrites time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL