quokka

package module
v0.0.0-...-08c9172
Published: Jun 24, 2017 License: Apache-2.0 Imports: 16 Imported by: 0

README

quokka

document database built on top of mysql

architecture

There are two main parts:

  • the main storage
  • the views

The main storage stores the authoritative copy of the data in a reliable way. It also needs to enforce business rules (such as a balance never becoming negative) in a thread-safe way when concurrent updates conflict.

The views represent the data in different kinds of indices to answer queries faster. A view might also merge multiple sources into one.

The goal is to build a document database that will have these two parts acting as one, with these features:

  • Good old technology: only depends on mysql to function properly
  • Schema free: the main storage takes JSON
  • Handle updates in an idempotent way: free the user from doing this hard work themselves
  • Keep business rules on data integrity: they should work like database unique constraints.
  • High performance on concurrent updates to the same entity: more than 10k TPS on a single entity
  • Views can be implemented easily: no limit is imposed on view storage; mysql, redis and elasticsearch can all serve as view storage.
  • Reliable view update: the view must not lose updates; every change should be synchronized
  • Low latency view update: if the user wants, the view can be updated within the same request that handles the main storage
  • Long term stability: historical data should be archived automatically

Main storage correctness

The main storage table is defined by this SQL (assuming the entity is called account):

CREATE TABLE `v2pro`.`account` (
  `event_id`     BIGINT       NOT NULL       AUTO_INCREMENT,
  `entity_id`    CHAR(20)     NOT NULL,
  `version`      BIGINT       NOT NULL,
  `command_id`   VARCHAR(256) NOT NULL,
  `command_name` VARCHAR(256) NOT NULL,
  `request`      JSON         NULL,
  `response`     JSON         NOT NULL,
  `state`        JSON         NOT NULL,
  `committed_at` DATETIME     NOT NULL       DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`event_id`),
  UNIQUE KEY `unique_version` (`entity_id`, `version`),
  UNIQUE KEY `unique_command` (`entity_id`, `command_id`)
);

The process to update one entity:

  • load the old state
  • handle the request, generate response and new state
  • save the new state with version + 1

If there are concurrent updates to the same entity, the unique_version constraint will prevent the later one from being saved. The user-defined handler can enforce any kind of business rule to keep the state from being updated to an unexpected new state.

Another key concern is idempotence: as long as the command id is the same, the response should be the same. This is achieved by the unique_command constraint. If the same command is handled twice, the first response will always be used.
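
A minimal sketch of this insert path, assuming a database/sql handle and JSON-encoded request/response/state; the helper names below are illustrative, not part of the package:

package main

import "database/sql"

// saveEvent appends a new event row for the entity, relying on the two
// unique keys for correctness:
//   - unique_version (entity_id, version) rejects concurrent writers
//   - unique_command (entity_id, command_id) rejects replayed commands
func saveEvent(db *sql.DB, entityId string, oldVersion int64,
	commandId, commandName string, request, response, state []byte) error {
	_, err := db.Exec(
		`INSERT INTO v2pro.account
		 (entity_id, version, command_id, command_name, request, response, state)
		 VALUES (?, ?, ?, ?, ?, ?, ?)`,
		entityId, oldVersion+1, commandId, commandName, request, response, state)
	// a duplicate-key error here means either a version conflict or a replayed command
	return err
}

// firstResponse looks up the response stored when the command was first
// handled, so a replayed command gets the identical answer back.
func firstResponse(db *sql.DB, entityId, commandId string) ([]byte, error) {
	var response []byte
	err := db.QueryRow(
		`SELECT response FROM v2pro.account WHERE entity_id = ? AND command_id = ?`,
		entityId, commandId).Scan(&response)
	return response, err
}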

As long as the mysql unique constraints work properly, we are able to keep our promises. We do not rely on etcd to reliably shard the traffic to the corresponding application server. We do not make any decision solely based on the application server's local cache. We do not store any persistent local state on the application server. We choose to keep the application server stateless and use good old mysql to ensure transaction safety. All of this rests on the two unique indices.

Main storage performance

Mysql cannot provide more than 10k TPS of updates on the same entity, so we need to do something more. In essence, we need to make the application server "stateful" so that we can:

  • queue on the application server instead of pushing the load to the database
  • batch on the application server, so we can write in bigger chunks for better throughput

However, this "statefulness" is just a performance optimization. We do not rely on the state to keep the logic correct. Because the application server state is not load-bearing, losing it or getting the sharding wrong does not compromise data integrity. Everything still works if traffic is not sharded, just more slowly.

So we shard the application servers such that requests to the same entity always hit the same server. Then we queue the requests up and batch-process them. The internal queue is just a golang channel, and batching is implemented by "select"-ing on the channel:

// fetchCommands drains whatever is currently queued on the worker's channel,
// without blocking, so one database round trip can commit a whole batch.
func (worker *worker) fetchCommands() []*command {
	done := false
	commands := []*command{}
	for !done {
		select {
		case cmd := <-worker.commandQ:
			commands = append(commands, cmd)
		default:
			// channel is empty: stop draining and process what we have
			done = true
		}
		// cap the batch size so a single write does not grow unbounded
		if len(commands) > 1000 {
			break
		}
	}
	return commands
}
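
Once a batch is drained, the worker can commit all of the resulting events in one multi-row insert. A minimal sketch, assuming each handled command has already produced the column values for one event row (the event struct and helper below are illustrative):

package main

import (
	"database/sql"
	"strings"
)

// event holds the column values produced by handling one command.
type event struct {
	entityId    string
	version     int64
	commandId   string
	commandName string
	request     []byte
	response    []byte
	state       []byte
}

// saveBatch writes a whole batch of events with a single multi-row INSERT,
// trading per-statement overhead for throughput.
func saveBatch(db *sql.DB, events []*event) error {
	if len(events) == 0 {
		return nil
	}
	placeholders := make([]string, 0, len(events))
	args := make([]interface{}, 0, len(events)*7)
	for _, e := range events {
		placeholders = append(placeholders, "(?, ?, ?, ?, ?, ?, ?)")
		args = append(args, e.entityId, e.version, e.commandId, e.commandName,
			e.request, e.response, e.state)
	}
	query := `INSERT INTO v2pro.account
		(entity_id, version, command_id, command_name, request, response, state)
		VALUES ` + strings.Join(placeholders, ", ")
	_, err := db.Exec(query, args...)
	return err
}

A conflict on any row fails the whole statement, so a real implementation would retry or fall back to row-by-row writes; the sketch only shows the batching idea.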

The sharding is done by this simple logic (a minimal routing sketch follows the list):

  • use etcd to elect one server as the leader: this is optional, we can choose to use a static topology instead
  • every server keeps a heartbeat to the elected leader
  • on each heartbeat, the leader piggybacks the latest shard allocation, and every server caches it locally
  • any request can hit any server; the server will redirect it to the right shard
  • when a server receives a redirected request, it handles it unconditionally
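
A minimal sketch of the routing step, assuming a locally cached shard table that maps a shard index to a server address (all names below are illustrative):

package main

import "hash/fnv"

// shardTable is the allocation piggybacked on heartbeats and cached locally.
type shardTable struct {
	servers []string // index = shard number, value = server address
}

// shardOf maps an entity id onto a shard deterministically, so every server
// computes the same owner from the same cached table.
func (t *shardTable) shardOf(entityId string) int {
	h := fnv.New32a()
	h.Write([]byte(entityId))
	return int(h.Sum32() % uint32(len(t.servers)))
}

// route decides whether to handle a request locally or redirect it once.
// A redirected request is always handled, even if the cached table is stale:
// the unique_version constraint keeps the data correct either way, stale
// routing only costs extra lock contention.
func (t *shardTable) route(entityId, selfAddr string, redirected bool) (handleLocally bool, target string) {
	if redirected {
		return true, ""
	}
	owner := t.servers[t.shardOf(entityId)]
	if owner == selfAddr {
		return true, ""
	}
	return false, owner
}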

The key design decisions are:

  • we only use etcd to elect a leader; we do not use it to allocate shards. Losing the leader will not stop the traffic: the topology simply stops being refreshed, which might result in more lock contention.
  • a request will only be redirected once; there is no harm when two servers handle the same shard, only performance is degraded (due to optimistic lock contention)
  • every server is both a "gateway" and a "command handler"

Reliable view update

We do not use the mysql binlog to synchronize views. Instead, the entity table is partitioned like a kafka topic: event_id is an auto-increment big int that works like a kafka offset. In essence, every entity table partition is an event queue. The view updater scans the event table and writes updates to the view side. There are two things to watch for:

  • How to ensure idempotence of view updates if the synchronization is retried?
  • How to remember the last updated offset?

The updated offset can be recorded in a mysql table: every entity table partition has a highest synced offset. We batch-process the updates and only write the offset record once per second.

Because the offset is not recorded on the view side in the same transaction, and is not committed on every write, updates to the view will be retried. To stay idempotent, we record the entity version on the view side. When we update the view, we compare the event (which contains the entity version) with the applied version; if the event has already been applied, we skip it.

With a separate view updater running in the background every 100ms, we can ensure we do not lose updates on the view side. However, polling the table results in higher latency.
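
A minimal sketch of the pull loop for one partition, assuming a view store that tracks the applied version per entity; partitioning and offset persistence are elided, and all names below are illustrative:

package main

import "database/sql"

// viewEvent is one row scanned from the entity table, ordered by event_id.
type viewEvent struct {
	eventId  int64
	entityId string
	version  int64
	state    []byte
}

// ViewStore is whatever backs the view (mysql, redis, elasticsearch, ...).
// Apply must be idempotent: it only writes when version is newer than the
// version already applied on the view side.
type ViewStore interface {
	Apply(entityId string, version int64, state []byte) error
}

// syncOnce pulls events after the last synced offset, applies them to the
// view, and returns the new offset. The caller persists the offset to a
// mysql table roughly once per second; re-running after a crash only replays
// events that the version check will skip.
func syncOnce(db *sql.DB, view ViewStore, offset int64) (int64, error) {
	rows, err := db.Query(
		`SELECT event_id, entity_id, version, state
		 FROM v2pro.account WHERE event_id > ? ORDER BY event_id LIMIT 1000`, offset)
	if err != nil {
		return offset, err
	}
	defer rows.Close()
	for rows.Next() {
		var e viewEvent
		if err := rows.Scan(&e.eventId, &e.entityId, &e.version, &e.state); err != nil {
			return offset, err
		}
		if err := view.Apply(e.entityId, e.version, e.state); err != nil {
			return offset, err
		}
		offset = e.eventId
	}
	return offset, rows.Err()
}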

Low latency view update

We allow a synchronous view update directly from the command handler for latency-sensitive views. When a command is committed, the view update is done immediately afterwards by the same worker goroutine. This cuts the hand-over time to nearly zero; no queue can beat it. However, there are things to take care of:

  • updating the view directly from the command handler means there will be concurrent updates.
  • the view update might fail.

Concurrent updates are handled by the optimistic lock on the view side, the same mechanism that makes view-updater retries safe. A failed view update could lose data if we relied only on the direct update from the command handler. Luckily, the separate view updater working in "pull" mode ensures integrity; it acts as a patch for the "push" mode view update.
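
As a concrete example of the optimistic check that makes both the push and the pull path safe, here is a sketch of a mysql-backed view write that only applies an event when it is newer than what the view has already seen (the account_view table is an assumption for illustration):

package main

import "database/sql"

// applyToView writes the event's state into the view only when the event's
// version is newer than the version already applied. Both the synchronous
// "push" from the command handler and the background "pull" updater go
// through the same conditional write, so retries and races are harmless.
func applyToView(db *sql.DB, entityId string, version int64, state []byte) error {
	_, err := db.Exec(
		`INSERT INTO v2pro.account_view (entity_id, applied_version, state)
		 VALUES (?, ?, ?)
		 ON DUPLICATE KEY UPDATE
		   state = IF(VALUES(applied_version) > applied_version, VALUES(state), state),
		   applied_version = GREATEST(applied_version, VALUES(applied_version))`,
		entityId, version, state)
	return err
}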

Closing thought

Both the main storage and the views are built upon optimistic locking, and sharding & queueing are optimizations that reduce lock contention.

Documentation

Overview

Package xid is a globally unique id generator suited for web scale

Xid is using Mongo Object ID algorithm to generate globally unique ids: https://docs.mongodb.org/manual/reference/object-id/

  • 4-byte value representing the seconds since the Unix epoch,
  • 3-byte machine identifier,
  • 2-byte process id, and
  • 3-byte counter, starting with a random value.

The binary representation of the id is compatible with Mongo 12 bytes Object IDs. The string representation is using base32 hex (w/o padding) for better space efficiency when stored in that form (20 bytes). The hex variant of base32 is used to retain the sortable property of the id.

Xid doesn't use base64 because case sensitivity and the 2 non alphanum chars may be an issue when transported as a string between various systems. Base36 wasn't retained either because 1/ it's not standard 2/ the resulting size is not predictable (not bit aligned) and 3/ it would not remain sortable. To validate a base32 `xid`, expect a 20 chars long, all lowercase sequence of `a` to `v` letters and `0` to `9` numbers (`[0-9a-v]{20}`).

UUID is 16 bytes (128 bits), snowflake is 8 bytes (64 bits), xid stands in between with 12 bytes with a more compact string representation ready for the web and no required configuration or central generation server.
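
A short usage sketch of the ID helpers listed in the index below; the import path is assumed to be github.com/v2pro/quokka and may differ:

package main

import (
	"fmt"

	"github.com/v2pro/quokka" // assumed module path
)

func main() {
	// generate a new 12-byte id and inspect its parts
	id := quokka.NewID()
	fmt.Println(id.String())  // 20-char base32-hex string, k-sortable
	fmt.Println(id.Time())    // embedded timestamp, 1 second precision
	fmt.Println(id.Machine()) // 3-byte machine identifier
	fmt.Println(id.Pid())     // 2-byte process id
	fmt.Println(id.Counter()) // 3-byte counter value

	// parse an id back from its string representation
	parsed, err := quokka.FromString(id.String())
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed == id) // true
}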

Features:

  • Size: 12 bytes (96 bits), smaller than UUID, larger than snowflake
  • Base32 hex encoded by default (16 bytes storage when transported as printable string)
  • Non configured, you don't need to set a unique machine and/or data center id
  • K-ordered
  • Embedded time with 1 second precision
  • Uniqueness guaranteed for 16,777,216 (24 bits) unique ids per second and per host/process

Best used with xlog's RequestIDHandler (https://godoc.org/github.com/rs/xlog#RequestIDHandler).

Index

Constants

This section is empty.

Variables

var ConfigDefault = Config{}.Froze()

var ErrInvalidID = errors.New("xid: invalid ID")

ErrInvalidID is returned when trying to unmarshal an invalid ID

Functions

func StartHttpServer

func StartHttpServer()

func StoreOf

func StoreOf(entityName string) *entityStore

Types

type ClientConfig

type ClientConfig struct {
	HttpAddr string
}

type Config

type Config struct {
	JsonApi  codec.Codec
	HttpAddr string
}

func (Config) Froze

func (cfg Config) Froze() *frozenConfig

type Entity

type Entity struct {
	EntityId  string
	Version   int64
	StateJson []byte
	State     interface{}
	UpdatedAt time.Time
}

type HandleCommand

type HandleCommand func(request interface{}, state interface{}) (response interface{}, newState interface{}, err error)
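
For illustration, a handler with this signature might enforce the "balance never becomes negative" rule mentioned in the README; the request and state shapes below are assumptions, not part of the package:

package main

import "errors"

// hypothetical shapes for an account entity
type withdrawRequest struct {
	Amount int64
}

type accountState struct {
	Balance int64
}

// handleWithdraw matches the HandleCommand signature: it turns the old state
// plus a request into a response and a new state, or rejects the command so
// the storage layer never commits an invalid version.
func handleWithdraw(request interface{}, state interface{}) (interface{}, interface{}, error) {
	req := request.(*withdrawRequest)
	acc := state.(*accountState)
	if acc.Balance < req.Amount {
		return nil, nil, errors.New("balance can not become negative")
	}
	newState := &accountState{Balance: acc.Balance - req.Amount}
	return newState.Balance, newState, nil
}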

type ID

type ID [rawLen]byte

ID represents a unique request id

func FromString

func FromString(id string) (ID, error)

FromString reads an ID from its string representation

func NewID

func NewID() ID

NewID generates a globally unique ID

func (ID) Counter

func (id ID) Counter() int32

Counter returns the incrementing value part of the id. It's a runtime error to call this method with an invalid id.

func (ID) Machine

func (id ID) Machine() []byte

Machine returns the 3-byte machine id part of the id. It's a runtime error to call this method with an invalid id.

func (ID) MarshalText

func (id ID) MarshalText() ([]byte, error)

MarshalText implements encoding/text TextMarshaler interface

func (ID) Pid

func (id ID) Pid() uint16

Pid returns the process id part of the id. It's a runtime error to call this method with an invalid id.

func (*ID) Scan

func (id *ID) Scan(value interface{}) (err error)

Scan implements the sql.Scanner interface.

func (ID) String

func (id ID) String() string

String returns a base32 hex lowercased with no padding representation of the id (char set is 0-9, a-v).

func (ID) Time

func (id ID) Time() time.Time

Time returns the timestamp part of the id. It's a runtime error to call this method with an invalid id.

func (*ID) UnmarshalText

func (id *ID) UnmarshalText(text []byte) error

UnmarshalText implements encoding/text TextUnmarshaler interface

func (ID) Value

func (id ID) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

Directories

Path Synopsis
cmd
