Documentation ¶
Index ¶
- Constants
- func NewBackend(logger *observability.CoreLogger, settings *settings.Settings) *api.Backend
- func NewFileStream(backend *api.Backend, logger *observability.CoreLogger, ...) filestream.FileStream
- func NewFileTransferManager(fileTransferStats filetransfer.FileTransferStats, ...) filetransfer.FileTransferManager
- func NewGraphQLClient(backend *api.Backend, settings *settings.Settings, ...) graphql.Client
- func NewRunfilesUploader(ctx context.Context, logger *observability.CoreLogger, ...) runfiles.Uploader
- type Connection
- type Dispatcher
- type Git
- type Handler
- type HandlerParams
- type Header
- type HeaderOptions
- type MetricHandler
- type MetricSender
- type Responder
- type ResponderEntry
- type Sender
- type SenderParams
- type Server
- type Store
- type Stream
- func (s *Stream) AddResponders(entries ...ResponderEntry)
- func (s *Stream) Close()
- func (s *Stream) FinishAndClose(exitCode int32)
- func (s *Stream) GetRun() *service.RunRecord
- func (s *Stream) HandleRecord(rec *service.Record)
- func (s *Stream) PrintFooter()
- func (s *Stream) Respond(resp *service.ServerResponse)
- func (s *Stream) Start()
- type StreamMux
- type SyncService
- type SyncServiceOption
- func WithSyncServiceFlushCallback(syncResultCallback func(error)) SyncServiceOption
- func WithSyncServiceLogger(logger *observability.CoreLogger) SyncServiceOption
- func WithSyncServiceOverwrite(overwrite *service.SyncOverwrite) SyncServiceOption
- func WithSyncServiceSenderFunc(senderFunc func(*service.Record)) SyncServiceOption
- func WithSyncServiceSkip(skip *service.SyncSkip) SyncServiceOption
- type TBHandler
- type Tokenizer
- type Writer
- type WriterOption
- type WriterParams
Constants ¶
const ( MetaFileName = "wandb-metadata.json" SummaryFileName = "wandb-summary.json" OutputFileName = "output.log" DiffFileName = "diff.patch" RequirementsFileName = "requirements.txt" ConfigFileName = "config.yaml" )
const BufferSize = 32
const (
// RFC3339Micro Modified from time.RFC3339Nano
RFC3339Micro = "2006-01-02T15:04:05.000000Z07:00"
)
Variables ¶
This section is empty.
Functions ¶
func NewBackend ¶
func NewBackend( logger *observability.CoreLogger, settings *settings.Settings, ) *api.Backend
NewBackend returns a Backend or nil if we're offline.
func NewFileStream ¶
func NewFileStream( backend *api.Backend, logger *observability.CoreLogger, settings *settings.Settings, peeker api.Peeker, ) filestream.FileStream
func NewFileTransferManager ¶
func NewFileTransferManager( fileTransferStats filetransfer.FileTransferStats, logger *observability.CoreLogger, settings *settings.Settings, ) filetransfer.FileTransferManager
func NewGraphQLClient ¶
func NewRunfilesUploader ¶
func NewRunfilesUploader( ctx context.Context, logger *observability.CoreLogger, settings *settings.Settings, fileStream filestream.FileStream, fileTransfer filetransfer.FileTransferManager, graphQL graphql.Client, ) runfiles.Uploader
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is the connection for a stream. It is a wrapper around the underlying connection It handles the incoming messages from the client and passes them to the stream
func NewConnection ¶
func NewConnection( ctx context.Context, conn net.Conn, teardown chan struct{}, ) *Connection
NewConnection creates a new connection
func (*Connection) HandleConnection ¶
func (nc *Connection) HandleConnection()
HandleConnection handles the connection by reading from the connection and passing the messages to the stream and writing messages from the stream to the connection
func (*Connection) Respond ¶
func (nc *Connection) Respond(resp *service.ServerResponse)
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
func NewDispatcher ¶
func NewDispatcher(logger *observability.CoreLogger) *Dispatcher
func (*Dispatcher) AddResponders ¶
func (d *Dispatcher) AddResponders(entries ...ResponderEntry)
AddResponders adds the given responders to the stream's dispatcher.
type Git ¶
type Git struct {
// contains filtered or unexported fields
}
func NewGit ¶
func NewGit(path string, logger *observability.CoreLogger) *Git
func (*Git) IsAvailable ¶
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is the handler for a stream it handles the incoming messages, processes them and passes them to the writer
func NewHandler ¶
func NewHandler( ctx context.Context, params *HandlerParams, ) *Handler
NewHandler creates a new handler
type HandlerParams ¶
type HandlerParams struct { Settings *service.Settings FwdChan chan *service.Record OutChan chan *service.Result Logger *observability.CoreLogger Mailbox *mailbox.Mailbox RunSummary *runsummary.RunSummary MetricHandler *MetricHandler FileTransferStats filetransfer.FileTransferStats RunfilesUploader runfiles.Uploader TBHandler *TBHandler SystemMonitor *monitor.SystemMonitor }
type HeaderOptions ¶
func NewHeader ¶
func NewHeader() *HeaderOptions
NewHeader returns a new header with default values.
func (*HeaderOptions) MarshalBinary ¶
func (o *HeaderOptions) MarshalBinary(w io.Writer) error
MarshalBinary encodes the header to binary format.
func (*HeaderOptions) UnmarshalBinary ¶
func (o *HeaderOptions) UnmarshalBinary(r io.Reader) error
UnmarshalBinary decodes binary data into the header.
func (*HeaderOptions) Valid ¶
func (o *HeaderOptions) Valid() bool
Valid checks if the header is valid based on a reference header.
type MetricHandler ¶
type MetricHandler struct {
// contains filtered or unexported fields
}
func NewMetricHandler ¶
func NewMetricHandler() *MetricHandler
type MetricSender ¶
type MetricSender struct {
// contains filtered or unexported fields
}
func NewMetricSender ¶
func NewMetricSender() *MetricSender
type Responder ¶
type Responder interface {
Respond(response *service.ServerResponse)
}
type ResponderEntry ¶
type Sender ¶
type Sender struct { // RunRecord is the run record // TODO: remove this and use properly updated settings // + a flag indicating whether the run has started RunRecord *service.RunRecord // contains filtered or unexported fields }
Sender is the sender for a stream it handles the incoming messages and sends to the server or/and to the dispatcher/handler
func NewSender ¶
func NewSender( ctx context.Context, cancel context.CancelFunc, params *SenderParams, ) *Sender
NewSender creates a new Sender with the given settings
func (*Sender) SendRecord ¶
type SenderParams ¶
type SenderParams struct { Logger *observability.CoreLogger Settings *service.Settings Backend *api.Backend FileStream fs.FileStream FileTransferManager filetransfer.FileTransferManager RunfilesUploader runfiles.Uploader GraphqlClient graphql.Client Peeker *observability.Peeker RunSummary *runsummary.RunSummary Mailbox *mailbox.Mailbox OutChan chan *service.Result FwdChan chan *service.Record }
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the core server
func (*Server) SetDefaultLoggerPath ¶
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is the persistent store for a stream
func NewStore ¶
func NewStore(ctx context.Context, fileName string, logger *observability.CoreLogger) *Store
NewStore creates a new store
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is a collection of components that work together to handle incoming data for a W&B run, store it locally, and send it to a W&B server. Stream.handler receives incoming data from the client and dispatches it to Stream.writer, which writes it to a local file. Stream.writer then sends the data to Stream.sender, which sends it to the W&B server. Stream.dispatcher handles dispatching responses to the appropriate client responders.
func (*Stream) AddResponders ¶
func (s *Stream) AddResponders(entries ...ResponderEntry)
AddResponders adds the given responders to the stream's dispatcher.
func (*Stream) Close ¶
func (s *Stream) Close()
Close Gracefully wait for handler, writer, sender, dispatcher to shut down cleanly assumes an exit record has already been sent
func (*Stream) FinishAndClose ¶
func (*Stream) HandleRecord ¶
HandleRecord handles the given record by sending it to the stream's handler.
func (*Stream) PrintFooter ¶
func (s *Stream) PrintFooter()
func (*Stream) Respond ¶
func (s *Stream) Respond(resp *service.ServerResponse)
Respond Handle internal responses like from the finish and close path
type StreamMux ¶
type StreamMux struct {
// contains filtered or unexported fields
}
StreamMux is a multiplexer for streams. It is thread-safe and is used to ensure that only one stream exists for a given streamId so that we can safely add responders to streams.
func (*StreamMux) FinishAndCloseAllStreams ¶
FinishAndCloseAllStreams closes all streams in the mux.
type SyncService ¶
type SyncService struct {
// contains filtered or unexported fields
}
func NewSyncService ¶
func NewSyncService(ctx context.Context, opts ...SyncServiceOption) *SyncService
func (*SyncService) Close ¶
func (s *SyncService) Close()
func (*SyncService) Flush ¶
func (s *SyncService) Flush()
func (*SyncService) Start ¶
func (s *SyncService) Start()
func (*SyncService) SyncRecord ¶
func (s *SyncService) SyncRecord(record *service.Record, err error)
type SyncServiceOption ¶
type SyncServiceOption func(*SyncService)
func WithSyncServiceFlushCallback ¶
func WithSyncServiceFlushCallback(syncResultCallback func(error)) SyncServiceOption
func WithSyncServiceLogger ¶
func WithSyncServiceLogger(logger *observability.CoreLogger) SyncServiceOption
func WithSyncServiceOverwrite ¶
func WithSyncServiceOverwrite(overwrite *service.SyncOverwrite) SyncServiceOption
func WithSyncServiceSenderFunc ¶
func WithSyncServiceSenderFunc(senderFunc func(*service.Record)) SyncServiceOption
func WithSyncServiceSkip ¶
func WithSyncServiceSkip(skip *service.SyncSkip) SyncServiceOption
type TBHandler ¶
type TBHandler struct { Active bool // contains filtered or unexported fields }
func NewTBHandler ¶
func NewTBHandler( watcher *watcher.Watcher, logger *observability.CoreLogger, settings *service.Settings, outChan chan *service.Record, ) *TBHandler
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is responsible for writing messages to the append-only log. It receives messages from the handler, processes them, if the message is to be persisted it writes them to the log. It also sends the messages to the sender.
func NewWriter ¶
func NewWriter(ctx context.Context, params *WriterParams) *Writer
NewWriter returns a new Writer
type WriterOption ¶
type WriterOption func(*Writer)
func WithWriterFwdChannel ¶
func WithWriterFwdChannel(fwd chan *service.Record) WriterOption
func WithWriterSettings ¶
func WithWriterSettings(settings *service.Settings) WriterOption
type WriterParams ¶
type WriterParams struct { Logger *observability.CoreLogger Settings *service.Settings FwdChan chan *service.Record }