core: start working with lparallel queues and background tasks
This commit is contained in:
		
							parent
							
								
									dc37f7778a
								
							
						
					
					
						commit
						c61d3bcfd3
					
				
					 1 changed files with 29 additions and 12 deletions
				
			
		| 
						 | 
					@ -6,7 +6,8 @@
 | 
				
			||||||
                    (:message :scopes/core/message)
 | 
					                    (:message :scopes/core/message)
 | 
				
			||||||
                    (:shape :scopes/shape)
 | 
					                    (:shape :scopes/shape)
 | 
				
			||||||
                    (:util :scopes/util)
 | 
					                    (:util :scopes/util)
 | 
				
			||||||
                    (:alx :alexandria))
 | 
					                    (:alx :alexandria)
 | 
				
			||||||
 | 
					                    (:q :lparallel.queue))
 | 
				
			||||||
  (:export #:action-spec #:define-actions
 | 
					  (:export #:action-spec #:define-actions
 | 
				
			||||||
           #:*root* #:default-setup #:actions 
 | 
					           #:*root* #:default-setup #:actions 
 | 
				
			||||||
           #:find-service #:run-services #:setup-services #:shutdown
 | 
					           #:find-service #:run-services #:setup-services #:shutdown
 | 
				
			||||||
| 
						 | 
					@ -57,10 +58,10 @@
 | 
				
			||||||
(defvar *root* nil)
 | 
					(defvar *root* nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
;;; check / fix:
 | 
					;;; check / fix:
 | 
				
			||||||
(defvar *quit-queue* (lparallel.queue:make-queue :fixed-capacity 1))
 | 
					(defvar *quit-queue* (q:make-queue :fixed-capacity 1))
 | 
				
			||||||
(defun quit-handler (sig)
 | 
					(defun quit-handler (sig)
 | 
				
			||||||
  (format t "~%quit-handler: got signal ~s~%" sig)
 | 
					  (format t "~%quit-handler: got signal ~s~%" sig)
 | 
				
			||||||
  (lparallel.queue:push-queue sig))
 | 
					  (q:push-queue sig *quit-queue*))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(defclass base-context ()
 | 
					(defclass base-context ()
 | 
				
			||||||
  ((actions :accessor actions :initform nil)))
 | 
					  ((actions :accessor actions :initform nil)))
 | 
				
			||||||
| 
						 | 
					@ -70,6 +71,23 @@
 | 
				
			||||||
   (name :reader name :initarg :name)
 | 
					   (name :reader name :initarg :name)
 | 
				
			||||||
   (services :reader services :initform (make-hash-table))))
 | 
					   (services :reader services :initform (make-hash-table))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(defclass service (context)
 | 
				
			||||||
 | 
					  ((mailbox :reader mailbox :initform (lparallel.queue:make-queue))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(defgeneric do-start (ctx)
 | 
				
			||||||
 | 
					  (:method ((ctx context)))
 | 
				
			||||||
 | 
					  (:method ((ctx service)) (do-listen ctx)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(defgeneric do-listen (ctx)
 | 
				
			||||||
 | 
					  (:method ((ctx service))
 | 
				
			||||||
 | 
					    (do ((r (do-step ctx) (do-step ctx)))
 | 
				
			||||||
 | 
					      ((not r)))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(defgeneric do-step (ctx)
 | 
				
			||||||
 | 
					  (:method ((ctx service))
 | 
				
			||||||
 | 
					    (let ((msg (q:pop-queue (mailbox ctx)))) 
 | 
				
			||||||
 | 
					      (handle-message ctx msg))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(defun default-setup (cfg &optional (cls 'context) &rest args &key &allow-other-keys)
 | 
					(defun default-setup (cfg &optional (cls 'context) &rest args &key &allow-other-keys)
 | 
				
			||||||
  (apply #'make-instance cls :config cfg :name (config:name cfg) args))
 | 
					  (apply #'make-instance cls :config cfg :name (config:name cfg) args))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -79,9 +97,8 @@
 | 
				
			||||||
      (gethash name services))))
 | 
					      (gethash name services))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(defun setup-services (&optional (cfg config:*root*))
 | 
					(defun setup-services (&optional (cfg config:*root*))
 | 
				
			||||||
  (setf *root* (make-instance 'context :config cfg))
 | 
					  (setf *root* (make-instance 'service :config cfg))
 | 
				
			||||||
  ;(setf (trivial-signal:signal-handler :int) #'quit-handler)
 | 
					  ;(setf (trivial-signal:signal-handler :int) #'quit-handler)
 | 
				
			||||||
  ;(setf (trivial-signal:signal-handler :term) #'quit-handler)
 | 
					 | 
				
			||||||
  (dolist (c (reverse (config:children cfg)))
 | 
					  (dolist (c (reverse (config:children cfg)))
 | 
				
			||||||
    (add-service *root* c)))
 | 
					    (add-service *root* c)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -93,8 +110,8 @@
 | 
				
			||||||
   (unwind-protect
 | 
					   (unwind-protect
 | 
				
			||||||
     (progn
 | 
					     (progn
 | 
				
			||||||
       (setup-services cfg)
 | 
					       (setup-services cfg)
 | 
				
			||||||
       ;(do-listen)
 | 
					       (do-listen *root*))
 | 
				
			||||||
       (lparallel.queue:pop-queue *quit-queue*))
 | 
					       ;(q:pop-queue *quit-queue*))
 | 
				
			||||||
     (shutdown)))
 | 
					     (shutdown)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(defun add-action (ctx pat hdlr)
 | 
					(defun add-action (ctx pat hdlr)
 | 
				
			||||||
| 
						 | 
					@ -112,7 +129,7 @@
 | 
				
			||||||
        (dolist (a (config:actions cfg))
 | 
					        (dolist (a (config:actions cfg))
 | 
				
			||||||
          (add-action child (car a) (cadr a)))
 | 
					          (add-action child (car a) (cadr a)))
 | 
				
			||||||
        (setf (gethash (config:name cfg) services) child)
 | 
					        (setf (gethash (config:name cfg) services) child)
 | 
				
			||||||
        ;(do-start child)
 | 
					        (do-start child)
 | 
				
			||||||
        ))))
 | 
					        ))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
(defgeneric send (rcvr msg)
 | 
					(defgeneric send (rcvr msg)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue