gomsgprocessor

Version: v0.1.21
Published: Mar 6, 2024 License: BSD-3-Clause Imports: 5 Imported by: 0

README

GoMsgProcessor

A Golang library for processing messages in parallel into structured documents

1. Description

GoMsgProcessor is a generic library that reads messages recursively and in parallel, requiring only a builder to transform them into final documents. Multiple builders can be registered, each associated with a message type, which makes it possible to work with messages from different sources. Namespaces make it possible to work with different targets as well. In addition, a deduplication function can be injected to clean up the slice of documents after processing.
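
To make the idea concrete, the sketch below shows one way such a pipeline could be arranged: each message is routed to a builder by its type, the builders run concurrently, and the resulting documents are grouped by namespace. It is an illustration only, not the library's implementation. The names `message`, `builder`, and `buildAll` are hypothetical, and documents are simplified to strings.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical, simplified stand-in for a library Message.
type message struct {
	Namespace string
	Type      string
	Payload   string
}

// builder turns one message into zero or more documents (strings here).
type builder func(message) ([]string, error)

// buildAll routes every message to the builder registered for its type,
// runs the builders concurrently, and groups the documents by namespace.
func buildAll(messages []message, builders map[string]builder) (map[string][]string, error) {
	// Fail early, before starting any goroutine, if a type has no builder.
	for _, msg := range messages {
		if _, ok := builders[msg.Type]; !ok {
			return nil, fmt.Errorf("message type %q has no builder", msg.Type)
		}
	}

	results := make([][]string, len(messages))
	errs := make([]error, len(messages))

	var wg sync.WaitGroup
	for i, msg := range messages {
		wg.Add(1)
		go func(i int, msg message) {
			defer wg.Done()
			// Each goroutine writes only to its own slot, so no lock is needed.
			results[i], errs[i] = builders[msg.Type](msg)
		}(i, msg)
	}
	wg.Wait()

	byNamespace := make(map[string][]string)
	for i, msg := range messages {
		if errs[i] != nil {
			return nil, errs[i]
		}
		if len(results[i]) == 0 {
			continue // the builder produced nothing for this message
		}
		byNamespace[msg.Namespace] = append(byNamespace[msg.Namespace], results[i]...)
	}
	return byNamespace, nil
}
```

Writing results into per-message slots keeps the output order tied to the input order, whichever goroutine finishes first.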

2. Technology Stack

Stack          Version
Golang         v1.18
golangci-lint  v1.46.2

3. Getting Started

  • Prerequisites
    • Go installed, preferably version 1.18 or later.
  • Install
    go get -u github.com/arquivei/gomsgprocessor
    
  • Configuration Setup
    go mod tidy
    go mod vendor
    
  • Usage
    • Import the package

      import (
          "github.com/arquivei/gomsgprocessor"
      )
      
    • Define an incoming message struct

      type ExampleMessage struct {
          ID            int      `json:"id"`
          Name          string   `json:"name"`
          Age           int      `json:"age"`
          City          string   `json:"city"`
          State         string   `json:"state"`
          ChildrenNames []string `json:"childrenNames"`
          Namespace     string   `json:"namespace"`
      }
      
    • Implement the Message interface, which is the input of ParallelProcessor's MakeDocuments.

      func (e *ExampleMessage) GetNamespace() gomsgprocessor.Namespace {
          // Namespace is a logical separator that will be used to group messages while processing them.
          return gomsgprocessor.Namespace(e.Namespace)
      }
      
      func (e *ExampleMessage) GetType() gomsgprocessor.MessageType {
          // MessageType is used to decide which DocumentBuilder to use for each Message.
          return gomsgprocessor.MessageType("typeExample")
      }
      
      func (e *ExampleMessage) UpdateLogWithData(ctx context.Context) {
          // Required by the Message interface; enriching the logger here is optional.
          log.Ctx(ctx).UpdateContext(func(zc zerolog.Context) zerolog.Context {
              return zc.
                  Int("msg_id", e.ID).
                  Str("msg_name", e.Name).
                  Strs("msg_children_names", e.ChildrenNames).
                  Int("msg_age", e.Age).
                  Str("msg_city", e.City).
                  Str("msg_state", e.State).
                  Str("msg_type", string(e.GetType())).
                  Str("msg_namespace", e.Namespace)
          })
      }
      
    • Define an outgoing document struct, which is the result of a DocumentBuilder's Build.

      type ExampleDocument struct {
          ID              string
          CreatedAt       time.Time
          ParentName      string
          ParentBirthYear int
          ChildName       string
          CityAndState    string
          Namespace       string
      }
      
    • Implement the DocumentBuilder interface, which transforms a Message into a slice of Documents.

      type ExampleBuilder struct{}
      
      // Build transforms a Message into []Document.
      func (b *ExampleBuilder) Build(_ context.Context, msg gomsgprocessor.Message) ([]gomsgprocessor.Document, error) {
          exampleMsg, ok := msg.(*ExampleMessage)
          if !ok {
              return nil, errors.New("failed to cast message")
          }
      
          // Parallel Processor will ignore this message
          if len(exampleMsg.ChildrenNames) == 0 {
              return nil, nil
          }
      
          documents := make([]gomsgprocessor.Document, 0, len(exampleMsg.ChildrenNames))
      
          for _, childName := range exampleMsg.ChildrenNames {
              documents = append(documents, ExampleDocument{
                  ID:              strconv.Itoa(exampleMsg.ID) + "_" + childName,
                  CreatedAt:       time.Now(),
                  ParentName:      exampleMsg.Name,
                  CityAndState:    exampleMsg.City + " - " + exampleMsg.State,
                  ChildName:       childName,
                  ParentBirthYear: time.Now().Year() - exampleMsg.Age,
                  Namespace:       exampleMsg.Namespace,
              })
          }
      
          return documents, nil
      }
      
    • Optionally, define a function to deduplicate the slice of documents.

      func ExampleDeduplicateDocuments(documents []gomsgprocessor.Document) ([]gomsgprocessor.Document, error) {
          examplesDocuments := make([]ExampleDocument, 0, len(documents))
          for _, document := range documents {
              exampleDocument, ok := document.(ExampleDocument)
              if !ok {
                  return nil, errors.New("failed to cast document")
              }
              examplesDocuments = append(examplesDocuments, exampleDocument)
          }
      
          documentsByID := make(map[string]ExampleDocument, len(examplesDocuments))
          for _, exampleDocument := range examplesDocuments {
              documentsByID[exampleDocument.ID] = exampleDocument
          }
      
          deduplicatedDocuments := make([]gomsgprocessor.Document, 0, len(documentsByID))
          for _, documentByID := range documentsByID {
              deduplicatedDocuments = append(deduplicatedDocuments, documentByID)
          }
          return deduplicatedDocuments, nil
      }
      
    • Finally, put it all together:

      func main() {
      
      	// NewParallelProcessor returns a new ParallelProcessor with a map of
      	// DocumentBuilder for each MessageType.
      	//
      	// A list of Option is also available for this method. See option.go for more
      	// information.
      	parallelProcessor := gomsgprocessor.NewParallelProcessor(
      		map[gomsgprocessor.MessageType]gomsgprocessor.DocumentBuilder{
      			"typeExample": &ExampleBuilder{},
      		},
      		gomsgprocessor.WithDeduplicateDocumentsOption(ExampleDeduplicateDocuments),
      	)
      
      	messages := []gomsgprocessor.Message{
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            2,
      			Name:          "Poul",
      			Age:           25,
      			City:          "New Jersey",
      			State:         "NY",
      			ChildrenNames: []string{},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace2",
      		},
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      	}
      
      	// MakeDocuments creates in parallel a slice of Document for given []Message
      	// using the map of DocumentBuilder (see NewParallelProcessor).
      	//
      	// This method returns a []Document and a (foundationkit/errors).Error.
      	// If not nil, this error carries a (foundationkit/errors).Code, which is
      	// either ErrCodeBuildDocuments or ErrCodeDeduplicateDocuments.
      	documents, err := parallelProcessor.MakeDocuments(context.Background(), messages)
      	if err != nil {
      		panic(err)
      	}
      
      	examplesDocuments := make([]ExampleDocument, 0, len(documents))
      	for _, document := range documents {
      		exampleDocument, ok := document.(ExampleDocument)
      		if !ok {
      			panic("failed to cast document")
      		}
      		examplesDocuments = append(examplesDocuments, exampleDocument)
      	}
      
      	out, err := JSONMarshal(examplesDocuments)
      	if err != nil {
      		panic(err)
      	}
      	fmt.Println(out)
      }
      
      // Simple json marshaler with indentation
      func JSONMarshal(t interface{}) (string, error) {
      	buffer := &bytes.Buffer{}
      	encoder := json.NewEncoder(buffer)
      	encoder.SetEscapeHTML(false)
      	encoder.SetIndent("", "  ")
      	err := encoder.Encode(t)
      	return buffer.String(), err
      }
      
  • Examples

4. Changelog

  • GoMsgProcessor 0.1.0 (May 20, 2022)

    • [New] Decoupling this package from Arquivei's API projects.
    • [New] Setting github's workflow with golangci-lint
    • [New] Example for usage.
    • [New] Documents: Code of Conduct, Contributing, License and Readme.

5. Collaborators

6. Contributing

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

7. Versioning

We use Semantic Versioning for versioning. For the versions available, see the tags on this repository.

8. License

This project is licensed under the BSD 3-Clause - see the LICENSE.md file for details.

9. Contact Information

Contacts can be made by email: rilder.almeida@arquivei.com.br

Documentation

Constants

This section is empty.

Variables

var ErrCodeBuildDocuments = errors.Code("FAILED_BUILD_DOCUMENTS")

ErrCodeBuildDocuments is returned when the operation failed to build the response for given []Message.

var ErrCodeDeduplicateDocuments = errors.Code("FAILED_DEDUPLICATE_DOCUMENTS")

ErrCodeDeduplicateDocuments is returned when the operation failed to deduplicate []Document by Namespace.

var ErrMsgTypeHasNoBuilder = errors.New("message type has no document builder")

ErrMsgTypeHasNoBuilder is returned when MessageType has no DocumentBuilder associated with it.
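
A caller that prefers to detect this condition before processing could check the builder map up front. The sketch below is self-contained and uses local stand-ins for `MessageType`, `Message`, and `DocumentBuilder`, trimmed to what the check needs. `MissingBuilderTypes` and `typedMessage` are hypothetical names, not part of the package.

```go
package main

import "sort"

// Local stand-ins, trimmed to what the check needs.
type MessageType string

type Message interface {
	GetType() MessageType
}

type DocumentBuilder interface{}

// typedMessage is a tiny Message used only to exercise the helper.
type typedMessage MessageType

func (m typedMessage) GetType() MessageType { return MessageType(m) }

// MissingBuilderTypes (hypothetical helper) returns, in sorted order, every
// message type present in messages that has no entry in builders.
func MissingBuilderTypes(messages []Message, builders map[MessageType]DocumentBuilder) []MessageType {
	missing := make(map[MessageType]struct{})
	for _, msg := range messages {
		if _, ok := builders[msg.GetType()]; !ok {
			missing[msg.GetType()] = struct{}{}
		}
	}

	types := make([]MessageType, 0, len(missing))
	for t := range missing {
		types = append(types, t)
	}
	// Sort so the result does not depend on map iteration order.
	sort.Slice(types, func(i, j int) bool { return types[i] < types[j] })
	return types
}
```

An empty result means every message type in the batch has a builder registered.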

Functions

This section is empty.

Types

type DeduplicateDocumentsFunc

type DeduplicateDocumentsFunc func([]Document) ([]Document, error)

DeduplicateDocumentsFunc is used to deduplicate a slice of Document.
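
The README example deduplicates through a map keyed by document ID, so the last duplicate wins and the output order follows Go's randomized map iteration. Where a stable order matters, a first-occurrence-wins variant is one alternative. The sketch below is self-contained: `Document` and `ExampleDocument` are local stand-ins, and `DeduplicateKeepingOrder` is a hypothetical name, not part of the package.

```go
package main

import "errors"

// Document is a local stand-in for gomsgprocessor.Document.
type Document interface{}

// ExampleDocument is trimmed to the one field this sketch needs.
type ExampleDocument struct {
	ID string
}

// DeduplicateKeepingOrder (hypothetical name) keeps the first document seen
// for each ID and preserves the input order.
func DeduplicateKeepingOrder(documents []Document) ([]Document, error) {
	seen := make(map[string]struct{}, len(documents))
	deduplicated := make([]Document, 0, len(documents))

	for _, document := range documents {
		exampleDocument, ok := document.(ExampleDocument)
		if !ok {
			return nil, errors.New("failed to cast document")
		}
		if _, duplicate := seen[exampleDocument.ID]; duplicate {
			continue
		}
		seen[exampleDocument.ID] = struct{}{}
		deduplicated = append(deduplicated, exampleDocument)
	}

	return deduplicated, nil
}
```

Because its signature matches `func([]Document) ([]Document, error)`, a function of this shape could be passed wherever a DeduplicateDocumentsFunc is expected.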

type Document

type Document interface{}

Document is the result of a DocumentBuilder's Build.

type DocumentBuilder

type DocumentBuilder interface {
	// Build transforms a Message into []Document.
	Build(context.Context, Message) ([]Document, error)
}

DocumentBuilder is an interface that transforms a Message into []Document.
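
Because the interface has a single method, a small function adapter (in the spirit of `http.HandlerFunc`) can save declaring a struct for simple builders. The sketch below is an assumption-laden illustration: `BuilderFunc` and `EchoBuilder` are hypothetical and not part of the package, and `Message` is reduced to an empty interface because the adapter never calls its methods.

```go
package main

import (
	"context"
	"errors"
)

// Local stand-ins mirroring the declarations above.
type Document interface{}

// Message is trimmed to an empty interface for this sketch only.
type Message interface{}

type DocumentBuilder interface {
	Build(context.Context, Message) ([]Document, error)
}

// BuilderFunc (hypothetical) adapts an ordinary function to DocumentBuilder.
type BuilderFunc func(context.Context, Message) ([]Document, error)

// Build calls f(ctx, msg).
func (f BuilderFunc) Build(ctx context.Context, msg Message) ([]Document, error) {
	return f(ctx, msg)
}

// Compile-time check that BuilderFunc satisfies DocumentBuilder.
var _ DocumentBuilder = BuilderFunc(nil)

// EchoBuilder wraps a string message into one document and rejects the rest.
var EchoBuilder DocumentBuilder = BuilderFunc(func(_ context.Context, msg Message) ([]Document, error) {
	text, ok := msg.(string)
	if !ok {
		return nil, errors.New("failed to cast message")
	}
	return []Document{text}, nil
})
```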

type Message

type Message interface {
	GetNamespace() Namespace
	GetType() MessageType
	UpdateLogWithData(context.Context)
}

Message is the input of ParallelProcessor's MakeDocuments.
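
A minimal implementation might look like the sketch below. It mirrors the interface locally so that it compiles on its own; `PingMessage` is a hypothetical type, and the empty `UpdateLogWithData` assumes the caller has no log fields to attach.

```go
package main

import "context"

// Local stand-ins mirroring the declarations above.
type Namespace string

type MessageType string

type Message interface {
	GetNamespace() Namespace
	GetType() MessageType
	UpdateLogWithData(context.Context)
}

// PingMessage is a hypothetical message type used only for illustration.
type PingMessage struct {
	Tenant string
}

// GetNamespace groups messages by tenant.
func (p *PingMessage) GetNamespace() Namespace { return Namespace(p.Tenant) }

// GetType returns the key used to look up the builder for this message.
func (p *PingMessage) GetType() MessageType { return "ping" }

// UpdateLogWithData is intentionally a no-op: this sketch attaches no log fields.
func (p *PingMessage) UpdateLogWithData(context.Context) {}

// Compile-time check: the build fails if *PingMessage stops satisfying Message.
var _ Message = (*PingMessage)(nil)
```

The blank-identifier assertion costs nothing at run time and surfaces a missing method at compile time instead of at the first call site.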

type MessageType

type MessageType string

MessageType is used to decide which DocumentBuilder to use for each Message.

type Namespace

type Namespace string

Namespace is a logical separator that will be used to group messages while processing them.

type Option

type Option func(*parallelProcessor)

Option is used to configure the processor.

func WithDeduplicateDocumentsOption

func WithDeduplicateDocumentsOption(d DeduplicateDocumentsFunc) Option

WithDeduplicateDocumentsOption adds a DeduplicateDocumentsFunc in ParallelProcessor, used to deduplicate a slice of Document.
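
The `Option` type has the shape of Go's functional options pattern: each option is a function that mutates the processor while it is being constructed. The sketch below reproduces that pattern with hypothetical stand-ins (`processor`, `newProcessor`, `withDeduplicate`, `finish`). It illustrates the mechanism and is not the package's source.

```go
package main

// Document and dedupFunc stand in for Document and DeduplicateDocumentsFunc.
type Document interface{}

type dedupFunc func([]Document) ([]Document, error)

// processor is a hypothetical stand-in for the unexported parallelProcessor.
type processor struct {
	deduplicate dedupFunc
}

// option mirrors the shape of Option: a function that configures a processor.
type option func(*processor)

// withDeduplicate returns an option that installs d as the deduplication step.
func withDeduplicate(d dedupFunc) option {
	return func(p *processor) { p.deduplicate = d }
}

// newProcessor starts from the defaults and applies each option in order.
func newProcessor(opts ...option) *processor {
	p := &processor{} // default: no deduplication
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// finish runs the deduplication step only when one was configured.
func (p *processor) finish(docs []Document) ([]Document, error) {
	if p.deduplicate == nil {
		return docs, nil
	}
	return p.deduplicate(docs)
}
```

With this shape, new settings can be added later as further option constructors without changing the constructor's signature.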

type ParallelProcessor

type ParallelProcessor interface {
	// MakeDocuments creates in parallel a slice of Document for given []Message
	// using the map of DocumentBuilder (see NewParallelProcessor).
	//
	// This method returns a []Document and a (foundationkit/errors).Error.
	// If not nil, this error carries a (foundationkit/errors).Code, which is
	// either ErrCodeBuildDocuments or ErrCodeDeduplicateDocuments.
	MakeDocuments(context.Context, []Message) ([]Document, error)
}

ParallelProcessor is an interface that processes a slice of Message in parallel.

func NewParallelProcessor

func NewParallelProcessor(
	builders map[MessageType]DocumentBuilder,
	opts ...Option,
) ParallelProcessor

NewParallelProcessor returns a new ParallelProcessor with a map of DocumentBuilder for each MessageType.

A list of Option is also available for this method. See option.go for more information.

Directories

Path Synopsis
nolint
nolint
