eclipse/cmd/eclipse-trigger/main.go

259 lines
5.3 KiB
Go

// Copyright (C) 2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package main
import (
"cmp"
"context"
"encoding"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"slices"
"sync"
"time"
"github.com/datawire/ocibuild/pkg/cliutil"
"github.com/spf13/cobra"
"sigs.k8s.io/yaml"
"git.mothstuff.lol/lukeshu/eclipse/lib/gitcache"
)
// Duration wraps time.Duration so that it can be decoded from its textual
// form (for example "90s" or "1h30m").
type Duration struct {
	time.Duration
}

// Compile-time proof that *Duration satisfies encoding.TextUnmarshaler.
var _ encoding.TextUnmarshaler = (*Duration)(nil)

// UnmarshalText implements encoding.TextUnmarshaler by handing text to
// time.ParseDuration. The receiver is overwritten with whatever
// ParseDuration returned, including when it reports an error.
func (d *Duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	d.Duration = parsed
	return err
}
// Config is the configuration document that Trigger reads from stdin and
// decodes with yaml.UnmarshalStrict.
type Config struct {
// Daemon holds settings shared by all actions.
Daemon struct {
// StateDir is the directory in which Trigger opens "state.sqlite".
StateDir string `json:"state_dir"`
// GitCacheDir is passed to gitcache.Cache as its Dir.
GitCacheDir string `json:"cache_git_dir"`
// GitCacheFor is passed to gitcache.Cache as its MinPeriod.
// NOTE(review): presumably the minimum time between re-fetches of
// the same repository; confirm against lib/gitcache.
GitCacheFor Duration `json:"cache_git_for"`
// ParallelDownloads bounds the number of concurrent fetch workers;
// Trigger uses min(ParallelDownloads, number of repositories).
ParallelDownloads int `json:"parallel_downloads"`
} `json:"daemon"`
// Actions is the list of actions whose triggers Trigger evaluates.
Actions []Action `json:"actions"`
}
// Action pairs a set of triggers with something to run. Trigger creates one
// job for an action when at least one of its triggers has a new value.
type Action struct {
// Triggers are the git revisions watched on behalf of this action.
Triggers []GitTrigger `json:"triggers"`
// Run is stored verbatim on the job that Trigger creates.
// NOTE(review): presumably a command line; confirm against the job runner.
Run string `json:"run"`
}
// GitTrigger names a revision expression in a git repository.
//
// Trigger uses the json.Marshal encoding of a GitTrigger as the key under
// which the last seen value is stored, so changing the field order or the
// json tags changes those keys.
type GitTrigger struct {
// RepoURL identifies the repository; it is what Trigger hands to the git
// cache for fetching and rev-parsing, and what allowedRepos is matched
// against.
RepoURL string `json:"repo_url"`
// Rev is the revision expression resolved with the cache's RevParse.
Rev string `json:"rev"`
}
// min returns a when a < b and b otherwise (so b is returned for equal
// values and whenever the comparison is false, e.g. with a NaN operand).
//
// It shadows the Go 1.21 builtin of the same name within this package.
func min[T cmp.Ordered](a, b T) T {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
// max returns a when a > b and b otherwise (so b is returned for equal
// values and whenever the comparison is false, e.g. with a NaN operand).
//
// It shadows the Go 1.21 builtin of the same name within this package.
func max[T cmp.Ordered](a, b T) T {
	larger := b
	if a > b {
		larger = a
	}
	return larger
}
// keys returns the keys of m as a freshly allocated, non-nil slice.
// The order follows Go's map iteration order and is therefore unspecified.
func keys[K comparable, V any](m map[K]V) []K {
	out := make([]K, len(m))
	next := 0
	for key := range m {
		out[next] = key
		next++
	}
	return out
}
// sortedKeys returns the keys of m in ascending order.
func sortedKeys[K cmp.Ordered, V any](m map[K]V) []K {
	ordered := keys(m)
	slices.Sort(ordered)
	return ordered
}
// Set is a set of comparable values, represented as a map with empty-struct
// values.
type Set[T comparable] map[T]struct{}

// Has reports whether v is a member of s. A nil Set has no members.
func (s Set[T]) Has(v T) bool {
	_, present := s[v]
	return present
}
// parallel feeds every value produced by generator to consumer, running up
// to nWorkers consumer calls concurrently.
//
// generator runs in its own goroutine and is handed a yield function; each
// call to yield blocks until some worker has taken the value. parallel
// returns once generator has returned and every yielded value has been
// consumed.
//
// An nWorkers below 1 is treated as 1. Without that, zero workers would
// leave the generator blocked forever on its first yield while parallel
// returned without doing any work, and a negative count would make
// sync.WaitGroup.Add panic.
func parallel[T any](
	nWorkers int,
	generator func(yield func(T)),
	consumer func(T),
) {
	if nWorkers < 1 {
		nWorkers = 1
	}

	ch := make(chan T)
	go func() {
		// Closing the channel is what lets the workers' range loops end.
		defer close(ch)
		generator(func(v T) { ch <- v })
	}()

	var wg sync.WaitGroup
	wg.Add(nWorkers)
	for i := 0; i < nWorkers; i++ {
		go func() {
			defer wg.Done()
			for v := range ch {
				consumer(v)
			}
		}()
	}
	wg.Wait()
}
// Trigger reads a YAML Config from stdin, refreshes the git cache for every
// repository referenced by a trigger, resolves each trigger's rev to a hash,
// and creates a job in the state store for every action that has at least
// one trigger whose resolved value differs from the value remembered in the
// store. The new values are then written back to the store.
//
// allowedRepos restricts processing to triggers whose repo URL is in the
// set; an empty set means every repository is processed.
//
// Fetch and rev-parse failures are reported on stderr and do not abort the
// run. A trigger whose rev could not be resolved is skipped for this run, so
// a transient failure neither creates a job nor overwrites the remembered
// value. Failures to read the config or to write to the store are returned.
func Trigger(allowedRepos Set[string]) error {
	cfgBytes, err := io.ReadAll(os.Stdin)
	if err != nil {
		return fmt.Errorf("reading config from stdin: %w", err)
	}
	var cfg Config
	if err := yaml.UnmarshalStrict(cfgBytes, &cfg); err != nil {
		return fmt.Errorf("parsing config: %w", err)
	}

	// NOTE(review): the store is never closed in this function; confirm
	// whether the type returned by OpenStore needs an explicit Close.
	statePath := filepath.Join(cfg.Daemon.StateDir, "state.sqlite")
	store, err := OpenStore(statePath)
	if err != nil {
		return fmt.Errorf("opening state store %q: %w", statePath, err)
	}

	// repos maps repo URL -> rev expression -> resolved hash. The hash is ""
	// until (and unless) the expression has been resolved.
	repos := map[string]map[string]string{}
	for _, action := range cfg.Actions {
		for _, trigger := range action.Triggers {
			if len(allowedRepos) > 0 && !allowedRepos.Has(trigger.RepoURL) {
				continue
			}
			if repos[trigger.RepoURL] == nil {
				repos[trigger.RepoURL] = make(map[string]string, 1)
			}
			repos[trigger.RepoURL][trigger.Rev] = ""
		}
	}

	cache := &gitcache.Cache{
		Dir:       cfg.Daemon.GitCacheDir,
		MinPeriod: cfg.Daemon.GitCacheFor.Duration,
	}

	eachRepo := func(yield func(string)) {
		for repo := range repos {
			yield(repo)
		}
	}

	// Fetch. An unset or non-positive parallel_downloads must not result in
	// zero workers, which would skip fetching altogether.
	nFetchers := min(cfg.Daemon.ParallelDownloads, len(repos))
	if nFetchers < 1 {
		nFetchers = 1
	}
	parallel(nFetchers, eachRepo, func(repo string) {
		if err := cache.Fetch(os.Stderr, repo); err != nil {
			fmt.Fprintf(os.Stderr, "error: updating %q: %v\n", repo, err)
		}
	})

	// Resolve revs. Each worker only writes to the inner map of the repo it
	// was handed, and every repo is yielded exactly once.
	parallel(len(repos), eachRepo, func(repo string) {
		exprs := keys(repos[repo])
		hashs, err := cache.RevParse(os.Stderr, repo, exprs...)
		if err != nil {
			fmt.Fprintf(os.Stderr, "error: resolving in %q: %v\n", repo, err)
			return
		}
		for i := range hashs {
			repos[repo][exprs[i]] = hashs[i]
		}
	})

	// Trigger.
	updatedTriggers := make(map[string]string)
	for _, action := range cfg.Actions {
		var triggerExprs []string
		var triggerVals []string
		for _, trigger := range action.Triggers {
			newVal, ok := repos[trigger.RepoURL][trigger.Rev]
			if !ok {
				// Repository was filtered out by allowedRepos.
				continue
			}
			if newVal == "" {
				// The rev was not resolved (already reported above);
				// treating "" as a new value would create a bogus job.
				continue
			}
			triggerExprBytes, err := json.Marshal(trigger)
			if err != nil {
				return fmt.Errorf("encoding trigger for %q: %w", trigger.RepoURL, err)
			}
			triggerExpr := string(triggerExprBytes)
			// NOTE(review): the lookup error is ignored, as before;
			// presumably a never-seen trigger is reported as an error and
			// should read as "". Confirm against the store implementation.
			oldVal, _ := store.GetTrigger(triggerExpr)
			if newVal != oldVal {
				triggerExprs = append(triggerExprs, triggerExpr)
				triggerVals = append(triggerVals, newVal)
				updatedTriggers[triggerExpr] = newVal
			}
		}
		if len(triggerExprs) == 0 {
			continue
		}
		jobID, err := store.NewJob(triggerExprs, triggerVals, action.Run)
		if err != nil {
			return fmt.Errorf("creating job for %q: %w", action.Run, err)
		}
		fmt.Printf("created job %v:\n", jobID)
		for i := range triggerExprs {
			fmt.Printf("\ttrigger: %s => %q\n", triggerExprs[i], triggerVals[i])
		}
		fmt.Printf("\trun: %q\n", action.Run)
	}

	// Remember the new values, in sorted order so that the output and the
	// order of store writes are deterministic.
	updatedExprs := keys(updatedTriggers)
	slices.Sort(updatedExprs)
	for _, expr := range updatedExprs {
		val := updatedTriggers[expr]
		fmt.Printf("remembering that %s => %q\n", expr, val)
		if err := store.SetTrigger(expr, val); err != nil {
			return fmt.Errorf("remembering trigger %s: %w", expr, err)
		}
	}
	return nil
}
// main wires up the command line: every positional argument is taken as a
// repository URL to restrict processing to, and the configuration itself is
// read from stdin by Trigger (hence "<config.yml" in the usage line).
func main() {
	run := func(_ *cobra.Command, args []string) error {
		allowed := make(Set[string], len(args))
		for _, url := range args {
			allowed[url] = struct{}{}
		}
		return Trigger(allowed)
	}

	cmd := &cobra.Command{
		Use:   os.Args[0] + " [flags] <config.yml",
		Short: "Trigger actions",
		RunE:  run,

		// Errors are printed below, after ExecuteContext returns.
		SilenceErrors: true,
		// Usage output on flag errors is left to the FlagErrorFunc.
		SilenceUsage: true,

		CompletionOptions: cobra.CompletionOptions{
			DisableDefaultCmd: true,
		},
	}
	cmd.SetFlagErrorFunc(cliutil.FlagErrorFunc)
	cmd.SetHelpTemplate(cliutil.HelpTemplate)

	err := cmd.ExecuteContext(context.Background())
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "%v: error: %v\n", cmd.CommandPath(), err)
	os.Exit(1)
}