Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InputHandler ¶
type InputHandler interface {
	Start(context.Context) func() error
	Messages() <-chan *dto.KafkaMessage
	Progress() ProgressSource
}
func NewFileInputHandler ¶
func NewFileInputHandler(reader formatters.Reader) (InputHandler, error)
func NewKafkaInputHandler ¶
func NewKafkaInputHandler(client sarama.Client, topic string) (InputHandler, error)
type OutputHandler ¶
type OutputHandler interface {
	Run() error
	Progress() ProgressSource
}
func NewFileOutputHandler ¶
func NewFileOutputHandler(input <-chan *dto.KafkaMessage, writer formatters.Writer) (OutputHandler, error)
func NewKafkaOutputHandler ¶
func NewKafkaOutputHandler(input <-chan *dto.KafkaMessage, pacer <-chan time.Time, client sarama.Client, topic string) (OutputHandler, error)
type ProgressSource ¶
type ProgressSource <-chan error
type ReportingHandler ¶
type ReportingHandler interface {
	Start(ProgressSource, ...ProgressSource) func() error
}
func NewReportingHandler ¶
func NewReportingHandler(logger *log.Logger, period time.Duration) ReportingHandler
Click to show internal directories.
Click to hide internal directories.