util/async: +start, with test
This commit is contained in:
parent
8267ed8b38
commit
fcdacb677f
3 changed files with 46 additions and 27 deletions
|
@ -8,11 +8,12 @@
|
||||||
:description "Core packages of the scopes project."
|
:description "Core packages of the scopes project."
|
||||||
:depends-on (:alexandria :cl-dotenv :com.inuoe.jzon
|
:depends-on (:alexandria :cl-dotenv :com.inuoe.jzon
|
||||||
:flexi-streams :ironclad :local-time :log4cl
|
:flexi-streams :ironclad :local-time :log4cl
|
||||||
:lparallel :qbase64 :str :trivial-signal)
|
:lparallel :qbase64 :serapeum :str :trivial-signal)
|
||||||
:components ((:file "config" :depends-on ("util/util"))
|
:components ((:file "config" :depends-on ("util/util"))
|
||||||
(:file "core/core"
|
(:file "core/core"
|
||||||
:depends-on ("core/message" "config"
|
:depends-on ("core/message" "config"
|
||||||
"forge/forge" "logging" "util/util"))
|
"forge/forge" "logging"
|
||||||
|
"util/async" "util/util"))
|
||||||
(:file "core/message" :depends-on ("shape/shape"))
|
(:file "core/message" :depends-on ("shape/shape"))
|
||||||
(:file "forge/forge" :depends-on ("util/iter" "util/util"))
|
(:file "forge/forge" :depends-on ("util/iter" "util/util"))
|
||||||
(:file "logging" :depends-on ("config" "util/util"))
|
(:file "logging" :depends-on ("config" "util/util"))
|
||||||
|
|
|
@ -86,7 +86,12 @@
|
||||||
|
|
||||||
(deftest test-util-async ()
|
(deftest test-util-async ()
|
||||||
(let ((tsk (async:make-task)))
|
(let ((tsk (async:make-task)))
|
||||||
(format t "~%~a~%" (async::taskid tsk))))
|
(== (async:status tsk) :new)
|
||||||
|
(async:start tsk)
|
||||||
|
(== (async:status tsk) :running)
|
||||||
|
(sleep 0.1)
|
||||||
|
(== (async:status tsk) :finished)
|
||||||
|
))
|
||||||
|
|
||||||
(deftest test-util-crypt ()
|
(deftest test-util-crypt ()
|
||||||
(util:lgi (crypt:create-secret))
|
(util:lgi (crypt:create-secret))
|
||||||
|
|
|
@ -5,38 +5,51 @@
|
||||||
(:local-nicknames (:util :scopes/util)
|
(:local-nicknames (:util :scopes/util)
|
||||||
(:lp :lparallel)
|
(:lp :lparallel)
|
||||||
(:lpq :lparallel.queue))
|
(:lpq :lparallel.queue))
|
||||||
(:export #:startup #:shutdown
|
(:export #:task #:make-task #:start #:restart #:stop #:kill #:status #:logdata
|
||||||
#:task #:make-task #:start #:restart #:stop #:kill #:status #:logdata
|
|
||||||
#:mailbox #:send #:receive))
|
#:mailbox #:send #:receive))
|
||||||
|
|
||||||
(in-package :scopes/util/async)
|
(in-package :scopes/util/async)
|
||||||
|
|
||||||
|
;;;; general definitions
|
||||||
|
|
||||||
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
||||||
|
(progn
|
||||||
(when (not (boundp '+quit-message+))
|
(when (not (boundp '+quit-message+))
|
||||||
(defconstant +quit-message+ (gensym "QUIT")))
|
(defconstant +quit-message+ (gensym "QUIT")))
|
||||||
|
|
||||||
(when (null lp:*kernel*)
|
(when (null lp:*kernel*)
|
||||||
(setf lp:*kernel* (lp:make-kernel 2)))
|
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))))
|
||||||
|
|
||||||
(defclass task ()
|
(defun noop (&rest params))
|
||||||
((job :accessor job :initform nil)
|
|
||||||
(taskid :reader taskid :initform (gensym "TSK"))
|
|
||||||
(channel :reader channel :initform (lp:make-channel))
|
|
||||||
(mailbox :reader mailbox :initarg :mailbox)
|
|
||||||
(status :accessor status :initform :new)
|
|
||||||
(logdata :accessor logdata :initform nil)))
|
|
||||||
|
|
||||||
(defun make-task ()
|
|
||||||
(make-instance 'task))
|
|
||||||
|
|
||||||
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
||||||
(let ((mb (mailbox tsk)))
|
(let ((mb (mailbox tsk)))
|
||||||
(startup tsk)
|
(funcall startup tsk)
|
||||||
(if mb
|
(if mb
|
||||||
(loop
|
(loop for msg = (lpq:pop-queue mb)
|
||||||
(let ((msg (lpq:pop-queue mb)))
|
until (eq msg +quit-message+)
|
||||||
(if (eq msg +quit-message+)
|
do (funcall handle-message tsk msg)))
|
||||||
(return-from nil)
|
(funcall teardown tsk)
|
||||||
(handle-message tsk msg)))))
|
(setf (status tsk) :finished)))
|
||||||
(teardown tsk)))
|
|
||||||
|
|
||||||
(defun noop (&rest params))
|
;;;; task class and related functions / methods
|
||||||
|
|
||||||
|
(defclass task ()
|
||||||
|
((job :accessor job)
|
||||||
|
(taskid :reader taskid :initform (gensym "TSK"))
|
||||||
|
(channel :reader channel :initform (lp:make-channel))
|
||||||
|
(mailbox :accessor mailbox :initform nil)
|
||||||
|
(status :accessor status :initform :new)
|
||||||
|
(logdata :accessor logdata :initform nil)))
|
||||||
|
|
||||||
|
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message)
|
||||||
|
(let ((tsk (make-instance 'task)))
|
||||||
|
(setf (job tsk)
|
||||||
|
(lambda () (standard-job tsk :startup startup :teardown teardown
|
||||||
|
:handle-message handle-message)))
|
||||||
|
(if handle-message
|
||||||
|
(setf (mailbox tsk) (lpq:make-queue)))
|
||||||
|
tsk))
|
||||||
|
|
||||||
|
(defun start (tsk)
|
||||||
|
(lp:submit-task (channel tsk) (job tsk))
|
||||||
|
(setf (status tsk) :running))
|
||||||
|
|
Loading…
Add table
Reference in a new issue