Skip to content

Commit 7ab1fc7

Browse files
author
electricessence
committed
Expanded transaction capability.
1 parent 897d363 commit 7ab1fc7

12 files changed

Lines changed: 1924 additions & 1307 deletions

ExpressiveCommandBase.cs

Lines changed: 631 additions & 571 deletions
Large diffs are not rendered by default.

ExpressiveDbCommand.cs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,35 @@ public class ExpressiveDbCommand : ExpressiveDbCommandBase<DbConnection, DbComma
1515
/// <param name="type">The command type>.</param>
1616
/// <param name="command">The SQL command.</param>
1717
/// <param name="params">The list of params</param>
18-
public ExpressiveDbCommand(IDbConnectionFactory<DbConnection> connFactory, CommandType type, string command, List<Param> @params = null)
19-
: base(connFactory, type, command, @params)
18+
public ExpressiveDbCommand(IDbConnectionFactory<DbConnection> connFactory, CommandType type, string command, List<Param> @params)
19+
: base(connFactory, type, command, @params?.ToList())
2020
{
2121
}
2222

2323
/// <param name="connFactory">The factory to generate connections from.</param>
2424
/// <param name="type">The command type>.</param>
2525
/// <param name="command">The SQL command.</param>
2626
/// <param name="params">The list of params</param>
27-
protected ExpressiveDbCommand(IDbConnectionFactory<DbConnection> connFactory, CommandType type, string command, params Param[] @params)
28-
: this(connFactory, type, command, @params.ToList())
27+
public ExpressiveDbCommand(IDbConnectionFactory<DbConnection> connFactory, CommandType type, string command, params Param[] @params)
28+
: base(connFactory, type, command, @params?.ToList())
29+
{
30+
}
31+
32+
/// <param name="connection">The connection to execute the command on.</param>
33+
/// <param name="type">The command type>.</param>
34+
/// <param name="command">The SQL command.</param>
35+
/// <param name="params">The list of params</param>
36+
public ExpressiveDbCommand(DbConnection connection, CommandType type, string command, List<Param> @params)
37+
: base(connection, type, command, @params?.ToList())
38+
{
39+
}
40+
41+
/// <param name="connection">The connection to execute the command on.</param>
42+
/// <param name="type">The command type>.</param>
43+
/// <param name="command">The SQL command.</param>
44+
/// <param name="params">The list of params</param>
45+
public ExpressiveDbCommand(DbConnection connection, CommandType type, string command, params Param[] @params)
46+
: this(connection, type, command, @params?.ToList())
2947
{
3048
}
3149

ExpressiveDbCommandBase.cs

Lines changed: 86 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,43 +28,51 @@ public abstract class ExpressiveDbCommandBase<TConnection, TCommand, TDbType, TT
2828
/// <param name="type">The command type>.</param>
2929
/// <param name="command">The SQL command.</param>
3030
/// <param name="params">The list of params</param>
31-
public ExpressiveDbCommandBase(
32-
IDbConnectionFactory<TConnection> connFactory,
33-
CommandType type,
34-
string command,
35-
List<Param> @params = null)
36-
: base(connFactory, type, command, @params)
31+
protected ExpressiveDbCommandBase(
32+
IDbConnectionFactory<TConnection> connFactory,
33+
CommandType type,
34+
string command,
35+
List<Param> @params)
36+
: base(connFactory, type, command, @params)
3737
{
3838
}
3939

40-
/// <param name="connFactory">The factory to generate connections from.</param>
41-
/// <param name="type">The command type>.</param>
42-
/// <param name="command">The SQL command.</param>
43-
/// <param name="params">The list of params</param>
44-
protected ExpressiveDbCommandBase(
45-
IDbConnectionFactory<TConnection> connFactory,
46-
CommandType type,
47-
string command,
48-
params Param[] @params)
49-
: this(connFactory, type, command, @params.ToList())
40+
/// <param name="connection">The connection to execute the command on.</param>
41+
/// <param name="type">The command type>.</param>
42+
/// <param name="command">The SQL command.</param>
43+
/// <param name="params">The list of params</param>
44+
protected ExpressiveDbCommandBase(
45+
TConnection connection,
46+
CommandType type,
47+
string command,
48+
List<Param> @params)
49+
: base(connection, type, command, @params)
5050
{
5151
}
5252

