Documentation ¶
Overview ¶
Package norm provides reliable UDP using multicast and unicast sockets. norm is a cgo wrapper for NACK-Oriented Reliable Multicast (NORM). libnorm can replace TCP/IP and, in many use cases, provide greater performance.
The goal of this package is to provide a one-to-one functional feature set with libnorm. The main exceptions are the Instance object and NormGetNextEvent(). Instance will create one goroutine to talk to libnorm. Multiple Instance objects can be used when necessary. In addition, multiple Session objects can be used under one Instance.
The maximum Object_type_data size is limited to the largest []byte indexed by an int type. The maximum []byte on a 32 bit system is ~ 2 GB (math.MaxInt32). The maximum []byte on a 64 bit system is much greater (math.MaxInt64). Data larger than an int can index can be sent by sending multiple Object_type_data messages, or by using an Object_type_file or Object_type_stream. This package could easily be extended to support larger Object_type_data types. Open a GitHub issue if this is necessary.
The Object_type_file and Object_type_data data types are delivered reliably; however, objects can be delivered out of order. Object_type_stream is delivered reliably and bytes arrive in order; however, the application determines the message boundaries for a group of bytes.
The Object_type_data and Object_type_file data types will be retained (Object.Retain()/NormObjectRetain()) automatically. The application must call Object.Release() to free memory when these Object_types are no longer used.
Package norm is safe for concurrent goroutine use.
Index ¶
- Constants
- Variables
- type Acking_status
- type Event
- type Event_type
- type Flush_mode
- type Instance
- func (o *Instance) Close_debug_log()
- func (o *Instance) Close_debug_pipe()
- func (o *Instance) Create_session(address string, port int, node_id Node_id) (r *Session, err error)
- func (o *Instance) Destroy()
- func (o *Instance) Get_debug() bool
- func (o *Instance) Get_debug_level() uint
- func (o *Instance) Get_version() string
- func (o *Instance) Open_debug_log(file_name string) bool
- func (o *Instance) Open_debug_pipe(file_name string) bool
- func (o *Instance) Restart_instance() bool
- func (o *Instance) Set_cache_directory(cache_path string) bool
- func (o *Instance) Set_debug(debug bool)
- func (o *Instance) Set_debug_level(debug_level uint)
- func (o *Instance) Stop_instance()
- type Nacking_mode
- type Node
- func (o *Node) Delete()
- func (o *Node) Free_buffers()
- func (o *Node) Get_address() (address string, port uint16, success bool)
- func (o *Node) Get_address_all() string
- func (o *Node) Get_command() (cmd *bytes.Buffer)
- func (o *Node) Get_grtt() float64
- func (o *Node) Get_id() uint32
- func (o *Node) Release()
- func (o *Node) Retain()
- func (o *Node) Set_nacking_mode(mode Nacking_mode)
- func (o *Node) Set_repair_boundary(boundary Repair_boundary)
- func (o *Node) Set_unicast_nack(enable bool)
- type Node_id
- type Object
- func (o *Object) Cancel()
- func (o *Object) Data_access_data(release bool) (data *bytes.Buffer)
- func (o *Object) Data_detach_data()
- func (o *Object) File_get_name() (file_name *bytes.Buffer)
- func (o *Object) File_rename(new_file_name string) bool
- func (o *Object) Get_bytes_pending() uint64
- func (o *Object) Get_info() (info *bytes.Buffer)
- func (o *Object) Get_info_length() int
- func (o *Object) Get_sender() (node *Node)
- func (o *Object) Get_size() uint64
- func (o *Object) Get_type() Object_type
- func (o *Object) Has_info() bool
- func (o *Object) Is_retained() bool
- func (o *Object) Release()
- func (o *Object) Retain()
- func (o *Object) Set_nacking_mode(mode Nacking_mode)
- func (o *Object) Stream_close(graceful bool)
- func (o *Object) Stream_flush(eom bool, mode Flush_mode)
- func (o *Object) Stream_get_read_offset() uint64
- func (o *Object) Stream_has_vacancy() bool
- func (o *Object) Stream_mark_eom()
- func (o *Object) Stream_read() (data *bytes.Buffer, ret bool)
- func (o *Object) Stream_seek_msg_start() bool
- func (o *Object) Stream_set_auto_flush(mode Flush_mode)
- func (o *Object) Stream_set_push_enable(enable bool)
- func (o *Object) Stream_write(data []byte) int
- func (o Object) String() string
- type Object_type
- type Probe_mode
- type Repair_boundary
- type Session
- func (o *Session) Add_acking_node(node_id Node_id) bool
- func (o *Session) Add_remove_acking_node(node_id uint32)
- func (o *Session) Cancel_command()
- func (o *Session) Cancel_watermark()
- func (o *Session) Data_enqueue(data, info []byte) (*Object, error)
- func (o *Session) Destroy()
- func (o *Session) Events() <-chan *Event
- func (o *Session) File_enqueue(file_name string, info []byte) (*Object, error)
- func (o *Session) Get_acking_status(node_id uint32) Acking_status
- func (o *Session) Get_events() Event_type
- func (o *Session) Get_grtt_estimate() float64
- func (o *Session) Get_local_node_id() Node_id
- func (o *Session) Get_next_acking_node(status Acking_status) (node_id uint32, success bool)
- func (o *Session) Get_tx_rate() float64
- func (o *Session) Get_user_data() interface{}
- func (o *Session) Requeue_object(obj *Object) bool
- func (o *Session) Send_command(cmd []byte, robust bool) bool
- func (o *Session) Set_auto_parity(auto_parity uint8)
- func (o *Session) Set_backoff_factor(factor float64)
- func (o *Session) Set_congestion_control(enable, adjust_rate bool)
- func (o *Session) Set_default_nacking_mode(mode Nacking_mode)
- func (o *Session) Set_default_repair_boundary(boundary Repair_boundary)
- func (o *Session) Set_default_rx_robust_factor(rx_robust_factor int)
- func (o *Session) Set_default_sync_Policy(policy Sync_policy)
- func (o *Session) Set_default_unicast_nack(enable bool)
- func (o *Session) Set_events(events Event_type)
- func (o *Session) Set_flow_control(factor float64)
- func (o *Session) Set_fragmentation(enable bool) bool
- func (o *Session) Set_group_size(size uint)
- func (o *Session) Set_grtt_estimate(grtt float64)
- func (o *Session) Set_grtt_max(max float64)
- func (o *Session) Set_grtt_probing_interval(min, max float64)
- func (o *Session) Set_grtt_probing_mode(mode Probe_mode)
- func (o *Session) Set_loopback(enable bool)
- func (o *Session) Set_message_trace(trace bool)
- func (o *Session) Set_multicast_interface(address string) bool
- func (o *Session) Set_rx_cache_limit(rx_cache_limit int)
- func (o *Session) Set_rx_port_reuse(rx_port_reuse bool, rx_bind_address, rx_sender_address string, ...)
- func (o *Session) Set_rx_socket_buffer(buffer_size uint) bool
- func (o *Session) Set_silent_receiver(silent bool, max_delay int)
- func (o *Session) Set_ssm(address string) bool
- func (o *Session) Set_tos(tos uint8) bool
- func (o *Session) Set_ttl(ttl uint8) bool
- func (o *Session) Set_tx_cache_bounds(size_max uint, count_min, count_max uint32)
- func (o *Session) Set_tx_only(tx_only, connect_to_session_address bool)
- func (o *Session) Set_tx_port(tx_port uint16, tx_port_reuse bool, tx_bind_address string) bool
- func (o *Session) Set_tx_rate(tx_rate float64)
- func (o *Session) Set_tx_rate_bounds(rate_min, rate_max float64)
- func (o *Session) Set_tx_robust_factor(factor int)
- func (o *Session) Set_tx_socket_buffer(buffer_size uint) bool
- func (o *Session) Set_user_data(user_data interface{})
- func (o *Session) Set_watermark(obj *Object, override_flush bool) bool
- func (o *Session) Start_receiver(buffer uint) bool
- func (o *Session) Start_sender(id uint32, buffer_space uint, segment_size, block_size, num_parity uint16, ...) bool
- func (o *Session) Stop_receiver()
- func (o *Session) Stop_sender()
- func (o *Session) Stream_open(buffer_size int, info []byte) (obj *Object, err error)
- type Sync_policy
Constants ¶
const ( Max_rx_cache_limit = 16384 Max_int = int(^uint(0) >> 1) )
const ( // These events are bit flags and can be or'd together. // See Session.Set_events(). Event_type_all = event_type_all Event_type_invalid = event_type_invalid Event_type_tx_queue_vacancy = event_type_tx_queue_vacancy Event_type_tx_queue_empty = event_type_tx_queue_empty Event_type_tx_flush_completed = event_type_tx_flush_completed Event_type_tx_watermark_completed = event_type_tx_watermark_completed Event_type_tx_cmd_sent = event_type_tx_cmd_sent Event_type_tx_object_sent = event_type_tx_object_sent Event_type_tx_object_purged = event_type_tx_object_purged Event_type_tx_rate_changed = event_type_tx_rate_changed Event_type_local_sender_closed = event_type_local_sender_closed Event_type_remote_sender_new = event_type_remote_sender_new Event_type_remote_sender_reset = event_type_remote_sender_reset Event_type_remote_sender_address = event_type_remote_sender_address Event_type_remote_sender_active = event_type_remote_sender_active Event_type_remote_sender_inactive = event_type_remote_sender_inactive Event_type_remote_sender_purged = event_type_remote_sender_purged Event_type_rx_cmd_new = event_type_rx_cmd_new Event_type_rx_object_new = event_type_rx_object_new Event_type_rx_object_info = event_type_rx_object_info Event_type_rx_object_updated = event_type_rx_object_updated // Not sent for Object_type_stream until Stream_close() // Object_type_data and Object_type_file are retained (Object.Retain()) // automatically. Must call Object.Release() to release memory. Event_type_rx_object_completed = event_type_rx_object_completed Event_type_rx_object_aborted = event_type_rx_object_aborted Event_type_grtt_updated = event_type_grtt_updated Event_type_cc_active = event_type_cc_active Event_type_cc_inactive = event_type_cc_inactive Event_type_acking_node_new = event_type_acking_node_new Event_type_send_error = event_type_send_error // ICMP error (e.g. destination unreachable) Event_type_user_timeout = event_type_user_timeout )
NormDeveloperGuide.html events:
const ( Object_type_none Object_type = object_type_none Object_type_data = object_type_data Object_type_file = object_type_file Object_type_stream = object_type_stream )
const ( Ack_invalid Acking_status = ack_invalid Ack_failure = ack_failure Ack_pending = ack_pending Ack_success = ack_success )
const ( Nack_none Nacking_mode = nack_none Nack_info_only = nack_info_only Nack_normal = nack_normal )
const ( Flush_none Flush_mode = flush_none Flush_passive = flush_passive Flush_active = flush_active )
const ( Probe_none Probe_mode = probe_none Probe_passive = probe_passive Probe_active = probe_active )
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Acking_status ¶
type Acking_status norm_acking_status
func (Acking_status) String ¶
func (o Acking_status) String() string
type Event ¶
type Event struct { Type Event_type O *Object Node *Node Grtt float64 Tx_rate float64 }
type Event_type ¶
type Event_type uint
func (Event_type) String ¶
func (o Event_type) String() string
type Flush_mode ¶
type Flush_mode int
func (Flush_mode) String ¶
func (o Flush_mode) String() string
type Instance ¶
type Instance struct {
// contains filtered or unexported fields
}
func Create_instance ¶
If wg is not nil, wg.Add()/wg.Done() will be called for the internal goroutine thus allowing for a clean shutdown.
func (*Instance) Close_debug_log ¶
func (o *Instance) Close_debug_log()
func (*Instance) Close_debug_pipe ¶
func (o *Instance) Close_debug_pipe()
func (*Instance) Create_session ¶
func (o *Instance) Create_session(address string, port int, node_id Node_id) (r *Session, err error)
node_id: when 0, it is set to time.Now()
https://raw.githubusercontent.com/aletheia7/norm/master/norm/NormSocketBindingNotes.txt
func (*Instance) Get_debug_level ¶
func (*Instance) Get_version ¶
func (*Instance) Open_debug_log ¶
default: stderr
func (*Instance) Open_debug_pipe ¶
func (*Instance) Restart_instance ¶
func (*Instance) Set_cache_directory ¶
func (*Instance) Set_debug_level ¶
debug_level: 3, set between 0 and 12 inclusive.
func (*Instance) Stop_instance ¶
func (o *Instance) Stop_instance()
type Nacking_mode ¶
type Nacking_mode norm_nacking_mode
func (Nacking_mode) String ¶
func (o Nacking_mode) String() string
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
func (*Node) Free_buffers ¶
func (o *Node) Free_buffers()
func (*Node) Get_address ¶
Returns: address ("127.0.0.1") and port. A Go string cannot be nil; check success to detect failure.
func (*Node) Get_address_all ¶
Returns: address:port ("127.0.0.1:32233"). A Go string cannot be nil; on failure the result is presumably an empty string (TODO confirm).
func (*Node) Get_command ¶
Returns: cmd will contain the command or be empty and non-nil.
func (*Node) Set_repair_boundary ¶
func (o *Node) Set_repair_boundary(boundary Repair_boundary)
Receiver func
func (*Node) Set_unicast_nack ¶
Receiver func
type Object ¶
type Object struct { // sequential integer (1 based) per Instance Id uint64 // contains filtered or unexported fields }
func (*Object) Data_access_data ¶
func (*Object) File_get_name ¶
func (*Object) File_rename ¶
func (*Object) Get_bytes_pending ¶
func (*Object) Get_info_length ¶
func (*Object) Get_sender ¶
Returns: *Node or nil
func (*Object) Get_type ¶
func (o *Object) Get_type() Object_type
func (*Object) Is_retained ¶
func (*Object) Set_nacking_mode ¶
func (o *Object) Set_nacking_mode(mode Nacking_mode)
Receiver func
func (*Object) Stream_get_read_offset ¶
Receiver func
func (*Object) Stream_has_vacancy ¶
Sender func
func (*Object) Stream_read ¶
Receiver func
func (*Object) Stream_seek_msg_start ¶
Receiver func
func (*Object) Stream_set_auto_flush ¶
func (o *Object) Stream_set_auto_flush(mode Flush_mode)
Sender func
func (*Object) Stream_set_push_enable ¶
Sender func
func (*Object) Stream_write ¶
Sender func
type Object_type ¶
type Object_type uint32
func (Object_type) String ¶
func (o Object_type) String() string
type Probe_mode ¶
type Probe_mode int
func (Probe_mode) String ¶
func (o Probe_mode) String() string
type Repair_boundary ¶
type Repair_boundary norm_repair_boundary
const ( Boundary_block Repair_boundary = repair_block Boundary_object = repair_object )
func (Repair_boundary) String ¶
func (o Repair_boundary) String() string
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
func (*Session) Add_acking_node ¶
Sender func
func (*Session) Add_remove_acking_node ¶
Sender func
func (*Session) Data_enqueue ¶
Sender func
Data_enqueue does not copy data or info. Do not modify info and data until an Event_type_tx_object_purged is received for the returned *Object.Id. Object_type_data and Object_type_file objects are received in the Event_type_rx_object_completed Event.O.Data.
func (*Session) File_enqueue ¶
Sender func
Instance.Set_cache_directory() must be called in the Session Receiver in order for file transfers to occur.
func (*Session) Get_acking_status ¶
func (o *Session) Get_acking_status(node_id uint32) Acking_status
Sender func
func (*Session) Get_events ¶
func (o *Session) Get_events() Event_type
func (*Session) Get_grtt_estimate ¶
Sender func
func (*Session) Get_local_node_id ¶
Sender func
func (*Session) Get_next_acking_node ¶
func (o *Session) Get_next_acking_node(status Acking_status) (node_id uint32, success bool)
Sender func
func (*Session) Get_tx_rate ¶
Sender func
func (*Session) Send_command ¶
Sender func
func (*Session) Set_auto_parity ¶
func (*Session) Set_backoff_factor ¶
Sender func
func (*Session) Set_congestion_control ¶
Sender func
enable: recommended is true. adjust_rate: recommended is true, false is experimental.
func (*Session) Set_default_nacking_mode ¶
func (o *Session) Set_default_nacking_mode(mode Nacking_mode)
Receiver func
func (*Session) Set_default_repair_boundary ¶
func (o *Session) Set_default_repair_boundary(boundary Repair_boundary)
Receiver func
func (*Session) Set_default_rx_robust_factor ¶
Receiver func
func (*Session) Set_default_sync_Policy ¶
func (o *Session) Set_default_sync_Policy(policy Sync_policy)
Receiver func
func (*Session) Set_events ¶
func (o *Session) Set_events(events Event_type)
Set_events determines the events that are delivered through the Session.Events() channel. events is a bit flag. Default: Event_type_all
var sess Session log.Println(sess.Get_events()) // Output: Event_type_all // Event_type_all minus Event_type_grtt_updated minus Event_type_tx_object_sent sess.Set_events(sess.Get_events() &^ (Event_type_grtt_updated | Event_type_tx_object_sent))
func (*Session) Set_flow_control ¶
func (*Session) Set_fragmentation ¶
Sender func
func (*Session) Set_group_size ¶
Sender func
func (*Session) Set_grtt_estimate ¶
Sender func
func (*Session) Set_grtt_max ¶
Sender func
func (*Session) Set_grtt_probing_interval ¶
Sender func
func (*Session) Set_grtt_probing_mode ¶
func (o *Session) Set_grtt_probing_mode(mode Probe_mode)
Sender func
func (*Session) Set_loopback ¶
func (*Session) Set_message_trace ¶
func (*Session) Set_multicast_interface ¶
Sender func
func (*Session) Set_rx_cache_limit ¶
Receiver func
Call before Start_receiver()
default: 256, max: Max_rx_cache_limit
func (*Session) Set_rx_port_reuse ¶
func (o *Session) Set_rx_port_reuse(rx_port_reuse bool, rx_bind_address, rx_sender_address string, rx_sender_address_port uint16)
rx_bind_address: can be empty
rx_sender_address: can be empty
func (*Session) Set_rx_socket_buffer ¶
func (*Session) Set_silent_receiver ¶
Receiver func
func (*Session) Set_tx_cache_bounds ¶
Sender func
defaults: size_max = 20 Mbyte, count_min = 8, recommended min = 2, count_max = 256
func (*Session) Set_tx_only ¶
func (*Session) Set_tx_port ¶
default: session port in Instance.Create_session().
tx_bind_address can be empty. tx_port_reuse default: false.
func (*Session) Set_tx_rate ¶
Sender func
func (*Session) Set_tx_rate_bounds ¶
Sender func
func (*Session) Set_tx_robust_factor ¶
Sender func
func (*Session) Set_tx_socket_buffer ¶
func (*Session) Set_user_data ¶
func (o *Session) Set_user_data(user_data interface{})
Sender func
Set_user_data stores data with the Session. Set_user_data does not store user_data in Norm.
func (*Session) Set_watermark ¶
Sender func
func (*Session) Start_receiver ¶
Receiver func
func (*Session) Start_sender ¶
func (o *Session) Start_sender(id uint32, buffer_space uint, segment_size, block_size, num_parity uint16, fec_id uint8) bool
Sender func
defaults: id: 0 = set to time.Now(), buffer_space: 0 = 1024 * 1024, block_size: 0 = 64, num_parity: 0 = 16.
func (*Session) Stop_sender ¶
func (o *Session) Stop_sender()
func (*Session) Stream_open ¶
Sender func
type Sync_policy ¶
type Sync_policy norm_sync_policy
const ( Sync_current Sync_policy = sync_current Sync_all = sync_all )
func (Sync_policy) String ¶
func (o Sync_policy) String() string