server

package
v0.0.0-...-e5597af Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: MIT Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MetaFileName         = "wandb-metadata.json"
	SummaryFileName      = "wandb-summary.json"
	OutputFileName       = "output.log"
	DiffFileName         = "diff.patch"
	RequirementsFileName = "requirements.txt"
	ConfigFileName       = "config.yaml"
)
View Source
const BufferSize = 32
View Source
const (
	// RFC3339Micro Modified from time.RFC3339Nano
	RFC3339Micro = "2006-01-02T15:04:05.000000Z07:00"
)

Variables

This section is empty.

Functions

func NewBackend

func NewBackend(
	logger *observability.CoreLogger,
	settings *settings.Settings,
) *api.Backend

NewBackend returns a Backend or nil if we're offline.

func NewFileStream

func NewFileStream(
	backend *api.Backend,
	logger *observability.CoreLogger,
	settings *settings.Settings,
	peeker api.Peeker,
) filestream.FileStream

func NewFileTransferManager

func NewFileTransferManager(
	fileTransferStats filetransfer.FileTransferStats,
	logger *observability.CoreLogger,
	settings *settings.Settings,
) filetransfer.FileTransferManager

func NewGraphQLClient

func NewGraphQLClient(
	backend *api.Backend,
	settings *settings.Settings,
	peeker *observability.Peeker,
) graphql.Client

func NewRunfilesUploader

func NewRunfilesUploader(
	ctx context.Context,
	logger *observability.CoreLogger,
	settings *settings.Settings,
	fileStream filestream.FileStream,
	fileTransfer filetransfer.FileTransferManager,
	graphQL graphql.Client,
) runfiles.Uploader

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is the connection for a stream. It is a wrapper around the underlying connection It handles the incoming messages from the client and passes them to the stream

func NewConnection

func NewConnection(
	ctx context.Context,
	conn net.Conn,
	teardown chan struct{},
) *Connection

NewConnection creates a new connection

func (*Connection) Close

func (nc *Connection) Close()

Close closes the connection

func (*Connection) HandleConnection

func (nc *Connection) HandleConnection()

HandleConnection handles the connection by reading from the connection and passing the messages to the stream and writing messages from the stream to the connection

func (*Connection) Respond

func (nc *Connection) Respond(resp *service.ServerResponse)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(logger *observability.CoreLogger) *Dispatcher

func (*Dispatcher) AddResponders

func (d *Dispatcher) AddResponders(entries ...ResponderEntry)

AddResponders adds the given responders to the stream's dispatcher.

type Git

type Git struct {
	// contains filtered or unexported fields
}

func NewGit

func NewGit(path string, logger *observability.CoreLogger) *Git

func (*Git) IsAvailable

func (g *Git) IsAvailable() bool

func (*Git) LatestCommit

func (g *Git) LatestCommit(ref string) (string, error)

func (*Git) SavePatch

func (g *Git) SavePatch(ref, output string) error

SavePatch saves a patch file of the diff between the current working tree and the given ref. Returns an error if the operation fails, or if no diff is found.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is the handler for a stream it handles the incoming messages, processes them and passes them to the writer

func NewHandler

func NewHandler(
	ctx context.Context,
	params *HandlerParams,
) *Handler

NewHandler creates a new handler

func (*Handler) Close

func (h *Handler) Close()

func (*Handler) Do

func (h *Handler) Do(inChan <-chan *service.Record)

Do starts the handler

func (*Handler) GetRun

func (h *Handler) GetRun() *service.RunRecord

type HandlerParams

type HandlerParams struct {
	Settings          *service.Settings
	FwdChan           chan *service.Record
	OutChan           chan *service.Result
	Logger            *observability.CoreLogger
	Mailbox           *mailbox.Mailbox
	RunSummary        *runsummary.RunSummary
	MetricHandler     *MetricHandler
	FileTransferStats filetransfer.FileTransferStats
	RunfilesUploader  runfiles.Uploader
	TBHandler         *TBHandler
	SystemMonitor     *monitor.SystemMonitor
}
type Header struct {
	Magic      uint8
	DataLength uint32
}

type HeaderOptions

type HeaderOptions struct {
	IDENT   [4]byte
	Magic   uint16
	Version byte
}

func NewHeader

func NewHeader() *HeaderOptions

NewHeader returns a new header with default values.

func (*HeaderOptions) MarshalBinary

func (o *HeaderOptions) MarshalBinary(w io.Writer) error

MarshalBinary encodes the header to binary format.

func (*HeaderOptions) UnmarshalBinary

func (o *HeaderOptions) UnmarshalBinary(r io.Reader) error

UnmarshalBinary decodes binary data into the header.

func (*HeaderOptions) Valid

func (o *HeaderOptions) Valid() bool

Valid checks if the header is valid based on a reference header.

type MetricHandler

type MetricHandler struct {
	// contains filtered or unexported fields
}

func NewMetricHandler

func NewMetricHandler() *MetricHandler

type MetricSender

type MetricSender struct {
	// contains filtered or unexported fields
}

func NewMetricSender

func NewMetricSender() *MetricSender

type Responder

type Responder interface {
	Respond(response *service.ServerResponse)
}

type ResponderEntry

type ResponderEntry struct {
	Responder Responder
	ID        string
}

type Sender

type Sender struct {

	// RunRecord is the run record
	// TODO: remove this and use properly updated settings
	//       + a flag indicating whether the run has started
	RunRecord *service.RunRecord
	// contains filtered or unexported fields
}

Sender is the sender for a stream it handles the incoming messages and sends to the server or/and to the dispatcher/handler

func NewSender

func NewSender(
	ctx context.Context,
	cancel context.CancelFunc,
	params *SenderParams,
) *Sender

NewSender creates a new Sender with the given settings

func (*Sender) Close

func (s *Sender) Close()

func (*Sender) Do

func (s *Sender) Do(inChan <-chan *service.Record)

do sending of messages to the server

func (*Sender) SendRecord

func (s *Sender) SendRecord(record *service.Record)

type SenderParams

type SenderParams struct {
	Logger              *observability.CoreLogger
	Settings            *service.Settings
	Backend             *api.Backend
	FileStream          fs.FileStream
	FileTransferManager filetransfer.FileTransferManager
	RunfilesUploader    runfiles.Uploader
	GraphqlClient       graphql.Client
	Peeker              *observability.Peeker
	RunSummary          *runsummary.RunSummary
	Mailbox             *mailbox.Mailbox
	OutChan             chan *service.Result
	FwdChan             chan *service.Record
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the core server

func NewServer

func NewServer(ctx context.Context, addr string, portFile string) (*Server, error)

NewServer creates a new server

func (*Server) Close

func (s *Server) Close()

Close closes the server

func (*Server) Serve

func (s *Server) Serve()

Serve serves the server

func (*Server) SetDefaultLoggerPath

func (s *Server) SetDefaultLoggerPath(path string)

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the persistent store for a stream

func NewStore

func NewStore(ctx context.Context, fileName string, logger *observability.CoreLogger) *Store

NewStore creates a new store

func (*Store) Close

func (sr *Store) Close() error

Close closes the store

func (*Store) Open

func (sr *Store) Open(flag int) error

Open opens the store

func (*Store) Read

func (sr *Store) Read() (*service.Record, error)

func (*Store) Write

func (sr *Store) Write(msg *service.Record) error

func (*Store) WriteDirectlyToDB

func (sr *Store) WriteDirectlyToDB(data []byte) (int, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a collection of components that work together to handle incoming data for a W&B run, store it locally, and send it to a W&B server. Stream.handler receives incoming data from the client and dispatches it to Stream.writer, which writes it to a local file. Stream.writer then sends the data to Stream.sender, which sends it to the W&B server. Stream.dispatcher handles dispatching responses to the appropriate client responders.

func NewStream

func NewStream(ctx context.Context, settings *settings.Settings, _ string) *Stream

NewStream creates a new stream with the given settings and responders.

func (*Stream) AddResponders

func (s *Stream) AddResponders(entries ...ResponderEntry)

AddResponders adds the given responders to the stream's dispatcher.

func (*Stream) Close

func (s *Stream) Close()

Close Gracefully wait for handler, writer, sender, dispatcher to shut down cleanly assumes an exit record has already been sent

func (*Stream) FinishAndClose

func (s *Stream) FinishAndClose(exitCode int32)

func (*Stream) GetRun

func (s *Stream) GetRun() *service.RunRecord

func (*Stream) HandleRecord

func (s *Stream) HandleRecord(rec *service.Record)

HandleRecord handles the given record by sending it to the stream's handler.

func (*Stream) PrintFooter

func (s *Stream) PrintFooter()

func (*Stream) Respond

func (s *Stream) Respond(resp *service.ServerResponse)

Respond Handle internal responses like from the finish and close path

func (*Stream) Start

func (s *Stream) Start()

Start starts the stream's handler, writer, sender, and dispatcher. We use Stream's wait group to ensure that all of these components are cleanly finalized and closed when the stream is closed in Stream.Close().

type StreamMux

type StreamMux struct {
	// contains filtered or unexported fields
}

StreamMux is a multiplexer for streams. It is thread-safe and is used to ensure that only one stream exists for a given streamId so that we can safely add responders to streams.

func NewStreamMux

func NewStreamMux() *StreamMux

NewStreamMux creates a new stream mux.

func (*StreamMux) AddStream

func (sm *StreamMux) AddStream(streamId string, stream *Stream) error

AddStream adds a stream to the mux if it doesn't already exist.

func (*StreamMux) FinishAndCloseAllStreams

func (sm *StreamMux) FinishAndCloseAllStreams(exitCode int32)

FinishAndCloseAllStreams closes all streams in the mux.

func (*StreamMux) GetStream

func (sm *StreamMux) GetStream(streamId string) (*Stream, error)

GetStream gets a stream from the mux.

func (*StreamMux) RemoveStream

func (sm *StreamMux) RemoveStream(streamId string) (*Stream, error)

RemoveStream removes a stream from the mux.

type SyncService

type SyncService struct {
	// contains filtered or unexported fields
}

func NewSyncService

func NewSyncService(ctx context.Context, opts ...SyncServiceOption) *SyncService

func (*SyncService) Close

func (s *SyncService) Close()

func (*SyncService) Flush

func (s *SyncService) Flush()

func (*SyncService) Start

func (s *SyncService) Start()

func (*SyncService) SyncRecord

func (s *SyncService) SyncRecord(record *service.Record, err error)

type SyncServiceOption

type SyncServiceOption func(*SyncService)

func WithSyncServiceFlushCallback

func WithSyncServiceFlushCallback(syncResultCallback func(error)) SyncServiceOption

func WithSyncServiceLogger

func WithSyncServiceLogger(logger *observability.CoreLogger) SyncServiceOption

func WithSyncServiceOverwrite

func WithSyncServiceOverwrite(overwrite *service.SyncOverwrite) SyncServiceOption

func WithSyncServiceSenderFunc

func WithSyncServiceSenderFunc(senderFunc func(*service.Record)) SyncServiceOption

func WithSyncServiceSkip

func WithSyncServiceSkip(skip *service.SyncSkip) SyncServiceOption

type TBHandler

type TBHandler struct {
	Active bool
	// contains filtered or unexported fields
}

func NewTBHandler

func NewTBHandler(
	watcher *watcher.Watcher,
	logger *observability.CoreLogger,
	settings *service.Settings,
	outChan chan *service.Record,
) *TBHandler

func (*TBHandler) Close

func (tb *TBHandler) Close()

func (*TBHandler) Handle

func (tb *TBHandler) Handle(record *service.Record) error

type Tokenizer

type Tokenizer struct {
	// contains filtered or unexported fields
}

func (*Tokenizer) Split

func (x *Tokenizer) Split(data []byte, _ bool) (advance int, token []byte, err error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is responsible for writing messages to the append-only log. It receives messages from the handler, processes them, if the message is to be persisted it writes them to the log. It also sends the messages to the sender.

func NewWriter

func NewWriter(ctx context.Context, params *WriterParams) *Writer

NewWriter returns a new Writer

func (*Writer) Close

func (w *Writer) Close()

Close closes the writer and all its resources which includes the store

func (*Writer) Do

func (w *Writer) Do(inChan <-chan *service.Record)

Do is the main loop of the writer to process incoming messages

type WriterOption

type WriterOption func(*Writer)

func WithWriterFwdChannel

func WithWriterFwdChannel(fwd chan *service.Record) WriterOption

func WithWriterSettings

func WithWriterSettings(settings *service.Settings) WriterOption

type WriterParams

type WriterParams struct {
	Logger   *observability.CoreLogger
	Settings *service.Settings
	FwdChan  chan *service.Record
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL