sharding

package module
v6.6.4+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2019 License: BSD-2-Clause Imports: 13 Imported by: 0

README

PostgreSQL sharding for go-pg and Golang Build Status

This package uses a go-pg PostgreSQL client to help shard your data across a set of PostgreSQL servers as described in Sharding & IDs at Instagram. In short, it maps many (2048-8192) logical shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.

API docs: http://godoc.org/github.com/go-pg/sharding. Examples: http://godoc.org/github.com/go-pg/sharding#pkg-examples.

Installation

To install:

go get github.com/go-pg/sharding

Quickstart

package sharding_test

import (
	"fmt"

	"github.com/go-pg/sharding"
	"github.com/go-pg/pg"
)

// User is the model stored by this example. Users are sharded by AccountId,
// i.e. all users that share an account id live on the same shard.
type User struct {
	// tableName is never read by the example code; only its struct tag
	// matters, naming the table as "?shard.users".
	tableName string `sql:"?shard.users"`

	Id        int64
	AccountId int64
	Name      string
	Emails    []string
}

// String implements fmt.Stringer; a User prints as just its Name.
func (u User) String() string {
	name := u.Name
	return name
}

// CreateUser inserts user into the shard that cluster selects for the user's
// AccountId and returns the error reported by the insert, if any.
func CreateUser(cluster *sharding.Cluster, user *User) error {
	shard := cluster.Shard(user.AccountId)
	return shard.Insert(user)
}

// GetUser fetches the user with the given id. The shard is derived from the
// id itself via cluster.SplitShard, and the row is then selected from that
// shard with an "id = ?" filter.
//
// On failure it returns a nil *User together with the error from Select, so
// a caller can never mistake a zero-valued User for a loaded row.
func GetUser(cluster *sharding.Cluster, id int64) (*User, error) {
	var user User
	err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select()
	if err != nil {
		return nil, err
	}
	return &user, nil
}

// GetUsers loads the users whose account_id equals accountId. The query runs
// on the shard that cluster selects for that account id; the collected users
// and the error from Select are returned as-is.
func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) {
	var users []User
	query := cluster.Shard(accountId).Model(&users).Where("account_id = ?", accountId)
	err := query.Select()
	return users, err
}

// createShard (re)creates the database schema for a given shard. It drops any
// existing ?shard schema, creates it again, installs the id-generation SQL
// from sqlFuncs, and creates the users table. Queries run in order and the
// first error aborts the remaining ones.
func createShard(shard *pg.DB) error {
	queries := []string{
		`DROP SCHEMA IF EXISTS ?shard CASCADE`,
		`CREATE SCHEMA ?shard`,
		sqlFuncs,
		// account_id is bigint so the column can hold every value of the
		// int64 User.AccountId field; a plain int column is only 32 bits wide.
		`CREATE TABLE ?shard.users (id bigint DEFAULT ?shard.next_id(), account_id bigint, name text, emails jsonb)`,
	}

	for _, q := range queries {
		if _, err := shard.Exec(q); err != nil {
			return err
		}
	}

	return nil
}

