go-scopes/storage/tracking/tracking.go

328 lines
7 KiB
Go

// Package tracking defines a generic track type (a kind of record)
// and a container type that stores tracks in a SQL database.
package tracking
import (
sqllib "database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
lib "git.sr.ht/~cco/go-scopes"
"git.sr.ht/~cco/go-scopes/logging/log"
"git.sr.ht/~cco/go-scopes/storage"
sql "git.sr.ht/~cco/go-scopes/storage"
)
// ContFactory creates a Container bound to the given storage.
type ContFactory func(*sql.Storage) *Container
// ItemFactory creates a Track belonging to the given container; the optional
// string arguments are head values (MakeTrack is the default implementation).
type ItemFactory func(*Container, ...string) *Track
// Track is the basic track implementation: a record made up of head values,
// a timestamp and a data payload, tied to the Container it belongs to.
type Track struct {
	trackId   lib.Ident  // row id; set by ScanP and by Container.Insert
	Head      lib.StrMap // head values, keyed by the container's HeadFields names
	TimeStamp *time.Time // result of ParseDateTime; nil if the value could not be parsed
	Data      lib.Map    // payload; JSON-encoded when written, JSON-decoded when scanned
	container *Container // container this track belongs to
}
// TrackId returns the track's id within its container.
func (tr *Track) TrackId() lib.Ident {
	return tr.trackId
}
// Container returns the container this track belongs to.
func (tr *Track) Container() *Container {
	return tr.container
}
// Uid returns the track's unique identifier: the container prefix and
// the track id, joined by a hyphen (the format parsed by the package-level Get).
func (tr *Track) Uid() string {
	prefix := tr.container.Prefix
	id := tr.trackId
	return fmt.Sprintf("%s-%d", prefix, id)
}
// MakeTrack creates a new Track for the given container with an empty head
// map and then applies the head values h via SetHead. It is the default
// ItemFactory of this package.
func MakeTrack(cont *Container, h ...string) *Track {
	track := &Track{container: cont, Head: lib.StrMap{}}
	track.SetHead(h...)
	return track
}
// SetHead assigns the given values positionally to the container's head
// fields: h[0] goes to the first head field, h[1] to the second, and so on.
// Empty strings are skipped (an existing value is kept), and values beyond
// the number of head fields are ignored.
//
// The Head map is allocated on demand, so SetHead is also safe on a Track
// that was not created via MakeTrack.
func (tr *Track) SetHead(h ...string) {
	fields := tr.container.HeadFields
	for i, v := range h {
		if i >= len(fields) {
			break
		}
		if v == "" {
			continue
		}
		if tr.Head == nil {
			// Writing to a nil map panics; create it lazily.
			tr.Head = lib.StrMap{}
		}
		tr.Head[fields[i]] = v
	}
}
// ScanP populates the track from the current row of rows. The row is
// expected to contain one column per head field of the container (in
// HeadFields order), followed by the timestamp, the JSON-encoded data and
// the track id.
//
// If scanning the row fails, the error is returned and the track is left
// unchanged. If decoding the data column fails, the error is logged and
// returned; head values, timestamp and id have already been set in that case.
func (tr *Track) ScanP(rows *sql.Rows) error {
	fields := tr.container.HeadFields
	headVals := make([]string, len(fields))
	dest := make([]any, 0, len(fields)+3)
	for i := range headVals {
		dest = append(dest, &headVals[i])
	}
	var ts, rd string
	dest = append(dest, &ts, &rd, &tr.trackId)
	// The scan error must be checked before the values are used; otherwise
	// it would be masked by the result of the JSON decoding below.
	if err := rows.Scan(dest...); err != nil {
		return err
	}
	tr.Head = lib.StrMap{}
	for i, k := range fields {
		tr.Head[k] = headVals[i]
	}
	tr.TimeStamp = ParseDateTime(ts)
	if err := json.Unmarshal([]byte(rd), &tr.Data); err != nil {
		log.Error(err).Msg("storage.tracking.ScanP")
		return err
	}
	return nil
}
// ContDef is the basic container definition: the static description of a
// kind of track container, registered under its Prefix
// (see RegisterContainerDef).
type ContDef struct {
	Prefix         string         // registry key and first part of a track's Uid
	ContFactory    ContFactory    // creates a Container for this definition
	ItemFactory    ItemFactory    // creates the Track values of this container
	TableName      string         // database table name (without schema)
	HeadFields     lib.StrSlice   // names of the head columns, in column order
	IdFields       lib.StrSlice   // NOTE(review): not used in this file; presumably the head fields identifying a track, confirm
	Indexes        []lib.StrSlice // NOTE(review): not used in this file; presumably column lists for index creation, confirm
	InsertOnChange bool           // NOTE(review): not used in this file; presumably consulted when saving, confirm
}
// Container combines a container definition with the storage it operates
// on. The embedded *ContDef makes the definition's fields (Prefix,
// TableName, HeadFields, ...) directly accessible on the container.
type Container struct {
	*ContDef
	Storage *sql.Storage // database access used for all queries of this container
}
// Tracks returns a Container for this package's own container definition
// (table "tracks"), bound to the given storage. It is the ContFactory of
// that definition.
func Tracks(db *sql.Storage) *Container {
	return &Container{
		ContDef: container_definition,
		Storage: db,
	}
}
// querySpec collects the parameters of a SQL statement; it is completed by
// setup and then handed to storage.BuildSql.
type querySpec struct {
	Table           string       // table name, schema-qualified if the storage has a schema; set by setup
	Headvals        lib.StrSlice // values matched positionally against the container's HeadFields; non-empty ones become "=" conditions
	Scols           lib.StrSlice // columns of the statement; setup defaults it to the head fields plus timestamp, data, trackid
	Qucols, Ordcols lib.StrSlice // columns that setup turns into "=" conditions / ascending order entries
	Quspecs         []struct{ Col, Op string } // conditions as column/operator pairs
	Ordspecs        []ordSpec    // ordering entries
	Limit           int          // NOTE(review): presumably the row limit applied by the SQL template; confirm
	Quvals          []any        // values passed as arguments when the statement is executed
	Cont            *Container   // container the spec was set up for; set by setup
}
// ordSpec describes one ordering entry of a query.
type ordSpec struct {
	Col  string // column to order by
	Desc bool   // true for descending order
}
// AddQu appends a condition on column col using operator op to the spec.
func (spec *querySpec) AddQu(col, op string) {
	cond := struct{ Col, Op string }{Col: col, Op: op}
	spec.Quspecs = append(spec.Quspecs, cond)
}
// AddOrd appends an ordering entry for column col to the spec; desc
// selects descending order.
func (spec *querySpec) AddOrd(col string, desc bool) {
	entry := ordSpec{Col: col, Desc: desc}
	spec.Ordspecs = append(spec.Ordspecs, entry)
}
// setup completes the spec for use with the given container:
//
//   - Table is set to the container's table name, qualified with the
//     storage schema if there is one;
//   - Scols defaults to the head fields followed by "timestamp", "data"
//     and "trackid" when the caller has not set it;
//   - every non-empty entry of Headvals adds the corresponding head field
//     to Qucols and its value to Quvals (entries beyond the number of head
//     fields are ignored);
//   - every column in Qucols becomes an "=" condition, every column in
//     Ordcols an ascending ordering entry;
//   - Cont is set to cont.
//
// setup appends to Qucols, Quvals, Quspecs and Ordspecs, so it is meant to
// be called once per spec.
func (spec *querySpec) setup(cont *Container) {
	if cont.Storage.Schema == "" {
		spec.Table = cont.TableName
	} else {
		spec.Table = fmt.Sprintf("%s.%s", cont.Storage.Schema, cont.TableName)
	}
	if spec.Scols == nil {
		// Cap the slice at its length so that append always copies and
		// never writes into the backing array shared with the ContDef.
		hf := cont.HeadFields
		spec.Scols = append(hf[:len(hf):len(hf)], "timestamp", "data", "trackid")
	}
	for i, v := range spec.Headvals {
		if i >= len(cont.HeadFields) {
			// More values than head fields: nothing left to match against.
			break
		}
		if v != "" {
			spec.Qucols = append(spec.Qucols, cont.HeadFields[i])
			spec.Quvals = append(spec.Quvals, v)
		}
	}
	for _, c := range spec.Qucols {
		spec.AddQu(c, "=")
	}
	for _, c := range spec.Ordcols {
		spec.AddOrd(c, false)
	}
	spec.Cont = cont
}
// Get looks up the track with the given id via QueryOne, using an equality
// condition on the "trackid" column.
func (cont *Container) Get(id lib.Ident) *Track {
	var spec querySpec
	spec.Qucols = lib.StrSlice{"trackid"}
	spec.Quvals = []any{id}
	return cont.QueryOne(&spec)
}
// QueryLast runs a QueryOne for the given head values (matched positionally
// against the container's head fields; empty strings are not used as
// conditions), ordered by timestamp descending with Limit 1.
func (cont *Container) QueryLast(hv ...string) *Track {
	var spec querySpec
	spec.Headvals = hv
	spec.Limit = 1
	spec.AddOrd("timestamp", true)
	return cont.QueryOne(&spec)
}
// QueryOne sets up quSpec for this container, runs the resulting SELECT
// statement and returns the track scanned from the result. The row callback
// replaces the result each time it is invoked, so the track from its last
// invocation is the one returned.
//
// It returns nil if the callback was never invoked or if the query
// (including scanning a row) reported an error, so callers never receive a
// partially populated track.
func (cont *Container) QueryOne(quSpec *querySpec) *Track {
	var tr *Track
	proc := func(r *sql.Rows) error {
		tr = cont.ItemFactory(cont)
		return tr.ScanP(r)
	}
	quSpec.setup(cont)
	// Named stmt rather than sql to avoid shadowing the package alias.
	stmt := storage.BuildSql(SqlSelect, quSpec)
	if err := cont.Storage.Query(proc, stmt, quSpec.Quvals...); err != nil {
		return nil
	}
	return tr
}
// Query sets up quSpec for this container, runs the resulting SELECT
// statement and returns the tracks that were scanned successfully. A row
// whose scan fails is not added, and its error is handed back to the
// storage layer; the error returned by the storage query itself is not
// inspected here.
func (cont *Container) Query(quSpec *querySpec) []*Track {
	var result []*Track
	collect := func(rows *sql.Rows) error {
		item := cont.ItemFactory(cont)
		if err := item.ScanP(rows); err != nil {
			return err
		}
		result = append(result, item)
		return nil
	}
	quSpec.setup(cont)
	stmt := storage.BuildSql(SqlSelect, quSpec)
	cont.Storage.Query(collect, stmt, quSpec.Quvals...)
	return result
}
// NewTrack creates a track with the given head values and data via the
// container's ItemFactory and inserts it into the database.
//
// It returns the inserted track (with id and timestamp as set by Insert),
// or nil if the insert failed, so that callers cannot mistake an unsaved
// track for a stored one.
func (cont *Container) NewTrack(h []string, data lib.Map) *Track {
	tr := cont.ItemFactory(cont, h...)
	tr.Data = data
	return cont.Insert(tr)
}
// Save currently returns tr unchanged without accessing the database.
// NOTE(review): this looks like a placeholder - presumably it is meant to
// choose between Insert and Update (cf. ContDef.InsertOnChange); confirm.
func (cont *Container) Save(tr *Track) *Track {
	return tr
}
// Insert stores tr as a new row: the head values (in HeadFields order) and
// the JSON-encoded data are passed as statement arguments. The statement is
// expected to return the new track id and the timestamp, which are written
// back into tr.
//
// It returns tr on success and nil if the data cannot be JSON-encoded or
// the storage query reports an error.
func (cont *Container) Insert(tr *Track) *Track {
	hf := cont.HeadFields
	quSpec := &querySpec{
		// Cap the slice at its length so that append copies instead of
		// writing into the backing array shared with the ContDef.
		Scols: append(hf[:len(hf):len(hf)], "Data"),
	}
	quSpec.setup(cont)
	stmt := storage.BuildSql(SqlInsert, quSpec)
	data, err := json.Marshal(tr.Data)
	if err != nil {
		log.Error(err).Msg("storage.tracking.Insert")
		return nil
	}
	values := make([]any, 0, len(hf)+1)
	for _, k := range hf {
		values = append(values, tr.Head[k])
	}
	values = append(values, data)
	proc := func(r *sqllib.Rows) error {
		var tsstr string
		if err := r.Scan(&tr.trackId, &tsstr); err != nil {
			return err
		}
		// Only parse the timestamp once the scan has succeeded.
		tr.TimeStamp = ParseDateTime(tsstr)
		return nil
	}
	if err := cont.Storage.Query(proc, stmt, values...); err != nil {
		return nil
	}
	return tr
}
// Update writes the head values, timestamp and JSON-encoded data of tr to
// the row with tr's track id. If tr has no timestamp yet, it is set to the
// current time first. The statement arguments are the track id, the head
// values (in HeadFields order), the timestamp and the data.
//
// It returns tr if exactly one row was affected, and nil otherwise (also
// when the data cannot be JSON-encoded or the statement fails).
func (cont *Container) Update(tr *Track) *Track {
	if tr.TimeStamp == nil {
		ts := time.Now()
		tr.TimeStamp = &ts
	}
	data, err := json.Marshal(tr.Data)
	if err != nil {
		log.Error(err).Msg("storage.tracking.Update")
		return nil
	}
	hf := cont.HeadFields
	quSpec := &querySpec{
		// Cap the slice at its length so that append copies instead of
		// writing into the backing array shared with the ContDef.
		Scols: append(hf[:len(hf):len(hf)], "timestamp", "data"),
	}
	quSpec.setup(cont)
	stmt := storage.BuildSql(SqlUpdate, quSpec)
	vals := make([]any, 0, len(hf)+3)
	vals = append(vals, tr.trackId)
	for _, k := range hf {
		vals = append(vals, tr.Head[k])
	}
	vals = append(vals, tr.TimeStamp, data)
	n, err := cont.Storage.Exec(stmt, vals...)
	if err != nil || n != 1 {
		return nil
	}
	return tr
}
// ParseDateTime parses inp using, in this order, RFC 3339,
// "2006-01-02 15:04:05" and "2006-01-02 15:04:05 -0700 MST", and returns
// the first successful result. If no layout matches, the error of the last
// attempt is logged and nil is returned.
func ParseDateTime(inp string) *time.Time {
	layouts := [...]string{
		time.RFC3339,
		"2006-01-02 15:04:05",
		"2006-01-02 15:04:05 -0700 MST",
	}
	var err error
	for _, layout := range layouts {
		var ts time.Time
		if ts, err = time.Parse(layout, inp); err == nil {
			return &ts
		}
	}
	log.Error(err).Msg("storage.tracking.ParseDateTime")
	return nil
}
// CreateTable builds the SqlCreate statement for this container and
// executes it. It panics if the statement fails.
func (cont *Container) CreateTable() {
	var spec querySpec
	spec.setup(cont)
	stmt := storage.BuildSql(SqlCreate, spec)
	_, err := cont.Storage.Exec(stmt)
	if err != nil {
		panic(err)
	}
}
// Container registration.

