service

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2020 License: Apache-2.0 Imports: 43 Imported by: 0

Documentation

Index

Constants

View Source
const AcceptHeader string = "Accept"
View Source
const AuthorizationHeader string = "Authorization"

Variables

This section is empty.

Functions

func Get

func Get(ctx context.Context, url string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud, v interface{}) error

func NewHTTP

func NewHTTP(requestHandler *RequestHandler, authInterceptor kitNetHttp.Interceptor) *http.Server

NewHTTP returns an HTTP server.

func ParseAuth

func ParseAuth(auth string) (token, sub string, err error)

func RefreshToken

func RefreshToken(ctx context.Context, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud, oauthCallback string, s *Store) (store.LinkedAccount, error)

Types

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache() *Cache

func (*Cache) Dump

func (s *Cache) Dump() interface{}

func (*Cache) DumpClouds

func (s *Cache) DumpClouds() map[string]*CloudData

func (*Cache) DumpDevices

func (s *Cache) DumpDevices() []subscriptionData

func (*Cache) DumpLinkedAccounts

func (s *Cache) DumpLinkedAccounts() []provisionCacheData

func (*Cache) DumpTasks

func (s *Cache) DumpTasks() []Task

func (*Cache) LoadCloud

func (s *Cache) LoadCloud(cloudID string) (store.LinkedCloud, bool)

func (*Cache) LoadDeviceSubscription

func (s *Cache) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (subscriptionData, bool)

func (*Cache) LoadDevicesSubscription

func (s *Cache) LoadDevicesSubscription(cloudID, linkedAccountID string) (subscriptionData, bool)

func (*Cache) LoadOrCreateCloud

func (s *Cache) LoadOrCreateCloud(cloud store.LinkedCloud) (store.LinkedCloud, bool)

func (*Cache) LoadOrCreateLinkedAccount

func (s *Cache) LoadOrCreateLinkedAccount(linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)

func (*Cache) LoadOrCreateSubscription

func (s *Cache) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)

func (*Cache) LoadResourceSubscription

func (s *Cache) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (subscriptionData, bool)

func (*Cache) LoadSubscription

func (s *Cache) LoadSubscription(ID string) (subscriptionData, bool)

func (*Cache) PullOutCloud

func (s *Cache) PullOutCloud(cloudID string) (*CloudData, bool)

func (*Cache) PullOutDevice

func (s *Cache) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)

func (*Cache) PullOutLinkedAccount

func (s *Cache) PullOutLinkedAccount(cloudID, linkedAccountID string) (*LinkedAccountData, bool)

func (*Cache) PullOutResource

func (s *Cache) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)

func (*Cache) PullOutSubscription

func (s *Cache) PullOutSubscription(subscripionID string) (subscriptionData, bool)

func (*Cache) UpdateLinkedAccount

func (s *Cache) UpdateLinkedAccount(l store.LinkedAccount) error

type CloudData

type CloudData struct {
	// contains filtered or unexported fields
}

func NewCloudData

func NewCloudData(linkedCloud store.LinkedCloud) *CloudData

func (*CloudData) Dump

func (d *CloudData) Dump() interface{}

func (*CloudData) DumpLinkedAccounts

func (d *CloudData) DumpLinkedAccounts() map[string]*LinkedAccountData

func (*CloudData) DumpTasks

func (d *CloudData) DumpTasks() []Task

type Config

type Config struct {
	grpc.Config
	AuthServerAddr        string              `envconfig:"AUTH_SERVER_ADDRESS" default:"127.0.0.1:9100"`
	ResourceAggregateAddr string              `envconfig:"RESOURCE_AGGREGATE_ADDRESS"  default:"127.0.0.1:9100"`
	ResourceDirectoryAddr string              `envconfig:"RESOURCE_DIRECTORY_ADDRESS"  default:"127.0.0.1:9100"`
	OAuthCallback         string              `envconfig:"OAUTH_CALLBACK"`
	EventsURL             string              `envconfig:"EVENTS_URL"`
	PullDevicesDisabled   bool                `envconfig:"PULL_DEVICES_DISABLED" default:"false"`
	PullDevicesInterval   time.Duration       `envconfig:"PULL_DEVICES_INTERVAL" default:"5s"`
	TaskProcessor         TaskProcessorConfig `envconfig:"TASK_PROCESSOR"`
	ReconnectInterval     time.Duration       `envconfig:"RECONNECT_INTERVAL" default:"10s"`
	ResubscribeInterval   time.Duration       `envconfig:"RESUBSCRIBE_INTERVAL" default:"10s"`
	JwksURL               string              `envconfig:"JWKS_URL"`
	OAuth                 manager.Config      `envconfig:"OAUTH"`
}

Config represents the application configuration.

func (Config) String

func (c Config) String() string

String returns the string representation of Config.

type Device

type Device struct {
	Device schema.Device `json:"device"`
	Status string        `json:"status"`
}

type DeviceData

type DeviceData struct {
	// contains filtered or unexported fields
}

func NewDeviceData

func NewDeviceData() *DeviceData

func (*DeviceData) Dump

func (d *DeviceData) Dump() interface{}

func (*DeviceData) DumpResources

func (d *DeviceData) DumpResources() map[string]*ResourceData

func (*DeviceData) DumpTasks

func (d *DeviceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID string) []Task

func (*DeviceData) LoadOrCreate

func (d *DeviceData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*DeviceData) PullOut

func (d *DeviceData) PullOut(sub Subscription) (Subscription, bool)

func (*DeviceData) Subscription

func (d *DeviceData) Subscription() (Subscription, bool)

type DevicesSubscription

type DevicesSubscription struct {
	// contains filtered or unexported fields
}

func NewDevicesSubscription

func NewDevicesSubscription(ctx context.Context, rdClient pb.GrpcGatewayClient, raClient pbRA.ResourceAggregateClient, reconnectInterval time.Duration) *DevicesSubscription

func (*DevicesSubscription) Add

func (c *DevicesSubscription) Add(deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*DevicesSubscription) Delete

func (c *DevicesSubscription) Delete(userID, deviceID string) error

type DialCertManager

type DialCertManager = interface {
	GetClientTLSConfig() *tls.Config
}

type LinkedAccountData

type LinkedAccountData struct {
	// contains filtered or unexported fields
}

func NewLinkedAccountData

func NewLinkedAccountData(linkedAccount store.LinkedAccount) *LinkedAccountData

func (*LinkedAccountData) Dump

func (d *LinkedAccountData) Dump() interface{}

func (*LinkedAccountData) DumpDevices

func (d *LinkedAccountData) DumpDevices() map[string]*DeviceData

func (*LinkedAccountData) DumpTasks

func (d *LinkedAccountData) DumpTasks(linkedCloud store.LinkedCloud) []Task

func (*LinkedAccountData) LinkedAccount

func (d *LinkedAccountData) LinkedAccount() store.LinkedAccount

func (*LinkedAccountData) LoadOrCreate

func (d *LinkedAccountData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*LinkedAccountData) PullOut

func (d *LinkedAccountData) PullOut(sub Subscription) (Subscription, bool)

func (*LinkedAccountData) Subscription

func (d *LinkedAccountData) Subscription() (Subscription, bool)

type LinkedAccountHandler

type LinkedAccountHandler struct {
	// contains filtered or unexported fields
}

func (*LinkedAccountHandler) Handle

func (h *LinkedAccountHandler) Handle(ctx context.Context, iter store.LinkedAccountIter) (err error)

type LinkedCloudHandler

type LinkedCloudHandler struct {
	// contains filtered or unexported fields
}

func (*LinkedCloudHandler) Handle

func (h *LinkedCloudHandler) Handle(ctx context.Context, iter store.LinkedCloudIter) (err error)

type ListenCertManager

type ListenCertManager = interface {
	GetServerTLSConfig() *tls.Config
}

type Representation

type Representation struct {
	Href           string      `json:"href"`
	Representation interface{} `json:"rep"`
}

type RequestHandler

type RequestHandler struct {
	// contains filtered or unexported fields
}

RequestHandler handles incoming requests.

func NewRequestHandler

func NewRequestHandler(
	oauthCallback string,
	subManager *SubscriptionManager,
	asClient pbAS.AuthorizationServiceClient,
	raClient pbRA.ResourceAggregateClient,
	store *Store,
	triggerTask func(Task),
) *RequestHandler

NewRequestHandler creates a new RequestHandler.

func (*RequestHandler) AddLinkedAccount

func (rh *RequestHandler) AddLinkedAccount(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) AddLinkedCloud

func (rh *RequestHandler) AddLinkedCloud(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) DeleteLinkedAccount

func (rh *RequestHandler) DeleteLinkedAccount(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) DeleteLinkedCloud

func (rh *RequestHandler) DeleteLinkedCloud(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) HandleLinkedAccount

func (rh *RequestHandler) HandleLinkedAccount(ctx context.Context, linkedCloud store.LinkedCloud, authCode string) (store.Token, error)

func (*RequestHandler) HandleOAuth

func (rh *RequestHandler) HandleOAuth(w http.ResponseWriter, r *http.Request, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) (int, error)

func (*RequestHandler) OAuthCallback

func (rh *RequestHandler) OAuthCallback(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) ProcessEvent

func (rh *RequestHandler) ProcessEvent(w http.ResponseWriter, r *http.Request)

