Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Command = &cobra.Command{ Use: "wfx", Short: "wfx server", Long: `This API allows to create and execute reusable workflows for clients. Each workflow is modeled as a state machine running in the storage, with tasks to be executed by clients. Examples of tasks are installation of firmware or other types of commands issued to clients.`, PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { f := cmd.Flags() knownOptions := make(map[string]bool, 64) f.VisitAll(func(flag *pflag.Flag) { knownOptions[flag.Name] = true }) mergeFn := koanf.WithMergeFunc(func(src, dest map[string]any) error { for k, v := range src { if _, exists := knownOptions[k]; !exists { fmt.Fprintf(os.Stderr, "ERROR: Unknown config option '%s'", k) } dest[k] = v } return nil }) cFiles, _ := f.GetStringSlice(configFlag) var fileProvider *file.File for _, fname := range cFiles { if _, err := os.Stat(fname); err == nil { fileProvider = file.Provider(fname) k.Write(func(k *koanf.Koanf) { if err := k.Load(fileProvider, yaml.Parser(), mergeFn); err != nil { fmt.Fprintf(os.Stderr, "ERROR: Failed to load config file '%s'", fname) } }) } } envProvider := env.Provider("WFX_", ".", func(s string) string { return strings.ReplaceAll(strings.ToLower(strings.TrimPrefix(s, "WFX_")), "_", "-") }) k.Write(func(k *koanf.Koanf) { if err := k.Load(envProvider, nil, mergeFn); err != nil { fmt.Fprintln(os.Stderr, "ERROR: Could not load env variables") } if err := k.Load(posflag.Provider(f, ".", k), nil); err != nil { fmt.Fprintln(os.Stderr, "ERROR: Could not load CLI flags") } }) // now that we have merged all config sources, set up logger var logLevel, logFormat string k.Read(func(k *koanf.Koanf) { logLevel = k.String(logLevelFlag) logFormat = k.String(logFormatFlag) }) setupLogging(os.Stdout, logFormat, logLevel) if _, err := maxprocs.Set(maxprocs.Logger(log.Printf)); err != nil { log.Warn().Err(err).Msg("Failed to set GOMAXPROCS") } if fileProvider != nil { err := fileProvider.Watch(func(_ interface{}, err error) { if err == nil { k.Write(func(k *koanf.Koanf) { if err := k.Load(fileProvider, yaml.Parser(), mergeFn); err == nil { if err := reloadConfig(k); err != nil { log.Error().Err(err).Msg("Failed to reload config") } } }) } }) if err != nil { log.Error().Err(err).Msg("Failed to set up config file watcher") } } return nil }, RunE: func(*cobra.Command, []string) error { var username string if u, err := user.Current(); err == nil { username = u.Username } log.Info(). Str("version", metadata.Version). Str("date", metadata.Date). Str("commit", metadata.Commit). Str("user", username). Msg("Starting wfx") var name, options string k.Read(func(k *koanf.Koanf) { name = k.String(storageFlag) options = k.String(storageOptFlag) }) log.Debug().Str("name", name).Str("options", options).Msg("Setting up persistence storage") if name != preferedStorage && options == defaultStorageOpts { options = "" } storage := persistence.GetStorage(name) if storage == nil { return fmt.Errorf("unknown storage %s", name) } var err error for i := 0; i < 300; i++ { log.Debug().Str("name", name).Msg("Initializing storage") err = storage.Initialize(context.Background(), options) if err == nil { log.Info().Str("name", name).Msg("Initialized storage") break } dur := time.Second log.Warn(). Err(err). Str("storage", name). Msg("Failed to initialize persistent storage. Trying again in one second...") time.Sleep(dur) } if err != nil { return fault.Wrap(err) } defer storage.Shutdown() signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) var schemes []string k.Read(func(k *koanf.Koanf) { schemes = k.Strings(schemeFlag) }) serverCollections := make([]*serverCollection, 0, 3) chQuit := make(chan error) { collection, err := createNorthboundCollection(schemes, storage, chQuit) if err != nil { return fault.Wrap(err) } serverCollections = append(serverCollections, collection) } { collection, err := createSouthboundCollection(schemes, storage, chQuit) if err != nil { return fault.Wrap(err) } serverCollections = append(serverCollections, collection) } listeners, _ := activation.Listeners() serverCollections = append(serverCollections, adoptListeners(listeners, storage, chQuit)...) for _, collection := range serverCollections { for i := range collection.servers { name := collection.name srv := collection.servers[i] go func() { if err := launchServer(name, srv); err != nil { chQuit <- err } }() } } running := true for running { select { case <-signalChannel: running = false case <-chQuit: running = false } } events.ShutdownSubscribers() // create a context with a timeout to allow outstanding requests to complete var timeout time.Duration k.Read(func(k *koanf.Koanf) { timeout = k.Duration(gracefulTimeoutFlag) }) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() for _, collection := range serverCollections { collection.Shutdown(ctx) } return nil }, }
Functions ¶
func LoadNorthboundPlugins ¶ added in v0.2.0
func LoadNorthboundPlugins(chan error) ([]middleware.IntermediateMW, error)
func LoadSouthboundPlugins ¶ added in v0.2.0
func LoadSouthboundPlugins(chan error) ([]middleware.IntermediateMW, error)
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.