;;; cl-scopes/core/actor.lisp

;;; 120 lines
;;; 3.3 KiB
;;; Common Lisp

;;;; cl-scopes/core/actor - basic actor definitions
;; Package for the basic actor definitions. Project packages are reached
;; through the local nicknames async, shape and util.
;; NOTE(review): the shape nickname is not used in the part of the file
;; visible here - confirm it is still needed.
(defpackage :scopes/core/actor
(:use :common-lisp)
(:local-nicknames (:async :scopes/util/async)
(:shape :scopes/shape)
(:util :scopes/util))
;; actor lifecycle and core actor API
(:export #:start #:stop #:create #:send #:become
;; message class, constructor and accessors
#:message #:content #:customer #:set-content
;; NOTE(review): no definition of *logger* or *root* is visible in this
;; part of the file - confirm they are defined further down.
#:*logger* #:*root*
;; predefined behaviors
;; NOTE(review): inc is exported but not defined in the visible part - confirm.
#:echo #:inc #:lgi
;; calculator example behavior and its operations
#:calculator #:plus #:minus #:show))
(in-package :scopes/core/actor)
;;;; basic message implementation
(defgeneric content (msg)
  (:documentation
   "Return the content (payload) of MSG.
The default method treats any object as its own content, so plain values
can be used as messages.")
  (:method ((msg t))
    msg))
(defgeneric customer (msg)
  (:documentation
   "Return the customer of MSG, i.e. the receiver a reply should be sent to.
The default method returns NIL: an arbitrary object carries no customer.")
  (:method ((msg t))
    nil))
(defclass message ()
  ((content
    :initarg :content
    :initform nil
    :reader content)
   (customer
    :initarg :customer
    :initform nil
    :reader customer))
  (:documentation
   "A message consisting of a CONTENT value and an optional CUSTOMER.
Both slots default to NIL and are read-only (reader functions only)."))
(defun message (content &optional customer)
  "Create a MESSAGE instance holding CONTENT and, optionally, CUSTOMER.
CUSTOMER defaults to NIL."
  (make-instance 'message
                 :customer customer
                 :content content))
(defgeneric set-content (msg fn)
  (:documentation
   "Return a message whose content is the result of applying FN to the
content of MSG. Nothing is mutated: for a MESSAGE instance a new MESSAGE
with the same customer is returned; for any other object the result of
calling FN on the object itself is returned.")
  (:method ((msg t) fn)
    (funcall fn msg))
  (:method ((msg message) fn)
    (let ((new-content (funcall fn (content msg))))
      (message new-content (customer msg)))))
;;;; actor loop (listener)
;; The quit marker is a fresh uninterned symbol. It is only defined when not
;; yet bound, because evaluating the GENSYM form again would produce a
;; different value for an already existing constant.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (unless (boundp '+quit-message+)
    (defconstant +quit-message+ (gensym "QUIT"))))
(defun start (tsk bhv &key foreground)
  "Start the actor loop for TSK with initial behavior BHV.
When FOREGROUND is true the loop runs directly in the calling context via
AC-LOOP; otherwise AC-LOOP is handed to ASYNC:SUBMIT-TASK together with
TSK and BHV."
  (cond (foreground
         (ac-loop tsk bhv))
        (t
         (async:submit-task tsk #'ac-loop tsk bhv))))
(defun stop (mb)
  "Send the quit marker +QUIT-MESSAGE+ to MB.
Returns whatever SEND returns."
  (let ((quit-marker +quit-message+))
    (send mb quit-marker)))
(defvar *self* nil
  "The task whose behavior is currently being executed.
Bound dynamically by the AC-LOOP methods around each behavior call and read
by BECOME; NIL outside of a behavior call.")
(defgeneric ac-loop (tsk bhv)
  (:documentation
   "Actor loop: receive messages from TSK and handle each one with the
behavior function BHV.")
  (:method ((tsk async:mailbox) bhv)
    ;; Receive one message; stop (returning NIL) when its content is the
    ;; quit marker, otherwise handle it and continue with the next message.
    (let ((msg (async:rcv tsk)))
      (unless (eq (content msg) +quit-message+)
        ;; *self* is bound only while the behavior itself runs.
        (let ((next (let ((*self* tsk))
                      (funcall bhv msg))))
          ;; A non-NIL return value of the behavior replaces it for the
          ;; following messages; NIL keeps the current behavior.
          (ac-loop tsk (or next bhv)))))))
;;;; the core (classical, i.e. Hewitt) actor API
(defun create (bhv)
  "Create a new actor with initial behavior BHV.
The actor is whatever ASYNC:MAKE-TASK returns when called with
:RESTARTABLE T."
  (async:make-task bhv
                   :restartable t))
(defgeneric send (tsk msg)
  (:documentation
   "Deliver MSG to the actor represented by TSK.")
  (:method ((tsk async:mailbox) msg)
    ;; A plain mailbox just gets the message via ASYNC:SND.
    (async:snd tsk msg)))
(defun become (bhv)
  "Install BHV as the behavior of the currently executing actor.
The actor is taken from *SELF*, so this is meant to be called from within
a behavior while an actor loop has *SELF* bound."
  (setf (async:behavior *self*)
        bhv))
;;;; handling restartable tasks
;; AC-LOOP for restartable tasks: handle one message with BHV and recurse
;; with the task's current behavior; when no message could be obtained,
;; set the task's status to :suspended and return instead of waiting.
;; NOTE(review): unlike the mailbox method, this method does not compare the
;; message content against +quit-message+ - confirm that STOP is not meant
;; to be used with restartable tasks.
(defmethod ac-loop ((tsk async:restartable-task) bhv)
(async:get-status tsk) ; wait / lock status
;; OK is the second value of try-rcv; presumably true when a message was
;; actually received - confirm in scopes/util/async.
(multiple-value-bind (msg ok) (async:try-rcv tsk)
(if ok
(progn
(async:set-status tsk :running)
;; *self* lets BECOME find this task while the behavior runs.
(let ((*self* tsk))
;; An error signalled by the behavior is passed to util:lg together with
;; the message; it does not propagate, so the loop continues.
(handler-case (funcall bhv msg)
(error (error) (util:lg :error "handling message" msg error)))
;; Continue with the behavior stored in the task, which BECOME may have
;; replaced during the call above.
;; NOTE(review): this recursive call is inside the *self* binding, unlike
;; in the mailbox method - confirm this is intended.
(ac-loop tsk (async:behavior tsk))))
;; No message obtained: record the task as suspended.
(async:set-status tsk :suspended))))
;; SEND for restartable tasks: enqueue MSG and, unless the status read
;; beforehand was :running, submit a new run of the actor loop for the task.
;; Returns the value of the final set-status call.
(defmethod send ((tsk async:restartable-task) msg)
;; Status as read before the message is enqueued.
;; NOTE(review): the restartable AC-LOOP method annotates get-status with
;; "wait / lock status", so this call presumably also takes the status lock
;; that the set-status call below releases - confirm in scopes/util/async.
(let ((status (async:get-status tsk)))
(async:snd tsk msg)
(unless (eq status :running)
;; NOTE(review): presumably collects the result of a previous, finished
;; run before the task is submitted again - confirm.
(async:try-receive-result tsk)
;; The new run starts with the behavior currently stored in the task.
(async:submit-task
tsk (lambda () (ac-loop tsk (async:behavior tsk)))))
;; The status is set to :running in both cases.
(async:set-status tsk :running)))
;;;; predefined behaviors
(defun no-op (msg)
  "Behavior that accepts any MSG and does nothing. Always returns NIL."
  ;; MSG is deliberately unused; declaring it ignored avoids the compiler's
  ;; unused-variable style warning.
  (declare (ignore msg))
  nil)
;; Behavior that passes the content of MSG to util:lgi.
;; NOTE(review): presumably info-level logging - confirm in scopes/util.
(defun lgi (msg)
(util:lgi (content msg)))
(defun echo (msg)
  "Behavior that sends MSG back, unchanged, to the customer of MSG."
  (let ((recipient (customer msg)))
    (send recipient msg)))
;;;; example behavior: calculator
(defun calculator (&optional (val 0))
  "Return a calculator behavior whose current value is VAL (default 0).
The behavior expects messages whose content is a list (FN &optional PARAM)
and calls FN with the message, VAL and PARAM. When FN returns a new
behavior (as PLUS and MINUS do) the actor becomes that behavior and the
behavior is returned; any other result (e.g. from SHOW, which returns the
result of SEND) leaves the current behavior in place and NIL is returned."
  (lambda (msg)
    ;(format t "calc ~a ~a~%" val (content msg))
    (destructuring-bind (fn &optional param) (content msg)
      (let ((next (funcall fn msg val param)))
        ;; Only a function designator may replace the behavior. Calling
        ;; BECOME with an arbitrary operation result would leave the actor
        ;; with a behavior that cannot be funcalled on the next message.
        (when (or (functionp next)
                  (and next (symbolp next) (fboundp next)))
          (become next))))))
(defun plus (msg val param)
  "Calculator operation: return a calculator behavior holding VAL + PARAM.
MSG is part of the uniform operation signature (MSG VAL PARAM) but is not
used here."
  ;; Declared ignored to avoid the unused-variable style warning.
  (declare (ignore msg))
  (calculator (+ val param)))
(defun minus (msg val param)
  "Calculator operation: return a calculator behavior holding VAL - PARAM.
MSG is part of the uniform operation signature (MSG VAL PARAM) but is not
used here."
  ;; Declared ignored to avoid the unused-variable style warning.
  (declare (ignore msg))
  (calculator (- val param)))
(defun show (msg val param)
  "Calculator operation: send the current value VAL, wrapped in a new
message, to the customer of MSG. Returns whatever SEND returns.
PARAM is part of the uniform operation signature (MSG VAL PARAM) but is not
used here."
  ;; Declared ignored to avoid the unused-variable style warning.
  (declare (ignore param))
  (send (customer msg) (message val)))