transport

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2017 License: Apache-2.0 Imports: 13 Imported by: 0

README

transport

BuildStatus GoDoc GoWalker

A data transportation tool that moves data from one place to another, such as files, Kafka, HDFS, etc.

Framework

Framework

Go version >= 1.9.0

接口

// 将配置文件解析成指定格式v的结构
type Configer interface {
    Parse(v interface{}) error
}

任何实现了Inputer接口,即可做为input组件
type Inputer interface {
    Init(config Configer) error
    Start() error
    Read([]byte) (int, error)
    Close() error
    Version() string
}

任何实现了Adapter接口,即可做为数据处理组件
type Adapter interface {
    Init(config Configer) error
    Handle(in, out []byte) (int, error)
    Version() string
}


任何实现了Outputer接口,即可作为output组件
type Outputer interface {
    Init(config Configer) error
    Start() error
    Write([]byte) (int, error)
    Close() error
    Version() string
}
Input组件:
Output组件:
Handler组件:
  • null,直接连接input,output
  • addenter,在行尾加入换行符,例子:写文件
  • grok,正则格式化成json格式,说明: ^(?P<命名>子表达式)$ 被捕获的组,该组被编号且被命名 (子匹配)
  • kv,string split 成json格式
Handler可以组合Inject struct,以实现向Input/Output中注入数据,示例
package codec

import (
    "github.com/luopengift/transport"
    "time"
)

type DebugInjectHandler struct {
    *transport.Inject
}

func (h *DebugInjectHandler) Init(config transport.Configer) error {
    return nil
}

func (h *DebugInjectHandler) Handle(in, out []byte) (int, error) {
    time.Sleep(1 * time.Second) // make program run slow down
    h.InjectInput(in)   //将输入数据,再次inject回recv_chan,实现数据循环处理
    n := copy(out, in)
    return n, nil
}

func init() {
    transport.RegistHandler("DEBUG_InjectInput", new(DebugInjectHandler))
}
使用
  1. 下载
git clone https://github.com/luopengift/transport.git
cd transport
  1. 编译
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh build cmd/main.go 
2017-09-14.15:09:14
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
build transport success.
  1. 查看插件列表
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -h
Usage of ./transport:
  -f string
        (config)配置文件
  -l    (list)查看插件列表和插件版本
  -r    (read)读取当前配置文件
  -v    (version)版本号
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -l
[Inputs]         version
  random          0.0.1
  std             0.0.1
  exec            0.0.1
  file            0.0.1
  files           0.0.1
  hdfs            0.0.1
  http            0.0.1
  kafka           0.0.1
[Adapters]       
  kv              0.0.3
  zhizilog        0.0.1
  null            0.0.1
  addenter        0.0.1
  grok            0.0.1
  inject          0.0.1_debug
[Outputers]      
  elasticsearch   0.0.1
  file            0.0.1
  hdfs            0.0.1
  kafka           0.0.1
  null            0.0.1
  std             0.0.1
  tcp             0.0.1

  1. 查看当前配置文件是否可以加载成功
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -f test/kafka-file.json -r
2017-09-14 15:11:59.955 [I] <file test/kafka-file.json is END:EOF> 
config info:
[Inputs]
  kafka:
    offset: -1
    addrs: ["10.10.20.14:9092","10.10.20.15:9092","10.10.20.16:9092"]
    topics: ["zhizi-log"]
[Adapts]
  addenter:
[Outputs]
  file:
    path: /tmp/tmp.log

  1. 运行
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./transport -f test/kafka-file.json  
2017-09-14 15:13:22.107 [I] <file test/kafka-file.json is END:EOF> 
2017-09-14 15:13:22.107 [I] Transport starting... 
2017-09-14 15:13:22.107 [W] Starting loading performance data, please press CTRL+C exit... 
HttpsServer Start 0.0.0.0:12345

^C2017-09-14 15:13:31.526 [W] Get signal:interrupt, Profile File is cpu.prof/mem.prof
  1. 启动服务[加载config.json配置文件]
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh start
2017-09-14.15:16:45
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
transport started..., PID=22778
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ps -ef |grep -v grep |grep transport
root     22778     1  0 15:18 pts/0    00:00:00 ./transport -f config.json
  1. 查看服务状态
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh status
2017-09-14.15:18:24
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
root     22778     1  0 15:18 pts/0    00:00:00 ./transport -f config.json
transport now is running already, PID=22778
  1. 查看运行日志
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh tail 
2017-09-14.15:22:07
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
2017-09-14 15:18:16.399 [D] [transport] [files] recv 2017-01-02 15:58:43 DEBUG This is a debug Test 
2017-09-14 15:18:16.399 [D] [transport] [files] recv 44 
2017-09-14 15:18:16.399 [D] [transport] send 44
......
  1. 停止服务
[root@iZm5egf7xb48axmu4z1t3fZ transport]# ./init.sh stop
2017-09-14.15:19:09
GOPATH init Finished. GOPATH=/data/golang:/data/golang/src
transport stoped...

TODO

  1. 优化性能
  2. 加入更多组件

Documentation

Index

Constants

View Source
const (
	B  = 1         //1B = 8bit
	KB = 1024 * B  //1KB
	MB = 1024 * KB //1MB
	GB = 1024 * MB //1GB
	TB = 1024 * GB //1TB
	PB = 1024 * TB //1PB

)
View Source
const (
	VERSION = "0.0.2"
)

Variables

View Source
var (
	ClosedError = errors.New("Closed")

	ReadBufferClosedError  = errors.New("ReadBufferClosedError:chan is closed")
	WriteBufferClosedError = errors.New("WriteBufferClosedError:chan is closed")

	MaxBytesError   = errors.New("MaxBytesError:message is larger than byte buffer")
	InputNullError  = errors.New("InputError:plugin is null")
	OutputNullError = errors.New("OutputError:plugin is null")
	UnknownError    = errors.New("UnknownError:unknow error")
)

Functions

func AddCronTask

func AddCronTask(name, spec string, fun func() error) error

增加定时任务

func DelCronTask

func DelCronTask(name string) error

删除定时任务

func PluginDetail

func PluginDetail() string

func RegistHandler

func RegistHandler(key string, a Adapter)

func RegistInputer

func RegistInputer(key string, input Inputer)

func RegistOutputer

func RegistOutputer(key string, output Outputer)

Types

type Adapter

type Adapter interface {
	Init(config Configer) error
	Handle(in, out []byte) (n int, err error)
	Version() string
}

type Codec

type Codec struct {
	Name string

	*sync.Mutex

	Adapter
	// contains filtered or unexported fields
}

func NewCodec

func NewCodec(name string, a Adapter, size int) *Codec

func (*Codec) Count

func (c *Codec) Count() uint64

func (*Codec) Handle

func (c *Codec) Handle(in, out []byte) (int, error)

func (*Codec) Init

func (c *Codec) Init(config Configer) error

type Config

type Config struct {
	Runtime      *RuntimeConfig          `json:"runtime"`
	InputConfig  map[string]pluginConfig `json:"inputs"`
	HandleConfig map[string]pluginConfig `json:"handles"`
	OutputConfig map[string]pluginConfig `json:"outputs"`
}

func NewConfig

func NewConfig(path string) *Config

func (*Config) Init

func (cfg *Config) Init(path string) error

func (*Config) InitCodecs

func (cfg *Config) InitCodecs() ([]*Codec, error)

func (*Config) InitInputs

func (cfg *Config) InitInputs() ([]*Input, error)

func (*Config) InitOutputs

func (cfg *Config) InitOutputs() ([]*Output, error)

func (*Config) String

func (cfg *Config) String() string

type Configer

type Configer interface {
	Parse(interface{}) error
}

type Inject

type Inject struct{}

func (*Inject) InjectInput

func (i *Inject) InjectInput(p []byte) error

func (*Inject) InjectOutput

func (i *Inject) InjectOutput(p []byte) error

type Input

type Input struct {
	Name string

	*sync.Mutex
	Inputer
	// contains filtered or unexported fields
}

func NewInput

func NewInput(name string, in Inputer) *Input

func (*Input) Close

func (i *Input) Close() error

func (*Input) Count

func (i *Input) Count() uint64

func (*Input) Read

func (i *Input) Read(p []byte) (int, error)

func (*Input) Set

func (i *Input) Set(in Inputer) error

func (*Input) Start

func (i *Input) Start() error

func (*Input) Version

func (i *Input) Version() string

type Inputer

type Inputer interface {
	Init(Configer) error
	Start() error
	Read(p []byte) (n int, err error)
	Close() error
	Version() string
}

数据输入接口

type Output

type Output struct {
	Name string

	*sync.Mutex
	Outputer
	// contains filtered or unexported fields
}

func NewOutput

func NewOutput(name string, out Outputer) *Output

func (*Output) Close

func (o *Output) Close() error

func (*Output) Count

func (o *Output) Count() uint64

func (*Output) Set

func (o *Output) Set(out Outputer) error

func (*Output) Start

func (o *Output) Start() error

func (*Output) Write

func (o *Output) Write(p []byte) (int, error)

type Outputer

type Outputer interface {
	Init(Configer) error
	Write(p []byte) (n int, err error)
	Start() error
	Close() error

	Version() string
}

数据输出接口, 实现了标准io库中的WriteCloser接口

type PluginsMap

type PluginsMap struct {
	Inputers  map[string]Inputer
	Outputers map[string]Outputer
	Adapters  map[string]Adapter
}
var Plugins *PluginsMap

func NewPluginsMap

func NewPluginsMap() *PluginsMap

type RuntimeConfig

type RuntimeConfig struct {
	DEBUG    bool   `json:"DEBUG"`
	MAXPROCS int    `json:"MAXPROCS"`
	BYTESIZE int    `json:"BYTESIZE"`
	CHANSIZE int    `json:"CHANSIZE"`
	VERSION  string `json:"VERSION"`
	HTTP     string `json:"HTTP"`
}

func NewRuntimeConfig

func NewRuntimeConfig() *RuntimeConfig

type Transport

type Transport struct {
	Inputs  []*Input
	Outputs []*Output
	Codecs  []*Codec
	// contains filtered or unexported fields
}
var T *Transport

func NewTransport

func NewTransport(cfg *Config) (*Transport, error)

func (*Transport) Run

func (t *Transport) Run()

func (*Transport) RunCodecs

func (t *Transport) RunCodecs()

func (*Transport) RunInputs

func (t *Transport) RunInputs()

func (*Transport) RunOutputs

func (t *Transport) RunOutputs()

func (*Transport) Stat

func (t *Transport) Stat() string

func (*Transport) Stop

func (t *Transport) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL