@@ -975,58 +975,7 @@ module internal TaskSeqInternal =
975975 raiseOutOfBounds ( nameof index)
976976 }
977977
978- // Consider turning using an F# version of this instead?
979- // https://github.com/i3arnon/ConcurrentHashSet
980- type ConcurrentHashSet < 'T when 'T: equality >( ct ) =
981- let _rwLock = new ReaderWriterLockSlim()
982- let hashSet = HashSet< 'T>( Array.empty, HashIdentity.Structural)
983-
984- member _.Add item =
985- _ rwLock.EnterWriteLock()
986-
987- try
988- hashSet.Add item
989- finally
990- _ rwLock.ExitWriteLock()
991-
992- member _.AddMany items =
993- _ rwLock.EnterWriteLock()
994-
995- try
996- for item in items do
997- hashSet.Add item |> ignore
998-
999- finally
1000- _ rwLock.ExitWriteLock()
1001-
1002- member _.AddManyAsync ( source : TaskSeq < 'T >) = task {
1003- use e = source.GetAsyncEnumerator( ct)
1004- let mutable go = true
1005- let! step = e.MoveNextAsync()
1006- go <- step
1007-
1008- while go do
1009- // NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead?
1010- // or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16
1011- // not sure how they compare.
1012-
1013- _ rwLock.EnterWriteLock()
1014-
1015- try
1016- hashSet.Add e.Current |> ignore
1017- finally
1018- _ rwLock.ExitWriteLock()
1019-
1020- let! step = e.MoveNextAsync()
1021- go <- step
1022- }
1023-
1024- interface IDisposable with
1025- override _.Dispose () =
1026- if not ( isNull _ rwLock) then
1027- _ rwLock.Dispose()
1028-
1029- let except itemsToExclude ( source : TaskSeq < _ >) =
978+ let except ( itemsToExclude : TaskSeq < _ >) ( source : TaskSeq < _ >) =
1030979 checkNonNull ( nameof source) source
1031980 checkNonNull ( nameof itemsToExclude) itemsToExclude
1032981
@@ -1037,9 +986,18 @@ module internal TaskSeqInternal =
1037986 go <- step
1038987
1039988 if step then
1040- // only create hashset by the time we actually start iterating
1041- use hashSet = new ConcurrentHashSet<_>( CancellationToken.None)
1042- do ! hashSet.AddManyAsync itemsToExclude
989+ // only create hashset by the time we actually start iterating;
990+ // taskSeq enumerates sequentially, so a plain HashSet suffices — no locking needed.
991+ let hashSet = HashSet<_>( HashIdentity.Structural)
992+
993+ use excl = itemsToExclude.GetAsyncEnumerator CancellationToken.None
994+ let! exclStep = excl.MoveNextAsync()
995+ let mutable exclGo = exclStep
996+
997+ while exclGo do
998+ hashSet.Add excl.Current |> ignore
999+ let! exclStep = excl.MoveNextAsync()
1000+ exclGo <- exclStep
10431001
10441002 while go do
10451003 let current = e.Current
@@ -1065,9 +1023,9 @@ module internal TaskSeqInternal =
10651023 go <- step
10661024
10671025 if step then
1068- // only create hashset by the time we actually start iterating
1069- use hashSet = new ConcurrentHashSet <_>( CancellationToken.None )
1070- do hashSet.AddMany itemsToExclude
1026+ // only create hashset by the time we actually start iterating;
1027+ // initialize directly from the seq — taskSeq is sequential so no locking needed.
1028+ let hashSet = HashSet <_>( itemsToExclude, HashIdentity.Structural )
10711029
10721030 while go do
10731031 let current = e.Current
0 commit comments