Compare commits
2 commits
0c35d7bf97
...
386d286fe6
Author | SHA1 | Date | |
---|---|---|---|
386d286fe6 | |||
751163b801 |
6 changed files with 159 additions and 32 deletions
98
core/actor-ng.lisp
Normal file
98
core/actor-ng.lisp
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
;;;; cl-scopes/core/actor - basic actor definitions
|
||||||
|
|
||||||
|
(defpackage :scopes/core/actor-ng
|
||||||
|
(:use :common-lisp)
|
||||||
|
(:local-nicknames (:async :scopes/util/async)
|
||||||
|
(:lp :lparallel)
|
||||||
|
(:lpq :lparallel.queue)
|
||||||
|
(:util :scopes/util))
|
||||||
|
(:export #:start #:stop #:become #:create #:send
|
||||||
|
#:message #:content #:customer
|
||||||
|
#:*logger* #:*root*
|
||||||
|
#:echo #:inc #:lgi
|
||||||
|
#:calculator #:plus #:minus #:show #:send-value))
|
||||||
|
|
||||||
|
(in-package :scopes/core/actor-ng)
|
||||||
|
|
||||||
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
||||||
|
(async:init))
|
||||||
|
|
||||||
|
;;;; basic message implementation
|
||||||
|
|
||||||
|
(defclass message ()
|
||||||
|
((content :reader content :initarg :content :initform nil)
|
||||||
|
(customer :reader customer :initarg :customer :initform nil)))
|
||||||
|
|
||||||
|
(defun message (content &optional customer)
|
||||||
|
(make-instance 'message :content content :customer customer))
|
||||||
|
|
||||||
|
;;;; actor loop (listener)
|
||||||
|
|
||||||
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
||||||
|
(when (not (boundp '+quit-message+))
|
||||||
|
(defconstant +quit-message+ (gensym "QUIT"))))
|
||||||
|
|
||||||
|
(defun start (mb bhv &key fore-ground)
|
||||||
|
(if fore-ground
|
||||||
|
(ac-loop mb bhv)
|
||||||
|
(let ((ch (lp:make-channel)))
|
||||||
|
(lp:submit-task ch (lambda () (ac-loop mb bhv)))
|
||||||
|
ch)))
|
||||||
|
|
||||||
|
(defun stop (mb)
|
||||||
|
(send mb +quit-message+))
|
||||||
|
|
||||||
|
(defun ac-loop (mb bhv)
|
||||||
|
(let ((next (ac-step mb bhv)))
|
||||||
|
(unless (eq next +quit-message+)
|
||||||
|
(ac-loop mb (or next bhv)))))
|
||||||
|
|
||||||
|
(defun ac-step (mb bhv)
|
||||||
|
(let ((msg (lpq:pop-queue mb)))
|
||||||
|
(funcall bhv msg)))
|
||||||
|
|
||||||
|
;;;; the core (classical, i.e. Hewitt) actor API
|
||||||
|
;;; there is no `become` operation: the behavior just returns the new behavior
|
||||||
|
|
||||||
|
(defun create (bhv)
|
||||||
|
(let ((mb (lpq:make-queue)))
|
||||||
|
(values mb (start mb bhv))))
|
||||||
|
|
||||||
|
(defun send (mb msg)
|
||||||
|
(lpq:push-queue msg mb))
|
||||||
|
|
||||||
|
;;;; predefined behaviors
|
||||||
|
|
||||||
|
(defun no-op (msg))
|
||||||
|
|
||||||
|
(defun lgi (msg)
|
||||||
|
(util:lgi (content msg)))
|
||||||
|
|
||||||
|
(defun echo (msg)
|
||||||
|
(send (customer msg) msg))
|
||||||
|
|
||||||
|
;;;; predefined global actors
|
||||||
|
|
||||||
|
(defvar *logger* (create #'lgi))
|
||||||
|
|
||||||
|
(defun root-bhv (ac msg)
|
||||||
|
(send *logger* msg))
|
||||||
|
|
||||||
|
(defvar *root* (create #'root-bhv))
|
||||||
|
|
||||||
|
;;;; example behavior: calculator
|
||||||
|
|
||||||
|
(defun calculator (&optional (val 0))
|
||||||
|
#'(lambda (msg)
|
||||||
|
(destructuring-bind (fn &optional param) (content msg)
|
||||||
|
(funcall fn msg val param))))
|
||||||
|
|
||||||
|
(defun plus (msg val param)
|
||||||
|
(calculator (+ val param)))
|
||||||
|
(defun minus (msg val param)
|
||||||
|
(calculator (- val param)))
|
||||||
|
(defun show (msg val param)
|
||||||
|
(send (or (customer msg) *logger*) val))
|
||||||
|
(defun send-value (msg val param)
|
||||||
|
(send (customer msg) val))
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
(:use :common-lisp)
|
(:use :common-lisp)
|
||||||
(:local-nicknames (:async :scopes/util/async)
|
(:local-nicknames (:async :scopes/util/async)
|
||||||
(:util :scopes/util))
|
(:util :scopes/util))
|
||||||
(:export #:actor #:bg-actor #:fg-actor #:make-actor #:start #:stop
|
(:export #:bg-actor #:fg-actor #:make-actor #:start #:stop
|
||||||
#:become #:create #:send
|
#:become #:create #:send
|
||||||
#:message #:content #:customer
|
#:message #:content #:customer
|
||||||
#:*logger* #:*root*
|
#:*logger* #:*root*
|
||||||
|
@ -46,10 +46,10 @@
|
||||||
(:method ((ac bg-actor))
|
(:method ((ac bg-actor))
|
||||||
(async:stop (task ac))))
|
(async:stop (task ac))))
|
||||||
|
|
||||||
(defun make-actor (bhv &optional (cls 'actor) &rest args &key &allow-other-keys)
|
(defun make-actor (bhv &optional (cls 'bg-actor) &rest args &key &allow-other-keys)
|
||||||
(apply #'make-instance cls :behavior bhv args))
|
(apply #'make-instance cls :behavior bhv args))
|
||||||
|
|
||||||
(defun make-task (ac &optional (cls 'async:bg-task))
|
(defun make-task (ac &optional (cls 'async:task))
|
||||||
(async:make-task :cls cls
|
(async:make-task :cls cls
|
||||||
:handle-message
|
:handle-message
|
||||||
#'(lambda (ax msg) (funcall (behavior ac) ac msg))))
|
#'(lambda (ax msg) (funcall (behavior ac) ac msg))))
|
||||||
|
|
|
@ -60,7 +60,7 @@
|
||||||
|
|
||||||
(defvar *root* nil)
|
(defvar *root* nil)
|
||||||
|
|
||||||
(defclass base-context (actor:actor)
|
(defclass base-context (actor:bg-actor)
|
||||||
((actions :accessor actions :initform nil)))
|
((actions :accessor actions :initform nil)))
|
||||||
|
|
||||||
(defclass context (base-context)
|
(defclass context (base-context)
|
||||||
|
|
|
@ -12,6 +12,8 @@
|
||||||
:components ((:file "config" :depends-on ("util/util"))
|
:components ((:file "config" :depends-on ("util/util"))
|
||||||
(:file "core/actor"
|
(:file "core/actor"
|
||||||
:depends-on ("shape/shape" "util/async" "util/util"))
|
:depends-on ("shape/shape" "util/async" "util/util"))
|
||||||
|
(:file "core/actor-ng"
|
||||||
|
:depends-on ("shape/shape" "util/async" "util/util"))
|
||||||
(:file "core/core"
|
(:file "core/core"
|
||||||
:depends-on ("config"
|
:depends-on ("config"
|
||||||
"core/actor" "core/message"
|
"core/actor" "core/message"
|
||||||
|
|
|
@ -3,7 +3,8 @@
|
||||||
(defpackage :scopes/test-core
|
(defpackage :scopes/test-core
|
||||||
(:use :common-lisp)
|
(:use :common-lisp)
|
||||||
(:local-nicknames (:alx :alexandria)
|
(:local-nicknames (:alx :alexandria)
|
||||||
(:actor :scopes/core/actor)
|
(:actor :scopes/core/actor-ng)
|
||||||
|
(:actorx :scopes/core/actor)
|
||||||
(:async :scopes/util/async)
|
(:async :scopes/util/async)
|
||||||
(:config :scopes/config)
|
(:config :scopes/config)
|
||||||
(:core :scopes/core)
|
(:core :scopes/core)
|
||||||
|
@ -53,6 +54,7 @@
|
||||||
((receiver :accessor receiver :initarg :receiver)))
|
((receiver :accessor receiver :initarg :receiver)))
|
||||||
|
|
||||||
(defun run ()
|
(defun run ()
|
||||||
|
(async:init)
|
||||||
(let* ((t:*test-suite* (make-instance 'test-suite :name "core")))
|
(let* ((t:*test-suite* (make-instance 'test-suite :name "core")))
|
||||||
(load (t:test-path "config-core" "etc"))
|
(load (t:test-path "config-core" "etc"))
|
||||||
(unwind-protect
|
(unwind-protect
|
||||||
|
@ -63,6 +65,7 @@
|
||||||
(test-util-iter)
|
(test-util-iter)
|
||||||
(test-shape)
|
(test-shape)
|
||||||
(core:setup-services)
|
(core:setup-services)
|
||||||
|
(test-actor-x)
|
||||||
(test-actor)
|
(test-actor)
|
||||||
(setf (receiver t:*test-suite*) (core:find-service :test-receiver))
|
(setf (receiver t:*test-suite*) (core:find-service :test-receiver))
|
||||||
(test-send))
|
(test-send))
|
||||||
|
@ -87,7 +90,7 @@
|
||||||
(== pl '(:b 1 :a 0))))
|
(== pl '(:b 1 :a 0))))
|
||||||
|
|
||||||
(deftest test-util-async ()
|
(deftest test-util-async ()
|
||||||
(async:init)
|
;(async:init)
|
||||||
(let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01)))))
|
(let ((tsk (async:make-task :startup (lambda (&rest args) (sleep 0.01)))))
|
||||||
(== (async:status tsk) :new)
|
(== (async:status tsk) :new)
|
||||||
(async:start tsk)
|
(async:start tsk)
|
||||||
|
@ -102,7 +105,8 @@
|
||||||
(async:send tsk :hello)
|
(async:send tsk :hello)
|
||||||
(== (async:stop tsk) '(:hello))
|
(== (async:stop tsk) '(:hello))
|
||||||
(== (async:status tsk) :done))
|
(== (async:status tsk) :done))
|
||||||
(async:finish))
|
;(async:finish)
|
||||||
|
)
|
||||||
|
|
||||||
(deftest test-util-crypt ()
|
(deftest test-util-crypt ()
|
||||||
(let ((s1 (crypt:create-secret))
|
(let ((s1 (crypt:create-secret))
|
||||||
|
@ -126,24 +130,31 @@
|
||||||
(setf (shape:head-value rec :username) :u1)
|
(setf (shape:head-value rec :username) :u1)
|
||||||
(== (shape:head-value rec :username) :u1)))
|
(== (shape:head-value rec :username) :u1)))
|
||||||
|
|
||||||
(deftest test-actor ()
|
(deftest test-actor-x ()
|
||||||
(let* ((calc (actor:create (actor:calculator) 'actor:bg-actor))
|
(let* ((calc (actorx:create (actorx:calculator) 'actorx:bg-actor))
|
||||||
val
|
val
|
||||||
(collector
|
(collector
|
||||||
(actor:create
|
(actorx:create
|
||||||
#'(lambda (ac msg) (setf val (actor:content msg))))))
|
#'(lambda (ac msg) (setf val (actorx:content msg))))))
|
||||||
(actor:send calc '(actor:plus 2))
|
(actorx:send calc '(actorx:plus 2))
|
||||||
(actor:send calc '(actor:minus 3))
|
(actorx:send calc '(actorx:minus 3))
|
||||||
(actor:send calc '(actor:show))
|
(actorx:send calc '(actorx:show))
|
||||||
(actor:send calc '(actor:send-value) :customer collector)
|
(actorx:send calc '(actorx:send-value) :customer collector)
|
||||||
(sleep 0.1)
|
(sleep 0.1)
|
||||||
(== val -1)
|
(== val -1)
|
||||||
))
|
))
|
||||||
|
|
||||||
|
(deftest test-actor ()
|
||||||
|
;(async:init)
|
||||||
|
(let (calc (actor:create (actor:calculator)))
|
||||||
|
;(actor:stop calc)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
(deftest test-send ()
|
(deftest test-send ()
|
||||||
(let ((rcvr (receiver t:*test-suite*))
|
(let ((rcvr (receiver t:*test-suite*))
|
||||||
(msg (message:create '(:test :dummy) :data "dummy payload"))
|
(msg (message:create '(:test :dummy) :data "dummy payload"))
|
||||||
(msg-exp (message:create '(:test :dummy) :data "dummy payload")))
|
(msg-exp (message:create '(:test :dummy) :data "dummy payload")))
|
||||||
(expect rcvr msg-exp)
|
(expect rcvr msg-exp)
|
||||||
(== (core:name rcvr) :test-receiver)
|
(== (core:name rcvr) :test-receiver)
|
||||||
(actor:send rcvr msg)))
|
(actorx:send rcvr msg)))
|
||||||
|
|
|
@ -5,25 +5,41 @@
|
||||||
(:local-nicknames (:util :scopes/util)
|
(:local-nicknames (:util :scopes/util)
|
||||||
(:lp :lparallel)
|
(:lp :lparallel)
|
||||||
(:lpq :lparallel.queue))
|
(:lpq :lparallel.queue))
|
||||||
(:export #:init #:finish #:bg-task #:fg-task
|
(:export #:init #:finish #:make-mb #:receive #:submit-task
|
||||||
|
#:fg-task #:task
|
||||||
#:make-task #:start #:stop #:status #:data #:send))
|
#:make-task #:start #:stop #:status #:data #:send))
|
||||||
|
|
||||||
(in-package :scopes/util/async)
|
(in-package :scopes/util/async)
|
||||||
|
|
||||||
;;;; general definitions
|
;;;; general definitions (lparallel wrappers)
|
||||||
|
|
||||||
|
(defun init ()
|
||||||
|
(when (null lp:*kernel*)
|
||||||
|
(format t "async:init ~a ~%"
|
||||||
|
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))
|
||||||
|
))
|
||||||
|
|
||||||
|
(defun finish ()
|
||||||
|
(when lp:*kernel*
|
||||||
|
(lp:end-kernel)
|
||||||
|
;(setf lp:*kernel* nil)
|
||||||
|
))
|
||||||
|
|
||||||
|
(defun make-mb ()
|
||||||
|
(lpq:make-queue))
|
||||||
|
|
||||||
|
(defun receive (mb)
|
||||||
|
(lpq:pop-queue mb))
|
||||||
|
|
||||||
|
(defun submit-task (ch job)
|
||||||
|
(lp:submit-task ch job))
|
||||||
|
|
||||||
|
;;;; job - probably obsolete
|
||||||
|
|
||||||
(eval-when (:compile-toplevel :load-toplevel :execute)
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
||||||
(when (not (boundp '+quit-message+))
|
(when (not (boundp '+quit-message+))
|
||||||
(defconstant +quit-message+ (gensym "QUIT"))))
|
(defconstant +quit-message+ (gensym "QUIT"))))
|
||||||
|
|
||||||
(defun init ()
|
|
||||||
(when (null lp:*kernel*)
|
|
||||||
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))))
|
|
||||||
|
|
||||||
(defun finish ()
|
|
||||||
(when lp:*kernel*
|
|
||||||
(lp:end-kernel)))
|
|
||||||
|
|
||||||
(defun noop (&rest params))
|
(defun noop (&rest params))
|
||||||
|
|
||||||
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
||||||
|
@ -53,17 +69,17 @@
|
||||||
(status :accessor status :initform :new)
|
(status :accessor status :initform :new)
|
||||||
(data :accessor data :initform nil)))
|
(data :accessor data :initform nil)))
|
||||||
|
|
||||||
(defclass bg-task (fg-task)
|
(defclass task (fg-task)
|
||||||
((channel :reader channel :initform (lp:make-channel))))
|
((channel :reader channel :initform (lp:make-channel))))
|
||||||
|
|
||||||
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
|
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
|
||||||
(cls 'bg-task))
|
(cls 'task))
|
||||||
(let ((tsk (make-instance cls)))
|
(let ((tsk (make-instance cls)))
|
||||||
(setf (job tsk)
|
(setf (job tsk)
|
||||||
(lambda () (standard-job tsk :startup startup :teardown teardown
|
(lambda () (standard-job tsk :startup startup :teardown teardown
|
||||||
:handle-message handle-message)))
|
:handle-message handle-message)))
|
||||||
(if handle-message
|
(if handle-message
|
||||||
(setf (mailbox tsk) (lpq:make-queue)))
|
(setf (mailbox tsk) (make-mb)))
|
||||||
tsk))
|
tsk))
|
||||||
|
|
||||||
(defun start (tsk)
|
(defun start (tsk)
|
||||||
|
@ -76,8 +92,8 @@
|
||||||
(defgeneric submit (tsk)
|
(defgeneric submit (tsk)
|
||||||
(:method ((tsk fg-task))
|
(:method ((tsk fg-task))
|
||||||
(funcall (job tsk)))
|
(funcall (job tsk)))
|
||||||
(:method ((tsk bg-task))
|
(:method ((tsk task))
|
||||||
(lp:submit-task (channel tsk) (job tsk))))
|
(submit-task (channel tsk) (job tsk))))
|
||||||
|
|
||||||
(defun stop (tsk &key (wait t))
|
(defun stop (tsk &key (wait t))
|
||||||
(when (mailbox tsk)
|
(when (mailbox tsk)
|
||||||
|
@ -87,7 +103,7 @@
|
||||||
|
|
||||||
(defgeneric wait-result (tsk)
|
(defgeneric wait-result (tsk)
|
||||||
(:method ((tsk fg-task)))
|
(:method ((tsk fg-task)))
|
||||||
(:method ((tsk bg-task))
|
(:method ((tsk task))
|
||||||
(lp:receive-result (channel tsk))))
|
(lp:receive-result (channel tsk))))
|
||||||
|
|
||||||
(defun send (tsk msg)
|
(defun send (tsk msg)
|
||||||
|
|
Loading…
Add table
Reference in a new issue