work in progress: actor based on restartable-task with thread-safe status flag

This commit is contained in:
Helmut Merz 2025-06-15 16:13:36 +02:00
parent 0d73c7d39e
commit 2042748fc8
2 changed files with 20 additions and 6 deletions

View file

@ -39,10 +39,10 @@
(when (not (boundp '+quit-message+)) (when (not (boundp '+quit-message+))
(defconstant +quit-message+ (gensym "QUIT")))) (defconstant +quit-message+ (gensym "QUIT"))))
(defun start (mb bhv &key foreground (listener #'ac-loop)) (defun start (mb bhv &key foreground)
(if foreground (if foreground
(ac-loop mb bhv) (ac-loop mb bhv)
(async:submit-task mb listener mb bhv))) (async:submit-task mb #'ac-loop mb bhv)))
(defun stop (mb) (defun stop (mb)
(send mb +quit-message+)) (send mb +quit-message+))
@ -66,14 +66,15 @@
(defun create (bhv) (defun create (bhv)
(let ((mb (async:make-mb))) (let ((mb (async:make-mb)))
(start mb bhv) (start mb bhv)
(start mb bhv :listener #'ac-vloop)
mb)) mb))
(defun send (mb msg) (defun send (mb msg)
;(util:lgi msg) ;(util:lgi msg)
(async:snd mb msg))
(defun vsend (mb msg)
(async:snd mb msg) (async:snd mb msg)
(multiple-value-bind (bhv done) (async:try-receive-result mb) (multiple-value-bind (bhv done) (async:try-receive-result mb)
(util:lgi done)
(if (and done bhv) (if (and done bhv)
(async:submit-task mb (lambda () (ac-vloop mb bhv)))))) (async:submit-task mb (lambda () (ac-vloop mb bhv))))))

View file

@ -4,7 +4,8 @@
(:use :common-lisp) (:use :common-lisp)
(:local-nicknames (:lp :lparallel) (:local-nicknames (:lp :lparallel)
(:lpq :lparallel.queue)) (:lpq :lparallel.queue))
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:try-rcv #:snd (:export #:init #:finish #:make-ch #:make-mb #:make-task #:rcv #:try-rcv #:snd
#:mailbox #:task #:restartable-task #:behavior #:status
#:submit-task #:receive-result #:try-receive-result)) #:submit-task #:receive-result #:try-receive-result))
(in-package :scopes/util/async) (in-package :scopes/util/async)
@ -20,7 +21,7 @@
(when lp:*kernel* (when lp:*kernel*
(lp:end-kernel))) (lp:end-kernel)))
;;;; higher-level mailbox / task / channel combination ;;;; simple task = mailbox with result channel
(defclass mailbox () (defclass mailbox ()
((queue :reader queue :initform (lpq:make-queue)) ((queue :reader queue :initform (lpq:make-queue))
@ -47,3 +48,15 @@
(defun try-receive-result (mb) (defun try-receive-result (mb)
(lp:try-receive-result (channel mb))) (lp:try-receive-result (channel mb)))
;;;; tasks - with behavior and thread-safe status
(defclass task (mailbox)
((behavior :accessor behavior :initarg :behavior)))
(defclass restartable-task (task)
((status :reader status
:initform (lp:make-channel :fixed-capacity 1 :initial-contents '(:new)))))
(defun make-task (bhv &key restartable (cls 'task))
(if restartable (setf cls 'restartable-task))
(make-instance cls :behavior bhv))