async: provide higher-level mailbox with task channel

This commit is contained in:
Helmut Merz 2025-06-12 10:49:43 +02:00
parent 5ff08dcde6
commit 41ed781a44
4 changed files with 28 additions and 52 deletions

View file

@ -45,9 +45,7 @@
(defun start (mb bhv &key foreground)
(if foreground
(ac-loop mb bhv)
(let ((ch (async:make-ch)))
(async:submit-task ch (lambda () (ac-loop mb bhv)))
ch)))
(async:submit-task mb (lambda () (ac-loop mb bhv)))))
(defun stop (mb)
(send mb +quit-message+))

View file

@ -10,8 +10,7 @@
(:shape :scopes/shape)
(:util :scopes/util)
(:alx :alexandria))
(:export #:*dispatcher* #:setup #:send
#:printer
(:export #:*dispatcher* #:printer #:setup #:send
))
(in-package :scopes/csys)
@ -33,8 +32,8 @@
(util:lgw "no action selected" msg))))
(defun run-action (job msg)
(let ((ch (async:make-ch)))
(async:submit-task ch (lambda () (funcall job msg)))))
(let ((mb (async:make-mb)))
(async:submit-task mb (lambda () (funcall job msg)))))
;;;; example behaviors / actions

View file

@ -88,15 +88,9 @@
(== pl '(:b 1 :a 0))))
(deftest test-util-async ()
;(async:init)
(let ((tsk (async:make-task (lambda (&rest args) (sleep 0.01)))))
(== (async:status tsk) :new)
(async:start tsk)
(== (async:status tsk) :running)
(async:wait-receive tsk)
(== (async:status tsk) :done))
;(async:finish)
)
(let ((mb (async:make-mb)))
(async:submit-task mb (lambda () (sleep 0.1) :done))
))
(deftest test-util-crypt ()
(let ((s1 (crypt:create-secret))

View file

@ -2,12 +2,10 @@
(defpackage :scopes/util/async
(:use :common-lisp)
(:local-nicknames (:util :scopes/util)
(:lp :lparallel)
(:local-nicknames (:lp :lparallel)
(:lpq :lparallel.queue))
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:snd
#:submit-task #:receive-result
#:task #:make-task #:start #:status #:wait-receive))
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:try-rcv #:snd
#:submit-task #:receive-result #:try-receive-result))
(in-package :scopes/util/async)
@ -22,43 +20,30 @@
(when lp:*kernel*
(lp:end-kernel)))
(defun make-ch ()
(lp:make-channel))
;;;; higher-level mailbox / task / channel combination
(defclass mailbox ()
((queue :reader queue :initform (lpq:make-queue))
(channel :reader channel :initform (lp:make-channel))))
(defun make-mb ()
(lpq:make-queue))
(make-instance 'mailbox))
(defun rcv (mb)
(lpq:pop-queue mb))
(lpq:pop-queue (queue mb)))
(defun try-rcv (mb)
(lpq:peek-queue (queue mb)))
(defun snd (mb msg)
(lpq:push-queue msg mb))
(lpq:push-queue msg (queue mb)))
(defun submit-task (ch job)
(lp:submit-task ch job))
(defun submit-task (mb job)
(lp:submit-task (channel mb) job))
(defun receive-result (ch)
(lp:receive-result ch))
(defun receive-result (mb)
(lp:receive-result (channel mb)))
;;;; task class and related functions
(defclass task ()
((job :reader job :initarg :job)
(taskid :reader taskid :initform (gensym "TSK"))
(status :accessor status :initform :new)
(channel :reader channel :initform (make-ch))))
(defun make-task (job &key (cls 'task))
(make-instance cls :job job))
(defun start (tsk)
(when (eq (status tsk) :running)
(util:lgw "task already running" (taskid tsk))
(return-from start))
(setf (status tsk) :running)
(submit-task (channel tsk) (job tsk)))
(defun wait-receive (tsk)
(let ((data (receive-result (channel tsk))))
(setf (status tsk) :done)
data))
(defun try-receive-result (mb)
(lp:try-receive-result (channel mb)))