gotools

package module
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: MulanPSL-2.0 Imports: 21 Imported by: 0

README

通用golang函数库

// RunParallel 该函数提供同时运行max个协程fn,一旦fn有err返回则停止接下来的fn运行。
// 朝返回的 add 函数中添加任务,若正在运行的任务数已达到max则会阻塞当前程序。
// add 函数返回err为任务 fn 返回的第一个err。与 wait 函数返回的err为同一个。
// 注意请在 add 完所有任务后调用 wait。
func RunParallel[T any](max int, fn func(T) error) (add func(T) error, wait func() error)

// RunParallelNoBlock 该函数提供同RunParallel一样,但是add函数不会阻塞。注意请在add完所有任务后调用wait。
func RunParallelNoBlock[T any](max int, fn func(T) error) (add func(T) error, wait func() error)

// RunParallelNoLimit 该函数提供同RunParallel一样,但是不限制协程数。注意请在add完所有任务后调用wait。
func RunParallelNoLimit[T any](fn func(T) error) (add func(T) error, wait func() error)

// Run 并发将jobs传递给proc函数运行,一旦发生error便立即返回该error,并结束其它协程。
// 当ctx被cancel时也将立即返回,此时返回cancel时的error。
// 当proc运行发生panic将立即返回该panic字符串化的error。
// proc为nil时函数将panic。
func Run[T any](ctx context.Context, proc func(context.Context, T) error, jobs ...T) error

// StartProcess 将每个jobs依次递给steps函数处理。一旦某个step发生error或者panic,StartProcess立即返回该error,
// 并及时结束其他StartProcess开启的goroutine,也不开启新的goroutine运行step。
// 一个job最多在一个step中运行一次,且一个job一定是依次序递给steps,前一个step处理完毕才会给下一个step处理。
// 每个step并发运行jobs。
// StartProcess等待所有goroutine运行结束才返回,或者ctx被cancel时也将及时结束开启的goroutine后返回。
// StartProcess因被ctx cancel而结束时函数返回nil。若steps中含有nil StartProcess将会panic。
func StartProcess[T any](ctx context.Context, jobs []T, steps ...func(context.Context, T) error) error

// Listen 监听chans,一旦有一个chan激活便立即将T发送给ch,并close ch。
// 若所有chan都未曾激活(chan是nil也认为未激活)且都close了,或者ctx被cancel了,则ch被close。
// 若同时chan被激活和ctx被cancel,则随机返回一个激活发送给chan的值。
func Listen[T any](ctx context.Context, chans ...<-chan T) (ch <-chan T)

// WriteAtReader 获取一个WriterAt和Reader对象,其中WriterAt用于并发写入数据,而与此同时Reader对象同时读取出已经写入好的数据。
// WriterAt写入完毕后调用Close,则Reader会全部读取完后结束读取。
// WriterAt发生的error会传递给Reader返回。
// 该接口是特定为一个目的实现————服务器分片下载数据中转给客户端下载,提高中转数据效率。
func WriteAtReader() (WriteAtCloser, io.ReadCloser)

// NewMultiReader 依次从reader读出数据并写入writer中,并close reader。
// 返回send用于添加reader,readSize表示需要从reader读出的字节数,order用于表示记录读取序数并传递给writer,若读取字节数对不上则返回error。
// 返回wait用于等待所有reader读完,若读取发生error,wait返回该error,并结束读取。
// 务必等所有reader都已添加给send后再调用wait。
// 该函数可用于需要非同一时间多个读取流和一个写入流的工作模型。
func NewMultiReader(ctx context.Context, writer func(order int, p []byte)) (
send func(readSize, order int, reader io.ReadCloser), wait func() error)

// MultiReadCloser 依次从rc中读出数据直到io.EOF则close rc。从r获取rc中读出的数据。
// add添加rc,返回error表明读取rc发生错误,可以安全的添加nil。调用endAdd表明不会再有rc添加,当所有数据读完了时,r将返回EOF。
// 如果ctx被cancel,将停止读取并返回error。
// 所有添加进去的io.ReadCloser都会被close。
func MultiReadCloser(ctx context.Context, rc ...io.ReadCloser) (r io.Reader, add func(rc io.ReadCloser) error, endAdd func())

联系电邮:ivfzhou@126.com

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CloseIO added in v1.4.0

func CloseIO(c io.Closer)

CloseIO 调用c.Close()。

func Contains

func Contains[E comparable](arr []E, elem E) bool

func Convert

func Convert[E, T any](sli []E, fn func(E) T) []T

func ConvertToMap

func ConvertToMap[K comparable, V, E any](sli []E, fn func(E) (K, V)) map[K]V

func ConvertToSlice

func ConvertToSlice[K comparable, V, T any](m map[K]V, fn func(K, V) T) []T

func CopyFile added in v1.2.0

func CopyFile(src, dest string) error

CopyFile 复制文件。

func Distinct

func Distinct[E comparable](sli []E) []E

func DropZero

func DropZero[E any](sli []E) []E

func Filter

func Filter[E any](sli []E, fn func(E) bool) []E

func FilterMap

func FilterMap[K comparable, V any](m map[K]V, fn func(K, V) bool) map[K]V

func Foreach

func Foreach[E any](sli []E, fn func(E))

func ForeachCanBreak

func ForeachCanBreak[E any](sli []E, fn func(E) bool)

func ForeachWithReturn

func ForeachWithReturn[E, T any](sli []E, fn func(E) (T, bool)) T

func GCD

func GCD(x, y int) int

GCD x与y的最大公约数

func IPv4ToNum

func IPv4ToNum(ip string) uint32

IPv4ToNum ipv4字符串转数字。 若是非ip地址则返回0。

func IPv4ToStr

func IPv4ToStr(ip uint32) string

IPv4ToStr ipv4数字转字符串。

func IsIPv4 added in v1.2.0

func IsIPv4(s string) bool

IsIPv4 判断是否是ipv4。

func IsIPv6 added in v1.2.0

func IsIPv6(s string) bool

IsIPv6 判断是否是ipv6。

func IsIntranet added in v1.2.0

func IsIntranet(ipv4 string) bool

IsIntranet 判断是否是内网IP。

func IsMAC added in v1.2.0

func IsMAC(s string) bool

IsMAC 判断是否是mac地址。

func Join

func Join[T fmt.Stringer](arr []T, sep string) string

Join 拼接元素返回字符串。

func LimitRun added in v1.4.0

func LimitRun(max int) (run func(f func()), adjust func(max int))

LimitRun 限制同一时间最大(max)运行次数。run会等待f运行完毕。 adjust调整最大运行数量。

func Listen

func Listen[T any](ctx context.Context, chans ...<-chan T) (ch <-chan T)

Listen 监听chans,一旦有一个chan激活便立即将T发送给ch,并close ch。 若所有chan都未曾激活(chan是nil也认为未激活)且都close了,或者ctx被cancel了,则ch被close。 若同时chan被激活和ctx被cancel,则随机返回一个激活发送给chan的值。

func Max

func Max[T Number](x, y T) T

Max 求最大值。

func Min

func Min[T Number](x, y T) T

Min 求最小值。

func MultiReadCloser added in v1.4.0

func MultiReadCloser(ctx context.Context, rc ...io.ReadCloser) (r io.Reader, add func(rc io.ReadCloser) error, endAdd func())

MultiReadCloser 依次从rc中读出数据直到io.EOF则close rc。从r获取rc中读出的数据。 add添加rc,返回error表明读取rc发生错误,可以安全的添加nil。调用endAdd表明不会再有rc添加,当所有数据读完了时,r将返回EOF。 如果ctx被cancel,将停止读取并返回error。 所有添加进去的io.ReadCloser都会被close。

func NewMultiReader added in v1.3.0

func NewMultiReader(ctx context.Context, writer func(order int, p []byte)) (
	send func(readSize, order int, reader io.ReadCloser), wait func() error)

NewMultiReader 依次从reader读出数据并写入writer中,并close reader。 返回send用于添加reader,readSize表示需要从reader读出的字节数,order用于表示记录读取序数并传递给writer,若读取字节数对不上则返回error。 返回wait用于等待所有reader读完,若读取发生error,wait返回该error,并结束读取。 务必等所有reader都已添加给send后再调用wait。 该函数可用于需要非同一时间多个读取流和一个写入流的工作模型。

func PickMapKey

func PickMapKey[K comparable, V any](m map[K]V) []K

func PickMapValue

func PickMapValue[K comparable, V any](m map[K]V) []V

func RandomChars added in v1.3.0

func RandomChars(length int) string

RandomChars 生成随机字符串(数字加字母组合)。

func RandomCharsCaseInsensitive added in v1.3.0

func RandomCharsCaseInsensitive(length int) string

RandomCharsCaseInsensitive 生成随机字符串(数字加字母组合)。

func RecoverAndPrintStack added in v1.4.0

func RecoverAndPrintStack()

RecoverAndPrintStack 恢复panic并打印堆栈。

func Run

func Run[T any](ctx context.Context, proc func(context.Context, T) error, jobs ...T) error

Run 并发将jobs传递给proc函数运行,一旦发生error便立即返回该error,并结束其它协程。 当ctx被cancel时也将立即返回,此时返回cancel时的error。 当proc运行发生panic将立即返回该panic字符串化的error。 proc为nil时函数将panic。

func RunCommandAndPrompt added in v1.2.0

func RunCommandAndPrompt(cmd string, prompts ...string) (stdout, stderr []byte, err error)

RunCommandAndPrompt 运行命令并响应输入prompts。

func RunConcurrently

func RunConcurrently(fn ...func() error) (wait func() error)

RunConcurrently 并发运行fn。

Example
package main

import (
	"gitee.com/ivfzhou/gotools"
)

func main() {
	var order any
	queryDB1 := func() error {
		// op order
		order = nil
		return nil
	}

	var stock any
	queryDB2 := func() error {
		// op stock
		stock = nil
		return nil
	}
	err := gotools.RunConcurrently(queryDB1, queryDB2)()
	// check err
	if err != nil {
		return
	}

	// do your want
	_ = order
	_ = stock
}
Output:

func RunParallel

func RunParallel[T any](max int, fn func(T) error) (add func(T) error, wait func() error)

RunParallel 该函数提供同时运行max个协程fn,一旦fn有err返回则停止接下来的fn运行。 朝返回的add函数中添加任务,若正在运行的任务数已达到max则会阻塞当前程序。 add函数返回err为任务fn返回的第一个err。与wait函数返回的err为同一个。 注意请在add完所有任务后调用wait。

Example
package main

import (
	"gitee.com/ivfzhou/gotools"
)

func main() {
	type product struct {
		// some stuff
	}
	op := func(data *product) error {
		// do something
		return nil
	}
	add, wait := gotools.RunParallel[*product](12, op)

	// many products
	var projects []*product
	for _, v := range projects {
		// blocked since number of ops running simultaneously reaches 12
		if err := add(v); err != nil {
			// means having a op return err
		}
	}

	// wait all op done and check err
	if err := wait(); err != nil {
		// op occur err
	}
}
Output:

func RunParallelNoBlock

func RunParallelNoBlock[T any](max int, fn func(T) error) (add func(T) error, wait func() error)

RunParallelNoBlock 该函数提供同RunParallel一样,但是add函数不会阻塞。注意请在add完所有任务后调用wait。

Example
package main

import (
	"gitee.com/ivfzhou/gotools"
)

func main() {
	type product struct {
		// some stuff
	}
	op := func(data *product) error {
		// do something
		return nil
	}
	add, wait := gotools.RunParallelNoBlock[*product](12, op)

	// many products
	var projects []*product
	for _, v := range projects {
		// unblocked anywhere
		if err := add(v); err != nil {
			// means having a op return err
		}
	}

	// wait all op done and check err
	if err := wait(); err != nil {
		// op occur err
	}
}
Output:

func RunParallelNoLimit added in v1.5.0

func RunParallelNoLimit[T any](fn func(T) error) (add func(T) error, wait func() error)

RunParallelNoLimit 该函数提供同RunParallel一样,但是不限制协程数。注意请在add完所有任务后调用wait。

func RunPeriodically added in v1.2.0

func RunPeriodically(period time.Duration) (add func(fn func()))

RunPeriodically 依次运行fn,每个fn之间至少间隔period时间。add用于添加fn。

func RunSequently

func RunSequently(fn ...func() error) (wait func() error)

RunSequently 依次运行fn,当有err时停止后续fn运行。

Example
package main

import (
	"gitee.com/ivfzhou/gotools"
)

func main() {
	first := func() error { return nil }
	then := func() error { return nil }
	last := func() error { return nil }
	err := gotools.RunSequently(first, then, last)()
	if err != nil {
		// return err
	}
}
Output:

func RunShellCommand

func RunShellCommand(bin string, args ...string) (std []byte, estd []byte, err error)

RunShellCommand 交互式运行shell命令bin,args为每次输入的命令。 Deprecated: 使用RunCommand替代。

func StartProcess

func StartProcess[T any](ctx context.Context, jobs []T, steps ...func(context.Context, T) error) error

StartProcess 将每个jobs依次递给steps函数处理。一旦某个step发生error或者panic,StartProcess立即返回该error, 并及时结束其他StartProcess开启的goroutine,也不开启新的goroutine运行step。 一个job最多在一个step中运行一次,且一个job一定是依次序递给steps,前一个step处理完毕才会给下一个step处理。 每个step并发运行jobs。 StartProcess等待所有goroutine运行结束才返回,或者ctx被cancel时也将及时结束开启的goroutine后返回。 StartProcess因被ctx cancel而结束时函数返回nil。若steps中含有nil StartProcess将会panic。

Example
package main

import (
	"context"

	"gitee.com/ivfzhou/gotools"
)

func main() {
	type data struct{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := []*data{{}, {}}
	work1 := func(ctx context.Context, d *data) error { return nil }
	work2 := func(ctx context.Context, d *data) error { return nil }
	err := gotools.StartProcess(ctx, jobs, work1, work2) // 等待运行结束
	if err != nil {
		// return err
	}
}
Output:

func UUIDLike added in v1.3.0

func UUIDLike() string

UUIDLike 生成随机字符串(数字加字母组合)。

func UnzipFromBytes added in v1.2.0

func UnzipFromBytes(bs []byte, parentPath string) (filePaths []string, err error)

UnzipFromBytes 将压缩数据解压写入硬盘。

func ZipFilesToBytes added in v1.2.0

func ZipFilesToBytes(files ...string) ([]byte, error)

Types

type AxisMarker added in v1.5.0

type AxisMarker struct {
	// contains filtered or unexported fields
}

AxisMarker 坐标轴记录器。

func (*AxisMarker) GetMaxMarkLine added in v1.5.0

func (m *AxisMarker) GetMaxMarkLine(begin int64) int64

GetMaxMarkLine 获取从某点开始连续被标记的长度。

func (*AxisMarker) Mark added in v1.5.0

func (m *AxisMarker) Mark(offset, length int64)

Mark 记录某段为标记态。

func (*AxisMarker) String added in v1.5.0

func (m *AxisMarker) String() string

type Command added in v1.1.0

type Command struct {
	// contains filtered or unexported fields
}

func RunCommand added in v1.1.0

func RunCommand(cmd string) *Command

RunCommand 交互式运行cmd命令。

func (*Command) IsExit added in v1.1.0

func (c *Command) IsExit() bool

IsExit 是否已运行完毕。

func (*Command) Out added in v1.1.0

func (c *Command) Out() (stdout, stderr []byte, err error)

Out 等待运行结束,返回运行结果

func (*Command) Read added in v1.1.0

func (c *Command) Read() []byte

Read 读取标准输出。

func (*Command) Write added in v1.1.0

func (c *Command) Write(input string) error

Write 响应输入。

type FairLocker

type FairLocker struct {
	// contains filtered or unexported fields
}

FairLocker 公平读写锁 先到达的请求先获取处理器,无论是读请求还是写请求。

func (*FairLocker) RLock

func (locker *FairLocker) RLock()

func (*FairLocker) RUnlock

func (locker *FairLocker) RUnlock()

func (*FairLocker) WLock

func (locker *FairLocker) WLock()

func (*FairLocker) WUnlock

func (locker *FairLocker) WUnlock()

type Number

type Number interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~uintptr | ~float32 | ~float64
}

type ReadFirstLocker

type ReadFirstLocker struct {
	// contains filtered or unexported fields
}

ReadFirstLocker 读优先锁 读优先意味着,连续的读请求获取处理器的概率高于写请求。

func (*ReadFirstLocker) RLock

func (locker *ReadFirstLocker) RLock()

func (*ReadFirstLocker) RUnlock

func (locker *ReadFirstLocker) RUnlock()

func (*ReadFirstLocker) WLock

func (locker *ReadFirstLocker) WLock()

func (*ReadFirstLocker) WUnlock

func (locker *ReadFirstLocker) WUnlock()

type WriteAtCloser added in v1.1.0

type WriteAtCloser interface {
	io.WriterAt
	io.Closer
}

func WriteAtReader added in v1.1.0

func WriteAtReader() (WriteAtCloser, io.ReadCloser)

WriteAtReader 获取一个WriterAt和Reader对象,其中WriterAt用于并发写入数据,而与此同时Reader对象同时读取出已经写入好的数据。 WriterAt写入完毕后调用Close,则Reader会全部读取完后结束读取。 WriterAt发生的error会传递给Reader返回。 该接口是特定为一个目的实现————服务器分片下载数据中转给客户端下载,提高中转数据效率。

Example
package main

import (
	"gitee.com/ivfzhou/gotools"
)

func main() {
	writeAtCloser, readCloser := gotools.WriteAtReader()

	// 并发写入数据
	go func() {
		writeAtCloser.WriteAt(nil, 0)
	}()
	// 写完close
	writeAtCloser.Close()

	// 同时读出写入的数据
	go func() {
		readCloser.Read(nil)
	}()
	// 读完close
	readCloser.Close()
}
Output:

type WriteFirstLocker

type WriteFirstLocker struct {
	// contains filtered or unexported fields
}

WriteFirstLocker 写优先锁 读优先意味着,连续的写请求获取处理器的概率高于读请求。

func (*WriteFirstLocker) RLock

func (locker *WriteFirstLocker) RLock()

func (*WriteFirstLocker) RUnlock

func (locker *WriteFirstLocker) RUnlock()

func (*WriteFirstLocker) WLock

func (locker *WriteFirstLocker) WLock()

func (*WriteFirstLocker) WUnlock

func (locker *WriteFirstLocker) WUnlock()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL