instance

package
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// AbortReconcileIfHintFound is a step that checks the reconcile context for
// the xstoremeta.HintForbidden controller hint. It returns flow.Wait when the
// hint is present and flow.Pass otherwise.
var AbortReconcileIfHintFound = xstorev1reconcile.NewStepBinder("AbortReconcileIfHintFound",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		forbidden := rc.ContainsControllerHint(xstoremeta.HintForbidden)
		if !forbidden {
			return flow.Pass()
		}
		return flow.Wait("Found hint, abort reconcile.")
	},
)
View Source
// AddLearnerNodesToClusterOnLeader is a step that executes a consensus
// "add node" command on the leader pod for every pod selected by
// xstoremeta.IsPodRoleLearner.
//
// It returns flow.Pass when there are no learner pods, flow.Wait when no
// leader pod is found, and flow.Error on the first lookup or command failure.
var AddLearnerNodesToClusterOnLeader = xstorev1reconcile.NewStepBinder("AddLearnerNodesToClusterOnLeader",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		allPods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get xstore pods.")
		}

		// Nothing to do when the xstore has no learner pods.
		learners := k8shelper.FilterPodsBy(allPods, xstoremeta.IsPodRoleLearner)
		if len(learners) == 0 {
			return flow.Pass()
		}

		leader, err := rc.TryGetXStoreLeaderPod()
		switch {
		case err != nil:
			return flow.Error(err, "Unable to get leader pod.")
		case leader == nil:
			return flow.Wait("Leader not found, keep waiting...")
		}

		for i := range learners {
			learnerName := learners[i].Name

			addNodeCmd := xstoreexec.NewCanonicalCommandBuilder().
				Consensus().
				AddNode(learnerName, xstoremeta.RoleLearner).
				Build()

			// Each command is executed with a 2 second timeout.
			execOpts := control.ExecOptions{
				Logger:  flow.Logger(),
				Timeout: 2 * time.Second,
			}
			if err := rc.ExecuteCommandOn(leader, convention.ContainerEngine, addNodeCmd, execOpts); err != nil {
				return flow.Error(err, "Unable to add learner node.", "pod", learnerName)
			}
		}

		return flow.Continue("Learner nodes added.")
	},
)
View Source
// BindHostPathVolumesToHost is a step that records, for every xstore pod, the
// pod's Spec.NodeName into the Host field of the matching entry of
// xstore.Status.BoundVolumes.
//
// It returns flow.Wait as soon as it meets a pod whose Spec.NodeName is empty;
// entries updated before that point keep their new Host value. Pods without a
// (non-nil) entry in BoundVolumes are logged and skipped, because writing
// through the nil pointer returned by the map lookup would panic.
var BindHostPathVolumesToHost = xstorev1reconcile.NewStepBinder("BindHostPathVolumesToHost",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		binding := xstore.Status.BoundVolumes
		for _, pod := range pods {
			if len(pod.Spec.NodeName) == 0 {
				return flow.Wait("Some pod is still not scheduled.", "pod", pod.Name)
			}

			// A missing key (or a nil map) yields a nil *HostPathVolume.
			vol := binding[pod.Name]
			if vol == nil {
				flow.Logger().Info("No bound volume found for pod, skip binding host.", "pod", pod.Name)
				continue
			}
			vol.Host = pod.Spec.NodeName
		}

		return flow.Pass()
	},
)
View Source
// BindPodPorts is a step that rebuilds xstore.Status.PodPorts from the current
// xstore pods. For each pod it collects the named container ports of all
// containers into one polardbxv1xstore.PodPorts keyed by port name; when two
// containers declare the same port name, the later one overwrites the earlier.
var BindPodPorts = xstorev1reconcile.NewStepBinder("BindPodPorts",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		portsByPod := make(map[string]polardbxv1xstore.PodPorts, len(pods))
		for i := range pods {
			collected := polardbxv1xstore.PodPorts{}
			containers := pods[i].Spec.Containers
			for c := range containers {
				for _, p := range containers[c].Ports {
					collected[p.Name] = p.ContainerPort
				}
			}
			portsByPod[pods[i].Name] = collected
		}
		xstore.Status.PodPorts = portsByPod

		return flow.Continue("Pod ports updated!")
	},
)
View Source
// CancelAsyncTasks is a step that calls CancelHpfsAsyncTasks for every xstore
// pod using the hpfs client obtained from the reconcile context. It stops with
// flow.Error on the first failure and returns flow.Pass when all calls succeed.
var CancelAsyncTasks = xstorev1reconcile.NewStepBinder("CancelAsyncTasks",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		hpfsClient, err := rc.GetHpfsClient()
		if err != nil {
			return flow.Error(err, "Unable to get hpfs client.")
		}

		for i := range pods {
			// Work on a copy so the callee never sees the slice element itself.
			current := pods[i]
			if cancelErr := CancelHpfsAsyncTasks(rc.Context(), hpfsClient, &current); cancelErr != nil {
				return flow.Error(cancelErr, "Unable to cancel async task", "pod", current.Name)
			}
		}

		return flow.Pass()
	},
)
View Source
// CheckConnectivityAndSetEngineVersion is a step that opens a MySQL connection
// to the xstore's read-write cluster address as the super account, pings it,
// and stores the result of "SELECT VERSION()" in xstore.Status.EngineVersion.
//
// A failed ping is logged and answered with flow.RetryAfter (10 seconds); any
// other failure is returned through flow.Error. Note that the step is
// registered under the name "CheckConnectivityFromController".
var CheckConnectivityAndSetEngineVersion = xstorev1reconcile.NewStepBinder("CheckConnectivityFromController",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		password, err := rc.GetXStoreAccountPassword(convention.SuperAccount)
		if err != nil {
			return flow.Error(err, "Unable to get password for super account.")
		}

		addr, err := rc.GetXStoreClusterAddr(convention.ServiceTypeReadWrite, convention.PortAccess)
		if err != nil {
			return flow.Error(err, "Unable to get cluster address.")
		}

		// The DSN carries a 1s connect timeout.
		dsn := fmt.Sprintf("%s:%s@tcp(%s)/information_schema?timeout=1s",
			convention.SuperAccount, password, addr)
		conn, err := sql.Open("mysql", dsn)
		if err != nil {
			return flow.Error(err, "Unable to open connection to cluster address.")
		}
		defer conn.Close()

		if pingErr := conn.PingContext(rc.Context()); pingErr != nil {
			flow.Logger().Error(pingErr, "Ping failed.")
			return flow.RetryAfter(10*time.Second, "Failed to ping, wait for 10 seconds and retry...")
		}

		var engineVersion string
		if err := conn.QueryRowContext(rc.Context(), "SELECT VERSION()").Scan(&engineVersion); err != nil {
			return flow.Error(err, "Unable to read version")
		}
		rc.MustGetXStore().Status.EngineVersion = engineVersion

		return flow.Continue("Succeed.")
	},
)
View Source
// CheckTopologySpec is a step that runs checkTopologySpec on the xstore. When
// the check fails it sets the status phase to polardbxv1xstore.PhaseFailed and
// returns flow.Error; otherwise it returns flow.Pass.
var CheckTopologySpec = xstorev1reconcile.NewStepBinder("CheckTopologySpec",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		if checkErr := checkTopologySpec(xstore); checkErr != nil {
			xstore.Status.Phase = polardbxv1xstore.PhaseFailed
			return flow.Error(checkErr, "Check topology failed. Transfer phase into Failed.")
		}

		return flow.Pass()
	},
)
View Source
// CreateAccounts is a step that creates one account on the leader pod for each
// key/value pair in the xstore secret's Data (key as user, value as password).
//
// It returns flow.RetryAfter (5 seconds) when no leader pod is found. When an
// account command fails with what k8shelper.IsExitError reports as an exit
// error, the error is logged and flow.Wait is returned; other failures are
// returned through flow.Error.
var CreateAccounts = xstorev1reconcile.NewStepBinder("CreateAccounts",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		leader, err := rc.TryGetXStoreLeaderPod()
		switch {
		case err != nil:
			return flow.Error(err, "Unable to get leader pod.")
		case leader == nil:
			return flow.RetryAfter(5*time.Second, "Leader not found, wait 5 seconds and retry...")
		}

		accounts, err := rc.GetXStoreSecret()
		if err != nil {
			return flow.Error(err, "Unable to get secret.")
		}

		for account, password := range accounts.Data {
			createCmd := xstoreexec.NewCanonicalCommandBuilder().
				Account().Create(account, string(password)).
				Build()

			execErr := rc.ExecuteCommandOn(leader, "engine", createCmd, control.ExecOptions{
				Logger: flow.Logger(),
			})
			if execErr == nil {
				continue
			}

			if k8shelper.IsExitError(execErr) {
				flow.Logger().Error(execErr, "Failed to create account.")
				return flow.Wait("Failed to create account.", "leader-pod", leader.Name)
			}
			return flow.Error(execErr, "Unable to create account.", "leader-pod", leader.Name)
		}

		return flow.Continue("All accounts are created.")
	},
)
View Source
// CreateSecret is a step that makes sure the xstore secret exists. A not-found
// error from the lookup is ignored; when the lookup yields a nil secret, a new
// one is built with factory.NewSecret and created through
// rc.SetControllerRefAndCreate.
var CreateSecret = xstorev1reconcile.NewStepBinder("CreateSecret",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		existing, err := rc.GetXStoreSecret()
		if client.IgnoreNotFound(err) != nil {
			return flow.Error(err, "Unable to get secret.")
		}
		if existing != nil {
			return flow.Continue("Secret ready.")
		}

		if createErr := rc.SetControllerRefAndCreate(factory.NewSecret(xstore)); createErr != nil {
			return flow.Error(createErr, "Unable to create secret.")
		}
		return flow.Continue("Secret ready.")
	},
)
View Source
// DeleteAllPods is a step that issues a delete for every xstore pod whose
// DeletionTimestamp is still zero. Not-found errors from the delete call are
// ignored; any other error is returned through flow.Error.
var DeleteAllPods = xstorev1reconcile.NewStepBinder("DeleteAllPods",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		for i := range pods {
			target := pods[i]

			// Pods with a deletion timestamp are already being deleted.
			if !target.DeletionTimestamp.IsZero() {
				continue
			}

			deleteErr := rc.Client().Delete(rc.Context(), &target)
			if deleteErr == nil || apierrors.IsNotFound(deleteErr) {
				continue
			}
			return flow.Error(deleteErr, "Unable to delete pod.", "pod", target.Name)
		}

		return flow.Continue("All pods deleted.")
	},
)
View Source
// DeleteExecutionContext is a step that removes the "exec" key from the
// xstore's task config map and persists the change with a client update.
//
// It returns flow.Pass when the key is absent, flow.Error when the config map
// cannot be fetched or updated, and flow.Continue after a successful update.
var DeleteExecutionContext = xstorev1reconcile.NewStepBinder("DeleteExecutionContext",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		taskCm, err := rc.GetXStoreConfigMap(convention.ConfigMapTypeTask)
		if err != nil {
			return flow.Error(err, "Unable to get task config map.")
		}

		// Nothing to delete: skip the update call entirely.
		if _, ok := taskCm.Data["exec"]; !ok {
			return flow.Pass()
		}

		delete(taskCm.Data, "exec")
		if err := rc.Client().Update(rc.Context(), taskCm); err != nil {
			// Message previously read "tass config map" (typo).
			return flow.Error(err, "Unable to update task config map.")
		}
		return flow.Continue("Deleted!")
	},
)
View Source
// DeleteHostPathVolumes is a step that removes every host path volume recorded
// in xstore.Status.BoundVolumes via DeleteHostPathVolume and then clears
// BoundVolumes.
//
// For a volume with an empty Host, the host is filled in from the NodeName of
// the pod with the same name, when such a pod exists. Volumes whose host is
// not among the nodes returned by rc.GetNodes are skipped. Nil entries in
// BoundVolumes are skipped as well instead of being dereferenced.
//
// It returns flow.Pass when BoundVolumes is nil, and flow.Error on the first
// lookup or deletion failure (BoundVolumes is left untouched in that case).
var DeleteHostPathVolumes = xstorev1reconcile.NewStepBinder("DeleteHostPathVolumes",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		volumes := xstore.Status.BoundVolumes
		if volumes == nil {
			return flow.Pass()
		}

		nodes, err := rc.GetNodes()
		if err != nil {
			return flow.Error(err, "Unable to get nodes.")
		}
		observedNodes := k8shelper.ToObjectNameSet(nodes)

		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}
		podMap := k8shelper.BuildPodMap(pods, func(pod *corev1.Pod) string {
			return pod.Name
		})

		hpfsClient, err := rc.GetHpfsClient()
		if err != nil {
			return flow.Error(err, "Unable to get hpfs client.")
		}
		for podName, vol := range volumes {
			// The map stores pointers; a nil entry has nothing to delete.
			if vol == nil {
				continue
			}

			// Fall back to the pod's node when the host was never recorded.
			if len(vol.Host) == 0 {
				if pod := podMap[podName]; pod != nil {
					vol.Host = pod.Spec.NodeName
				}
			}

			// Skip volumes on hosts that are not among the observed nodes.
			if _, ok := observedNodes[vol.Host]; !ok {
				continue
			}

			if err := DeleteHostPathVolume(rc.Context(), hpfsClient, vol); err != nil {
				return flow.Error(err, "Unable to remove host path volume.", "vol.pod", podName,
					"vol.host", vol.Host, "vol.type", vol.Type, "vol.path", vol.HostPath)
			}
		}

		xstore.Status.BoundVolumes = nil

		return flow.Continue("Volumes are deleted.")
	})
View Source
// FillServiceNameIfNotProvided is a step that defaults an empty
// xstore.Spec.ServiceName to the xstore's name and marks the xstore as
// changed. It always returns flow.Pass.
var FillServiceNameIfNotProvided = xstorev1reconcile.NewStepBinder("FillServiceNameIfNotProvided",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		spec := &xstore.Spec
		if len(spec.ServiceName) != 0 {
			return flow.Pass()
		}

		spec.ServiceName = xstore.Name
		rc.MarkXStoreChanged()
		return flow.Pass()
	},
)
View Source
// GenerateRandInStatus is a step that fills xstore.Status.Rand when it is
// empty. The value of the xstoremeta.AnnotationGuideRand annotation is used
// when that annotation is present; otherwise rand.String(4) is used. It always
// returns flow.Pass.
var GenerateRandInStatus = xstorev1reconcile.NewStepBinder("GenerateRandInStatus",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		// An existing value is never overwritten.
		if len(xstore.Status.Rand) != 0 {
			return flow.Pass()
		}

		suffix, guided := xstore.Annotations[xstoremeta.AnnotationGuideRand]
		if !guided {
			suffix = rand.String(4)
		}
		xstore.Status.Rand = suffix

		return flow.Pass()
	},
)
View Source
// InjectFinalizerOnXStore is a step that adds xstoremeta.Finalizer to the
// xstore when it is missing and marks the xstore as changed. It returns
// flow.Pass when the finalizer is already present.
var InjectFinalizerOnXStore = xstorev1reconcile.NewStepBinder("InjectFinalizerOnXStore",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		if !controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer) {
			controllerutil.AddFinalizer(xstore, xstoremeta.Finalizer)
			rc.MarkXStoreChanged()
			return flow.Continue("Inject finalizer.")
		}

		return flow.Pass()
	},
)
View Source
// MoveToPhaseDeletingIfDeleted is a step that moves an xstore carrying a
// deletion timestamp into polardbxv1xstore.PhaseDeleting (with an empty stage)
// and returns flow.Retry.
//
// The transition only happens when the xstore has no finalizers, or exactly
// one finalizer that is xstoremeta.Finalizer; otherwise the step returns
// flow.Wait. It returns flow.Pass when the xstore is already in the deleting
// phase or has no deletion timestamp.
var MoveToPhaseDeletingIfDeleted = xstorev1reconcile.NewStepBinder("MoveToPhaseDeletingIfDeleted",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore, err := rc.GetXStore()
		if err != nil {
			return flow.Error(err, "Unable to get xstore.")
		}

		if xstore.Status.Phase == polardbxv1xstore.PhaseDeleting {
			return flow.Pass()
		}
		if xstore.DeletionTimestamp.IsZero() {
			return flow.Pass()
		}

		// Foreign finalizers: more than one, or a single one that is not ours.
		finalizerCount := len(xstore.Finalizers)
		foreignFinalizers := finalizerCount > 1 ||
			(finalizerCount == 1 && !controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer))
		if foreignFinalizers {
			return flow.Wait("Other finalizers found, wait until removed...")
		}

		xstore.Status.Phase = polardbxv1xstore.PhaseDeleting
		xstore.Status.Stage = polardbxv1xstore.StageEmpty
		return flow.Retry("Move phase to deleting. Retry immediately!")
	})
