-
-
Notifications
You must be signed in to change notification settings - Fork 34.5k
Expand file tree
/
Copy pathpopen_fork.py
More file actions
136 lines (116 loc) · 4.33 KB
/
popen_fork.py
File metadata and controls
136 lines (116 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import atexit
import os
import signal
import threading
from . import util
__all__ = ['Popen']
#
# Start child process using fork
#
class Popen(object):
method = 'fork'
def __init__(self, process_obj):
util._flush_std_streams()
self.returncode = None
self.finalizer = None
self._exit_condition = threading.Condition()
self._exit_blockers = 0
self._launch(process_obj)
def duplicate_for_child(self, fd):
return fd
def poll(self, flag=os.WNOHANG):
with self._exit_condition:
if self.returncode is not None:
return self.returncode
elif flag & os.WNOHANG == os.WNOHANG:
return self._nonblocking_poll(flag)
else:
self._exit_blockers += 1
# We have released the lock, so may be racing with blocking &
# non-blocking calls at this point...
pid = None
try:
pid, sts = os.waitpid(self.pid, flag)
except OSError:
# Child process doesn't exist because it hasn't started yet (see
# bpo-1731717) or has already been awaited on a racing thread (see
# gh-130895)
pass
with self._exit_condition:
self._exit_blockers -= 1
if pid == self.pid:
self._set_returncode(sts)
elif self._exit_blockers == 0:
self._exit_condition.notify_all()
# Wait until we get a definitive result, or we know there are no
# racing calls that might be about to set it
while self.returncode is None and self._exit_blockers > 0:
self._exit_condition.wait()
return self.returncode
def _nonblocking_poll(self, flag):
assert self._exit_condition._is_owned()
assert self.returncode is None
assert flag & os.WNOHANG == os.WNOHANG
try:
pid, sts = os.waitpid(self.pid, flag)
if pid == self.pid:
self._set_returncode(sts)
except OSError:
# See comments in the poll(...) except clause above
pass
# We may be racing with a blocking wait call, in which case (if we lose
# the race) it is arbitrary whether this returns None or the exit code
# (if there is one): calling code must always be prepared to handle a
# situation where this method returns None but the process has ended.
return self.returncode
def _set_returncode(self, sts):
assert self._exit_condition._is_owned()
assert self.returncode is None
self.returncode = os.waitstatus_to_exitcode(sts)
self._exit_condition.notify_all()
def wait(self, timeout=None):
if self.returncode is None:
if timeout is not None:
from multiprocessing.connection import wait
if not wait([self.sentinel], timeout):
return None
# This shouldn't block if wait() returned successfully.
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
return self.returncode
def _send_signal(self, sig):
if self.returncode is None:
try:
os.kill(self.pid, sig)
except ProcessLookupError:
pass
except OSError:
if self.wait(timeout=0.1) is None:
raise
def terminate(self):
self._send_signal(signal.SIGTERM)
def kill(self):
self._send_signal(signal.SIGKILL)
def _launch(self, process_obj):
code = 1
parent_r, child_w = os.pipe()
child_r, parent_w = os.pipe()
self.pid = os.fork()
if self.pid == 0:
try:
atexit._clear()
atexit.register(util._exit_function)
os.close(parent_r)
os.close(parent_w)
code = process_obj._bootstrap(parent_sentinel=child_r)
finally:
atexit._run_exitfuncs()
os._exit(code)
else:
os.close(child_w)
os.close(child_r)
self.finalizer = util.Finalize(self, util.close_fds,
(parent_r, parent_w,))
self.sentinel = parent_r
def close(self):
if self.finalizer is not None:
self.finalizer()