streaming

package
v1.0.1-0...-386defc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Copyright 2022 Google LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
var (

	// Max allowed value for maxWorkers and numWorkers.
	MAX_WORKER_LIMIT int32 = 1000
	// Min allowed value for maxWorkers and numWorkers.
	MIN_WORKER_LIMIT int32 = 1
)

Functions

func CleanUpStreamingJobs

func CleanUpStreamingJobs(ctx context.Context, conv *internal.Conv, projectID, region string) error

func CleanupDataflowJob

func CleanupDataflowJob(ctx context.Context, client *dataflow.JobsV1Beta3Client, dataflowJobId string, projectID, region string) error

func CleanupDatastream

func CleanupDatastream(ctx context.Context, client *datastream.Client, dsName string, projectID, region string) error

func CleanupPubsubResources

func CleanupPubsubResources(ctx context.Context, pubsubClient *pubsub.Client, storageClient *storage.Client, pubsubCfg internal.PubsubCfg, projectID string)

func CreatePubsubResources

func CreatePubsubResources(ctx context.Context, projectID string, datastreamDestinationConnCfg DstConnCfg, dbName string) (*internal.PubsubCfg, error)

func LaunchDataflowJob

func LaunchDataflowJob(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error)

LaunchDataflowJob populates the parameters from the streaming config and triggers a Dataflow job.

func LaunchStream

func LaunchStream(ctx context.Context, sourceProfile profiles.SourceProfile, dbList []profiles.LogicalShard, projectID string, datastreamCfg DatastreamCfg) error

LaunchStream populates the parameters from the streaming config and triggers a stream on Cloud Datastream.

func StartDataflow

func StartDataflow(ctx context.Context, targetProfile profiles.TargetProfile, streamingCfg StreamingCfg, conv *internal.Conv) (internal.DataflowOutput, error)

func StoreGeneratedResources

func StoreGeneratedResources(conv *internal.Conv, streamingCfg StreamingCfg, dfJobId, gcloudDataflowCmd, project, dataShardId string)

func VerifyAndUpdateCfg

func VerifyAndUpdateCfg(streamingCfg *StreamingCfg, dbName string, tableList []string) error

VerifyAndUpdateCfg checks the fields and errors out if certain fields are empty. It then auto-populates certain empty fields like StreamId and Dataflow JobName.

Types

type DataflowCfg

type DataflowCfg struct {
	JobName             string
	Location            string
	HostProjectId       string
	Network             string
	Subnetwork          string
	MaxWorkers          string
	NumWorkers          string
	ServiceAccountEmail string
	DbNameToShardIdMap  map[string]string
}

type DatastreamCfg

type DatastreamCfg struct {
	StreamId                    string
	StreamLocation              string
	StreamDisplayName           string
	SourceConnectionConfig      SrcConnCfg
	DestinationConnectionConfig DstConnCfg
	Properties                  string
	// contains filtered or unexported fields
}

type DstConnCfg

type DstConnCfg struct {
	Name     string
	Location string
	Prefix   string
}

type SrcConnCfg

type SrcConnCfg struct {
	Name     string
	Location string
}

type StreamingCfg

type StreamingCfg struct {
	DatastreamCfg DatastreamCfg
	DataflowCfg   DataflowCfg
	TmpDir        string
	PubsubCfg     internal.PubsubCfg
	DataShardId   string
}

func CreateStreamingConfig

func CreateStreamingConfig(pl profiles.DataShard) StreamingCfg

func ReadStreamingConfig

func ReadStreamingConfig(file, dbName string, tableList []string) (StreamingCfg, error)

ReadStreamingConfig reads the file and unmarshalls it into the StreamingCfg struct.

func StartDatastream

func StartDatastream(ctx context.Context, streamingCfg StreamingCfg, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile, tableList []string) (StreamingCfg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL