;;;; cl-scopes/core/actor - basic actor definitions

(defpackage :scopes/core/actor
  (:use :common-lisp)
  (:local-nicknames (:async :scopes/util/async)
                    (:shape :scopes/shape)
                    (:util :scopes/util))
  (:export #:start #:stop #:create #:send #:become
           #:message #:content #:customer #:set-content
           #:*logger* #:*root*
           #:echo #:inc #:lgi
           #:calculator #:plus #:minus #:show))

(in-package :scopes/core/actor)

;;;; basic message implementation

(defgeneric content (msg)
  (:documentation
   "Return the payload of MSG. An object that is not a MESSAGE is its own content.")
  (:method (msg) msg))

(defgeneric customer (msg)
  (:documentation
   "Return the customer of MSG (the actor ECHO and SHOW send their replies to).
Objects that are not MESSAGE instances have no customer (NIL).")
  (:method (msg)
    (declare (ignore msg))
    nil))

(defclass message ()
  ((content :reader content :initarg :content :initform nil)
   (customer :reader customer :initarg :customer :initform nil))
  (:documentation "A message consisting of a content and an optional customer."))

(defun message (content &optional customer)
  "Create a MESSAGE instance holding CONTENT and (optionally) CUSTOMER."
  (make-instance 'message :content content :customer customer))

(defgeneric set-content (msg fn)
  (:documentation
   "Return the result of transforming the content of MSG with the function FN.
For a MESSAGE a new MESSAGE is created that keeps the original customer;
for any other object FN is applied to the object itself.")
  (:method (msg fn)
    (funcall fn msg))
  (:method ((msg message) fn)
    (message (funcall fn (content msg)) (customer msg))))

;;;; actor loop (listener)

;; The quit marker is a fresh uninterned symbol; the BOUNDP guard keeps the
;; constant from being redefined with a different gensym when the file is
;; loaded again.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (when (not (boundp '+quit-message+))
    (defconstant +quit-message+ (gensym "QUIT"))))

(defun start (tsk bhv &key foreground)
  "Run the actor loop for TSK with the initial behavior BHV.
If FOREGROUND is true the loop is executed directly by the caller,
otherwise it is handed over to ASYNC:SUBMIT-TASK."
  (if foreground
      (ac-loop tsk bhv)
      (async:submit-task tsk #'ac-loop tsk bhv)))

(defun stop (mb)
  "Send the quit message to MB; an actor loop receiving it terminates."
  (send mb +quit-message+))

(defvar *self* nil
  "The task/mailbox whose behavior is currently executing; used by BECOME.")

(defgeneric ac-loop (tsk bhv)
  (:documentation
   "Receive messages from TSK and process them with the behavior BHV.
BHV is called with each message; a non-NIL return value replaces the
behavior for the following messages. The loop ends when the content of a
received message is the quit message.")
  (:method ((tsk async:mailbox) bhv)
    ;; Iterate instead of recursing: a recursive call made inside the dynamic
    ;; binding of *SELF* is not a tail call, so the stack would grow with
    ;; every message processed.
    (let ((*self* tsk))
      (loop for msg = (async:rcv tsk)
            until (eq (content msg) +quit-message+)
            do (setf bhv (or (funcall bhv msg) bhv))))))

(defgeneric set-bhv (tsk bhv)
  (:documentation
   "Record BHV as the behavior of TSK. A plain mailbox stores nothing.")
  (:method ((tsk async:mailbox) bhv)
    (declare (ignore bhv))
    nil))

;;;; the core (classical, i.e. Hewitt) actor API

(defun create (bhv)
  "Create a new task with the behavior BHV, start its actor loop and return the task."
  (let ((tsk (async:make-task bhv)))
    (start tsk bhv)
    tsk))

(defgeneric send (tsk msg)
  (:documentation "Deliver MSG to the actor TSK.")
  (:method ((tsk async:mailbox) msg)
    ;(util:lgi msg)
    (async:snd tsk msg)))

(defun become (bhv)
  "Register BHV as the behavior of the currently running actor (*SELF*).
Returns BHV, so a behavior can return the value of BECOME to make the
actor loop switch to BHV."
  (set-bhv *self* bhv)
  bhv)

;;;; handling restartable tasks

(defmethod ac-loop ((tsk async:restartable-task) bhv)
  "Process the messages currently available for TSK without blocking.
Returns NIL after setting the status to :STOPPED when the quit message is
received; returns the current behavior after setting the status to
:SUSPENDED when no message is available."
  ;; Iterate instead of recursing inside the dynamic binding of *SELF*
  ;; (such a recursive call is not a tail call and would consume stack for
  ;; each message of a backlog).
  (let ((*self* tsk))
    (loop
      (async:get-status tsk)      ; wait for end of concurrent activities
      (multiple-value-bind (msg ok) (async:try-rcv tsk)
        (cond ((not ok)
               (async:set-status tsk :suspended)
               (return bhv))
              ((eq (content msg) +quit-message+)
               (async:set-status tsk :stopped)
               (return nil))
              (t
               (async:set-status tsk :running)
               (setf bhv (or (funcall bhv msg) bhv))))))))

(defmethod send ((tsk async:restartable-task) msg)
  "Deliver MSG to TSK, resubmitting the actor loop if TSK is new or suspended.
A message sent to a stopped task is dropped with a warning."
  (let ((status (async:get-status tsk)))
    (when (eq status :stopped)
      (util:lgw "trying to send message to stopped task")
      (async:set-status tsk :stopped)
      (return-from send))
    (async:snd tsk msg)
    (when (or (eq status :new) (eq status :suspended))
      (async:try-receive-result tsk)
      (async:submit-task tsk (lambda () (ac-loop tsk (async:behavior tsk)))))
    ;; Fix: the task argument was missing here, unlike in every other
    ;; SET-STATUS call in this file.
    (async:set-status tsk :running)))

(defmethod set-bhv ((tsk async:task) bhv)
  "Store BHV in the behavior slot of TSK."
  (setf (async:behavior tsk) bhv))

;;;; predefined behaviors

(defun no-op (msg)
  "Behavior that ignores MSG and returns NIL."
  (declare (ignore msg))
  nil)

(defun lgi (msg)
  "Behavior that passes the content of MSG to UTIL:LGI."
  (util:lgi (content msg)))

(defun echo (msg)
  "Behavior that sends MSG back to its customer."
  (send (customer msg) msg))

;;;; example behavior: calculator

(defun calculator (&optional (val 0))
  "Return a calculator behavior whose current value is VAL.
The content of a message is a list (FN &optional PARAM); FN is called with
the message, the current value and PARAM. When FN returns a function
(as PLUS and MINUS do) it becomes the new behavior."
  (lambda (msg)
    ;(format t "calc ~a ~a~%" val (content msg))
    (destructuring-bind (fn &optional param) (content msg)
      (let ((next (funcall fn msg val param)))
        ;; Operations like SHOW return the value of SEND, which is not a
        ;; behavior; installing it via BECOME would break the actor on the
        ;; next message, so only functions are accepted.
        (when (functionp next)
          (become next))))))

(defun plus (msg val param)
  "Calculator operation: return a calculator behavior holding VAL + PARAM."
  (declare (ignore msg))
  (calculator (+ val param)))

(defun minus (msg val param)
  "Calculator operation: return a calculator behavior holding VAL - PARAM."
  (declare (ignore msg))
  (calculator (- val param)))

(defun show (msg val param)
  "Calculator operation: send the current value VAL to the customer of MSG."
  (declare (ignore param))
  (send (customer msg) (message val)))