promise

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2019 License: MIT Imports: 8 Imported by: 0

README

Workshop

 Workshop is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.
 
 + Inspire by [Netflix/Hystrix](https://github.com/Netflix/Hystrix)
 
 + Promise like interface
 
 + support intercepter to extand like rate / limit
examples
  • promise
    func ExamplePromise() {
    	pool := NewPool(3)
    	pms := NewPromise(pool, Process{
    		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
    			// .... Do task
    			return true, nil
    		},
    	})
    
    	// close promise will close task context
    	defer pms.Close()
    
    	ctx, _ := context.WithTimeout(context.Background(), 100 * time.Millisecond)
    	_, err := pms.Get(ctx)
    	if err != nil {
    		// ... Handle err
    	}
    }

  • promise chain
func ExamplePromiseChain() {
   	pool := NewPool(3)
   	pms := NewPromise(pool, Process{
   		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
   			// .... Do task 1
   			log.Println("do task one")
   			return true, nil
   		},
   	}).Then(Process{
   		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
   			// .... Do task 2
   			log.Println("do task two")
   			return true, nil
   		},
   	})
   
   	// close promise will close task context
   	defer pms.Close()
   
   	ctx, _ := context.WithTimeout(context.Background(), 100 * time.Millisecond)
   	_, err := pms.Get(ctx)
   	if err != nil {
   		// ... Handle err
   	}
}
  • retry
func ExamplePromiseRetry() {
	pool := NewPool(3)
	pms := NewPromise(pool, Process{
		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
			// .... Do task 1
			log.Println("do task one")
			return true, nil
		},
	}).RecoverAndRetry(ExceptionProcess{
		Process: func(ctx context.Context, err error, last interface{}) (interface{}, error) {
			// .... try recover
			log.Println("try recover")
			return last, nil
		},
	})

	// close promise will close task context
	defer pms.Close()

	ctx, _ := context.WithTimeout(context.Background(), 100 * time.Millisecond)
	_, err := pms.Get(ctx)
	if err != nil {
		// ... Handle err
	}
}
  • breaker
func ExampleBreaker() {
	var failedLimit float64 = 6
	bucketSize := 6
	middle := NewBreakerMiddle(time.Second, failedLimit, bucketSize)

	pool := workshop.NewPool(4)
	var err error
	pms := workshop.NewPromise(pool, workshop.Process{
		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
			return nil, nil
		},
		EventKey: 1,
		Middles: []workshop.Middle{middle}, // use breaker middle
	})
	err = pms.Wait(context.Background())
	if err == BreakerOpenErr {
		// rate limit
		log.Println("breaker open")
	}
}
  • rate limit
func ExampleRate() {
	pool := workshop.NewPool(4)
	var md = NewRateMiddle(6, 6)
	var err error
	pms := workshop.NewPromise(pool, workshop.Process{
		Process: func(ctx context.Context, last interface{}) (interface{}, error) {
			return nil, nil
		},
		EventKey: 1,
		Middles: []workshop.Middle{md}, // use rate middle
	})
	err = pms.Wait(context.Background())
	if err == RateLimitErr {
		// rate limit
		log.Println("rate limit occur")
	}
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WaitTime

func WaitTime(ctx context.Context, waitTime time.Duration) error

Types

type ExceptionPromise

type ExceptionPromise struct {
	// contains filtered or unexported fields
}

func (ExceptionPromise) HandleException

func (p ExceptionPromise) HandleException(ps ProcessFunc, middles ...Middle) *Promise

type Middle

type Middle struct {
	Name        string
	Wrapper     func(process Profile) Profile
	Inheritable bool
	Labels      map[string]string
}

func EventKeyMiddle

func EventKeyMiddle(eventKey int) Middle

func PartitionMiddle

func PartitionMiddle(partition bool) Middle

func WaitMiddle

func WaitMiddle(wait func(ctx context.Context, req Request) error) Middle

func WaitTimeMiddle

func WaitTimeMiddle(waitTime time.Duration) Middle

func WrapProcess

func WrapProcess(name string, wrapper func(ProcessFunc) ProcessFunc) Middle

func WrapTimeout

func WrapTimeout(name string, timeout time.Duration) Middle

func (Middle) WithInheritable

func (mid Middle) WithInheritable(inheritable bool) Middle

func (Middle) WithName

func (mid Middle) WithName(name string) Middle
type MiddleLink struct {
	// contains filtered or unexported fields
}

func (*MiddleLink) Append

func (link *MiddleLink) Append(md ...Middle) *MiddleLink

func (*MiddleLink) IncFragmentID

func (link *MiddleLink) IncFragmentID() *MiddleLink

func (*MiddleLink) Range

func (link *MiddleLink) Range(walk func(middle Middle) bool)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func NewPool

func NewPool(maxConcurrent int) *Pool

func (*Pool) Close

func (pool *Pool) Close() error

func (*Pool) Feed

func (pool *Pool) Feed(ctx context.Context, box TaskBox) error

func (*Pool) MaxLocalID

func (pool *Pool) MaxLocalID() int

type ProcessFunc

type ProcessFunc func(ctx context.Context, req Request) Result

type Profile

type Profile struct {
	Wait      func(ctx context.Context, req Request) error
	Process   ProcessFunc
	MiddleEnd bool
	// contains filtered or unexported fields
}

func (Profile) GetPartition

func (p Profile) GetPartition() bool

func (Profile) GetPromiseKey

func (p Profile) GetPromiseKey() int

func (Profile) SetPartition

func (p Profile) SetPartition(partition bool)

func (Profile) SetPromiseKey

func (p Profile) SetPromiseKey(eventKey int)

type Promise

type Promise struct {
	// contains filtered or unexported fields
}
Example
pool := NewPool(3)
pms := NewPromise(pool, func(ctx context.Context, req Request) Result {
	// .... Do task
	return Result{
		Err:     nil,
		Payload: 25,
	}
})

// close promise will close task context
defer pms.Close()

ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
_, err := pms.Get(ctx, true)
if err != nil {
	// ... Handle err
}
Output:

func NewPromise

func NewPromise(pool *Pool, process ProcessFunc, middles ...Middle) *Promise

func (*Promise) Close

func (p *Promise) Close()

close will close all parent or sub future

func (*Promise) Get

func (p *Promise) Get(ctx context.Context, close bool) (interface{}, error)

func (*Promise) IsClosed

func (p *Promise) IsClosed() bool

func (*Promise) IsStarted added in v0.2.0

func (p *Promise) IsStarted() bool

func (*Promise) OnException

func (p *Promise) OnException(ps ProcessFunc, middles ...Middle) *Promise

return new exception future, basic interface

func (*Promise) Recover

func (p *Promise) Recover(recover ProcessFunc, middles ...Middle) ExceptionPromise

func (*Promise) Start

func (p *Promise) Start(ctx context.Context) bool

Try start, if chan is started or closed return false, other return true

func (*Promise) Then

func (p *Promise) Then(ps ProcessFunc, middles ...Middle) *Promise

only on success, return a new Process, basic interface

func (*Promise) TryRecover

func (p *Promise) TryRecover(recover ProcessFunc, onRecoverFailed ProcessFunc, middles ...Middle) *Promise

func (*Promise) Wait

func (p *Promise) Wait(ctx context.Context, close bool) error

Wait all sub route finish or closed

type Request

type Request struct {
	// contains filtered or unexported fields
}

func (Request) GetHead

func (r Request) GetHead(key string) string

func (Request) LastErr

func (r Request) LastErr() error

func (Request) LastPayload

func (r Request) LastPayload() interface{}

func (Request) Partition

func (r Request) Partition() bool

func (Request) PromiseKey

func (r Request) PromiseKey() int

func (Request) RangeHead

func (r Request) RangeHead(iter func(k, v string) bool)

func (Request) WithHead

func (r Request) WithHead(key, value string) Request

type Result

type Result struct {
	Err     error
	Payload interface{}
}

type TaskBox

type TaskBox struct {
	// contains filtered or unexported fields
}

type TaskFunc

type TaskFunc func(ctx context.Context, localId int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL