diff --git a/core/actor-ng.lisp b/core/actor-ng.lisp index 504d277..1ca2842 100644 --- a/core/actor-ng.lisp +++ b/core/actor-ng.lisp @@ -2,9 +2,11 @@ (defpackage :scopes/core/actor-ng (:use :common-lisp) - (:local-nicknames (:async :scopes/util/async) + (:local-nicknames (:async :scopes/util/async) + (:lp :lparallel) + (:lpq :lparallel.queue) (:util :scopes/util)) - (:export #:ac-loop #:become #:create #:send + (:export #:start #:stop #:become #:create #:send #:message #:content #:customer #:*logger* #:*root* #:echo #:inc #:lgi @@ -12,19 +14,85 @@ (in-package :scopes/core/actor-ng) -;;;; virtual actor - async:task + behavior +(eval-when (:compile-toplevel :load-toplevel :execute) + (async:init)) + +;;;; basic message implementation + +(defclass message () + ((content :reader content :initarg :content :initform nil) + (customer :reader customer :initarg :customer :initform nil))) + +(defun message (content &optional customer) + (make-instance 'message :content content :customer customer)) + +;;;; actor loop (listener) (eval-when (:compile-toplevel :load-toplevel :execute) (when (not (boundp '+quit-message+)) (defconstant +quit-message+ (gensym "QUIT")))) -(defun ac-loop (tsk bhv) - (let ((next (ac-step tsk bhv))) +(defun start (mb bhv &key fore-ground) + (if fore-ground + (ac-loop mb bhv) + (let ((ch (lp:make-channel))) + (lp:submit-task ch (lambda () (ac-loop mb bhv))) + ch))) + +(defun stop (mb) + (send mb +quit-message+)) + +(defun ac-loop (mb bhv) + (let ((next (ac-step mb bhv))) (unless (eq next +quit-message+) - (ac-loop tsk (or next bhv))))) + (ac-loop mb (or next bhv))))) -(defun ac-step (tsk bhv) - (let ((msg (async:receive tsk))) - (funcall bhv tsk msg))) +(defun ac-step (mb bhv) + (let ((msg (lpq:pop-queue mb))) + (funcall bhv msg))) +;;;; the core (classical, i.e. Hewitt) actor API +;;; there is no `become` operation: the behavior just returns the new behavior + +(defun create (bhv) + (let ((mb (lpq:make-queue))) + (values mb (start mb bhv)))) + +(defun send (mb msg) + (lpq:push-queue msg mb)) + +;;;; predefined behaviors + +(defun no-op (msg)) + +(defun lgi (msg) + (util:lgi (content msg))) + +(defun echo (msg) + (send (customer msg) msg)) + +;;;; predefined global actors + +(defvar *logger* (create #'lgi)) + +(defun root-bhv (ac msg) + (send *logger* msg)) + +(defvar *root* (create #'root-bhv)) + +;;;; example behavior: calculator + +(defun calculator (&optional (val 0)) + #'(lambda (msg) + (destructuring-bind (fn &optional param) (content msg) + (funcall fn msg val param)))) + +(defun plus (msg val param) + (calculator (+ val param))) +(defun minus (msg val param) + (calculator (- val param))) +(defun show (msg val param) + (send (or (customer msg) *logger*) val)) +(defun send-value (msg val param) + (send (customer msg) val)) diff --git a/core/actor.lisp b/core/actor.lisp index c01ae00..634c3f4 100644 --- a/core/actor.lisp +++ b/core/actor.lisp @@ -49,7 +49,7 @@ (defun make-actor (bhv &optional (cls 'bg-actor) &rest args &key &allow-other-keys) (apply #'make-instance cls :behavior bhv args)) -(defun make-task (ac &optional (cls 'async:bg-task)) +(defun make-task (ac &optional (cls 'async:task)) (async:make-task :cls cls :handle-message #'(lambda (ax msg) (funcall (behavior ac) ac msg)))) diff --git a/test/test-core.lisp b/test/test-core.lisp index 227ce34..96b040d 100644 --- a/test/test-core.lisp +++ b/test/test-core.lisp @@ -54,6 +54,7 @@ ((receiver :accessor receiver :initarg :receiver))) (defun run () + (async:init) (let* ((t:*test-suite* (make-instance 'test-suite :name "core"))) (load (t:test-path "config-core" "etc")) (unwind-protect @@ -89,7 +90,7 @@ (== pl '(:b 1 :a 0)))) (deftest test-util-async () - (async:init) + ;(async:init) (let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01))))) (== (async:status tsk) :new) (async:start tsk) @@ -104,7 +105,8 @@ (async:send tsk :hello) (== (async:stop tsk) '(:hello)) (== (async:status tsk) :done)) - (async:finish)) + ;(async:finish) + ) (deftest test-util-crypt () (let ((s1 (crypt:create-secret)) @@ -142,7 +144,12 @@ (== val -1) )) -(deftest test-actor ()) +(deftest test-actor () + ;(async:init) + (let (calc (actor:create (actor:calculator))) + ;(actor:stop calc) + ) +) (deftest test-send () (let ((rcvr (receiver t:*test-suite*)) diff --git a/util/async.lisp b/util/async.lisp index 32f8e50..97c0bf2 100644 --- a/util/async.lisp +++ b/util/async.lisp @@ -5,25 +5,41 @@ (:local-nicknames (:util :scopes/util) (:lp :lparallel) (:lpq :lparallel.queue)) - (:export #:init #:finish #:bg-task #:fg-task #:receive + (:export #:init #:finish #:make-mb #:receive #:submit-task + #:fg-task #:task #:make-task #:start #:stop #:status #:data #:send)) (in-package :scopes/util/async) -;;;; general definitions +;;;; general definitions (lparallel wrappers) + +(defun init () + (when (null lp:*kernel*) + (format t "async:init ~a ~%" + (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))) + )) + +(defun finish () + (when lp:*kernel* + (lp:end-kernel) + ;(setf lp:*kernel* nil) +)) + +(defun make-mb () + (lpq:make-queue)) + +(defun receive (mb) + (lpq:pop-queue mb)) + +(defun submit-task (ch job) + (lp:submit-task ch job)) + +;;;; job - probably obsolete (eval-when (:compile-toplevel :load-toplevel :execute) (when (not (boundp '+quit-message+)) (defconstant +quit-message+ (gensym "QUIT")))) -(defun init () - (when (null lp:*kernel*) - (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))) - -(defun finish () - (when lp:*kernel* - (lp:end-kernel))) - (defun noop (&rest params)) (defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message) @@ -53,20 +69,17 @@ (status :accessor status :initform :new) (data :accessor data :initform nil))) -(defun receive (tsk) - (lpq:pop-queue (mailbox tsk))) - -(defclass bg-task (fg-task) +(defclass task (fg-task) ((channel :reader channel :initform (lp:make-channel)))) (defun make-task (&key (startup #'noop) (teardown #'noop) handle-message - (cls 'bg-task)) + (cls 'task)) (let ((tsk (make-instance cls))) (setf (job tsk) (lambda () (standard-job tsk :startup startup :teardown teardown :handle-message handle-message))) (if handle-message - (setf (mailbox tsk) (lpq:make-queue))) + (setf (mailbox tsk) (make-mb))) tsk)) (defun start (tsk) @@ -79,8 +92,8 @@ (defgeneric submit (tsk) (:method ((tsk fg-task)) (funcall (job tsk))) - (:method ((tsk bg-task)) - (lp:submit-task (channel tsk) (job tsk)))) + (:method ((tsk task)) + (submit-task (channel tsk) (job tsk)))) (defun stop (tsk &key (wait t)) (when (mailbox tsk) @@ -90,7 +103,7 @@ (defgeneric wait-result (tsk) (:method ((tsk fg-task))) - (:method ((tsk bg-task)) + (:method ((tsk task)) (lp:receive-result (channel tsk)))) (defun send (tsk msg)