connpool

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2022 License: MIT Imports: 4 Imported by: 0

README

connpool

Package connpool is a general purpose object pool which can be used as a connection pool or a freelist.

go get "github.com/marlonche/connpool"

API docs: GoDoc

Documentation

Overview

Package connpool is a general purpose object pool which can be used as a connection pool or a freelist.

Below is a demo showing how to use it.

The following two files can be found under github.com/marlonche/connpool/example/.

streampool.go

package main

import (
	"bufio"
	"fmt"
	"github.com/marlonche/connpool"
	"net"
	"sync"
)

// PooledStreamErr is a string-backed error type used by the stream pool demo.
type PooledStreamErr string

// Error returns the error text, which makes PooledStreamErr satisfy the
// built-in error interface.
func (e PooledStreamErr) Error() string {
	text := string(e)
	return text
}

// invalidStream is returned by PooledStream.Close() on an already closed
// stream and by StreamPool.Get() when the pool hands back an unexpected item.
var invalidStream PooledStreamErr = "invalid pooled stream"

// Implemented PoolItem: pooled TCP stream.
// The embedded RWMutex is taken by SetErr(), GetErr() and Close() to guard
// the err and closed fields.
type PooledStream struct {
	sync.RWMutex
	// underlying TCP connection used by Read() and Write(); closed by Close()
	// when the item is in error state
	stream    net.Conn
	// wrapper pool the stream is given back to or cleared from in Close();
	// set to nil once the stream has been cleared
	pool      *StreamPool
	// error kept by SetErr() and returned by GetErr()
	err       error
	// value saved by SetContainer() and returned by GetContainer()
	container connpool.PoolItem
	// set to true by Close() after the stream has been cleared from the pool;
	// SetErr() is then a no-op and further Close() calls return invalidStream
	closed    bool
}

// NewPooledStream wraps an established connection in a PooledStream that
// belongs to pool. The returned stream starts out open (not closed) and
// without a kept error.
func NewPooledStream(stream net.Conn, pool *StreamPool) *PooledStream {
	ps := new(PooledStream) // closed defaults to false
	ps.stream = stream
	ps.pool = pool
	return ps
}

// SetContainer stores the container handed in by connpool so that
// GetContainer() can return it later. No other processing is done.
func (ps *PooledStream) SetContainer(container connpool.PoolItem) {
	ps.container = container
}

// GetContainer hands back whatever was last stored by SetContainer().
func (ps *PooledStream) GetContainer() connpool.PoolItem {
	saved := ps.container
	return saved
}

// SetErr records an error on the stream. It is called by connpool and by
// users who hit an error while using the PooledStream, e.g. in Read().
//
// Keep the error when it is unrecoverable and the stream should be discarded;
// Close() checks the kept error to decide between discarding and reusing.
//
// The call is ignored when the stream is already closed or err is nil.
// A temporary net.Error never replaces an error that is already kept.
func (ps *PooledStream) SetErr(err error) {
	ps.Lock()
	defer ps.Unlock()
	if ps.closed || err == nil {
		return
	}
	if ps.err != nil {
		// Do not let a transient network error overwrite the kept one.
		if netErr, isNet := err.(net.Error); isNet && netErr.Temporary() {
			return
		}
	}
	ps.err = err
}

// GetErr reports the error kept by SetErr(), or nil if none was kept.
// The read is done under the read lock.
func (ps *PooledStream) GetErr() error {
	ps.RLock()
	kept := ps.err
	ps.RUnlock()
	return kept
}

// Close is called once the caller has finished using the PooledStream.
//
// A stream without a kept error is handed back to the pool via GiveBack().
// A stream in error state is removed from the pool via ClearItem(), marked
// closed, and its underlying connection is closed.
//
// It returns invalidStream if the stream was already closed, nil otherwise.
func (ps *PooledStream) Close() error {
	ps.Lock()
	defer ps.Unlock()
	if ps.closed {
		return invalidStream
	}
	if ps.err == nil {
		// Healthy stream: make it available for reuse.
		ps.pool.giveBack(ps)
		return nil
	}
	// Broken stream: drop it from the pool and release the connection.
	ps.pool.clearConn(ps)
	ps.pool = nil
	ps.closed = true
	ps.stream.Close()
	return nil
}

// Write sends b over the underlying connection and returns that call's result.
// It is application logic and not part of the PoolItem interface.
func (ps *PooledStream) Write(b []byte) (int, error) {
	written, err := ps.stream.Write(b)
	return written, err
}

// Read reads from the underlying connection into b and returns that call's
// result. It is application logic and not part of the PoolItem interface.
func (ps *PooledStream) Read(b []byte) (int, error) {
	count, err := ps.stream.Read(b)
	return count, err
}

// Implemented PoolItem creator
type streamCreator struct {
	// wrapper pool handed to every PooledStream created by NewItem();
	// set after construction in NewStreamPool() and cleared by Close()
	pool *StreamPool
	// TCP address dialed by NewItem()
	addr string
}

// NewItem is called by connpool when more PoolItems are needed.
// It dials the creator's TCP address and wraps the connection in a
// PooledStream; a dial failure is returned unchanged with a nil item.
func (sc *streamCreator) NewItem() (connpool.PoolItem, error) {
	conn, dialErr := net.Dial("tcp", sc.addr)
	if dialErr == nil {
		return NewPooledStream(conn, sc.pool), nil
	}
	return nil, dialErr
}

// InitItem is called by connpool every time before Pool.Get() returns.
// n = 1 means this is the first time the item is handed out.
//
// On first use of a PooledStream it starts a goroutine that reads
// newline-terminated echoes from the stream and prints them. When a read
// fails, the goroutine records the error on the stream, closes it and exits.
// InitItem itself always returns nil.
func (sc *streamCreator) InitItem(item connpool.PoolItem, n uint64) error {
	if n != 1 {
		// Only the first Get() starts the receiver.
		return nil
	}
	stream, _ := item.(*PooledStream)
	if stream == nil {
		return nil
	}
	// receive from stream
	go func() {
		reader := bufio.NewReader(stream)
		for {
			line, err := reader.ReadString('\n')
			if err != nil {
				stream.SetErr(err)
				stream.Close()
				return
			}
			fmt.Printf("get echo from server: %v", line)
		}
	}()
	return nil
}

// Close drops the creator's reference to its StreamPool.
// It always returns nil.
func (sc *streamCreator) Close() error {
	sc.pool = nil
	return nil
}

// pool wrapper: exposes a connpool.Pool as a pool of *PooledStream.
type StreamPool struct {
	// underlying pool created by connpool.NewPool() in NewStreamPool()
	pool *connpool.Pool
}

// NewStreamPool builds a StreamPool whose items are TCP streams dialed to addr.
// name, maxTotal, maxIdle and idleTimeout are passed straight through to
// connpool.NewPool().
func NewStreamPool(name string, addr string, maxTotal int, maxIdle int, idleTimeout int) *StreamPool {
	creator := new(streamCreator)
	creator.addr = addr

	wrapper := new(StreamPool)
	wrapper.pool = connpool.NewPool(name, creator, maxTotal, maxIdle, idleTimeout)

	// The creator needs the wrapper so new streams know which pool owns them.
	creator.pool = wrapper
	return wrapper
}

// Get wraps Pool.Get() and returns the item as a *PooledStream.
// An error from the pool is returned unchanged; invalidStream is returned
// when the item is not a non-nil *PooledStream.
func (sp *StreamPool) Get() (*PooledStream, error) {
	item, err := sp.pool.Get()
	if err != nil {
		return nil, err
	}
	stream, isStream := item.(*PooledStream)
	if !isStream || stream == nil {
		return nil, invalidStream
	}
	return stream, nil
}

// clearConn wraps Pool.ClearItem(): it removes pooledStream from the
// underlying pool.
func (sp *StreamPool) clearConn(pooledStream *PooledStream) {
	inner := sp.pool
	inner.ClearItem(pooledStream)
}

// giveBack wraps Pool.GiveBack(): it returns pooledStream to the
// underlying pool.
func (sp *StreamPool) giveBack(pooledStream *PooledStream) {
	inner := sp.pool
	inner.GiveBack(pooledStream)
}

// Close wraps Pool.Close(): it closes the underlying pool.
func (sp *StreamPool) Close() {
	inner := sp.pool
	inner.Close()
}

main.go

package main

import (
	"bufio"
	"flag"
	"fmt"

	"net"
	"time"
)

// flagAsServer is the -asServer command line flag. When true, main() runs
// the echo server; the default (false) runs the client.
var flagAsServer = flag.Bool("asServer", false, "run as server demo")

// main parses the command line and runs the demo as an echo server when
// -asServer is set, or as a pooled client otherwise.
func main() {
	flag.Parse()
	if *flagAsServer {
		runAsServer()
		return
	}
	runAsClient()
}

// runAsClient runs the client side of the demo.
//
// It creates a stream pool for 127.0.0.1:9999 and starts 5000 goroutines.
// Each one takes a stream from the pool, writes a single greeting line and
// closes the stream again. A failed write is recorded on the stream with
// SetErr() before Close() runs. The function then sleeps for an hour so the
// goroutines can run.
func runAsClient() {
	pool := NewStreamPool("pool-name", "127.0.0.1:9999", 10, 5, 60)

	sendGreeting := func() {
		conn, err := pool.Get()
		if err != nil {
			fmt.Printf("pool.Get() error: %v", err)
			return
		}
		defer conn.Close()
		content := fmt.Sprintf("Hello, my id is %v\n", time.Now().Nanosecond())
		if _, err = conn.Write([]byte(content)); err != nil {
			fmt.Printf("conn write error: %v", err)
			conn.SetErr(err)
		}
	}

	for remaining := 5000; remaining > 0; remaining-- {
		go sendGreeting()
	}
	time.Sleep(time.Hour)
}

// runAsServer runs the echo server side of the demo.
//
// It listens on TCP port 9999 and starts one goroutine per accepted
// connection. Each goroutine reads newline-terminated lines and writes every
// line back unchanged. The function returns when listening or accepting fails.
//
// The listener is closed when the function returns, and each connection is
// closed when its goroutine exits, so neither leaks a socket.
func runAsServer() {
	l, err := net.Listen("tcp", ":9999")
	if err != nil {
		fmt.Printf("listen error: %v", err)
		return
	}
	// Release the listening socket once Accept fails and we return.
	defer l.Close()
	for {
		conn, err := l.Accept()
		if err != nil {
			fmt.Printf("Accept error: %v", err)
			return
		}
		go func() {
			// Without this the connection would stay open after a read or
			// write error ends the goroutine.
			defer conn.Close()
			r := bufio.NewReader(conn)
			for {
				s, err := r.ReadString('\n')
				if err != nil {
					fmt.Printf("ReadString err: %v", err)
					return
				}
				if _, err = conn.Write([]byte(s)); err != nil {
					fmt.Printf("Write error: %v", err)
					return
				}
			}
		}()
	}
}

Index

Constants

This section is empty.

Variables

View Source
// Errors exported by connpool.
var (
	// NOTE(review): presumably reported by operations on a pool after
	// Pool.Close() — confirm against the implementation.
	ErrPoolClosed  = errors.New("the pool is closed")
	// Error an item is closed with after it has been idle for at least
	// idleTimeout seconds.
	ErrIdleTimeout = errors.New("the item is idle timeout")
	// Error an item is closed with when it is given back while the idle
	// items are full.
	ErrIdleFull    = errors.New("idle items are full")
	// Returned by Pool.Get() when the timeout set with SetGetTimeout() expires.
	ErrGetTimeout  = errors.New("no item to get")
)

Functions

This section is empty.

Types

type Creator

// Creator is implemented by users to create and initialize PoolItems.
type Creator interface {
	// NewItem creates a new item which will be returned by Pool.Get().
	// It is called when there are not enough items
	// and there is still capacity to allocate new ones.
	NewItem() (PoolItem, error)

	// InitItem is called every time before Pool.Get() returns a PoolItem to the user.
	// Every item returned from Get() is initialized with this method.
	//
	// item is the one that will be returned by Pool.Get().
	// n is the use count of this item.
	// n = 1 means the first use of this item.
	//
	// If the returned error is not nil, item.SetErr() and item.Close() will be
	// called sequentially by connpool, and item will not be returned by Pool.Get().
	InitItem(item PoolItem, n uint64) error
	// NOTE(review): not documented here; presumably called when the pool
	// is closed so the creator can release its resources — confirm.
	Close() error
}

Users should implement this interface to create PoolItems.

type Pool

// Pool is the main pool struct. Create one with NewPool().
type Pool struct {
	// contains filtered or unexported fields
}

The main pool struct.

func NewPool

func NewPool(name string, creator Creator, maxTotalNum int, maxIdleNum int, idleTimeout int) *Pool

Create a connection pool.

name is a unique id of this pool;

creator is the Creator interface implemented by user;

maxTotalNum is the maximum total number of active and idle connections held by this pool;

Here active refers to an item being held by a user after Pool.Get(), while idle refers to an item in the pool waiting for Pool.Get().

maxIdleNum is the maximum number of idle connections held by this pool;

idleTimeout is the timeout in seconds for idle connections; 0 means no timeout. If an item stays in the idle state for at least idleTimeout seconds, the item will be closed with error ErrIdleTimeout.

func (*Pool) ClearItem

func (self *Pool) ClearItem(item PoolItem)

Call this method to clear items with error from the pool.

This method is called by user in the implementation of PoolItem.Close() when an error previously set by PoolItem.SetErr() is detected.

func (*Pool) Close

func (self *Pool) Close()

Close the pool.

func (*Pool) Closed

func (self *Pool) Closed() bool

Pool closed or not.

func (*Pool) Get

func (self *Pool) Get() (_item PoolItem, _err error)

Get pooled item originally created by Creator.NewItem().

If SetGetTimeout() is called with a non-zero value, Get() will return with error ErrGetTimeout after the timeout.

func (*Pool) GetIdleNum

func (self *Pool) GetIdleNum() int

Get the number of idle items.

func (*Pool) GetName

func (self *Pool) GetName() string

Get the name of pool specified at NewPool()

func (*Pool) GetTotalNum

func (self *Pool) GetTotalNum() int

Get the total number of all items including active and idle.

func (*Pool) GiveBack

func (self *Pool) GiveBack(item PoolItem)

Call this method to give normal (non-error) items back to the pool after you have finished using them.

This method is called by user in the implementation of PoolItem.Close() when no error with item is detected.

If idle items are full, this item will be closed with error ErrIdleFull.

func (*Pool) IsItemActive

func (self *Pool) IsItemActive(_item PoolItem) bool

Check whether an item is active or not.

func (*Pool) SetGetTimeout

func (self *Pool) SetGetTimeout(timeout int)

Set Get()'s timeout in seconds; 0 means no timeout, and the default is 0. Get() will return with error ErrGetTimeout on timeout.

This method can be called after NewPool().

type PoolItem

// PoolItem is the interface that pooled items should implement.
type PoolItem interface {
	// Close is called after finishing using the PoolItem.
	// If the item is in error state, clear it by calling pool.ClearItem(),
	// otherwise give it back by calling pool.GiveBack().
	Close() error

	// SetErr saves the error if the error is not recoverable.
	// This method is called by connpool as well as by users who encounter
	// errors when using the PoolItem.
	SetErr(error)
	// GetErr returns the error saved previously by SetErr().
	GetErr() error

	// The two methods below are only called by connpool.
	// Implementers just need to save the parameter passed in.
	SetContainer(PoolItem)
	// Implementers just need to return the parameter saved by SetContainer().
	GetContainer() PoolItem
}

Pooled items should implement this interface.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL