cmd

package
v0.0.0-...-e1e30c8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// RootCmd represents the base command when called without any subcommands.
//
// Its Run function configures logging, validates the mandatory settings,
// builds one Pub/Sub client for the source project and one for the
// destination project, and then forwards every message received on the
// configured subscription to the destination topic. A message is acked only
// after the publish to the destination topic has succeeded; otherwise it is
// nacked. Any setup failure terminates the process with a non-zero exit code.
var RootCmd = &cobra.Command{
	Use:   "pubsub-to-pubsub",
	Short: "pubsub-to-pubsub",
	Long:  "pubsub-to-pubsub",
	Run: func(cmd *cobra.Command, args []string) {
		ctx := context.Background()

		util.SetLogger(cfg.LogLevel, cfg.LogFormat)

		// NOTE(review): the two *GoogleApplicationCredentials values are the
		// raw JSON passed to google.CredentialsFromJSON below, so this debug
		// line writes credential material to the log. Consider redacting.
		logrus.
			WithField(paramConfig, cfgFile).
			WithField(paramLogLevel, cfg.LogLevel).
			WithField(paramLogFormat, cfg.LogFormat).
			WithField(paramFromGoogleCloudProject, cfg.FromGoogleCloudProject).
			WithField(paramToGoogleCloudProject, cfg.ToGoogleCloudProject).
			WithField(paramFromGoogleApplicationCredentials, cfg.FromGoogleApplicationCredentials).
			WithField(paramToGoogleApplicationCredentials, cfg.ToGoogleApplicationCredentials).
			WithField(paramPubSubSubscription, cfg.PubSubSubscription).
			WithField(paramPubSubDestinationTopic, cfg.PubSubDestinationTopic).
			Debug("Configuration")

		// Mandatory settings, checked in this order; the first missing one
		// is reported on stderr and aborts the process.
		required := []struct {
			value string
			name  string
		}{
			{cfg.FromGoogleCloudProject, "FROM_GOOGLE_CLOUD_PROJECT"},
			{cfg.ToGoogleCloudProject, "TO_GOOGLE_CLOUD_PROJECT"},
			{cfg.PubSubSubscription, "PUBSUB_SUBSCRIPTION"},
			{cfg.PubSubDestinationTopic, "PUBSUB_DESTINATION_TOPIC"},
		}
		for _, r := range required {
			if r.value == "" {
				_, _ = fmt.Fprintf(os.Stderr, "%s variable must be set.\n", r.name)
				os.Exit(1)
			}
		}

		// Each error is checked immediately after the call that produced it,
		// so a failure on the source side cannot be masked by a success on
		// the destination side. logrus.Fatalf exits the process itself.
		fromCreds, err := google.CredentialsFromJSON(ctx, []byte(cfg.FromGoogleApplicationCredentials), pubsub.ScopePubSub)
		if err != nil {
			logrus.Fatalf("Could not find credentials: %v", err)
		}

		toCreds, err := google.CredentialsFromJSON(ctx, []byte(cfg.ToGoogleApplicationCredentials), pubsub.ScopePubSub)
		if err != nil {
			logrus.Fatalf("Could not find credentials: %v", err)
		}

		fromClient, err := pubsub.NewClient(ctx, cfg.FromGoogleCloudProject, option.WithCredentials(fromCreds))
		if err != nil {
			logrus.Fatalf("Could not create pubsub Client: %v", err)
		}

		toClient, err := pubsub.NewClient(ctx, cfg.ToGoogleCloudProject, option.WithCredentials(toCreds))
		if err != nil {
			logrus.Fatalf("Could not create pubsub Client: %v", err)
		}

		sub := fromClient.Subscription(cfg.PubSubSubscription)
		sub.ReceiveSettings.MaxOutstandingMessages = pubSubMaxOutstandingMessages

		topic := toClient.Topic(cfg.PubSubDestinationTopic)

		err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
			// The publish error is deliberately a callback-local variable:
			// Receive invokes this function from several goroutines, so
			// writing to the enclosing err would be a data race and could
			// overwrite the result of Receive itself.
			if _, err := topic.Publish(ctx, msg).Get(ctx); err != nil {
				logrus.Errorf("err when inserting data: %v", err)
				msg.Nack()
				return
			}
			msg.Ack()
		})
		if err != nil {
			logrus.Fatal(err)
		}
	},
}

RootCmd represents the base command when called without any subcommands

Functions

func Execute

func Execute()

Execute adds all child commands to the root command and sets flags appropriately. This is called by main.main(). It only needs to happen once to the RootCmd.

Types

type Config

// Config holds the settings read by RootCmd. Fields are grouped in
// source/destination pairs; their declaration order is unchanged.
type Config struct {
	// Logging format and level, both passed to util.SetLogger.
	LogFormat, LogLevel string

	// Google Cloud project IDs used to create the source ("from") and
	// destination ("to") Pub/Sub clients.
	FromGoogleCloudProject, ToGoogleCloudProject string

	// Credentials for the source and destination clients. RootCmd hands
	// these to google.CredentialsFromJSON as raw bytes, so they carry the
	// credentials JSON itself rather than a file path.
	FromGoogleApplicationCredentials, ToGoogleApplicationCredentials string

	// PubSubSubscription is the subscription messages are received from on
	// the source client; PubSubDestinationTopic is the topic they are
	// published to on the destination client.
	PubSubSubscription, PubSubDestinationTopic string
}

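Config holds the configuration for the pubsub-to-pubsub command: logging settings, the source and destination Google Cloud projects and credentials, and the Pub/Sub subscription and destination topic.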

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL