// Copyright (C) 2024 Umorpha Systems // SPDX-License-Identifier: AGPL-3.0-or-later package main import ( "cmp" "context" "encoding/json" "fmt" "os" "sync" "github.com/datawire/ocibuild/pkg/cliutil" "github.com/spf13/cobra" source "git.mothstuff.lol/lukeshu/eclipse" "git.mothstuff.lol/lukeshu/eclipse/lib/leader" ) func min[T cmp.Ordered](a, b T) T { if a < b { return a } return b } func keys[K comparable, V any](m map[K]V) []K { ret := make([]K, 0, len(m)) for k := range m { ret = append(ret, k) } return ret } type Set[T comparable] map[T]struct{} func (s Set[T]) Has(v T) bool { _, ok := s[v] return ok } func parallel[T any]( nWorkers int, generator func(yield func(T)), consumer func(T), ) { ch := make(chan T) go func() { defer close(ch) generator(func(v T) { ch <- v }) }() var wg sync.WaitGroup wg.Add(nWorkers) for i := 0; i < nWorkers; i++ { go func() { defer wg.Done() for v := range ch { consumer(v) } }() } wg.Wait() } func Trigger(cfgFile string, allowedRepos Set[string]) (err error) { maybeSetErr := func(_err error) { if err == nil && _err != nil { err = _err } } cfg, err := leader.LoadConfig(cfgFile) if err != nil { return err } actions, err := cfg.Actions() if err != nil { return err } store, err := cfg.DB() if err != nil { return err } defer func() { maybeSetErr(store.Close()) }() repos := map[string]map[string]string{} for _, action := range actions { for _, trigger := range action.Triggers { if len(allowedRepos) > 0 && !allowedRepos.Has(trigger.RepoURL) { continue } if repos[trigger.RepoURL] == nil { repos[trigger.RepoURL] = make(map[string]string, 1) } repos[trigger.RepoURL][trigger.Rev] = "" } } cache := cfg.GitCache() // Fetch. parallel[string](min(cfg.ParallelDownloads, len(repos)), func(yield func(string)) { for repo := range repos { yield(repo) } }, func(repo string) { if err := cache.Fetch(os.Stderr, repo); err != nil { fmt.Fprintf(os.Stderr, "error: updating %q: %v\n", repo, err) } }, ) // Resolve revs. parallel[string](len(repos), func(yield func(string)) { for repo := range repos { yield(repo) } }, func(repo string) { exprs := keys(repos[repo]) hashs, err := cache.RevParse(os.Stderr, repo, exprs...) if err != nil { fmt.Fprintf(os.Stderr, "error: resolving in %q: %v\n", repo, err) return } for i := range hashs { repos[repo][exprs[i]] = hashs[i] } }, ) // Trigger. updatedTriggers := make(map[string]string) for _, action := range actions { var triggerExprs []string var triggerVals []string for _, trigger := range action.Triggers { newVal, ok := repos[trigger.RepoURL][trigger.Rev] if !ok { continue } triggerExprBytes, err := json.Marshal(trigger) if err != nil { return err } triggerExpr := string(triggerExprBytes) oldVal, _ := store.GetTrigger(triggerExpr) if newVal != oldVal { triggerExprs = append(triggerExprs, triggerExpr) triggerVals = append(triggerVals, newVal) updatedTriggers[triggerExpr] = newVal } } if len(triggerExprs) > 0 { jobID, err := store.NewJob(triggerExprs, triggerVals, action.Run) if err != nil { return err } fmt.Printf("created job %v:\n", jobID) for i := range triggerExprs { fmt.Printf("\ttrigger: %s => %q\n", triggerExprs[i], triggerVals[i]) } fmt.Printf("\trun: %q\n", action.Run) } } for expr, val := range updatedTriggers { fmt.Printf("remembering that %s => %q\n", expr, val) if err := store.SetTrigger(expr, val); err != nil { return err } } return nil } func main() { argparser := &cobra.Command{ Use: os.Args[0] + " [flags] [ALLOWED_REPOS...]", Short: "Add new jobs to the leader", SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext() SilenceUsage: true, // FlagErrorFunc will handle this CompletionOptions: cobra.CompletionOptions{ DisableDefaultCmd: true, }, } argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc) argparser.SetHelpTemplate(cliutil.HelpTemplate) var cfgFile string argparser.Flags().StringVar(&cfgFile, "config", source.DefaultLeaderConfigFile, "Config file to use") _ = argparser.MarkFlagFilename("config", "yml", "yaml") argparser.RunE = func(_ *cobra.Command, args []string) error { allowedRepos := make(Set[string], len(args)) for _, repo := range args { allowedRepos[repo] = struct{}{} } return Trigger(cfgFile, allowedRepos) } ctx := context.Background() if err := argparser.ExecuteContext(ctx); err != nil { fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err) os.Exit(1) } }