Skip to content

Commit 8546a11

Browse files
Merge pull request #18 from Open-NET-Libraries/development
Development
2 parents 4544aa3 + d6bf353 commit 8546a11

80 files changed

Lines changed: 4707 additions & 2447 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22

33
# CA1303: Do not pass literals as localized parameters
44
dotnet_diagnostic.CA1303.severity = silent
5+
6+
# AD0001: Analyzer Failure
7+
dotnet_diagnostic.AD0001.severity = silent

Channel/AsChannel.cs

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,9 @@
99

1010
namespace Open.Database.Extensions
1111
{
12-
public static partial class ChannelExtensions
12+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Intentionally running in the background.")]
13+
public static partial class ChannelDbExtensions
1314
{
14-
internal static Channel<T> CreateChannel<T>(int capacity = -1, bool singleReader = false, bool singleWriter = true)
15-
{
16-
if (capacity == 0) throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "Cannot be zero.");
17-
if (capacity < -1) throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "Must greater than zero or equal to negative one (unbounded).");
18-
19-
return capacity > 0
20-
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
21-
{
22-
SingleWriter = singleWriter,
23-
SingleReader = singleReader,
24-
AllowSynchronousContinuations = true,
25-
FullMode = BoundedChannelFullMode.Wait
26-
})
27-
: Channel.CreateUnbounded<T>(new UnboundedChannelOptions
28-
{
29-
SingleWriter = singleWriter,
30-
SingleReader = singleReader,
31-
AllowSynchronousContinuations = true
32-
});
33-
}
34-
3515
/// <summary>
3616
/// Iterates an IDataReader and writes each record as an array to an unbound channel.
3717
/// Be sure to await the completion.
@@ -61,16 +41,16 @@ public static ChannelReader<object[]> AsChannel(this IDataReader reader,
6141
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
6242
/// <param name="cancellationToken">An optional cancellation token.</param>
6343
/// <returns>The channel reader containing the results.</returns>
64-
public static ChannelReader<object[]> AsChannel(this IDataReader reader,
44+
public static ChannelReader<object?[]> AsChannel(this IDataReader reader,
6545
bool singleReader,
66-
ArrayPool<object> arrayPool,
46+
ArrayPool<object?> arrayPool,
6747
CancellationToken cancellationToken = default)
6848
{
6949
if (reader is null) throw new ArgumentNullException(nameof(reader));
7050
if (arrayPool is null) throw new ArgumentNullException(nameof(arrayPool));
7151
Contract.EndContractBlock();
7252

73-
var channel = CreateChannel<object[]>(-1, singleReader);
53+
var channel = CreateChannel<object?[]>(-1, singleReader);
7454
_ = ToChannel(reader, channel.Writer, true, arrayPool, cancellationToken);
7555
return channel.Reader;
7656
}
@@ -172,16 +152,16 @@ public static ChannelReader<object[]> AsChannel(this IDbCommand command,
172152
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
173153
/// <param name="cancellationToken">An optional cancellation token.</param>
174154
/// <returns>The channel reader containing the results.</returns>
175-
public static ChannelReader<object[]> AsChannel(this IDbCommand command,
155+
public static ChannelReader<object?[]> AsChannel(this IDbCommand command,
176156
bool singleReader,
177-
ArrayPool<object> arrayPool,
157+
ArrayPool<object?> arrayPool,
178158
CancellationToken cancellationToken = default)
179159
{
180160
if (command is null) throw new ArgumentNullException(nameof(command));
181161
if (arrayPool is null) throw new ArgumentNullException(nameof(arrayPool));
182162
Contract.EndContractBlock();
183163

184-
var channel = CreateChannel<object[]>(-1, singleReader);
164+
var channel = CreateChannel<object?[]>(-1, singleReader);
185165
_ = ToChannel(command, channel.Writer, true, arrayPool, cancellationToken);
186166
return channel.Reader;
187167
}
@@ -280,15 +260,15 @@ public static ChannelReader<object[]> AsChannel(this IExecuteReader command,
280260
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
281261
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
282262
/// <returns>The channel reader containing the results.</returns>
283-
public static ChannelReader<object[]> AsChannel(this IExecuteReader command,
263+
public static ChannelReader<object?[]> AsChannel(this IExecuteReader command,
284264
bool singleReader,
285-
ArrayPool<object> arrayPool)
265+
ArrayPool<object?> arrayPool)
286266
{
287267
if (command is null) throw new ArgumentNullException(nameof(command));
288268
if (arrayPool is null) throw new ArgumentNullException(nameof(arrayPool));
289269
Contract.EndContractBlock();
290270

291-
var channel = CreateChannel<object[]>(-1, singleReader);
271+
var channel = CreateChannel<object?[]>(-1, singleReader);
292272
_ = ToChannel(command, channel.Writer, true, arrayPool);
293273
return channel.Reader;
294274
}
@@ -386,16 +366,16 @@ public static ChannelReader<object[]> AsChannelAsync(this DbDataReader reader,
386366
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
387367
/// <param name="cancellationToken">An optional cancellation token.</param>
388368
/// <returns>The channel reader containing the results.</returns>
389-
public static ChannelReader<object[]> AsChannelAsync(this DbDataReader reader,
369+
public static ChannelReader<object?[]> AsChannelAsync(this DbDataReader reader,
390370
bool singleReader,
391-
ArrayPool<object> arrayPool,
371+
ArrayPool<object?> arrayPool,
392372
CancellationToken cancellationToken = default)
393373
{
394374
if (reader is null) throw new ArgumentNullException(nameof(reader));
395375
if (arrayPool is null) throw new ArgumentNullException(nameof(arrayPool));
396376
Contract.EndContractBlock();
397377

398-
var channel = CreateChannel<object[]>(-1, singleReader);
378+
var channel = CreateChannel<object?[]>(-1, singleReader);
399379
_ = ToChannelAsync(reader, channel.Writer, true, arrayPool, cancellationToken);
400380
return channel.Reader;
401381
}
@@ -497,16 +477,16 @@ public static ChannelReader<object[]> AsChannelAsync(this DbCommand command,
497477
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
498478
/// <param name="cancellationToken">An optional cancellation token.</param>
499479
/// <returns>The channel reader containing the results.</returns>
500-
public static ChannelReader<object[]> AsChannelAsync(this DbCommand command,
480+
public static ChannelReader<object?[]> AsChannelAsync(this DbCommand command,
501481
bool singleReader,
502-
ArrayPool<object> arrayPool,
482+
ArrayPool<object?> arrayPool,
503483
CancellationToken cancellationToken = default)
504484
{
505485
if (command is null) throw new ArgumentNullException(nameof(command));
506486
if (arrayPool is null) throw new ArgumentNullException(nameof(arrayPool));
507487
Contract.EndContractBlock();
508488

509-
var channel = CreateChannel<object[]>(-1, singleReader);
489+
var channel = CreateChannel<object?[]>(-1, singleReader);
510490
_ = ToChannelAsync(command, channel.Writer, true, arrayPool, cancellationToken);
511491
return channel.Reader;
512492
}
@@ -605,14 +585,14 @@ public static ChannelReader<object[]> AsChannelAsync(this IExecuteReaderAsync co
605585
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
606586
/// <param name="arrayPool">The array pool to acquire buffers from.</param>
607587
/// <returns>The channel reader containing the results.</returns>
608-
public static ChannelReader<object[]> AsChannelAsync(this IExecuteReaderAsync command,
588+
public static ChannelReader<object?[]> AsChannelAsync(this IExecuteReaderAsync command,
609589
bool singleReader,
610-
ArrayPool<object> arrayPool)
590+
ArrayPool<object?> arrayPool)
611591
{
612592
if (command is null) throw new ArgumentNullException(nameof(command));
613593
Contract.EndContractBlock();
614594

615-
var channel = CreateChannel<object[]>(-1, singleReader);
595+
var channel = CreateChannel<object?[]>(-1, singleReader);
616596
_ = ToChannelAsync(command, channel.Writer, true, arrayPool);
617597
return channel.Reader;
618598
}

Channel/CreateChannel.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Threading.Channels;
3+
4+
namespace Open.Database.Extensions
5+
{
6+
public static partial class ChannelDbExtensions
7+
{
8+
internal static Channel<T> CreateChannel<T>(int capacity = -1, bool singleReader = false, bool singleWriter = true)
9+
{
10+
if (capacity == 0) throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "Cannot be zero.");
11+
if (capacity < -1) throw new ArgumentOutOfRangeException(nameof(capacity), capacity, "Must greater than zero or equal to negative one (unbounded).");
12+
13+
return capacity > 0
14+
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
15+
{
16+
SingleWriter = singleWriter,
17+
SingleReader = singleReader,
18+
AllowSynchronousContinuations = true,
19+
FullMode = BoundedChannelFullMode.Wait
20+
})
21+
: Channel.CreateUnbounded<T>(new UnboundedChannelOptions
22+
{
23+
SingleWriter = singleWriter,
24+
SingleReader = singleReader,
25+
AllowSynchronousContinuations = true
26+
});
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)