374 lines
7.7 KiB
Go
374 lines
7.7 KiB
Go
// Package tracking defines a generic track (a kind of record) type
// and a container type that allows storing tracks in a SQL database.
|
|
package tracking
|
|
|
|
import (
|
|
sqllib "database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
lib "git.sr.ht/~cco/go-scopes"
|
|
"git.sr.ht/~cco/go-scopes/logging/log"
|
|
"git.sr.ht/~cco/go-scopes/storage"
|
|
sql "git.sr.ht/~cco/go-scopes/storage"
|
|
)
|
|
|
|
// ContFactory creates a Container that is bound to the given storage.
type ContFactory func(*sql.Storage) *Container
|
|
// ItemFactory creates a Track belonging to the given container; the
// variadic strings are the initial head values (see Track.SetHead).
type ItemFactory func(*Container, ...string) Track
|
|
|
|
type Track interface {
|
|
TrackId() lib.Ident
|
|
Head() lib.StrMap
|
|
TimeStamp() *time.Time
|
|
Data() lib.Map
|
|
Container() *Container
|
|
Uid() string
|
|
SetHead(...string)
|
|
SetTimeStamp(*time.Time)
|
|
SetData(lib.Map)
|
|
Base() *BaseTrack
|
|
}
|
|
|
|
// basic track implementation
|
|
|
|
type track struct {
|
|
trackId lib.Ident
|
|
head lib.StrMap
|
|
timeStamp *time.Time
|
|
data lib.Map
|
|
container *Container
|
|
}
|
|
|
|
// BaseTrack is the exported alias of the basic track implementation,
// so that other packages can refer to the type.
type BaseTrack = track
|
|
|
|
func MakeBaseTrack(cont *Container, h ...string) *track {
|
|
tr := track{
|
|
head: lib.StrMap{},
|
|
data: lib.Map{},
|
|
container: cont,
|
|
}
|
|
tr.SetHead(h...)
|
|
return &tr
|
|
}
|
|
|
|
func MakeTrack(cont *Container, h ...string) Track {
|
|
return MakeBaseTrack(cont, h...)
|
|
}
|
|
|
|
func New(cont *Container, h ...string) Track {
|
|
return cont.ItemFactory(cont, h...)
|
|
}
|
|
|
|
func (tr *track) Base() *BaseTrack {
|
|
return tr
|
|
}
|
|
|
|
func (tr *track) TrackId() lib.Ident {
|
|
return tr.trackId
|
|
}
|
|
|
|
func (tr *track) Head() lib.StrMap {
|
|
return tr.head
|
|
}
|
|
|
|
func (tr *track) TimeStamp() *time.Time {
|
|
return tr.timeStamp
|
|
}
|
|
|
|
func (tr *track) Data() lib.Map {
|
|
return tr.data
|
|
}
|
|
|
|
func (tr *track) Container() *Container {
|
|
return tr.container
|
|
}
|
|
|
|
func (tr *track) Uid() string {
|
|
return fmt.Sprintf("%s-%d", tr.container.Prefix, tr.trackId)
|
|
}
|
|
|
|
func (tr *track) SetHead(h ...string) {
|
|
for i, k := range tr.container.HeadFields {
|
|
if i >= len(h) {
|
|
break
|
|
}
|
|
if h[i] != "" {
|
|
tr.head[k] = h[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tr *track) SetTimeStamp(ts *time.Time) {
|
|
tr.timeStamp = ts
|
|
}
|
|
|
|
func (tr *track) SetData(data lib.Map) {
|
|
tr.data = data
|
|
}
|
|
|
|
func (tr *track) ScanP(rows *sql.Rows) error {
|
|
tr.head = lib.StrMap{}
|
|
var d []any
|
|
for range tr.container.HeadFields {
|
|
var hv string
|
|
d = append(d, &hv)
|
|
}
|
|
var ts, rd string
|
|
d = append(d, &ts, &rd, &tr.trackId)
|
|
err := rows.Scan(d...)
|
|
if err != nil {
|
|
log.Error(err).Msg("storage.tracking.ScanP: rows.Scan")
|
|
}
|
|
for i, k := range tr.container.HeadFields {
|
|
tr.head[k] = *d[i].(*string)
|
|
}
|
|
tr.timeStamp = ParseDateTime(ts)
|
|
err = json.Unmarshal([]byte(rd), &tr.data)
|
|
if err != nil {
|
|
log.Error(err).Msg("storage.tracking.ScanP: json.Unmarshal")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// basic container implementation
|
|
|
|
type ContDef struct {
|
|
Prefix string
|
|
ContFactory ContFactory
|
|
ItemFactory ItemFactory
|
|
TableName string
|
|
HeadFields lib.StrSlice
|
|
IdFields lib.StrSlice
|
|
Indexes []lib.StrSlice
|
|
InsertOnChange bool
|
|
}
|
|
|
|
type Container struct {
|
|
*ContDef
|
|
Storage *sql.Storage
|
|
}
|
|
|
|
func Tracks(db *sql.Storage) *Container {
|
|
return &Container{container_definition, db}
|
|
}
|
|
|
|
type querySpec struct {
|
|
Table string
|
|
Headvals lib.StrSlice
|
|
Scols lib.StrSlice
|
|
Qucols, Ordcols lib.StrSlice
|
|
Quspecs []struct{ Col, Op string }
|
|
Ordspecs []ordSpec
|
|
Limit int
|
|
Quvals []any
|
|
Cont *Container
|
|
}
|
|
|
|
// ordSpec describes one ordering criterion of a query.
type ordSpec struct {
	Col  string // column to order by
	Desc bool   // true for descending order
}
|
|
|
|
func (spec *querySpec) AddQu(col, op string) {
|
|
spec.Quspecs = append(spec.Quspecs, struct{ Col, Op string }{col, op})
|
|
}
|
|
|
|
func (spec *querySpec) AddOrd(col string, desc bool) {
|
|
spec.Ordspecs = append(spec.Ordspecs, ordSpec{col, desc})
|
|
}
|
|
|
|
func (spec *querySpec) setup(cont *Container) {
|
|
if cont.Storage.Schema == "" {
|
|
spec.Table = cont.TableName
|
|
} else {
|
|
spec.Table = fmt.Sprintf("%s.%s", cont.Storage.Schema, cont.TableName)
|
|
}
|
|
if spec.Scols == nil {
|
|
spec.Scols = append(cont.HeadFields, "timestamp", "data", "trackid")
|
|
}
|
|
for i, v := range spec.Headvals {
|
|
if v != "" {
|
|
spec.Qucols = append(spec.Qucols, cont.HeadFields[i])
|
|
spec.Quvals = append(spec.Quvals, v)
|
|
}
|
|
}
|
|
for _, c := range spec.Qucols {
|
|
spec.AddQu(c, "=")
|
|
}
|
|
for _, c := range spec.Ordcols {
|
|
spec.AddOrd(c, false)
|
|
}
|
|
spec.Cont = cont
|
|
}
|
|
|
|
func (cont *Container) Get(id lib.Ident) Track {
|
|
quSpec := &querySpec{
|
|
Qucols: lib.StrSlice{"trackid"},
|
|
Quvals: []any{id},
|
|
}
|
|
return cont.QueryOne(quSpec)
|
|
}
|
|
|
|
func (cont *Container) QueryLast(hv ...string) Track {
|
|
quSpec := &querySpec{
|
|
Headvals: hv,
|
|
Limit: 1,
|
|
}
|
|
quSpec.AddOrd("timestamp", true)
|
|
return cont.QueryOne(quSpec)
|
|
}
|
|
|
|
func (cont *Container) QueryOne(quSpec *querySpec) Track {
|
|
var tr Track
|
|
proc := func(r *sql.Rows) error {
|
|
tr = New(cont)
|
|
return tr.Base().ScanP(r)
|
|
}
|
|
quSpec.setup(cont)
|
|
sql := storage.BuildSql(SqlSelect, quSpec)
|
|
cont.Storage.Query(proc, sql, quSpec.Quvals...)
|
|
return tr
|
|
}
|
|
|
|
func (cont *Container) Query(quSpec *querySpec) []Track {
|
|
var trs []Track
|
|
proc := func(r *sql.Rows) error {
|
|
tr := New(cont)
|
|
err := tr.Base().ScanP(r)
|
|
if err == nil {
|
|
trs = append(trs, tr)
|
|
}
|
|
return err
|
|
}
|
|
quSpec.setup(cont)
|
|
sql := storage.BuildSql(SqlSelect, quSpec)
|
|
cont.Storage.Query(proc, sql, quSpec.Quvals...)
|
|
return trs
|
|
}
|
|
|
|
func (cont *Container) Save(tr Track) Track {
|
|
return tr
|
|
}
|
|
|
|
func (cont *Container) Insert(trk Track) Track {
|
|
tr := trk.Base()
|
|
quSpec := &querySpec{
|
|
Scols: append(cont.HeadFields, "Data"),
|
|
}
|
|
quSpec.setup(cont)
|
|
sql := storage.BuildSql(SqlInsert, quSpec)
|
|
var values []any
|
|
for _, k := range cont.HeadFields {
|
|
values = append(values, tr.head[k])
|
|
}
|
|
b, _ := json.Marshal(tr.data)
|
|
values = append(values, b)
|
|
var tsstr string
|
|
proc := func(r *sqllib.Rows) error {
|
|
err := r.Scan(&tr.trackId, &tsstr)
|
|
tr.timeStamp = ParseDateTime(tsstr)
|
|
return err
|
|
}
|
|
if err := cont.Storage.Query(proc, sql, values...); err == nil {
|
|
return trk
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cont *Container) Update(trk Track) Track {
|
|
tr := trk.Base()
|
|
if tr.timeStamp == nil {
|
|
ts := time.Now()
|
|
tr.timeStamp = &ts
|
|
}
|
|
quSpec := &querySpec{
|
|
Scols: append(cont.HeadFields, "timestamp", "data"),
|
|
}
|
|
quSpec.setup(cont)
|
|
sql := storage.BuildSql(SqlUpdate, quSpec)
|
|
vals := []any{tr.trackId}
|
|
for _, k := range cont.HeadFields {
|
|
vals = append(vals, tr.head[k])
|
|
}
|
|
vals = append(vals, tr.timeStamp)
|
|
b, _ := json.Marshal(tr.data)
|
|
vals = append(vals, b)
|
|
n, _ := cont.Storage.Exec(sql, vals...)
|
|
if n == 1 {
|
|
return trk
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ParseDateTime(inp string) *time.Time {
|
|
ts, err := time.Parse(time.RFC3339, inp)
|
|
if err == nil {
|
|
return &ts
|
|
}
|
|
ts, err = time.Parse("2006-01-02 15:04:05", inp)
|
|
if err == nil {
|
|
return &ts
|
|
}
|
|
ts, err = time.Parse("2006-01-02 15:04:05 -0700 MST", inp)
|
|
if err == nil {
|
|
return &ts
|
|
}
|
|
log.Error(err).Msg("storage.tracking.ParseDateTime")
|
|
return nil
|
|
}
|
|
|
|
func (cont *Container) CreateTable() {
|
|
spec := querySpec{}
|
|
spec.setup(cont)
|
|
sql := storage.BuildSql(SqlCreate, spec)
|
|
if _, err := cont.Storage.Exec(sql); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// container registration
|
|
|
|
// container_registry maps container prefixes to their definitions.
// The map is created in init.
var container_registry map[string]*ContDef
|
|
|
|
func RegisterContainerDef(cd *ContDef) {
|
|
container_registry[cd.Prefix] = cd
|
|
}
|
|
|
|
func GetContainerDef(p string) *ContDef {
|
|
return container_registry[p]
|
|
}
|
|
|
|
func Get(db *sql.Storage, uid string) Track {
|
|
parts := strings.Split(uid, "-")
|
|
cd := GetContainerDef(parts[0])
|
|
cont := cd.ContFactory(db)
|
|
id, _ := strconv.ParseUint(parts[1], 10, 64)
|
|
return cont.Get(lib.Ident(id))
|
|
}
|
|
|
|
// container definition for the tracking package
|
|
|
|
// container_definition is the definition of this package's own
// container type; it is set up and registered in init.
var container_definition *ContDef
|
|
|
|
// type_prefix is the prefix of this package's container definition.
const type_prefix = "rec"
|
|
|
|
func init() {
|
|
container_registry = map[string]*ContDef{}
|
|
hf := lib.StrSlice{"taskId", "userName"}
|
|
ixs := []lib.StrSlice{hf}
|
|
ixs = append(ixs, lib.StrSlice{"userName"})
|
|
container_definition = &ContDef{
|
|
Prefix: type_prefix,
|
|
ContFactory: Tracks,
|
|
ItemFactory: MakeTrack,
|
|
TableName: "tracks",
|
|
HeadFields: hf,
|
|
IdFields: hf,
|
|
Indexes: ixs,
|
|
InsertOnChange: true,
|
|
}
|
|
RegisterContainerDef(container_definition)
|
|
}
|