Skip to content

Commit e136267

Browse files
committed
gh-134634: Use a pipe instead of a queue for Pool's change notifier
Pool.__init__ used a multiprocessing.SimpleQueue for its internal _change_notifier. SimpleQueue creates Lock objects backed by POSIX named semaphores (sem_open), which requires /dev/shm. The change notifier never crosses a process boundary, so named semaphores were never needed. Replace it with a _ChangeNotifier class backed by multiprocessing.connection.Pipe (os.pipe). Same interface, no sem_open dependency. <claude>
1 parent cb76ab3 commit e136267

3 files changed

Lines changed: 95 additions & 4 deletions

File tree

Lib/multiprocessing/pool.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,30 @@ def _helper_reraises_exception(ex):
147147
# Class representing a process pool
148148
#
149149

150+
class _ChangeNotifier:
151+
"""Pipe-based notifier that does not depend on POSIX named semaphores.
152+
153+
Replaces the use of multiprocessing.SimpleQueue for the pool's change
154+
notification mechanism. SimpleQueue's internal locks require sem_open(),
155+
which fails on platforms without /dev/shm (AWS Lambda, Android, iOS).
156+
The change notifier never crosses a process boundary, so named semaphores
157+
were never needed.
158+
"""
159+
160+
def __init__(self):
161+
from .connection import Pipe
162+
self._reader, self._writer = Pipe(duplex=False)
163+
164+
def put(self, obj):
165+
self._writer.send_bytes(b'\0')
166+
167+
def get(self):
168+
self._reader.recv_bytes()
169+
170+
def empty(self):
171+
return not self._reader.poll(0)
172+
173+
150174
class _PoolCache(dict):
151175
"""
152176
Class that implements a cache for the Pool class that will notify
@@ -190,10 +214,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
190214
self._ctx = context or get_context()
191215
self._setup_queues()
192216
self._taskqueue = queue.SimpleQueue()
193-
# The _change_notifier queue exist to wake up self._handle_workers()
194-
# when the cache (self._cache) is empty or when there is a change in
195-
# the _state variable of the thread that runs _handle_workers.
196-
self._change_notifier = self._ctx.SimpleQueue()
217+
self._change_notifier = _ChangeNotifier()
197218
self._cache = _PoolCache(notifier=self._change_notifier)
198219
self._maxtasksperchild = maxtasksperchild
199220
self._initializer = initializer

Lib/test/_test_multiprocessing.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2828,6 +2828,9 @@ def raise_large_valuerror(wait):
28282828
def identity(x):
28292829
return x
28302830

2831+
def _kill_self():
2832+
os.kill(os.getpid(), signal.SIGKILL)
2833+
28312834
class CountedObject(object):
28322835
n_instances = 0
28332836

@@ -3068,6 +3071,41 @@ def test_make_pool(self):
30683071
p.close()
30693072
p.join()
30703073

3074+
def test_change_notifier_no_semaphore(self):
3075+
# gh-134634: The pool's change notifier uses a pipe instead of
3076+
# a multiprocessing.SimpleQueue to avoid depending on sem_open(),
3077+
# which requires /dev/shm.
3078+
if self.TYPE == 'manager':
3079+
return
3080+
from multiprocessing.pool import _ChangeNotifier
3081+
p = self.Pool(2)
3082+
try:
3083+
self.assertIsInstance(p._change_notifier, _ChangeNotifier)
3084+
result = p.map(sqr, list(range(10)))
3085+
self.assertEqual(result, list(map(sqr, range(10))))
3086+
finally:
3087+
p.close()
3088+
p.join()
3089+
3090+
def test_change_notifier_drain(self):
3091+
# gh-134634: Verify that multiple rapid notifications (from many
3092+
# tasks completing and cache emptying) are properly drained and
3093+
# the pool shuts down without hanging.
3094+
if self.TYPE == 'manager':
3095+
return
3096+
p = self.Pool(4)
3097+
try:
3098+
# Submit many short tasks so notifications pile up faster
3099+
# than the worker handler can drain them.
3100+
results = [p.apply_async(sqr, (i,)) for i in range(200)]
3101+
for i, r in enumerate(results):
3102+
self.assertEqual(r.get(timeout=support.SHORT_TIMEOUT), sqr(i))
3103+
finally:
3104+
# close() adds another notification on top of the cache-empty
3105+
# ones. If draining is broken, join() hangs.
3106+
p.close()
3107+
p.join()
3108+
30713109
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
30723110
def test_terminate(self):
30733111
# Simulate slow tasks which take "forever" to complete
@@ -3331,6 +3369,33 @@ def test_pool_worker_lifetime_early_close(self):
33313369
for (j, res) in enumerate(results):
33323370
self.assertEqual(res.get(), sqr(j))
33333371

3372+
def test_pool_worker_killed_mid_task(self):
3373+
# gh-134634: Verify the worker handler detects a killed worker
3374+
# via its sentinel fd and replaces it, keeping the pool functional.
3375+
p = multiprocessing.Pool(3)
3376+
orig_count = len(p._pool)
3377+
self.assertEqual(orig_count, 3)
3378+
3379+
# Kill one worker. The task will never return a result.
3380+
p.apply_async(_kill_self)
3381+
3382+
# Give the worker handler time to detect the death and replace.
3383+
deadline = time.monotonic() + support.SHORT_TIMEOUT
3384+
while time.monotonic() < deadline:
3385+
alive = [w for w in p._pool if w.is_alive()]
3386+
if len(alive) >= orig_count:
3387+
break
3388+
time.sleep(0.1)
3389+
self.assertEqual(len(p._pool), orig_count,
3390+
"worker handler did not replace the killed worker")
3391+
3392+
# Pool should still be functional with the replacement worker.
3393+
result = p.map(sqr, list(range(5)))
3394+
self.assertEqual(result, list(map(sqr, range(5))))
3395+
3396+
p.terminate()
3397+
p.join()
3398+
33343399
def test_pool_maxtasksperchild_invalid(self):
33353400
for value in [0, -1, 0.5, "12"]:
33363401
with self.assertRaises(ValueError):
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
:class:`multiprocessing.pool.Pool` and :class:`multiprocessing.pool.ThreadPool`
2+
no longer require POSIX named semaphores for the internal change notification
3+
mechanism. This fixes ``Pool`` and ``ThreadPool`` on platforms where
4+
``sem_open()`` is unavailable, such as AWS Lambda (no ``/dev/shm``), Android,
5+
and iOS.

0 commit comments

Comments
 (0)