ssf

package module
v0.0.0-...-2219ab8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2016 License: BSD-3-Clause Imports: 28 Imported by: 0

README

SSF

A BSD licensed simple streaming process framework.

Dependency

Features

  • Two Cluster Mode Support
    • Static Multi Servers(no zookeeper dependency)
    • Dynamic Multi Servers(with zookeeper dependency)
  • Standalone Process Task
    • All tasks is running as standalone process, which communicate framework over unix sockets.
    • Multi language support
  • Write Ahead Log
    • Cache data when cluster do failover work or some cluster node is temporary unavailable.
  • Online Trouble Shooting
    • Use 'telent' to attach running process and eneter 'otsc' to enter online trouble shooting interactive mode.

Disvantages

  • No message delivery guarantees
    • One-way message delivery between nodes.
    • Message may lost when cluster failover, process restart, etc.

Example

Documentation

Overview

Package ssf is a generated protocol buffer package.

It is generated from these files:

ssf.proto

It has these top-level messages:

EventHeader
HeartBeat
EventACK
AdminEvent
AdminRequest
AdminResponse

Index

Constants

View Source
const (
	NODE_ACTIVE  uint8 = 1
	NODE_LOADING uint8 = 2
	NODE_FAULT   uint8 = 3
)
View Source
const WALMetaSize int64 = 4096

Variables

View Source
var (
	ErrInvalidLengthSsf = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowSsf   = fmt.Errorf("proto: integer overflow")
)
View Source
var ErrEmptyProto = errors.New("Empty proto message")
View Source
var ErrNoClusterName = errors.New("No cluster name defined")
View Source
var ErrNoNode = errors.New("No cluster node to emit msg")
View Source
var ErrNoProcessor = errors.New("No processor to dispatch msg")
View Source
var ErrProcessorDisconnect = errors.New("Processor disconnect")
View Source
var ErrRPCTimeout = errors.New("Processor RPC timeout")
View Source
var EventType_name = map[int32]string{
	0: "NOTIFY",
	1: "REQUEST",
	2: "RESPONSE",
}
View Source
var EventType_value = map[string]int32{
	"NOTIFY":   0,
	"REQUEST":  1,
	"RESPONSE": 2,
}
View Source
var MAGIC_EVENT_HEADER []byte = []byte("SSFE")
View Source
var MAGIC_OTSC_HEADER []byte = []byte("OTSC")

Functions

func Emit

func Emit(msg proto.Message, hashCode uint64) error

Emit would write msg to SSF framework over IPC unix socket

func HashCode

func HashCode(s []byte) uint64

HashCode return the md5 hashcode of raw bytes

func LocalRPC

func LocalRPC(processor string, request proto.Message, attach interface{}, timeout time.Duration) (proto.Message, error)

LocalRPC would RPC specified processor with given proto request & wait for response

func Notify

func Notify(msg proto.Message, hashCode uint64, writer io.Writer) error

Notify write proto message with given hashcode to writer

func Start

func Start(cfg *ClusterConfig)

Start launch ssf cluster server

func StartProcessor

func StartProcessor(config *ProcessorConfig) error

StartProcessor init & launch go processor

func Stop

func Stop()

Stop ssf cluster server

func UpdateConfig

func UpdateConfig(cfg *ClusterConfig)

UpdateConfig update current config on the fly

Types

type AdminEvent

type AdminEvent struct {
	Content          *string `protobuf:"bytes,1,opt,name=content" json:"content,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*AdminEvent) GetContent

func (m *AdminEvent) GetContent() string

func (*AdminEvent) Marshal

func (m *AdminEvent) Marshal() (data []byte, err error)

func (*AdminEvent) MarshalTo

func (m *AdminEvent) MarshalTo(data []byte) (int, error)

func (*AdminEvent) ProtoMessage

func (*AdminEvent) ProtoMessage()

func (*AdminEvent) Reset

func (m *AdminEvent) Reset()

func (*AdminEvent) Size

func (m *AdminEvent) Size() (n int)

func (*AdminEvent) String

func (m *AdminEvent) String() string

func (*AdminEvent) Unmarshal

func (m *AdminEvent) Unmarshal(data []byte) error

type AdminRequest

type AdminRequest struct {
	Line             *string `protobuf:"bytes,1,opt,name=line" json:"line,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*AdminRequest) GetLine

func (m *AdminRequest) GetLine() string

func (*AdminRequest) Marshal

func (m *AdminRequest) Marshal() (data []byte, err error)

func (*AdminRequest) MarshalTo

func (m *AdminRequest) MarshalTo(data []byte) (int, error)

func (*AdminRequest) ProtoMessage

func (*AdminRequest) ProtoMessage()

func (*AdminRequest) Reset

func (m *AdminRequest) Reset()

func (*AdminRequest) Size

func (m *AdminRequest) Size() (n int)

func (*AdminRequest) String

func (m *AdminRequest) String() string

func (*AdminRequest) Unmarshal

func (m *AdminRequest) Unmarshal(data []byte) error

type AdminResponse

type AdminResponse struct {
	Close            *bool  `protobuf:"varint,1,opt,name=close" json:"close,omitempty"`
	XXX_unrecognized []byte `json:"-"`
}

func (*AdminResponse) GetClose

func (m *AdminResponse) GetClose() bool

func (*AdminResponse) Marshal

func (m *AdminResponse) Marshal() (data []byte, err error)

func (*AdminResponse) MarshalTo

func (m *AdminResponse) MarshalTo(data []byte) (int, error)

func (*AdminResponse) ProtoMessage

func (*AdminResponse) ProtoMessage()

func (*AdminResponse) Reset

func (m *AdminResponse) Reset()

func (*AdminResponse) Size

func (m *AdminResponse) Size() (n int)

func (*AdminResponse) String

func (m *AdminResponse) String() string

func (*AdminResponse) Unmarshal

func (m *AdminResponse) Unmarshal(data []byte) error

type ClusterConfig

type ClusterConfig struct {
	ZookeeperServers []string
	SSFServers       []string
	//Handler          EventProcessor
	ListenAddr  string
	ProcHome    string
	ClusterName string
	Weight      uint32
	Dispatch    map[string][]string
}

ClusterConfig :SSF cluster launch option

type Consistent

type Consistent struct {
	NumberOfVirtualNode int
	// contains filtered or unexported fields
}

Consistent manager virutal nodes by consistent hash algorithm

func NewConsistent

func NewConsistent(numberOfVirtualNode int) *Consistent

func NewConsistentCopy

func NewConsistentCopy(other *Consistent) *Consistent

func (*Consistent) Add

func (c *Consistent) Add(server string)

func (*Consistent) Get

func (c *Consistent) Get(server string) []int32

func (*Consistent) Remove

func (c *Consistent) Remove(server string)

func (*Consistent) Servers

func (c *Consistent) Servers() []Partition

Servers return sorted partitions

func (*Consistent) Set

func (c *Consistent) Set(partition Partition)

func (*Consistent) Update

func (c *Consistent) Update()

Update update internal partitions

type Event

type Event struct {
	EventHeader
	Msg proto.Message
	// contains filtered or unexported fields
}

type EventACK

type EventACK struct {
	Mask             *uint64 `protobuf:"varint,1,opt,name=mask" json:"mask,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*EventACK) GetMask

func (m *EventACK) GetMask() uint64

func (*EventACK) Marshal

func (m *EventACK) Marshal() (data []byte, err error)

func (*EventACK) MarshalTo

func (m *EventACK) MarshalTo(data []byte) (int, error)

func (*EventACK) ProtoMessage

func (*EventACK) ProtoMessage()

func (*EventACK) Reset

func (m *EventACK) Reset()

func (*EventACK) Size

func (m *EventACK) Size() (n int)

func (*EventACK) String

func (m *EventACK) String() string

func (*EventACK) Unmarshal

func (m *EventACK) Unmarshal(data []byte) error

type EventHeader

type EventHeader struct {
	SequenceId       *uint64    `protobuf:"varint,1,opt,name=sequenceId" json:"sequenceId,omitempty"`
	HashCode         *uint64    `protobuf:"varint,2,opt,name=hashCode" json:"hashCode,omitempty"`
	NodeId           *int32     `protobuf:"varint,3,opt,name=nodeId" json:"nodeId,omitempty"`
	MsgType          *string    `protobuf:"bytes,4,opt,name=msgType" json:"msgType,omitempty"`
	From             *string    `protobuf:"bytes,5,opt,name=from" json:"from,omitempty"`
	To               *string    `protobuf:"bytes,6,opt,name=to" json:"to,omitempty"`
	Type             *EventType `protobuf:"varint,7,opt,name=type,enum=ssf.EventType" json:"type,omitempty"`
	XXX_unrecognized []byte     `json:"-"`
}

func (*EventHeader) GetFrom

func (m *EventHeader) GetFrom() string

func (*EventHeader) GetHashCode

func (m *EventHeader) GetHashCode() uint64

func (*EventHeader) GetMsgType

func (m *EventHeader) GetMsgType() string

func (*EventHeader) GetNodeId

func (m *EventHeader) GetNodeId() int32

func (*EventHeader) GetSequenceId

func (m *EventHeader) GetSequenceId() uint64

func (*EventHeader) GetTo

func (m *EventHeader) GetTo() string

func (*EventHeader) GetType

func (m *EventHeader) GetType() EventType

func (*EventHeader) Marshal

func (m *EventHeader) Marshal() (data []byte, err error)

func (*EventHeader) MarshalTo

func (m *EventHeader) MarshalTo(data []byte) (int, error)

func (*EventHeader) ProtoMessage

func (*EventHeader) ProtoMessage()

func (*EventHeader) Reset

func (m *EventHeader) Reset()

func (*EventHeader) Size

func (m *EventHeader) Size() (n int)

func (*EventHeader) String

func (m *EventHeader) String() string

func (*EventHeader) Unmarshal

func (m *EventHeader) Unmarshal(data []byte) error

type EventType

type EventType int32
const (
	EventType_NOTIFY   EventType = 0
	EventType_REQUEST  EventType = 1
	EventType_RESPONSE EventType = 2
)

func (EventType) Enum

func (x EventType) Enum() *EventType

func (EventType) String

func (x EventType) String() string

func (*EventType) UnmarshalJSON

func (x *EventType) UnmarshalJSON(data []byte) error

type HeartBeat

type HeartBeat struct {
	Req              *bool   `protobuf:"varint,1,opt,name=req" json:"req,omitempty"`
	Res              *bool   `protobuf:"varint,2,opt,name=res" json:"res,omitempty"`
	Ts               *uint32 `protobuf:"varint,3,opt,name=ts" json:"ts,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*HeartBeat) GetReq

func (m *HeartBeat) GetReq() bool

func (*HeartBeat) GetRes

func (m *HeartBeat) GetRes() bool

func (*HeartBeat) GetTs

func (m *HeartBeat) GetTs() uint32

func (*HeartBeat) Marshal

func (m *HeartBeat) Marshal() (data []byte, err error)

func (*HeartBeat) MarshalTo

func (m *HeartBeat) MarshalTo(data []byte) (int, error)

func (*HeartBeat) ProtoMessage

func (*HeartBeat) ProtoMessage()

func (*HeartBeat) Reset

func (m *HeartBeat) Reset()

func (*HeartBeat) Size

func (m *HeartBeat) Size() (n int)

func (*HeartBeat) String

func (m *HeartBeat) String() string

func (*HeartBeat) Unmarshal

func (m *HeartBeat) Unmarshal(data []byte) error

type Node

type Node struct {
	Id          int32
	PartitionID int32
	Addr        string
	Status      uint8
}

type NodeIOEvent

type NodeIOEvent struct {
	// contains filtered or unexported fields
}

type Partition

type Partition struct {
	Id    int32
	Addr  string
	Nodes []int32
}

type Processor

type Processor interface {
	OnStart() error
	OnStop() error
	OnRPC(request proto.Message) proto.Message
	OnMessage(msg proto.Message, hashCode uint64)
}

Processor define sub processor interface

type ProcessorConfig

type ProcessorConfig struct {
	Home         string
	ClusterName  string
	Proc         Processor
	MaxGORoutine int32
	Name         string
}

ProcessorConfig is the start option

type RawMessage

type RawMessage struct {
	// contains filtered or unexported fields
}

func NewRawMessage

func NewRawMessage(str string) *RawMessage

NewRawMessage create a []byte proto message

func (*RawMessage) Data

func (m *RawMessage) Data() []byte

func (*RawMessage) Marshal

func (m *RawMessage) Marshal() ([]byte, error)

func (*RawMessage) ProtoMessage

func (m *RawMessage) ProtoMessage()

func (*RawMessage) Reset

func (m *RawMessage) Reset()

func (*RawMessage) String

func (m *RawMessage) String() string

func (*RawMessage) Unmarshal

func (m *RawMessage) Unmarshal(p []byte) error

type ServerData

type ServerData struct {
	Addr          string
	Weight        uint32
	ConnectedTime int64
}

Zookeeper path data

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func (*WAL) Write

func (wal *WAL) Write(content []byte) (int, error)

type WALMeta

type WALMeta struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
examples
wc
Package main is a generated protocol buffer package.
Package main is a generated protocol buffer package.
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL