Documentation ¶
Index ¶
- Constants
- func LogError(log *slog.Logger, msg string, err error)
- func SetupDefaultLogger() *slog.Logger
- func SetupStreamLogger(name string, settings *service.Settings) *observability.NexusLogger
- type ActiveHistory
- func (ah *ActiveHistory) Clear()
- func (ah *ActiveHistory) Flush()
- func (ah *ActiveHistory) GetItem(key string) (*service.HistoryItem, bool)
- func (ah *ActiveHistory) GetStep() *service.HistoryStep
- func (ah *ActiveHistory) GetValues() []*service.HistoryItem
- func (ah *ActiveHistory) UpdateStep(step int64)
- func (ah *ActiveHistory) UpdateValues(values []*service.HistoryItem)
- type ActiveHistoryOptions
- type Bucket
- type Connection
- type Dispatcher
- type FileHandler
- type FileTransferHandler
- func (fth *FileTransferHandler) GetDedupedBytes() int64
- func (fth *FileTransferHandler) GetFileCounts() *service.FileCounts
- func (fth *FileTransferHandler) GetTotalBytes() int64
- func (fth *FileTransferHandler) GetUploadedBytes() int64
- func (fth *FileTransferHandler) Handle(record *service.FileTransferInfoRequest)
- func (fth *FileTransferHandler) IsDone() bool
- type Handler
- func (h *Handler) Close()
- func (h *Handler) DisableSummaryDebouncer()
- func (h *Handler) GetRun() *service.RunRecord
- func (h *Handler) Handle()
- func (h *Handler) SetInboundChannels(in <-chan *service.Record, lb chan *service.Record)
- func (h *Handler) SetOutboundChannels(fwd chan *service.Record, out chan *service.Result)
- type HandlerInterface
- type Header
- type Item
- type MetricHandler
- type MetricSender
- type PriorityQueue
- type ReservoirSampling
- type Responder
- type ResponderEntry
- type ResumeState
- type Sender
- type Server
- type Store
- type Stream
- func (s *Stream) AddResponders(entries ...ResponderEntry)
- func (s *Stream) Close()
- func (s *Stream) FinishAndClose(exitCode int32)
- func (s *Stream) GetRun() *service.RunRecord
- func (s *Stream) HandleRecord(rec *service.Record)
- func (s *Stream) PrintFooter()
- func (s *Stream) Respond(resp *service.ServerResponse)
- func (s *Stream) SetHandler(handler HandlerInterface)
- func (s *Stream) Start()
- type StreamMux
- type Timer
- type Tokenizer
- type Writer
Constants ¶
const ( MetaFilename = "wandb-metadata.json" // RFC3339Micro Modified from time.RFC3339Nano RFC3339Micro = "2006-01-02T15:04:05.000000Z07:00" )
const BufferSize = 32
Variables ¶
This section is empty.
Functions ¶
func SetupDefaultLogger ¶
func SetupStreamLogger ¶
func SetupStreamLogger(name string, settings *service.Settings) *observability.NexusLogger
Types ¶
type ActiveHistory ¶
type ActiveHistory struct {
// contains filtered or unexported fields
}
func NewActiveHistory ¶
func NewActiveHistory(opts ...ActiveHistoryOptions) *ActiveHistory
func (*ActiveHistory) Clear ¶
func (ah *ActiveHistory) Clear()
func (*ActiveHistory) Flush ¶
func (ah *ActiveHistory) Flush()
func (*ActiveHistory) GetItem ¶
func (ah *ActiveHistory) GetItem(key string) (*service.HistoryItem, bool)
func (*ActiveHistory) GetStep ¶
func (ah *ActiveHistory) GetStep() *service.HistoryStep
func (*ActiveHistory) GetValues ¶
func (ah *ActiveHistory) GetValues() []*service.HistoryItem
func (*ActiveHistory) UpdateStep ¶
func (ah *ActiveHistory) UpdateStep(step int64)
func (*ActiveHistory) UpdateValues ¶
func (ah *ActiveHistory) UpdateValues(values []*service.HistoryItem)
type ActiveHistoryOptions ¶
type ActiveHistoryOptions func(ac *ActiveHistory)
func WithFlush ¶
func WithFlush(flush func(*service.HistoryStep, []*service.HistoryItem)) ActiveHistoryOptions
func WithStep ¶
func WithStep(step int64) ActiveHistoryOptions
type Bucket ¶
type Bucket = gql.RunResumeStatusModelProjectBucketRun
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is the connection for a stream. It is a wrapper around the underlying connection. It handles the incoming messages from the client and passes them to the stream.
func NewConnection ¶
func NewConnection( ctx context.Context, conn net.Conn, teardown chan struct{}, ) *Connection
NewConnection creates a new connection
func (*Connection) HandleConnection ¶
func (nc *Connection) HandleConnection()
HandleConnection handles the connection by reading from the connection and passing the messages to the stream, and by writing messages from the stream to the connection.
func (*Connection) Respond ¶
func (nc *Connection) Respond(resp *service.ServerResponse)
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(logger *observability.NexusLogger) *Dispatcher
func (*Dispatcher) AddResponders ¶
func (d *Dispatcher) AddResponders(entries ...ResponderEntry)
AddResponders adds the given responders to the stream's dispatcher.
type FileHandler ¶
type FileHandler struct {
// contains filtered or unexported fields
}
func NewFileHandler ¶
func NewFileHandler(logger *observability.NexusLogger, watcherOutChan chan *service.Record) *FileHandler
func (*FileHandler) Close ¶
func (fh *FileHandler) Close()
Close closes the file handler and the watcher
func (*FileHandler) Final ¶
func (fh *FileHandler) Final() *service.Record
Final returns the stored record to be uploaded at the end of the run (DeferRequest_FLUSH_DIR)
func (*FileHandler) Handle ¶
func (fh *FileHandler) Handle(record *service.Record) *service.Record
Handle preprocesses file uploads according to their policies: - NOW: upload immediately - END: upload at the end of the run - LIVE: upload immediately, on changes, and at the end of the run
func (*FileHandler) Start ¶
func (fh *FileHandler) Start()
Start starts the file handler and the watcher
type FileTransferHandler ¶
type FileTransferHandler struct { DedupedBytes int64 // contains filtered or unexported fields }
func NewFileTransferHandler ¶
func NewFileTransferHandler() *FileTransferHandler
func (*FileTransferHandler) GetDedupedBytes ¶
func (fth *FileTransferHandler) GetDedupedBytes() int64
func (*FileTransferHandler) GetFileCounts ¶
func (fth *FileTransferHandler) GetFileCounts() *service.FileCounts
TODO: with the new Rust client we want to get rid of this.
func (*FileTransferHandler) GetTotalBytes ¶
func (fth *FileTransferHandler) GetTotalBytes() int64
func (*FileTransferHandler) GetUploadedBytes ¶
func (fth *FileTransferHandler) GetUploadedBytes() int64
func (*FileTransferHandler) Handle ¶
func (fth *FileTransferHandler) Handle(record *service.FileTransferInfoRequest)
func (*FileTransferHandler) IsDone ¶
func (fth *FileTransferHandler) IsDone() bool
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is the handler for a stream. It handles the incoming messages, processes them, and passes them to the writer.
func NewHandler ¶
func NewHandler( ctx context.Context, settings *service.Settings, logger *observability.NexusLogger, ) *Handler
NewHandler creates a new handler
func (*Handler) DisableSummaryDebouncer ¶
func (h *Handler) DisableSummaryDebouncer()
func (*Handler) SetInboundChannels ¶
type HandlerInterface ¶
type Item ¶
type Item[T comparable] struct { // contains filtered or unexported fields }
type MetricHandler ¶
type MetricHandler struct {
// contains filtered or unexported fields
}
func NewMetricHandler ¶
func NewMetricHandler() *MetricHandler
type MetricSender ¶
type MetricSender struct {
// contains filtered or unexported fields
}
func NewMetricSender ¶
func NewMetricSender() *MetricSender
type PriorityQueue ¶
type PriorityQueue[T comparable] []*Item[T]
func (PriorityQueue[T]) Len ¶
func (pq PriorityQueue[T]) Len() int
func (PriorityQueue[T]) Less ¶
func (pq PriorityQueue[T]) Less(i, j int) bool
func (*PriorityQueue[T]) Pop ¶
func (pq *PriorityQueue[T]) Pop() interface{}
func (*PriorityQueue[T]) Push ¶
func (pq *PriorityQueue[T]) Push(x interface{})
func (PriorityQueue[T]) Swap ¶
func (pq PriorityQueue[T]) Swap(i, j int)
type ReservoirSampling ¶
type ReservoirSampling[T comparable] struct { // contains filtered or unexported fields }
func (*ReservoirSampling[T]) Add ¶
func (r *ReservoirSampling[T]) Add(value T)
func (*ReservoirSampling[T]) GetSample ¶
func (r *ReservoirSampling[T]) GetSample() []T
type Responder ¶
type Responder interface {
Respond(response *service.ServerResponse)
}
type ResponderEntry ¶
type ResumeState ¶
type ResumeState struct {
FileStreamOffset fs.FileStreamOffsetMap
}
func NewResumeState ¶
func NewResumeState() *ResumeState
func (*ResumeState) AddOffset ¶
func (r *ResumeState) AddOffset(key fs.ChunkTypeEnum, offset int)
func (*ResumeState) GetFileStreamOffset ¶
func (r *ResumeState) GetFileStreamOffset() fs.FileStreamOffsetMap
type Sender ¶
type Sender struct { // RunRecord is the run record RunRecord *service.RunRecord // contains filtered or unexported fields }
Sender is the sender for a stream. It handles the incoming messages and sends them to the server and/or to the dispatcher/handler.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the nexus server
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is the persistent store for a stream
func NewStore ¶
func NewStore(ctx context.Context, fileName string, logger *observability.NexusLogger) (*Store, error)
NewStore creates a new store
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is a collection of components that work together to handle incoming data for a W&B run, store it locally, and send it to a W&B server. Stream.handler receives incoming data from the client and dispatches it to Stream.writer, which writes it to a local file. Stream.writer then sends the data to Stream.sender, which sends it to the W&B server. Stream.dispatcher handles dispatching responses to the appropriate client responders.
func (*Stream) AddResponders ¶
func (s *Stream) AddResponders(entries ...ResponderEntry)
AddResponders adds the given responders to the stream's dispatcher.
func (*Stream) Close ¶
func (s *Stream) Close()
Close gracefully waits for the handler, writer, sender, and dispatcher to shut down cleanly. It assumes an exit record has already been sent.
func (*Stream) FinishAndClose ¶
func (*Stream) HandleRecord ¶
HandleRecord handles the given record by sending it to the stream's handler.
func (*Stream) PrintFooter ¶
func (s *Stream) PrintFooter()
func (*Stream) Respond ¶
func (s *Stream) Respond(resp *service.ServerResponse)
Respond handles internal responses, such as those from the finish-and-close path.
func (*Stream) SetHandler ¶
func (s *Stream) SetHandler(handler HandlerInterface)
type StreamMux ¶
type StreamMux struct {
// contains filtered or unexported fields
}
StreamMux is a multiplexer for streams. It is thread-safe and is used to ensure that only one stream exists for a given streamId so that we can safely add responders to streams.
func (*StreamMux) FinishAndCloseAllStreams ¶
FinishAndCloseAllStreams closes all streams in the mux.
type Timer ¶
type Timer struct {
// contains filtered or unexported fields
}
Timer is used to track the run start and execution times
func (*Timer) GetStartTimeMicro ¶
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is responsible for writing messages to the append-only log. It receives messages from the handler and processes them; if a message is to be persisted, it writes it to the log. It also sends the messages to the sender.
func NewWriter ¶
func NewWriter(ctx context.Context, settings *service.Settings, logger *observability.NexusLogger) *Writer
NewWriter returns a new Writer