cqrs

package module
v0.0.0-...-1a717d7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2024 License: MIT Imports: 11 Imported by: 0

README

CQRS

CQRS splits your application (and even the database in some cases) into two different paths: Commands and Queries.

Command side

Every operation that can trigger a side effect on the server must pass through the CQRS "command side". I like to put the handlers (command handlers and event handlers) inside the application layer because their goals are almost the same: orchestrating domain operations (usually with the help of infrastructure services).

command side

Query side

Pretty straightforward: the controller receives the request, calls the related query repo, and returns a DTO (defined in the infrastructure layer itself).

query side

Install

go install github.com/go-leo/cqrs/cmd/protoc-gen-cqrs@latest

Create a new project

├── api
│   └── demo
├── cmd
│   └── demo
│       ├── client
│       └── server
├── internal
│   └── demo
│       ├── assembler
│       ├── command
│       ├── model
│       └── query
└── third_party
    └── leo
        └── cqrs
            └── annotations.proto

Write proto api

add cqrs option at service

service DemoService {
  // Define command package information
  option(leo.cqrs.command) = {
    // package: the full package name of the command.
    package: "github.com/go-leo/cqrs/example/internal/demo/command"
    // relative: the package path of the command, relative to the current proto file.
    relative: "../../../internal/demo/command"
  };

  // Define query package information
  option(leo.cqrs.query) = {
    // package: the full package name of the query.
    package: "github.com/go-leo/cqrs/example/internal/demo/query"
    // relative: the package path of the query, relative to the current proto file.
    relative:  "../../../internal/demo/query"
  };
  ......
}

add responsibility at method


service DemoService {
  ......
  // GetUsers synchronously gets users (query, unary response)
  rpc GetUsers (GetUsersRequest) returns (GetUsersResponse) {
    option(leo.cqrs.responsibility) = Query;
  }

  // DeleteUser synchronously deletes a user (command, unary response)
  rpc DeleteUser (DeleteUsersRequest) returns (google.protobuf.Empty) {
    option(leo.cqrs.responsibility) = Command;
  }

  // AsyncGetUsers asynchronously gets users (query, streamed response)
  rpc AsyncGetUsers (AsyncGetUsersRequest) returns (stream AsyncGetUsersResponse) {
    option(leo.cqrs.responsibility) = Query;
  }

  // AsyncDeleteUsers asynchronously deletes users (command, streamed response)
  rpc AsyncDeleteUsers(AsyncDeleteUsersRequest) returns (stream google.protobuf.Empty) {
    option(leo.cqrs.responsibility) = Command;
  }
}
  • option(leo.cqrs.responsibility) = Query; marks the method as a query method
  • option(leo.cqrs.responsibility) = Command; marks the method as a command method
  • returns (Response) means the method is a sync method
  • returns (stream Response) means the method is an async method

demo.proto

generate file

