work in progress: implement actor methods for restartable task
This commit is contained in:
parent
eef43f090e
commit
8de0ee8927
2 changed files with 48 additions and 23 deletions
|
@ -5,7 +5,7 @@
|
|||
(:local-nicknames (:async :scopes/util/async)
|
||||
(:shape :scopes/shape)
|
||||
(:util :scopes/util))
|
||||
(:export #:start #:stop #:create #:send
|
||||
(:export #:start #:stop #:create #:send #:become
|
||||
#:message #:content #:customer #:set-content
|
||||
#:*logger* #:*root*
|
||||
#:echo #:inc #:lgi
|
||||
|
@ -39,10 +39,10 @@
|
|||
(when (not (boundp '+quit-message+))
|
||||
(defconstant +quit-message+ (gensym "QUIT"))))
|
||||
|
||||
(defun start (mb bhv &key foreground)
|
||||
(defun start (tsk bhv &key foreground)
|
||||
(if foreground
|
||||
(ac-loop mb bhv)
|
||||
(async:submit-task mb #'ac-loop mb bhv)))
|
||||
(ac-loop tsk bhv)
|
||||
(async:submit-task tsk #'ac-loop tsk bhv)))
|
||||
|
||||
(defun stop (mb)
|
||||
(send mb +quit-message+))
|
||||
|
@ -53,33 +53,52 @@
|
|||
(unless (eq (content msg) +quit-message+)
|
||||
(ac-loop tsk (or (funcall bhv msg) bhv))))))
|
||||
|
||||
(defun ac-vloop (mb bhv)
|
||||
(multiple-value-bind (msg ok) (async:try-rcv mb)
|
||||
(if ok
|
||||
(if (eq (content msg) +quit-message+)
|
||||
nil
|
||||
(ac-vloop mb (or (funcall bhv msg) bhv)))
|
||||
bhv)))
|
||||
|
||||
;;;; the core (classical, i.e. Hewitt) actor API
|
||||
;;; there is no `become` operation: the behavior just returns the new behavior
|
||||
|
||||
(defun create (bhv)
|
||||
(let ((tsk (async:make-task bhv)))
|
||||
(start tsk bhv)
|
||||
tsk))
|
||||
|
||||
(defun send (mb msg)
|
||||
;(util:lgi msg)
|
||||
(async:snd mb msg))
|
||||
(defgeneric send (tsk msg)
|
||||
(:method ((tsk async:mailbox) msg)
|
||||
;(util:lgi msg)
|
||||
(async:snd tsk msg)))
|
||||
|
||||
(defun vsend (mb msg)
|
||||
(async:snd mb msg)
|
||||
(multiple-value-bind (bhv done) (async:try-receive-result mb)
|
||||
(if (and done bhv)
|
||||
(async:submit-task mb (lambda () (ac-vloop mb bhv))))))
|
||||
(defgeneric become (tsk bhv)
|
||||
(:method ((tsk async:mailbox) bhv) bhv))
|
||||
|
||||
;;;; predefined behaviors
|
||||
;;;; handling restartable tasks
|
||||
|
||||
(defmethod ac-loop ((tsk async:restartable-task) bhv)
|
||||
(async:get-status tsk) ; wait for end of concurrent activities
|
||||
(multiple-value-bind (msg ok) (async:try-rcv tsk)
|
||||
(if ok
|
||||
(if (eq (content msg) +quit-message+)
|
||||
(progn (async:set-status tsk :stopped) nil)
|
||||
(progn
|
||||
(async:set-status tsk :running)
|
||||
(ac-vloop tsk (or (funcall bhv msg) bhv))))
|
||||
(progn (async:set-status tsk :suspended) bhv))))
|
||||
|
||||
(defmethod send ((tsk async:restartable-task) msg)
|
||||
(let ((status (async:get-status tsk)))
|
||||
(when (eq status :stopped)
|
||||
(util:lgw "trying to send message to stopped task")
|
||||
(async:set-status tsk :stopped)
|
||||
(return-from send))
|
||||
(async:snd tsk msg)
|
||||
(when (or (eq status :new) (eq status :suspended))
|
||||
(async:try-receive-result tsk)
|
||||
(async:submit-task
|
||||
tsk (lambda () (ac-loop tsk (async:behavior tsk)))))
|
||||
(async:set-status :running)))
|
||||
|
||||
(defmethod become ((tsk async:task) bhv)
|
||||
(setf (async:behavior tsk) bhv)
|
||||
bhv)
|
||||
|
||||
;;;; predefined behaviors
|
||||
|
||||
(defun no-op (msg))
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
(:local-nicknames (:lp :lparallel)
|
||||
(:lpq :lparallel.queue))
|
||||
(:export #:init #:finish #:make-ch #:make-mb #:make-task #:rcv #:try-rcv #:snd
|
||||
#:mailbox #:task #:restartable-task #:behavior #:status
|
||||
#:mailbox #:task #:restartable-task #:behavior #:get-status #:set-status
|
||||
#:submit-task #:receive-result #:try-receive-result))
|
||||
|
||||
(in-package :scopes/util/async)
|
||||
|
@ -60,3 +60,9 @@
|
|||
(defun make-task (bhv &key restartable (cls 'task))
|
||||
(if restartable (setf cls 'restartable-task))
|
||||
(make-instance cls :behavior bhv))
|
||||
|
||||
(defun get-status (tsk)
|
||||
(lpq:pop-queue (status tsk)))
|
||||
|
||||
(defun set-status (tsk st)
|
||||
(lpq:push-queue st (status tsk)))
|
||||
|
|
Loading…
Add table
Reference in a new issue