108 lines
3.1 KiB
Common Lisp
108 lines
3.1 KiB
Common Lisp
;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations
|
|
|
|
(defpackage :scopes/util/async
|
|
(:use :common-lisp)
|
|
(:local-nicknames (:util :scopes/util)
|
|
(:lp :lparallel)
|
|
(:lpq :lparallel.queue))
|
|
(:export #:init #:finish #:bg-task #:fg-task
|
|
#:make-task #:start #:stop #:status #:data #:send))
|
|
|
|
(in-package :scopes/util/async)
|
|
|
|
;;;; general definitions
|
|
|
|
(eval-when (:compile-toplevel :load-toplevel :execute)
|
|
(when (not (boundp '+quit-message+))
|
|
(defconstant +quit-message+ (gensym "QUIT"))))
|
|
|
|
(defun init ()
|
|
(when (null lp:*kernel*)
|
|
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus)))))
|
|
|
|
(defun finish ()
|
|
(when lp:*kernel*
|
|
(lp:end-kernel)))
|
|
|
|
(defun noop (&rest params))
|
|
|
|
(defun standard-job (tsk &key (startup #'noop) (teardown #'noop) handle-message)
|
|
(unwind-protect
|
|
(progn
|
|
(funcall startup tsk)
|
|
(when (mailbox tsk)
|
|
(do-listen tsk handle-message))
|
|
(data tsk))
|
|
(setf (status tsk) :done)
|
|
(funcall teardown tsk)))
|
|
|
|
(defun do-listen (tsk handle-message)
|
|
(handler-case
|
|
(loop for msg = (lpq:pop-queue (mailbox tsk))
|
|
until (eq msg +quit-message+)
|
|
do (funcall handle-message tsk msg))
|
|
(sb-sys:interactive-interrupt (condition)
|
|
(util:lgi condition))))
|
|
|
|
;;;; task classes and related functions / methods
|
|
|
|
(defclass fg-task ()
|
|
((job :accessor job)
|
|
(taskid :reader taskid :initform (gensym "TSK"))
|
|
(mailbox :accessor mailbox :initform nil)
|
|
(status :accessor status :initform :new)
|
|
(data :accessor data :initform nil)))
|
|
|
|
(defclass bg-task (fg-task)
|
|
((channel :reader channel :initform (lp:make-channel))))
|
|
|
|
(defun make-task (&key (startup #'noop) (teardown #'noop) handle-message
|
|
(cls 'bg-task))
|
|
(let ((tsk (make-instance cls)))
|
|
(setf (job tsk)
|
|
(lambda () (standard-job tsk :startup startup :teardown teardown
|
|
:handle-message handle-message)))
|
|
(if handle-message
|
|
(setf (mailbox tsk) (lpq:make-queue)))
|
|
tsk))
|
|
|
|
(defun start (tsk)
|
|
(when (eq (status tsk) :running)
|
|
(util:lgw "task already running" (taskid tsk))
|
|
(return-from start))
|
|
(setf (status tsk) :running)
|
|
(submit tsk))
|
|
|
|
(defgeneric submit (tsk)
|
|
(:method ((tsk fg-task))
|
|
(funcall (job tsk)))
|
|
(:method ((tsk bg-task))
|
|
(lp:submit-task (channel tsk) (job tsk))))
|
|
|
|
(defun stop (tsk &key (wait t))
|
|
(when (mailbox tsk)
|
|
(send tsk +quit-message+))
|
|
(when wait
|
|
(wait-result tsk)))
|
|
|
|
(defgeneric wait-result (tsk)
|
|
(:method ((tsk fg-task)))
|
|
(:method ((tsk bg-task))
|
|
(lp:receive-result (channel tsk))))
|
|
|
|
(defun send (tsk msg)
|
|
(if (mailbox tsk)
|
|
(lpq:push-queue msg (mailbox tsk))
|
|
(util:lgw "task has no mailbox" (taskid tsk))))
|
|
|
|
;;; alternate implementation - may be removed
|
|
(defun do-listen-hb (tsk handle-message)
|
|
(handler-bind
|
|
((sb-sys:interactive-interrupt
|
|
(lambda (condition)
|
|
(util:lgi condition)
|
|
(return-from do-listen-hb))))
|
|
(loop for msg = (lpq:pop-queue (mailbox tsk))
|
|
until (eq msg +quit-message+)
|
|
do (funcall handle-message tsk msg))))
|
|
|