writers

package
v0.1.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2023 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AMQPBaseForm = forms.Form{
	ErrorMsg: "invalid data encountered in the AMQP form",
	Fields: []forms.Field{
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsOptional{Default: "json"},
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "compress",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
		{
			Name: "queue",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "routing_key",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "exchange",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "exchange_type",
			Validators: []forms.Validator{
				forms.IsIn{Choices: []interface{}{"fanout", "direct", "topic"}},
			},
		},
		{
			Name: "queue_expires_after_ms",
			Validators: []forms.Validator{
				forms.IsOptional{Default: int64(0)},
				forms.IsInteger{HasMin: true, Min: 0},
			},
		},
		{
			Name: "url",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
	},
}
View Source
var AMQPWriterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the AMQP writer form",
	Fields: append([]forms.Field{
		{
			Name: "confirmation_timeout",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 1.0},
				forms.IsFloat{Convert: false},
			},
		},
	}, AMQPBaseForm.Fields...),
}
View Source
var BytesWriterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the bytes writer form",
	Fields: []forms.Field{
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "compress",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
	},
}
View Source
var CountWriterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the count writer form",
	Fields: []forms.Field{
		{
			Name: "max",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 10},
				forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 100},
			},
		},
	},
}
View Source
var FileWriterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the file writer form",
	Fields: []forms.Field{
		{
			Name: "path",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
		{
			Name: "base-name",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsString{},
			},
		},
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsOptional{Default: "json"},
				forms.IsIn{Choices: []interface{}{"json", "csv"}},
			},
		},
		{
			Name: "compress",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
		{
			Name: "add-time",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
	},
}
View Source
var HTTPWriterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the HTTP writer form",
	Fields: []forms.Field{
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "url",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsString{},
			},
		},
		{
			Name: "headers",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsStringMap{
					Form: &forms.Form{
						Fields: []forms.Field{
							{
								Name: "*",
								Validators: []forms.Validator{
									forms.IsString{},
								},
							},
						},
					},
				},
			},
		},
	},
}
View Source
var Writers = kodex.WriterDefinitions{
	"file": kodex.WriterDefinition{
		Maker:    MakeFileWriter,
		Form:     FileWriterForm,
		Internal: true,
	},
	"http": kodex.WriterDefinition{
		Maker: MakeHTTPWriter,
		Form:  HTTPWriterForm,
	},
	"bytes": kodex.WriterDefinition{
		Maker:    MakeBytesWriter,
		Form:     BytesWriterForm,
		Internal: true,
	},
	"in-memory": kodex.WriterDefinition{
		Maker:    MakeInMemoryWriter,
		Internal: true,
	},
	"stdout": kodex.WriterDefinition{
		Maker:    MakeStdoutWriter,
		Internal: true,
	},
	"amqp": kodex.WriterDefinition{
		Maker:    MakeAMQPWriter,
		Form:     AMQPWriterForm,
		Internal: false,
	},
	"count": kodex.WriterDefinition{
		Maker:    MakeCountWriter,
		Form:     CountWriterForm,
		Internal: true,
	},
}

Functions

func MakeAMQPWriter

func MakeAMQPWriter(config map[string]interface{}) (kodex.Writer, error)

func MakeBytesWriter

func MakeBytesWriter(config map[string]interface{}) (kodex.Writer, error)

func MakeCountWriter

func MakeCountWriter(config map[string]interface{}) (kodex.Writer, error)

func MakeFileWriter

func MakeFileWriter(config map[string]interface{}) (kodex.Writer, error)

func MakeHTTPWriter

func MakeHTTPWriter(config map[string]interface{}) (kodex.Writer, error)

func MakeInMemoryWriter

func MakeInMemoryWriter(params map[string]interface{}) (kodex.Writer, error)

func MakeStdoutWriter

func MakeStdoutWriter(params map[string]interface{}) (kodex.Writer, error)

Types

type AMQPBase

type AMQPBase struct {
	Connection          *amqp.Connection
	Channel             *amqp.Channel
	Queue               amqp.Queue
	QueueExpiresAfterMs int64
	URL                 string
	Format              string
	Compress            bool
	QueueName           string
	RoutingKey          string
	BaseRoutingKey      string
	BaseQueueName       string
	Exchange            string
	ExchangeType        string
	Model               kodex.Model
}

func MakeAMQPBase

func MakeAMQPBase(params map[string]interface{}) (AMQPBase, error)

func (*AMQPBase) Setup

func (a *AMQPBase) Setup(config kodex.Config) error

func (*AMQPBase) SetupWithModel

func (a *AMQPBase) SetupWithModel(model kodex.Model) error

func (*AMQPBase) Teardown

func (a *AMQPBase) Teardown() error

type AMQPWriter

type AMQPWriter struct {
	ConfirmationTimeout float64
	Confirmations       chan amqp.Confirmation
	AMQPBase
}

func (*AMQPWriter) Setup

func (a *AMQPWriter) Setup(config kodex.Config) error

func (*AMQPWriter) SetupWithModel

func (a *AMQPWriter) SetupWithModel(model kodex.Model) error

func (*AMQPWriter) Write

func (a *AMQPWriter) Write(payload kodex.Payload) error

type BytesWriter

type BytesWriter struct {
	Output   []byte
	Format   string
	Compress bool
	// contains filtered or unexported fields
}

func (*BytesWriter) Setup

func (s *BytesWriter) Setup(config kodex.Config) error

func (*BytesWriter) Teardown

func (s *BytesWriter) Teardown() error

func (*BytesWriter) Write

func (s *BytesWriter) Write(payload kodex.Payload) error

type CountWriter

type CountWriter struct {
	// contains filtered or unexported fields
}

func (*CountWriter) Count

func (s *CountWriter) Count() int64

func (*CountWriter) LastItems

func (s *CountWriter) LastItems() []map[string]interface{}

func (*CountWriter) Setup

func (s *CountWriter) Setup(config kodex.Config) error

func (*CountWriter) Teardown

func (s *CountWriter) Teardown() error

func (*CountWriter) Write

func (s *CountWriter) Write(payload kodex.Payload) error

type FileWriter

type FileWriter struct {
	BasePath string
	Name     string
	Format   string
	Compress bool
	AddTime  bool
	// contains filtered or unexported fields
}

func (*FileWriter) Setup

func (s *FileWriter) Setup(config kodex.Config) error

func (*FileWriter) Teardown

func (s *FileWriter) Teardown() error

func (*FileWriter) Write

func (s *FileWriter) Write(payload kodex.Payload) error

type HTTPWriter

type HTTPWriter struct {
	Format  string
	URL     string
	Config  kodex.Config
	Headers map[string]interface{}
}

func (*HTTPWriter) Setup

func (s *HTTPWriter) Setup(config kodex.Config) error

func (*HTTPWriter) Teardown

func (s *HTTPWriter) Teardown() error

func (*HTTPWriter) Write

func (s *HTTPWriter) Write(payload kodex.Payload) error

type InMemoryWriter

type InMemoryWriter struct {
	// contains filtered or unexported fields
}

func (*InMemoryWriter) Result

func (s *InMemoryWriter) Result() []map[string]interface{}

func (*InMemoryWriter) Setup

func (s *InMemoryWriter) Setup(config kodex.Config) error

func (*InMemoryWriter) Teardown

func (s *InMemoryWriter) Teardown() error

func (*InMemoryWriter) Write

func (s *InMemoryWriter) Write(payload kodex.Payload) error

type StdoutWriter

type StdoutWriter struct {
}

func (*StdoutWriter) Setup

func (s *StdoutWriter) Setup(config kodex.Config) error

func (*StdoutWriter) Teardown

func (s *StdoutWriter) Teardown() error

func (*StdoutWriter) Write

func (s *StdoutWriter) Write(payload kodex.Payload) error

type TopicExchangeChosen

type TopicExchangeChosen struct{}

func (TopicExchangeChosen) Validate

func (t TopicExchangeChosen) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL