node

package
v0.0.0-...-59f270b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2021 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ACTION_JOIN        ActionType = 1 << 0
	ACTION_REPLY_JOIN  ActionType = 1 << 1
	ACTION_NEW_NODE    ActionType = 1 << 2
	ACTION_DELETE_NODE ActionType = 1 << 3
	ACTION_HEARTBEAT   ActionType = 1 << 4
	ACTION_PING        ActionType = 1 << 5
	ACTION_ACK         ActionType = 1 << 6

	STATUS_OK   StatusType = 1 << 0
	STATUS_FAIL StatusType = 1 << 1
	STATUS_END  StatusType = 1 << 2

	LOSS_RATE              = 0.00
	NUM_MONITORS       int = 3
	HEARTBEAT_INTERVAL     = 1500 * time.Millisecond
	TIMEOUT_THRESHOLD      = 4 * time.Second
)
View Source
const (
	GETRequest string = "GET"
	PUTRequest string = "PUT"
)
View Source
const DEBUG = false
View Source
const DUPLICATE_CNT = 4
View Source
const DcliReceiverPort = "8013"
View Source
const FILE_LIST_FILE = "/tmp/file.list"
View Source
const FileServiceName = "SimpleFileService"
View Source
const JuicePartitionMethod = "range"
View Source
const MAX_CAPACITY = 1024
View Source
const MEMBER_LIST_FILE = "/tmp/member.list"
View Source
const MIN_UPDATE_INTERVAL = 60 * 1000
View Source
const MapleJuiceServiceName = "MapleJuiceService"

MapleJuiceServiceName ...

View Source
const READ_QUORUM = 2
View Source
const RPC_DEFAULT_PORT = "8011"
View Source
const SPLIT = "___"
View Source
const TCPBufferSize = 1024
View Source
const TCP_FILE_PORT = "8012"
View Source
const WRITE_QUORUM = 3

Variables

View Source
var HEARTBEAT_LOG_FLAG = false // debug

Functions

func CallGetTimeStamp

func CallGetTimeStamp(address, sdfsFileName string, c chan Pair)

func CallMapleJuiceRequest

func CallMapleJuiceRequest(workerID int, workerAddress string, files []string, args *MapleJuiceTaskArgs, waitChan chan int)

func CallNodesMergeTmpFiles

func CallNodesMergeTmpFiles(receiverAddress []string)

func CallSingleNodeMergeTmpFiles

func CallSingleNodeMergeTmpFiles(address string, ts int, c chan int)

func CheckFile

func CheckFile(sdfsfilename, address string) string

func DeleteFile

func DeleteFile(address, sdfsName string, c chan string) error

func DeleteSDFSDir

func DeleteSDFSDir(address, dir string) error

func GetFile

func GetFile(address, sdfsfilename string, data *[]byte) error

func GetMillisecond

func GetMillisecond() int

func IsInCircleRange

func IsInCircleRange(id, start, end int) bool

func ListFileInSDFSDir

func ListFileInSDFSDir(address, dir string) []string

func ListFilesWithPrefixInNode

func ListFilesWithPrefixInNode(address, prefix string) []string

func NormalCharToSpecial

func NormalCharToSpecial(s string) string

func PutFile

func PutFile(address string, args *StoreFileArgs, c chan int)

func ReadContent

func ReadContent(conn net.Conn) []byte

func ReplyTaskResultToDcli

func ReplyTaskResultToDcli(message, clientAddress string)

func SpecialCharToNormal

func SpecialCharToNormal(s string) string

func WriteJuicePairToLocal

func WriteJuicePairToLocal(outputfile string, kvpair map[string]string)

Types

type ActionType

type ActionType int8

type FileInfo

type FileInfo struct {
	HashID       int
	Sdfsfilename string
	Localpath    string
	Timestamp    int
	MasterNodeID int
	FileLock     *sync.Mutex
	Tmp          bool
}

type FileList

type FileList struct {
	ID       int
	FileMap  map[string]*FileInfo // Key: sdfsfilename, value: fileinfo
	ListLock *sync.Mutex
}

func CreateFileList

