async: provide lparallel wrappers => no other package uses lparallel directly

This commit is contained in:
Helmut Merz 2025-06-03 14:07:26 +02:00
parent 3509887ad0
commit 09636d9960
4 changed files with 26 additions and 23 deletions

View file

@ -3,8 +3,6 @@
(defpackage :scopes/core/actor (defpackage :scopes/core/actor
(:use :common-lisp) (:use :common-lisp)
(:local-nicknames (:async :scopes/util/async) (:local-nicknames (:async :scopes/util/async)
(:lp :lparallel)
(:lpq :lparallel.queue)
(:shape :scopes/shape) (:shape :scopes/shape)
(:util :scopes/util)) (:util :scopes/util))
(:export #:start #:stop #:become #:create #:send (:export #:start #:stop #:become #:create #:send
@ -39,15 +37,15 @@
(defun start (mb bhv &key foreground) (defun start (mb bhv &key foreground)
(if foreground (if foreground
(ac-loop mb bhv) (ac-loop mb bhv)
(let ((ch (lp:make-channel))) (let ((ch (async:make-ch)))
(lp:submit-task ch (lambda () (ac-loop mb bhv))) (async:submit-task ch (lambda () (ac-loop mb bhv)))
ch))) ch)))
(defun stop (mb) (defun stop (mb)
(send mb (message +quit-message+))) (send mb (message +quit-message+)))
(defun ac-loop (mb bhv) (defun ac-loop (mb bhv)
(let ((msg (lpq:pop-queue mb))) (let ((msg (async:rcv mb)))
(unless (eq (content msg) +quit-message+) (unless (eq (content msg) +quit-message+)
(ac-loop mb (or (funcall bhv msg) bhv))))) (ac-loop mb (or (funcall bhv msg) bhv)))))
@ -55,12 +53,12 @@
;;; there is no `become` operation: the behavior just returns the new behavior ;;; there is no `become` operation: the behavior just returns the new behavior
(defun create (bhv) (defun create (bhv)
(let ((mb (lpq:make-queue))) (let ((mb (async:make-mb)))
(values mb (start mb bhv)))) (values mb (start mb bhv))))
(defun send (mb msg) (defun send (mb msg)
;(util:lgi msg) ;(util:lgi msg)
(lpq:push-queue msg mb)) (async:snd mb msg))
;;;; predefined behaviors ;;;; predefined behaviors

View file

@ -8,9 +8,7 @@
(:message :scopes/core/message) (:message :scopes/core/message)
(:shape :scopes/shape) (:shape :scopes/shape)
(:util :scopes/util) (:util :scopes/util)
(:alx :alexandria) (:alx :alexandria))
(:lp :lparallel)
(:lpq :lparallel.queue))
(:export #:action-spec #:define-actions (:export #:action-spec #:define-actions
#:*root* #:make-setup #:actions #:*root* #:make-setup #:actions
#:find-service #:run-services #:setup-services #:shutdown #:find-service #:run-services #:setup-services #:shutdown
@ -84,7 +82,7 @@
(defun setup-services (&optional (cfg config:*root*)) (defun setup-services (&optional (cfg config:*root*))
(async:init) (async:init)
(let* ((ctx (make-instance 'context :config cfg))) (let* ((ctx (make-instance 'context :config cfg)))
(setf (mailbox ctx) (lpq:make-queue)) (setf (mailbox ctx) (async:make-mb))
(setf *root* ctx)) (setf *root* ctx))
(dolist (c (reverse (config:children cfg))) (dolist (c (reverse (config:children cfg)))
(add-service *root* c))) (add-service *root* c)))

View file

@ -5,9 +5,8 @@
(:local-nicknames (:util :scopes/util) (:local-nicknames (:util :scopes/util)
(:lp :lparallel) (:lp :lparallel)
(:lpq :lparallel.queue)) (:lpq :lparallel.queue))
(:export #:init #:finish #:make-mb #:receive #:submit-task (:export #:init #:finish #:make-ch #:make-mb #:rcv #:snd #:submit-task
#:fg-task #:task #:fg-task #:task #:make-task #:start #:stop #:status #:data #:send))
#:make-task #:start #:stop #:status #:data #:send))
(in-package :scopes/util/async) (in-package :scopes/util/async)
@ -16,24 +15,32 @@
(defun init () (defun init ()
(when (null lp:*kernel*) (when (null lp:*kernel*)
(format t "async:init ~a ~%" (format t "async:init ~a ~%"
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))) (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))))
))
(defun finish () (defun finish ()
(when lp:*kernel* (when lp:*kernel*
(lp:end-kernel) (lp:end-kernel)))
;(setf lp:*kernel* nil)
)) (defun make-ch ()
(lp:make-channel))
(defun make-mb () (defun make-mb ()
(lpq:make-queue)) (lpq:make-queue))
(defun receive (mb) (defun rcv (mb)
(lpq:pop-queue mb)) (lpq:pop-queue mb))
(defun snd (mb msg)
(lpq:push-queue msg mb))
(defun submit-task (ch job) (defun submit-task (ch job)
(lp:submit-task ch job)) (lp:submit-task ch job))
;;;; not used at the moment
(defun receive-result
(lp:receive-result ch))
;;;; job - probably obsolete ;;;; job - probably obsolete
(eval-when (:compile-toplevel :load-toplevel :execute) (eval-when (:compile-toplevel :load-toplevel :execute)

View file

@ -2,11 +2,11 @@
(defpackage :scopes/web/response (defpackage :scopes/web/response
(:use :common-lisp) (:use :common-lisp)
(:local-nicknames (:actor :scopes/core/actor) (:local-nicknames (:async :scopes/util/async)
(:actor :scopes/core/actor)
(:cookie :scopes/web/cookie) (:cookie :scopes/web/cookie)
(:core :scopes/core) (:core :scopes/core)
(:dom :scopes/web/dom) (:dom :scopes/web/dom)
(:lpq :lparallel.queue)
(:message :scopes/core/message) (:message :scopes/core/message)
(:shape :scopes/shape) (:shape :scopes/shape)
(:util :scopes/util)) (:util :scopes/util))
@ -74,7 +74,7 @@
(let* ((headers (getf env :headers)) (let* ((headers (getf env :headers))
(resp-class (select-response-class (gethash "accept" headers) html-responder)) (resp-class (select-response-class (gethash "accept" headers) html-responder))
(resp (make-instance resp-class :context ctx :env env))) (resp (make-instance resp-class :context ctx :env env)))
(setf (core:mailbox resp) (lpq:make-queue)) (setf (core:mailbox resp) (async:make-mb))
resp)) resp))
;(actor:make-actor #'core:handle-message resp-class :context ctx :env env))) ;(actor:make-actor #'core:handle-message resp-class :context ctx :env env)))