53+
5354
/// <summary>
5455
/// Asynchronously executes a reader on a command with a handler function.
5556
/// </summary>
5657
/// <param name="handler">The handler function for each IDataRecord.</param>
5758
public async Task ExecuteAsync(Func<TCommand, Task> handler)
5859
{
59-
using (var con = ConnectionFactory.Create())
60-
using (var cmd = (TCommand)con.CreateCommand(
60+
TConnection con = Connection ?? ConnectionFactory.Create();
61+
try
62+
{
63+
using (var cmd = (TCommand)con.CreateCommand(
6164
Type, Command, Timeout))
65+
{
66+
var c = cmd as TCommand;
67+
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
68+
AddParams(c);
69+
await con.OpenAsync();
70+
await handler(c);
71+
}
72+
}
73+
finally
6274
{
63-
var c = cmd as TCommand;
64-
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
65-
AddParams(c);
66-
await con.OpenAsync();
67-
await handler(c);
75+
if (Connection == null) con.Dispose();
6876
}
6977
}
7078

@@ -76,15 +84,21 @@ public async Task ExecuteAsync(Func<TCommand, Task> handler)
7684
/// <returns>The result of the transform.</returns>
7785
public async Task<T> ExecuteAsync<T>(Func<TCommand, Task<T>> transform)
7886
{
79-
using (var con = ConnectionFactory.Create())
80-
using (var cmd = con.CreateCommand(
81-
Type, Command, Timeout))
87+
TConnection con = Connection ?? ConnectionFactory.Create();
88+
try
8289
{
83-
var c = cmd as TCommand;
84-
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
85-
AddParams(c);
86-
await con.OpenAsync();
87-
return await transform(c);
90+
using (var cmd = con.CreateCommand(Type, Command, Timeout))
91+
{
92+
var c = cmd as TCommand;
93+
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
94+
AddParams(c);
95+
await con.OpenAsync();
96+
return await transform(c);
97+
}
98+
}
99+
finally
100+
{
101+
if (Connection == null) con.Dispose();
88102
}
89103
}
90104

@@ -94,19 +108,25 @@ public async Task<T> ExecuteAsync<T>(Func<TCommand, Task<T>> transform)
94108
/// <returns>The value from the return parameter.</returns>
95109
public async Task<object> ExecuteReturnAsync()
96110
{
97-
using (var con = ConnectionFactory.Create())
98-
using (var cmd = con.CreateCommand(
99-
Type, Command, Timeout))
111+
TConnection con = Connection ?? ConnectionFactory.Create();
112+
try
100113
{
101-
var c = cmd as TCommand;
102-
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
103-
AddParams(c);
104-
var returnParameter = c.CreateParameter();
105-
returnParameter.Direction = ParameterDirection.ReturnValue;
106-
cmd.Parameters.Add(returnParameter);
107-
await con.OpenAsync();
108-
await c.ExecuteNonQueryAsync();
109-
return returnParameter.Value;
114+
using (var cmd = con.CreateCommand(Type, Command, Timeout))
115+
{
116+
var c = cmd as TCommand;
117+
if (c == null) throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
118+
AddParams(c);
119+
var returnParameter = c.CreateParameter();
120+
returnParameter.Direction = ParameterDirection.ReturnValue;
121+
cmd.Parameters.Add(returnParameter);
122+
await con.OpenAsync();
123+
await c.ExecuteNonQueryAsync();
124+
return returnParameter.Value;
125+
}
126+
}
127+
finally
128+
{
129+
if (Connection == null) con.Dispose();
110130
}
111131
}
112132

@@ -222,29 +242,29 @@ public Task<List<T>> TakeAsync<T>(Func<IDataRecord, T> transform, int count)
222242
}
223243

224244

225-
/// <summary>
226-
/// Reads the first column from every record and returns the results as a list..
227-
/// DBNull values are converted to null.
228-
/// </summary>
229-
/// <returns>The list of transformed records.</returns>
230-
public Task<IEnumerable<object>> FirstOrdinalResultsAsync()
231-
=> ExecuteAsync(command => command.FirstOrdinalResultsAsync());
232-
233-
/// <summary>
234-
/// Reads the first column from every record..
235-
/// DBNull values are converted to null.
236-
/// </summary>
237-
/// <returns>The enumerable of casted values.</returns>
238-
public Task<IEnumerable<T0>> FirstOrdinalResultsAsync<T0>()
239-
=> ExecuteAsync(command => command.FirstOrdinalResultsAsync<T0>());
240-
241-
/// <summary>
242-
/// Asynchronously iterates all records within the current result set using an IDataReader and returns the desired results.
243-
/// </summary>
244-
/// <param name="n">The first ordinal to include in the request to the reader for each record.</param>
245-
/// <param name="others">The remaining ordinals to request from the reader for each record.</param>
246-
/// <returns>The QueryResult that contains all the results and the column mappings.</returns>
247-
public Task<QueryResult<Queue<object[]>>> RetrieveAsync(int n, params int[] others)
245+
/// <summary>
246+
/// Reads the first column from every record and returns the results as a list..
247+
/// DBNull values are converted to null.
248+
/// </summary>
249+
/// <returns>The list of transformed records.</returns>
250+
public Task<IEnumerable<object>> FirstOrdinalResultsAsync()
251+
=> ExecuteAsync(command => command.FirstOrdinalResultsAsync());
252+
253+
/// <summary>
254+
/// Reads the first column from every record..
255+
/// DBNull values are converted to null.
256+
/// </summary>
257+
/// <returns>The enumerable of casted values.</returns>
258+
public Task<IEnumerable<T0>> FirstOrdinalResultsAsync<T0>()
259+
=> ExecuteAsync(command => command.FirstOrdinalResultsAsync<T0>());
260+
261+
/// <summary>
262+
/// Asynchronously iterates all records within the current result set using an IDataReader and returns the desired results.
263+
/// </summary>
264+
/// <param name="n">The first ordinal to include in the request to the reader for each record.</param>
265+
/// <param name="others">The remaining ordinals to request from the reader for each record.</param>
266+
/// <returns>The QueryResult that contains all the results and the column mappings.</returns>
267+
public Task<QueryResult<Queue<object[]>>> RetrieveAsync(int n, params int[] others)
248268
=> RetrieveAsync(new int[1] { n }.Concat(others));
249269

250270
/// <summary>

Extensions.Dataflow.cs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System;
2+
using System.Data;
3+
using System.Data.Common;
4+
using System.Threading.Tasks;
5+
using System.Threading.Tasks.Dataflow;
6+
7+
namespace Open.Database.Extensions
8+
{
9+
public static partial class Extensions
10+
{
11+
internal static bool IsStillAlive<T>(this ITargetBlock<T> task)
12+
{
13+
return IsStillAlive(task.Completion);
14+
}
15+
16+
/// <summary>
17+
/// Iterates an IDataReader through the transform function and posts each record to the target block.
18+
/// </summary>
19+
/// <typeparam name="T">The return type of the transform function.</typeparam>
20+
/// <param name="reader">The IDataReader to iterate.</param>
21+
/// <param name="transform">The transform function for each IDataRecord.</param>
22+
/// <param name="target">The target block to receivethe results.</param>
23+
public static void ToTargetBlock<T>(this IDataReader reader,
24+
ITargetBlock<T> target,
25+
Func<IDataRecord, T> transform)
26+
{
27+
while (target.IsStillAlive() && reader.Read() && target.Post(transform(reader))) { }
28+
}
29+
30+
/// <summary>
31+
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
32+
/// </summary>
33+
/// <typeparam name="T">The return type of the transform function.</typeparam>
34+
/// <param name="reader">The SqlDataReader to read from.</param>
35+
/// <param name="target">The target block to receive the results.</param>
36+
/// <param name="transform">The transform function to process each IDataRecord.</param>
37+
public static async Task ToTargetBlockAsync<T>(this DbDataReader reader,
38+
ITargetBlock<T> target,
39+
Func<IDataRecord, T> transform)
40+
{
41+
Task<bool> lastSend = null;
42+
while (target.IsStillAlive()
43+
&& await reader.ReadAsync()
44+
&& (lastSend == null || await lastSend))
45+
{
46+
// Allows for a premtive read before waiting for the next send.
47+
lastSend = target.SendAsync(transform(reader));
48+
}
49+
}
50+
51+
/// <summary>
52+
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
53+
/// </summary>
54+
/// <typeparam name="T">The return type of the transform function.</typeparam>
55+
/// <param name="command">The DbCommand to generate a reader from.</param>
56+
/// <param name="target">The target block to receive the results.</param>
57+
/// <param name="transform">The transform function for each IDataRecord.</param>
58+
public static async Task ToTargetBlockAsync<T>(this DbCommand command,
59+
ITargetBlock<T> target,
60+
Func<IDataRecord, T> transform)
61+
{
62+
if (target.IsStillAlive())
63+
{
64+
using (var reader = await command.ExecuteReaderAsync())
65+
{
66+
if (target.IsStillAlive())
67+
await reader.ToTargetBlockAsync(target, transform);
68+
}
69+
}
70+
}
71+
72+
/// <summary>
73+
/// Iterates an IDataReader through the transform function and posts each record to the target block.
74+
/// </summary>
75+
/// <typeparam name="T">The return type of the transform function.</typeparam>
76+
/// <param name="command">The IDataReader to iterate.</param>
77+
/// <param name="target">The target block to receive the results.</param>
78+
/// <param name="transform">The transform function for each IDataRecord.</param>
79+
public static void ToTargetBlock<T>(this IDbCommand command,
80+
ITargetBlock<T> target,
81+
Func<IDataRecord, T> transform)
82+
=> command.ExecuteReader(reader => reader.ToTargetBlock(target, transform));
83+
84+
}
85+
}

0 commit comments

Comments
 (0)