// ExampleCluster builds a cluster of 2 logical shards on 1 physical server,
// creates the schema of every shard, inserts three users and reads them back
// both by user id and by account id. Any error panics.
func ExampleCluster() {
	db := pg.Connect(&pg.Options{
		User: "postgres",
	})
	// Close the connection opened above when the example finishes. The Close
	// error is deliberately ignored: nothing useful can be done with it here.
	defer db.Close()

	dbs := []*pg.DB{db} // list of physical PostgreSQL servers
	nshards := 2        // 2 logical shards
	// Create cluster with 1 physical server and 2 logical shards.
	cluster := sharding.NewCluster(dbs, nshards)

	// Create database schema for our logical shards.
	for i := 0; i < nshards; i++ {
		if err := createShard(cluster.Shard(int64(i))); err != nil {
			panic(err)
		}
	}

	// user1 will be created in shard1 because AccountId % nshards = shard1.
	user1 := &User{
		Name:      "user1",
		AccountId: 1,
		Emails:    []string{"user1@domain"},
	}
	if err := CreateUser(cluster, user1); err != nil {
		panic(err)
	}

	// user2 will be created in shard1 too because AccountId is the same.
	user2 := &User{
		Name:      "user2",
		AccountId: 1,
		Emails:    []string{"user2@domain"},
	}
	if err := CreateUser(cluster, user2); err != nil {
		panic(err)
	}

	// user3 will be created in shard0 because AccountId % nshards = shard0.
	user3 := &User{
		Name:      "user3",
		AccountId: 2,
		Emails:    []string{"user3@domain"},
	}
	if err := CreateUser(cluster, user3); err != nil {
		panic(err)
	}

	// Look user1 up by its generated id; the shard is derived from the id.
	user, err := GetUser(cluster, user1.Id)
	if err != nil {
		panic(err)
	}

	// Fetch every user of account 1 from the account's shard.
	users, err := GetUsers(cluster, 1)
	if err != nil {
		panic(err)
	}

	fmt.Println(user)
	fmt.Println(users[0], users[1])
	// Output: user1
	// user1 user2
}

// sqlFuncs is the SQL that createShard executes to install id generation in a
// shard's schema. It creates the sequence ?shard.id_seq and two functions:
//
//   - ?shard._next_id(tm, shard_id, seq_id) packs its arguments into a single
//     bigint: the milliseconds of tm minus ?epoch, shifted left by 23 bits,
//     OR-ed with (shard_id % 2048) << 12 and with seq_id % 4096.
//   - ?shard.next_id() calls _next_id with clock_timestamp(), ?shard_id and
//     the next value of ?shard.id_seq.
//
// NOTE(review): the ?shard, ?shard_id and ?epoch placeholders are presumably
// substituted by go-pg for each shard before execution — confirm against the
// cluster implementation.
const sqlFuncs = `
CREATE SEQUENCE ?shard.id_seq;

-- _next_id returns unique sortable id.
CREATE FUNCTION ?shard._next_id(tm timestamptz, shard_id int, seq_id bigint)
RETURNS bigint AS $$
DECLARE
  max_shard_id CONSTANT bigint := 2048;
  max_seq_id CONSTANT bigint := 4096;
  id bigint;
BEGIN
  shard_id := shard_id % max_shard_id;
  seq_id := seq_id % max_seq_id;
  id := (floor(extract(epoch FROM tm) * 1000)::bigint - ?epoch) << 23;
  id := id | (shard_id << 12);
  id := id | seq_id;
  RETURN id;
END;
$$
LANGUAGE plpgsql
IMMUTABLE;

CREATE FUNCTION ?shard.next_id()
RETURNS bigint AS $$
BEGIN
   RETURN ?shard._next_id(clock_timestamp(), ?shard_id, nextval('?shard.id_seq'));
END;
$$
LANGUAGE plpgsql;
`

Howto

Please read the Golang PostgreSQL client docs to get an idea of how to use this package.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	DefaultIdGen = NewIdGen(41, 11, 12, epoch)
)

Functions

func MaxId

func MaxId(tm time.Time) int64

MaxId returns max id for the time.

func MinId

func MinId(tm time.Time) int64

MinId returns min id for the time.

func SplitId

func SplitId(id int64) (tm time.Time, shardId int64, seqId int64)

SplitId splits id into time, shard id, and sequence id.

Types

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster maps many (up to 2048) logical database shards implemented using PostgreSQL schemas to far fewer physical PostgreSQL servers.

Example
package main

import (
	"fmt"

	"github.com/go-pg/sharding"

	"github.com/go-pg/pg"
)

