;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations

(defpackage :scopes/util/async
  (:use :common-lisp)
  (:local-nicknames (:util :scopes/util)
                    (:lp :lparallel)
                    (:lpq :lparallel.queue))
  (:export #:init #:finish
           #:make-ch #:make-mb #:rcv #:snd
           #:submit-task #:receive-result
           #:task #:make-task #:start #:status #:wait-receive))

(in-package :scopes/util/async)

;;;; general definitions (lparallel wrappers)

(defun init ()
  "Install a global lparallel kernel unless one is already set.

The kernel gets one worker per CPU as reported by SERAPEUM:COUNT-CPUS.
The newly created kernel is printed to standard output.  Does nothing
when LP:*KERNEL* is already non-NIL."
  (when (null lp:*kernel*)
    (format t "async:init ~a ~%"
            (setf lp:*kernel* (lp:make-kernel (serapeum:count-cpus))))))

(defun finish ()
  "Shut down the current lparallel kernel, if there is one."
  (when lp:*kernel*
    (lp:end-kernel)))

(defun make-ch ()
  "Return a new lparallel channel."
  (lp:make-channel))

(defun make-mb ()
  "Return a new mailbox, implemented as an lparallel queue."
  (lpq:make-queue))

(defun rcv (mb)
  "Pop and return the next message from mailbox MB."
  (lpq:pop-queue mb))

(defun snd (mb msg)
  "Push message MSG onto mailbox MB."
  ;; Note the argument order: lparallel takes the item first, the queue second.
  (lpq:push-queue msg mb))

(defun submit-task (ch job)
  "Submit the function JOB for execution; its result is delivered on channel CH."
  (lp:submit-task ch job))

(defun receive-result (ch)
  "Return the next result delivered on channel CH."
  (lp:receive-result ch))

;;;; task class and related functions

(defclass task ()
  (;; The function to execute; handed to SUBMIT-TASK by START.
   (job :reader job :initarg :job)
   ;; Fresh uninterned symbol identifying this task (used in log output).
   (taskid :reader taskid :initform (gensym "TSK"))
   ;; Lifecycle marker: :NEW on creation, :RUNNING after START,
   ;; :DONE after WAIT-RECEIVE has fetched a result.
   (status :accessor status :initform :new)
   ;; Private channel on which this task's result is delivered.
   ;; NOTE(review): creating an lparallel channel presumably requires an
   ;; active kernel, so INIT should be called before instantiating a task.
   (channel :reader channel :initform (make-ch)))
  (:documentation
   "A job together with its own result channel and a simple status flag."))

(defun make-task (job &key (cls 'task))
  "Create a task for the function JOB.

CLS names the class to instantiate (default TASK); it must accept the
:JOB initarg."
  (make-instance cls :job job))

(defun start (tsk)
  "Submit the job of TSK for execution and mark TSK as :RUNNING.

When TSK is already :RUNNING, a message is logged via UTIL:LGW and NIL
is returned without submitting anything.  Otherwise returns whatever
SUBMIT-TASK returns."
  (when (eq (status tsk) :running)
    (util:lgw "task already running" (taskid tsk))
    (return-from start))
  (setf (status tsk) :running)
  (submit-task (channel tsk) (job tsk)))

(defun wait-receive (tsk)
  "Fetch the next result from the channel of TSK, mark TSK as :DONE, return the result."
  (let ((data (receive-result (channel tsk))))
    ;; Status is updated only after a result has actually been received.
    (setf (status tsk) :done)
    data))