func CreateFileList(selfID int) *FileList

func (*FileList) AppendFile

func (fl *FileList) AppendFile(
	sdfsName string,
	root_dir string,
	timestamp int,
	masterNodeID int,
	data []byte) error

func (*FileList) DeleteFileAndInfo

func (fl *FileList) DeleteFileAndInfo(sdfsName string) bool

func (*FileList) DeleteFileInfo

func (fl *FileList) DeleteFileInfo(sdfsfilename string) bool

func (*FileList) DeleteFileInfosOutOfRange

func (fl *FileList) DeleteFileInfosOutOfRange(start, end int) []string

func (*FileList) DeleteSDFSDir

func (fl *FileList) DeleteSDFSDir(dirName string)

func (*FileList) DeleteTmpFilesFromFailedWorker

func (fl *FileList) DeleteTmpFilesFromFailedWorker(workerId int)

func (*FileList) GetFileInfo

func (fl *FileList) GetFileInfo(sdfsfilename string) *FileInfo

func (*FileList) GetFilesInRange

func (fl *FileList) GetFilesInRange(startID, endID int) []string

func (*FileList) GetOwnedFileInfos

func (fl *FileList) GetOwnedFileInfos(masterId int) []FileInfo

func (*FileList) GetTimeStamp

func (fl *FileList) GetTimeStamp(sdfsfilename string) int

func (*FileList) ListFileInDir

func (fl *FileList) ListFileInDir(dir string) []string

func (*FileList) ListFilesWithPrefix

func (fl *FileList) ListFilesWithPrefix(prefix string) []string

func (*FileList) MergeDirectoryWithSurfix

func (fl *FileList) MergeDirectoryWithSurfix(surffix string)

func (*FileList) MergeTmpFiles

func (fl *FileList) MergeTmpFiles(tmpDir, desDir string, ts int)

func (*FileList) PutFileInfo

func (fl *FileList) PutFileInfo(
	sdfsName string,
	path string,
	timestamp int,
	masterNodeID int)

func (*FileList) PutFileInfoBase

func (fl *FileList) PutFileInfoBase(
	hashId int,
	sdfsfilename string,
	abs_path string,
	timestamp int,
	masterNodeID int,
	tmp bool)

func (*FileList) PutFileInfoObject

func (fl *FileList) PutFileInfoObject(sdfsfilename string, fi *FileInfo)

PutFileInfoObject is used For testing

func (*FileList) ServeFile

func (fl *FileList) ServeFile(sdfsfilename string) ([]byte, error)

func (*FileList) StoreFile

func (fl *FileList) StoreFile(
	sdfsName string,
	root_dir string,
	timestamp int,
	masterNodeID int,
	data []byte) error

func (*FileList) StoreFileBase

func (fl *FileList) StoreFileBase(
	hashId int,
	sdfsName string,
	root_dir string,
	timestamp int,
	masterNodeID int,
	data []byte,
	appending bool,
	tmp bool) error

This should only be used in test

func (*FileList) StoreTmpFile

func (fl *FileList) StoreTmpFile(
	sdfsName string,
	root_dir string,
	timestamp int,
	masterNodeID int,
	data []byte) error

func (*FileList) UpdateMasterID

func (fl *FileList) UpdateMasterID(new_master_id int, needUpdate func(fileInfo *FileInfo) bool)

type FileService

type FileService struct {
	// contains filtered or unexported fields
}

func (*FileService) CheckFileExists

func (fileService *FileService) CheckFileExists(sdfsfilename string, hostname *string) error

func (*FileService) DeleteFileRequest

func (fileService *FileService) DeleteFileRequest(sdfsName string, result *RPCResultType) error

func (*FileService) DeleteLocalFile

func (fileService *FileService) DeleteLocalFile(sdfsName string, result *RPCResultType) error

func (*FileService) DeleteSDFSDir

func (fileService *FileService) DeleteSDFSDir(dir string, result *RPCResultType) error

func (*FileService) DeleteSDFSDirRequest

func (fileService *FileService) DeleteSDFSDirRequest(sdfsdir string, result *RPCResultType) error