func (*RequestHandler) RetrieveLinkedClouds

func (rh *RequestHandler) RetrieveLinkedClouds(w http.ResponseWriter, r *http.Request)

type ResourceData

type ResourceData struct {
	// contains filtered or unexported fields
}

func NewResourceData

func NewResourceData() *ResourceData

func (*ResourceData) Dump

func (d *ResourceData) Dump() interface{}

func (*ResourceData) DumpTasks

func (d *ResourceData) DumpTasks(linkedCloud store.LinkedCloud, linkedAccount store.LinkedAccount, deviceID, href string) []Task

func (*ResourceData) LoadOrCreate

func (d *ResourceData) LoadOrCreate(sub Subscription) (Subscription, bool)

func (*ResourceData) PullOut

func (d *ResourceData) PullOut(sub Subscription) (Subscription, bool)

func (*ResourceData) Subscription

func (d *ResourceData) Subscription() (Subscription, bool)

type RetrieveDeviceContentAllResponse

type RetrieveDeviceContentAllResponse struct {
	Device
	Links []Representation `json:"links"`
}

type RetrieveDeviceWithLinksResponse

type RetrieveDeviceWithLinksResponse struct {
	Device
	Links []schema.ResourceLink `json:"links"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP requests.

func New

func New(config Config, dialCertManager DialCertManager, listenCertManager ListenCertManager, db connectorStore.Store) *Server

New creates a new Server with the provided store and bus.

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the service's HTTP server and blocks.

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown ends serving

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(ctx context.Context, db store.Store) (*Store, error)

func (*Store) Dump

func (s *Store) Dump() interface{}

func (*Store) DumpDevices

func (s *Store) DumpDevices() []subscriptionData

func (*Store) DumpLinkedAccounts

func (s *Store) DumpLinkedAccounts() []provisionCacheData

func (*Store) DumpTasks

func (s *Store) DumpTasks() []Task

func (*Store) LoadCloud

func (s *Store) LoadCloud(cloudID string) (store.LinkedCloud, bool)

func (*Store) LoadDeviceSubscription

func (s *Store) LoadDeviceSubscription(cloudID, linkedAccountID, deviceID string) (subscriptionData, bool)

func (*Store) LoadDevicesSubscription

func (s *Store) LoadDevicesSubscription(cloudID, linkedAccountID string) (subscriptionData, bool)

func (*Store) LoadOrCreateCloud

func (s *Store) LoadOrCreateCloud(ctx context.Context, cloud store.LinkedCloud) (store.LinkedCloud, bool, error)

func (*Store) LoadOrCreateLinkedAccount

func (s *Store) LoadOrCreateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) (store.LinkedAccount, bool, error)

func (*Store) LoadOrCreateSubscription

func (s *Store) LoadOrCreateSubscription(sub Subscription) (subscriptionData, bool, error)

func (*Store) LoadResourceSubscription

func (s *Store) LoadResourceSubscription(cloudID, linkedAccountID, deviceID, href string) (subscriptionData, bool)

func (*Store) LoadSubscription

func (s *Store) LoadSubscription(subscripionID string) (subscriptionData, bool)

func (*Store) PullOutCloud

func (s *Store) PullOutCloud(ctx context.Context, cloudID string) (*CloudData, error)

func (*Store) PullOutDevice

func (s *Store) PullOutDevice(cloudID, linkedAccountID, deviceID string) (*DeviceData, bool)

func (*Store) PullOutLinkedAccount

func (s *Store) PullOutLinkedAccount(ctx context.Context, cloudID, linkedAccountID string) (*LinkedAccountData, error)

func (*Store) PullOutResource

func (s *Store) PullOutResource(cloudID, linkedAccountID, deviceID, href string) (*ResourceData, bool)

func (*Store) PullOutSubscription

func (s *Store) PullOutSubscription(subscripionID string) (subscriptionData, bool)

func (*Store) UpdateLinkedAccount

func (s *Store) UpdateLinkedAccount(ctx context.Context, linkedAccount store.LinkedAccount) error

type Subscription

type Subscription struct {
	ID              string
	Type            Type
	LinkedAccountID string
	LinkedCloudID   string
	DeviceID        string
	Href            string
	SigningSecret   string
	CorrelationID   string
}

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

func NewSubscriptionManager

func NewSubscriptionManager(
	EventsURL string,
	asClient pbAS.AuthorizationServiceClient,
	raClient pbRA.ResourceAggregateClient,
	store *Store,
	devicesSubscription *DevicesSubscription,
	oauthCallback string,
	triggerTask func(Task),
	interval time.Duration,
) *SubscriptionManager

func (*SubscriptionManager) HandleCancelEvent

func (s *SubscriptionManager) HandleCancelEvent(ctx context.Context, header events.EventHeader, linkedAccount store.LinkedAccount) error

func (*SubscriptionManager) HandleDeviceEvent

func (s *SubscriptionManager) HandleDeviceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData subscriptionData) error

HandleDeviceEvent handles device events.

func (*SubscriptionManager) HandleDevicesEvent

func (s *SubscriptionManager) HandleDevicesEvent(ctx context.Context, header events.EventHeader, body []byte, d subscriptionData) error

func (*SubscriptionManager) HandleDevicesOffline

func (s *SubscriptionManager) HandleDevicesOffline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOffline) error

HandleDevicesOffline sets the devices offline in the resource aggregate and unregisters them from the projection.

func (*SubscriptionManager) HandleDevicesOnline

func (s *SubscriptionManager) HandleDevicesOnline(ctx context.Context, d subscriptionData, header events.EventHeader, devices events.DevicesOnline) error

HandleDevicesOnline sets the devices online in the resource aggregate and registers them with the projection.

func (*SubscriptionManager) HandleDevicesRegistered

func (s *SubscriptionManager) HandleDevicesRegistered(ctx context.Context, d subscriptionData, devices events.DevicesRegistered, header events.EventHeader) error

func (*SubscriptionManager) HandleDevicesUnregistered

func (s *SubscriptionManager) HandleDevicesUnregistered(ctx context.Context, subscriptionData subscriptionData, correlationID string, devices events.DevicesUnregistered) error

func (*SubscriptionManager) HandleEvent

func (s *SubscriptionManager) HandleEvent(ctx context.Context, header events.EventHeader, body []byte) (int, error)

func (*SubscriptionManager) HandleResourceChangedEvent

func (s *SubscriptionManager) HandleResourceChangedEvent(ctx context.Context, subscriptionData subscriptionData, header events.EventHeader, body []byte) error

func (*SubscriptionManager) HandleResourceEvent

func (s *SubscriptionManager) HandleResourceEvent(ctx context.Context, header events.EventHeader, body []byte, subscriptionData subscriptionData) error

func (*SubscriptionManager) HandleResourcesPublished

func (s *SubscriptionManager) HandleResourcesPublished(ctx context.Context, d subscriptionData, header events.EventHeader, links events.ResourcesPublished) error

HandleResourcesPublished publishes the resources to the resource aggregate and subscribes to them.

func (*SubscriptionManager) HandleResourcesUnpublished

func (s *SubscriptionManager) HandleResourcesUnpublished(ctx context.Context, d subscriptionData, header events.EventHeader, links events.ResourcesUnpublished) error

HandleResourcesUnpublished unpublishes the resources from the resource aggregate and cancels their subscriptions.

func (*SubscriptionManager) Run

func (s *SubscriptionManager) Run(ctx context.Context)

func (*SubscriptionManager) SubscribeToDevice

func (s *SubscriptionManager) SubscribeToDevice(ctx context.Context, deviceID string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*SubscriptionManager) SubscribeToDevices

func (s *SubscriptionManager) SubscribeToDevices(ctx context.Context, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

func (*SubscriptionManager) SubscribeToResource

func (s *SubscriptionManager) SubscribeToResource(ctx context.Context, deviceID, href string, linkedAccount store.LinkedAccount, linkedCloud store.LinkedCloud) error

type Task

type Task struct {
	// contains filtered or unexported fields
}

type TaskProcessor

type TaskProcessor struct {
	// contains filtered or unexported fields
}

func NewTaskProcessor

func NewTaskProcessor(raClient pbRA.ResourceAggregateClient, maxParallelGets int64, cacheSize int, timeout, delay time.Duration) *TaskProcessor

func (*TaskProcessor) Run

func (h *TaskProcessor) Run(ctx context.Context, subscriptionManager *SubscriptionManager) error

func (*TaskProcessor) Trigger

func (h *TaskProcessor) Trigger(task Task)

type TaskProcessorConfig

type TaskProcessorConfig struct {
	CacheSize   int           `envconfig:"CACHE_SIZE" default:"2048"`
	Timeout     time.Duration `envconfig:"TIMEOUT" default:"5s"`
	MaxParallel int64         `envconfig:"MAX_PARALLEL" default:"128"`
	Delay       time.Duration `envconfig:"DELAY"` // Used for CTT test with 10s.
}

type TaskType

type TaskType string
const (
	TaskType_PullDevice          TaskType = "pulldevice"
	TaskType_SubscribeToDevices  TaskType = "subdevices"
	TaskType_SubscribeToDevice   TaskType = "subdevice"
	TaskType_SubscribeToResource TaskType = "subresource"
)

type Type

type Type string
const (
	Type_Devices  Type = "devices"
	Type_Device   Type = "device"
	Type_Resource Type = "resource"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL