;;;; cl-scopes/util/async - utilities for asynchronous (concurrent / parallel) operations
|
|
|
|
;;; Package definition: thin wrappers around lparallel plus a small task class.
(defpackage #:scopes/util/async
  (:use #:cl)
  (:local-nicknames
   (#:util #:scopes/util)
   (#:lp #:lparallel)
   (#:lpq #:lparallel.queue))
  (:export
   ;; kernel lifecycle
   #:init #:finish
   ;; channels and mailboxes (queues)
   #:make-ch #:make-mb #:rcv #:snd #:submit-task
   ;; task class and operations
   #:task #:make-task #:start #:status #:wait-receive))

(in-package #:scopes/util/async)
|
|
|
|
;;;; general definitions (lparallel wrappers)
|
|
|
|
(defun init ()
  "Create an lparallel kernel and store it in LP:*KERNEL*, unless one is
already set.  The worker count passed to LP:MAKE-KERNEL is the value of
SERAPEUM:COUNT-CPUS.  The new kernel is printed to standard output.
Always returns NIL."
  (unless lp:*kernel*
    (let ((kernel (lp:make-kernel (serapeum:count-cpus))))
      (setf lp:*kernel* kernel)
      ;; FORMAT to T returns NIL, so the function's value is NIL either way.
      (format t "async:init ~a ~%" kernel))))
|
|
|
|
(defun finish ()
  "Call LP:END-KERNEL if LP:*KERNEL* is currently non-NIL; otherwise do
nothing and return NIL."
  (unless (null lp:*kernel*)
    (lp:end-kernel)))
|
|
|
|
(defun make-ch ()
  "Return a fresh lparallel channel (see LP:MAKE-CHANNEL)."
  (lp:make-channel))
|
|
|
|
(defun make-mb ()
  "Return a fresh mailbox, implemented as an lparallel queue
(see LPQ:MAKE-QUEUE)."
  (lpq:make-queue))
|
|
|
|
(defun rcv (mb)
  "Take the next message from mailbox MB by delegating to LPQ:POP-QUEUE
and return whatever that call returns."
  (lpq:pop-queue mb))
|
|
|
|
(defun snd (mb msg)
  "Put MSG into mailbox MB by delegating to LPQ:PUSH-QUEUE.
Note the argument order: the mailbox comes first here, whereas
LPQ:PUSH-QUEUE takes the object first and the queue second."
  (lpq:push-queue msg mb))
|
|
|
|
(defun submit-task (ch job)
  "Submit JOB on channel CH by delegating to LP:SUBMIT-TASK.
JOB is passed through unchanged as the task function."
  (lp:submit-task ch job))
|
|
|
|
;;;; not used at the moment
|
|
|
|
(defun receive-result (ch)
  "Return the next result available on channel CH by delegating to
LP:RECEIVE-RESULT.

The original definition had no lambda list, so the form
(lp:receive-result ch) was read as the parameter list and the body was
empty; the function never touched the channel."
  (lp:receive-result ch))
|
|
|
|
;;;; task class and related functions
|
|
|
|
(defclass task ()
  ((job
    :initarg :job
    :reader job
    :documentation "The job handed to SUBMIT-TASK when the task is started.")
   (taskid
    :initform (gensym "TSK")
    :reader taskid
    :documentation "Uninterned symbol identifying this task; used in log output.")
   (status
    :initform :new
    :accessor status
    :documentation "Task state; starts as :NEW and is set to :RUNNING by START.")
   (channel
    :initform (make-ch)
    :reader channel
    :documentation "Channel created via MAKE-CH for this task's job and result."))
  (:documentation
   "A unit of asynchronous work: a job plus the channel it is submitted on."))
|
|
|
|
(defun make-task (job &key (cls 'task))
  "Instantiate class CLS (default: TASK) with JOB as its :JOB initarg and
return the new instance."
  (make-instance cls
                 :job job))
|
|
|
|
(defun start (tsk)
  "Start task TSK.

If TSK's status is already :RUNNING, log a warning with its task id and
return NIL without submitting anything.  Otherwise set the status to
:RUNNING, submit TSK's job on TSK's channel, and return the value of
SUBMIT-TASK."
  (cond ((eq (status tsk) :running)
         (util:lgw "task already running" (taskid tsk))
         ;; Explicit NIL: do not leak whatever the logging form returns.
         nil)
        (t
         (setf (status tsk) :running)
         (submit-task (channel tsk) (job tsk)))))
|
|
|
|
(defun wait-receive (tsk)
  "Return the result received on TSK's channel by delegating to
LP:RECEIVE-RESULT."
  (let ((ch (channel tsk)))
    (lp:receive-result ch)))
|