;;;; cl-scopes/core/actor - basic actor definitions (defpackage :scopes/core/actor (:use :common-lisp) (:local-nicknames (:async :scopes/util/async) (:shape :scopes/shape) (:util :scopes/util)) (:export #:start #:stop #:create #:send #:become #:message #:content #:customer #:set-content #:*logger* #:*root* #:echo #:inc #:lgi #:calculator #:plus #:minus #:show)) (in-package :scopes/core/actor) ;;;; basic message implementation (defgeneric content (msg) (:method (msg) msg)) (defgeneric customer (msg) (:method (msg) nil)) (defclass message () ((content :reader content :initarg :content :initform nil) (customer :reader customer :initarg :customer :initform nil))) (defun message (content &optional customer) (make-instance 'message :content content :customer customer)) (defgeneric set-content (msg fn) (:method (msg fn) (funcall fn msg)) (:method ((msg message) fn) (message (funcall fn (content msg)) (customer msg)))) ;;;; actor loop (listener) (eval-when (:compile-toplevel :load-toplevel :execute) (when (not (boundp '+quit-message+)) (defconstant +quit-message+ (gensym "QUIT")))) (defun start (tsk bhv &key foreground) (setf (async:behavior tsk) bhv) (if foreground (ac-loop tsk bhv) (async:submit-task tsk #'ac-loop tsk bhv))) (defun stop (mb) (send mb +quit-message+)) (defvar *self* nil) (defgeneric ac-loop (tsk bhv) (:method ((tsk async:task) bhv) (let ((msg (async:rcv tsk))) (unless (eq (content msg) +quit-message+) (ac-step tsk bhv msg) (ac-loop tsk (async:behavior tsk)))))) (defun ac-step (tsk bhv msg) (let ((*self* tsk)) (handler-case (funcall bhv msg) (error (err) ;(util:lg :error "behavior" msg err) (invoke-debugger err)) ))) ;;;; the core (classical, i.e. Hewitt) actor API (defun create (bhv) (async:make-task bhv :restartable t)) (defgeneric send (tsk msg) (:method ((tsk async:task) msg) (async:snd tsk msg))) (defun become (bhv) (setf (async:behavior *self*) bhv)) ;;;; handling restartable tasks (defmethod ac-loop ((tsk async:restartable-task) bhv) (async:get-status tsk) ; wait / lock status (multiple-value-bind (msg ok) (async:try-rcv tsk) (if ok (progn (async:set-status tsk :running) (ac-step tsk bhv msg) (ac-loop tsk (async:behavior tsk))) (async:set-status tsk :suspended)))) (defmethod send ((tsk async:restartable-task) msg) (let ((status (async:get-status tsk))) (async:snd tsk msg) (unless (eq status :running) (async:try-receive-result tsk) (async:submit-task tsk (lambda () (ac-loop tsk (async:behavior tsk))))) (async:set-status tsk :running))) ;;;; predefined behaviors (defun no-op (msg)) (defun lgi (msg) (util:lgi (content msg))) (defun echo (msg) (send (customer msg) msg)) ;;;; example behavior: calculator (defun calculator (&optional (val 0)) (lambda (msg) ;(format t "calc ~a ~a~%" val (content msg)) (destructuring-bind (fn &optional param) (content msg) (become (funcall fn msg val param))))) (defun plus (msg val param) (calculator (+ val param))) (defun minus (msg val param) (calculator (- val param))) (defun show (msg val param) (send (customer msg) (message val)))