Skip to content

Commit 6a5fc24

Browse files
committed
Initial commits
1 parent 283380a commit 6a5fc24

2 files changed

Lines changed: 640 additions & 33 deletions

File tree

Lib/multiprocessing/queues.py

Lines changed: 192 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
import types
1818
import weakref
1919
import errno
20+
from contextlib import contextmanager
2021

21-
from queue import Empty, Full
22+
from queue import Empty, Full, ShutDown
2223

2324
from . import connection
2425
from . import context
@@ -45,21 +46,37 @@ def __init__(self, maxsize=0, *, ctx):
4546
else:
4647
self._wlock = ctx.Lock()
4748
self._sem = ctx.BoundedSemaphore(maxsize)
49+
50+
self._lock_shutdown = ctx.Lock()
51+
# Cannot use a ctx.Value because 'ctypes' library is
52+
# not always available on all Linux platforms.
53+
# Use of Semaphores instead of an array from `heap.BufferWrapper'
54+
# is here more efficient and explicit.
55+
self._sem_flag_shutdown = ctx.Semaphore(0)
56+
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
57+
self._sem_pending_getters = ctx.Semaphore(0)
58+
self._sem_pending_putters = ctx.Semaphore(0)
59+
4860
# For use by concurrent.futures
4961
self._ignore_epipe = False
5062
self._reset()
51-
5263
if sys.platform != 'win32':
5364
register_after_fork(self, Queue._after_fork)
5465

5566
def __getstate__(self):
5667
context.assert_spawning(self)
5768
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
58-
self._rlock, self._wlock, self._sem, self._opid)
69+
self._rlock, self._wlock, self._sem, self._opid,
70+
self._lock_shutdown,
71+
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
72+
self._sem_pending_getters, self._sem_pending_putters)
5973

6074
def __setstate__(self, state):
6175
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
62-
self._rlock, self._wlock, self._sem, self._opid) = state
76+
self._rlock, self._wlock, self._sem, self._opid,
77+
self._lock_shutdown,
78+
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
79+
self._sem_pending_getters, self._sem_pending_putters) = state
6380
self._reset()
6481

6582
def _after_fork(self):
@@ -81,43 +98,101 @@ def _reset(self, after_fork=False):
8198
self._recv_bytes = self._reader.recv_bytes
8299
self._poll = self._reader.poll
83100

101+
def _is_shutdown(self):
102+
return not self._sem_flag_shutdown.locked()
103+
104+
def _set_shutdown(self, immediate=False):
105+
self._sem_flag_shutdown.release()
106+
if immediate:
107+
self._sem_flag_shutdown_immediate.release()
108+
109+
@contextmanager
110+
def _handle_pending_processes(self, sem):
111+
# Count pending getter or putter processes in a dedicated
112+
# semaphore. These 2 semaphores are only used when queue
113+
# shuts down to release one by one all pending processes.
114+
sem.release()
115+
try:
116+
# Wraps potentialy blocking calls:
117+
# sem._sem.acquire() in put method,
118+
# _recv_bytes()/_poll(*args) in get method.
119+
yield
120+
finally:
121+
sem.acquire()
122+
123+
def _release_pending_putters(self):
124+
with self._lock_shutdown:
125+
if not self._sem_pending_putters.locked():
126+
self._sem.release()
127+
84128
def put(self, obj, block=True, timeout=None):
85129
if self._closed:
86130
raise ValueError(f"Queue {self!r} is closed")
87-
if not self._sem.acquire(block, timeout):
88-
raise Full
131+
132+
if self._is_shutdown():
133+
raise ShutDown
134+
try:
135+
with self._handle_pending_processes(self._sem_pending_putters):
136+
if not self._sem.acquire(block, timeout):
137+
raise Full
138+
finally:
139+
if self._is_shutdown():
140+
self._release_pending_putters()
141+
raise ShutDown
89142

90143
with self._notempty:
91144
if self._thread is None:
92145
self._start_thread()
93146
self._buffer.append(obj)
94147
self._notempty.notify()
95148

