eclipse/cmd/eclipse-trigger/main.go

215 lines
4.7 KiB
Go

// Copyright (C) 2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"cmp"
"context"
"encoding/json"
"fmt"
"os"
"runtime"
"sync"
"github.com/datawire/ocibuild/pkg/cliutil"
"github.com/spf13/cobra"
source "git.mothstuff.lol/lukeshu/eclipse"
"git.mothstuff.lol/lukeshu/eclipse/lib/leader"
)
func min[T cmp.Ordered](a, b T) T {
if a < b {
return a
}
return b
}
func keys[K comparable, V any](m map[K]V) []K {
ret := make([]K, 0, len(m))
for k := range m {
ret = append(ret, k)
}
return ret
}
type Set[T comparable] map[T]struct{}
func (s Set[T]) Has(v T) bool {
_, ok := s[v]
return ok
}
func parallel[T any](
nWorkers int,
generator func(yield func(T)),
consumer func(T),
) {
ch := make(chan T)
go func() {
defer close(ch)
generator(func(v T) { ch <- v })
}()
var wg sync.WaitGroup
wg.Add(nWorkers)
for i := 0; i < nWorkers; i++ {
go func() {
defer wg.Done()
for v := range ch {
consumer(v)
}
}()
}
wg.Wait()
}
func Trigger(cfgFile string, allowedRepos Set[string]) (err error) {
maybeSetErr := func(_err error) {
if err == nil && _err != nil {
err = _err
}
}
cfg, err := leader.LoadConfig(cfgFile)
if err != nil {
return err
}
actions, err := cfg.Actions()
if err != nil {
return err
}
store, err := cfg.DB()
if err != nil {
return err
}
defer func() { maybeSetErr(store.Close()) }()
revs := make(map[string]map[string]map[string]string) // map["url"]map["revglob"]map["revexpr"]"hash"
for _, action := range actions {
for _, trigger := range action.Triggers {
if len(allowedRepos) > 0 && !allowedRepos.Has(trigger.RepoURL) {
continue
}
if revs[trigger.RepoURL] == nil {
revs[trigger.RepoURL] = make(map[string]map[string]string, 1)
}
revs[trigger.RepoURL][trigger.Rev] = nil
}
}
cache := cfg.GitCache()
// Fetch.
parallel[string](min(cfg.ParallelDownloads, len(revs)),
func(yield func(string)) {
for url := range revs {
yield(url)
}
},
func(url string) {
if err := cache.Fetch(os.Stderr, url); err != nil {
fmt.Fprintf(os.Stderr, "error: updating %q: %v\n", url, err)
}
},
)
// Resolve revs.
parallel[string](min(runtime.GOMAXPROCS(0), len(revs)),
func(yield func(string)) {
for url := range revs {
yield(url)
}
},
func(url string) {
matches, err := cache.RevParse(os.Stderr, url, keys(revs[url])...)
if err != nil {
fmt.Fprintf(os.Stderr, "error: resolving in %q: %v\n", url, err)
return
}
for k, v := range matches {
revs[url][k] = v
}
},
)
// Trigger.
updatedTriggers := make(map[string]string)
for _, action := range actions {
var triggerExprs []string
var triggerVals []string
for _, trigger := range action.Triggers {
for revexpr, newVal := range revs[trigger.RepoURL][trigger.Rev] {
trigger.Rev = revexpr
triggerExprBytes, err := json.Marshal(trigger)
if err != nil {
return err
}
triggerExpr := string(triggerExprBytes)
oldVal, _ := store.GetTrigger(triggerExpr)
if newVal != oldVal {
triggerExprs = append(triggerExprs, triggerExpr)
triggerVals = append(triggerVals, newVal)
updatedTriggers[triggerExpr] = newVal
}
}
}
if len(triggerExprs) > 0 {
jobID, err := store.NewJob(triggerExprs, triggerVals, action.Run)
if err != nil {
return err
}
fmt.Printf("created job %v:\n", jobID)
for i := range triggerExprs {
fmt.Printf("\ttrigger: %s => %q\n", triggerExprs[i], triggerVals[i])
}
fmt.Printf("\trun: %q\n", action.Run)
}
}
for expr, val := range updatedTriggers {
fmt.Printf("remembering that %s => %q\n", expr, val)
if err := store.SetTrigger(expr, val); err != nil {
return err
}
}
return nil
}
func main() {
argparser := &cobra.Command{
Use: os.Args[0] + " [flags] [ALLOWED_REPOS...]",
Short: "Add new jobs to the leader",
SilenceErrors: true, // we'll handle this ourselves after .ExecuteContext()
SilenceUsage: true, // FlagErrorFunc will handle this
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
}
argparser.SetFlagErrorFunc(cliutil.FlagErrorFunc)
argparser.SetHelpTemplate(cliutil.HelpTemplate)
var cfgFile string
argparser.Flags().StringVar(&cfgFile, "config", source.DefaultLeaderConfigFile,
"Config file to use")
_ = argparser.MarkFlagFilename("config", "yml", "yaml")
argparser.RunE = func(_ *cobra.Command, args []string) error {
allowedRepos := make(Set[string], len(args))
for _, repo := range args {
allowedRepos[repo] = struct{}{}
}
return Trigger(cfgFile, allowedRepos)
}
ctx := context.Background()
if err := argparser.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "%v: error: %v\n", argparser.CommandPath(), err)
os.Exit(1)
}
}