Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions Lib/test/test_free_threading/test_code.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,39 @@
import unittest

try:
import ctypes
except ImportError:
ctypes = None

from threading import Thread
from unittest import TestCase

from test.support import threading_helper
from test.support.threading_helper import run_concurrently

if ctypes is not None:
capi = ctypes.pythonapi

freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp)

RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
RequestCodeExtraIndex.argtypes = (freefunc,)
RequestCodeExtraIndex.restype = ctypes.c_ssize_t

SetExtra = capi.PyUnstable_Code_SetExtra
SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
SetExtra.restype = ctypes.c_int

GetExtra = capi.PyUnstable_Code_GetExtra
GetExtra.argtypes = (
ctypes.py_object,
ctypes.c_ssize_t,
ctypes.POINTER(ctypes.c_voidp),
)
GetExtra.restype = ctypes.c_int

NTHREADS = 20


@threading_helper.requires_working_threading()
class TestCode(TestCase):
Expand All @@ -25,6 +55,78 @@ def run_in_thread():
for thread in threads:
thread.join()

@unittest.skipUnless(ctypes, "ctypes is required")
def test_request_code_extra_index_concurrent(self):
"""Test concurrent calls to RequestCodeExtraIndex"""
results = []

def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
results.append(idx)

run_concurrently(worker_func=worker, nthreads=NTHREADS)

# Every thread must get a unique index.
self.assertEqual(len(results), NTHREADS)
self.assertEqual(len(set(results)), NTHREADS)

@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_all_ops_concurrent(self):
"""Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra"""
LOOP = 100

def f():
pass

code = f.__code__

def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)

for i in range(LOOP):
SetExtra(code, idx, ctypes.c_voidp(i + 1))

for _ in range(LOOP):
extra = ctypes.c_voidp()
GetExtra(code, idx, extra)
# The slot was set by this thread, so the value must
# be the last one written.
self.assertEqual(extra.value, LOOP)

run_concurrently(worker_func=worker, nthreads=NTHREADS)

@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_set_get_concurrent(self):
"""Test concurrent SetExtra + GetExtra on a shared index"""
LOOP = 100

def f():
pass

code = f.__code__

idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)

def worker():
for i in range(LOOP):
SetExtra(code, idx, ctypes.c_voidp(i + 1))

for _ in range(LOOP):
extra = ctypes.c_voidp()
GetExtra(code, idx, extra)
# Value is set by any writer thread.
self.assertTrue(1 <= extra.value <= LOOP)

run_concurrently(worker_func=worker, nthreads=NTHREADS)

# Every thread's last write is LOOP, so the final value must be LOOP.
extra = ctypes.c_voidp()
GetExtra(code, idx, extra)
self.assertEqual(extra.value, LOOP)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Made :c:func:`PyUnstable_Code_SetExtra`, :c:func:`PyUnstable_Code_GetExtra`,
and :c:func:`PyUnstable_Eval_RequestCodeExtraIndex` thread-safe on the
:term:`free threaded <free threading>` build.
103 changes: 77 additions & 26 deletions Objects/codeobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,12 @@ typedef struct {
} _PyCodeObjectExtra;


static inline size_t
code_extra_size(Py_ssize_t n)
{
return sizeof(_PyCodeObjectExtra) + (n - 1) * sizeof(void *);
}

int
PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
{
Expand All @@ -1583,15 +1589,19 @@ PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
return -1;
}

PyCodeObject *o = (PyCodeObject*) code;
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra*) o->co_extra;
PyCodeObject *o = (PyCodeObject *)code;
*extra = NULL;

if (co_extra == NULL || index < 0 || co_extra->ce_size <= index) {
*extra = NULL;
if (index < 0) {
return 0;
}

*extra = co_extra->ce_extras[index];
// Lock-free read; pairs with release store in SetExtra.
_PyCodeObjectExtra *co_extra = FT_ATOMIC_LOAD_PTR_ACQUIRE(o->co_extra);
if (co_extra != NULL && index < co_extra->ce_size) {
*extra = co_extra->ce_extras[index];
Comment thread
yoney marked this conversation as resolved.
Outdated
}

return 0;
}

