diff --git a/scopes-core.asd b/scopes-core.asd index 39a0e65..eb66653 100644 --- a/scopes-core.asd +++ b/scopes-core.asd @@ -8,11 +8,12 @@ :description "Core packages of the scopes project." :depends-on (:alexandria :cl-dotenv :com.inuoe.jzon :flexi-streams :ironclad :local-time :log4cl - :lparallel :qbase64 :str :trivial-signal) + :lparallel :qbase64 :serapeum :str :trivial-signal) :components ((:file "config" :depends-on ("util/util")) (:file "core/core" :depends-on ("core/message" "config" - "forge/forge" "logging" "util/util")) + "forge/forge" "logging" + "util/async" "util/util")) (:file "core/message" :depends-on ("shape/shape")) (:file "forge/forge" :depends-on ("util/iter" "util/util")) (:file "logging" :depends-on ("config" "util/util")) diff --git a/test/test-core.lisp b/test/test-core.lisp index c94172e..55f1929 100644 --- a/test/test-core.lisp +++ b/test/test-core.lisp @@ -86,7 +86,12 @@ (deftest test-util-async () (let ((tsk (async:make-task))) - (format t "~%~a~%" (async::taskid tsk)))) + (== (async:status tsk) :new) + (async:start tsk) + (== (async:status tsk) :running) + (sleep 0.1) + (== (async:status tsk) :finished) + )) (deftest test-util-crypt () (util:lgi (crypt:create-secret)) diff --git a/util/async.lisp b/util/async.lisp index ca617b1..e89f33e 100644 --- a/util/async.lisp +++ b/util/async.lisp @@ -5,38 +5,51 @@ (:local-nicknames (:util :scopes/util) (:lp :lparallel) (:lpq :lparallel.queue)) - (:export #:startup #:shutdown - #:task #:make-task #:start #:restart #:stop #:kill #:status #:logdata + (:export #:task #:make-task #:start #:restart #:stop #:kill #:status #:logdata #:mailbox #:send #:receive)) (in-package :scopes/util/async) -(when (not (boundp '+quit-message+)) - (defconstant +quit-message+ (gensym "QUIT"))) +;;;; general definitions -(when (null lp:*kernel*) - (setf lp:*kernel* (lp:make-kernel 2))) +(eval-when (:compile-toplevel :load-toplevel :execute) + (progn + (when (not (boundp '+quit-message+)) + (defconstant +quit-message+ (gensym "QUIT"))) + (when (null lp:*kernel*) + (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))))) -(defclass task () - ((job :accessor job :initform nil) - (taskid :reader taskid :initform (gensym "TSK")) - (channel :reader channel :initform (lp:make-channel)) - (mailbox :reader mailbox :initarg :mailbox) - (status :accessor status :initform :new) - (logdata :accessor logdata :initform nil))) - -(defun make-task () - (make-instance 'task)) +(defun noop (&rest params)) (defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message) (let ((mb (mailbox tsk))) - (startup tsk) + (funcall startup tsk) (if mb - (loop - (let ((msg (lpq:pop-queue mb))) - (if (eq msg +quit-message+) - (return-from nil) - (handle-message tsk msg))))) - (teardown tsk))) + (loop for msg = (lpq:pop-queue mb) + until (eq msg +quit-message+) + do (funcall handle-message tsk msg))) + (funcall teardown tsk) + (setf (status tsk) :finished))) -(defun noop (&rest params)) +;;;; task class and related functions / methods + +(defclass task () + ((job :accessor job) + (taskid :reader taskid :initform (gensym "TSK")) + (channel :reader channel :initform (lp:make-channel)) + (mailbox :accessor mailbox :initform nil) + (status :accessor status :initform :new) + (logdata :accessor logdata :initform nil))) + +(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message) + (let ((tsk (make-instance 'task))) + (setf (job tsk) + (lambda () (standard-job tsk :startup startup :teardown teardown + :handle-message handle-message))) + (if handle-message + (setf (mailbox tsk) (lpq:make-queue))) + tsk)) + +(defun start (tsk) + (lp:submit-task (channel tsk) (job tsk)) + (setf (status tsk) :running))