eclipse/cmd/eclipse-trigger/main.go

216 lines
4.6 KiB
Go
Raw Permalink Normal View History

2024-01-13 02:31:32 +00:00
// Copyright (C) 2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"cmp"
"context"
2024-01-17 17:32:48 +00:00
"encoding/json"
2024-01-13 02:31:32 +00:00
"fmt"
"os"
"sync"
"github.com/datawire/ocibuild/pkg/cliutil"
"github.com/spf13/cobra"
2024-01-18 05:49:09 +00:00
source "git.mothstuff.lol/lukeshu/eclipse"
"git.mothstuff.lol/lukeshu/eclipse/lib/leader"
2024-01-13 02:31:32 +00:00
)
// min returns a when a < b, and b in every other case (including when
// the two compare equal or are unordered).
func min[T cmp.Ordered](a, b T) T {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
// keys returns the keys of m as a freshly allocated slice.  The order is
// whatever order map iteration happens to produce.
func keys[K comparable, V any](m map[K]V) []K {
	out := make([]K, len(m))
	next := 0
	for key := range m {
		out[next] = key
		next++
	}
	return out
}
2024-01-13 02:35:59 +00:00
// Set is a set of values of type T, represented as a map whose values
// carry no data.
type Set[T comparable] map[T]struct{}

// Has reports whether v is a member of s.
func (s Set[T]) Has(v T) bool {
	if _, present := s[v]; present {
		return true
	}
	return false
}
2024-01-13 02:31:32 +00:00
// parallel runs generator in its own goroutine and passes every value it
// yields to consumer, which is called from nWorkers worker goroutines
// (so consumer must be safe to call concurrently when nWorkers > 1).
// It returns once generator has returned and every yielded value has
// been handed to consumer and processed.
//
// An nWorkers below 1 is treated as 1.  With zero workers nothing would
// ever receive from the channel: yielded values would be dropped, the
// generator goroutine would block forever, and the function would return
// before any work was done.  A negative count would make
// sync.WaitGroup.Add panic.
func parallel[T any](
	nWorkers int,
	generator func(yield func(T)),
	consumer func(T),
) {
	if nWorkers < 1 {
		nWorkers = 1
	}

	// Unbuffered: a yield blocks until some worker is ready for the value.
	ch := make(chan T)
	go func() {
		// Closing the channel is what lets the workers' range loops end.
		defer close(ch)
		generator(func(v T) { ch <- v })
	}()

	var wg sync.WaitGroup
	wg.Add(nWorkers)
	for i := 0; i < nWorkers; i++ {
		go func() {
			defer wg.Done()
			for v := range ch {
				consumer(v)
			}
		}()
	}
	wg.Wait()
}
2024-01-17 21:52:40 +00:00
func Trigger(cfgFile string, allowedRepos Set[string]) (err error) {
maybeSetErr := func(_err error) {
if err == nil && _err != nil {
err = _err
}
}
cfg, err := leader.LoadConfig(cfgFile)
2024-01-13 02:31:32 +00:00
if err != nil {
return err
}
2024-01-18 05:49:09 +00:00
actions, err := cfg.Actions()
if err != nil {
return err
}
store, err := cfg.DB()
2024-01-17 17:32:48 +00:00
if err != nil {
return err
}
2024-01-17 21:52:40 +00:00
defer func() { maybeSetErr(store.Close()) }()
2024-01-17 17:32:48 +00:00
2024-01-13 02:31:32 +00:00
repos := map[string]map[string]string{}
2024-01-18 05:49:09 +00:00
for _, action := range actions {
2024-01-13 02:31:32 +00:00
for _, trigger := range action.Triggers {
2024-01-13 02:35:59 +00:00
if len(allowedRepos) > 0 && !allowedRepos.Has(trigger.RepoURL) {
continue
}
2024-01-13 02:31:32 +00:00
if repos[trigger.RepoURL] == nil {
repos[trigger.RepoURL] = make(map[string]string, 1)
}
repos[trigger.RepoURL][trigger.Rev] = ""
}
}
2024-01-18 05:49:09 +00:00
cache := cfg.GitCache()
2024-01-13 02:31:32 +00:00
// Fetch.
2024-01-18 05:49:09 +00:00
parallel[string](min(cfg.ParallelDownloads, len(repos)),
2024-01-13 02:31:32 +00:00
func(yield func(string)) {
for repo := range repos {
yield(repo)
}
},
func(repo string) {
if err := cache.Fetch(os.Stderr, repo); err != nil {
fmt.Fprintf(os.Stderr, "error: updating %q: %v\n", repo, err)
}
},
)
// Resolve revs.
parallel[string](len(repos),
func(yield func(string)) {
for repo := range repos {
yield(repo)
}
},
func(repo string) {
exprs := keys(repos[repo])
hashs, err := cache.RevParse(os.Stderr, repo, exprs...)
if err != nil {
fmt.Fprintf(os.Stderr, "error: resolving in %q: %v\n", repo, err)
return
}
for i := range hashs {
repos[repo][exprs[i]] = hashs[i]
}
},
)
2024-01-17 17:32:48 +00:00
// Trigger.
updatedTriggers := make(map[string]string)
2024-01-18 05:49:09 +00:00
for _, action := range actions {
2024-01-17 17:32:48 +00:00
var triggerExprs []string
var triggerVals []string
for _, trigger := range action.Triggers {
newVal, ok := repos[trigger.RepoURL][trigger.Rev]
if !ok {
continue
}
triggerExprBytes, err := json.Marshal(trigger)
if err != nil {
return err
}
triggerExpr := string(triggerExprBytes)
oldVal, _ := store.GetTrigger(triggerExpr)
if newVal != oldVal {
triggerExprs = append(triggerExprs, triggerExpr)
triggerVals = append(triggerVals, newVal)
updatedTriggers[triggerExpr] = newVal
}
}
if len(triggerExprs) > 0 {
jobID, err := store.NewJob(triggerExprs, triggerVals, action.Run)
if err != nil {
return err
}
fmt.Printf("created job %v:\n", jobID)
for i := range triggerExprs {
fmt.Printf("\ttrigger: %s => %q\n", triggerExprs[i], triggerVals[i])
}
fmt.Printf("\trun: %q\n", action.Run)
}
}
for expr, val := range updatedTriggers {
fmt.Printf("remembering that %s => %q\n", expr, val)
if err := store.SetTrigger(expr, val); err != nil {
return err
2024-01-13 02:31:32 +00:00
}
}
2024-01-17 17:32:48 +00:00
2024-01-13 02:31:32 +00:00
return nil
}
func main() {
argparser := &cobra.Command{
2024-01-18 07:43:02 +00:00
Use: os.Args[0] + " [flags] [ALLOWED_REPOS...]",
2024-01-20 21:08:44 +00:00
Short: "Add new jobs to the leader",
2024-01-13 02:31:32 +00:00
SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext()
SilenceUsage: true, // FlagErrorFunc will handle this
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
}
2024-01-13 02:40:04 +00:00
argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc)
argparser.SetHelpTemplate(cliutil.HelpTemplate)
2024-01-13 02:31:32 +00:00
2024-01-17 21:11:23 +00:00
var cfgFile string
2024-01-19 21:02:27 +00:00
argparser.Flags().StringVar(&cfgFile, "config", source.DefaultLeaderConfigFile,
2024-01-17 21:11:23 +00:00
"Config file to use")
_ = argparser.MarkFlagFilename("config", "yml", "yaml")
2024-01-17 21:11:23 +00:00
argparser.RunE = func(_ *cobra.Command, args []string) error {
allowedRepos := make(Set[string], len(args))
for _, repo := range args {
allowedRepos[repo] = struct{}{}
}
return Trigger(cfgFile, allowedRepos)
}
2024-01-13 02:31:32 +00:00
2024-01-17 21:11:23 +00:00
ctx := context.Background()
2024-01-13 02:31:32 +00:00
if err := argparser.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err)
os.Exit(1)
}
}