// container_registry maps a container prefix to its definition. The map is
// created in this package's init function and filled via RegisterContainerDef.
var container_registry map[string]*ContDef
// RegisterContainerDef stores cd in the registry under its Prefix,
// replacing any definition previously registered for that prefix.
func RegisterContainerDef(cd *ContDef) {
	prefix := cd.Prefix
	container_registry[prefix] = cd
}
// GetContainerDef returns the container definition registered under
// prefix p, or nil if there is none.
func GetContainerDef(p string) *ContDef {
	return container_registry[p]
}
// Get loads the track identified by uid from db. A uid has the form
// "<prefix>-<id>" (see Track.Uid): the prefix selects the registered
// container definition, the id is the numeric track id.
//
// It returns nil if uid is malformed (no "-" separator or a non-numeric
// id), if no usable container definition is registered for the prefix, or
// if the container does not yield a track for the id.
func Get(db *sql.Storage, uid string) *Track {
	parts := strings.Split(uid, "-")
	if len(parts) < 2 {
		return nil
	}
	cd := GetContainerDef(parts[0])
	if cd == nil || cd.ContFactory == nil {
		return nil
	}
	id, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return nil
	}
	cont := cd.ContFactory(db)
	return cont.Get(lib.Ident(id))
}
// Container definition for the tracking package.

// container_definition is this package's own container definition; it is
// built and registered in init and used by Tracks.
var container_definition *ContDef
// type_prefix is the prefix under which container_definition is registered.
const type_prefix = "rec"
func init() {
container_registry = map[string]*ContDef{}
hf := lib.StrSlice{"taskId", "userName"}
ixs := []lib.StrSlice{hf}
ixs = append(ixs, lib.StrSlice{"userName"})
container_definition = &ContDef{
Prefix: type_prefix,
ContFactory: Tracks,
ItemFactory: MakeTrack,
TableName: "tracks",
HeadFields: hf,
IdFields: hf,
Indexes: ixs,
InsertOnChange: true,
}
RegisterContainerDef(container_definition)
}