;;;; cl-scopes/storage/tracking.lisp
;;;; (repository listing: 234 lines, 8.2 KiB, Common Lisp)

;;;; cl-scopes/storage/tracking - a simple generic (SQL-based) storage
;;;; for tracks, messages, and other stuff.
(defpackage :scopes/storage/tracking
(:use :common-lisp)
(:local-nicknames (:shape :scopes/shape)
(:storage :scopes/storage)
(:util :scopes/util)
(:alx :alexandria)
(:jzon :com.inuoe.jzon)
(:mop :closer-mop))
(:export #:track #:trackid #:id-str #:key-fields #:timestamp
#:container #:make-container #:table-name #:storage #:make-item
#:get-track #:query-last #:query-one #:query #:make-where
#:save #:insert #:update
#:create-indexes #:create-table))
(in-package :scopes/storage/tracking)
;; Registry of containers, keyed by their short name. Entries are added by
;; the INITIALIZE-INSTANCE :AFTER method on CONTAINER.
(defvar *containers* (make-hash-table))
;; Function that SETUP-TRACK applies to the parsed JSON data of a row; its
;; result is stored as the track's data. The default converts a hash table
;; into a plist.
(defvar *build-track-data* #'alx:hash-table-plist)
(defclass track (shape:record)
  (;; class-wide meta object shared by all instances of this class
   (shape:meta :allocation :class
               :initform (make-instance 'shape:record-meta))
   ;; class-wide list of head fields; when non-nil, KEYS-PLIST restricts
   ;; the head plist to these fields
   (key-fields :allocation :class
               :initarg :key-fields
               :initform nil
               :reader key-fields)
   ;; value of the :trackid column; NIL until loaded or inserted
   (trackid :initform nil
            :accessor trackid)
   ;; NIL until loaded from a row or set by ENSURE-TIMESTAMP
   (timestamp :initform nil
              :accessor timestamp)
   ;; the container this track was created for (see MAKE-ITEM)
   (container :initarg :container
              :reader container))
  (:documentation
   "A record kept in a container's table: head values, a trackid, a timestamp
and data."))
(defun keys-plist (rec)
  "Return the head plist of REC, filtered down to REC's key fields when it
declares any; otherwise return the complete head plist."
  (let* ((keys (key-fields rec))
         (head-values (shape:head-plist rec)))
    (if (null keys)
        head-values
        (util:filter-plist head-values keys))))
(defun id-str (tr)
  "Return the trackid of TR in its printed representation, as a string."
  (let ((id (trackid tr)))
    (write-to-string id)))
(defun uid (tr)
  "Return a string of the form <short-name>-<trackid>, where the short name
of TR's container is printed in lower case."
  (let ((prefix (short-name (container tr)))
        (id (trackid tr)))
    (format nil "~(~a~)-~a" prefix id)))
(defun timestamp-to-sql (tr ts)
  "Convert timestamp TS by calling the function stored under :TS-SQL in the
parameters of the storage that TR's container uses."
  (let* ((st (storage (container tr)))
         (converter (getf (storage:params st) :ts-sql)))
    (funcall converter ts)))
(defun default-indexes (cont)
  "Return the default index specifications for CONT: every tail of its list
of head fields, e.g. (a b c) yields ((a b c) (b c) (c))."
  (loop for tail on (item-head-fields cont)
        collect tail))
;; example of a hand-written specification:
;'((taskid username) (username)))
(defclass container ()
  (;; class of the items created by MAKE-ITEM
   (item-class :reader item-class :initarg :item-class :initform 'track)
   ;; key under which the container is registered in *CONTAINERS*
   (short-name :reader short-name :initarg :short-name :initform :track)
   ;; unqualified name of the table holding the items
   (table-name :reader table-name :initarg :table-name :initform :tracks)
   ;; function of one argument (the container) returning the index
   ;; specifications used by CREATE-TABLE
   (index-factory :reader index-factory :initarg :index-factory
                  :initform #'default-indexes)
   ;; SAVE checks this for :ALWAYS and :CHANGED; any other value makes it
   ;; update an existing row that differs
   (force-insert-when :reader force-insert-when
                      :initarg :force-insert-when :initform nil)
   (storage :reader storage :initarg :storage)
   ;; Writer only. The reader name ITEM-HEAD-FIELDS is defined in this file
   ;; as an ordinary function with DEFUN; declaring a slot reader of the
   ;; same name would create a generic function that the DEFUN then
   ;; replaces, and re-evaluating this DEFCLASS afterwards would signal an
   ;; error because the name already denotes an ordinary function.
   (item-head-fields :writer (setf item-head-fields)))
  (:documentation
   "Describes where and how items of one item class are stored: the storage,
the table name, the index factory and the insert policy used by SAVE."))
(defun make-container (st)
  "Create a container with short name :TRK on storage ST; every other slot
keeps its default."
  (make-instance 'container
                 :storage st
                 :short-name :trk))
(defmethod initialize-instance :after ((cont container) &key &allow-other-keys)
  "Register the new container in *CONTAINERS* under its short name."
  (let ((key (short-name cont)))
    (setf (gethash key *containers*) cont)))
(defun item-head-fields (cont)
  "Return the head fields recorded in the meta object of CONT's item class."
  (let ((meta (shape:get-meta (item-class cont))))
    (shape:head-fields meta)))
(defun make-item (cont &rest head)
  "Instantiate CONT's item class with the values HEAD as its head and CONT
as its container."
  (let ((class (item-class cont)))
    (make-instance class :container cont :head head)))
(defun query-last (cont head-plist)
  "Return the track with the highest trackid among those matching the
field/value pairs of HEAD-PLIST, or NIL if none matches."
  (let ((crit (make-where (util:plist-pairs head-plist))))
    (query-one cont crit :order-by '((:desc :trackid)))))
(defun get-track (cont trid)
  "Return the track of CONT whose trackid matches TRID, or NIL."
  (let ((spec (list :trackid trid)))
    (query-one cont (make-where (list spec)))))
;; alternative form without MAKE-WHERE, kept for reference:
;(query-one cont (list := :trackid trid)))
(defun query-one (cont crit &key order-by)
  "Run a select on CONT's table with criteria CRIT, ordering ORDER-BY and a
limit of 1; return the first row as a track, or NIL when there is no row."
  ;(util:lgd crit)
  (let* ((st (storage cont))
         (first-row (first (storage:query
                            st
                            (setup-select cont crit :order-by order-by :limit 1))))
         (item (make-item cont)))
    (setup-track item first-row)))
(defun query (cont crit &key order-by)
  "Run a select on CONT's table with criteria CRIT and ordering ORDER-BY;
return a list with one track per result row."
  (loop for row in (storage:query (storage cont)
                                  (setup-select cont crit :order-by order-by))
        collect (setup-track (make-item cont) row)))
(defun save (track)
  "Store TRACK according to the FORCE-INSERT-WHEN policy of its container.

With policy :ALWAYS the track is inserted. Otherwise the latest stored
track with the same keys is looked up:
  - none found: TRACK is inserted;
  - found and TRACK-EQUAL to TRACK: the found track is returned unchanged;
  - found, different, policy :CHANGED: TRACK is inserted;
  - found, different, any other policy: TRACK takes over the found trackid
    (and the found timestamp if it has none), the row is updated and TRACK
    is returned."
  (let* ((cont (container track))
         (policy (force-insert-when cont)))
    (when (eql policy :always)
      (return-from save (insert track)))
    (let ((found (query-last cont (keys-plist track))))
      (cond ((null found)
             (insert track))
            ((track-equal found track)
             found)
            ((eql policy :changed)
             (insert track))
            (t
             (setf (trackid track) (trackid found))
             (when (null (timestamp track))
               (setf (timestamp track) (timestamp found)))
             (update track)
             track)))))
(defun insert (track)
  "Insert TRACK as a new row into its container's table, store the returned
trackid in TRACK and return TRACK."
  (ensure-timestamp track)
  ;; clear the trackid so that PLIST leaves it out of the insert
  (setf (trackid track) nil)
  (let* ((cont (container track))
         (st (storage cont))
         (table (storage:qualified-table-name st (table-name cont)))
         (rows (storage:query st
                              (sxql:insert-into table
                                (apply #'sxql:make-clause ':set= (plist track))
                                (sxql:returning :trackid))))
         (first-row (first rows)))
    ;; the second element of the returned row is taken as the new trackid
    (setf (trackid track) (second first-row))
    track))
(defun update (track)
  "Write TRACK's column values to the row whose trackid equals TRACK's
trackid; return what STORAGE:DO-SQL returns."
  (ensure-timestamp track)
  (let* ((cont (container track))
         (st (storage cont))
         (tn (table-name cont))
         (table (storage:qualified-table-name st tn)))
    (storage:do-sql st
      (sxql:update table
        (apply #'sxql:make-clause ':set= (plist track))
        (sxql:where (list := :trackid (trackid track)))))))
;;;; auxiliary functions for queries, ...
(defun track-equal (old new)
  "Return true when OLD and NEW have EQUAL heads, plist-equal data and -- only
if NEW carries a timestamp -- EQUAL timestamps."
  (and (equal (shape:head old) (shape:head new))
       ;; a NEW track without timestamp matches any stored timestamp
       (or (null (timestamp new))
           (equal (timestamp new) (timestamp old)))
       (util:plist-equal (shape:data old) (shape:data new))))
(defun setup-select (cont crit &key order-by limit)
  "Build the select statement for CONT's table.

The selected columns are :trackid, the head fields of CONT's item class,
:timestamp and :data. CRIT, if non-nil, becomes the WHERE clause; ORDER-BY,
if non-nil, is a list of ordering terms such as ((:desc :trackid)); LIMIT,
if non-nil, becomes the LIMIT clause."
  (let ((table (storage:qualified-table-name (storage cont) (table-name cont)))
        (cols (cons :trackid (append (item-head-fields cont) '(:timestamp :data))))
        clauses)
    ;; pushed in reverse, so CLAUSES ends up as (where order-by limit)
    (when limit
      (push (sxql:limit limit) clauses))
    (when order-by
      ;; APPLY passes every ordering term; a VALUES-LIST form in argument
      ;; position would only contribute its first value.
      (push (apply #'sxql:make-clause :order-by order-by) clauses))
    (when crit
      (push (sxql:where crit) clauses))
    ;; Same reason: spread all collected clauses over the statement instead
    ;; of handing over just the first one.
    (apply #'sxql:make-statement :select
           (apply #'sxql:make-clause :fields cols)
           (sxql:from table)
           clauses)))
(defun make-where (specs)
  "Turn SPECS, a list of (field value &optional operator) entries, into a
criteria expression.

Each entry becomes (operator field value). A missing operator defaults to
:>= for the field :timestamp and to := otherwise; a NIL value becomes the
empty string and any other symbol value its lower-case name. The resulting
terms are in reverse order of SPECS. Several terms are wrapped in (:and ...),
a single term is returned as is, and no SPECS give NIL."
  (flet ((term (spec)
           (destructuring-bind (field value &optional operator) spec
             (list (or operator
                       (if (eql field :timestamp) :>= :=))
                   field
                   (cond ((null value) "")
                         ((symbolp value) (string-downcase value))
                         (t value))))))
    (let ((terms (reverse (mapcar #'term specs))))
      (if (rest terms)
          (cons :and terms)
          (first terms)))))
(defun setup-track (tr row)
  "Fill track TR from the property list ROW and return TR; return NIL
without touching TR when ROW is NIL.

Head values are read from ROW by head field and converted with
UTIL:TO-KEYWORD; trackid and timestamp come from :TRACKID and :TIMESTAMP;
the JSON text under :DATA is parsed and passed through *BUILD-TRACK-DATA*."
  ;(util:logd tr row)
  (unless row
    (return-from setup-track nil))
  (let ((head-values (loop for field in (shape:head-fields tr)
                           collect (util:to-keyword (getf row field)))))
    (setf (slot-value tr 'shape:head) head-values
          (trackid tr) (getf row :trackid)
          (timestamp tr) (getf row :timestamp)
          (shape:data tr) (funcall *build-track-data*
                                   (jzon:parse (getf row :data)
                                               :key-fn #'util:to-keyword))))
  tr)
(defun ensure-timestamp (track)
  "Give TRACK the current universal time as timestamp unless it already has
a timestamp."
  (unless (timestamp track)
    (setf (timestamp track) (get-universal-time))))
(defun plist (track)
  "Return the column values of TRACK as a property list: its head plist,
plus :TRACKID, :TIMESTAMP and :DATA for those that are non-nil.

The timestamp is converted with TIMESTAMP-TO-SQL; the data is serialized to
JSON text, after conversion to a hash table if it is not one already."
  (let ((columns (shape:head-plist track))
        (payload (shape:data track))
        (id (slot-value track 'trackid))
        (stamp (slot-value track 'timestamp)))
    (when id
      (setf (getf columns :trackid) id))
    (when stamp
      (setf (getf columns :timestamp) (timestamp-to-sql track stamp)))
    (when payload
      (setf (getf columns :data)
            (jzon:stringify (if (hash-table-p payload)
                                payload
                                (alx:plist-hash-table payload)))))
    columns))
;;;; create table and indexes
(defun create-table (cont)
  "Create the table of container CONT and then its indexes.

The column definitions are: trackid (primary key, typed by the storage
parameter :ID-TYPE), one not-null text column per head field, timestamp,
and data (typed by the storage parameter :JSON-TYPE). The index
specifications come from calling CONT's index factory on CONT."
  (let* ((st (storage cont))
         (tn (table-name cont))
         (table (storage:qualified-table-name st tn))
         (params (storage:params st))
         (id-type (getf params :id-type))
         (json-type (getf params :json-type))
         (head-columns (loop for field in (item-head-fields cont)
                             collect (list field :type 'text :not-null t :default '|''|)))
         (columns `((trackid :type ,id-type :primary-key t :not-null t)
                    ,@head-columns
                    (timestamp :type timestamptz :not-null t :default current_timestamp)
                    (data :type ,json-type :not-null t :default |'{}'|))))
    (storage:do-sql st
      (sxql:make-statement :create-table table columns))
    (create-indexes st table tn
                    (funcall (index-factory cont) cont))))
(defun create-indexes (st table tname ixs)
  "Create indexes on TABLE through storage ST.

For each column list in IXS an index named IDX_<tname>_<n> is created, with
n counting from 1, followed by an index IDX_<tname>_TS on the timestamp
column. TNAME is a symbol; its name is used in the index names."
  (let ((tn (symbol-name tname)))
    (loop for ix in ixs
          for i from 1
          do (storage:do-sql st
               (sxql:create-index (intern (format nil "IDX_~a_~d" tn i))
                                  :on (cons table ix))))
    (storage:do-sql st
      (sxql:create-index (intern (format nil "IDX_~a_TS" tn))
                         :on (cons table '(timestamp))))))