dddfirework

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

English | 中文README

Overview

DDD Firework is a framework that supports the implementation of DDD (Domain-Driven Design). It provides a complete lifecycle wrapper for domain entities, including creation, modification, persistence, event sending, event listening, and integration with components such as locks, databases, and eventbuses.

Features

  1. Providing a unified application-level interface ICommand to encapsulate the entire lifecycle (build, behavior, persistence, event sending, etc.) management of domain layer entities.
  2. Automatic detection of changes to domain entities and persistence, without the need for users to handle complex storage logic.
  3. Support for rapid rebuild of complex aggregations (multiple levels of entity combinations), without the need to write specific query statements.
  4. Support for domain events, including domain event sending, registration of listener functions, and event callbacks.
  5. Hexagonal architecture with abstract definitions of each underlying component (locks, persistence, event buses, etc.) allows for flexible pluggable components.

Architecture

img.png

Usage

  1. Define domain entities

     import ddd "github.com/bytedance/dddfirework"
    
     type Order struct {
         ddd.BaseEntity
    
         UserID      string
         TotalAmount int64
         Remark      string
     }
    
  2. Register persistence models

     import ddd "github.com/bytedance/dddfirework"
     import "github.com/bytedance/dddfirework/executor/mysql"
    
     func init() {
         mysql.RegisterEntity2Model(&domain.Order{}, func(entity, parent ddd.IEntity, op ddd.OpType) (mysql.IModel, error) {
             do := entity.(*domain.Order)
             return &po.OrderPO{
                 ID:          do.GetID(),
                 User:        do.UserID,
                 TotalAmount: do.TotalAmount,
                 Remark:      do.Remark,
             }, nil
         }, func(m mysql.IModel, do ddd.IEntity) error {
             orderPO, order := m.(*po.OrderPO), do.(*domain.Order)
             order.UserID = orderPO.User
             order.TotalAmount = orderPO.TotalAmount
             order.Remark = orderPO.Remark
             return nil
         }
     }
    
  3. Define Command

     type UpdateOrderOpt struct {
         Remark *string
     }
    
     type UpdateOrderCommand struct {
         ddd.Command
    
         orderID string
         opt     UpdateOrderOpt
     }
    
     func NewUpdateOrderCommand(orderID string, opt UpdateOrderOpt) *UpdateOrderCommand {
         return &UpdateOrderCommand{
             orderID: orderID,
             opt:     opt,
         }
     }
    
     func (c *UpdateOrderCommand) Init(ctx context.Context) (lockIDs []string, err error) {
         return []string{c.orderID}, nil
     }
    
     func (c *UpdateOrderCommand) Build(ctx context.Context, builder dddfirework.DomainBuilder) (roots []dddfirework.IEntity, err error) {
         order := &domain.Order{
             ID:      id,
             Items:   []*domain.SaleItem{},
             Coupons: []*domain.Coupon{},
         }
         if err := builder.Build(ctx, order, &order.Items, &order.Coupons); err != nil {
             return nil, err
         }
         return []dddfirework.IEntity{order}, nil
     }
    
     func (c *UpdateOrderCommand) Act(ctx context.Context, container dddfirework.RootContainer, roots ...dddfirework.IEntity) error {
         order := roots[0].(*domain.Order)
         if c.opt.Remark != nil {
             order.Remark = *c.opt.Remark
             order.Dirty()
         }
    
         return nil
     }
    
  4. Execute command

     import (
         ddd "github.com/bytedance/dddfirework" 
         db_executor "github.com/bytedance/dddfirework/executor/mysql"
         db_eventbus "github.com/bytedance/dddfirework/eventbus/mysql"
         db_lock "github.com/bytedance/dddfirework/lock/db"
         "gorm.io/driver/mysql"
         "gorm.io/gorm"
     )
    
     func main() {
         lock := db_lock.NewDBLock(db, time.Second*10)
         executor := db_executor.NewExecutor(db)
         eventBus := db_eventbus.NewEventBus("svc_example", db)
         engine := dddfirework.NewEngine(lock, executor, eventBus.Options()...)
         engine.RunCommand(ctx, command.NewUpdateOrderCommand(
             req.ID, command.UpdateOrderOpt{Remark: req.Remark},
         ))
     }
    

More Example

ref: example/main.go

Documentation

Index

Constants

View Source
const WithRecursiveDelete = RecursiveDeleteOption(true)
View Source
const WithTransaction = TransactionOption(true)
View Source
const WithoutTransaction = TransactionOption(false)

Variables

View Source
var ErrBreak = fmt.Errorf("break process") // 中断流程,不返回错误
View Source
var ErrEntityNotFound = fmt.Errorf("entity not found")
View Source
var ErrEntityNotRegister = fmt.Errorf("entity not registered")
View Source
var ErrEntityRepeated = fmt.Errorf("entity already added")
View Source
var ErrNoEventBusFound = fmt.Errorf("no eventbus found")
View Source
var ErrNoEventTimerFound = fmt.Errorf("no event_timer found")

Functions

func RegisterEventBus

func RegisterEventBus(eventBus IEventBus)

RegisterEventBus 注册事件总线

func RegisterEventHandler

func RegisterEventHandler(t EventType, handler EventHandler)

RegisterEventHandler 注册事件处理器

func RegisterEventTXChecker

func RegisterEventTXChecker(t EventType, checker EventTXChecker)

RegisterEventTXChecker 注册事务反查接口

Types

type ActFunc

type ActFunc func(ctx context.Context, container RootContainer, roots ...IEntity) error

type Action

type Action struct {
	Op OpType

	Models      []IModel    // 当前待操作模型,stage 确保一个 Action 下都是同类的模型
	PrevModels  []IModel    // 基于快照生成的模型,跟 Models 一一对应,Executor 需要对两者做差异比对后更新
	Query       IModel      // 指定查询字段的数据模型
	QueryResult interface{} // Model 的 slice 的指针,形如 *[]ExampleModel
}

type ActionResult

type ActionResult struct {
	Data interface{}
}

type BaseEntity

type BaseEntity struct {
	// contains filtered or unexported fields
}

func NewBase

func NewBase(id string) BaseEntity

func (*BaseEntity) AddEvent

func (e *BaseEntity) AddEvent(evt IEvent, opts ...EventOpt)

AddEvent 实体发送事件,调用方需要保证事件是可序列化的,否则会导致 panic

func (*BaseEntity) Dirty

func (e *BaseEntity) Dirty()

func (*BaseEntity) GetChildren

func (e *BaseEntity) GetChildren() map[string][]IEntity

func (*BaseEntity) GetEvents

func (e *BaseEntity) GetEvents() []*DomainEvent

func (*BaseEntity) GetID

func (e *BaseEntity) GetID() string

func (*BaseEntity) IsDirty

func (e *BaseEntity) IsDirty() bool

func (*BaseEntity) SetID

func (e *BaseEntity) SetID(id string)

func (*BaseEntity) UnDirty

func (e *BaseEntity) UnDirty()

type BuildFunc

type BuildFunc func(ctx context.Context, h DomainBuilder) (roots []IEntity, err error)

type Command

type Command struct {
	// contains filtered or unexported fields
}

func (*Command) Act

func (c *Command) Act(ctx context.Context, container RootContainer, roots ...IEntity) (err error)

func (*Command) Build

func (c *Command) Build(ctx context.Context, builder DomainBuilder) (roots []IEntity, err error)

func (*Command) Commit

func (c *Command) Commit(ctx context.Context) error

Commit 提交当前事务,对当前的变更执行持久化操作,新建的实体会获得 ID

func (*Command) Init

func (c *Command) Init(ctx context.Context) (lockIDs []string, err error)

func (*Command) Output

func (c *Command) Output(data interface{})

Output 设定命令的返回值,data 会被赋值到 Result.Output 该方法可以在 Init - Build - Act 中调用,调用后,后续的方法将不会被执行 例如在 Init 中调用 Return,Build、Act 方法不会执行,但是 PostSave 会被执行

func (*Command) PostSave

func (c *Command) PostSave(ctx context.Context, res *Result)

func (*Command) SetStage

func (c *Command) SetStage(s StageAgent)

type DTimerOption added in v1.1.0

type DTimerOption struct {
	// contains filtered or unexported fields
}

func WithTimer added in v1.1.0

func WithTimer(timer ITimer) DTimerOption

func (DTimerOption) ApplyToOptions added in v1.1.0

func (t DTimerOption) ApplyToOptions(opts *Options)

type DomainBuilder

type DomainBuilder struct {
	// contains filtered or unexported fields
}

func (DomainBuilder) Build

func (h DomainBuilder) Build(ctx context.Context, parent IEntity, children ...interface{}) error

Build 查询并构建 parent 以及 children 实体 parent 必须指定 id,children 为可选,需要是 *IEntity 或者 *[]IEntity 类型

type DomainEvent

type DomainEvent struct {
	ID        string
	Type      EventType
	SendType  SendType
	Sender    string // 事件发出实体 ID
	Payload   []byte
	CreatedAt time.Time
}

func NewDomainEvent

func NewDomainEvent(event IEvent, opts ...EventOpt) *DomainEvent

func (*DomainEvent) GetSender

func (d *DomainEvent) GetSender() string

func (*DomainEvent) GetType

func (d *DomainEvent) GetType() EventType

type DomainEventHandler

type DomainEventHandler func(ctx context.Context, evt *DomainEvent) error

DomainEventHandler 通用 DomainEvent 的事件处理器

type DomainEventTXChecker

type DomainEventTXChecker func(evt *DomainEvent) TXStatus

DomainEventTXChecker 通用 DomainEvent 的事务回查

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(l ILock, e IExecutor, opts ...Option) *Engine

func (*Engine) Create

func (e *Engine) Create(ctx context.Context, roots ...IEntity) *Result

func (*Engine) Delete

func (e *Engine) Delete(ctx context.Context, roots ...IEntity) *Result

func (*Engine) NewStage

func (e *Engine) NewStage() *Stage

func (*Engine) RegisterCronTask added in v1.1.0

func (e *Engine) RegisterCronTask(key EventType, cron string, f func(key, cron string))

RegisterCronTask 注册定时任务

func (*Engine) RegisterEventHandler

func (e *Engine) RegisterEventHandler(eventType EventType, construct EventHandlerConstruct)

func (*Engine) RunCommand

func (e *Engine) RunCommand(ctx context.Context, c ICommand, opts ...Option) *Result

type EntityContainer

type EntityContainer struct {
	BaseEntity
	// contains filtered or unexported fields
}

EntityContainer 负责维护领域内所有聚合根实体的实体

func (*EntityContainer) Add

func (w *EntityContainer) Add(root IEntity) error

func (*EntityContainer) GetChildren

func (w *EntityContainer) GetChildren() map[string][]IEntity

func (*EntityContainer) GetDeleted

func (w *EntityContainer) GetDeleted() []IEntity

func (*EntityContainer) Recycle

func (w *EntityContainer) Recycle(e IEntity)

Recycle 回收所有被删除的实体

func (*EntityContainer) Remove

func (w *EntityContainer) Remove(root IEntity) error

func (*EntityContainer) SetChildren

func (w *EntityContainer) SetChildren(roots []IEntity)

type EntitySlice

type EntitySlice []IEntity

type ErrList

type ErrList []error

func (ErrList) Error

func (e ErrList) Error() string

type EventBusOption

type EventBusOption struct {
	// contains filtered or unexported fields
}

func WithEventBus

func WithEventBus(eventBus IEventBus) EventBusOption

func (EventBusOption) ApplyToOptions

func (t EventBusOption) ApplyToOptions(opts *Options)

type EventHandler

type EventHandler interface{}

EventHandler 指定特定领域事件的事件处理器,必须是带有 2 个入参的函数类型,第一个参数为 context.Context 类型 第二个为与 EventType 匹配的事件数据指针类型, 示例 func(ctx context.Context, evt *OrderCreatedEvent) error 当第二个参数声明为 *DomainEvent,EventHandler 回调时会传入原始的事件类型,用户定义事件以序列化形式存在 DomainEvent.Payload 中

type EventHandlerConstruct

type EventHandlerConstruct interface{}

EventHandlerConstruct EventHandler 的构造函数,带一个入参和一个返回值,入参是与事件类型匹配的事件数据指针类型,返回值是 ICommand 示例 func(evt *OrderCreatedEvent) *OnEventCreateCommand

type EventOpt

type EventOpt func(opt *EventOption)

func WithSendType

func WithSendType(t SendType) EventOpt

type EventOption

type EventOption struct {
	SendType SendType
	SendTime time.Time // 设定发送时间
}

type EventPersist

type EventPersist func(event *DomainEvent) (IModel, error)

type EventSaveOption

type EventSaveOption EventPersist

func WithEventPersist

func WithEventPersist(f EventPersist) EventSaveOption

func (EventSaveOption) ApplyToOptions

func (t EventSaveOption) ApplyToOptions(opts *Options)

type EventTXChecker

type EventTXChecker interface{}

EventTXChecker 事务事件的回查接口,必须带 1 个事件入参,1 个 TXStatus 返回值的函数 示例:func(evt *OrderCreatedEvent) TXStatus

type EventType

type EventType string

type IAfterCreate

type IAfterCreate interface {
	AfterCreate(ctx context.Context) error
}

type IAfterDelete

type IAfterDelete interface {
	AfterDelete(ctx context.Context) error
}

type IAfterUpdate

type IAfterUpdate interface {
	AfterUpdate(ctx context.Context) error
}

type IBeforeCreate

type IBeforeCreate interface {
	BeforeCreate(ctx context.Context) error
}

IBeforeCreate hook

type IBeforeDelete

type IBeforeDelete interface {
	BeforeDelete(ctx context.Context) error
}

type IBeforeUpdate

type IBeforeUpdate interface {
	BeforeUpdate(ctx context.Context) error
}

type ICommand

type ICommand interface {
	// Init 会在锁和事务之前执行,可进行数据校验,前置准备工作,可选返回锁ID
	Init(ctx context.Context) (lockKeys []string, err error)

	// Build 构建并返回聚合根实体,框架会为返回值自动生成快照,作为持久化比对的依据
	// 注意,Create 的情况不需要在 Build 方法返回
	Build(ctx context.Context, builder DomainBuilder) (roots []IEntity, err error)

	// Act 调用实体行为
	Act(ctx context.Context, container RootContainer, roots ...IEntity) (err error)

	// PostSave Save 事务完成后回调,可以执行组装返回数据等操作
	PostSave(ctx context.Context, res *Result)
}

type IConverter

type IConverter interface {
	// Entity2Model 实体转化为数据模型,entity: 当前实体 parent: 父实体 op: 调用场景,实体新建、实体更新、实体删除、实体查询等
	// 注意,对于Model上entity没有对应的字段,除新建场景外,其他需要保持为数据类型的零值,新建场景可以额外指定默认值
	// 实体未注册返回 ErrEntityNotRegister 错误
	Entity2Model(entity, parent IEntity, op OpType) (IModel, error)
	// Model2Entity 模型转换为实体,在创建实体的时候调用
	Model2Entity(model IModel, entity IEntity) error
}

type IDGeneratorOption

type IDGeneratorOption struct {
	// contains filtered or unexported fields
}

func WithIDGenerator

func WithIDGenerator(idGen IIDGenerator) IDGeneratorOption

func (IDGeneratorOption) ApplyToOptions

func (t IDGeneratorOption) ApplyToOptions(opts *Options)

type IDirty

type IDirty interface {
	// Dirty 标记实体对象是否需要更新
	Dirty()
	// UnDirty 取消实体的更新标记
	UnDirty()
	// IsDirty 判断实体是否需要更新
	IsDirty() bool
}

type IEntity

type IEntity interface {
	IDirty

	SetID(id string)
	GetID() string

	GetChildren() map[string][]IEntity
	GetEvents() []*DomainEvent
}

type IEvent

type IEvent interface {
	GetType() EventType // 事件类型
	GetSender() string  // 发送者id,可以用来实现事件保序
}

type IEventBus

type IEventBus interface {
	// Dispatch 发送领域事件到 EventBus,该方法会在事务内被同步调用
	// 对于每个事件,EventBus 必须要至少保证 at least once 送达
	Dispatch(ctx context.Context, evt ...*DomainEvent) error

	// RegisterEventHandler 注册事件回调,IEventBus 的实现必须保证收到事件同步调用该回调
	RegisterEventHandler(cb DomainEventHandler)
}

type IExecutor

type IExecutor interface {
	ITransaction
	IConverter

	Exec(ctx context.Context, actions *Action) error
}

type IIDGenerator

type IIDGenerator interface {
	NewID() (string, error)
}

type ILock

type ILock interface {
	Lock(ctx context.Context, key string) (keyLock interface{}, err error)
	UnLock(ctx context.Context, keyLock interface{}) error
}

type IModel

type IModel interface {
	GetID() string
}

type IStageSetter

type IStageSetter interface {
	SetStage(s StageAgent)
}

type ITimer added in v1.1.0

type ITimer interface {
	// RegisterTimerHandler 注册定时任务,定时到来时候调用该回调函数
	RegisterTimerHandler(cb TimerHandler)

	// RunCron 按照 cron 语法设置定时,并在定时到达后作为参数调用定时任务回调
	// key: 定时任务唯一标识,重复调用时不覆盖已有计时; cron: 定时配置; data: 透传数据,回调函数传入
	RunCron(key, cron string, data []byte) error

	// RunOnce 指定时间单次运行
	// key: 定时任务唯一标识,重复调用时不覆盖已有计时; t: 执行时间; data: 透传数据,回调函数传入
	RunOnce(key string, t time.Time, data []byte) error

	// Cancel 删除某个定时
	Cancel(key string) error
}

ITimer 分布式定时器协议

type ITransaction

type ITransaction interface {
	// Begin 开启事务,返回带有事务标识的 context,该 context 会原样传递给 Commit 或者 RollBack 方法
	Begin(ctx context.Context) (context.Context, error)
	// Commit 提交事务
	Commit(ctx context.Context) error
	// RollBack 回滚事务
	RollBack(ctx context.Context) error
}

type ITransactionEventBus

type ITransactionEventBus interface {
	IEventBus

	// RegisterEventTXChecker 注册事件回查
	RegisterEventTXChecker(cb DomainEventTXChecker)

	// DispatchBegin 开始发送事务,eventbus 需要保证同步成功,但是暂不发送到消费端
	// 返回值 context.Context 会传递给 Commie 和 Rollback 接口,实现方可以利用机制传递上下文信息
	DispatchBegin(ctx context.Context, evt ...*DomainEvent) (context.Context, error)

	// Commit 发送事件到消费端
	Commit(ctx context.Context) error

	// Rollback 回滚事件
	Rollback(ctx context.Context) error
}

ITransactionEventBus 支持事务性消息的实现 如果 eventbus 实现了这个接口,当发送事件的 DomainEvent.SendType == SendTypeTransaction 会优先选择事务的方式来发送

type LoggerOption

type LoggerOption struct {
	// contains filtered or unexported fields
}

func WithLogger

func WithLogger(logger logr.Logger) LoggerOption

func (LoggerOption) ApplyToOptions

func (t LoggerOption) ApplyToOptions(opts *Options)

type OpType

type OpType int8
const (
	OpUnknown OpType = 0
	OpInsert  OpType = 1
	OpUpdate  OpType = 2
	OpDelete  OpType = 3
	OpQuery   OpType = 4
)

type Option

type Option interface {
	ApplyToOptions(*Options)
}

type Options

type Options struct {
	WithTransaction bool
	RecursiveDelete bool         // 删除根实体是否递归删除所有子实体
	EventPersist    EventPersist // 是否保存领域事件到 DB
	Logger          logr.Logger
	EventBus        IEventBus
	Timer           ITimer
	IDGenerator     IIDGenerator
	PostSaveHooks   []PostSaveFunc
}

type PostSaveFunc

type PostSaveFunc func(ctx context.Context, res *Result)

type PostSaveOption

type PostSaveOption PostSaveFunc

func WithPostSave

func WithPostSave(f PostSaveFunc) PostSaveOption

func (PostSaveOption) ApplyToOptions

func (t PostSaveOption) ApplyToOptions(opts *Options)

type RecursiveDeleteOption

type RecursiveDeleteOption bool

func (RecursiveDeleteOption) ApplyToOptions

func (t RecursiveDeleteOption) ApplyToOptions(opts *Options)

type Result

type Result struct {
	Error   error
	Break   bool
	Actions []*Action
	Output  interface{}
}

func ResultErrOrBreak

func ResultErrOrBreak(err error) *Result

func ResultError

func ResultError(err error) *Result

func ResultErrors

func ResultErrors(err ...error) *Result

type RootContainer

type RootContainer struct {
	// contains filtered or unexported fields
}

RootContainer 聚合根实体容器

func (*RootContainer) Add

func (h *RootContainer) Add(root IEntity)

Add 创建聚合根实体

func (*RootContainer) Remove

func (h *RootContainer) Remove(root IEntity)

Remove 删除聚合根实体

type SendType

type SendType string

SendType 事件发送类型

const (
	SendTypeNormal      SendType = "normal"      // 普通事件
	SendTypeFIFO        SendType = "FIFO"        // 保序事件,即事件以 Sender 的发送时间顺序被消费执行
	SendTypeTransaction SendType = "transaction" // 事务事件
	SendTypeDelay       SendType = "delay"       // 延时发送
)

type Stage

type Stage struct {
	// contains filtered or unexported fields
}

Stage 取舞台的意思,表示单次运行

func (*Stage) Act

func (e *Stage) Act(f ActFunc) *Stage

func (*Stage) Build

func (e *Stage) Build(f BuildFunc) *Stage

func (*Stage) BuildEntity

func (e *Stage) BuildEntity(ctx context.Context, parent IEntity, children ...interface{}) error

BuildEntity 查询并构建 parent 以及 children 实体 parent 必须指定 id,children 为可选,需要是 *IEntity 或者 *[]IEntity 类型

func (*Stage) Lock

func (e *Stage) Lock(keys ...string) *Stage

func (*Stage) RunCommand

func (e *Stage) RunCommand(ctx context.Context, c ICommand) *Result

func (*Stage) Save

func (e *Stage) Save(ctx context.Context) *Result

func (*Stage) WithOption

func (e *Stage) WithOption(opts ...Option) *Stage

type StageAgent

type StageAgent struct {
	// contains filtered or unexported fields
}

func (*StageAgent) Commit

func (s *StageAgent) Commit(ctx context.Context) error

func (*StageAgent) Output

func (s *StageAgent) Output(data interface{})

type TXStatus

type TXStatus int
const (
	TXUnknown  TXStatus = 0 // 未知状态,EventBus 应当支持未知的重试
	TXCommit   TXStatus = 1 // 提交事务
	TXRollBack TXStatus = 2 // 回滚事务
)

type TimerEvent added in v1.1.0

type TimerEvent struct {
	Key     string
	Cron    string
	Payload []byte
}

TimerEvent 定时器专用的事件

func (*TimerEvent) GetSender added in v1.1.0

func (e *TimerEvent) GetSender() string

func (*TimerEvent) GetType added in v1.1.0

func (e *TimerEvent) GetType() EventType

type TimerHandler added in v1.1.0

type TimerHandler func(ctx context.Context, key, cron string, data []byte) error

type TransactionOption

type TransactionOption bool

func (TransactionOption) ApplyToOptions

func (t TransactionOption) ApplyToOptions(opts *Options)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL