dealtracker

package
v0.5.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: Apache-2.0, MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrAlreadyRunning = errors.New("another worker already running")
View Source
var Logger = log.Logger("dealtracker")

Functions

This section is empty.

Types

type Cid

type Cid struct {
	Root string `json:"/" mapstructure:"/"`
}

type CloserFunc

type CloserFunc func() error

func (CloserFunc) Close

func (c CloserFunc) Close() error

type Counter added in v0.3.0

type Counter interface {
	N() int64
	Speed() float64
}

Counter represents an interface for counting operations.

The Counter interface defines two methods: N and Speed. Implementing types should provide implementations for these methods.

N returns the current byte count.

Speed returns the speed of the counting operation in bytes per second.

func DealStateStreamFromHTTPRequest

func DealStateStreamFromHTTPRequest(request *http.Request, depth int, decompress bool) (chan *jstream.MetaValue, Counter, io.Closer, error)

DealStateStreamFromHTTPRequest retrieves the deal state from an HTTP request and returns a stream of jstream.MetaValue, along with a Counter, io.Closer, and any error encountered.

The function takes the following parameters:

  • request: The HTTP request to retrieve the deal state.
  • depth: The depth of the JSON decoding.
  • decompress: A boolean flag indicating whether to decompress the response body.

The function performs the following steps:

  1. Sends an HTTP request using http.DefaultClient.Do.

  2. If an error occurs during the request, it returns nil for the channel, Counter, io.Closer, and the error wrapped with an appropriate message.

  3. If the response status code is not http.StatusOK, it closes the response body and returns nil for the channel, Counter, io.Closer, and an error indicating the failure.

  4. Creates a countingReader using NewCountingReader to count the number of bytes read from the response body.

  5. If decompress is true, creates a zstd decompressor using zstd.NewReader and wraps it in a ThreadSafeReadCloser. - If an error occurs during decompression, it closes the response body and returns nil for the channel, Counter, io.Closer, and the error wrapped with an appropriate message. - Creates a jstream.Decoder using jstream.NewDecoder with the decompressor and specified depth, and sets it to emit key-value pairs. - Creates a CloserFunc that closes the decompressor and response body.

  6. If decompress is false, creates a jstream.Decoder using jstream.NewDecoder with the countingReader and specified depth, and sets it to emit key-value pairs. - Sets the response body as the closer.

  7. Returns the jstream.MetaValue stream from jsonDecoder.Stream(), the countingReader, closer, and nil for the error.

type CountingReader added in v0.3.0

type CountingReader struct {
	// contains filtered or unexported fields
}

CountingReader is an io.Reader that counts the number of bytes read

func NewCountingReader added in v0.3.0

func NewCountingReader(r io.Reader) *CountingReader

func (*CountingReader) N added in v0.3.0

func (cr *CountingReader) N() int64

N returns the number of bytes read so far.

func (*CountingReader) Read added in v0.3.0

func (cr *CountingReader) Read(p []byte) (n int, err error)

Read reads data from the underlying reader and updates the byte count. It implements the io.Reader interface.

Parameters:

  • p: The byte slice to read data into.

Returns:

  • n: The number of bytes read.
  • err: Any error encountered during the read operation.

func (*CountingReader) Speed added in v0.3.0

func (cr *CountingReader) Speed() float64

Speed returns the number of bytes read per second

type Deal

type Deal struct {
	Proposal DealProposal
	State    DealState
}

func (Deal) GetState

func (d Deal) GetState(headTime time.Time) model.DealState

func (Deal) Key

func (d Deal) Key() string

type DealProposal

type DealProposal struct {
	PieceCID             Cid
	PieceSize            int64
	VerifiedDeal         bool
	Client               string
	Provider             string
	Label                string
	StartEpoch           int32
	EndEpoch             int32
	StoragePricePerEpoch string
}

type DealState

type DealState struct {
	SectorStartEpoch int32
	LastUpdatedEpoch int32
	SlashEpoch       int32
}

type DealTracker

type DealTracker struct {
	// contains filtered or unexported fields
}

func NewDealTracker

func NewDealTracker(
	db *gorm.DB,
	interval time.Duration,
	dealZstURL string,
	lotusURL string,
	lotusToken string,
	once bool) DealTracker

func (*DealTracker) Name added in v0.3.0

func (*DealTracker) Name() string

func (*DealTracker) Start added in v0.3.0

func (d *DealTracker) Start(ctx context.Context, exitErr chan<- error) error

Start starts the DealTracker and returns a list of service.Done channels, a service.Fail channel, and an error.

The Start method takes a context.Context as input and performs the following steps:

  1. Defines a getState function that returns a healthcheck.State with JobType set to model.DealTracking.

  2. Registers the worker using healthcheck.Register with the provided context, dbNoContext, workerID, getState function, and false for the force flag. - If an error occurs during registration, it returns nil for the service.Done channels, nil for the service.Fail channel, and the error wrapped with an appropriate message. - If another worker is already running, it logs a warning and checks if d.once is true. If d.once is true, it returns nil for the service.Done channels, nil for the service.Fail channel, and an error indicating that another worker is already running.

  3. Logs a warning message and waits for 1 minute before retrying. - If the context is done during the wait, it returns nil for the service.Done channels, nil for the service.Fail channel, and the context error.

  4. Starts reporting health using healthcheck.StartReportHealth with the provided context, dbNoContext, workerID, and getState function in a separate goroutine.

  5. Runs the main loop in a separate goroutine. - Calls d.runOnce to execute the main logic of the DealTracker. - If an error occurs during execution, it logs an error message. - If d.once is true, it returns from the goroutine. - Waits for the specified interval before running the next iteration. - If the context is done during the wait, it returns from the goroutine.

  6. Cleans up resources when the context is done. - Calls d.cleanup to perform cleanup operations. - If an error occurs during cleanup, it logs an error message.

  7. Returns a list of service.Done channels containing healthcheckDone, runDone, and cleanupDone, the service.Fail channel fail, and nil for the error.

type KnownDeal added in v0.3.0

type KnownDeal struct {
	State model.DealState
}

type ThreadSafeReadCloser added in v0.3.0

type ThreadSafeReadCloser struct {
	// contains filtered or unexported fields
}

ThreadSafeReadCloser is a thread-safe implementation of the io.ReadCloser interface.

The ThreadSafeReadCloser struct has the following fields:

  • reader: The underlying io.Reader.
  • closer: The function to close the reader.
  • closed: A boolean indicating whether the reader is closed.
  • mu: A mutex used to synchronize access to the closed field.

The ThreadSafeReadCloser struct implements the io.ReadCloser interface and provides the following methods:

  • Read: Reads data from the underlying reader. It acquires a lock on the mutex to ensure thread safety.
  • Close: Closes the reader. It acquires a lock on the mutex to ensure thread safety and sets the closed field to true before calling the closer function.

func (*ThreadSafeReadCloser) Close added in v0.3.0

func (t *ThreadSafeReadCloser) Close()

func (*ThreadSafeReadCloser) Read added in v0.3.0

func (t *ThreadSafeReadCloser) Read(p []byte) (n int, err error)

type UnknownDeal added in v0.3.0

type UnknownDeal struct {
	ID         model.DealID
	ClientID   string
	Provider   string
	PieceCID   model.CID
	StartEpoch int32
	EndEpoch   int32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL