start working on classical (Hewitt) actor stuff; + async improvements

This commit is contained in:
Helmut Merz 2025-04-12 10:30:08 +02:00
parent 1a5ff88f03
commit 66dcb7672a
4 changed files with 53 additions and 19 deletions

23
core/actor.lisp Normal file
View file

@ -0,0 +1,23 @@
;;;; cl-scopes/core/actor - basic actor definitions
(defpackage :scopes/core/actor
(:use :common-lisp)
(:export #:actor #:behave #:create #:send
#:handle-message))
(in-package :scopes/core/actor)
(defclass actor ()
((behavior :accessor behavior :initarg :behavior)))
(defun behave (ac new-bhv)
(setf (behavior ac) new-bhv))
(defun create (ac-cls bhv &rest params)
(make-instance ac-cls :behavior bhv))
(defun send (addr msg &key customer))
;;;; behaviors
(defun handle-message (ac msg))

View file

@ -10,11 +10,14 @@
:flexi-streams :ironclad :local-time :log4cl :flexi-streams :ironclad :local-time :log4cl
:lparallel :qbase64 :serapeum :str) :lparallel :qbase64 :serapeum :str)
:components ((:file "config" :depends-on ("util/util")) :components ((:file "config" :depends-on ("util/util"))
(:file "core/actor"
:depends-on ("shape/shape" "util/async" "util/util"))
(:file "core/core" (:file "core/core"
:depends-on ("core/message" "config" :depends-on ("config"
"core/actor" "core/message"
"forge/forge" "logging" "forge/forge" "logging"
"util/async" "util/util")) "util/async" "util/util"))
(:file "core/message" :depends-on ("shape/shape")) (:file "core/message" :depends-on ("core/actor" "shape/shape"))
(:file "forge/forge" :depends-on ("util/iter" "util/util")) (:file "forge/forge" :depends-on ("util/iter" "util/util"))
(:file "logging" :depends-on ("config" "util/util")) (:file "logging" :depends-on ("config" "util/util"))
(:file "shape/shape") (:file "shape/shape")

View file

@ -3,6 +3,7 @@
(defpackage :scopes/test-core (defpackage :scopes/test-core
(:use :common-lisp) (:use :common-lisp)
(:local-nicknames (:alx :alexandria) (:local-nicknames (:alx :alexandria)
(:actor :scopes/core/actor)
(:async :scopes/util/async) (:async :scopes/util/async)
(:config :scopes/config) (:config :scopes/config)
(:core :scopes/core) (:core :scopes/core)
@ -61,6 +62,7 @@
(test-util-crypt) (test-util-crypt)
(test-util-iter) (test-util-iter)
(test-shape) (test-shape)
(test-actor)
(core:setup-services) (core:setup-services)
(setf (receiver t:*test-suite*) (core:find-service :test-receiver)) (setf (receiver t:*test-suite*) (core:find-service :test-receiver))
(test-send)) (test-send))
@ -117,13 +119,17 @@
(== (iter:next it) nil) (== (iter:next it) nil)
(== (string (iter:value it)) "A"))) (== (string (iter:value it)) "A")))
(deftest test-shape() (deftest test-shape ()
(let ((rec (make-instance 'shape:record :head '(:t1)))) (let ((rec (make-instance 'shape:record :head '(:t1))))
(== (shape:head rec) '(:t1 nil)) (== (shape:head rec) '(:t1 nil))
(== (shape:head-value rec :taskid) :t1) (== (shape:head-value rec :taskid) :t1)
(setf (shape:head-value rec :username) :u1) (setf (shape:head-value rec :username) :u1)
(== (shape:head-value rec :username) :u1))) (== (shape:head-value rec :username) :u1)))
(deftest test-actor ()
(let (a1 a2 a3)
(setf a1 (actor:create 'actor:actor 'actor:handle-message))))
(deftest test-send () (deftest test-send ()
(let ((rcvr (receiver t:*test-suite*)) (let ((rcvr (receiver t:*test-suite*))
(msg (message:create '(:test :dummy) :data "dummy payload")) (msg (message:create '(:test :dummy) :data "dummy payload"))

View file

@ -5,7 +5,7 @@
(:local-nicknames (:util :scopes/util) (:local-nicknames (:util :scopes/util)
(:lp :lparallel) (:lp :lparallel)
(:lpq :lparallel.queue)) (:lpq :lparallel.queue))
(:export #:init #:finish #:task #:async-task (:export #:init #:finish #:task #:bg-task
#:make-task #:start #:stop #:status #:data #:send)) #:make-task #:start #:stop #:status #:data #:send))
(in-package :scopes/util/async) (in-package :scopes/util/async)
@ -31,22 +31,12 @@
(progn (progn
(funcall startup tsk) (funcall startup tsk)
(when (mailbox tsk) (when (mailbox tsk)
(do-listen-hc tsk handle-message)) (do-listen tsk handle-message))
(data tsk)) (data tsk))
(setf (status tsk) :done) (setf (status tsk) :done)
(funcall teardown tsk))) (funcall teardown tsk)))
(defun do-listen (tsk handle-message) (defun do-listen (tsk handle-message)
(handler-bind
((sb-sys:interactive-interrupt
(lambda (condition)
(util:lgi condition)
(return-from do-listen))))
(loop for msg = (lpq:pop-queue (mailbox tsk))
until (eq msg +quit-message+)
do (funcall handle-message tsk msg))))
(defun do-listen-hc (tsk handle-message)
(handler-case (handler-case
(loop for msg = (lpq:pop-queue (mailbox tsk)) (loop for msg = (lpq:pop-queue (mailbox tsk))
until (eq msg +quit-message+) until (eq msg +quit-message+)
@ -63,11 +53,11 @@
(status :accessor status :initform :new) (status :accessor status :initform :new)
(data :accessor data :initform nil))) (data :accessor data :initform nil)))
(defclass async-task (task) (defclass bg-task (task)
((channel :reader channel :initform (lp:make-channel)))) ((channel :reader channel :initform (lp:make-channel))))
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message (defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
(cls 'async-task)) (cls 'bg-task))
(let ((tsk (make-instance cls))) (let ((tsk (make-instance cls)))
(setf (job tsk) (setf (job tsk)
(lambda () (standard-job tsk :startup startup :teardown teardown (lambda () (standard-job tsk :startup startup :teardown teardown
@ -86,7 +76,7 @@
(defgeneric submit (tsk) (defgeneric submit (tsk)
(:method ((tsk task)) (:method ((tsk task))
(funcall (job tsk))) (funcall (job tsk)))
(:method ((tsk async-task)) (:method ((tsk bg-task))
(lp:submit-task (channel tsk) (job tsk)))) (lp:submit-task (channel tsk) (job tsk))))
(defun stop (tsk &key (wait t)) (defun stop (tsk &key (wait t))
@ -97,10 +87,22 @@
(defgeneric wait-result (tsk) (defgeneric wait-result (tsk)
(:method ((tsk task))) (:method ((tsk task)))
(:method ((tsk async-task)) (:method ((tsk bg-task))
(lp:receive-result (channel tsk)))) (lp:receive-result (channel tsk))))
(defun send (tsk msg) (defun send (tsk msg)
(if (mailbox tsk) (if (mailbox tsk)
(lpq:push-queue msg (mailbox tsk)) (lpq:push-queue msg (mailbox tsk))
(util:lgw "task has no mailbox" (taskid tsk)))) (util:lgw "task has no mailbox" (taskid tsk))))
;;; alternate implementation - may be removed
(defun do-listen-hb (tsk handle-message)
(handler-bind
((sb-sys:interactive-interrupt
(lambda (condition)
(util:lgi condition)
(return-from do-listen-hb))))
(loop for msg = (lpq:pop-queue (mailbox tsk))
until (eq msg +quit-message+)
do (funcall handle-message tsk msg))))