Documentation ¶
Index ¶
- Constants
- Variables
- func NewProducerConfig(proxyAddress string, topic string, routingHeader string) producer.MessageProducerConfig
- type Annotation
- type Annotations
- type CombinedModel
- type ContentModel
- type DataCombiner
- type DataCombinerI
- type Forwarder
- type Identifier
- type KafkaQConsumer
- type KafkaQMessage
- type MessageContent
- type MsgProcessor
- type MsgProcessorConfig
- type QConsumer
- type RequestProcessor
- type RequestProcessorI
- type Thing
Constants ¶
View Source
const ( CombinerOrigin = "forced-combined-msg" ContentType = "application/json" )
View Source
const (
CombinerMessageType = "cms-combined-content-published"
)
Variables ¶
View Source
var ( NotFoundError = errors.New("content not found") // used when the content can not be found by the platform InvalidContentTypeError = errors.New("invalid content type") )
Functions ¶
func NewProducerConfig ¶
func NewProducerConfig(proxyAddress string, topic string, routingHeader string) producer.MessageProducerConfig
Types ¶
type Annotation ¶
type Annotation struct {
Thing `json:"thing,omitempty"`
}
type Annotations ¶
type Annotations struct { Annotations []Annotation `json:"annotations"` UUID string `json:"uuid"` }
type CombinedModel ¶
type CombinedModel struct { UUID string `json:"uuid"` Content ContentModel `json:"content"` Metadata []Annotation `json:"metadata"` ContentURI string `json:"contentUri"` LastModified string `json:"lastModified"` MarkedDeleted string `json:"markedDeleted"` }
type ContentModel ¶
type ContentModel map[string]interface{}
type DataCombiner ¶
type DataCombiner struct { ContentRetriever contentRetrieverI MetadataRetriever metadataRetrieverI }
func (DataCombiner) GetCombinedModel ¶
func (dc DataCombiner) GetCombinedModel(uuid string) (CombinedModel, error)
func (DataCombiner) GetCombinedModelForAnnotations ¶
func (dc DataCombiner) GetCombinedModelForAnnotations(metadata Annotations) (CombinedModel, error)
func (DataCombiner) GetCombinedModelForContent ¶
func (dc DataCombiner) GetCombinedModelForContent(content ContentModel) (CombinedModel, error)
type DataCombinerI ¶
type DataCombinerI interface { GetCombinedModelForContent(content ContentModel) (CombinedModel, error) GetCombinedModelForAnnotations(metadata Annotations) (CombinedModel, error) GetCombinedModel(uuid string) (CombinedModel, error) }
func NewDataCombiner ¶
type Forwarder ¶
type Forwarder struct { MsgProducer producer.MessageProducer SupportedContentTypes []string }
func NewForwarder ¶
func NewForwarder(msgProducer producer.MessageProducer, supportedContentTypes []string) Forwarder
type Identifier ¶
type KafkaQConsumer ¶
type KafkaQConsumer struct { Consumer consumer.MessageConsumer // contains filtered or unexported fields }
func NewKafkaQConsumer ¶
func NewKafkaQConsumer(cConf consumer.QueueConfig, ch chan<- *KafkaQMessage, client *http.Client) *KafkaQConsumer
func (*KafkaQConsumer) ProcessMsg ¶
func (c *KafkaQConsumer) ProcessMsg(m consumer.Message)
type KafkaQMessage ¶
type KafkaQMessage struct {
// contains filtered or unexported fields
}
type MessageContent ¶
type MessageContent struct { ContentURI string `json:"contentUri"` ContentModel ContentModel `json:"payload"` LastModified string `json:"lastModified"` }
type MsgProcessor ¶
type MsgProcessor struct { DataCombiner DataCombinerI Forwarder Forwarder // contains filtered or unexported fields }
func NewMsgProcessor ¶
func NewMsgProcessor(srcCh <-chan *KafkaQMessage, config MsgProcessorConfig, dataCombiner DataCombinerI, producer producer.MessageProducer, whitelistedContentTypes []string) *MsgProcessor
func (*MsgProcessor) ProcessMessages ¶
func (p *MsgProcessor) ProcessMessages()
type MsgProcessorConfig ¶
type MsgProcessorConfig struct { SupportedContentURIs []string SupportedHeaders []string ContentTopic string MetadataTopic string }
func NewMsgProcessorConfig ¶
func NewMsgProcessorConfig(supportedURIs []string, supportedHeaders []string, contentTopic string, metadataTopic string) MsgProcessorConfig
type RequestProcessor ¶
type RequestProcessor struct { DataCombiner DataCombinerI Forwarder Forwarder }
func NewRequestProcessor ¶
func NewRequestProcessor(dataCombiner DataCombinerI, producer producer.MessageProducer, whitelistedContentTypes []string) *RequestProcessor
func (*RequestProcessor) ForceMessagePublish ¶
func (p *RequestProcessor) ForceMessagePublish(uuid string, tid string) error
type RequestProcessorI ¶
Click to show internal directories.
Click to hide internal directories.