server

package
v0.0.0-...-99c2a2b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2023 License: MIT Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MetaFilename = "wandb-metadata.json"
	// RFC3339Micro Modified from time.RFC3339Nano
	RFC3339Micro = "2006-01-02T15:04:05.000000Z07:00"
)
View Source
const BufferSize = 32

Variables

This section is empty.

Functions

func LogError

func LogError(log *slog.Logger, msg string, err error)

func SetupDefaultLogger

func SetupDefaultLogger() *slog.Logger

func SetupStreamLogger

func SetupStreamLogger(name string, settings *service.Settings) *observability.NexusLogger

Types

type ActiveHistory

type ActiveHistory struct {
	// contains filtered or unexported fields
}

func NewActiveHistory

func NewActiveHistory(opts ...ActiveHistoryOptions) *ActiveHistory

func (*ActiveHistory) Clear

func (ah *ActiveHistory) Clear()

func (*ActiveHistory) Flush

func (ah *ActiveHistory) Flush()

func (*ActiveHistory) GetItem

func (ah *ActiveHistory) GetItem(key string) (*service.HistoryItem, bool)

func (*ActiveHistory) GetStep

func (ah *ActiveHistory) GetStep() *service.HistoryStep

func (*ActiveHistory) GetValues

func (ah *ActiveHistory) GetValues() []*service.HistoryItem

func (*ActiveHistory) UpdateStep

func (ah *ActiveHistory) UpdateStep(step int64)

func (*ActiveHistory) UpdateValues

func (ah *ActiveHistory) UpdateValues(values []*service.HistoryItem)

type ActiveHistoryOptions

type ActiveHistoryOptions func(ac *ActiveHistory)

func WithFlush

func WithFlush(flush func(*service.HistoryStep, []*service.HistoryItem)) ActiveHistoryOptions

func WithStep

func WithStep(step int64) ActiveHistoryOptions

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is the connection for a stream. It is a wrapper around the underlying connection It handles the incoming messages from the client and passes them to the stream

func NewConnection

func NewConnection(
	ctx context.Context,
	conn net.Conn,
	teardown chan struct{},
) *Connection

NewConnection creates a new connection

func (*Connection) Close

func (nc *Connection) Close()

Close closes the connection

func (*Connection) HandleConnection

func (nc *Connection) HandleConnection()

HandleConnection handles the connection by reading from the connection and passing the messages to the stream and writing messages from the stream to the connection

func (*Connection) Respond

func (nc *Connection) Respond(resp *service.ServerResponse)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(logger *observability.NexusLogger) *Dispatcher

func (*Dispatcher) AddResponders

func (d *Dispatcher) AddResponders(entries ...ResponderEntry)

AddResponders adds the given responders to the stream's dispatcher.

type FileHandler

type FileHandler struct {
	// contains filtered or unexported fields
}

func NewFileHandler

func NewFileHandler(logger *observability.NexusLogger, watcherOutChan chan *service.Record) *FileHandler

func (*FileHandler) Close

func (fh *FileHandler) Close()

Close closes the file handler and the watcher

func (*FileHandler) Final

func (fh *FileHandler) Final() *service.Record

Final returns the stored record to be uploaded at the end of the run (DeferRequest_FLUSH_DIR)

func (*FileHandler) Handle

func (fh *FileHandler) Handle(record *service.Record) *service.Record

Handle handles file uploads preprocessing, depending on their policies: - NOW: upload immediately - END: upload at the end of the run - LIVE: upload immediately, on changes, and at the end of the run

func (*FileHandler) Start

func (fh *FileHandler) Start()

Start starts the file handler and the watcher

type FileTransferHandler

type FileTransferHandler struct {
	DedupedBytes int64
	// contains filtered or unexported fields
}

func NewFileTransferHandler

func NewFileTransferHandler() *FileTransferHandler

func (*FileTransferHandler) GetDedupedBytes

func (fth *FileTransferHandler) GetDedupedBytes() int64

func (*FileTransferHandler) GetFileCounts

func (fth *FileTransferHandler) GetFileCounts() *service.FileCounts

TODO: with the new rust client we want to get rid of this

func (*FileTransferHandler) GetTotalBytes

func (fth *FileTransferHandler) GetTotalBytes() int64

func (*FileTransferHandler) GetUploadedBytes

func (fth *FileTransferHandler) GetUploadedBytes() int64

func (*FileTransferHandler) Handle

func (*FileTransferHandler) IsDone

func (fth *FileTransferHandler) IsDone() bool

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is the handler for a stream it handles the incoming messages, processes them and passes them to the writer

func NewHandler

func NewHandler(
	ctx context.Context,
	settings *service.Settings,
	logger *observability.NexusLogger,
) *Handler

NewHandler creates a new handler

func (*Handler) Close

func (h *Handler) Close()

func (*Handler) DisableSummaryDebouncer

func (h *Handler) DisableSummaryDebouncer()

func (*Handler) GetRun

func (h *Handler) GetRun() *service.RunRecord

func (*Handler) Handle

func (h *Handler) Handle()

Handle starts the handler

func (*Handler) SetInboundChannels

func (h *Handler) SetInboundChannels(in <-chan *service.Record, lb chan *service.Record)

func (*Handler) SetOutboundChannels

func (h *Handler) SetOutboundChannels(fwd chan *service.Record, out chan *service.Result)

type HandlerInterface

type HandlerInterface interface {
	SetInboundChannels(in <-chan *service.Record, lb chan *service.Record)
	SetOutboundChannels(fwd chan *service.Record, out chan *service.Result)
	Handle()
	Close()
	GetRun() *service.RunRecord
}
type Header struct {
	Magic      uint8
	DataLength uint32
}

type Item

type Item[T comparable] struct {
	// contains filtered or unexported fields
}

func (Item[T]) String

func (item Item[T]) String() string

type MetricHandler

type MetricHandler struct {
	// contains filtered or unexported fields
}

func NewMetricHandler

func NewMetricHandler() *MetricHandler

type MetricSender

type MetricSender struct {
	// contains filtered or unexported fields
}

func NewMetricSender

func NewMetricSender() *MetricSender

type PriorityQueue

type PriorityQueue[T comparable] []*Item[T]

func (PriorityQueue[T]) Len

func (pq PriorityQueue[T]) Len() int

func (PriorityQueue[T]) Less

func (pq PriorityQueue[T]) Less(i, j int) bool

func (*PriorityQueue[T]) Pop

func (pq *PriorityQueue[T]) Pop() interface{}

func (*PriorityQueue[T]) Push

func (pq *PriorityQueue[T]) Push(x interface{})

func (PriorityQueue[T]) Swap

func (pq PriorityQueue[T]) Swap(i, j int)

type ReservoirSampling

type ReservoirSampling[T comparable] struct {
	// contains filtered or unexported fields
}

func (*ReservoirSampling[T]) Add

func (r *ReservoirSampling[T]) Add(value T)

func (*ReservoirSampling[T]) GetSample

func (r *ReservoirSampling[T]) GetSample() []T

type Responder

type Responder interface {
	Respond(response *service.ServerResponse)
}

type ResponderEntry

type ResponderEntry struct {
	Responder Responder
	ID        string
}

type ResumeState

type ResumeState struct {
	FileStreamOffset fs.FileStreamOffsetMap
}

func NewResumeState

func NewResumeState() *ResumeState

func (*ResumeState) AddOffset

func (r *ResumeState) AddOffset(key fs.ChunkTypeEnum, offset int)

func (*ResumeState) GetFileStreamOffset

func (r *ResumeState) GetFileStreamOffset() fs.FileStreamOffsetMap

type Sender

type Sender struct {

	// RunRecord is the run record
	RunRecord *service.RunRecord
	// contains filtered or unexported fields
}

Sender is the sender for a stream it handles the incoming messages and sends to the server or/and to the dispatcher/handler

func NewSender

func NewSender(ctx context.Context, settings *service.Settings, logger *observability.NexusLogger, loopbackChan chan *service.Record) *Sender

NewSender creates a new Sender with the given settings

func (*Sender) Close

func (s *Sender) Close()

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the nexus server

func NewServer

func NewServer(ctx context.Context, addr string, portFile string) *Server

NewServer creates a new server

func (*Server) Close

func (s *Server) Close()

Close closes the server

func (*Server) Serve

func (s *Server) Serve()

Serve serves the server

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is the persistent store for a stream

func NewStore

func NewStore(ctx context.Context, fileName string, logger *observability.NexusLogger) (*Store, error)

NewStore creates a new store

func (*Store) Close

func (sr *Store) Close() error

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a collection of components that work together to handle incoming data for a W&B run, store it locally, and send it to a W&B server. Stream.handler receives incoming data from the client and dispatches it to Stream.writer, which writes it to a local file. Stream.writer then sends the data to Stream.sender, which sends it to the W&B server. Stream.dispatcher handles dispatching responses to the appropriate client responders.

func NewStream

func NewStream(ctx context.Context, settings *service.Settings, streamId string) *Stream

NewStream creates a new stream with the given settings and responders.

func (*Stream) AddResponders

func (s *Stream) AddResponders(entries ...ResponderEntry)

AddResponders adds the given responders to the stream's dispatcher.

func (*Stream) Close

func (s *Stream) Close()

Close Gracefully wait for handler, writer, sender, dispatcher to shut down cleanly assumes an exit record has already been sent

func (*Stream) FinishAndClose

func (s *Stream) FinishAndClose(exitCode int32)

func (*Stream) GetRun

func (s *Stream) GetRun() *service.RunRecord

func (*Stream) HandleRecord

func (s *Stream) HandleRecord(rec *service.Record)

HandleRecord handles the given record by sending it to the stream's handler.

func (*Stream) PrintFooter

func (s *Stream) PrintFooter()

func (*Stream) Respond

func (s *Stream) Respond(resp *service.ServerResponse)

Respond Handle internal responses like from the finish and close path

func (*Stream) SetHandler

func (s *Stream) SetHandler(handler HandlerInterface)

func (*Stream) Start

func (s *Stream) Start()

Start starts the stream's handler, writer, sender, and dispatcher. We use Stream's wait group to ensure that all of these components are cleanly finalized and closed when the stream is closed in Stream.Close().

type StreamMux

type StreamMux struct {
	// contains filtered or unexported fields
}

StreamMux is a multiplexer for streams. It is thread-safe and is used to ensure that only one stream exists for a given streamId so that we can safely add responders to streams.

func NewStreamMux

func NewStreamMux() *StreamMux

NewStreamMux creates a new stream mux.

func (*StreamMux) AddStream

func (sm *StreamMux) AddStream(streamId string, stream *Stream) error

AddStream adds a stream to the mux if it doesn't already exist.

func (*StreamMux) FinishAndCloseAllStreams

func (sm *StreamMux) FinishAndCloseAllStreams(exitCode int32)

FinishAndCloseAllStreams closes all streams in the mux.

func (*StreamMux) GetStream

func (sm *StreamMux) GetStream(streamId string) (*Stream, error)

GetStream gets a stream from the mux.

func (*StreamMux) RemoveStream

func (sm *StreamMux) RemoveStream(streamId string) (*Stream, error)

RemoveStream removes a stream from the mux.

type Timer

type Timer struct {
	// contains filtered or unexported fields
}

Timer is used to track the run start and execution times

func (*Timer) Elapsed

func (t *Timer) Elapsed() time.Duration

func (*Timer) GetStartTimeMicro

func (t *Timer) GetStartTimeMicro() float64

func (*Timer) Pause

func (t *Timer) Pause()

func (*Timer) Resume

func (t *Timer) Resume()

func (*Timer) Start

func (t *Timer) Start(startTime *time.Time)

type Tokenizer

type Tokenizer struct {
	// contains filtered or unexported fields
}

func (*Tokenizer) Split

func (x *Tokenizer) Split(data []byte, _ bool) (advance int, token []byte, err error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is responsible for writing messages to the append-only log. It receives messages from the handler, processes them, if the message is to be persisted it writes them to the log. It also sends the messages to the sender.

func NewWriter

func NewWriter(ctx context.Context, settings *service.Settings, logger *observability.NexusLogger) *Writer

NewWriter returns a new Writer

func (*Writer) Close

func (w *Writer) Close()

Close closes the writer and all its resources which includes the store

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL