Skip to content

Commit 39c1c38

Browse files
authored
Get process output from Malt, instead of capturing it ourselves (#84)
* Get process output from `Malt`, instead of capturing it ourselves This enables us to get the output generated by crashed jobs. * DROP ME: temporarily point to my fork of Malt * Almost completely fix handling of colours * Pass `--color` option directly to workers to completely fix colour issues * Append newlines to flush streams in colorful output tests * Add tests about capturing output of crashed workers * Use `readavailable` instead of `readline` * Add more tests about output capturing * Replace `isopen` -> `!eof` * Fix Malt revision * Create a custom `PTRWorker` worker type to adapt to new upstream changes * Update docstrings * Malt v1.4.0 is now registered * Fold initialisation of `PTRWorker` in its constructor * `@async` -> `Threads.@spawn` in `stdio_loop` * Include worker ID in `PTRWorker` structure
1 parent 608f947 commit 39c1c38

3 files changed

Lines changed: 110 additions & 50 deletions

File tree

Project.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "ParallelTestRunner"
22
uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc"
33
authors = ["Valentin Churavy <v.churavy@gmail.com>"]
4-
version = "2.0.2"
4+
version = "2.1.0"
55

66
[deps]
77
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
@@ -17,7 +17,7 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1717
[compat]
1818
Dates = "1"
1919
IOCapture = "0.2.5, 1"
20-
Malt = "1.3.0"
20+
Malt = "1.4.0"
2121
Printf = "1"
2222
Random = "1"
2323
Scratch = "1.3.0"

src/ParallelTestRunner.jl

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,27 @@ function anynonpass(ts::Test.AbstractTestSet)
2323
end
2424
end
2525

26+
const ID_COUNTER = Threads.Atomic{Int}(0)
27+
28+
# Thin wrapper around Malt.Worker, to handle the stdio loop differently.
29+
struct PTRWorker <: Malt.AbstractWorker
30+
w::Malt.Worker
31+
io::IOBuffer
32+
id::Int
33+
end
34+
35+
function PTRWorker(; exename=Base.julia_cmd()[1], exeflags=String[], env=String[])
36+
io = IOBuffer()
37+
wrkr = Malt.Worker(; exename, exeflags, env, monitor_stdout=false, monitor_stderr=false)
38+
stdio_loop(wrkr, io)
39+
id = ID_COUNTER[] += 1
40+
return PTRWorker(wrkr, io, id)
41+
end
42+
43+
worker_id(wrkr::PTRWorker) = wrkr.id
44+
Malt.isrunning(wrkr::PTRWorker) = Malt.isrunning(wrkr.w)
45+
Malt.stop(wrkr::PTRWorker) = Malt.stop(wrkr.w)
46+
2647
#Always set the max rss so that if tests add large global variables (which they do) we don't make the GC's life too hard
2748
if Sys.WORD_SIZE == 64
2849
const JULIA_TEST_MAXRSS_MB = 3800
@@ -57,7 +78,6 @@ abstract type AbstractTestRecord end
5778

5879
struct TestRecord <: AbstractTestRecord
5980
value::DefaultTestSet
60-
output::String # captured stdout/stderr
6181

6282
# stats
6383
time::Float64
@@ -201,6 +221,25 @@ function print_test_crashed(wrkr, test, ctx::TestIOContext)
201221
end
202222
end
203223

224+
# Adapted from `Malt._stdio_loop`
225+
function stdio_loop(worker::Malt.Worker, io)
226+
Threads.@spawn while !eof(worker.stdout) && Malt.isrunning(worker)
227+
try
228+
bytes = readavailable(worker.stdout)
229+
write(io, bytes)
230+
catch
231+
break
232+
end
233+
end
234+
Threads.@spawn while !eof(worker.stderr) && Malt.isrunning(worker)
235+
try
236+
bytes = readavailable(worker.stderr)
237+
write(io, bytes)
238+
catch
239+
break
240+
end
241+
end
242+
end
204243

205244
#
206245
# entry point
@@ -236,7 +275,7 @@ function Test.finish(ts::WorkerTestSet)
236275
return ts.wrapped_ts
237276
end
238277

239-
function runtest(f, name, init_code, color)
278+
function runtest(f, name, init_code)
240279
function inner()
241280
# generate a temporary module to execute the tests in
242281
mod = @eval(Main, module $(gensym(name)) end)
@@ -252,28 +291,15 @@ function runtest(f, name, init_code, color)
252291
GC.gc(true)
253292
Random.seed!(1)
254293

255-
pipe = Pipe()
256-
pipe_initialized = Channel{Nothing}(1)
257-
reader = @async begin
258-
take!(pipe_initialized)
259-
read(pipe, String)
260-
end
261-
io = IOContext(pipe, :color=>$(color))
262-
stats = redirect_stdio(; stdout=io, stderr=io) do
263-
put!(pipe_initialized, nothing)
264-
265-
# @testset CustomTestRecord switches the all lower-level testset to our custom testset,
266-
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
267-
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
268-
@timed @testset WorkerTestSet "placeholder" begin
269-
@testset DefaultTestSet $name begin
270-
$f
271-
end
294+
# @testset CustomTestRecord switches all the lower-level testsets to our custom testset,
295+
# so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`.
296+
# This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`.
297+
stats = @timed @testset WorkerTestSet "placeholder" begin
298+
@testset DefaultTestSet $name begin
299+
$f
272300
end
273301
end
274-
close(pipe.in)
275-
output = fetch(reader)
276-
(; testset=stats.value, output, stats.time, stats.bytes, stats.gctime)
302+
(; testset=stats.value, stats.time, stats.bytes, stats.gctime)
277303
end
278304

279305
# process results
@@ -392,7 +418,7 @@ function save_test_history(mod::Module, history::Dict{String, Float64})
392418
end
393419
end
394420

395-
function test_exe()
421+
function test_exe(color::Bool=false)
396422
test_exeflags = Base.julia_cmd()
397423
filter!(test_exeflags.exec) do c
398424
!(startswith(c, "--depwarn") || startswith(c, "--check-bounds"))
@@ -401,16 +427,12 @@ function test_exe()
401427
push!(test_exeflags.exec, "--startup-file=no")
402428
push!(test_exeflags.exec, "--depwarn=yes")
403429
push!(test_exeflags.exec, "--project=$(Base.active_project())")
430+
push!(test_exeflags.exec, "--color=$(color ? "yes" : "no")")
404431
return test_exeflags
405432
end
406433

407-
# Map PIDs to logical worker IDs
408-
# Malt doesn't have a global worker ID, and PID make printing ugly
409-
const WORKER_IDS = Dict{Int32, Int32}()
410-
worker_id(wrkr) = WORKER_IDS[wrkr.proc_pid]
411-
412434
"""
413-
addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing)
435+
addworkers(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing, color::Bool=false)
414436
415437
Add `X` worker processes.
416438
To add a single worker, use [`addworker`](@ref).
@@ -419,11 +441,12 @@ To add a single worker, use [`addworker`](@ref).
419441
- `env`: Vector of environment variable pairs to set for the worker process.
420442
- `exename`: Custom executable to use for the worker process.
421443
- `exeflags`: Custom flags to pass to the worker process.
444+
- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`).
422445
"""
423446
addworkers(X; kwargs...) = [addworker(; kwargs...) for _ in 1:X]
424447

425448
"""
426-
addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing)
449+
addworker(; env=Vector{Pair{String, String}}(), exename=nothing, exeflags=nothing, color::Bool=false)
427450
428451
Add a single worker process.
429452
To add multiple workers, use [`addworkers`](@ref).
@@ -432,12 +455,15 @@ To add multiple workers, use [`addworkers`](@ref).
432455
- `env`: Vector of environment variable pairs to set for the worker process.
433456
- `exename`: Custom executable to use for the worker process.
434457
- `exeflags`: Custom flags to pass to the worker process.
458+
- `color`: Boolean flag to decide whether to start `julia` with `--color=yes` (if `true`) or `--color=no` (if `false`).
435459
"""
436460
function addworker(;
437461
env = Vector{Pair{String, String}}(),
438-
exename = nothing, exeflags = nothing
462+
exename = nothing,
463+
exeflags = nothing,
464+
color::Bool = false,
439465
)
440-
exe = test_exe()
466+
exe = test_exe(color)
441467
if exename === nothing
442468
exename = exe[1]
443469
end
@@ -450,10 +476,7 @@ function addworker(;
450476
push!(env, "JULIA_NUM_THREADS" => "1")
451477
# Malt already sets OPENBLAS_NUM_THREADS to 1
452478
push!(env, "OPENBLAS_NUM_THREADS" => "1")
453-
454-
wrkr = Malt.Worker(; exename, exeflags, env)
455-
WORKER_IDS[wrkr.proc_pid] = length(WORKER_IDS) + 1
456-
return wrkr
479+
return PTRWorker(; exename, exeflags, env)
457480
end
458481

459482
"""
@@ -840,7 +863,7 @@ function runtests(mod::Module, args::ParsedArgs;
840863
line3 = "Progress: $completed/$total tests completed"
841864
if completed > 0
842865
# estimate per-test time (slightly pessimistic)
843-
durations_done = [end_time - start_time for (_, _, start_time, end_time) in results]
866+
durations_done = [end_time - start_time for (_, _,_, start_time, end_time) in results]
844867
μ = mean(durations_done)
845868
σ = length(durations_done) > 1 ? std(durations_done) : 0.0
846869
est_per_test = μ + 0.5σ
@@ -970,15 +993,15 @@ function runtests(mod::Module, args::ParsedArgs;
970993
wrkr = p
971994
end
972995
if wrkr === nothing || !Malt.isrunning(wrkr)
973-
wrkr = p = addworker()
996+
wrkr = p = addworker(; io_ctx.color)
974997
end
975998

976999
# run the test
9771000
put!(printer_channel, (:started, test, worker_id(wrkr)))
9781001
result = try
979-
Malt.remote_eval_wait(Main, wrkr, :(import ParallelTestRunner))
980-
Malt.remote_call_fetch(invokelatest, wrkr, runtest,
981-
testsuite[test], test, init_code, io_ctx.color)
1002+
Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner))
1003+
Malt.remote_call_fetch(invokelatest, wrkr.w, runtest,
1004+
testsuite[test], test, init_code)
9821005
catch ex
9831006
if isa(ex, InterruptException)
9841007
# the worker got interrupted, signal other tasks to stop
@@ -989,7 +1012,8 @@ function runtests(mod::Module, args::ParsedArgs;
9891012
ex
9901013
end
9911014
test_t1 = time()
992-
push!(results, (test, result, test_t0, test_t1))
1015+
output = String(take!(wrkr.io))
1016+
push!(results, (test, result, output, test_t0, test_t1))
9931017

9941018
# act on the results
9951019
if result isa AbstractTestRecord
@@ -1070,10 +1094,10 @@ function runtests(mod::Module, args::ParsedArgs;
10701094
@async rmprocs(; waitfor=0)
10711095

10721096
# print the output generated by each testset
1073-
for (testname, result, start, stop) in results
1074-
if isa(result, AbstractTestRecord) && !isempty(result.output)
1097+
for (testname, result, output, start, stop) in results
1098+
if !isempty(output)
10751099
println(io_ctx.stdout, "\nOutput generated during execution of '$testname':")
1076-
lines = collect(eachline(IOBuffer(result.output)))
1100+
lines = collect(eachline(IOBuffer(output)))
10771101

10781102
for (i,line) in enumerate(lines)
10791103
prefix = if length(lines) == 1
@@ -1122,7 +1146,7 @@ function runtests(mod::Module, args::ParsedArgs;
11221146
function collect_results()
11231147
with_testset(o_ts) do
11241148
completed_tests = Set{String}()
1125-
for (testname, result, start, stop) in results
1149+
for (testname, result, output, start, stop) in results
11261150
push!(completed_tests, testname)
11271151

11281152
if result isa AbstractTestRecord

test/runtests.jl

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,10 @@ end
179179
end
180180

181181
@testset "crashing test" begin
182+
msg = "This test will crash"
182183
testsuite = Dict(
183184
"abort" => quote
185+
println($(msg))
184186
abort() = ccall(:abort, Nothing, ())
185187
abort()
186188
end
@@ -192,6 +194,13 @@ end
192194
end
193195

194196
str = String(take!(io))
197+
# Make sure we can capture the output generated by the crashed process, see
198+
# issue <https://github.com/JuliaTesting/ParallelTestRunner.jl/issues/83>.
199+
@test contains(str, msg)
200+
# "in expression starting at" comes from the abort trap, make sure we
201+
# captured that as well.
202+
@test contains(str, "in expression starting at")
203+
# Following are messages printed by ParallelTestRunner.
195204
@test contains(str, r"abort .+ started at")
196205
@test contains(str, r"abort .+ crashed at")
197206
@test contains(str, "FAILURE")
@@ -200,9 +209,10 @@ end
200209
end
201210

202211
@testset "test output" begin
212+
msg = "This is some output from the test"
203213
testsuite = Dict(
204214
"output" => quote
205-
println("This is some output from the test")
215+
println($(msg))
206216
end
207217
)
208218

@@ -211,7 +221,33 @@ end
211221

212222
str = String(take!(io))
213223
@test contains(str, r"output .+ started at")
214-
@test contains(str, r"This is some output from the test")
224+
@test contains(str, msg)
225+
@test contains(str, "SUCCESS")
226+
227+
msg2 = "More output"
228+
testsuite = Dict(
229+
"verbose-1" => quote
230+
print($(msg))
231+
end,
232+
"verbose-2" => quote
233+
println($(msg2))
234+
end,
235+
"silent" => quote
236+
@test true
237+
end,
238+
)
239+
io = IOBuffer()
240+
# Run all tests on the same worker, make sure all the output is captured
241+
# and attributed to the correct test set.
242+
runtests(ParallelTestRunner, ["--verbose", "--jobs=1"]; testsuite, stdout=io, stderr=io)
243+
244+
str = String(take!(io))
245+
@test contains(str, r"verbose-1 .+ started at")
246+
@test contains(str, r"verbose-2 .+ started at")
247+
@test contains(str, r"silent .+ started at")
248+
@test contains(str, "Output generated during execution of 'verbose-1':\n[ $(msg)")
249+
@test contains(str, "Output generated during execution of 'verbose-2':\n[ $(msg2)")
250+
@test !contains(str, "Output generated during execution of 'silent':")
215251
@test contains(str, "SUCCESS")
216252
end
217253

0 commit comments

Comments
 (0)