mongostore

package
v0.0.0-...-137c36e
Published: Dec 10, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Constants

const (
	DefaultDatabaseName                         = "datastore"
	DefaultConnectTimeoutSeconds                = uint64(10)
	DefaultTimeoutSecondsShutdown               = uint64(10)
	DefaultTimeoutSecondsQuery                  = uint64(10)
	DefaultPingHeartbeatSeconds                 = uint64(10)
	DefaultMaxFailedEnsureIndexesBackoffSeconds = uint64(300)
	DefaultUsername                             = ""
	DefaultPassword                             = ""
	DefaultAuthMechanism                        = "PLAIN"
	DefaultMaxPoolSize                          = uint64(100)
	DefaultHost                                 = "localhost:27017"
)
const (
	ASC  = 1
	DESC = -1
)
const (
	MaxSliceSizePerMongoDocument = uint64(10 * 1024 * 1024)
)

Variables

var DirtyWriteError = errors.New("dirty write error")
var ErrorServiceNotStarted = errors.New("getting mongo client failed: service is not started or shutdown")

Functions

func CheckForDirtyWriteOnUpsert

func CheckForDirtyWriteOnUpsert(updateResult *mongo.UpdateResult, inputErr error) (err error)

CheckForDirtyWriteOnUpsert is expected to be used as follows. First, add a field named DirtyWriteGuard to your struct:

type Person struct {
  ...
  DirtyWriteGuard uint64  `bson:"dirtyWriteGuard"`
}

Then when you update mongo:

	filter := bson.D{
		{"_id", person.Id}, // Note: must use _id or some other indexed field with a unique constraint.
		// person.DirtyWriteGuard is 0 for a new entity; otherwise it must equal the dirtyWriteGuard field of the document we expect in the collection.
		{"dirtyWriteGuard", person.DirtyWriteGuard}, // this value must be unmodified by your code since it was loaded from mongo.
	}

	// increment the counter before update or insert
	person.DirtyWriteGuard++
	defer func() {
		if err != nil {
			// if upsert fails decrement the counter
			person.DirtyWriteGuard--
		}
	}()

	updateOptions := &options.ReplaceOptions{}
	updateOptions.SetUpsert(true)

	var updateResult *mongo.UpdateResult
	updateResult, err = collection.ReplaceOne(ctx, filter, person, updateOptions)
	err = mongostore.CheckForDirtyWriteOnUpsert(updateResult, err)
	if err != nil {
		if err != mongostore.DirtyWriteError {
			// only log or mess with err returned if not a DirtyWriteError
			logger.Instance().ErrorIgnoreCancel(ctx, "error on ReplaceOne for Person", logger.Error(err))
			err = errors.Wrap(err, "error on ReplaceOne for Person")
		}
		return
	}

In the expected dirty write case, mongo returns updateResult.MatchedCount == 0 && updateResult.UpsertedID == nil, meaning that no document matched the filter on both the unique id and the dirtyWriteGuard value.

If there is no dirty write and the update call (ReplaceOne() in the example above) returns no error, we expect either an insert (updateResult.UpsertedID has a value) or an update of an existing document (updateResult.MatchedCount == 1).

In the real-world, tested case, mongo returns an E11000 duplicate key error on a dirty write. No document matches both _id and dirtyWriteGuard, so mongo attempts to insert a new document, and that insert violates the unique constraint.
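The outcomes described above can be summarized in a small decision function. The following is only a sketch under stated assumptions, not the package's implementation: upsertResult, classifyUpsert, errDuplicateKey, and isDup are hypothetical stand-ins so that the example runs without the mongo driver, and errDirtyWrite plays the role of DirtyWriteError.

```go
package main

import (
	"errors"
	"fmt"
)

// errDirtyWrite stands in for mongostore.DirtyWriteError in this sketch.
var errDirtyWrite = errors.New("dirty write error")

// errDuplicateKey is a hypothetical stand-in for an E11000 error from the driver.
var errDuplicateKey = errors.New("E11000 duplicate key error")

// upsertResult mirrors only the two mongo.UpdateResult fields the text refers to.
type upsertResult struct {
	MatchedCount int64
	UpsertedID   interface{}
}

// isDup is a hypothetical duplicate-key check used only by this sketch.
func isDup(err error) bool { return errors.Is(err, errDuplicateKey) }

// classifyUpsert maps the result of an upsert guarded by dirtyWriteGuard to an error:
//   - a duplicate key error means the guarded filter matched nothing and the
//     insert collided with the existing document, so it is a dirty write;
//   - no error with neither a match nor an upserted id is also a dirty write;
//   - any other error is returned unchanged.
func classifyUpsert(res *upsertResult, opErr error, isDuplicateKey func(error) bool) error {
	if opErr != nil {
		if isDuplicateKey(opErr) {
			return errDirtyWrite
		}
		return opErr
	}
	if res != nil && res.MatchedCount == 0 && res.UpsertedID == nil {
		return errDirtyWrite
	}
	return nil
}

func main() {
	fmt.Println(classifyUpsert(&upsertResult{MatchedCount: 1}, nil, isDup)) // updated existing document
	fmt.Println(classifyUpsert(&upsertResult{}, nil, isDup))               // nothing matched, nothing inserted
	fmt.Println(classifyUpsert(nil, errDuplicateKey, isDup))               // insert collided with existing document
}
```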

func DuplicateKeyFiltersFromBulkWriteError

func DuplicateKeyFiltersFromBulkWriteError(err error) (containsDuplicateKeyError bool, filtersForDups []interface{})

DuplicateKeyFiltersFromBulkWriteError returns true if there are any E11000 duplicate key errors. It also returns a slice of whatever you passed into the mongo command as your Filter (or Filters, in the case of BulkWrite()). Each filter should be a primitive.M or primitive.D.

func IsDuplicateKeyError

func IsDuplicateKeyError(err error) bool

IsDuplicateKeyError can help handle expected behavior for any mongo command that uses Upsert. If IsDuplicateKeyError returns true for the error returned by a mongo operation, and you are using Upsert, then you are expected to retry.

It returns false if there are multiple nested mongo writeExceptions and any one of them has a Code != 11000 (duplicate key), because that indicates there are other errors that must be handled separately rather than ignored or retried.
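The "every nested error must be a duplicate key error" rule can be illustrated without the driver. This is a minimal sketch: writeError and allDuplicateKey are hypothetical names, and the real function inspects the driver's own error types rather than this simplified struct.

```go
package main

import "fmt"

// duplicateKeyCode is the mongo error code for E11000 duplicate key.
const duplicateKeyCode = 11000

// writeError is a hypothetical, simplified stand-in for one nested write error.
type writeError struct {
	Code    int
	Message string
}

// allDuplicateKey reports true only when there is at least one error and
// every error carries the duplicate key code. A single error with any other
// code makes the whole result false, so it is not silently retried.
func allDuplicateKey(errs []writeError) bool {
	if len(errs) == 0 {
		return false
	}
	for _, e := range errs {
		if e.Code != duplicateKeyCode {
			return false
		}
	}
	return true
}

func main() {
	onlyDups := []writeError{{Code: 11000}, {Code: 11000}}
	mixed := []writeError{{Code: 11000}, {Code: 2}}
	fmt.Println(allDuplicateKey(onlyDups)) // prints true
	fmt.Println(allDuplicateKey(mixed))    // prints false
}
```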

func IsIndexNotFoundError

func IsIndexNotFoundError(err error) bool

func RetryDirtyWrite

func RetryDirtyWrite(dirtyWriteFunc RetryFunc) (err error)

RetryDirtyWrite is used by callers of functions that call CheckForDirtyWriteOnUpsert and can return DirtyWriteError. It retries the supplied function up to 100 times before giving up if a dirty write error is detected. The caller of RetryDirtyWrite must make sure the function reloads the object or objects it is updating with a fresh copy from the collection on every attempt.

Example:

	// This code will be run repeatedly until there is no DirtyWriteError or the max retries is exceeded.
	err = mongostore.RetryDirtyWrite(func() error {
		var retryErr error

		// query an entity from the collection that has a dirtyWriteGuard uint64 field
		var existingPerson *Person
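		existingPerson, retryErr = YourFunctionThatDoesMongoFind(ctx, personId)
		if retryErr != nil {
			// a failed find is not a dirty write, so RetryDirtyWrite will not retry it
			return retryErr
		}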

		// ...logic that makes changes to existingPerson which could be now stale

		// YourFunctionThatDoesMongoUpsert can return DirtyWriteError
		if retryErr = YourFunctionThatDoesMongoUpsert(ctx, existingPerson); retryErr != nil {
			if retryErr != mongostore.DirtyWriteError {
				logger.Instance().ErrorIgnoreCancel(ctx, "error in YourFunctionThatDoesMongoUpsert", logger.Error(retryErr))
			}
			return retryErr
		}
		return nil
	})
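For readers who want to see the shape of such a retry loop, here is a minimal sketch. It assumes that "up to 100 times" means at most 100 calls in total and that any error other than the dirty write sentinel ends the loop immediately; retryOnDirtyWrite, maxAttempts, and errDirtyWrite are hypothetical names, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errDirtyWrite stands in for mongostore.DirtyWriteError in this sketch.
var errDirtyWrite = errors.New("dirty write error")

// maxAttempts is an assumption based on "up to 100 times" in the text.
const maxAttempts = 100

// retryOnDirtyWrite calls fn until it returns something other than
// errDirtyWrite or until maxAttempts calls have been made. The last
// error (possibly errDirtyWrite) is returned to the caller.
func retryOnDirtyWrite(fn func() error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = fn()
		if !errors.Is(err, errDirtyWrite) {
			return err
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryOnDirtyWrite(func() error {
		calls++
		if calls < 3 {
			return errDirtyWrite // simulate two stale writes
		}
		return nil
	})
	fmt.Println(calls, err) // prints: 3 <nil>
}
```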

func RetryUpsertIfDuplicateKey

func RetryUpsertIfDuplicateKey(retryFunc RetryFunc) (err error)

RetryUpsertIfDuplicateKey can help handle expected behavior for any mongo command that uses Upsert. The retryFunc will be tried up to numRetries times before giving up.

BE WARNED: Mongo can at any time return E11000 duplicate key error for ANY command with Upsert enabled. Mongo expects the application to handle this error. This happens if: "During an update with upsert:true option, two (or more) threads may attempt an upsert operation using the same query predicate and, upon not finding a match, the threads will attempt to insert a new document. Both inserts will (and should) succeed, unless the second causes a unique constraint violation." See: https://jira.mongodb.org/browse/SERVER-14322

Note: it is documented that mongo retries on its own in the usual cases. While that may be true, I'm still seeing the error being returned in the wild as of mongo 6.x.

Example:

	opt := &options.ReplaceOptions{}
	opt.SetUpsert(true)
	// This code will be run repeatedly until there is no DuplicateKeyError or the max retries is exceeded.
	err = mongostore.RetryUpsertIfDuplicateKey(func() error {
		_, retryErr := yourCollection.ReplaceOne(ctx, yourFilter, yourDocument, opt)
		return retryErr
	})

func TruncateStringSliceForMongoDoc

func TruncateStringSliceForMongoDoc(slice []string) (newSlice []string)

TruncateStringSliceForMongoDoc ensures a string slice will fit within the mongodb document size limit. If necessary, it truncates the slice and logs a warning.

func TruncateUUIDSliceForMongoDoc

func TruncateUUIDSliceForMongoDoc(slice []mongouuid.UUID) (newSlice []mongouuid.UUID)

TruncateUUIDSliceForMongoDoc ensures a mongouuid.UUID slice will fit within the mongodb document size limit. If necessary, it truncates the slice and logs a warning.
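The page does not say how the size of a slice is measured, so the following is only a sketch of one plausible approach: keep the longest prefix whose summed string lengths fit within a byte budget. truncateToByteBudget is a hypothetical helper, and the real functions may account for BSON overhead differently.

```go
package main

import "fmt"

// truncateToByteBudget returns the longest prefix of slice whose summed
// string lengths (in bytes) do not exceed maxBytes. This ignores any
// per-element encoding overhead, which is an assumption of this sketch.
func truncateToByteBudget(slice []string, maxBytes uint64) []string {
	var used uint64
	for i, s := range slice {
		used += uint64(len(s))
		if used > maxBytes {
			return slice[:i]
		}
	}
	return slice
}

func main() {
	in := []string{"aaaa", "bbbb", "cccc"}
	fmt.Println(truncateToByteBudget(in, 9))  // prints [aaaa bbbb]
	fmt.Println(truncateToByteBudget(in, 12)) // prints [aaaa bbbb cccc]
}
```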

Types

type DataStore

type DataStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func Instance

func Instance() *DataStore

Instance returns the data store singleton. This ensures you only have one instance of it per program. All connections are pooled automatically, and the only work required for startup and connection cleanup is calling Instance().StartTask() and Instance().StopTask().

The singleton is safe for concurrent use. Reference it anywhere you need a connection to mongo or want to add indexes after startup, e.g. mongostore.Instance().CollectionLinearWriteRead(...) or mongostore.Instance().AddAndEnsureManagedIndexes(...).
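A common way to build a concurrency-safe singleton accessor in Go is sync.Once. The sketch below shows that pattern; it is an assumption that the package works this way, and store, getInstance, and the name field are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for DataStore.
type store struct {
	sync.RWMutex
	name string
}

var (
	instance *store
	once     sync.Once
)

// getInstance lazily creates the single store and returns the same pointer
// on every call, even when called from many goroutines at once.
func getInstance() *store {
	once.Do(func() {
		instance = &store{name: "datastore"}
	})
	return instance
}

func main() {
	var wg sync.WaitGroup
	ptrs := make([]*store, 8)
	for i := range ptrs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ptrs[i] = getInstance()
		}(i)
	}
	wg.Wait()

	same := true
	for _, p := range ptrs {
		if p != ptrs[0] {
			same = false
		}
	}
	fmt.Println(same) // prints true
}
```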

func (*DataStore) AddAndEnsureManagedIndexes

func (a *DataStore) AddAndEnsureManagedIndexes(groupName string, addManagedIndexes []Index)

AddAndEnsureManagedIndexes adds additional indexes to be managed after startup. groupName must be unique, and each group must operate on a different set of Collections than every other group. If groupName is already registered, this function does nothing and returns. If this group has Collections overlapping with another managed group, it panics. AddAndEnsureManagedIndexes does its work in a new goroutine, so you will need to watch log messages for problems. If it is unable to connect to mongo, it keeps retrying with an exponential backoff whose maximum defaults to DefaultMaxFailedEnsureIndexesBackoffSeconds and is configurable with WithMaxFailedEnsureIndexesBackoffSeconds().
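To make "exponential backoff with a maximum" concrete, here is a minimal sketch of a capped doubling schedule. The one-second starting delay and the doubling factor are assumptions, as are the names nextBackoff, backoffSchedule, and maxBackoff; only the 300-second cap comes from DefaultMaxFailedEnsureIndexesBackoffSeconds.

```go
package main

import (
	"fmt"
	"time"
)

// maxBackoff mirrors DefaultMaxFailedEnsureIndexesBackoffSeconds (300 seconds).
const maxBackoff = 300 * time.Second

// nextBackoff doubles the previous delay, starting at one second (an
// assumption of this sketch), and never returns more than limit.
func nextBackoff(prev, limit time.Duration) time.Duration {
	next := prev * 2
	if prev <= 0 {
		next = time.Second
	}
	if next > limit {
		next = limit
	}
	return next
}

// backoffSchedule returns the first n delays of the capped schedule.
func backoffSchedule(n int, limit time.Duration) []time.Duration {
	delays := make([]time.Duration, 0, n)
	var d time.Duration
	for i := 0; i < n; i++ {
		d = nextBackoff(d, limit)
		delays = append(delays, d)
	}
	return delays
}

func main() {
	// Delays grow 1s, 2s, 4s, ... and then stay at the 300s cap.
	for _, d := range backoffSchedule(10, maxBackoff) {
		fmt.Println(d)
	}
}
```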

func (*DataStore) Collection

func (a *DataStore) Collection(ctx context.Context, name string) (*mongo.Collection, error)

Collection calls CollectionLinearWriteRead()

func (*DataStore) CollectionForWatch

func (a *DataStore) CollectionForWatch(ctx context.Context, name string) (*mongo.Collection, error)

CollectionForWatch creates a connection with:
- readconcern.Majority()
- readpref.SecondaryPreferred()
- writeconcern.J(true)
- writeconcern.WMajority()

This is recommended for use with Change Streams (Watch()). The write concerns are just in case you use it for writes by accident.

func (*DataStore) CollectionLinearWriteRead

func (a *DataStore) CollectionLinearWriteRead(ctx context.Context, name string) (*mongo.Collection, error)

CollectionLinearWriteRead creates a connection with:
- readconcern.Linearizable()
- readpref.Primary()
- writeconcern.J(true)
- writeconcern.WMajority()

This connection supplies "Causal Consistency" in a sharded cluster inside a single client thread. https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#std-label-sessions

Note: readpref.Primary() is critical for reads to consistently return results in the same goroutine immediately after an insert. This is perhaps not well documented.

func (*DataStore) CollectionReadNearest

func (a *DataStore) CollectionReadNearest(ctx context.Context, name string) (*mongo.Collection, error)

CollectionReadNearest creates a connection with:
- readconcern.Majority()
- readpref.Nearest()
- writeconcern.J(true)
- writeconcern.WMajority()

func (*DataStore) CollectionReadSecondaryPreferred

func (a *DataStore) CollectionReadSecondaryPreferred(ctx context.Context, name string) (*mongo.Collection, error)

CollectionReadSecondaryPreferred creates a connection with:
- readconcern.Majority()
- readpref.SecondaryPreferred()
- writeconcern.J(true)
- writeconcern.WMajority()

func (*DataStore) CollectionUnsafeFastWrites

func (a *DataStore) CollectionUnsafeFastWrites(ctx context.Context, name string) (*mongo.Collection, error)

CollectionUnsafeFastWrites creates a connection with:
- readconcern.Local()
- readpref.Primary()
- writeconcern.J(false)
- writeconcern.W(1)

func (*DataStore) ContextTimeout

func (a *DataStore) ContextTimeout(ctx context.Context) (context.Context, context.CancelFunc)

func (*DataStore) Index

func (a *DataStore) Index(collectionName string, indexId IndexIdentifier) (idx Index, err error)

func (*DataStore) IndexOrPanic

func (a *DataStore) IndexOrPanic(collectionName string, indexId IndexIdentifier) (idx Index)

func (*DataStore) Ping

func (a *DataStore) Ping(ctx context.Context) error

func (*DataStore) StartTask

func (a *DataStore) StartTask(managedIndexes []Index, opts ...DataStoreOption)

StartTask starts the background routines. Call this once on startup from your main.go. Call StopTask() on exit. Indexes supplied here will be managed in a separate goroutine after startup.

func (*DataStore) StopTask

func (a *DataStore) StopTask()

StopTask disconnects the mongo clients and stops the background routines. Call this once on exit of your main.go.

type DataStoreOption

type DataStoreOption func(o *Options)

func WithAuthMechanism

func WithAuthMechanism(authMechanism string) DataStoreOption

func WithConnectTimeoutSeconds

func WithConnectTimeoutSeconds(connectTimeoutSeconds uint64) DataStoreOption

func WithDatabaseName

func WithDatabaseName(databaseName string) DataStoreOption

func WithHosts

func WithHosts(hosts []string) DataStoreOption

func WithMaxFailedEnsureIndexesBackoffSeconds

func WithMaxFailedEnsureIndexesBackoffSeconds(maxFailedEnsureIndexesBackoffSeconds uint64) DataStoreOption

func WithMaxPoolSize

func WithMaxPoolSize(maxPoolSize uint64) DataStoreOption

func WithPassword

func WithPassword(password string) DataStoreOption

func WithPingHeartbeatSeconds

func WithPingHeartbeatSeconds(pingHeartbeatSeconds uint64) DataStoreOption

func WithTimeoutSecondsQuery

func WithTimeoutSecondsQuery(timeoutSecondsQuery uint64) DataStoreOption

func WithTimeoutSecondsShutdown

func WithTimeoutSecondsShutdown(timeoutSecondsShutdown uint64) DataStoreOption

func WithUri

func WithUri(uri string) DataStoreOption

func WithUsername

func WithUsername(username string) DataStoreOption
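The With... functions above follow Go's functional options pattern: each returns a function that mutates an options struct, and the receiver applies them over its defaults. Because the fields of Options are unexported and not shown on this page, the sketch below uses a hypothetical options struct and lowercase names; only the default values are taken from the constants listed earlier.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the package's Options struct.
type options struct {
	databaseName string
	maxPoolSize  uint64
	hosts        []string
}

// option has the same shape as DataStoreOption: func(o *Options).
type option func(o *options)

func withDatabaseName(name string) option {
	return func(o *options) { o.databaseName = name }
}

func withMaxPoolSize(n uint64) option {
	return func(o *options) { o.maxPoolSize = n }
}

func withHosts(hosts []string) option {
	return func(o *options) { o.hosts = hosts }
}

// newOptions starts from the documented defaults and applies each option in order.
func newOptions(opts ...option) options {
	o := options{
		databaseName: "datastore",                 // DefaultDatabaseName
		maxPoolSize:  100,                         // DefaultMaxPoolSize
		hosts:        []string{"localhost:27017"}, // DefaultHost
	}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(withDatabaseName("people"), withHosts([]string{"db1:27017", "db2:27017"}))
	fmt.Println(o.databaseName, o.maxPoolSize, len(o.hosts)) // prints: people 100 2
}
```

With the real package, the equivalent call would pass options such as WithDatabaseName(...) as the trailing arguments of StartTask.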

type Index

type Index struct {
	CollectionName string
	Id             IndexIdentifier
	Version        uint64 // increment any time the model or options change; calling createIndex() with the same name but different
	// options than an existing index will throw the error "MongoError:
	// Index with name: **MongoIndexName** already exists with different options"
	Model mongo.IndexModel
}

func (Index) MongoIndexName

func (idx Index) MongoIndexName() string

type IndexIdentifier

type IndexIdentifier string

func (IndexIdentifier) String

func (iid IndexIdentifier) String() string

type Options

type Options struct {
	// contains filtered or unexported fields
}

type RetryFunc

type RetryFunc func() error
