dag_go

package module
v0.0.0-...-058d1e0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

README

dag-go

Go Reference Build Status CodeFactor

Documentation

Index

Constants

View Source
const (
	Create createEdgeErrorType = iota
	Exist
	Fault
)
View Source
const (
	Start runningStatus = iota
	Preflight
	PreflightFailed
	InFlight
	InFlightFailed
	PostFlight
	PostFlightFailed
	FlightEnd
	Failed
	Succeed
)

The status displayed when running the runner on each node.

View Source
const (
	StartNode = "start_node"
	EndNode   = "end_node"
)
View Source
const (
	Max           int = 100
	Min           int = 1
	StatusDefault int = 3
)

channel buffer size

Variables

View Source
var Log = logrus.New()

Functions

func CreateCustomImage

func CreateCustomImage(n *Node) *string

Types

type Container

type Container struct {
	Context   context.Context
	BaseImage string
}

func Connect

func Connect() *Container

func (*Container) CreateImage

func (c *Container) CreateImage(a interface{}, healthChecker string) error

CreateImage TODO healthCheckr 의 empty 검사만 하지만 실제로 healthchecker.sh 가 있는지 파악하는 구문 들어갈지 생각하자. 각 노드의 이미지 를만들어 줌.

func (*Container) RunE

func (c *Container) RunE(a interface{}) (int, error)

RunE 8 is 'None' -> check podbridge config.go

func (*Container) RunET

func (c *Container) RunET(a interface{}) (pbr.ContainerStatus, error)

type Dag

type Dag struct {
	Pid   string
	Id    string
	Edges []*Edge

	StartNode *Node
	EndNode   *Node

	RunningStatus chan *printStatus

	// timeout
	Timeout time.Duration

	ContainerCmd Runnable
	// contains filtered or unexported fields
}

Dag (Directed Acyclic Graph) is an acyclic graph, not a cyclic graph. In other words, there is no cyclic cycle in the DAG algorithm, and it has only one direction.

func CopyDag

func CopyDag(original *Dag, Id string) (copied *Dag)

CopyDag dag 를 복사함.

func NewDag

func NewDag() *Dag

NewDag creates a pointer to the Dag structure. One channel must be put in the start node. Enter this channel value in the start function. And this channel is not included in Edge. TODO 파라미터 nil 허용해주도록 바꿔줄지 생각함.

func XmlParser

func XmlParser(d []byte) (context.Context, bool, *Dag)

func (*Dag) AddCommand

func (dag *Dag) AddCommand(id, cmd string) (node *Node)

AddCommand add command to node.

func (*Dag) AddEdge

func (dag *Dag) AddEdge(from, to string) error

AddEdge error log 는 일단 여기서만 작성

func (*Dag) AddNodeToStartNode

func (dag *Dag) AddNodeToStartNode(to *Node) error

AddNodeToStartNode check TODO 확인하자.

func (*Dag) ConnectRunner

func (dag *Dag) ConnectRunner() bool

func (*Dag) CreateImage

func (dag *Dag) CreateImage(healthChecker string)

func (*Dag) CreateImageT

func (dag *Dag) CreateImageT(ctx context.Context, healthChecker string)

CreateImageT 이건 컨테이너 전용- 이미지 생성할때 고루틴 돌리니 에러 발생.. TODO check ContainerCmd

func (*Dag) DisableTimeout

func (dag *Dag) DisableTimeout()

DisableTimeout commit by seoy

func (*Dag) FinishDag

func (dag *Dag) FinishDag() error

FinishDag finally, connect end_node to dag

func (*Dag) GetReady

func (dag *Dag) GetReady(ctx context.Context) bool

func (*Dag) GetReadyT

func (dag *Dag) GetReadyT(ctx context.Context) bool

GetReadyT TODO 맨 마지막에 end 넣는 것은 생각해보자. 굳이 필요 없는 것 같긴하다.

func (*Dag) SetContainerCmd

func (dag *Dag) SetContainerCmd(r Runnable)

func (*Dag) SetTimeout

func (dag *Dag) SetTimeout(d time.Duration)

SetTimeout commit by seoy

func (*Dag) Start

func (dag *Dag) Start() bool

Start start_node has one vertex. That is, it has only one channel and this channel is not included in the edge. It is started by sending a value to this channel when starting the dag's operation.

func (*Dag) Wait

func (dag *Dag) Wait(ctx context.Context) bool

Wait waits until all channels are closed. RunningStatus channel has multiple senders and one receiver Closing a channel on a receiver violates the general channel close principle. However, when Wait terminates, it seems safe to close the channel here because all tasks are finished.

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

Edge is a channel. It has the same meaning as the connecting line connecting the parent and child nodes.

func CopyEdge

func CopyEdge(original []*Edge) (copied []*Edge)

type ErrorType

type ErrorType int
const (
	AddEdge ErrorType = iota
	AddEdgeFromXmlNode
	AddEdgeFromXmlNodeToStartNode
	AddEdgeFromXmlNodeToEndNode
)

type Node

type Node struct {
	Id string
	// 컨테이너 빌드를 위한 from 이미지.
	ImageName string
	// TODO 이름 추후 수정하자.
	RunCommand Runnable

	Commands string
	// contains filtered or unexported fields
}

func (*Node) Execute

func (n *Node) Execute() (r int, err error)

Execute 이것을 작성하면 된다.

type Pipeline

type Pipeline struct {
	Id   string
	Dags []*Dag

	ContainerCmd Runnable
}

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline 파이프라인은 dag NewPipeline와 데이터를 연계해야 하는데 데이터의 경우는 다른 xml 처리하는 것이 바람직할 것이다. 외부에서 데이터를 가지고 올 경우, ftp 나 scp 나 기타 다른 프롤토콜을 사용할 경우도 생각을 해야 한다.

func (*Pipeline) NewDags

func (pipe *Pipeline) NewDags(ds int, original *Dag) *Pipeline

NewDags 파이프라인과 dag 의 차이점은 데이터의 차이이다. 즉, 같은 dag 이지만 데이터가 다를 수 있다. 파이프라인에서 데이터 연계가 일어난다. 하지만 데이터 관련 datakit 이 아직 만들어 지지 않았기 때문에 입력파라미터로 dag 수를 지정한다. 이부분에서 두가지를 생각할 수 있다. dag 하나를 받아들여서 늘리는 방향과 dag 는 하나이고 데이터만큼만 어떠한 방식으로 진행하는 것이다. 전자가 쉽게 생각할 수 있지만 메모리 낭비 가있다. 일단 전자로 개발한다. 후자는 아직 아이디어가 없다. TODO 데이터와 관련해서 추가 해서 수정해줘야 한다. 추후 안정화 되면 panic 은 error 로 교체한다.

func (*Pipeline) ReStart

func (pipe *Pipeline) ReStart(ctx context.Context, dag *Dag)

ReStart 개발 중

func (*Pipeline) SetContainerCmd

func (pipe *Pipeline) SetContainerCmd(r Runnable) error

func (*Pipeline) Start

func (pipe *Pipeline) Start(ctx context.Context)

Start TODO 모든 dag 들을 실행 시킬 수 있어야 한다. 수정해줘야 한다

func (*Pipeline) Stop

func (pipe *Pipeline) Stop(ctx context.Context, dag *Dag)

Stop 개발 중

type Runnable

type Runnable interface {
	RunE(a interface{}) (int, error)
	CreateImage(a interface{}, healthChecker string) error
}

Notes

Bugs

  • 이상 현상 발생 detectCycle runningStatus 과 start, restart, pause, stop 등 추가 후 버그 개선

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL