package mr

v0.0.2 (Latest)

Warning: This package is not in the latest version of its module.

Published: Jun 9, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

README

mapreduce

Why MapReduce

In real business scenarios we often need to fetch attributes from several RPC services and assemble them into one complex object.

For example, querying product details involves:

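  1. Product service: query product attributes
  2. Inventory service: query stock attributes
  3. Pricing service: query price attributes
  4. Marketing service: query promotion attributes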

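With serial calls, response time grows linearly with the number of RPC calls, so the usual way to improve performance is to turn the serial calls into parallel ones.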

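In simple scenarios a WaitGroup is enough. But what if we need to validate, transform, and aggregate the data returned by the RPC calls? WaitGroup alone starts to struggle, and Go's standard library offers no such tool (Java provides CompletableFuture). So, following the ideas behind the MapReduce architecture, we implemented an in-process MapReduce concurrency utility for batch data processing.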

Design

Let's put ourselves in the author's position and list the business scenarios such a concurrency tool has to handle:

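  1. Querying product details: call several services concurrently to assemble the product attributes, and stop immediately if any call fails.
  2. Recommending coupons automatically on the product detail page: validate coupons concurrently, drop any coupon that fails validation, and return the remaining coupons.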

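Both cases amount to processing input data and outputting the cleaned result. A classic asynchronous pattern for this kind of processing is the producer-consumer pattern, so we can abstract the lifecycle of batch data processing into roughly three stages: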

  1. Data generation: generate
  2. Data processing: mapper
  3. Data aggregation: reducer

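Data generation is mandatory, while processing and aggregation are optional. Generation and processing can run concurrently; aggregation is essentially a pure in-memory operation, so a single goroutine is enough for it.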

Next, consider how data should flow between the stages. Since each stage is executed by different goroutines, channels are the natural choice for communication between them.
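The three stages and their channel wiring can be sketched with the standard library alone. This is a simplified model under stated assumptions, not this package's implementation: runPipeline is a hypothetical name, items are int rather than interface{}, and the mapper returns (value, ok) instead of writing to a Writer, with ok == false dropping the item.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline wires the three stages with channels: one generator
// goroutine, `workers` mapper goroutines, and a single reducer that runs
// in the calling goroutine.
func runPipeline(
	generate func(source chan<- int),
	mapper func(item int) (int, bool), // ok == false drops the item
	reducer func(pipe <-chan int) int,
	workers int,
) int {
	source := make(chan int)
	go func() {
		defer close(source) // tell the mappers there is no more input
		generate(source)
	}()

	collector := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range source {
				if v, ok := mapper(item); ok {
					collector <- v
				}
			}
		}()
	}
	// Close the collector once every mapper has finished, so the
	// reducer's range loop terminates.
	go func() {
		wg.Wait()
		close(collector)
	}()

	return reducer(collector)
}

func main() {
	sum := runPipeline(
		func(source chan<- int) {
			for i := 0; i < 10; i++ {
				source <- i
			}
		},
		func(item int) (int, bool) { return item * item, true },
		func(pipe <-chan int) int {
			total := 0
			for v := range pipe {
				total += v
			}
			return total
		},
		4,
	)
	fmt.Println("result:", sum) // result: 285
}
```

This sketch has no cancellation: if the reducer stopped reading early, the mappers would block forever, which is the problem the done channel and ctx address.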

How can the flow be terminated at any time?

Each goroutine only needs to listen on a global done channel as well as on the ctx provided by the caller.

Simple example

Computing a sum of squares in parallel (the example is deliberately simple; it only simulates concurrency):

package main

import (
	"fmt"
	"log"

	"github.com/LabKiko/kiko-gokit/mr"
)

func main() {
	val, err := mr.MapReduce(func(source chan<- interface{}) {
		// generator
		for i := 0; i < 10; i++ {
			source <- i
		}
	}, func(item interface{}, writer mr.Writer, cancel func(error)) {
		// mapper
		i := item.(int)
		writer.Write(i * i)
	}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
		// reducer
		var sum int
		for i := range pipe {
			sum += i.(int)
		}
		writer.Write(sum)
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("result:", val)
}

More examples: https://github.com/zeromicro/zero-examples/tree/main/mapreduce

Documentation


Constants

This section is empty.

Variables

var (
	// ErrCancelWithNil is an error that mapreduce was cancelled with nil.
	ErrCancelWithNil = errors.New("mapreduce cancelled with nil")
	// ErrReduceNoOutput is an error that reduce did not output a value.
	ErrReduceNoOutput = errors.New("reduce not writing value")
)

Functions

func Finish

func Finish(fns ...func() error) error

Finish runs fns in parallel; processing is cancelled if any of them returns an error.

func FinishVoid

func FinishVoid(fns ...func())

FinishVoid runs fns in parallel.

func ForEach

func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option)

ForEach maps all elements produced by the given generate func, without producing any output.

func MapReduce

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
	opts ...Option) (interface{}, error)

MapReduce maps all elements generated by the given generate func and reduces the output elements with the given reducer.

func MapReduceChan

func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
	opts ...Option) (interface{}, error)

MapReduceChan maps all elements from source and reduces the output elements with the given reducer.

func MapReduceVoid

func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error

MapReduceVoid maps all elements generated by the given generate func and reduces the output elements with the given reducer.

Types

type ForEachFunc

type ForEachFunc func(item interface{})

ForEachFunc is used to process an element without producing output.

type GenerateFunc

type GenerateFunc func(source chan<- interface{})

GenerateFunc is used to let callers send elements into source.

type MapFunc

type MapFunc func(item interface{}, writer Writer)

MapFunc is used to do element processing and write the output to writer.

type MapperFunc

type MapperFunc func(item interface{}, writer Writer, cancel func(error))

MapperFunc is used to process an element and write the output to writer; call cancel to cancel the processing.

type Option

type Option func(opts *mapReduceOptions)

Option defines the method to customize the mapreduce.

func WithContext

func WithContext(ctx context.Context) Option

WithContext customizes a mapreduce process to use the given ctx.

func WithWorkers

func WithWorkers(workers int) Option

WithWorkers customizes a mapreduce process to use the given number of workers.
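Option follows the functional-options pattern: each option is a function that mutates an options struct. The sketch below re-creates that shape so it runs on its own. The fields of mapReduceOptions (ctx, workers), the default of 16 workers, the minimum of 1, and the buildOptions helper are all assumptions; the real struct is unexported and its contents are not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// mapReduceOptions is a hypothetical stand-in for the unexported type.
type mapReduceOptions struct {
	ctx     context.Context
	workers int
}

// Option has the same shape as the documented Option type.
type Option func(opts *mapReduceOptions)

// WithContext sets the context on the options (hypothetical re-implementation).
func WithContext(ctx context.Context) Option {
	return func(opts *mapReduceOptions) { opts.ctx = ctx }
}

// WithWorkers sets the worker count, clamped to at least 1 (assumed behaviour).
func WithWorkers(workers int) Option {
	return func(opts *mapReduceOptions) {
		if workers < 1 {
			workers = 1
		}
		opts.workers = workers
	}
}

// buildOptions applies defaults first, then each option in order,
// so a later option overrides an earlier one.
func buildOptions(opts ...Option) *mapReduceOptions {
	options := &mapReduceOptions{
		ctx:     context.Background(),
		workers: 16, // hypothetical default
	}
	for _, opt := range opts {
		opt(options)
	}
	return options
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	o := buildOptions(WithContext(ctx), WithWorkers(4))
	fmt.Println(o.workers, o.ctx == ctx) // 4 true
}
```

The pattern keeps the MapReduce signature stable: new knobs can be added as new Option constructors without breaking existing callers.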

type ReducerFunc

type ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))

ReducerFunc is used to reduce all the mapping output and write the result to writer; call cancel to cancel the processing.

type VoidReducerFunc

type VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))

VoidReducerFunc is used to reduce all the mapping output without producing output; call cancel to cancel the processing.

type Writer

type Writer interface {
	Write(v interface{})
}

Writer is the interface that wraps the Write method.
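Because Writer has a single method, any type with Write(v interface{}) satisfies it, which makes it easy to call a mapper directly, for example in a unit test. The sketch below redeclares Writer locally so it runs on its own, and uses a hypothetical sliceWriter plus a hypothetical validateCoupon mapper for the coupon scenario: valid coupons are written, expired ones are dropped by not writing, and an unexpected item type triggers cancel. If the writer parameter were typed mr.Writer, validateCoupon would match the documented MapperFunc signature.

```go
package main

import (
	"errors"
	"fmt"
)

// Writer mirrors the documented Writer interface.
type Writer interface {
	Write(v interface{})
}

// sliceWriter is a hypothetical Writer that appends every value to a slice.
type sliceWriter struct {
	items []interface{}
}

func (w *sliceWriter) Write(v interface{}) { w.items = append(w.items, v) }

// coupon is a hypothetical item type for the coupon scenario.
type coupon struct {
	ID      string
	Expired bool
}

// errBadInput is a hypothetical error for inputs that should abort the run.
var errBadInput = errors.New("unexpected item type")

// validateCoupon writes valid coupons, silently drops expired ones,
// and cancels processing when the item is not a coupon.
func validateCoupon(item interface{}, writer Writer, cancel func(error)) {
	c, ok := item.(coupon)
	if !ok {
		cancel(errBadInput)
		return
	}
	if c.Expired {
		return // not writing removes the item from the output
	}
	writer.Write(c)
}

func main() {
	w := &sliceWriter{}
	var cancelled error
	cancel := func(err error) { cancelled = err }

	for _, item := range []interface{}{
		coupon{ID: "A"}, coupon{ID: "B", Expired: true}, coupon{ID: "C"},
	} {
		validateCoupon(item, w, cancel)
	}
	fmt.Println(len(w.items), cancelled) // 2 <nil>
}
```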
