Skip to content

Commit 8bb5149

Browse files
committed
pulled in defparkingop
1 parent 0dac2ee commit 8bb5149

File tree

2 files changed

+44
-21
lines changed

2 files changed

+44
-21
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,18 @@ return nil for unexpected contexts."
4949
[clojure.core.async.impl.timers :as timers]
5050
[clojure.core.async.impl.dispatch :as dispatch]
5151
[clojure.core.async.impl.ioc-macros :as ioc]
52-
clojure.core.async.impl.go ;; TODO: make conditional
5352
[clojure.core.async.impl.mutex :as mutex]
5453
)
5554
(:import [java.util.concurrent.atomic AtomicLong]
5655
[java.util.concurrent.locks Lock]
5756
[java.util.concurrent ThreadLocalRandom]
5857
[java.util Arrays ArrayList]))
5958

59+
(def ^:private lazy-loading-supported? (dispatch/at-least-clojure-version? [1 12 3]))
60+
61+
(when-not lazy-loading-supported?
62+
(require 'clojure.core.async.impl.go))
63+
6064
(alias 'core 'clojure.core)
6165

6266
(set! *warn-on-reflection* false)
@@ -138,6 +142,21 @@ return nil for unexpected contexts."
138142
[^long msecs]
139143
(timers/timeout msecs))
140144

145+
(defn- defparkingop* [op doc arglist]
146+
(let [as (mapv #(list 'quote %) arglist)
147+
blockingop (-> op name (str "!") symbol)]
148+
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
149+
(fn [~'& ~'args]
150+
(if (dispatch/in-vthread?)
151+
~(list* apply blockingop '[args])
152+
(assert nil ~(str op " used not in (go ...) block")))))))
153+
154+
(defmacro defparkingop
155+
"Emits a Var with a function that checks if it's running in a virtual thread. If so then
156+
the related blocking op will be called, otherwise the function throws."
157+
[op doc arglist]
158+
(defparkingop* op doc arglist))
159+
141160
(defmacro defblockingop
142161
[op doc arglist & body]
143162
(let [as (mapv #(list 'quote %) arglist)]
@@ -162,11 +181,11 @@ return nil for unexpected contexts."
162181
@ret
163182
(deref p))))
164183

165-
(defn <!
166-
"takes a val from port. Must be called inside a (go ...) block. Will
167-
return nil if closed. Will park if nothing is available."
168-
[port]
169-
(assert nil "<! used not in (go ...) block"))
184+
(defparkingop <!
185+
"takes a val from port. Must be called inside a (go ...) block, or on
186+
a virtual thread (no matter how it was started). Will return nil if
187+
closed. Will park if nothing is available."
188+
[port])
170189

171190
(defn take!
172191
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -201,12 +220,12 @@ return nil for unexpected contexts."
201220
@ret
202221
(deref p))))
203222

204-
(defn >!
223+
(defparkingop >!
205224
"puts a val into port. nil values are not allowed. Must be called
206-
inside a (go ...) block. Will park if no buffer space is available.
225+
inside a (go ...) block, or on a virtual thread (no matter how it
226+
was started). Will park if no buffer space is available.
207227
Returns true unless port is already closed."
208-
[port val]
209-
(assert nil ">! used not in (go ...) block"))
228+
[port val])
210229

211230
(def ^:private nop (on-caller (fn [_])))
212231
(def ^:private fhnop (fn-handler nop))
@@ -344,16 +363,16 @@ return nil for unexpected contexts."
344363
@ret
345364
(deref p))))
346365

347-
(defn alts!
366+
(defparkingop alts!
348367
"Completes at most one of several channel operations. Must be called
349-
inside a (go ...) block. ports is a vector of channel endpoints,
350-
which can be either a channel to take from or a vector of
351-
[channel-to-put-to val-to-put], in any combination. Takes will be
352-
made as if by <!, and puts will be made as if by >!. Unless
353-
the :priority option is true, if more than one port operation is
354-
ready a non-deterministic choice will be made. If no operation is
355-
ready and a :default value is supplied, [default-val :default] will
356-
be returned, otherwise alts! will park until the first operation to
368+
inside a (go ...) block, or on a virtual thread (no matter how it was
369+
started). ports is a vector of channel endpoints, which can be either
370+
a channel to take from or a vector of [channel-to-put-to val-to-put],
371+
in any combination. Takes will be made as if by <!, and puts will be
372+
made as if by >!. Unless the :priority option is true, if more than one
373+
port operation is ready a non-deterministic choice will be made. If no
374+
operation is ready and a :default value is supplied, [default-val :default]
375+
will be returned, otherwise alts! will park until the first operation to
357376
become ready completes. Returns [val port] of the completed
358377
operation, where val is the value taken for takes, and a
359378
boolean (true unless already closed, as per put!) for puts.
@@ -367,8 +386,7 @@ return nil for unexpected contexts."
367386
used, nor in what order should they be, so they should not be
368387
depended upon for side effects."
369388

370-
[ports & {:as opts}]
371-
(assert nil "alts! used not in (go ...) block"))
389+
[ports & {:as opts}])
372390

373391
(defn do-alt [alts clauses]
374392
(assert (even? (count clauses)) "unbalanced clauses")

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75+
(defn at-least-clojure-version?
76+
[[maj min incr]]
77+
(let [{:keys [major minor incremental]} *clojure-version*]
78+
(not (neg? (compare [major minor incremental] [maj min incr])))))
79+
7580
(def virtual-threads-available?
7681
(try
7782
(Class/forName "java.lang.Thread$Builder$OfVirtual")

0 commit comments

Comments
 (0)