readers

package
v0.1.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2023 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AMQPReaderForm = forms.Form{
	ErrorMsg: "invalid data encountered in the AMQP reader form",
	Fields: append([]forms.Field{
		{
			Name: "consumer",
			Validators: []forms.Validator{

				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
	}, writers.AMQPBaseForm.Fields...),
}
View Source
var BytesReaderForm = forms.Form{
	ErrorMsg: "invalid data encountered in the bytes reader form",
	Fields: []forms.Field{
		{
			Name: "input",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsBytes{
					Encoding: "base64",
				},
			},
		},
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "compressed",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
		{
			Name: "chunk-size",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 100},
				forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000},
			},
		},
		{
			Name: "headers",
			Validators: []forms.Validator{
				forms.IsOptional{Default: map[string]interface{}{}},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var FileReaderForm = forms.Form{
	ErrorMsg: "invalid data encountered in the file reader form",
	Fields: []forms.Field{
		{
			Name: "path",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsString{},
			},
		},
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "compressed",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
		{
			Name: "chunk-size",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 10},
				forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000},
			},
		},
		{
			Name: "headers",
			Validators: []forms.Validator{
				forms.IsOptional{Default: map[string]interface{}{}},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var GenerateForm = forms.Form{
	ErrorMsg: "invalid data encountered in the generate reader form",
	Fields: []forms.Field{
		{
			Name: "fields",
			Validators: []forms.Validator{
				forms.IsStringMap{
					Form: &forms.Form{
						Fields: []forms.Field{
							forms.Field{
								Name: "*",
								Validators: []forms.Validator{
									forms.IsStringMap{
										Form: &GeneratorForm,
									},
									IsGenerator{},
								},
							},
						},
					},
				},
			},
		},
		{
			Name: "frequency",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 1000},
				forms.IsFloat{HasMin: true, HasMax: true, Min: 1e-3, Max: 1e6},
			},
		},
	},
}
View Source
var GeneratorForm = forms.Form{
	ErrorMsg: "invalid data encountered in the field config form",
	Fields: []forms.Field{
		forms.Field{
			Name: "type",
			Validators: []forms.Validator{
				forms.IsString{},
				forms.IsIn{Choices: keys(generators)},
			},
		},
		forms.Field{
			Name: "config",
			Validators: []forms.Validator{
				forms.IsOptional{Default: map[string]interface{}{}},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var LiteralForm = forms.Form{
	Fields: []forms.Field{
		forms.Field{
			Name: "value",
			Validators: []forms.Validator{
				forms.IsRequired{},
			},
		},
	},
}
View Source
var Readers = kodex.ReaderDefinitions{
	"file": kodex.ReaderDefinition{
		Maker:    MakeFileReader,
		Form:     FileReaderForm,
		Internal: true,
	},
	"stdin": kodex.ReaderDefinition{
		Maker:    MakeStdinReader,
		Form:     StdinReaderForm,
		Internal: true,
	},
	"generate": kodex.ReaderDefinition{
		Maker:    MakeGenerateReader,
		Form:     GenerateForm,
		Internal: true,
	},
	"bytes": kodex.ReaderDefinition{
		Maker:    MakeBytesReader,
		Form:     BytesReaderForm,
		Internal: true,
	},
	"amqp": kodex.ReaderDefinition{
		Maker:    MakeAMQPReader,
		Form:     AMQPReaderForm,
		Internal: false,
	},
}
View Source
var StdinReaderForm = forms.Form{
	ErrorMsg: "invalid data encountered in the stdin reader form",
	Fields: []forms.Field{
		{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsIn{Choices: []interface{}{"json"}},
			},
		},
		{
			Name: "compressed",
			Validators: []forms.Validator{
				forms.IsOptional{Default: false},
				forms.IsBoolean{},
			},
		},
		{
			Name: "chunk-size",
			Validators: []forms.Validator{
				forms.IsOptional{Default: 10},
				forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000},
			},
		},
		{
			Name: "headers",
			Validators: []forms.Validator{
				forms.IsOptional{Default: map[string]interface{}{}},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var TimestampForm = forms.Form{
	Fields: []forms.Field{
		forms.Field{
			Name: "format",
			Validators: []forms.Validator{
				forms.IsString{},
				forms.IsIn{Choices: []interface{}{"rfc3339", "unix"}},
			},
		},
	},
}

Functions

func MakeAMQPReader

func MakeAMQPReader(config map[string]interface{}) (kodex.Reader, error)

func MakeBytesReader

func MakeBytesReader(config map[string]interface{}) (kodex.Reader, error)

func MakeFileReader

func MakeFileReader(config map[string]interface{}) (kodex.Reader, error)

func MakeGenerateReader

func MakeGenerateReader(config map[string]interface{}) (kodex.Reader, error)

func MakeStdinReader

func MakeStdinReader(config map[string]interface{}) (kodex.Reader, error)

Types

type AMQPPayload

type AMQPPayload struct {
	// contains filtered or unexported fields
}

func (*AMQPPayload) Acknowledge

func (f *AMQPPayload) Acknowledge() error

func (*AMQPPayload) EndOfStream

func (f *AMQPPayload) EndOfStream() bool

func (*AMQPPayload) Headers

func (f *AMQPPayload) Headers() map[string]interface{}

func (*AMQPPayload) Items

func (f *AMQPPayload) Items() []*kodex.Item

func (*AMQPPayload) Reject

func (f *AMQPPayload) Reject() error

type AMQPReader

type AMQPReader struct {
	writers.AMQPBase

	ConsumerName string
	// contains filtered or unexported fields
}

func (*AMQPReader) MakeAMQPPayload

func (a *AMQPReader) MakeAMQPPayload(delivery amqp.Delivery) (*AMQPPayload, error)

func (*AMQPReader) Peek

func (a *AMQPReader) Peek() (kodex.Payload, error)

func (*AMQPReader) Purge

func (a *AMQPReader) Purge() error

func (*AMQPReader) Read

func (a *AMQPReader) Read() (kodex.Payload, error)

func (*AMQPReader) Setup

func (a *AMQPReader) Setup(stream kodex.Stream) error

type BytesPayload

type BytesPayload struct {
	// contains filtered or unexported fields
}

func MakeGeneratePayload

func MakeGeneratePayload() *BytesPayload

func (*BytesPayload) Acknowledge

func (f *BytesPayload) Acknowledge() error

func (*BytesPayload) EndOfStream

func (f *BytesPayload) EndOfStream() bool

func (*BytesPayload) Headers

func (f *BytesPayload) Headers() map[string]interface{}

func (*BytesPayload) Items

func (f *BytesPayload) Items() []*kodex.Item

func (*BytesPayload) Reject

func (f *BytesPayload) Reject() error

type BytesReader

type BytesReader struct {
	Input      []byte
	Reader     *bufio.Reader
	Format     string
	Compressed bool
	Headers    map[string]interface{}
	ChunkSize  int
}

func (*BytesReader) MakeBytesPayload

func (s *BytesReader) MakeBytesPayload() (*BytesPayload, error)

func (*BytesReader) Purge

func (f *BytesReader) Purge() error

func (*BytesReader) Read

func (s *BytesReader) Read() (kodex.Payload, error)

func (*BytesReader) Setup

func (b *BytesReader) Setup(stream kodex.Stream) error

func (*BytesReader) Teardown

func (s *BytesReader) Teardown() error

type FilePayload

type FilePayload struct {
	// contains filtered or unexported fields
}

func (*FilePayload) Acknowledge

func (f *FilePayload) Acknowledge() error

func (*FilePayload) EndOfStream

func (f *FilePayload) EndOfStream() bool

func (*FilePayload) Headers

func (f *FilePayload) Headers() map[string]interface{}

func (*FilePayload) Items

func (f *FilePayload) Items() []*kodex.Item

func (*FilePayload) Reject

func (f *FilePayload) Reject() error

type FileReader

type FileReader struct {
	Reader     *bufio.Reader
	File       *os.File
	GzReader   *gzip.Reader
	Format     string
	Compressed bool
	Headers    map[string]interface{}
	Path       string
	ChunkSize  int
}

func (*FileReader) MakeFilePayload

func (s *FileReader) MakeFilePayload() (*FilePayload, error)

func (*FileReader) Purge

func (f *FileReader) Purge() error

func (*FileReader) Read

func (s *FileReader) Read() (kodex.Payload, error)

func (*FileReader) Setup

func (s *FileReader) Setup(stream kodex.Stream) error

func (*FileReader) Teardown

func (s *FileReader) Teardown() error

type GeneratePayload

type GeneratePayload struct {
	// contains filtered or unexported fields
}

func (*GeneratePayload) Acknowledge

func (f *GeneratePayload) Acknowledge() error

func (*GeneratePayload) Headers

func (f *GeneratePayload) Headers() map[string]interface{}

func (*GeneratePayload) Items

func (f *GeneratePayload) Items() []*kodex.Item

func (*GeneratePayload) Reject

func (f *GeneratePayload) Reject() error

func (*GeneratePayload) Teardown

func (s *GeneratePayload) Teardown() error

type GenerateReader

type GenerateReader struct {
	// contains filtered or unexported fields
}

func (*GenerateReader) Purge

func (f *GenerateReader) Purge() error

func (*GenerateReader) Read

func (g *GenerateReader) Read() (kodex.Payload, error)

func (*GenerateReader) Setup

func (g *GenerateReader) Setup(stream kodex.Stream) error

func (*GenerateReader) Teardown

func (g *GenerateReader) Teardown() error

type Generator

type Generator func() interface{}

func Literal

func Literal(config map[string]interface{}) (Generator, error)

func Timestamp

func Timestamp(config map[string]interface{}) (Generator, error)

type GeneratorMaker

type GeneratorMaker func(map[string]interface{}) (Generator, error)

type IsGenerator

type IsGenerator struct{}

func (IsGenerator) MarshalJSON

func (g IsGenerator) MarshalJSON() ([]byte, error)

func (IsGenerator) Validate

func (g IsGenerator) Validate(input interface{}, values map[string]interface{}) (interface{}, error)

type StdinPayload

type StdinPayload struct {
	// contains filtered or unexported fields
}

func (*StdinPayload) Acknowledge

func (f *StdinPayload) Acknowledge() error

func (*StdinPayload) EndOfStream

func (f *StdinPayload) EndOfStream() bool

func (*StdinPayload) Headers

func (f *StdinPayload) Headers() map[string]interface{}

func (*StdinPayload) Items

func (f *StdinPayload) Items() []*kodex.Item

func (*StdinPayload) Reject

func (f *StdinPayload) Reject() error

type StdinReader

type StdinReader struct {
	Reader     *bufio.Reader
	GzReader   *gzip.Reader
	Format     string
	Compressed bool
	Headers    map[string]interface{}
	ChunkSize  int
}

func (*StdinReader) MakeStdinPayload

func (s *StdinReader) MakeStdinPayload() (*StdinPayload, error)

func (*StdinReader) Purge

func (f *StdinReader) Purge() error

func (*StdinReader) Read

func (s *StdinReader) Read() (kodex.Payload, error)

func (*StdinReader) Setup

func (s *StdinReader) Setup(stream kodex.Stream) error

func (*StdinReader) Teardown

func (s *StdinReader) Teardown() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL