Skip to content

Commit 7bde226

Browse files
author
Onur Gumus
committed
feat: add TaskSeq.foldUntil and foldUntilAsync with FoldStep<'State> DU
Fold over a task sequence with early termination. The folder returns `Continue newState` to keep consuming, or `Halt newState` to stop immediately; in either case the state is updated. No elements past the one that caused Halt are enumerated from the input. Motivation: reaching for `fold`/`foldAsync` often ends with a mutable flag or a `match state with Decided -> state | _ -> ...` guard at the top of the folder when the caller wants to short-circuit on some condition discovered mid-stream. `tryPickAsync` handles the "find and halt" subcase but cannot also thread an accumulator. `foldUntil` fills the gap and removes the impedance mismatch. Includes: - FoldStep<'State> DU (Continue / Halt) in public API - Internal FoldUntilAction DU unifying sync/async folder dispatch - Public TaskSeq.foldUntil and TaskSeq.foldUntilAsync - XML docs on both overloads mirroring fold/foldAsync style - Release-notes entry - 22 tests covering: null source, empty sequence (no folder call, initial state preserved), all-Continue (equals fold), Halt-on-first (1 folder call, no further pulls), Halt mid-sequence (correct count, source not pulled past Halt), Halt-on-last (equals fold)
1 parent 1e18f22 commit 7bde226

File tree

6 files changed

+319
-0
lines changed

6 files changed

+319
-0
lines changed

release-notes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Release notes:
33