func (*FileService) GetFileRequest

func (fileService *FileService) GetFileRequest(args []string, result *RPCResultType) error

Executed in coordinator

func (*FileService) GetTimeStamp

func (fileService *FileService) GetTimeStamp(sdfsFileName string, timestamp *int) error

func (*FileService) ListFileInDirRequest

func (fileService *FileService) ListFileInDirRequest(sdfsDir string, res *[]string) error

func (*FileService) ListFileInLocalDir

func (fileService *FileService) ListFileInLocalDir(dir string, result *[]string) error

func (*FileService) ListFilesWithPrefix

func (fileService *FileService) ListFilesWithPrefix(prefix string, result *[]string) error

func (*FileService) Ls

func (fileService *FileService) Ls(sdfsfilename string, hostnames *[]string) error

func (*FileService) PutFileRequest

func (fileService *FileService) PutFileRequest(args *PutFileArgs, result *RPCResultType) error

Callee begin

func (*FileService) ServeLocalFile

func (fileService *FileService) ServeLocalFile(sdfsfilename string, result *[]byte) error

func (*FileService) StoreFileToLocal

func (fileService *FileService) StoreFileToLocal(args *StoreFileArgs, result *RPCResultType) error

type MapleJuiceService

type MapleJuiceService struct {
	TaskQueue chan *MapleJuiceTaskArgs
	SelfNode  *Node
}

func (*MapleJuiceService) AddMapleJuiceTask

func (mj *MapleJuiceService) AddMapleJuiceTask(args *MapleJuiceTaskArgs, result *RPCResultType) error

add maple juice task to queue

func (*MapleJuiceService) ForwardMapleJuiceRequest

func (mj *MapleJuiceService) ForwardMapleJuiceRequest(args *MapleJuiceTaskArgs, result *RPCResultType) error

handle maple/juice request from Dcli send request to Master

func (*MapleJuiceService) MergeTmpFiles

func (mj *MapleJuiceService) MergeTmpFiles(ts int, result *RPCResultType) error

func (*MapleJuiceService) StartMapleJuiceTask

func (mj *MapleJuiceService) StartMapleJuiceTask(des *TaskDescription, result *RPCResultType) error

****

  • Worker ****

type MapleJuiceTaskArgs

type MapleJuiceTaskArgs struct {
	TaskType    MapleJuiceTaskType // "MapleTask" or "JuiceTask"
	Exe         string
	NumWorkers  int
	InputPath   string // sdfs_src_dir for maple, prefix for juice
	OutputPath  string // prefix for maple, sdfs_dest_filename for juice
	ClientAddr  string
	DeleteInput bool
}

MapleJuiceTaskArgs ...

type MapleJuiceTaskType

type MapleJuiceTaskType int8
const (
	MapleTask MapleJuiceTaskType = 1
	JuiceTask MapleJuiceTaskType = 2
)

Task names

type MemberList

type MemberList struct {
	Member_map     map[int]*MemberNode
	Capacity, Size int
	SelfId         int
	// contains filtered or unexported fields
}

func ConstructFromTmpFile

func ConstructFromTmpFile() *MemberList

func CreateMemberList

func CreateMemberList(selfId, capacity int) *MemberList

func (*MemberList) DeleteNode

func (mbList *MemberList) DeleteNode(id int)

func (*MemberList) DumpToTmpFile

func (mbList *MemberList) DumpToTmpFile()

func (*MemberList) FindLeastFreeId

func (mbList *MemberList) FindLeastFreeId() int

func (*MemberList) GetAddress

func (mbList *MemberList) GetAddress(id int) string

func (*MemberList) GetAllAddressesExcludeSelf

func (mbList *MemberList) GetAllAddressesExcludeSelf() []string

func (*MemberList) GetAllRPCAddresses

func (mbList *MemberList) GetAllRPCAddresses() []string

func (*MemberList) GetIP

func (mbList *MemberList) GetIP(id int) string

func (*MemberList) GetNextKNodes

func (mbList *MemberList) GetNextKNodes(id, k int) []MemberNode

func (*MemberList) GetNode

func (mbList *MemberList) GetNode(id int) *MemberNode

func (*MemberList) GetPrevKNodes

func (mbList *MemberList) GetPrevKNodes(id, k int) []MemberNode

func (*MemberList) GetRPCAddress

func (mbList *MemberList) GetRPCAddress(id int) string

func (*MemberList) GetRPCAddressesForNextKNodes

func (mbList *MemberList) GetRPCAddressesForNextKNodes(start, k int) []string

func (*MemberList) GetSmallestNode

func (mbList *MemberList) GetSmallestNode() *MemberNode

func (*MemberList) GetTimeOutNodes

func (mbList *MemberList) GetTimeOutNodes(deadline, id, k int) []MemberNode

func (*MemberList) InsertNode

func (mbList *MemberList) InsertNode(id int, ip, port, rpc_port string, heartbeat_t int, hostname string)

func (*MemberList) NicePrint

func (mbList *MemberList) NicePrint()

func (*MemberList) NodeTimeOut

func (mbList *MemberList) NodeTimeOut(deadline, id int) bool

*** this is for passive monitoring

func (*MemberList) ToJson

func (mbList *MemberList) ToJson() []byte

func (*MemberList) UpdateNodeHeartbeat

func (mbList *MemberList) UpdateNodeHeartbeat(id, heartbeat_t int)

type MemberNode

type MemberNode struct {
	Id          int
	Heartbeat_t int
	JoinTime    string
	Hostname    string
	Ip          string
	Port        string
	RPC_Port    string
	// contains filtered or unexported fields
}

func CreateMemberNode

func CreateMemberNode(id int, ip, port, rpc_port string, heartbeat_t int, hostname string) *MemberNode

func (*MemberNode) GetNextNode

func (mNode *MemberNode) GetNextNode() *MemberNode

func (*MemberNode) GetPrevNode

func (mNode *MemberNode) GetPrevNode() *MemberNode

type Node

type Node struct {
	Id                 int
	IP, Port, RPC_Port string
	MbList             *MemberList

	FileList *FileList
	Root_dir string

	Hostname string

	DisableMonitorHB bool // Disalbe monitor heartbeat, for test
	FailureNodeChan  chan int
	// contains filtered or unexported fields
}

func CreateNode

func CreateNode(ip, port, rpc_port string) *Node

func (*Node) Broadcast

func (node *Node) Broadcast(packet *Packet)

func (*Node) DeleteRedundantFile

func (node *Node) DeleteRedundantFile()

func (*Node) DeleteSDFSDirRequest

func (node *Node) DeleteSDFSDirRequest(sdfsdir string) error

func (*Node) DuplicateReplica

func (node *Node) DuplicateReplica()

func (*Node) GetAddressOfLatestTS

func (node *Node) GetAddressOfLatestTS(sdfsfilename string) (string, int)

func (*Node) GetAddressesWithIds

func (node *Node) GetAddressesWithIds(ids []int) []string

func (*Node) GetFileRequest

func (node *Node) GetFileRequest(args []string, result *RPCResultType) error

func (*Node) GetFilesFromSDFS

func (node *Node) GetFilesFromSDFS(sdfsfiles []string, dir string) error

func (*Node) GetFirstKReplicaNodeID

func (node *Node) GetFirstKReplicaNodeID(sdfsfilename string, K int) []int

func (*Node) GetMasterID

func (node *Node) GetMasterID(sdfsfilename string) int

func (*Node) GetResponsibleAddresses

func (node *Node) GetResponsibleAddresses(sdfsfilename string) []string

func (*Node) GetResponsibleHostname

func (node *Node) GetResponsibleHostname(sdfsName string) []string

func (*Node) HandleJuiceTask

func (node *Node) HandleJuiceTask(input_dir, output_file string, f plugin.Symbol)

func (*Node) HandleMapleTask

func (node *Node) HandleMapleTask(input_dir, output_dir, prefix string, f plugin.Symbol)

func (*Node) HandleTCPFileRequest

func (node *Node) HandleTCPFileRequest(conn net.Conn)

func (*Node) IndividualPutFileRequest

func (node *Node) IndividualPutFileRequest(sdfsName, localName string, forceUpdate, appending, tmp bool, result *RPCResultType) error

func (*Node) InitMemberList

func (node *Node) InitMemberList()

func (*Node) IsAlive

func (node *Node) IsAlive() bool

func (*Node) Join

func (node *Node) Join(address string) bool

func (*Node) JoinNode

func (node *Node) JoinNode(packet Packet)

func (*Node) Leave

func (node *Node) Leave()

func (*Node) ListFileInDirRequest

func (node *Node) ListFileInDirRequest(sdfsDir string) []string

func (*Node) ListFilesWithPrefixRequest

func (node *Node) ListFilesWithPrefixRequest(prefix string) []string

func (*Node) LostNode

func (node *Node) LostNode(id int, lose_heartbeat bool)

func (*Node) MonitorInputPacket

func (node *Node) MonitorInputPacket()

func (*Node) PartitionFiles

func (node *Node) PartitionFiles(files []string, numWorkers int, partitionMethod string) map[int][]string

func (*Node) PutFileRequest

func (node *Node) PutFileRequest(args *PutFileArgs, result *RPCResultType) error

func (*Node) RegisterFileService

func (node *Node) RegisterFileService(address string) error

func (*Node) RegisterMapleJuiceService

func (node *Node) RegisterMapleJuiceService(address string, mjService *MapleJuiceService) error

func (*Node) RegisterRPCMapleJuiceService

func (node *Node) RegisterRPCMapleJuiceService()

func (*Node) ScanIntroducer

func (node *Node) ScanIntroducer(addresses []string) (string, bool)

func (*Node) SendFileIfNecessary

func (node *Node) SendFileIfNecessary(info FileInfo, targetRPCAddr []string)

func (*Node) SendHeartbeat

func (node *Node) SendHeartbeat()

func (*Node) SendHeartbeatRoutine

func (node *Node) SendHeartbeatRoutine()

func (*Node) SetFileDir

func (node *Node) SetFileDir(dir string)

func (*Node) StartMapleJuiceTask

func (node *Node) StartMapleJuiceTask(des *TaskDescription) error

func (*Node) StartRPCService

func (node *Node) StartRPCService()

func (*Node) StartTCPService

func (node *Node) StartTCPService()

func (*Node) StoreFileToLocal

func (node *Node) StoreFileToLocal(args *StoreFileArgs) error

func (*Node) TransferOwnership

func (node *Node) TransferOwnership(newMasterId int)

func (*Node) UpdateHostname

func (node *Node) UpdateHostname(name string)

func (*Node) WriteMaplePairToLocal

func (node *Node) WriteMaplePairToLocal(dir, prefix string, kvpair map[string]string)

type Packet

type Packet struct {
	Action   ActionType
	Id       int
	Hostname string
	IP       string
	Port     string
	RPC_Port string
	Map      *MemberList
}

type Pair

type Pair struct {
	Address string
	Ts      int // Timestamp
}

type PutFileArgs

type PutFileArgs struct {
	LocalName   string
	SdfsName    string
	ForceUpdate bool
	Appending   bool
	Tmp         bool
}

type RPCResultType

type RPCResultType int8
const (
	RPC_SUCCESS    RPCResultType = 1 << 0
	RPC_DUMMY      RPCResultType = 1 << 1
	RPC_FAIL       RPCResultType = 1 << 2
	RPC_PROMPT     RPCResultType = 1 << 3
	FILES_ROOT_DIR               = "/apps/files"
)

type StatusType

type StatusType int8

type StoreFileArgs

type StoreFileArgs struct {
	MasterNodeId int
	SdfsName     string
	Ts           int
	Content      []byte
	Appending    bool
	Tmp          bool
}

func ParsePutArgs

func ParsePutArgs(lines []string) (*StoreFileArgs, error)

type TaskDescription

type TaskDescription struct {
	TaskType        MapleJuiceTaskType
	TaskID          string
	ExeFile         string
	InputFiles      []string
	OutputPath      string   // prefix for maple
	MasterAddresses []string // includes backup master
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL