-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathSqlServerProcessManagersStore.cs
69 lines (61 loc) · 2.75 KB
/
SqlServerProcessManagersStore.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
using Dapper;
using KafkaFlow.SqlServer;
using Microsoft.Extensions.Options;
using System.Data.SqlClient;
namespace KafkaFlow.ProcessManagers.SqlServer;
public sealed class SqlServerProcessStateRepository(IOptions<SqlServerBackendOptions> options) : IProcessStateRepository
{
private readonly SqlServerBackendOptions _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
public async ValueTask<int> Persist(Type processType, string processState, Guid processId, VersionedState state)
{
var sql = """
MERGE INTO [process_managers].[processes] as [target]
USING (VALUES (@process_type, @process_id,@process_state)) AS [source] ([process_type], [process_id], [process_state])
ON [target].[process_type] = @process_type AND [target].[process_id] = @process_id
WHEN MATCHED AND [target].[rowversion] = @Version THEN
UPDATE SET
[process_state] = @process_state,
[date_updated_utc] = SYSUTCDATETIME(),
[rowversion] = [target].[rowversion] + 1
WHEN NOT MATCHED THEN
INSERT ([process_type], [process_id], [process_state])
VALUES (@process_type, @process_id,@process_state);
""";
using var conn = new SqlConnection(_options.ConnectionString);
return await conn.ExecuteAsync(sql, new
{
process_type = processType.FullName,
process_id = processId,
process_state = processState,
version = state.Version
}).ConfigureAwait(false);
}
public async ValueTask<IEnumerable<ProcessStateTableRow>> Load(Type processType, Guid processId)
{
var sql = """
SELECT [process_state] as [ProcessState], [rowversion] as [Version]
FROM [process_managers].[processes]
WHERE [process_type] = @process_type AND [process_id] = @process_id;
""";
using var conn = new SqlConnection(_options.ConnectionString);
return await conn.QueryAsync<ProcessStateTableRow>(sql, new
{
process_type = processType.FullName,
process_id = processId
}).ConfigureAwait(false);
}
public async ValueTask<int> Delete(Type processType, Guid processId, int version)
{
var sql = """
DELETE FROM [process_managers].[processes]
WHERE [process_type] = @process_type AND [process_id] = @process_id and [rowversion] = @version;
""";
using var conn = new SqlConnection(_options.ConnectionString);
return await conn.ExecuteAsync(sql, new
{
process_type = processType.FullName,
process_id = processId,
version
}).ConfigureAwait(false);
}
}