more on new actor implementation

This commit is contained in:
Helmut Merz 2025-06-01 16:09:52 +02:00
parent 751163b801
commit 386d286fe6
4 changed files with 120 additions and 32 deletions

View file

@ -2,9 +2,11 @@
(defpackage :scopes/core/actor-ng (defpackage :scopes/core/actor-ng
(:use :common-lisp) (:use :common-lisp)
(:local-nicknames (:async :scopes/util/async) (:local-nicknames (:async :scopes/util/async)
(:lp :lparallel)
(:lpq :lparallel.queue)
(:util :scopes/util)) (:util :scopes/util))
(:export #:ac-loop #:become #:create #:send (:export #:start #:stop #:become #:create #:send
#:message #:content #:customer #:message #:content #:customer
#:*logger* #:*root* #:*logger* #:*root*
#:echo #:inc #:lgi #:echo #:inc #:lgi
@ -12,19 +14,85 @@
(in-package :scopes/core/actor-ng) (in-package :scopes/core/actor-ng)
;;;; virtual actor - async:task + behavior (eval-when (:compile-toplevel :load-toplevel :execute)
(async:init))
;;;; basic message implementation
(defclass message ()
((content :reader content :initarg :content :initform nil)
(customer :reader customer :initarg :customer :initform nil)))
(defun message (content &optional customer)
(make-instance 'message :content content :customer customer))
;;;; actor loop (listener)
(eval-when (:compile-toplevel :load-toplevel :execute) (eval-when (:compile-toplevel :load-toplevel :execute)
(when (not (boundp '+quit-message+)) (when (not (boundp '+quit-message+))
(defconstant +quit-message+ (gensym "QUIT")))) (defconstant +quit-message+ (gensym "QUIT"))))
(defun ac-loop (tsk bhv) (defun start (mb bhv &key fore-ground)
(let ((next (ac-step tsk bhv))) (if fore-ground
(ac-loop mb bhv)
(let ((ch (lp:make-channel)))
(lp:submit-task ch (lambda () (ac-loop mb bhv)))
ch)))
(defun stop (mb)
(send mb +quit-message+))
(defun ac-loop (mb bhv)
(let ((next (ac-step mb bhv)))
(unless (eq next +quit-message+) (unless (eq next +quit-message+)
(ac-loop tsk (or next bhv))))) (ac-loop mb (or next bhv)))))
(defun ac-step (tsk bhv) (defun ac-step (mb bhv)
(let ((msg (async:receive tsk))) (let ((msg (lpq:pop-queue mb)))
(funcall bhv tsk msg))) (funcall bhv msg)))
;;;; the core (classical, i.e. Hewitt) actor API
;;; there is no `become` operation: the behavior just returns the new behavior
(defun create (bhv)
(let ((mb (lpq:make-queue)))
(values mb (start mb bhv))))
(defun send (mb msg)
(lpq:push-queue msg mb))
;;;; predefined behaviors
(defun no-op (msg))
(defun lgi (msg)
(util:lgi (content msg)))
(defun echo (msg)
(send (customer msg) msg))
;;;; predefined global actors
(defvar *logger* (create #'lgi))
(defun root-bhv (ac msg)
(send *logger* msg))
(defvar *root* (create #'root-bhv))
;;;; example behavior: calculator
(defun calculator (&optional (val 0))
#'(lambda (msg)
(destructuring-bind (fn &optional param) (content msg)
(funcall fn msg val param))))
(defun plus (msg val param)
(calculator (+ val param)))
(defun minus (msg val param)
(calculator (- val param)))
(defun show (msg val param)
(send (or (customer msg) *logger*) val))
(defun send-value (msg val param)
(send (customer msg) val))

View file

@ -49,7 +49,7 @@
(defun make-actor (bhv &optional (cls 'bg-actor) &rest args &key &allow-other-keys) (defun make-actor (bhv &optional (cls 'bg-actor) &rest args &key &allow-other-keys)
(apply #'make-instance cls :behavior bhv args)) (apply #'make-instance cls :behavior bhv args))
(defun make-task (ac &optional (cls 'async:bg-task)) (defun make-task (ac &optional (cls 'async:task))
(async:make-task :cls cls (async:make-task :cls cls
:handle-message :handle-message
#'(lambda (ax msg) (funcall (behavior ac) ac msg)))) #'(lambda (ax msg) (funcall (behavior ac) ac msg))))

View file

@ -54,6 +54,7 @@
((receiver :accessor receiver :initarg :receiver))) ((receiver :accessor receiver :initarg :receiver)))
(defun run () (defun run ()
(async:init)
(let* ((t:*test-suite* (make-instance 'test-suite :name "core"))) (let* ((t:*test-suite* (make-instance 'test-suite :name "core")))
(load (t:test-path "config-core" "etc")) (load (t:test-path "config-core" "etc"))
(unwind-protect (unwind-protect
@ -89,7 +90,7 @@
(== pl '(:b 1 :a 0)))) (== pl '(:b 1 :a 0))))
(deftest test-util-async () (deftest test-util-async ()
(async:init) ;(async:init)
(let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01))))) (let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01)))))
(== (async:status tsk) :new) (== (async:status tsk) :new)
(async:start tsk) (async:start tsk)
@ -104,7 +105,8 @@
(async:send tsk :hello) (async:send tsk :hello)
(== (async:stop tsk) '(:hello)) (== (async:stop tsk) '(:hello))
(== (async:status tsk) :done)) (== (async:status tsk) :done))
(async:finish)) ;(async:finish)
)
(deftest test-util-crypt () (deftest test-util-crypt ()
(let ((s1 (crypt:create-secret)) (let ((s1 (crypt:create-secret))
@ -142,7 +144,12 @@
(== val -1) (== val -1)
)) ))
(deftest test-actor ()) (deftest test-actor ()
;(async:init)
(let (calc (actor:create (actor:calculator)))
;(actor:stop calc)
)
)
(deftest test-send () (deftest test-send ()
(let ((rcvr (receiver t:*test-suite*)) (let ((rcvr (receiver t:*test-suite*))

View file

@ -5,25 +5,41 @@
(:local-nicknames (:util :scopes/util) (:local-nicknames (:util :scopes/util)
(:lp :lparallel) (:lp :lparallel)
(:lpq :lparallel.queue)) (:lpq :lparallel.queue))
(:export #:init #:finish #:bg-task #:fg-task #:receive (:export #:init #:finish #:make-mb #:receive #:submit-task
#:fg-task #:task
#:make-task #:start #:stop #:status #:data #:send)) #:make-task #:start #:stop #:status #:data #:send))
(in-package :scopes/util/async) (in-package :scopes/util/async)
;;;; general definitions ;;;; general definitions (lparallel wrappers)
(defun init ()
(when (null lp:*kernel*)
(format t "async:init ~a ~%"
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))
))
(defun finish ()
(when lp:*kernel*
(lp:end-kernel)
;(setf lp:*kernel* nil)
))
(defun make-mb ()
(lpq:make-queue))
(defun receive (mb)
(lpq:pop-queue mb))
(defun submit-task (ch job)
(lp:submit-task ch job))
;;;; job - probably obsolete
(eval-when (:compile-toplevel :load-toplevel :execute) (eval-when (:compile-toplevel :load-toplevel :execute)
(when (not (boundp '+quit-message+)) (when (not (boundp '+quit-message+))
(defconstant +quit-message+ (gensym "QUIT")))) (defconstant +quit-message+ (gensym "QUIT"))))
(defun init ()
(when (null lp:*kernel*)
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))))
(defun finish ()
(when lp:*kernel*
(lp:end-kernel)))
(defun noop (&rest params)) (defun noop (&rest params))
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message) (defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
@ -53,20 +69,17 @@
(status :accessor status :initform :new) (status :accessor status :initform :new)
(data :accessor data :initform nil))) (data :accessor data :initform nil)))
(defun receive (tsk) (defclass task (fg-task)
(lpq:pop-queue (mailbox tsk)))
(defclass bg-task (fg-task)
((channel :reader channel :initform (lp:make-channel)))) ((channel :reader channel :initform (lp:make-channel))))
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message (defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
(cls 'bg-task)) (cls 'task))
(let ((tsk (make-instance cls))) (let ((tsk (make-instance cls)))
(setf (job tsk) (setf (job tsk)
(lambda () (standard-job tsk :startup startup :teardown teardown (lambda () (standard-job tsk :startup startup :teardown teardown
:handle-message handle-message))) :handle-message handle-message)))
(if handle-message (if handle-message
(setf (mailbox tsk) (lpq:make-queue))) (setf (mailbox tsk) (make-mb)))
tsk)) tsk))
(defun start (tsk) (defun start (tsk)
@ -79,8 +92,8 @@
(defgeneric submit (tsk) (defgeneric submit (tsk)
(:method ((tsk fg-task)) (:method ((tsk fg-task))
(funcall (job tsk))) (funcall (job tsk)))
(:method ((tsk bg-task)) (:method ((tsk task))
(lp:submit-task (channel tsk) (job tsk)))) (submit-task (channel tsk) (job tsk))))
(defun stop (tsk &key (wait t)) (defun stop (tsk &key (wait t))
(when (mailbox tsk) (when (mailbox tsk)
@ -90,7 +103,7 @@
(defgeneric wait-result (tsk) (defgeneric wait-result (tsk)
(:method ((tsk fg-task))) (:method ((tsk fg-task)))
(:method ((tsk bg-task)) (:method ((tsk task))
(lp:receive-result (channel tsk)))) (lp:receive-result (channel tsk))))
(defun send (tsk msg) (defun send (tsk msg)