core:service: use async:task for concurrent / parallel processing

This commit is contained in:
Helmut Merz 2025-03-06 16:03:08 +01:00
parent c6ec682937
commit 5cd84f356c
3 changed files with 16 additions and 15 deletions

View file

@ -44,7 +44,7 @@
(dotenv-val (if dotenv-data (gethash key dotenv-data)))) (dotenv-val (if dotenv-data (gethash key dotenv-data))))
(if env-val (if env-val
(setf (gethash sl data) env-val) (setf (gethash sl data) env-val)
(progn (when dotenv-val
(setf (uiop:getenv key) dotenv-val) (setf (uiop:getenv key) dotenv-val)
(setf (gethash sl data) dotenv-val))))))) (setf (gethash sl data) dotenv-val)))))))

View file

@ -74,14 +74,13 @@
(services :reader services :initform (make-hash-table)))) (services :reader services :initform (make-hash-table))))
(defclass service (context) (defclass service (context)
((task :accessor task :initform nil) ((task :accessor task :initform nil)))
(mailbox :reader mailbox :initform (lpq:make-queue))))
(defgeneric do-start (ctx) (defgeneric do-start (ctx &key wait)
(:method ((ctx context))) (:method ((ctx context) &key wait))
(:method ((ctx service)) (:method ((ctx service) &key wait)
(let ((ch (lp:make-channel))) (setf (task ctx) (async:make-task :handle-message #'handle-message))
(lp:submit-task ch (do-listen ctx))))) (async:start (task ctx :wait wait))))
(defgeneric do-listen (ctx) (defgeneric do-listen (ctx)
(:method ((ctx service)) (:method ((ctx service))
@ -97,14 +96,11 @@
(:method ((rcvr base-context) msg) (:method ((rcvr base-context) msg)
(handle-message rcvr msg)) (handle-message rcvr msg))
(:method ((rcvr service) msg) (:method ((rcvr service) msg)
(lpq:push-queue msg (mailbox rcvr)))) (async:send (task rcvr) msg)))
(defun default-setup (cfg &optional (cls 'context) &rest args &key &allow-other-keys) (defun default-setup (cfg &optional (cls 'context) &rest args &key &allow-other-keys)
(apply #'make-instance cls :config cfg :name (config:name cfg) args)) (apply #'make-instance cls :config cfg :name (config:name cfg) args))
;;; setup-service
;(setf (task child) (async:make-task :handle-message #'handle-message))
(defun find-service (name &optional (parent *root*)) (defun find-service (name &optional (parent *root*))
(with-slots (services) parent (with-slots (services) parent
(when services (when services
@ -117,6 +113,8 @@
(add-service *root* c))) (add-service *root* c)))
(defun shutdown () (defun shutdown ()
(if (task *root*)
(async:stop (task *root*)))
(dolist (ctx (alx:hash-table-values (services *root*))) (dolist (ctx (alx:hash-table-values (services *root*)))
(funcall (config:shutdown (config ctx)) ctx))) (funcall (config:shutdown (config ctx)) ctx)))
@ -124,7 +122,8 @@
(unwind-protect (unwind-protect
(progn (progn
(setup-services cfg) (setup-services cfg)
(do-listen *root*)) ;(do-listen *root*)
(do-start *root* :wait t))
(shutdown))) (shutdown)))
(defun add-action (ctx pat hdlr) (defun add-action (ctx pat hdlr)

View file

@ -52,12 +52,14 @@
(setf (mailbox tsk) (lpq:make-queue))) (setf (mailbox tsk) (lpq:make-queue)))
tsk)) tsk))
(defun start (tsk) (defun start (tsk &key (wait nil))
(when (eq (status tsk) :running) (when (eq (status tsk) :running)
(util:lgw "task already running" (taskid tsk)) (util:lgw "task already running" (taskid tsk))
(return-from start)) (return-from start))
(setf (status tsk) :running) (setf (status tsk) :running)
(lp:submit-task (channel tsk) (job tsk))) (if wait
(funcall (job tsk))
(lp:submit-task (channel tsk) (job tsk))))
(defun stop (tsk &key (wait t)) (defun stop (tsk &key (wait t))
(when (mailbox tsk) (when (mailbox tsk)