forked from fsprojects/FSharp.Control.TaskSeq
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTaskSeq.fs
More file actions
408 lines (314 loc) · 15.8 KB
/
TaskSeq.fs
File metadata and controls
408 lines (314 loc) · 15.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
namespace FSharp.Control
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
// Just for convenience
module Internal = TaskSeqInternal
[<AutoOpen>]
module TaskSeqExtensions =
// these need to be in a module, not a type for proper auto-initialization of generic values
module TaskSeq =
let empty<'T> = Internal.empty<'T>
[<Sealed; AbstractClass>]
type TaskSeq private () =
// Rules for static classes, see bug report: https://github.com/dotnet/fsharp/issues/8093
// F# does not need this internally, but C# does
// 'Abstract & Sealed': makes it a static class in C#
// the 'private ()' ensure that a constructor is emitted, which is required by IL
static member singleton(value: 'T) = Internal.singleton value
static member isEmpty source = Internal.isEmpty source
//
// Convert 'ToXXX' functions
//
static member toList(source: TaskSeq<'T>) = [
Internal.checkNonNull (nameof source) source
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
]
static member toArray(source: TaskSeq<'T>) = [|
Internal.checkNonNull (nameof source) source
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
|]
static member toSeq(source: TaskSeq<'T>) =
Internal.checkNonNull (nameof source) source
seq {
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
}
static member toArrayAsync source =
Internal.toResizeArrayAsync source
|> Task.map (fun a -> a.ToArray())
static member toListAsync source = Internal.toResizeArrayAndMapAsync List.ofSeq source
static member toResizeArrayAsync source = Internal.toResizeArrayAsync source
static member toIListAsync source = Internal.toResizeArrayAndMapAsync (fun x -> x :> IList<_>) source
//
// Convert 'OfXXX' functions
//
static member ofArray(source: 'T[]) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofList(source: 'T list) = taskSeq {
for c in source do
yield c
}
static member ofSeq(source: 'T seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofResizeArray(source: 'T ResizeArray) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofTaskSeq(source: #Task<'T> seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = c
yield c
}
static member ofTaskList(source: #Task<'T> list) = taskSeq {
for c in source do
let! c = c
yield c
}
static member ofTaskArray(source: #Task<'T> array) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = c
yield c
}
static member ofAsyncSeq(source: Async<'T> seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = task { return! c }
yield c
}
static member ofAsyncList(source: Async<'T> list) = taskSeq {
for c in source do
let! c = Task.ofAsync c
yield c
}
static member ofAsyncArray(source: Async<'T> array) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = Async.toTask c
yield c
}
//
// Utility functions
//
static member max source = Internal.maxMin max source
static member min source = Internal.maxMin min source
static member maxBy projection source = Internal.maxMinBy (<) projection source // looks like 'less than', is 'greater than'
static member minBy projection source = Internal.maxMinBy (>) projection source
static member maxByAsync projection source = Internal.maxMinByAsync (<) projection source // looks like 'less than', is 'greater than'
static member minByAsync projection source = Internal.maxMinByAsync (>) projection source
static member length source = Internal.lengthBy None source
static member lengthOrMax max source = Internal.lengthBeforeMax max source
static member lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source
static member lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source
static member init count initializer = Internal.init (Some count) (InitAction initializer)
static member initInfinite initializer = Internal.init None (InitAction initializer)
static member initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer)
static member initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer)
static member delay(generator: unit -> TaskSeq<'T>) =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(ct) = generator().GetAsyncEnumerator(ct)
}
static member concat(sources: TaskSeq<#TaskSeq<'T>>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner taskseqs, similar to seq
yield! (ts :> TaskSeq<'T>)
}
static member concat(sources: TaskSeq<'T seq>) = // NOTE: we cannot use flex types on two overloads
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner seqs, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<'T[]>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner arrays, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<'T list>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner lists, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<ResizeArray<'T>>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner resize arrays, similar to seq
yield! ts
}
static member append (source1: TaskSeq<'T>) (source2: TaskSeq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
static member appendSeq (source1: TaskSeq<'T>) (source2: seq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
static member prependSeq (source1: seq<'T>) (source2: TaskSeq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
static member chunkBySize (chunkSize: int) (source: TaskSeq<'T>) = Internal.chunkBySize chunkSize source
//
// iter/map/collect functions
//
static member cast source : TaskSeq<'T> = Internal.map (SimpleAction(fun (x: obj) -> x :?> 'T)) source
static member box source = Internal.map (SimpleAction box) source
static member unbox<'U when 'U: struct>(source: TaskSeq<obj>) : TaskSeq<'U> = Internal.map (SimpleAction unbox) source
static member iter action source = Internal.iter (SimpleAction action) source
static member iteri action source = Internal.iter (CountableAction action) source
static member iterAsync action source = Internal.iter (AsyncSimpleAction action) source
static member iteriAsync action source = Internal.iter (AsyncCountableAction action) source
static member map (mapper: 'T -> 'U) source = Internal.map (SimpleAction mapper) source
static member mapi (mapper: int -> 'T -> 'U) source = Internal.map (CountableAction mapper) source
static member mapAsync mapper source = Internal.map (AsyncSimpleAction mapper) source
static member mapiAsync mapper source = Internal.map (AsyncCountableAction mapper) source
static member collect (binder: 'T -> #TaskSeq<'U>) source = Internal.collect binder source
static member collectSeq (binder: 'T -> #seq<'U>) source = Internal.collectSeq binder source
static member collectAsync (binder: 'T -> #Task<#TaskSeq<'U>>) source : TaskSeq<'U> = Internal.collectAsync binder source
static member collectSeqAsync (binder: 'T -> #Task<#seq<'U>>) source : TaskSeq<'U> = Internal.collectSeqAsync binder source
//
// choosers, pickers and the like
//
static member tryHead source = Internal.tryHead source
static member head source =
Internal.tryHead source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryLast source = Internal.tryLast source
static member last source =
Internal.tryLast source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryTail source = Internal.tryTail source
static member tail source =
Internal.tryTail source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryItem index source = Internal.tryItem index source
static member item index source =
if index < 0 then
invalidArg (nameof index) "The input must be non-negative."
Internal.tryItem index source
|> Task.map (Option.defaultWith Internal.raiseInsufficient)
static member tryExactlyOne source = Internal.tryExactlyOne source
static member exactlyOne source =
Internal.tryExactlyOne source
|> Task.map (Option.defaultWith (fun () -> invalidArg (nameof source) "The input sequence contains more than one element."))
static member indexed(source: TaskSeq<'T>) =
Internal.checkNonNull (nameof source) source
taskSeq {
let mutable i = 0
for x in source do
yield i, x
i <- i + 1
}
static member choose chooser source = Internal.choose (TryPick chooser) source
static member chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source
static member filter predicate source = Internal.filter (Predicate predicate) source
static member filterAsync predicate source = Internal.filter (PredicateAsync predicate) source
static member where predicate source = Internal.filter (Predicate predicate) source
static member whereAsync predicate source = Internal.filter (PredicateAsync predicate) source
static member skip count source = Internal.skipOrTake Skip count source
static member drop count source = Internal.skipOrTake Drop count source
static member take count source = Internal.skipOrTake Take count source
static member truncate count source = Internal.skipOrTake Truncate count source
static member takeWhile predicate source = Internal.takeWhile false (Predicate predicate) source
static member takeWhileAsync predicate source = Internal.takeWhile false (PredicateAsync predicate) source
static member takeWhileInclusive predicate source = Internal.takeWhile true (Predicate predicate) source
static member takeWhileInclusiveAsync predicate source = Internal.takeWhile true (PredicateAsync predicate) source
static member skipWhile predicate source = Internal.skipWhile false (Predicate predicate) source
static member skipWhileAsync predicate source = Internal.skipWhile false (PredicateAsync predicate) source
static member skipWhileInclusive predicate source = Internal.skipWhile true (Predicate predicate) source
static member skipWhileInclusiveAsync predicate source = Internal.skipWhile true (PredicateAsync predicate) source
static member tryPick chooser source = Internal.tryPick (TryPick chooser) source
static member tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
static member tryFind predicate source = Internal.tryFind (Predicate predicate) source
static member tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
static member tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
static member tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source
static member insertAt index value source = Internal.insertAt index (One value) source
static member insertManyAt index values source = Internal.insertAt index (Many values) source
static member removeAt index source = Internal.removeAt index source
static member removeManyAt index count source = Internal.removeManyAt index count source
static member updateAt index value source = Internal.updateAt index value source
static member except itemsToExclude source = Internal.except itemsToExclude source
static member exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source
static member forall predicate source = Internal.forall (Predicate predicate) source
static member forallAsync predicate source = Internal.forall (PredicateAsync predicate) source
static member exists predicate source =
Internal.tryFind (Predicate predicate) source
|> Task.map Option.isSome
static member existsAsync predicate source =
Internal.tryFind (PredicateAsync predicate) source
|> Task.map Option.isSome
static member contains value source =
Internal.tryFind (Predicate((=) value)) source
|> Task.map Option.isSome
static member pick chooser source =
Internal.tryPick (TryPick chooser) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member pickAsync chooser source =
Internal.tryPick (TryPickAsync chooser) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member find predicate source =
Internal.tryFind (Predicate predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findAsync predicate source =
Internal.tryFind (PredicateAsync predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findIndex predicate source =
Internal.tryFindIndex (Predicate predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findIndexAsync predicate source =
Internal.tryFindIndex (PredicateAsync predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
//
// zip/unzip/fold etc functions
//
static member zip source1 source2 = Internal.zip source1 source2
static member fold folder state source = Internal.fold (FolderAction folder) state source
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source