View Source
// PersistentStatus is a step that calls rc.UpdateXStoreStatus when
// rc.IsXStoreStatusChanged reports a change. It returns flow.Error when the
// update fails and flow.Continue otherwise.
var PersistentStatus = xstorev1reconcile.NewStepBinder("PersistentStatus",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		if !rc.IsXStoreStatusChanged() {
			return flow.Continue("Status not changed.")
		}

		updateErr := rc.UpdateXStoreStatus()
		if updateErr != nil {
			return flow.Error(updateErr, "Unable to persistent status.")
		}
		return flow.Continue("Succeeds to persistent status.")
	})
View Source
// PersistentXStore is a step that calls rc.UpdateXStore when
// rc.IsXStoreChanged reports a change. It returns flow.Error when the update
// fails and flow.Continue otherwise.
var PersistentXStore = xstorev1reconcile.NewStepBinder("PersistentXStore",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		if !rc.IsXStoreChanged() {
			return flow.Continue("Object not changed.")
		}

		updateErr := rc.UpdateXStore()
		if updateErr != nil {
			return flow.Error(updateErr, "Unable to persistent xstore.")
		}
		return flow.Continue("Succeeds to persistent xstore.")
	})
View Source
// PrepareHostPathVolumes is a step that replaces xstore.Status.BoundVolumes
// with one HostPathVolume (type corev1.HostPathDirectory) per pod/path pair
// returned by preparePodVolumeBindings. The Host field of each volume is left
// unset here.
var PrepareHostPathVolumes = xstorev1reconcile.NewStepBinder("PrepareHostPathVolumes",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		pathsByPod := preparePodVolumeBindings(xstore, rc.Config())

		bound := make(map[string]*polardbxv1xstore.HostPathVolume, len(pathsByPod))
		for podName, hostPath := range pathsByPod {
			vol := polardbxv1xstore.HostPathVolume{
				Pod:      podName,
				HostPath: hostPath,
				Type:     corev1.HostPathDirectory,
			}
			bound[podName] = &vol
		}
		xstore.Status.BoundVolumes = bound

		return flow.Continue("Host path volumes prepared.")
	},
)
View Source
// QueryAndUpdateEngineVersion is a step that runs the engine version command
// on the leader pod and stores the whitespace-trimmed standard output in
// xstore.Status.EngineVersion.
//
// It returns flow.Wait when no leader pod is found and flow.Error when the
// lookup or command fails or the trimmed output is empty.
var QueryAndUpdateEngineVersion = xstorev1reconcile.NewStepBinder("QueryAndUpdateEngineVersion",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		leader, err := rc.TryGetXStoreLeaderPod()
		switch {
		case err != nil:
			return flow.Error(err, "Unable to get leader pod.")
		case leader == nil:
			return flow.Wait("Leader pod not found, wait.")
		}

		versionCmd := command.NewCanonicalCommandBuilder().Engine().Version().Build()

		// The command's stdout is captured here; the call has a 2s timeout.
		var stdout bytes.Buffer
		execOpts := control.ExecOptions{
			Logger:  flow.Logger(),
			Stdout:  &stdout,
			Timeout: 2 * time.Second,
		}
		if err = rc.ExecuteCommandOn(leader, convention.ContainerEngine, versionCmd, execOpts); err != nil {
			return flow.Error(err, "Failed to query version on leader pod.", "pod", leader.Name)
		}

		version := strings.TrimSpace(stdout.String())
		if len(version) == 0 {
			return flow.Error(errors.New("empty engine version"), "Engine version is empty.")
		}

		rc.MustGetXStore().Status.EngineVersion = version

		return flow.Pass()
	},
)
View Source
// ReconcileConsensusRoleLabels is a step that calls
// TryDetectLeaderAndTryReconcileLabels on the xstore pods and mirrors the
// result into the xstore status.
//
// When no leader is reported, Status.LeaderPod is cleared and the LeaderReady
// condition is set to false. Otherwise Status.LeaderPod is set to the reported
// leader and the LeaderReady condition is set to true. The step always ends
// with flow.Continue unless the pods cannot be listed.
var ReconcileConsensusRoleLabels = xstorev1reconcile.NewStepBinder("ReconcileConsensusRoleLabels",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		flow.Logger().Info("Try detecting leader and reconciling the labels...")
		leaderName, switched := TryDetectLeaderAndTryReconcileLabels(rc, pods, flow.Logger())

		if len(leaderName) == 0 {
			xstore.Status.LeaderPod = ""
			rc.UpdateXStoreCondition(&polardbxv1xstore.Condition{
				Type:    polardbxv1xstore.LeaderReady,
				Status:  corev1.ConditionFalse,
				Reason:  "LeaderNotFound",
				Message: "Leader not found",
			})
			return flow.Continue("Leader not found!")
		}

		// Status and condition are identical whether or not the leader switched;
		// only the final message differs.
		xstore.Status.LeaderPod = leaderName
		rc.UpdateXStoreCondition(&polardbxv1xstore.Condition{
			Type:    polardbxv1xstore.LeaderReady,
			Status:  corev1.ConditionTrue,
			Reason:  "LeaderFound",
			Message: "Leader found: " + leaderName,
		})

		if switched {
			return flow.Continue("Leader changed!", "leader-pod", leaderName)
		}
		return flow.Continue("Leader not changed.", "leader-pod", leaderName)
	},
)
View Source
// RemoveFinalizerFromXStore is a step that removes xstoremeta.Finalizer from
// the xstore when it is present and marks the xstore as changed. It returns
// flow.Pass when the finalizer is absent.
var RemoveFinalizerFromXStore = xstorev1reconcile.NewStepBinder("RemoveFinalizerFromXStore",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		if controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer) {
			controllerutil.RemoveFinalizer(xstore, xstoremeta.Finalizer)
			rc.MarkXStoreChanged()
			return flow.Continue("Remove finalizer.")
		}

		return flow.Pass()
	},
)
View Source
// SyncBlkioCgroupResourceLimits is a step that pushes the blkio limits of each
// pod's node set to the host via SyncBlkioCgroupValuesViaHpfs and then records
// the applied value in the pod's xstoremeta.AnnotationBlkioResourceLimit
// annotation.
//
// The resources come from the node set's own template when it defines them,
// and from the topology template otherwise. A pod is skipped when its node set
// is unknown, when no IO limits are configured, when
// polardbxv1common.ResourceBlkioValueStr reports no value, or when the
// annotation already equals that value. All hpfs calls and pod updates share
// one context with a 10 second timeout.
var SyncBlkioCgroupResourceLimits = xstorev1reconcile.NewStepBinder("SyncBlkioCgroupResourceLimits",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		hpfsClient, err := rc.GetHpfsClient()
		if err != nil {
			return flow.Error(err, "Unable to get hpfs client")
		}

		ctx, cancel := context.WithTimeout(rc.Context(), 10*time.Second)
		defer cancel()

		xstore := rc.MustGetXStore()
		topology := xstore.Spec.Topology

		// Index the node sets by name for the per-pod lookup below.
		nodeSetByName := make(map[string]*polardbxv1xstore.NodeSet)
		for idx := range topology.NodeSets {
			nodeSet := &topology.NodeSets[idx]
			nodeSetByName[nodeSet.Name] = nodeSet
		}

		boundVolumes := xstore.Status.BoundVolumes
		for i := range pods {
			pod := pods[i]

			nodeSet := nodeSetByName[pod.Labels[xstoremeta.LabelNodeSet]]
			if nodeSet == nil {
				continue
			}

			// Node set level resources take precedence over the topology template.
			resources := topology.Template.Spec.Resources
			if tmpl := nodeSet.Template; tmpl != nil && tmpl.Spec.Resources != nil {
				resources = tmpl.Spec.Resources
			}
			if resources == nil || resources.LimitsIO == nil {
				continue
			}

			desired, ok := polardbxv1common.ResourceBlkioValueStr(resources.LimitsIO)
			if !ok {
				continue
			}

			// The annotation records the last synced value.
			if pod.ObjectMeta.Annotations[xstoremeta.AnnotationBlkioResourceLimit] == desired {
				continue
			}

			if syncErr := SyncBlkioCgroupValuesViaHpfs(ctx, hpfsClient, &pod, boundVolumes[pod.Name], resources); syncErr != nil {
				return flow.Error(syncErr, "Failed to sync blkio cgroup values.", "host", pod.Spec.NodeName,
					"pod", pod.Name, "blkio", desired)
			}

			metav1.SetMetaDataAnnotation(&pod.ObjectMeta, xstoremeta.AnnotationBlkioResourceLimit, desired)
			if updateErr := rc.Client().Update(ctx, &pod); updateErr != nil {
				return flow.Error(updateErr, "Unable to update pod.", "pod", pod.Name)
			}
		}

		return flow.Continue("Succeeds to sync cgroup blkio values.")
	},
)
View Source
// UpdateDisplayStatus is a step that refreshes the summary fields of the
// xstore status:
//   - TotalPods: sum of the replicas of the observed topology's node sets
//     (left untouched when no topology has been observed),
//   - ReadyPods: number of pods for which k8shelper.IsPodReady is true,
//   - ReadyStatus: "<ReadyPods>/<TotalPods>",
//   - TotalDataDirSize: formatted sum of the sizes of the non-nil bound
//     volumes (left untouched when BoundVolumes is nil).
var UpdateDisplayStatus = xstorev1reconcile.NewStepBinder("UpdateDisplayStatus",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		status := &rc.MustGetXStore().Status

		if observed := status.ObservedTopology; observed != nil {
			status.TotalPods = 0
			for i := range observed.NodeSets {
				status.TotalPods += observed.NodeSets[i].Replicas
			}
		}

		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		status.ReadyPods = 0
		for i := range pods {
			if !k8shelper.IsPodReady(&pods[i]) {
				continue
			}
			status.ReadyPods++
		}

		status.ReadyStatus = fmt.Sprintf("%d/%d", status.ReadyPods, status.TotalPods)

		if status.BoundVolumes != nil {
			var dataDirBytes int64
			for _, vol := range status.BoundVolumes {
				if vol == nil {
					continue
				}
				dataDirBytes += vol.Size
			}
			status.TotalDataDirSize = unit.ByteCountIEC(dataDirBytes)
		}

		return flow.Continue("Display status updated!")
	})
View Source
// UpdateObservedGeneration is a step that copies xstore.Generation into
// xstore.Status.ObservedGeneration and reports both the previous and the
// current value through flow.Continue.
var UpdateObservedGeneration = xstorev1reconcile.NewStepBinder("UpdateObservedGeneration",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		status := &xstore.Status
		previous := status.ObservedGeneration
		status.ObservedGeneration = xstore.Generation

		return flow.Continue("Update observed generation.",
			"previous-generation", previous,
			"current-generation", xstore.Generation)
	},
)
View Source
// UpdateObservedTopologyAndConfig is a step that stores deep copies of
// xstore.Spec.Topology and xstore.Spec.Config in Status.ObservedTopology and
// Status.ObservedConfig respectively.
var UpdateObservedTopologyAndConfig = xstorev1reconcile.NewStepBinder("UpdateObservedTopologyAndConfig",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		xstore := rc.MustGetXStore()

		observedTopology := xstore.Spec.Topology.DeepCopy()
		xstore.Status.ObservedTopology = observedTopology

		observedConfig := xstore.Spec.Config.DeepCopy()
		xstore.Status.ObservedConfig = observedConfig

		return flow.Continue("Update observed topology and config.", "current-generation", xstore.Generation)
	},
)
View Source
// WaitUntilAsyncTasksCanceled is a step that asks IsHpfsAsyncTaskComplete
// about every xstore pod. It returns flow.Wait for the first pod reported as
// not complete, flow.Error on the first failing call, and flow.Pass when every
// pod is reported complete.
var WaitUntilAsyncTasksCanceled = xstorev1reconcile.NewStepBinder("WaitUntilAsyncTasksCanceled",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		hpfsClient, err := rc.GetHpfsClient()
		if err != nil {
			return flow.Error(err, "Unable to get hpfs client.")
		}

		for i := range pods {
			current := pods[i]

			done, checkErr := IsHpfsAsyncTaskComplete(rc.Context(), hpfsClient, &current)
			switch {
			case checkErr != nil:
				return flow.Error(checkErr, "Unable to determine the async task's status", "pod", current.Name)
			case !done:
				return flow.Wait("Found async hpfs task that is still not completed or canceled.", "pod", current.Name)
			}
		}

		return flow.Pass()
	},
)
View Source
// WaitUntilCandidatesAndVotersReady is a step that checks every xstore pod
// that is not a learner (per xstoremeta.IsPodRoleLearner). It returns
// flow.Wait for the first such pod that is not ready, and flow.Error when
// xstoreexec.CheckConnectivityLocally fails on the pod's "engine" container.
var WaitUntilCandidatesAndVotersReady = xstorev1reconcile.NewStepBinder("WaitUntilCandidatesAndVotersReady",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		for i := range pods {
			member := pods[i]

			// Learners are not handled by this step.
			if xstoremeta.IsPodRoleLearner(&member) {
				continue
			}

			if ready := k8shelper.IsPodReady(&member); !ready {
				return flow.Wait("Found candidate or voter pod not ready. Just wait.",
					"pod", member.Name, "pod.phase", member.Status.Phase)
			}

			if connErr := xstoreexec.CheckConnectivityLocally(rc, &member, "engine", flow.Logger()); connErr != nil {
				return flow.Error(connErr, "Failed to check connectivity locally.", "pod", member.Name)
			}
		}

		return flow.Continue("All candidates and voters are ready for connections.")
	},
)
View Source
// WaitUntilLeaderElected is a step that looks up the leader pod through
// rc.TryGetXStoreLeaderPod. It returns flow.Wait when no leader pod is found
// and flow.Continue, naming the pod, when one is.
var WaitUntilLeaderElected = xstorev1reconcile.NewStepBinder("WaitUntilLeaderElected",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		leader, err := rc.TryGetXStoreLeaderPod()
		switch {
		case err != nil:
			return flow.Error(err, "Unable to get leader pod.")
		case leader == nil:
			return flow.Wait("Leader not found, keep waiting...")
		default:
			return flow.Continue("Leader found.", "leader-pod", leader.Name)
		}
	},
)
View Source
// WaitUntilLearnersReady is a step that checks every xstore pod that is a
// learner (per xstoremeta.IsPodRoleLearner). It returns flow.Wait for the
// first learner that is not ready, and flow.Error when
// xstoreexec.CheckConnectivityLocally fails on the pod's "engine" container.
var WaitUntilLearnersReady = xstorev1reconcile.NewStepBinder("WaitUntilLearnersReady",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		for i := range pods {
			learner := pods[i]

			// Only learners are handled by this step.
			if isLearner := xstoremeta.IsPodRoleLearner(&learner); !isLearner {
				continue
			}

			if ready := k8shelper.IsPodReady(&learner); !ready {
				return flow.Wait("Found learner pod not ready. Just wait.",
					"pod", learner.Name, "pod.phase", learner.Status.Phase)
			}

			if connErr := xstoreexec.CheckConnectivityLocally(rc, &learner, "engine", flow.Logger()); connErr != nil {
				return flow.Error(connErr, "Failed to check connectivity locally.", "pod", learner.Name)
			}
		}

		return flow.Continue("All learners are ready for connections.")
	},
)
View Source
// WaitUntilPodsReady is a step that returns flow.Wait, listing the pod names,
// while any xstore pod fails k8shelper.IsPodReady, and flow.Pass once none
// does.
var WaitUntilPodsReady = xstorev1reconcile.NewStepBinder("WaitUntilPodsReady",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		notReady := func(pod *corev1.Pod) bool {
			return !k8shelper.IsPodReady(pod)
		}
		pending := k8shelper.FilterPodsBy(pods, notReady)

		if len(pending) == 0 {
			return flow.Pass()
		}

		names := strings.Join(k8shelper.ToObjectNames(pending), ",")
		return flow.Wait("Found unready pods, keep waiting...", "unready-pods", names)
	},
)
View Source
// WaitUntilPodsScheduled is a step that returns flow.Wait, listing the pod
// names, while any xstore pod either fails k8shelper.IsPodScheduled or has an
// empty Status.PodIP, and flow.Pass once none does.
var WaitUntilPodsScheduled = xstorev1reconcile.NewStepBinder("WaitUntilPodsScheduled",
	func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		pods, err := rc.GetXStorePods()
		if err != nil {
			return flow.Error(err, "Unable to get pods.")
		}

		// A pod counts as placed only when it is scheduled and has a pod IP.
		notPlaced := func(pod *corev1.Pod) bool {
			return !(k8shelper.IsPodScheduled(pod) && pod.Status.PodIP != "")
		}
		pending := k8shelper.FilterPodsBy(pods, notPlaced)

		if len(pending) == 0 {
			return flow.Pass()
		}

		names := strings.Join(k8shelper.ToObjectNames(pending), ",")
		return flow.Wait("Found unscheduled pods, keep waiting...", "unscheduled-pods", names)
	},
)