44
Unreleased
5+
- adds TaskSeq.foldUntil and TaskSeq.foldUntilAsync: fold with early termination via a FoldStep<'State> DU (Continue / Halt). The underlying sequence is not enumerated past the element that caused Halt.
56
- test: add SideEffects module and ImmTaskSeq variant tests to TaskSeq.ChunkBy.Tests.fs, improving coverage for chunkBy and chunkByAsync
67
- fixes: `Async.bind` signature corrected from `(Async<'T> -> Async<'U>)` to `('T -> Async<'U>)` to match standard monadic bind semantics (same as `Task.bind`); the previous signature made the function effectively equivalent to direct application
78
- refactor: simplify splitAt 'rest' taskSeq to use while!, removing redundant go2 mutable and manual MoveNextAsync pre-advance

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
2929
<Compile Include="TaskSeq.Find.Tests.fs" />
3030
<Compile Include="TaskSeq.Fold.Tests.fs" />
31+
<Compile Include="TaskSeq.FoldUntil.Tests.fs" />
3132
<Compile Include="TaskSeq.Scan.Tests.fs" />
3233
<Compile Include="TaskSeq.MapFold.Tests.fs" />
3334
<Compile Include="TaskSeq.Reduce.Tests.fs" />
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
module TaskSeq.Tests.FoldUntil
2+
3+
open Xunit
4+
open FsUnit.Xunit
5+
6+
open FSharp.Control
7+
8+
//
9+
// TaskSeq.foldUntil
10+
// TaskSeq.foldUntilAsync
11+
//
12+
13+
module EmptySeq =
14+
[<Fact>]
15+
let ``Null source is invalid`` () =
16+
assertNullArg
17+
<| fun () -> TaskSeq.foldUntil (fun _ _ -> Continue 42) 0 null
18+
19+
assertNullArg
20+
<| fun () -> TaskSeq.foldUntilAsync (fun _ _ -> Task.fromResult (Continue 42)) 0 null
21+
22+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
23+
let ``TaskSeq-foldUntil returns initial state when empty`` variant = task {
24+
let! result =
25+
Gen.getEmptyVariant variant
26+
|> TaskSeq.foldUntil (fun _ item -> Continue(item + 1)) -1
27+
28+
result |> should equal -1
29+
}
30+
31+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
32+
let ``TaskSeq-foldUntilAsync returns initial state when empty`` variant = task {
33+
let! result =
34+
Gen.getEmptyVariant variant
35+
|> TaskSeq.foldUntilAsync (fun _ item -> task { return Continue(item + 1) }) -1
36+
37+
result |> should equal -1
38+
}
39+
40+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
41+
let ``TaskSeq-foldUntil does not call folder when empty`` variant = task {
42+
let mutable called = false
43+
44+
let! _ =
45+
Gen.getEmptyVariant variant
46+
|> TaskSeq.foldUntil
47+
(fun state _ ->
48+
called <- true
49+
Continue state)
50+
0
51+
52+
called |> should be False
53+
}
54+
55+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
56+
let ``TaskSeq-foldUntilAsync does not call folder when empty`` variant = task {
57+
let mutable called = false
58+
59+
let! _ =
60+
Gen.getEmptyVariant variant
61+
|> TaskSeq.foldUntilAsync
62+
(fun state _ -> task {
63+
called <- true
64+
return Continue state
65+
})
66+
0
67+
68+
called |> should be False
69+
}
70+
71+
module Functionality =
72+
[<Fact>]
73+
let ``TaskSeq-foldUntil with all Continue behaves like fold`` () = task {
74+
let! result =
75+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
76+
|> TaskSeq.foldUntil (fun acc item -> Continue(acc + item)) 0
77+
78+
result |> should equal 15
79+
}
80+
81+
[<Fact>]
82+
let ``TaskSeq-foldUntilAsync with all Continue behaves like foldAsync`` () = task {
83+
let! result =
84+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
85+
|> TaskSeq.foldUntilAsync (fun acc item -> task { return Continue(acc + item) }) 0
86+
87+
result |> should equal 15
88+
}
89+
90+
[<Fact>]
91+
let ``TaskSeq-foldUntil is left-associative like fold`` () = task {
92+
let! result =
93+
TaskSeq.ofList [ "b"; "c"; "d" ]
94+
|> TaskSeq.foldUntil (fun acc item -> Continue(acc + item)) "a"
95+
96+
result |> should equal "abcd"
97+
}
98+
99+
[<Fact>]
100+
let ``TaskSeq-foldUntilAsync is left-associative like foldAsync`` () = task {
101+
let! result =
102+
TaskSeq.ofList [ "b"; "c"; "d" ]
103+
|> TaskSeq.foldUntilAsync (fun acc item -> task { return Continue(acc + item) }) "a"
104+
105+
result |> should equal "abcd"
106+
}
107+
108+
module Halt =
109+
[<Fact>]
110+
let ``TaskSeq-foldUntil Halt on first element stops immediately`` () = task {
111+
let mutable callCount = 0
112+
113+
let! result =
114+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
115+
|> TaskSeq.foldUntil
116+
(fun _ item ->
117+
callCount <- callCount + 1
118+
Halt item)
119+
0
120+
121+
result |> should equal 1
122+
callCount |> should equal 1
123+
}
124+
125+
[<Fact>]
126+
let ``TaskSeq-foldUntilAsync Halt on first element stops immediately`` () = task {
127+
let mutable callCount = 0
128+
129+
let! result =
130+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
131+
|> TaskSeq.foldUntilAsync
132+
(fun _ item -> task {
133+
callCount <- callCount + 1
134+
return Halt item
135+
})
136+
0
137+
138+
result |> should equal 1
139+
callCount |> should equal 1
140+
}
141+
142+
[<Fact>]
143+
let ``TaskSeq-foldUntil halts mid-sequence, preserving halt state`` () = task {
144+
// Sum until running total exceeds 5, then halt with the overshoot.
145+
let mutable callCount = 0
146+
147+
let! result =
148+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
149+
|> TaskSeq.foldUntil
150+
(fun acc item ->
151+
callCount <- callCount + 1
152+
let next = acc + item
153+
if next > 5 then Halt next else Continue next)
154+
0
155+
156+
// 1, 3, 6 — halts on the third element (6 > 5)
157+
result |> should equal 6
158+
callCount |> should equal 3
159+
}
160+
161+
[<Fact>]
162+
let ``TaskSeq-foldUntilAsync halts mid-sequence, preserving halt state`` () = task {
163+
let mutable callCount = 0
164+
165+
let! result =
166+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
167+
|> TaskSeq.foldUntilAsync
168+
(fun acc item -> task {
169+
callCount <- callCount + 1
170+
let next = acc + item
171+
return if next > 5 then Halt next else Continue next
172+
})
173+
0
174+
175+
result |> should equal 6
176+
callCount |> should equal 3
177+
}
178+
179+
[<Fact>]
180+
let ``TaskSeq-foldUntil does not enumerate past the halt element`` () = task {
181+
// Source has a side effect per pulled element; halt on the 2nd.
182+
let mutable pulled = 0
183+
184+
let source = taskSeq {
185+
for i in 1..5 do
186+
pulled <- pulled + 1
187+
yield i
188+
}
189+
190+
let! _ =
191+
source
192+
|> TaskSeq.foldUntil (fun _ item -> if item = 2 then Halt item else Continue item) 0
193+
194+
// The source yielded 1 (Continue), then 2 (Halt) — we must not pull 3.
195+
pulled |> should equal 2
196+
}
197+
198+
[<Fact>]
199+
let ``TaskSeq-foldUntilAsync does not enumerate past the halt element`` () = task {
200+
let mutable pulled = 0
201+
202+
let source = taskSeq {
203+
for i in 1..5 do
204+
pulled <- pulled + 1
205+
yield i
206+
}
207+
208+
let! _ =
209+
source
210+
|> TaskSeq.foldUntilAsync (fun _ item -> task { return if item = 2 then Halt item else Continue item }) 0
211+
212+
pulled |> should equal 2
213+
}
214+
215+
[<Fact>]
216+
let ``TaskSeq-foldUntil on last-element Halt is equivalent to fold`` () = task {
217+
let! result =
218+
TaskSeq.ofList [ 1; 2; 3; 4; 5 ]
219+
|> TaskSeq.foldUntil
220+
(fun acc item ->
221+
let next = acc + item
222+
if item = 5 then Halt next else Continue next)
223+
0
224+
225+
result |> should equal 15
226+
}

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,10 @@ type TaskSeq private () =
540540
static member compareWithAsync comparer source1 source2 = Internal.compareWithAsync comparer source1 source2
541541
static member fold folder state source = Internal.fold (FolderAction folder) state source
542542
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source
543+
static member foldUntil folder state source = Internal.foldUntil (FoldUntilAction folder) state source
544+
545+
static member foldUntilAsync folder state source = Internal.foldUntil (AsyncFoldUntilAction folder) state source
546+
543547
static member scan folder state source = Internal.scan (FolderAction folder) state source
544548
static member scanAsync folder state source = Internal.scan (AsyncFolderAction folder) state source
545549
static member reduce folder source = Internal.reduce (FolderAction folder) source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1865,6 +1865,42 @@ type TaskSeq =
18651865
static member foldAsync:
18661866
folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
18671867

1868+
/// <summary>
1869+
/// Applies the function <paramref name="folder" /> to each element in the task sequence, threading an
1870+
/// accumulator of type <paramref name="'State" /> through the computation, with the ability to stop
1871+
/// early. The folder returns <c>Continue newState</c> to keep consuming or <c>Halt newState</c> to
1872+
/// stop iteration immediately; in either case the state is updated. When the folder halts, no further
1873+
/// elements of the input are enumerated.
1874+
/// If the folder function <paramref name="folder" /> is asynchronous, consider using
1875+
/// <see cref="TaskSeq.foldUntilAsync" />.
1876+
/// </summary>
1877+
///
1878+
/// <param name="folder">A function that updates the state and decides whether to continue or halt.</param>
1879+
/// <param name="state">The initial state.</param>
1880+
/// <param name="source">The input sequence.</param>
1881+
/// <returns>The state object, either after the folder halts or after the whole sequence has been consumed.</returns>
1882+
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
1883+
static member foldUntil:
1884+
folder: ('State -> 'T -> FoldStep<'State>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
1885+
1886+
/// <summary>
1887+
/// Applies the asynchronous function <paramref name="folder" /> to each element in the task sequence,
1888+
/// threading an accumulator of type <paramref name="'State" /> through the computation, with the ability
1889+
/// to stop early. The folder returns <c>Continue newState</c> to keep consuming or <c>Halt newState</c>
1890+
/// to stop iteration immediately; in either case the state is updated. When the folder halts, no further
1891+
/// elements of the input are enumerated.
1892+
/// If the folder function <paramref name="folder" /> is synchronous, consider using
1893+
/// <see cref="TaskSeq.foldUntil" />.
1894+
/// </summary>
1895+
///
1896+
/// <param name="folder">A function that updates the state and decides whether to continue or halt.</param>
1897+
/// <param name="state">The initial state.</param>
1898+
/// <param name="source">The input sequence.</param>
1899+
/// <returns>The state object, either after the folder halts or after the whole sequence has been consumed.</returns>
1900+
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
1901+
static member foldUntilAsync:
1902+
folder: ('State -> 'T -> #Task<FoldStep<'State>>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
1903+
18681904
/// <summary>
18691905
/// Like <see cref="TaskSeq.fold" />, but returns the sequence of intermediate results and the final result.
18701906
/// The first element of the output sequence is always the initial state. If the input task sequence

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ type internal FolderAction<'T, 'State, 'TaskState when 'TaskState :> Task<'State
3434
| FolderAction of state_action: ('State -> 'T -> 'State)
3535
| AsyncFolderAction of async_state_action: ('State -> 'T -> 'TaskState)
3636

37+
/// The result of a single folder step in <see cref="TaskSeq.foldUntil" /> or
38+
/// <see cref="TaskSeq.foldUntilAsync" />: <c>Continue</c> threads the new state and
39+
/// keeps consuming, <c>Halt</c> records the final state and stops iteration.
40+
[<Struct>]
41+
type FoldStep<'State> =
42+
| Continue of continue_state: 'State
43+
| Halt of halt_state: 'State
44+
45+
[<Struct>]
46+
type internal FoldUntilAction<'T, 'State, 'TaskStep when 'TaskStep :> Task<FoldStep<'State>>> =
47+
| FoldUntilAction of fold_until_action: ('State -> 'T -> FoldStep<'State>)
48+
| AsyncFoldUntilAction of async_fold_until_action: ('State -> 'T -> 'TaskStep)
49+
3750
[<Struct>]
3851
type internal ChooserAction<'T, 'U, 'TaskOption when 'TaskOption :> Task<'U option>> =
3952
| TryPick of try_pick: ('T -> 'U option)
@@ -409,6 +422,44 @@ module internal TaskSeqInternal =
409422
return result
410423
}
411424

425+
let foldUntil folder initial (source: TaskSeq<_>) =
426+
checkNonNull (nameof source) source
427+
428+
task {
429+
use e = source.GetAsyncEnumerator CancellationToken.None
430+
let mutable result = initial
431+
let mutable running = true
432+
433+
match folder with
434+
| FoldUntilAction folder ->
435+
while running do
436+
let! hasNext = e.MoveNextAsync()
437+
438+
if hasNext then
439+
match folder result e.Current with
440+
| Continue s -> result <- s
441+
| Halt s ->
442+
result <- s
443+
running <- false
444+
else
445+
running <- false
446+
447+
| AsyncFoldUntilAction folder ->
448+
while running do
449+
let! hasNext = e.MoveNextAsync()
450+
451+
if hasNext then
452+
match! folder result e.Current with
453+
| Continue s -> result <- s
454+
| Halt s ->
455+
result <- s
456+
running <- false
457+
else
458+
running <- false
459+
460+
return result
461+
}
462+
412463
let scan folder initial (source: TaskSeq<_>) =
413464
checkNonNull (nameof source) source
414465

0 commit comments

Comments
 (0)