// User is the model stored by this example. Users are sharded by AccountId,
// i.e. all users that share an account id are placed on the same shard.
type User struct {
	// tableName is never read by the example code; only its struct tag
	// matters, naming the table as "?shard.users".
	tableName string `sql:"?shard.users"`

	Id        int64
	AccountId int64
	Name      string
	Emails    []string
}

// String implements fmt.Stringer; a User prints as just its Name.
func (u User) String() string {
	name := u.Name
	return name
}

// CreateUser inserts user into the shard that cluster selects for the user's
// AccountId and returns the error reported by the insert, if any.
func CreateUser(cluster *sharding.Cluster, user *User) error {
	target := cluster.Shard(user.AccountId)
	return target.Insert(user)
}

// GetUser fetches the user with the given id. The shard is derived from the
// id itself via cluster.SplitShard, and the row is then selected from that
// shard with an "id = ?" filter.
//
// On failure it returns a nil *User together with the error from Select, so
// a caller can never mistake a zero-valued User for a loaded row.
func GetUser(cluster *sharding.Cluster, id int64) (*User, error) {
	var user User
	err := cluster.SplitShard(id).Model(&user).Where("id = ?", id).Select()
	if err != nil {
		return nil, err
	}
	return &user, nil
}

// GetUsers loads the users whose account_id equals accountId. The query runs
// on the shard that cluster selects for that account id; the collected users
// and the error from Select are returned as-is.
func GetUsers(cluster *sharding.Cluster, accountId int64) ([]User, error) {
	var users []User
	query := cluster.Shard(accountId).Model(&users).Where("account_id = ?", accountId)
	err := query.Select()
	return users, err
}

// createShard (re)creates the database schema for a given shard. It drops any
// existing ?shard schema, creates it again, installs the id-generation SQL
// from sqlFuncs, and creates the users table. Queries run in order and the
// first error aborts the remaining ones.
func createShard(shard *pg.DB) error {
	queries := []string{
		`DROP SCHEMA IF EXISTS ?shard CASCADE`,
		`CREATE SCHEMA ?shard`,
		sqlFuncs,
		// account_id is bigint so the column can hold every value of the
		// int64 User.AccountId field; a plain int column is only 32 bits wide.
		`CREATE TABLE ?shard.users (id bigint DEFAULT ?shard.next_id(), account_id bigint, name text, emails jsonb)`,
	}

	for _, q := range queries {
		if _, err := shard.Exec(q); err != nil {
			return err
		}
	}

	return nil
}

// main builds a cluster of 2 logical shards on 1 physical server, creates the
// schema of every shard, inserts three users and reads them back both by user
// id and by account id. Any error panics.
func main() {
	db := pg.Connect(&pg.Options{
		User: "postgres",
	})
	// Close the connection opened above when the program finishes. The Close
	// error is deliberately ignored: nothing useful can be done with it here.
	defer db.Close()

	dbs := []*pg.DB{db} // list of physical PostgreSQL servers
	nshards := 2        // 2 logical shards
	// Create cluster with 1 physical server and 2 logical shards.
	cluster := sharding.NewCluster(dbs, nshards)

	// Create database schema for our logical shards.
	for i := 0; i < nshards; i++ {
		if err := createShard(cluster.Shard(int64(i))); err != nil {
			panic(err)
		}
	}

	// user1 will be created in shard1 because AccountId % nshards = shard1.
	user1 := &User{
		Name:      "user1",
		AccountId: 1,
		Emails:    []string{"user1@domain"},
	}
	if err := CreateUser(cluster, user1); err != nil {
		panic(err)
	}

	// user2 will be created in shard1 too because AccountId is the same.
	user2 := &User{
		Name:      "user2",
		AccountId: 1,
		Emails:    []string{"user2@domain"},
	}
	if err := CreateUser(cluster, user2); err != nil {
		panic(err)
	}

	// user3 will be created in shard0 because AccountId % nshards = shard0.
	user3 := &User{
		Name:      "user3",
		AccountId: 2,
		Emails:    []string{"user3@domain"},
	}
	if err := CreateUser(cluster, user3); err != nil {
		panic(err)
	}

	// Look user1 up by its generated id; the shard is derived from the id.
	user, err := GetUser(cluster, user1.Id)
	if err != nil {
		panic(err)
	}

	// Fetch every user of account 1 from the account's shard.
	users, err := GetUsers(cluster, 1)
	if err != nil {
		panic(err)
	}

	fmt.Println(user)
	fmt.Println(users[0], users[1])
}

// sqlFuncs is the SQL that createShard executes to install id generation in a
// shard's schema. It creates the sequence ?shard.id_seq and two functions:
//
//   - ?shard._next_id(tm, shard_id, seq_id) packs its arguments into a single
//     bigint: the milliseconds of tm minus our_epoch (1262304000000), shifted
//     left by 23 bits, OR-ed with (shard_id % 2048) << 12 and with
//     seq_id % 4096.
//   - ?shard.next_id() calls _next_id with clock_timestamp(), ?shard_id and
//     the next value of ?shard.id_seq.
//
// NOTE(review): the ?shard and ?shard_id placeholders are presumably
// substituted by go-pg for each shard before execution — confirm against the
// cluster implementation.
const sqlFuncs = `
CREATE SEQUENCE ?shard.id_seq;

-- _next_id returns unique sortable id.
CREATE FUNCTION ?shard._next_id(tm timestamptz, shard_id int, seq_id bigint)
RETURNS bigint AS $$
DECLARE
  our_epoch CONSTANT bigint := 1262304000000;
  max_shard_id CONSTANT bigint := 2048;
  max_seq_id CONSTANT bigint := 4096;
  id bigint;
BEGIN
  shard_id := shard_id % max_shard_id;
  seq_id := seq_id % max_seq_id;
  id := (floor(extract(epoch FROM tm) * 1000)::bigint - our_epoch) << 23;
  id := id | (shard_id << 12);
  id := id | seq_id;
  RETURN id;
END;
$$
LANGUAGE plpgsql
IMMUTABLE;

CREATE FUNCTION ?shard.next_id()
RETURNS bigint AS $$
BEGIN
   RETURN ?shard._next_id(clock_timestamp(), ?shard_id, nextval('?shard.id_seq'));
END;
$$
LANGUAGE plpgsql;
`
Output:

user1
user1 user2

func NewCluster

func NewCluster(dbs []*pg.DB, nshards int) *Cluster

func NewClusterWithGen

func NewClusterWithGen(dbs []*pg.DB, nshards int, gen *IdGen) *Cluster

NewClusterWithGen returns new PostgreSQL cluster consisting of physical dbs and running nshards logical shards.

func (*Cluster) Close

func (cl *Cluster) Close() error

func (*Cluster) DB

func (cl *Cluster) DB(number int64) *pg.DB

DB maps the number to the corresponding database server.

func (*Cluster) DBs

func (cl *Cluster) DBs() []*pg.DB

DBs returns list of database servers in the cluster.

func (*Cluster) ForEachDB

func (cl *Cluster) ForEachDB(fn func(db *pg.DB) error) error

ForEachDB concurrently calls the fn on each database in the cluster.

func (*Cluster) ForEachNShards

func (cl *Cluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error

ForEachNShards concurrently calls the fn on each N shards in the cluster.

func (*Cluster) ForEachShard

func (cl *Cluster) ForEachShard(fn func(shard *pg.DB) error) error

ForEachShard concurrently calls the fn on each shard in the cluster. It is the same as ForEachNShards(1, fn).

func (*Cluster) Shard

func (cl *Cluster) Shard(number int64) *pg.DB

Shard maps the number to the corresponding shard in the cluster.

func (*Cluster) Shards

func (cl *Cluster) Shards(db *pg.DB) []*pg.DB

Shards returns list of shards running in the db. If db is nil all shards are returned.

func (*Cluster) SplitShard

func (cl *Cluster) SplitShard(id int64) *pg.DB

SplitShard uses SplitId to extract shard id from the id and then returns corresponding Shard in the cluster.

func (*Cluster) SubCluster

func (cl *Cluster) SubCluster(number int64, size int) *SubCluster

SubCluster returns a subset of the cluster of the given size.

type IdGen

type IdGen struct {
	// contains filtered or unexported fields
}

func NewIdGen

func NewIdGen(timeBits, shardBits, seqBits uint, epoch time.Time) *IdGen

func (*IdGen) MaxId

func (g *IdGen) MaxId(tm time.Time, shard int64) int64

MaxId returns max id for the time.

func (*IdGen) NextId

func (g *IdGen) NextId(tm time.Time, shard, seq int64) int64

NextId returns incremental id for the time. Note that you can only generate 4096 unique numbers per millisecond.

func (*IdGen) NumShards

func (g *IdGen) NumShards() int

func (*IdGen) SplitId

func (g *IdGen) SplitId(id int64) (tm time.Time, shardId int64, seqId int64)

SplitId splits id into time, shard id, and sequence id.

type ShardIdGen

type ShardIdGen struct {
	// contains filtered or unexported fields
}

IdGen generates sortable unique int64 numbers that consist of:

- 41 bits for time in milliseconds.
- 11 bits for shard id.
- 12 bits for auto-incrementing sequence.

As a result we can generate 4096 ids per millisecond for each of 2048 shards. Minimum supported time is 1975-02-28, maximum is 2044-12-31.

func NewShardIdGen

func NewShardIdGen(shard int64, gen *IdGen) *ShardIdGen

NewShardIdGen returns id generator for the shard.

func (*ShardIdGen) MaxId

func (g *ShardIdGen) MaxId(tm time.Time) int64

MaxId returns max id for the time.

func (*ShardIdGen) NextId

func (g *ShardIdGen) NextId(tm time.Time) int64

NextId returns incremental id for the time. Note that you can only generate 4096 unique numbers per millisecond.

func (*ShardIdGen) SplitId

func (g *ShardIdGen) SplitId(id int64) (tm time.Time, shardId int64, seqId int64)

SplitId splits id into time, shard id, and sequence id.

type SubCluster

type SubCluster struct {
	// contains filtered or unexported fields
}

SubCluster is a subset of the cluster.

func (*SubCluster) ForEachNShards

func (cl *SubCluster) ForEachNShards(n int, fn func(shard *pg.DB) error) error

ForEachNShards concurrently calls the fn on each N shards in the subcluster.

func (*SubCluster) ForEachShard

func (cl *SubCluster) ForEachShard(fn func(shard *pg.DB) error) error

ForEachShard concurrently calls the fn on each shard in the subcluster. It is the same as ForEachNShards(1, fn).

func (*SubCluster) Shard

func (cl *SubCluster) Shard(number int64) *pg.DB

Shard maps the number to the corresponding shard in the subcluster.

func (*SubCluster) SplitShard

func (cl *SubCluster) SplitShard(id int64) *pg.DB

SplitShard uses SplitId to extract shard id from the id and then returns corresponding Shard in the subcluster.

type UUID

type UUID [uuidLen]byte

func NewUUID

func NewUUID(shardId int64, tm time.Time) UUID

func ParseUUID

func ParseUUID(b []byte) (UUID, error)

func (UUID) AppendValue

func (u UUID) AppendValue(b []byte, quote int) ([]byte, error)

func (*UUID) IsZero

func (u *UUID) IsZero() bool

func (*UUID) Scan

func (u *UUID) Scan(b interface{}) error

func (*UUID) ShardId

func (u *UUID) ShardId() int64

func (*UUID) Split

func (u *UUID) Split() (shardId int64, tm time.Time)

func (UUID) String

func (u UUID) String() string

func (*UUID) Time

func (u *UUID) Time() time.Time

func (UUID) Value

func (u UUID) Value() (driver.Value, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL