From 8de0ee8927f8fb393e7117f4b51f068d0396d80d Mon Sep 17 00:00:00 2001 From: Helmut Merz Date: Mon, 16 Jun 2025 09:59:27 +0200 Subject: [PATCH] work in progress: implement actor methods for restartable task --- core/actor.lisp | 63 ++++++++++++++++++++++++++++++++----------------- util/async.lisp | 8 ++++++- 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/core/actor.lisp b/core/actor.lisp index d607bfa..b86d40a 100644 --- a/core/actor.lisp +++ b/core/actor.lisp @@ -5,7 +5,7 @@ (:local-nicknames (:async :scopes/util/async) (:shape :scopes/shape) (:util :scopes/util)) - (:export #:start #:stop #:create #:send + (:export #:start #:stop #:create #:send #:become #:message #:content #:customer #:set-content #:*logger* #:*root* #:echo #:inc #:lgi @@ -39,10 +39,10 @@ (when (not (boundp '+quit-message+)) (defconstant +quit-message+ (gensym "QUIT")))) -(defun start (mb bhv &key foreground) +(defun start (tsk bhv &key foreground) (if foreground - (ac-loop mb bhv) - (async:submit-task mb #'ac-loop mb bhv))) + (ac-loop tsk bhv) + (async:submit-task tsk #'ac-loop tsk bhv))) (defun stop (mb) (send mb +quit-message+)) @@ -53,33 +53,52 @@ (unless (eq (content msg) +quit-message+) (ac-loop tsk (or (funcall bhv msg) bhv)))))) -(defun ac-vloop (mb bhv) - (multiple-value-bind (msg ok) (async:try-rcv mb) - (if ok - (if (eq (content msg) +quit-message+) - nil - (ac-vloop mb (or (funcall bhv msg) bhv))) - bhv))) - ;;;; the core (classical, i.e. Hewitt) actor API -;;; there is no `become` operation: the behavior just returns the new behavior (defun create (bhv) (let ((tsk (async:make-task bhv))) (start tsk bhv) tsk)) -(defun send (mb msg) - ;(util:lgi msg) - (async:snd mb msg)) +(defgeneric send (tsk msg) + (:method ((tsk async:mailbox) msg) + ;(util:lgi msg) + (async:snd tsk msg))) -(defun vsend (mb msg) - (async:snd mb msg) - (multiple-value-bind (bhv done) (async:try-receive-result mb) - (if (and done bhv) - (async:submit-task mb (lambda () (ac-vloop mb bhv)))))) +(defgeneric become (tsk bhv) + (:method ((tsk async:mailbox) bhv) bhv)) -;;;; predefined behaviors +;;;; handling restartable tasks + +(defmethod ac-loop ((tsk async:restartable-task) bhv) + (async:get-status tsk) ; wait for end of concurrent activities + (multiple-value-bind (msg ok) (async:try-rcv tsk) + (if ok + (if (eq (content msg) +quit-message+) + (progn (async:set-status tsk :stopped) nil) + (progn + (async:set-status tsk :running) + (ac-vloop tsk (or (funcall bhv msg) bhv)))) + (progn (async:set-status tsk :suspended) bhv)))) + +(defmethod send ((tsk async:restartable-task) msg) + (let ((status (async:get-status tsk))) + (when (eq status :stopped) + (util:lgw "trying to send message to stopped task") + (async:set-status tsk :stopped) + (return-from send)) + (async:snd tsk msg) + (when (or (eq status :new) (eq status :suspended)) + (async:try-receive-result tsk) + (async:submit-task + tsk (lambda () (ac-loop tsk (async:behavior tsk))))) + (async:set-status :running))) + +(defmethod become ((tsk async:task) bhv) + (setf (async:behavior tsk) bhv) + bhv) + + ;;;; predefined behaviors (defun no-op (msg)) diff --git a/util/async.lisp b/util/async.lisp index 183722d..50ff129 100644 --- a/util/async.lisp +++ b/util/async.lisp @@ -5,7 +5,7 @@ (:local-nicknames (:lp :lparallel) (:lpq :lparallel.queue)) (:export #:init #:finish #:make-ch #:make-mb #:make-task #:rcv #:try-rcv #:snd - #:mailbox #:task #:restartable-task #:behavior #:status + #:mailbox #:task #:restartable-task #:behavior #:get-status #:set-status #:submit-task #:receive-result #:try-receive-result)) (in-package :scopes/util/async) @@ -60,3 +60,9 @@ (defun make-task (bhv &key restartable (cls 'task)) (if restartable (setf cls 'restartable-task)) (make-instance cls :behavior bhv)) + +(defun get-status (tsk) + (lpq:pop-queue (status tsk))) + +(defun set-status (tsk st) + (lpq:push-queue st (status tsk)))