Compare commits
2 commits
09636d9960
...
f45d152e47
Author | SHA1 | Date | |
---|---|---|---|
f45d152e47 | |||
25cc9c6efe |
3 changed files with 15 additions and 202 deletions
|
@ -1,111 +0,0 @@
|
||||||
;;;; cl-scopes/core/actor - basic actor definitions
|
|
||||||
|
|
||||||
(defpackage :scopes/core/actor
|
|
||||||
(:use :common-lisp)
|
|
||||||
(:local-nicknames (:async :scopes/util/async)
|
|
||||||
(:util :scopes/util))
|
|
||||||
(:export #:bg-actor #:fg-actor #:make-actor #:start #:stop
|
|
||||||
#:become #:create #:send
|
|
||||||
#:message #:content #:customer
|
|
||||||
#:*logger* #:*root*
|
|
||||||
#:echo #:inc #:lgi
|
|
||||||
#:calculator #:plus #:minus #:show #:send-value))
|
|
||||||
|
|
||||||
(in-package :scopes/core/actor)
|
|
||||||
|
|
||||||
;;;; basic message and actor implementations
|
|
||||||
|
|
||||||
(defclass message ()
|
|
||||||
((content :reader content :initarg :content :initform nil)
|
|
||||||
(customer :reader customer :initarg :customer :initform nil)))
|
|
||||||
|
|
||||||
(defun message (content &optional customer)
|
|
||||||
(make-instance 'message :content content :customer customer))
|
|
||||||
|
|
||||||
(defclass actor ()
|
|
||||||
((behavior :accessor behavior :initarg :behavior :initform #'no-op)))
|
|
||||||
|
|
||||||
(defclass bg-actor (actor)
|
|
||||||
((task :accessor task :initform nil)))
|
|
||||||
|
|
||||||
(defmethod initialize-instance :after ((ac bg-actor) &key &allow-other-keys)
|
|
||||||
(setf (task ac) (make-task ac)))
|
|
||||||
|
|
||||||
(defclass fg-actor (bg-actor) ())
|
|
||||||
|
|
||||||
(defmethod initialize-instance :after ((ac fg-actor) &key &allow-other-keys)
|
|
||||||
(setf (task ac) (make-task ac 'async:fg-task)))
|
|
||||||
|
|
||||||
(defgeneric start (ac)
|
|
||||||
(:method ((ac actor)))
|
|
||||||
(:method ((ac bg-actor))
|
|
||||||
(async:start (task ac))))
|
|
||||||
|
|
||||||
(defgeneric stop (ac)
|
|
||||||
(:method ((ac actor)))
|
|
||||||
(:method ((ac bg-actor))
|
|
||||||
(async:stop (task ac))))
|
|
||||||
|
|
||||||
(defun make-actor (bhv &optional (cls 'bg-actor) &rest args &key &allow-other-keys)
|
|
||||||
(apply #'make-instance cls :behavior bhv args))
|
|
||||||
|
|
||||||
(defun make-task (ac &optional (cls 'async:task))
|
|
||||||
(async:make-task :cls cls
|
|
||||||
:handle-message
|
|
||||||
#'(lambda (ax msg) (funcall (behavior ac) ac msg))))
|
|
||||||
|
|
||||||
;;;; the core (classical, i.e. Hewitt) actor API
|
|
||||||
|
|
||||||
(defun become (ac bhv)
|
|
||||||
(setf (behavior ac) bhv))
|
|
||||||
|
|
||||||
(defun create (bhv &optional (cls 'actor) &rest args &key &allow-other-keys)
|
|
||||||
(let ((ac (apply #'make-actor bhv cls args)))
|
|
||||||
(start ac)
|
|
||||||
ac))
|
|
||||||
|
|
||||||
(defgeneric send (addr content &key &allow-other-keys)
|
|
||||||
(:method ((addr t) (content t) &key customer &allow-other-keys)
|
|
||||||
(let ((ac addr) (msg (message content customer)))
|
|
||||||
(send ac msg)))
|
|
||||||
(:method ((ac actor) (msg message) &key &allow-other-keys)
|
|
||||||
(funcall (behavior ac) ac msg))
|
|
||||||
(:method ((ac bg-actor) (msg message) &key &allow-other-keys)
|
|
||||||
(async:send (task ac) msg)))
|
|
||||||
|
|
||||||
;;;; predefined behaviors
|
|
||||||
|
|
||||||
(defun no-op (ac msg))
|
|
||||||
|
|
||||||
(defun lgi (ac msg)
|
|
||||||
(util:lgi (content msg)))
|
|
||||||
|
|
||||||
(defun echo (ac msg)
|
|
||||||
(send (customer msg) msg))
|
|
||||||
|
|
||||||
;;;; predefined global actors
|
|
||||||
|
|
||||||
(defvar *logger* (create #'lgi))
|
|
||||||
|
|
||||||
(defclass root (actor) ())
|
|
||||||
|
|
||||||
(defun root-bhv (ac msg)
|
|
||||||
(send *logger* msg))
|
|
||||||
|
|
||||||
(defvar *root* (create #'root-bhv 'root))
|
|
||||||
|
|
||||||
;;;; example behavior: calculator
|
|
||||||
|
|
||||||
(defun calculator (&optional (val 0))
|
|
||||||
#'(lambda (ac msg)
|
|
||||||
(destructuring-bind (fn &optional param) (content msg)
|
|
||||||
(funcall fn ac msg val param))))
|
|
||||||
|
|
||||||
(defun plus (ac msg val param)
|
|
||||||
(become ac (calculator (+ val param))))
|
|
||||||
(defun minus (ac msg val param)
|
|
||||||
(become ac (calculator (- val param))))
|
|
||||||
(defun show (ac msg val param)
|
|
||||||
(send (or (customer msg) *logger*) val))
|
|
||||||
(defun send-value (ac msg val param)
|
|
||||||
(send (customer msg) val))
|
|
|
@ -89,19 +89,11 @@
|
||||||
|
|
||||||
(deftest test-util-async ()
|
(deftest test-util-async ()
|
||||||
;(async:init)
|
;(async:init)
|
||||||
(let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01)))))
|
(let ((tsk (async:make-task (lambda (&rest args) (sleep 0.01)))))
|
||||||
(== (async:status tsk) :new)
|
(== (async:status tsk) :new)
|
||||||
(async:start tsk)
|
(async:start tsk)
|
||||||
(== (async:status tsk) :running)
|
(== (async:status tsk) :running)
|
||||||
(async:stop tsk)
|
(async:wait-receive tsk)
|
||||||
(== (async:status tsk) :done))
|
|
||||||
(let ((tsk (async:make-task :handle-message
|
|
||||||
#'(lambda (tsk msg) (push msg (async:data tsk))))))
|
|
||||||
(== (async:status tsk) :new)
|
|
||||||
(async:start tsk)
|
|
||||||
(== (async:status tsk) :running)
|
|
||||||
(async:send tsk :hello)
|
|
||||||
(== (async:stop tsk) '(:hello))
|
|
||||||
(== (async:status tsk) :done))
|
(== (async:status tsk) :done))
|
||||||
;(async:finish)
|
;(async:finish)
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,7 +6,7 @@
|
||||||
(:lp :lparallel)
|
(:lp :lparallel)
|
||||||
(:lpq :lparallel.queue))
|
(:lpq :lparallel.queue))
|
||||||
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:snd #:submit-task
|
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:snd #:submit-task
|
||||||
#:fg-task #:task #:make-task #:start #:stop #:status #:data #:send))
|
#:task #:make-task #:start #:status #:wait-receive))
|
||||||
|
|
||||||
(in-package :scopes/util/async)
|
(in-package :scopes/util/async)
|
||||||
|
|
||||||
|
@ -36,96 +36,28 @@
|
||||||
(defun submit-task (ch job)
|
(defun submit-task (ch job)
|
||||||
(lp:submit-task ch job))
|
(lp:submit-task ch job))
|
||||||
|
|
||||||
;;;; not used at the moment
|
;;;; task class and related functions
|
||||||
|
|
||||||
(defun receive-result
|
(defun receive-result (ch)
|
||||||
(lp:receive-result ch))
|
(lp:receive-result ch))
|
||||||
|
|
||||||
;;;; job - probably obsolete
|
(defclass task ()
|
||||||
|
((job :reader job :initarg :job)
|
||||||
(eval-when (:compile-toplevel :load-toplevel :execute)
|
|
||||||
(when (not (boundp '+quit-message+))
|
|
||||||
(defconstant +quit-message+ (gensym "QUIT"))))
|
|
||||||
|
|
||||||
(defun noop (&rest params))
|
|
||||||
|
|
||||||
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
|
||||||
(unwind-protect
|
|
||||||
(progn
|
|
||||||
(funcall startup tsk)
|
|
||||||
(when (mailbox tsk)
|
|
||||||
(do-listen tsk handle-message))
|
|
||||||
(data tsk))
|
|
||||||
(setf (status tsk) :done)
|
|
||||||
(funcall teardown tsk)))
|
|
||||||
|
|
||||||
(defun do-listen (tsk handle-message)
|
|
||||||
(handler-case
|
|
||||||
(loop for msg = (lpq:pop-queue (mailbox tsk))
|
|
||||||
until (eq msg +quit-message+)
|
|
||||||
do (funcall handle-message tsk msg))
|
|
||||||
(sb-sys:interactive-interrupt (condition)
|
|
||||||
(util:lgi condition))))
|
|
||||||
|
|
||||||
;;;; task classes and related functions / methods
|
|
||||||
|
|
||||||
(defclass fg-task ()
|
|
||||||
((job :accessor job)
|
|
||||||
(taskid :reader taskid :initform (gensym "TSK"))
|
(taskid :reader taskid :initform (gensym "TSK"))
|
||||||
(mailbox :accessor mailbox :initform nil)
|
|
||||||
(status :accessor status :initform :new)
|
(status :accessor status :initform :new)
|
||||||
(data :accessor data :initform nil)))
|
(channel :reader channel :initform (make-ch))))
|
||||||
|
|
||||||
(defclass task (fg-task)
|
(defun make-task (job &key (cls 'task))
|
||||||
((channel :reader channel :initform (lp:make-channel))))
|
(make-instance cls :job job))
|
||||||
|
|
||||||
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
|
|
||||||
(cls 'task))
|
|
||||||
(let ((tsk (make-instance cls)))
|
|
||||||
(setf (job tsk)
|
|
||||||
(lambda () (standard-job tsk :startup startup :teardown teardown
|
|
||||||
:handle-message handle-message)))
|
|
||||||
(if handle-message
|
|
||||||
(setf (mailbox tsk) (make-mb)))
|
|
||||||
tsk))
|
|
||||||
|
|
||||||
(defun start (tsk)
|
(defun start (tsk)
|
||||||
(when (eq (status tsk) :running)
|
(when (eq (status tsk) :running)
|
||||||
(util:lgw "task already running" (taskid tsk))
|
(util:lgw "task already running" (taskid tsk))
|
||||||
(return-from start))
|
(return-from start))
|
||||||
(setf (status tsk) :running)
|
(setf (status tsk) :running)
|
||||||
(submit tsk))
|
(submit-task (channel tsk) (job tsk)))
|
||||||
|
|
||||||
(defgeneric submit (tsk)
|
|
||||||
(:method ((tsk fg-task))
|
|
||||||
(funcall (job tsk)))
|
|
||||||
(:method ((tsk task))
|
|
||||||
(submit-task (channel tsk) (job tsk))))
|
|
||||||
|
|
||||||
(defun stop (tsk &key (wait t))
|
|
||||||
(when (mailbox tsk)
|
|
||||||
(send tsk +quit-message+))
|
|
||||||
(when wait
|
|
||||||
(wait-result tsk)))
|
|
||||||
|
|
||||||
(defgeneric wait-result (tsk)
|
|
||||||
(:method ((tsk fg-task)))
|
|
||||||
(:method ((tsk task))
|
|
||||||
(lp:receive-result (channel tsk))))
|
|
||||||
|
|
||||||
(defun send (tsk msg)
|
|
||||||
(if (mailbox tsk)
|
|
||||||
(lpq:push-queue msg (mailbox tsk))
|
|
||||||
(util:lgw "task has no mailbox" (taskid tsk))))
|
|
||||||
|
|
||||||
;;; alternate implementation - may be removed
|
|
||||||
(defun do-listen-hb (tsk handle-message)
|
|
||||||
(handler-bind
|
|
||||||
((sb-sys:interactive-interrupt
|
|
||||||
(lambda (condition)
|
|
||||||
(util:lgi condition)
|
|
||||||
(return-from do-listen-hb))))
|
|
||||||
(loop for msg = (lpq:pop-queue (mailbox tsk))
|
|
||||||
until (eq msg +quit-message+)
|
|
||||||
do (funcall handle-message tsk msg))))
|
|
||||||
|
|
||||||
|
(defun wait-receive (tsk)
|
||||||
|
(let ((data (receive-result (channel tsk))))
|
||||||
|
(setf (status tsk) :done)
|
||||||
|
data))
|
||||||
|
|
Loading…
Add table
Reference in a new issue