;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations (defpackage :scopes/util/async (:use :common-lisp) (:local-nicknames (:util :scopes/util) (:lp :lparallel) (:lpq :lparallel.queue)) (:export #:task #:make-task #:start #:restart #:stop #:kill #:status #:logdata #:mailbox #:send #:receive)) (in-package :scopes/util/async) ;;;; general definitions (eval-when (:compile-toplevel :load-toplevel :execute) (progn (when (not (boundp '+quit-message+)) (defconstant +quit-message+ (gensym "QUIT"))) (when (null lp:*kernel*) (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))))) (defun noop (&rest params)) (defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message) (let ((mb (mailbox tsk))) (unwind-protect (progn (funcall startup tsk) (when mb (loop for msg = (lpq:pop-queue mb) until (eq msg +quit-message+) do (funcall handle-message tsk msg))) (funcall teardown tsk)) (setf (status tsk) :finished) "done"))) ;;;; task class and related functions / methods (defclass task () ((job :accessor job) (taskid :reader taskid :initform (gensym "TSK")) (channel :reader channel :initform (lp:make-channel)) (mailbox :accessor mailbox :initform nil) (status :accessor status :initform :new) (logdata :accessor logdata :initform nil))) (defun make-task (&key (startup #'noop) (teardown #'noop) handle-message) (let ((tsk (make-instance 'task))) (setf (job tsk) (lambda () (standard-job tsk :startup startup :teardown teardown :handle-message handle-message))) (if handle-message (setf (mailbox tsk) (lpq:make-queue))) tsk)) (defun start (tsk) (when (eq (status tsk) :running) (util:lgw "task already running" (taskid tsk)) (return-from start)) (setf (status tsk) :running) (lp:submit-task (channel tsk) (job tsk))) (defun stop (tsk &key (wait t)) (when (mailbox tsk) (send tsk +quit-message+)) (when wait (lp:receive-result (channel tsk)))) (defun send (tsk msg) (if (mailbox tsk) (lpq:push-queue msg (mailbox tsk)) (util:lgw "task has no mailbox" (taskid tsk))))