183 lines
5.1 KiB
Go
183 lines
5.1 KiB
Go
// Copyright (C) 2024 Umorpha Systems
|
|
// SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
package leader
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
|
|
"git.mothstuff.lol/lukeshu/eclipse/lib/common"
|
|
)
|
|
|
|
// DB is the leader's state store: an open SQL database handle together
// with the directory under which per-job directories live.
type DB struct {
	// db is the database handle every query method on DB runs against.
	db *sql.DB

	// jobsdir is the parent of the per-job directories; a job's Dir is
	// jobsdir joined with the job's ID.
	jobsdir string
}
|
|
|
|
func (cfg Config) DB() (*DB, error) {
|
|
db, err := sql.Open("sqlite3", filepath.Join(cfg.StateDir, "state.sqlite"))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: open: %w", err)
|
|
}
|
|
|
|
if _, err := db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS triggers (
|
|
expression TEXT NOT NULL PRIMARY KEY,
|
|
value TEXT NOT NULL
|
|
) STRICT;
|
|
|
|
CREATE TABLE IF NOT EXISTS jobs (
|
|
id INTEGER PRIMARY KEY,
|
|
trigger_expressions TEXT NOT NULL, -- JSON array of strings
|
|
trigger_values TEXT NOT NULL, -- JSON array of strings
|
|
command TEXT NOT NULL,
|
|
status INTEGER,
|
|
created_at INTEGER, -- seconds since the Unix epoch
|
|
updated_at INTEGER -- seconds since the Unix epoch
|
|
) STRICT;
|
|
`); err != nil {
|
|
_ = db.Close()
|
|
return nil, fmt.Errorf("statedb: migrate: %w", err)
|
|
}
|
|
|
|
return &DB{
|
|
db: db,
|
|
jobsdir: filepath.Join(cfg.StateDir, "jobfiles"),
|
|
}, nil
|
|
}
|
|
|
|
// SQL /////////////////////////////////////////////////////////////////////////
|
|
|
|
func (s *DB) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
func (s *DB) GetTrigger(expr string) (string, error) {
|
|
var val string
|
|
err := s.db.QueryRow(`SELECT value FROM triggers WHERE expression = ?`,
|
|
expr).Scan(&val)
|
|
if err != nil {
|
|
return "", fmt.Errorf("statedb: get trigger: %q: %w", expr, err)
|
|
}
|
|
return val, nil
|
|
}
|
|
|
|
func (s *DB) SetTrigger(expr string, val string) error {
|
|
if _, err := s.db.Exec(`INSERT OR REPLACE INTO triggers(expression, value) VALUES(?, ?)`,
|
|
expr, val); err != nil {
|
|
return fmt.Errorf("statedb: set trigger: %q => %q: %w", expr, val, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *DB) NewJob(triggerExprs []string, triggerVals []string, cmd string) (common.JobID, error) {
|
|
triggerExprsJSON, err := json.Marshal(triggerExprs)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
triggerValsJSON, err := json.Marshal(triggerVals)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var id common.JobID
|
|
now := time.Now().Unix()
|
|
if err := s.db.QueryRow(`INSERT INTO jobs(trigger_expressions, trigger_values, command, status, created_at, updated_at) VALUES(?, ?, ?, ?, ?, ?) RETURNING id`,
|
|
string(triggerExprsJSON), string(triggerValsJSON), cmd, common.StatusNew, now, now).Scan(&id); err != nil {
|
|
return 0, fmt.Errorf("statedb: create job: %w", err)
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func scanJob(row interface{ Scan(...any) error }) (common.Job, error) {
|
|
var job common.Job
|
|
var exprsJSON, valsJSON []byte
|
|
var created, updated int64
|
|
if err := row.Scan(&job.ID, &exprsJSON, &valsJSON, &job.Command, &job.Status, &created, &updated); err != nil {
|
|
return common.Job{}, err
|
|
}
|
|
if err := json.Unmarshal(exprsJSON, &job.TriggerExpressions); err != nil {
|
|
return common.Job{}, err
|
|
}
|
|
if err := json.Unmarshal(valsJSON, &job.TriggerValues); err != nil {
|
|
return common.Job{}, err
|
|
}
|
|
job.CreatedAt = time.Unix(created, 0)
|
|
job.UpdatedAt = time.Unix(updated, 0)
|
|
return job, nil
|
|
}
|
|
|
|
func (s *DB) ListJobs(status common.JobStatus) (_ []common.Job, err error) {
|
|
maybeSetErr := func(_err error) {
|
|
if err == nil && _err != nil {
|
|
err = _err
|
|
}
|
|
}
|
|
|
|
var rows *sql.Rows
|
|
if status < 0 {
|
|
rows, err = s.db.Query(`SELECT * FROM jobs ORDER BY id DESC`)
|
|
} else {
|
|
rows, err = s.db.Query(`SELECT * FROM jobs WHERE status = ? ORDER BY id DESC`, status)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
defer func() { maybeSetErr(rows.Close()) }()
|
|
|
|
var jobs []common.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID))
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("statedb: list jobs: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
func (s *DB) GetJob(id common.JobID) (common.Job, error) {
|
|
job, err := scanJob(s.db.QueryRow(`SELECT * FROM jobs WHERE id = ?`,
|
|
id))
|
|
if err != nil {
|
|
return common.Job{}, fmt.Errorf("statedb: list jobs: %v: %w", id, err)
|
|
}
|
|
job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID))
|
|
return job, nil
|
|
}
|
|
|
|
func (s *DB) SwapJobStatus(id common.JobID, oldStatus, newStatus common.JobStatus) (ok bool, _ error) {
|
|
now := time.Now().Unix()
|
|
res, err := s.db.Exec(`UPDATE jobs SET status = ?, updated_at = ? WHERE id = ? and status = ?`,
|
|
newStatus, now, id, oldStatus)
|
|
if err != nil {
|
|
return false, fmt.Errorf("statedb: update job: %v: %v => %v: %w", id, oldStatus, newStatus, err)
|
|
}
|
|
n, err := res.RowsAffected()
|
|
if err != nil {
|
|
return false, fmt.Errorf("statedb: update job: %v: %v => %v: %w", id, oldStatus, newStatus, err)
|
|
}
|
|
return n != 0, nil
|
|
}
|
|
|
|
func (s *DB) GCRunningJobs() error {
|
|
now := time.Now().Unix()
|
|
if _, err := s.db.Exec(`UPDATE jobs SET status = ?, updated_at = ? WHERE status = ?`,
|
|
common.StatusError, now, common.StatusRunning); err != nil {
|
|
return fmt.Errorf("statedb: gc running jobs: %w", err)
|
|
}
|
|
return nil
|
|
}
|