eclipse/lib/leader/db.go

186 lines
5.2 KiB
Go
Raw Permalink Normal View History

2024-01-17 17:32:48 +00:00
// Copyright (C) 2024 Umorpha Systems
// SPDX-License-Identifier: AGPL-3.0-or-later
package leader
2024-01-17 17:32:48 +00:00
import (
"database/sql"
"encoding/json"
"fmt"
2024-01-18 05:49:09 +00:00
"path/filepath"
2024-01-18 19:08:10 +00:00
"time"
2024-01-17 17:32:48 +00:00
_ "github.com/mattn/go-sqlite3"
"git.mothstuff.lol/lukeshu/eclipse/lib/common"
2024-01-17 17:32:48 +00:00
)
2024-01-18 05:49:09 +00:00
// DB is a handle on the leader's persistent state: an SQL database
// holding the "triggers" and "jobs" tables, together with the location
// of the directory that holds per-job files.
type DB struct {
	// db is the open SQL handle that every query in this file goes through.
	db *sql.DB

	// jobsdir is the parent directory of the per-job directories; a job's
	// Dir is jobsdir joined with the job's ID.
	jobsdir string
}
2024-01-18 05:49:09 +00:00
func (cfg Config) DB() (*DB, error) {
db, err := sql.Open("sqlite3", filepath.Join(cfg.StateDir, "state.sqlite"))
if err != nil {
return nil, fmt.Errorf("statedb: open: %w", err)
}
if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS triggers (
expression TEXT NOT NULL PRIMARY KEY,
value TEXT NOT NULL
2024-01-18 19:08:10 +00:00
) STRICT;
2024-01-18 05:49:09 +00:00
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
trigger_expressions TEXT NOT NULL, -- JSON array of strings
trigger_values TEXT NOT NULL, -- JSON array of strings
2024-01-23 04:10:47 +00:00
command TEXT NOT NULL,
status INTEGER,
status_msg TEXT,
2024-01-18 19:08:10 +00:00
created_at INTEGER, -- seconds since the Unix epoch
updated_at INTEGER -- seconds since the Unix epoch
) STRICT;
2024-01-18 05:49:09 +00:00
`); err != nil {
_ = db.Close()
2024-01-18 05:49:09 +00:00
return nil, fmt.Errorf("statedb: migrate: %w", err)
}
return &DB{
db: db,
jobsdir: filepath.Join(cfg.StateDir, "jobfiles"),
}, nil
}
// SQL /////////////////////////////////////////////////////////////////////////
func (s *DB) Close() error {
2024-01-17 17:32:48 +00:00
return s.db.Close()
}
2024-01-18 05:49:09 +00:00
func (s *DB) GetTrigger(expr string) (string, error) {
2024-01-17 17:32:48 +00:00
var val string
err := s.db.QueryRow(`SELECT value FROM triggers WHERE expression = ?`,
expr).Scan(&val)
if err != nil {
return "", fmt.Errorf("statedb: get trigger: %q: %w", expr, err)
}
return val, nil
}
2024-01-18 05:49:09 +00:00
func (s *DB) SetTrigger(expr string, val string) error {
2024-01-17 17:32:48 +00:00
if _, err := s.db.Exec(`INSERT OR REPLACE INTO triggers(expression, value) VALUES(?, ?)`,
expr, val); err != nil {
return fmt.Errorf("statedb: set trigger: %q => %q: %w", expr, val, err)
}
return nil
}
func (s *DB) NewJob(triggerExprs []string, triggerVals []string, cmd string) (common.JobID, error) {
2024-01-17 17:32:48 +00:00
triggerExprsJSON, err := json.Marshal(triggerExprs)
if err != nil {
return 0, err
}
triggerValsJSON, err := json.Marshal(triggerVals)
if err != nil {
return 0, err
}
var id common.JobID
2024-01-18 19:08:10 +00:00
now := time.Now().Unix()
2024-01-18 21:17:26 +00:00
if err := s.db.QueryRow(`INSERT INTO jobs(trigger_expressions, trigger_values, command, status, created_at, updated_at) VALUES(?, ?, ?, ?, ?, ?) RETURNING id`,
string(triggerExprsJSON), string(triggerValsJSON), cmd, common.StatusNew, now, now).Scan(&id); err != nil {
2024-01-17 17:32:48 +00:00
return 0, fmt.Errorf("statedb: create job: %w", err)
}
return id, nil
}
2024-01-18 01:08:22 +00:00
func scanJob(row interface{ Scan(...any) error }) (common.Job, error) {
var job common.Job
2024-01-18 04:15:20 +00:00
var exprsJSON, valsJSON []byte
2024-01-23 20:02:32 +00:00
var statusMsg sql.NullString
2024-01-18 19:08:10 +00:00
var created, updated int64
2024-01-23 20:02:32 +00:00
if err := row.Scan(&job.ID, &exprsJSON, &valsJSON, &job.Command, &job.Status, &statusMsg, &created, &updated); err != nil {
return common.Job{}, err
2024-01-18 04:15:20 +00:00
}
2024-01-23 20:02:32 +00:00
job.StatusMsg = statusMsg.String
2024-01-18 04:15:20 +00:00
if err := json.Unmarshal(exprsJSON, &job.TriggerExpressions); err != nil {
return common.Job{}, err
2024-01-18 04:15:20 +00:00
}
if err := json.Unmarshal(valsJSON, &job.TriggerValues); err != nil {
return common.Job{}, err
2024-01-18 04:15:20 +00:00
}
2024-01-28 00:18:22 +00:00
job.CreatedAt = time.Unix(created, 0).UTC()
job.UpdatedAt = time.Unix(updated, 0).UTC()
2024-01-18 04:15:20 +00:00
return job, nil
}
func (s *DB) ListJobs(status common.JobStatus) (_ []common.Job, err error) {
maybeSetErr := func(_err error) {
if err == nil && _err != nil {
err = _err
}
}
var rows *sql.Rows
if status < 0 {
rows, err = s.db.Query(`SELECT * FROM jobs ORDER BY id DESC`)
} else {
rows, err = s.db.Query(`SELECT * FROM jobs WHERE status = ? ORDER BY id DESC`, status)
}
2024-01-18 01:08:22 +00:00
if err != nil {
2024-01-18 04:15:20 +00:00
return nil, fmt.Errorf("statedb: list jobs: %w", err)
2024-01-18 01:08:22 +00:00
}
defer func() { maybeSetErr(rows.Close()) }()
2024-01-18 01:08:22 +00:00
var jobs []common.Job
2024-01-18 01:08:22 +00:00
for rows.Next() {
2024-01-18 04:15:20 +00:00
job, err := scanJob(rows)
if err != nil {
return nil, fmt.Errorf("statedb: list jobs: %w", err)
2024-01-18 01:08:22 +00:00
}
2024-01-18 05:49:09 +00:00
job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID))
2024-01-18 01:08:22 +00:00
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
2024-01-18 04:15:20 +00:00
return nil, fmt.Errorf("statedb: list jobs: %w", err)
2024-01-18 01:08:22 +00:00
}
return jobs, nil
}
2024-01-18 04:15:20 +00:00
func (s *DB) GetJob(id common.JobID) (common.Job, error) {
2024-01-18 19:08:10 +00:00
job, err := scanJob(s.db.QueryRow(`SELECT * FROM jobs WHERE id = ?`,
2024-01-18 04:15:20 +00:00
id))
if err != nil {
return common.Job{}, fmt.Errorf("statedb: list jobs: %v: %w", id, err)
2024-01-18 04:15:20 +00:00
}
2024-01-18 05:49:09 +00:00
job.Dir = filepath.Join(s.jobsdir, fmt.Sprintf("%v", job.ID))
2024-01-18 04:15:20 +00:00
return job, nil
}
2024-01-18 19:08:10 +00:00
2024-01-23 04:10:47 +00:00
func (s *DB) SwapJobStatus(id common.JobID, oldStatus, newStatus common.JobStatus, statusMsg string) (ok bool, _ error) {
2024-01-18 19:08:10 +00:00
now := time.Now().Unix()
2024-01-23 04:10:47 +00:00
res, err := s.db.Exec(`UPDATE jobs SET status = ?, status_msg = ?, updated_at = ? WHERE id = ? and status = ?`,
newStatus, statusMsg, now, id, oldStatus)
if err != nil {
2024-01-23 04:10:47 +00:00
return false, fmt.Errorf("statedb: update job: %v: %v => %v (%q): %w", id, oldStatus, newStatus, statusMsg, err)
2024-01-18 19:08:10 +00:00
}
n, err := res.RowsAffected()
if err != nil {
2024-01-23 04:10:47 +00:00
return false, fmt.Errorf("statedb: update job: %v: %v => %v (%q): %w", id, oldStatus, newStatus, statusMsg, err)
2024-01-18 19:08:10 +00:00
}
return n != 0, nil
}
2024-01-18 21:17:26 +00:00
func (s *DB) GCRunningJobs() error {
now := time.Now().Unix()
if _, err := s.db.Exec(`UPDATE jobs SET status = ?, updated_at = ? WHERE status = ?`,
common.StatusError, now, common.StatusRunning); err != nil {
2024-01-18 21:17:26 +00:00
return fmt.Errorf("statedb: gc running jobs: %w", err)
}
return nil
}