cl-scopes/util/async.lisp

74 lines
2 KiB
Common Lisp

;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations
(defpackage :scopes/util/async
(:use :common-lisp)
(:local-nicknames (:lp :lparallel)
(:lpq :lparallel.queue)
(:util :scopes/util))
(:export #:+quit-message+ #:init #:finish #:make-mb #:make-task #:rcv #:try-rcv #:snd
#:mailbox #:task #:restartable-task #:behavior #:get-status #:set-status
#:submit-task #:receive-result #:try-receive-result))
(in-package :scopes/util/async)
(defconstant +quit-message+ :quit)
;;;; general definitions (lparallel wrappers)
(defun init ()
(when (null lp:*kernel*)
(format t "async:init ~a ~%"
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))))
(defun finish ()
(when lp:*kernel*
(lp:end-kernel)))
;;;; simple task = mailbox with result channel
(defclass mailbox ()
((queue :reader queue :initform (lpq:make-queue))
(channel :reader channel :initform (lp:make-channel))))
(defun make-mb ()
(make-instance 'mailbox))
(defun rcv (mb)
(handler-case (lpq:pop-queue (queue mb))
(condition (cnd)
(util:lg :info "rcv -> lpq:pop-queue" cnd)
+quit-message+)))
(defun try-rcv (mb)
(lpq:try-pop-queue (queue mb)))
(defun snd (mb msg)
(lpq:push-queue msg (queue mb)))
(defun submit-task (mb job &rest args)
(apply #'lp:submit-task (channel mb) job args))
(defun receive-result (mb)
(lp:receive-result (channel mb)))
(defun try-receive-result (mb)
(lp:try-receive-result (channel mb)))
;;;; tasks - with behavior and thread-safe status
(defclass task (mailbox)
((behavior :accessor behavior :initarg :behavior)))
(defclass restartable-task (task)
((status :reader status
:initform (lpq:make-queue :fixed-capacity 1 :initial-contents '(:new)))))
(defun make-task (bhv &key restartable (cls 'task))
(if restartable (setf cls 'restartable-task))
(make-instance cls :behavior bhv))
(defun get-status (tsk)
(lpq:pop-queue (status tsk)))
(defun set-status (tsk st)
(lpq:push-queue st (status tsk)))