Functions

func CancelHpfsAsyncTasks

func CancelHpfsAsyncTasks(ctx context.Context, client hpfs.HpfsServiceClient, pod *corev1.Pod) error

func CreatePodsAndHeadlessServicesWithExtraFactory

func CreatePodsAndHeadlessServicesWithExtraFactory(extraPodFactory factory.ExtraPodFactory) control.BindFunc

func DeleteHostPathVolume

func DeleteHostPathVolume(ctx context.Context, hpfsClient hpfs.HpfsServiceClient, vol *polardbxv1xstore.HostPathVolume) error

func IsDiskQuotaExceeds

func IsDiskQuotaExceeds(xstore *polardbxv1.XStore) bool

func IsHpfsAsyncTaskComplete

func IsHpfsAsyncTaskComplete(ctx context.Context, client hpfs.HpfsServiceClient, pod *corev1.Pod) (bool, error)

func NewExecutionContext

func NewExecutionContext(rc *xstorev1reconcile.Context, xstore *polardbxv1.XStore, selfHeal bool) (*context.ExecutionContext, error)

func ReportRoleAndCurrentLeader

func ReportRoleAndCurrentLeader(rc *xstorev1reconcile.Context, pod *corev1.Pod, logger logr.Logger) (string, string, error)

func TrackAndLazyUpdateExecuteContext

func TrackAndLazyUpdateExecuteContext(current *context.ExecutionContext) control.BindFunc

func TryDetectLeaderAndTryReconcileLabels

func TryDetectLeaderAndTryReconcileLabels(rc *xstorev1reconcile.Context, pods []corev1.Pod, logger logr.Logger) (string, bool)

func TryDetectLeaderChange

func TryDetectLeaderChange(rc *xstorev1reconcile.Context, pods []corev1.Pod, logger logr.Logger) (string, bool)

func TryReconcileLabels

func TryReconcileLabels(rc *xstorev1reconcile.Context, pods []corev1.Pod, leaderPod string, logger logr.Logger)

func UpdateHostPathVolumeSizesTemplate

func UpdateHostPathVolumeSizesTemplate(d time.Duration) control.BindFunc

func UpdatePhaseTemplate

func UpdatePhaseTemplate(phase polardbxv1xstore.Phase, requeue ...bool) control.BindFunc

func UpdateStageTemplate

func UpdateStageTemplate(stage polardbxv1xstore.Stage, requeue ...bool) control.BindFunc

func WhenDiskQuotaExceeds

func WhenDiskQuotaExceeds(binders ...control.BindFunc) control.BindFunc

func WhenDiskQuotaNotExceeds

func WhenDiskQuotaNotExceeds(binders ...control.BindFunc) control.BindFunc

func WhenPodsDeletedFound

func WhenPodsDeletedFound(binders ...control.BindFunc) control.BindFunc

func WhenTopologyChanged

func WhenTopologyChanged(binders ...control.BindFunc) control.BindFunc

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL