1717import types
1818import weakref
1919import errno
20+ import ctypes
2021
21- from queue import Empty , Full
22+ from queue import Empty , Full , ShutDown
2223
2324import _multiprocessing
2425
2829
2930from .util import debug , info , Finalize , register_after_fork , is_exiting
3031
32+ _queue_alive = 0
33+ _queue_shutdown = 1
34+ _queue_shutdown_immediate = 2
35+
3136#
3237# Queue type using a pipe, buffer and thread
3338#
@@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx):
5055 # For use by concurrent.futures
5156 self ._ignore_epipe = False
5257 self ._reset ()
58+ self ._shutdown_state = context ._default_context .Value (
59+ ctypes .c_uint8 , lock = self ._rlock
60+ )
5361
5462 if sys .platform != 'win32' :
5563 register_after_fork (self , Queue ._after_fork )
@@ -86,20 +94,28 @@ def _reset(self, after_fork=False):
8694 def put (self , obj , block = True , timeout = None ):
8795 if self ._closed :
8896 raise ValueError (f"Queue { self !r} is closed" )
97+ if self ._shutdown_state .value != _queue_alive :
98+ raise ShutDown
8999 if not self ._sem .acquire (block , timeout ):
90100 raise Full
91101
92102 with self ._notempty :
103+ if self ._shutdown_state .value != _queue_alive :
104+ raise ShutDown
93105 if self ._thread is None :
94106 self ._start_thread ()
95107 self ._buffer .append (obj )
96108 self ._notempty .notify ()
97109
98110 def get (self , block = True , timeout = None ):
111+ if self ._shutdown_state .value == _queue_shutdown_immediate :
112+ raise ShutDown
99113 if self ._closed :
100114 raise ValueError (f"Queue { self !r} is closed" )
101115 if block and timeout is None :
102116 with self ._rlock :
117+ if self ._shutdown_state .value != _queue_alive :
118+ raise ShutDown
103119 res = self ._recv_bytes ()
104120 self ._sem .release ()
105121 else :
@@ -111,13 +127,19 @@ def get(self, block=True, timeout=None):
111127 if block :
112128 timeout = deadline - time .monotonic ()
113129 if not self ._poll (timeout ):
130+ if self ._shutdown_state .value != _queue_alive :
131+ raise ShutDown
114132 raise Empty
133+ if self ._shutdown_state .value != _queue_alive :
134+ raise ShutDown
115135 elif not self ._poll ():
116136 raise Empty
117137 res = self ._recv_bytes ()
118138 self ._sem .release ()
119139 finally :
120140 self ._rlock .release ()
141+ if self ._shutdown_state .value == _queue_shutdown :
142+ raise ShutDown
121143 # unserialize the data after having released the lock
122144 return _ForkingPickler .loads (res )
123145
@@ -329,6 +351,8 @@ def task_done(self):
329351
330352 def join (self ):
331353 with self ._cond :
354+ if self ._shutdown_state .value == _queue_shutdown_immediate :
355+ return
332356 if not self ._unfinished_tasks ._semlock ._is_zero ():
333357 self ._cond .wait ()
334358
0 commit comments