cl-scopes/util/async.lisp

63 lines
1.6 KiB
Common Lisp

;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations
;;; Package definition for the async utilities.
;;; Only COMMON-LISP is used; the project utility package and lparallel are
;;; reached through package-local nicknames (util:, lp:, lpq:).
(defpackage :scopes/util/async
(:use :common-lisp)
;; util -> scopes/util, lp -> lparallel, lpq -> lparallel.queue
(:local-nicknames (:util :scopes/util)
(:lp :lparallel)
(:lpq :lparallel.queue))
;; Public API. Note that RECEIVE-RESULT and the task readers JOB, TASKID
;; and CHANNEL are defined in this file but are not exported.
(:export #:init #:finish #:make-ch #:make-mb #:rcv #:snd #:submit-task
#:task #:make-task #:start #:status #:wait-receive))
;; All following definitions are interned in the package defined above.
(in-package :scopes/util/async)
;;;; general definitions (lparallel wrappers)
(defun init ()
  "Install an lparallel kernel in LP:*KERNEL* unless one is already set.

The kernel is created with (SERAPEUM:COUNT-CPUS) as its worker count and
is printed to standard output after it has been installed.
Always returns NIL."
  (unless lp:*kernel*
    (let ((kernel (lp:make-kernel (serapeum:count-cpus))))
      ;; Install first, then report - same order as evaluating the SETF
      ;; as an argument of FORMAT.
      (setf lp:*kernel* kernel)
      (format t "async:init ~a ~%" kernel))))
(defun finish ()
  "End the lparallel kernel via LP:END-KERNEL if LP:*KERNEL* is non-NIL.

Does nothing (and returns NIL) when no kernel is installed."
  (unless (null lp:*kernel*)
    (lp:end-kernel)))
;;; Create and return a new lparallel channel (wrapper around LP:MAKE-CHANNEL).
;;; NOTE(review): lparallel presumably needs a kernel to exist at this point,
;;; i.e. INIT should have been called before - confirm against lparallel docs.
(defun make-ch ()
(lp:make-channel))
;;; Create and return a new mailbox, implemented as an lparallel queue
;;; (wrapper around LPQ:MAKE-QUEUE, called without arguments).
(defun make-mb ()
(lpq:make-queue))
;;; Receive: remove and return the next message from mailbox MB
;;; (wrapper around LPQ:POP-QUEUE).
;;; NOTE(review): lparallel documents POP-QUEUE as blocking while the queue
;;; is empty - callers should expect this call to wait.
(defun rcv (mb)
(lpq:pop-queue mb))
;;; Send: put message MSG into mailbox MB (wrapper around LPQ:PUSH-QUEUE).
;;; The argument order here is (mailbox, message); LPQ:PUSH-QUEUE takes
;;; them the other way round, hence the swap in the call.
(defun snd (mb msg)
(lpq:push-queue msg mb))
;;; Submit JOB for execution on channel CH (wrapper around LP:SUBMIT-TASK).
;;; JOB is passed as the function argument of LP:SUBMIT-TASK without any
;;; additional arguments, so it is presumably a function of no arguments.
(defun submit-task (ch job)
(lp:submit-task ch job))
;;;; task class and related functions
;;; Fetch a result from channel CH (wrapper around LP:RECEIVE-RESULT).
;;; Internal helper: not in the package's export list; used by WAIT-RECEIVE.
;;; NOTE(review): lparallel documents RECEIVE-RESULT as blocking until a
;;; result is available.
(defun receive-result (ch)
(lp:receive-result ch))
(defclass task ()
  ((job
    :initarg :job
    :reader job
    :documentation "The work to execute; handed to SUBMIT-TASK by START.")
   (taskid
    :initform (gensym "TSK")
    :reader taskid
    :documentation "Fresh uninterned symbol identifying this task instance.")
   (status
    :initform :new
    :accessor status
    :documentation "Lifecycle marker: starts as :NEW; START sets :RUNNING,
WAIT-RECEIVE sets :DONE.")
   (channel
    :initform (make-ch)
    :reader channel
    :documentation "Channel created by MAKE-CH for this instance; used to
submit the job and to receive its result."))
  (:documentation
   "A unit of work together with its own channel and a status marker."))
;;; Create and return a task object holding JOB.
;;; CLS names the class to instantiate (default: TASK); it is passed to
;;; MAKE-INSTANCE together with the :JOB initarg, so any class given here
;;; must accept :JOB.
(defun make-task (job &key (cls 'task))
(make-instance cls :job job))
(defun start (tsk)
  "Submit the job of TSK on the task's channel and mark the task :RUNNING.

If the status of TSK is already :RUNNING nothing is submitted: a warning
with the task id is logged through UTIL:LGW and NIL is returned.
Otherwise the result of SUBMIT-TASK is returned."
  (cond ((eq (status tsk) :running)
         (util:lgw "task already running" (taskid tsk))
         nil)
        (t
         ;; Flag the task before handing the job over to the channel.
         (setf (status tsk) :running)
         (submit-task (channel tsk) (job tsk)))))
(defun wait-receive (tsk)
  "Fetch the result for TSK from its channel and mark the task :DONE.

The status is updated only after RECEIVE-RESULT has returned; the value
obtained from RECEIVE-RESULT is returned."
  (prog1 (receive-result (channel tsk))
    (setf (status tsk) :done)))