explicit definitions for service and root-service classes, using async and blocking task classes

This commit is contained in:
Helmut Merz 2025-03-10 15:00:10 +01:00
parent 5cd84f356c
commit fb33a65d54
2 changed files with 32 additions and 24 deletions

View file

@ -76,21 +76,17 @@
(defclass service (context) (defclass service (context)
((task :accessor task :initform nil))) ((task :accessor task :initform nil)))
(defgeneric do-start (ctx &key wait) (defclass root-service (service) ())
(:method ((ctx context) &key wait))
(:method ((ctx service) &key wait) (defgeneric do-start (ctx)
(:method ((ctx context)))
(:method ((ctx service))
(setf (task ctx) (async:make-task :handle-message #'handle-message)) (setf (task ctx) (async:make-task :handle-message #'handle-message))
(async:start (task ctx :wait wait)))) (async:start (task ctx)))
(:method ((ctx root-service))
(defgeneric do-listen (ctx) (setf (task ctx)
(:method ((ctx service)) (async:make-task :cls 'async:async-task :handle-message #'handle-message))
(do ((r (do-step ctx) (do-step ctx))) (async:start (task ctx :blocking t))))
((eql r '(:quit))))))
(defgeneric do-step (ctx)
(:method ((ctx service))
(let ((msg (lpq:pop-queue (mailbox ctx))))
(handle-message ctx msg))))
(defgeneric send (rcvr msg) (defgeneric send (rcvr msg)
(:method ((rcvr base-context) msg) (:method ((rcvr base-context) msg)
@ -107,7 +103,7 @@
(gethash name services)))) (gethash name services))))
(defun setup-services (&optional (cfg config:*root*)) (defun setup-services (&optional (cfg config:*root*))
(setf *root* (make-instance 'service :config cfg)) (setf *root* (make-instance 'root-service :config cfg))
;(setf (trivial-signal:signal-handler :int) #'quit-handler) ;(setf (trivial-signal:signal-handler :int) #'quit-handler)
(dolist (c (reverse (config:children cfg))) (dolist (c (reverse (config:children cfg)))
(add-service *root* c))) (add-service *root* c)))
@ -122,8 +118,7 @@
(unwind-protect (unwind-protect
(progn (progn
(setup-services cfg) (setup-services cfg)
;(do-listen *root*) (do-start *root*))
(do-start *root* :wait t))
(shutdown))) (shutdown)))
(defun add-action (ctx pat hdlr) (defun add-action (ctx pat hdlr)

View file

@ -5,7 +5,8 @@
(:local-nicknames (:util :scopes/util) (:local-nicknames (:util :scopes/util)
(:lp :lparallel) (:lp :lparallel)
(:lpq :lparallel.queue)) (:lpq :lparallel.queue))
(:export #:task #:make-task #:start #:stop #:status #:data #:send)) (:export #:task #:async-task
#:make-task #:start #:stop #:status #:data #:send))
(in-package :scopes/util/async) (in-package :scopes/util/async)
@ -38,13 +39,16 @@
(defclass task () (defclass task ()
((job :accessor job) ((job :accessor job)
(taskid :reader taskid :initform (gensym "TSK")) (taskid :reader taskid :initform (gensym "TSK"))
(channel :reader channel :initform (lp:make-channel))
(mailbox :accessor mailbox :initform nil) (mailbox :accessor mailbox :initform nil)
(status :accessor status :initform :new) (status :accessor status :initform :new)
(data :accessor data :initform nil))) (data :accessor data :initform nil)))
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message) (defclass async-task (task)
(let ((tsk (make-instance 'task))) ((channel :reader channel :initform (lp:make-channel))))
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
(cls 'async-task))
(let ((tsk (make-instance cls)))
(setf (job tsk) (setf (job tsk)
(lambda () (standard-job tsk :startup startup :teardown teardown (lambda () (standard-job tsk :startup startup :teardown teardown
:handle-message handle-message))) :handle-message handle-message)))
@ -52,19 +56,28 @@
(setf (mailbox tsk) (lpq:make-queue))) (setf (mailbox tsk) (lpq:make-queue)))
tsk)) tsk))
(defun start (tsk &key (wait nil)) (defun start (tsk)
(when (eq (status tsk) :running) (when (eq (status tsk) :running)
(util:lgw "task already running" (taskid tsk)) (util:lgw "task already running" (taskid tsk))
(return-from start)) (return-from start))
(setf (status tsk) :running) (setf (status tsk) :running)
(if wait (submit tsk))
(funcall (job tsk))
(defgeneric submit (tsk)
(:method ((tsk task))
(funcall (job tsk)))
(:method ((tsk async-task))
(lp:submit-task (channel tsk) (job tsk)))) (lp:submit-task (channel tsk) (job tsk))))
(defun stop (tsk &key (wait t)) (defun stop (tsk &key (wait t))
(when (mailbox tsk) (when (mailbox tsk)
(send tsk +quit-message+)) (send tsk +quit-message+))
(when wait (when wait
(wait-result tsk)))
(defgeneric wait-result (tsk)
(:method ((tsk task)))
(:method ((tsk async-task))
(lp:receive-result (channel tsk)))) (lp:receive-result (channel tsk))))
(defun send (tsk msg) (defun send (tsk msg)