Expand All @@ -1601,40 +1611,81 @@ PyUnstable_Code_SetExtra(PyObject *code, Py_ssize_t index, void *extra)
{
PyInterpreterState *interp = _PyInterpreterState_GET();

if (!PyCode_Check(code) || index < 0 ||
index >= interp->co_extra_user_count) {
// co_extra_user_count increases monotonically and is published with a
// release store, so once an index is valid it remains valid.
Py_ssize_t user_count = FT_ATOMIC_LOAD_SSIZE_ACQUIRE(
interp->co_extra_user_count);

if (!PyCode_Check(code) || index < 0 || index >= user_count) {
PyErr_BadInternalCall();
return -1;
}

PyCodeObject *o = (PyCodeObject*) code;
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra *) o->co_extra;
PyCodeObject *o = (PyCodeObject *) code;
int res = 0;

if (co_extra == NULL || co_extra->ce_size <= index) {
Py_ssize_t i = (co_extra == NULL ? 0 : co_extra->ce_size);
co_extra = PyMem_Realloc(
co_extra,
sizeof(_PyCodeObjectExtra) +
(interp->co_extra_user_count-1) * sizeof(void*));
if (co_extra == NULL) {
return -1;
}
for (; i < interp->co_extra_user_count; i++) {
co_extra->ce_extras[i] = NULL;
}
co_extra->ce_size = interp->co_extra_user_count;
o->co_extra = co_extra;
Py_BEGIN_CRITICAL_SECTION(o);

_PyCodeObjectExtra *old_extra = (_PyCodeObjectExtra *) o->co_extra;
Comment thread
yoney marked this conversation as resolved.
Outdated
Py_ssize_t old_size = (old_extra == NULL) ? 0 : old_extra->ce_size;

// user_count > index is checked above.
Py_ssize_t new_size = old_size > index ? old_size : user_count;
assert(new_size > 0 && new_size > index);

// Free-threaded builds require copy-on-write to avoid mutating
Comment thread
yoney marked this conversation as resolved.
Outdated
// co_extra while lock-free readers in GetExtra may be traversing it.
// GIL builds could realloc in place, but SetExtra is called rarely
// and co_extra is small, so use the same path for simplicity.
_PyCodeObjectExtra *co_extra = PyMem_Malloc(code_extra_size(new_size));
if (co_extra == NULL) {
PyErr_NoMemory();
res = -1;
goto done;
}

if (co_extra->ce_extras[index] != NULL) {
co_extra->ce_size = new_size;

// Copy existing extras from the old buffer.
if (old_size > 0) {
memcpy(co_extra->ce_extras, old_extra->ce_extras,
old_size * sizeof(void *));
}

// NULL-initialize new slots.
for (Py_ssize_t i = old_size; i < new_size; i++) {
co_extra->ce_extras[i] = NULL;
}

if (old_extra != NULL && index < old_size &&
Comment thread
yoney marked this conversation as resolved.
Outdated
old_extra->ce_extras[index] != NULL)
{
// Free the old extra value if a free function was registered.
// We assume the caller ensures no other thread is concurrently
// using the old value.
freefunc free = interp->co_extra_freefuncs[index];
if (free != NULL) {
free(co_extra->ce_extras[index]);
free(old_extra->ce_extras[index]);
}
}

co_extra->ce_extras[index] = extra;
Comment thread
yoney marked this conversation as resolved.
Outdated
return 0;

// Publish pointer and slot writes to lock-free readers.
FT_ATOMIC_STORE_PTR_RELEASE(o->co_extra, co_extra);

if (old_extra != NULL) {
#ifdef Py_GIL_DISABLED
// Defer container free for lock-free readers.
_PyMem_FreeDelayed(old_extra, code_extra_size(old_size));
#else
PyMem_Free(old_extra);
#endif
}

done:;
Py_END_CRITICAL_SECTION();
return res;
}


Expand Down
20 changes: 18 additions & 2 deletions Python/ceval.c
Original file line number Diff line number Diff line change
Expand Up @@ -3493,11 +3493,27 @@ PyUnstable_Eval_RequestCodeExtraIndex(freefunc free)
PyInterpreterState *interp = _PyInterpreterState_GET();
Py_ssize_t new_index;

if (interp->co_extra_user_count == MAX_CO_EXTRA_USERS - 1) {
#ifdef Py_GIL_DISABLED
struct _py_code_state *state = &interp->code_state;
FT_MUTEX_LOCK(&state->mutex);
#endif

if (interp->co_extra_user_count >= MAX_CO_EXTRA_USERS - 1) {
#ifdef Py_GIL_DISABLED
FT_MUTEX_UNLOCK(&state->mutex);
#endif
return -1;
}
new_index = interp->co_extra_user_count++;

new_index = interp->co_extra_user_count;
interp->co_extra_freefuncs[new_index] = free;

// Publish freefuncs[new_index] before making the index visible.
FT_ATOMIC_STORE_SSIZE_RELEASE(interp->co_extra_user_count, new_index + 1);

#ifdef Py_GIL_DISABLED
FT_MUTEX_UNLOCK(&state->mutex);
#endif
return new_index;
}

Expand Down
Loading