# Run from the project root: the --proto_path entries, the *_out directories
# and the api/*/*.proto glob are all relative paths.
protoc \
--proto_path=. \
--proto_path=third_party \
--go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
--go-grpc_opt=require_unimplemented_servers=false \
--cqrs_out=. \
--cqrs_opt=paths=source_relative \
--cqrs_opt=require_unimplemented_servers=false \
api/*/*.proto

will generate new files:

├── api
│   └── demo
│       ├── demo.cqrs.pb.go
│       ├── demo.pb.go
│       ├── demo.proto
│       └── demo_grpc.pb.go
├── cmd
│   └── demo
│       ├── client
│       └── server
├── compile.sh
├── internal
│   └── demo
│       ├── assembler
│       ├── command
│       │   ├── async_delete_users.go
│       │   ├── create_user.go
│       │   ├── delete_user.go
│       │   └── update_user.go
│       ├── model
│       └── query
│           ├── async_get_users.go
│           ├── get_user.go
│           └── get_users.go
└── third_party
    └── leo
        └── cqrs
            └── annotations.proto

fill the logic

write the grpc server launcher

package main

import (
	"github.com/go-leo/cqrs/example/api/demo"
	"github.com/go-leo/cqrs/example/internal/demo/assembler"
	"github.com/go-leo/cqrs/example/internal/demo/command"
	"github.com/go-leo/cqrs/example/internal/demo/query"
	"google.golang.org/grpc"
	"log"
	"net"
)

// main starts the demo gRPC server on TCP port 8080. It builds the generated
// DemoService bus from the command and query handlers, wraps it in the
// generated CQRS service, and serves until Serve returns.
func main() {
	lis, err := net.Listen("tcp", ":8080")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()

	// NOTE(review): the handler order presumably has to match the parameter
	// order of the generated NewDemoServiceBus — confirm against demo.cqrs.pb.go.
	bus, err := demo.NewDemoServiceBus(
		command.NewCreateUser(),
		command.NewUpdateUser(),
		query.NewGetUser(),
		query.NewGetUsers(),
		command.NewDeleteUser(),
		query.NewAsyncGetUsers(),
		command.NewAsyncDeleteUsers(),
	)
	if err != nil {
		// Report this startup failure the same way as the listen and serve
		// failures instead of panicking with a stack trace.
		log.Fatalf("failed to create bus: %v", err)
	}

	service := demo.NewDemoServiceCQRSService(bus, assembler.NewDemoServiceAssembler())
	demo.RegisterDemoServiceServer(s, service)
	log.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

write grpc client call

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/go-leo/cqrs/example/api/demo"
	"github.com/go-leo/gox/mathx/randx"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"log"
)

// main dials the demo server on :8080 and invokes each DemoService RPC once,
// printing every response. A failed dial exits via log.Fatalf; a failed RPC
// or stream receive panics.
func main() {
	flag.Parse()

	// Set up a connection to the server using insecure transport credentials.
	cc, dialErr := grpc.Dial(":8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if dialErr != nil {
		log.Fatalf("did not connect: %v", dialErr)
	}
	defer cc.Close()

	cli := demo.NewDemoServiceClient(cc)
	ctx := context.Background()

	// CreateUser with randomly generated field values.
	createReq := &demo.CreateUserRequest{
		Name:   randx.HexString(12),
		Age:    randx.Int31n(50),
		Salary: float64(randx.Int31n(100000)),
		Token:  randx.NumericString(16),
	}
	created, err := cli.CreateUser(ctx, createReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("CreateUser:", created)

	// UpdateUser with randomly generated field values.
	updateReq := &demo.UpdateUserRequest{
		Name:   randx.HexString(12),
		Age:    randx.Int31n(50),
		Salary: float64(randx.Int31n(100000)),
		Token:  randx.NumericString(16),
	}
	updated, err := cli.UpdateUser(ctx, updateReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("UpdateUser:", updated)

	// GetUser with fixed field values.
	getReq := &demo.GetUserRequest{
		Name:   "tom",
		Age:    30,
		Salary: 30000,
		Token:  "4108475619",
	}
	user, err := cli.GetUser(ctx, getReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("GetUser:", user)

	// GetUsers: first page, ten entries.
	listReq := &demo.GetUsersRequest{
		PageNo:   1,
		PageSize: 10,
	}
	users, err := cli.GetUsers(ctx, listReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("GetUsers:", users)

	// DeleteUser by name.
	deleteReq := &demo.DeleteUsersRequest{
		Name: "jax",
	}
	deleted, err := cli.DeleteUser(ctx, deleteReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("DeleteUser:", deleted)

	// AsyncGetUsers: open the stream, then read a single message from it.
	asyncListReq := &demo.AsyncGetUsersRequest{
		PageNo:   1,
		PageSize: 10,
	}
	usersStream, err := cli.AsyncGetUsers(ctx, asyncListReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("AsyncGetUsers wait...")
	streamedUsers, err := usersStream.Recv()
	if err != nil {
		panic(err)
	}
	fmt.Println("AsyncGetUsers:", streamedUsers)

	// AsyncDeleteUsers: open the stream, then read a single message from it.
	asyncDeleteReq := &demo.AsyncDeleteUsersRequest{
		Names: []string{"jax", "tom", "jerry"},
	}
	deleteStream, err := cli.AsyncDeleteUsers(ctx, asyncDeleteReq)
	if err != nil {
		panic(err)
	}
	fmt.Println("AsyncDeleteUsers wait...")
	streamedDelete, err := deleteStream.Recv()
	if err != nil {
		panic(err)
	}
	fmt.Println("AsyncDeleteUsers:", streamedDelete)
}

sample

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors of the cqrs package; every message is prefixed with "cqrs: ".
var (
	// ErrHandlerNil reports that a CommandHandler or QueryHandler is nil.
	ErrHandlerNil = errors.New("cqrs: handler is nil")

	// ErrRegistered reports that a CommandHandler or QueryHandler is already registered.
	ErrRegistered = errors.New("cqrs: handler registered")

	// ErrUnregistered reports that no CommandHandler or QueryHandler is registered.
	ErrUnregistered = errors.New("cqrs: handler unregistered")

	// ErrArgsNil reports that the arguments are nil.
	ErrArgsNil = errors.New("cqrs: arguments is nil")

	// ErrBusClosed reports that the bus was closed.
	ErrBusClosed = errors.New("cqrs: bus was closed")

	// ErrUnimplemented reports that a handler does not implement CommandHandler or QueryHandler.
	ErrUnimplemented = errors.New("cqrs: handler is not implement CommandHandler or QueryHandler")
)

Functions

This section is empty.

Types

type Bus

// Bus registers CommandHandlers and QueryHandlers and executes commands and
// queries, either synchronously or asynchronously.
type Bus interface {

	// RegisterCommand registers a CommandHandler.
	// NOTE(review): handler is typed any, so its type is presumably validated
	// at registration time — confirm in the implementation.
	RegisterCommand(handler any) error

	// RegisterQuery registers a QueryHandler.
	RegisterQuery(handler any) error

	// Exec synchronously executes the command described by args.
	Exec(ctx context.Context, args any) error

	// Query synchronously executes the query described by args and returns its result.
	Query(ctx context.Context, args any) (any, error)

	// AsyncExec asynchronously executes a command; the outcome is obtained from the returned Future.
	AsyncExec(ctx context.Context, args any) (Future, error)

	// AsyncQuery asynchronously executes a query; the result is obtained from the returned Future.
	AsyncQuery(ctx context.Context, args any) (Future, error)

	// Close closes the bus gracefully.
	Close(ctx context.Context) error
}

Bus registers CommandHandlers and QueryHandlers, executes commands, and runs queries.

func NewBus

func NewBus(opts ...Option) Bus

type CommandHandler

// CommandHandler handles a command whose arguments are of type Args.
type CommandHandler[Args any] interface {
	// Handle processes the command described by args and returns only an error.
	Handle(ctx context.Context, args Args) error
}

CommandHandler is a command handler that updates data.

type CommandHandlerFunc

type CommandHandlerFunc[Args any] func(ctx context.Context, args Args) error

The CommandHandlerFunc type is an adapter to allow the use of ordinary functions as CommandHandler.

func (CommandHandlerFunc[Args]) Handle

func (f CommandHandlerFunc[Args]) Handle(ctx context.Context, args Args) error

Handle calls f(ctx, args).

type Future

// Future represents the result of an asynchronous computation.
type Future interface {
	// Get waits for the computation to complete and then retrieves its result.
	Get(ctx context.Context) (any, error)
}

Future represents the result of an asynchronous computation.

type NoopCommand

type NoopCommand[Args any] struct{}

NoopCommand is a CommandHandler that does nothing and returns a nil error.

func (NoopCommand[Args]) Handle

func (NoopCommand[Args]) Handle(context.Context, Args) error

type NoopQuery

type NoopQuery[Args any, Result any] struct{}

NoopQuery is a QueryHandler that does nothing and returns a nil error.

func (NoopQuery[Args, Result]) Handle

func (NoopQuery[Args, Result]) Handle(context.Context, Args) (Result, error)

type Option

// Option is a function that modifies the package's unexported option struct;
// NewBus accepts any number of Options.
type Option func(*option)

func Pool

func Pool(pool gopher.Gopher) Option

type QueryHandler

// QueryHandler handles a query whose arguments are of type Args and whose
// result is of type Result.
type QueryHandler[Args any, Result any] interface {
	// Handle processes the query described by args and returns its result or an error.
	Handle(ctx context.Context, args Args) (Result, error)
}

QueryHandler is a query handler that reads data.

type QueryHandlerFunc

type QueryHandlerFunc[Args any, Result any] func(ctx context.Context, args Args) (Result, error)

The QueryHandlerFunc type is an adapter to allow the use of ordinary functions as QueryHandler.

func (QueryHandlerFunc[Args, Result]) Handle

func (f QueryHandlerFunc[Args, Result]) Handle(ctx context.Context, args Args) (Result, error)

Handle calls f(ctx, args).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL