mr

package module
v0.0.0-...-d23c3ed
Published: Oct 29, 2023 | License: MIT | Imports: 10 | Imported by: 0

README

mr

A MapReduce system made of two parts: a worker process that calls the application's Map and Reduce functions and handles reading and writing files, and a coordinator process that distributes tasks among workers and copes with failed workers.

Documentation

Constants

const (
	SNOOZE = iota
	ABORT
	REDUCE
	MAP
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ByKey

type ByKey []KeyValue

func (ByKey) Len

func (a ByKey) Len() int

Len, together with Less and Swap, implements sort.Interface so that a ByKey can be sorted by key.

func (ByKey) Less

func (a ByKey) Less(i, j int) bool

func (ByKey) Swap

func (a ByKey) Swap(i, j int)
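The three methods above are what sort.Sort needs. The following is a minimal sketch of how a ByKey might be used; KeyValue and ByKey are redeclared locally so the example compiles on its own, the Less body (a plain string comparison on Key) is an assumption, and sortedKeys is a hypothetical helper that is not part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// KeyValue and ByKey are local stand-ins that mirror the documented declarations.
type KeyValue struct {
	Key   string
	Value string
}

type ByKey []KeyValue

func (a ByKey) Len() int      { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less compares keys lexicographically (assumed; the real body is not shown in the docs).
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

// sortedKeys is a hypothetical helper: it sorts a copy of kvs by key
// and returns the keys in their sorted order.
func sortedKeys(kvs []KeyValue) []string {
	cp := append([]KeyValue(nil), kvs...)
	sort.Sort(ByKey(cp))
	keys := make([]string, len(cp))
	for i, kv := range cp {
		keys[i] = kv.Key
	}
	return keys
}

func main() {
	in := []KeyValue{{"pear", "1"}, {"apple", "1"}, {"fig", "1"}}
	fmt.Println(sortedKeys(in)) // prints [apple fig pear]
}
```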

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func MakeCoordinator

func MakeCoordinator(
	files []string,
	nReduce int,
	opts ...Options,
) *Coordinator

MakeCoordinator creates a new Coordinator instance.

It takes a slice of input file names, the number of reduce tasks, and optional Options for configuring the Coordinator. It returns a pointer to the newly created Coordinator.

func (*Coordinator) Done

func (c *Coordinator) Done() bool

Done reports whether the Coordinator is done, as recorded in its isDone atomic boolean.

func (*Coordinator) DoneTask

func (c *Coordinator) DoneTask(taskID int) error

DoneTask marks a task as done in the Coordinator.

taskID is the ID of the task to mark as done. DoneTask returns an error if the task is not found.

func (*Coordinator) GetTask

func (c *Coordinator) GetTask() *Task

GetTask returns a task from the Coordinator's queue.

If the coordinator is done, GetTask returns an ABORT task. Otherwise it locks the coordinator's mutex and inspects the queue. If the queue is empty, it returns a SNOOZE task. If the next task in the queue is a REDUCE task and map tasks are still running, it also returns a SNOOZE task. In every other case it removes the task from the queue, adds it to the running tasks, and returns it.
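The scheduling rules for GetTask and DoneTask can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: sketchCoordinator and its fields (queue, running) are hypothetical, since the real Coordinator's fields are unexported, and atomic.Bool requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	SNOOZE = iota
	ABORT
	REDUCE
	MAP
)

type Task struct {
	ID         int
	Assignment byte
	File       string
	NReduce    int
}

// sketchCoordinator is a hypothetical stand-in for Coordinator.
type sketchCoordinator struct {
	isDone  atomic.Bool
	mu      sync.Mutex
	queue   []*Task       // pending tasks, in order
	running map[int]*Task // tasks handed out but not yet marked done
}

// mapsRunning reports whether any handed-out task is a map task.
func (c *sketchCoordinator) mapsRunning() bool {
	for _, t := range c.running {
		if t.Assignment == MAP {
			return true
		}
	}
	return false
}

// GetTask follows the documented rules: ABORT when done, SNOOZE when the
// queue is empty or a reduce task must wait for maps, else the next task.
func (c *sketchCoordinator) GetTask() *Task {
	if c.isDone.Load() {
		return &Task{Assignment: ABORT}
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.queue) == 0 {
		return &Task{Assignment: SNOOZE}
	}
	next := c.queue[0]
	if next.Assignment == REDUCE && c.mapsRunning() {
		return &Task{Assignment: SNOOZE}
	}
	c.queue = c.queue[1:]
	c.running[next.ID] = next
	return next
}

// DoneTask removes a task from the running set, or fails if it is unknown.
func (c *sketchCoordinator) DoneTask(taskID int) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.running[taskID]; !ok {
		return fmt.Errorf("task %d not found", taskID)
	}
	delete(c.running, taskID)
	return nil
}

func main() {
	c := &sketchCoordinator{
		queue: []*Task{
			{ID: 0, Assignment: MAP, File: "a.txt", NReduce: 1},
			{ID: 1, Assignment: REDUCE, NReduce: 1},
		},
		running: map[int]*Task{},
	}
	first := c.GetTask()  // the map task
	second := c.GetTask() // SNOOZE: the reduce task waits for the running map
	_ = c.DoneTask(first.ID)
	third := c.GetTask() // the reduce task
	fmt.Println(first.Assignment, second.Assignment, third.Assignment) // prints 3 0 2
}
```

The sketch does not set isDone or requeue tasks from failed workers; how the real Coordinator does either is not described in this documentation.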

type Coordinatorer

type Coordinatorer interface {
	GetTask() *Task
	DoneTask(taskID int) error
	Done() bool
}

type Delivery

type Delivery interface {
	GetTask() (*Task, error)
	DoneTask(taskID int) error
}

type KeyValue

type KeyValue struct {
	Key   string
	Value string
}

Map functions return a slice of KeyValue.

type MapReduce

type MapReduce struct {
	// contains filtered or unexported fields
}

func NewMapReduce

func NewMapReduce(
	mapf func(string, string) []KeyValue,
	reducef func(string, []string) string,
) *MapReduce

NewMapReduce creates a new MapReduce object.

It takes two functions as parameters: mapf and reducef. mapf takes two strings (a key and a value) and returns a slice of KeyValue. reducef takes a key and the slice of values collected for that key and returns a string.

The function returns a pointer to a MapReduce object.

func (*MapReduce) MapAndShuffle

func (w *MapReduce) MapAndShuffle(filename string, nReduce int) [][]KeyValue

MapAndShuffle maps the content of a file to key-value pairs and shuffles the data.

filename is the name of the file to be mapped and nReduce is the number of reduce tasks. MapAndShuffle returns a slice of slices containing the shuffled key-value pairs.
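The documentation does not say how pairs are assigned to inner slices. A common approach, shown here purely as an assumption, is to hash each key and take the result modulo nReduce so that equal keys always land in the same bucket. bucketFor, shuffle, and wordCountMap are hypothetical names, and FNV-1a is an illustrative choice of hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// KeyValue is a local stand-in for the documented type.
type KeyValue struct{ Key, Value string }

// bucketFor picks a reduce bucket for a key.
// Hashing with FNV-1a is an assumption, not the package's documented behavior.
func bucketFor(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % nReduce
}

// shuffle distributes pairs across nReduce buckets by key.
func shuffle(kvs []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kvs {
		b := bucketFor(kv.Key, nReduce)
		buckets[b] = append(buckets[b], kv)
	}
	return buckets
}

// wordCountMap is a hypothetical mapf: it emits one ("word", "1") pair per word.
func wordCountMap(_ string, contents string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(contents) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

func main() {
	buckets := shuffle(wordCountMap("in.txt", "a b a c"), 3)
	for i, b := range buckets {
		fmt.Println(i, b)
	}
}
```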

func (*MapReduce) ReadMap

func (w *MapReduce) ReadMap(src string) ([]KeyValue, error)

ReadMap reads the contents of a file and returns a slice of KeyValue structs.

src is the path to the file to be read. ReadMap returns a slice of KeyValue structs representing the contents of the file, or an error if the file cannot be opened or read.

func (*MapReduce) Reduce

func (w *MapReduce) Reduce(kv []KeyValue) []KeyValue

Reduce groups a slice of KeyValue pairs by key and applies reducef to each group.

It sorts the input slice of KeyValue pairs by key and then iterates through the sorted slice, grouping together consecutive KeyValue pairs with the same key. For each group, it calls the reducef function to produce a single output KeyValue pair. The output slice contains all the output KeyValue pairs produced by the reducef function.

kv is the slice of KeyValue pairs to be reduced. Reduce returns the slice of KeyValue pairs produced by the reducef calls.
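The sort-then-group procedure described for Reduce can be sketched as below. reduceSorted and countValues are hypothetical names, and the sketch sorts a copy with sort.Slice for brevity, whereas the package presumably sorts with ByKey.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// KeyValue is a local stand-in for the documented type.
type KeyValue struct{ Key, Value string }

// reduceSorted sorts a copy of kv by key, walks runs of equal keys,
// and calls reducef once per run.
func reduceSorted(kv []KeyValue, reducef func(string, []string) string) []KeyValue {
	sorted := append([]KeyValue(nil), kv...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Key < sorted[j].Key })

	var out []KeyValue
	for i := 0; i < len(sorted); {
		// Collect the values of every consecutive pair that shares sorted[i].Key.
		j := i
		var values []string
		for j < len(sorted) && sorted[j].Key == sorted[i].Key {
			values = append(values, sorted[j].Value)
			j++
		}
		out = append(out, KeyValue{Key: sorted[i].Key, Value: reducef(sorted[i].Key, values)})
		i = j
	}
	return out
}

// countValues is a hypothetical reducef that returns how many values a key has.
func countValues(_ string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	in := []KeyValue{{"b", "1"}, {"a", "1"}, {"b", "1"}}
	fmt.Println(reduceSorted(in, countValues)) // prints [{a 1} {b 2}]
}
```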

func (*MapReduce) StoreMap

func (w *MapReduce) StoreMap(kvs [][]KeyValue)

StoreMap stores the given key-value pairs in temporary files, one per inner slice.

The function takes a 2D slice of KeyValue structs as input, where each inner slice represents a set of key-value pairs. For each inner slice, it writes the pairs to a temporary file whose name is generated from the index of that slice. Each pair is encoded with a json.Encoder. If opening the file or encoding a pair fails, a warning message is logged.

func (*MapReduce) StoreReduce

func (w *MapReduce) StoreReduce(
	nReduce int,
	src string,
	kv []KeyValue,
) error

StoreReduce stores the reduced key-value pairs in the output file.

nReduce is the number of reduce tasks, src is the source string, and kv holds the key-value pairs to be stored. StoreReduce returns an error if any occurred during the process.

type MapReducer

type MapReducer interface {
	MapAndShuffle(filename string, nReduce int) [][]KeyValue
	StoreMap(kvs [][]KeyValue)
	ReadMap(src string) ([]KeyValue, error)
	Reduce(kv []KeyValue) []KeyValue
	StoreReduce(nReduce int, src string, kv []KeyValue) error
}

type Options

type Options interface {
	// contains filtered or unexported methods
}

func WithTimeout

func WithTimeout(timeout time.Duration) Options

WithTimeout returns an Options object that sets the timeout duration for some operation.

timeout: the duration for the timeout. Returns: an Options object that can be used to configure some operation.

type Task

type Task struct {
	ID         int
	Assignment byte
	File       string
	NReduce    int
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func New

func New(
	mapReduce MapReducer,
	delivery Delivery,
	opts ...Options,
) *Worker

New creates a new Worker instance.

It takes a MapReducer, a Delivery, and optional Options as parameters. It returns a pointer to a Worker.

func (*Worker) Run

func (m *Worker) Run()

Run runs the worker.

It retrieves tasks from the delivery and performs the action that matches each task's assignment.

Directories

Path Synopsis
delivery
rpc
example
