62 lines
1.7 KiB
Common Lisp
62 lines
1.7 KiB
Common Lisp
;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations
|
|
|
|
(defpackage :scopes/util/async
|
|
(:use :common-lisp)
|
|
(:local-nicknames (:lp :lparallel)
|
|
(:lpq :lparallel.queue))
|
|
(:export #:init #:finish #:make-ch #:make-mb #:make-task #:rcv #:try-rcv #:snd
|
|
#:mailbox #:task #:restartable-task #:behavior #:status
|
|
#:submit-task #:receive-result #:try-receive-result))
|
|
|
|
(in-package :scopes/util/async)
|
|
|
|
;;;; general definitions (lparallel wrappers)
|
|
|
|
(defun init ()
|
|
(when (null lp:*kernel*)
|
|
(format t "async:init ~a ~%"
|
|
(setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))))
|
|
|
|
(defun finish ()
|
|
(when lp:*kernel*
|
|
(lp:end-kernel)))
|
|
|
|
;;;; simple task = mailbox with result channel
|
|
|
|
(defclass mailbox ()
|
|
((queue :reader queue :initform (lpq:make-queue))
|
|
(channel :reader channel :initform (lp:make-channel))))
|
|
|
|
(defun make-mb ()
|
|
(make-instance 'mailbox))
|
|
|
|
(defun rcv (mb)
|
|
(lpq:pop-queue (queue mb)))
|
|
|
|
(defun try-rcv (mb)
|
|
(lpq:try-pop-queue (queue mb)))
|
|
|
|
(defun snd (mb msg)
|
|
(lpq:push-queue msg (queue mb)))
|
|
|
|
(defun submit-task (mb job &rest args)
|
|
(apply #'lp:submit-task (channel mb) job args))
|
|
|
|
(defun receive-result (mb)
|
|
(lp:receive-result (channel mb)))
|
|
|
|
(defun try-receive-result (mb)
|
|
(lp:try-receive-result (channel mb)))
|
|
|
|
;;;; tasks - with behavior and thread-safe status
|
|
|
|
(defclass task (mailbox)
|
|
((behavior :accessor behavior :initarg :behavior)))
|
|
|
|
(defclass restartable-task (task)
|
|
((status :reader status
|
|
:initform (lp:make-channel :fixed-capacity 1 :initial-contents '(:new)))))
|
|
|
|
(defun make-task (bhv &key restartable (cls 'task))
|
|
(if restartable (setf cls 'restartable-task))
|
|
(make-instance cls :behavior bhv))
|