149+
def _release_pending_getters(self):
150+
with self._lock_shutdown:
151+
if not self._sem_pending_getters.locked():
152+
self._put_sentinel()
153+
96154
def get(self, block=True, timeout=None):
97155
if self._closed:
98156
raise ValueError(f"Queue {self!r} is closed")
99-
if block and timeout is None:
100-
with self._rlock:
101-
res = self._recv_bytes()
102-
self._sem.release()
103-
else:
104-
if block:
105-
deadline = time.monotonic() + timeout
106-
if not self._rlock.acquire(block, timeout):
107-
raise Empty
108-
try:
109-
if block:
110-
timeout = deadline - time.monotonic()
111-
if not self._poll(timeout):
157+
158+
if (empty := self.empty()) and self._is_shutdown():
159+
raise ShutDown
160+
try:
161+
with self._handle_pending_processes(self._sem_pending_getters):
162+
if block and timeout is None:
163+
with self._rlock:
164+
res = self._recv_bytes()
165+
self._sem.release()
166+
else:
167+
if block:
168+
deadline = time.monotonic() + timeout
169+
if not self._rlock.acquire(block, timeout):
112170
raise Empty
113-
elif not self._poll():
114-
raise Empty
115-
res = self._recv_bytes()
116-
self._sem.release()
117-
finally:
118-
self._rlock.release()
119-
# unserialize the data after having released the lock
120-
return _ForkingPickler.loads(res)
171+
try:
172+
if block:
173+
timeout = deadline - time.monotonic()
174+
if not self._poll(timeout):
175+
raise Empty
176+
elif not self._poll():
177+
raise Empty
178+
res = self._recv_bytes()
179+
self._sem.release()
180+
finally:
181+
self._rlock.release()
182+
finally:
183+
if self._is_shutdown() and empty:
184+
self._release_pending_getters()
185+
raise ShutDown
186+
187+
item = _ForkingPickler.loads(res)
188+
if self._is_shutdown() \
189+
and isinstance(item, _ShutdownSentinel):
190+
# A pending getter process is just unblocked,
191+
# Unblock a next one if exists.
192+
self._release_pending_getters()
193+
raise ShutDown
194+
195+
return item
121196

122197
def qsize(self):
123198
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -135,6 +210,57 @@ def get_nowait(self):
135210
def put_nowait(self, obj):
136211
return self.put(obj, False)
137212

213+
def _clear(self):
214+
with self._rlock:
215+
while self._poll():
216+
self._recv_bytes()
217+
218+
def _put_sentinel(self):
219+
# When put a sentinel into an empty queue,
220+
# dont forget to call to _sem.acquire in order to
221+
# maintain a correct count of acquire/release
222+
# calls for BoudedSempaphore.
223+
self._sem.acquire()
224+
225+
with self._notempty:
226+
if self._thread is None:
227+
self._start_thread()
228+
self._buffer.append(_sentinel_shutdown)
229+
self._notempty.notify()
230+
231+
def shutdown(self, immediate=False):
232+
if self._closed:
233+
raise ValueError(f"Queue {self!r} is closed")
234+
235+
with self._lock_shutdown:
236+
if self._is_shutdown():
237+
raise RuntimeError(f"Queue {self!r} already shut down")
238+
239+
is_pending_getters = not self._sem_pending_getters.locked()
240+
is_pending_putters = not self._sem_pending_putters.locked()
241+
str_shutdown = f"shutdown -> immediate:{immediate}"
242+
str_shutdown += f"/PGetters:{is_pending_getters}" \
243+
f"/PPutters:{is_pending_putters}" \
244+
f"/Empty:{self.empty()}" \
245+
f"/Full:{self.full()}"
246+
debug(str_shutdown)
247+
self._set_shutdown(immediate)
248+
249+
# Shut down is immediatly and there is no pending getter,
250+
# we purge the queue (pipe). If there are datas into the buffer
251+
# the 'feeder' thread should erase all of them.
252+
if immediate and not is_pending_getters:
253+
self._clear()
254+
255+
# Starting release one pending getter process.
256+
# Put a first shutdown sentinel data into the pipe.
257+
if self.empty() and is_pending_getters:
258+
self._put_sentinel()
259+
260+
# Starting release one pending putter processes.
261+
if is_pending_putters:
262+
self._sem.release()
263+
138264
def close(self):
139265
self._closed = True
140266
close = self._close
@@ -180,7 +306,7 @@ def _start_thread(self):
180306
args=(self._buffer, self._notempty, self._send_bytes,
181307
self._wlock, self._reader.close, self._writer.close,
182308
self._ignore_epipe, self._on_queue_feeder_error,
183-
self._sem),
309+
self._sem, self._sem_flag_shutdown_immediate),
184310
name='QueueFeederThread',
185311
daemon=True,
186312
)
@@ -228,7 +354,8 @@ def _finalize_close(buffer, notempty):
228354

229355
@staticmethod
230356
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
231-
writer_close, ignore_epipe, onerror, queue_sem):
357+
writer_close, ignore_epipe, onerror, queue_sem,
358+
flag_shutdown_immediate):
232359
debug('starting thread to feed data to pipe')
233360
nacquire = notempty.acquire
234361
nrelease = notempty.release
@@ -240,7 +367,7 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
240367
wrelease = writelock.release
241368
else:
242369
wacquire = None
243-
370+
is_shutdown_immediate = lambda: not flag_shutdown_immediate.locked()
244371
while 1:
245372
try:
246373
nacquire()
@@ -258,6 +385,14 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
258385
writer_close()
259386
return
260387

388+
# When queue shuts down immediatly, do not insert
389+
# regular data in pipe, only shutdown sentinel.
390+
if is_shutdown_immediate() \
391+
and not isinstance(obj, _ShutdownSentinel):
392+
debug("Queue shuts down immediatly, " \
393+
"don't feed regular data to pipe")
394+
continue
395+
261396
# serialize the data before acquiring the lock
262397
obj = _ForkingPickler.dumps(obj)
263398
if wacquire is None:
@@ -301,6 +436,12 @@ def _on_queue_feeder_error(e, obj):
301436
__class_getitem__ = classmethod(types.GenericAlias)
302437

303438

439+
# Sentinel item used to release pending getter processes
440+
# when queue shuts down.
441+
class _ShutdownSentinel: pass
442+
_sentinel_shutdown = _ShutdownSentinel()
443+
444+
304445
_sentinel = object()
305446

306447
#
@@ -328,8 +469,16 @@ def __setstate__(self, state):
328469
def put(self, obj, block=True, timeout=None):
329470
if self._closed:
330471
raise ValueError(f"Queue {self!r} is closed")
331-
if not self._sem.acquire(block, timeout):
332-
raise Full
472+
if self._is_shutdown():
473+
raise ShutDown
474+
try:
475+
with self._handle_pending_processes(self._sem_pending_putters):
476+
if not self._sem.acquire(block, timeout):
477+
raise Full
478+
finally:
479+
if self._is_shutdown():
480+
self._release_pending_putters()
481+
raise ShutDown
333482

334483
with self._notempty, self._cond:
335484
if self._thread is None:
@@ -350,6 +499,17 @@ def join(self):
350499
if not self._unfinished_tasks._semlock._is_zero():
351500
self._cond.wait()
352501

502+
def _clear(self):
503+
super()._clear()
504+
505+
# Data could be in the buffer, not in the pipe.
506+
# Call acquire method of '_unfinished_tasks' Semaphore
507+
# until counter is zero.
508+
with self._cond:
509+
while not self._unfinished_tasks.locked():
510+
self._unfinished_tasks.acquire(block=False)
511+
self._cond.notify_all()
512+
353513
#
354514
# Simplified Queue type -- really just a locked pipe
355515
#

0 commit comments

Comments
 (0)