pulsar

package module
v0.0.0-...-cc3673b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2023 License: MIT Imports: 11 Imported by: 1

README

pulsar-client-go

English | 中文版

Introduction

Tuya pulsar client SDK for Golang

Preparation

  1. AccessID: Provided by Tuya platform.
  2. AccessKey: provided by Tuya platform.
  3. Pulsar address: Select the pulsar address according to different business areas. You can find out the address from documents.

Example

package main

import (
	"context"
	"encoding/base64"
	"encoding/json"

	pulsar "github.com/tuya/tuya-pulsar-sdk-go"
	"github.com/tuya/tuya-pulsar-sdk-go/pkg/tylog"
	"github.com/tuya/tuya-pulsar-sdk-go/pkg/tyutils"
)

func main() {
	// SetInternalLogLevel(logrus.DebugLevel)
	tylog.SetGlobalLog("sdk", false)
	accessID := "accessID"
	accessKey := "accessKey"
	topic := pulsar.TopicForAccessID(accessID)

	// create client
	cfg := pulsar.ClientConfig{
		PulsarAddr: pulsar.PulsarAddrCN,
	}
	c := pulsar.NewClient(cfg)

	// create consumer
	csmCfg := pulsar.ConsumerConfig{
		Topic: topic,
		Auth:  pulsar.NewAuthProvider(accessID, accessKey),
	}
	csm, _ := c.NewConsumer(csmCfg)

	// handle message
	csm.ReceiveAndHandle(context.Background(), &helloHandler{AesSecret: accessKey[8:24]})
}

type helloHandler struct {
	AesSecret string
}

func (h *helloHandler) HandlePayload(ctx context.Context, msg pulsar.Message, payload []byte) error {
	tylog.Info("payload preview", tylog.String("payload", string(payload)))

	// let's decode the payload with AES
	m := map[string]interface{}{}
	err := json.Unmarshal(payload, &m)
	if err != nil {
		tylog.Error("json unmarshal failed", tylog.ErrorField(err))
		return nil
	}
	bs := m["data"].(string)
	de, err := base64.StdEncoding.DecodeString(string(bs))
	if err != nil {
		tylog.Error("base64 decode failed", tylog.ErrorField(err))
		return nil
	}
	decode := tyutils.EcbDecrypt(de, []byte(h.AesSecret))
	tylog.Info("aes decode", tylog.ByteString("decode payload", decode))

	return nil
}



Precautions

  1. Make sure that the accessID and accessKey are correct.
  2. Make sure that the Pulsar address is correct, you should use pulsar://mqe.tuyaus.com:7285 instead of pulsar+ssl://mqe.tuyaus.com:7285.
  3. Make sure that the SDK code version you use is the latest.

About debug

Through the following code, you can see all communications with the pulsar service in the terminal.

func main(){
	pulsar.SetInternalLogLevel(logrus.DebugLevel)
	// other code
}

Through the following code, you can see the log information of tuya_pulsar_go_sdk. At the same time, the log will be saved in the logs/sdk.log file.

func main(){
	tylog.SetGlobalLog("sdk", false)
}

In a formal environment, you may not want the SDK logs to be output to the terminal. It is recommended that you use the following code to output the log to a file.

func main(){
	tylog.SetGlobalLog("sdk", true)
}

Support

You can get support from Tuya with the following methods:

Documentation

Index

Constants

View Source
const (
	PulsarAddrCN = "pulsar+ssl://mqe.tuyacn.com:7285"
	PulsarAddrEU = "pulsar+ssl://mqe.tuyaeu.com:7285"
	PulsarAddrUS = "pulsar+ssl://mqe.tuyaus.com:7285"
)

Variables

This section is empty.

Functions

func NewAuthProvider

func NewAuthProvider(accessID, accessKey string) *authProvider

func SetInternalLogLevel

func SetInternalLogLevel(level logrus.Level)

func TopicForAccessID

func TopicForAccessID(accessID string) string

Types

type Client

type Client interface {
	NewConsumer(config ConsumerConfig) (Consumer, error)
}

func NewClient

func NewClient(cfg ClientConfig) Client

type ClientConfig

type ClientConfig struct {
	PulsarAddr string
	Auth       interface{}
}

type Consumer

type Consumer interface {
	ReceiveAndHandle(ctx context.Context, handler PayloadHandlerV2)
	Close() error
}

type ConsumerConfig

type ConsumerConfig struct {
	Topic        string
	Subscription string
	Auth         interface{}
}

type Message

type Message = pulsar.Message

type PayloadHandlerV2

type PayloadHandlerV2 interface {
	HandlePayload(ctx context.Context, msg Message, payload []byte) error
}

type ProducerMessage

type ProducerMessage struct {
	Payload []byte
	Key     string
}

Directories

Path Synopsis
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL