diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbConfiguration.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbConfiguration.cs index 58a3929e49..81e01eb258 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbConfiguration.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbConfiguration.cs @@ -1,4 +1,4 @@ -using System; +using System; using Amazon; using Amazon.Runtime; diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs index 4c0c4f8af8..f60e18f7ae 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs @@ -271,7 +271,7 @@ await _context.SaveAsync( message, _dynamoOverwriteTableConfig, cancellationToken); - } + } public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default) @@ -296,13 +296,16 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary @@ -386,7 +389,7 @@ private Task AddToTransactionWrite(MessageItem messag private async Task GetMessage(Guid id, CancellationToken cancellationToken = default) { - MessageItem messageItem = await _context.LoadAsync(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken); + var messageItem = await _context.LoadAsync(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken); return messageItem?.ConvertToMessage() ?? new Message(); } @@ -460,9 +463,9 @@ private async Task> OutstandingMessagesForAllTopicsAsync(do private async Task> OutstandingMessagesForTopicAsync(double millisecondsDispatchedSince, string topicName, CancellationToken cancellationToken) { - var olrderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); + var olderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince)); - var messages = (await QueryAllOutstandingShardsAsync(topicName, olrderThan, cancellationToken)).ToList(); + var messages = (await QueryAllOutstandingShardsAsync(topicName, olderThan, cancellationToken)).ToList(); return messages.Select(msg => msg.ConvertToMessage()); } @@ -485,14 +488,10 @@ private async Task> QueryAllOutstandingShardsAsync(stri for (int shard = 0; shard < _configuration.NumberOfShards; shard++) { - // We get all the messages for topic, added within a time range - // There should be few enough of those that we can efficiently filter for those - // that don't have a delivery date. var queryConfig = new QueryOperationConfig { IndexName = _configuration.OutstandingIndexName, - KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topic, minimumAge, shard), - FilterExpression = new NoDispatchTimeExpression().Generate(), + KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topic, minimumAge, shard), ConsistentRead = false }; diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs b/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs similarity index 70% rename from src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs rename to src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs index 88d65d10b9..69a7cb4fce 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicCreatedTimeExpression.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/KeyTopicOutstandingCreatedTimeExpression.cs @@ -1,16 +1,16 @@ -using System; +using System; using System.Collections.Generic; using Amazon.DynamoDBv2.DocumentModel; namespace Paramore.Brighter.Outbox.DynamoDB { - internal class KeyTopicCreatedTimeExpression + internal class KeyTopicOutstandingCreatedTimeExpression { private readonly Expression _expression; - public KeyTopicCreatedTimeExpression() + public KeyTopicOutstandingCreatedTimeExpression() { - _expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" }; + _expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" }; } public override string ToString() @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTime createdTime, int shard) { var values = new Dictionary(); values.Add(":v_TopicShard", $"{topicName}_{shard}"); - values.Add(":v_CreatedTime", createdTime.Ticks); + values.Add(":v_OutstandingCreatedTime", createdTime.Ticks); _expression.ExpressionAttributeValues = values; diff --git a/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs b/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs index 32afa80fcf..c5e14d8715 100644 --- a/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs +++ b/src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs @@ -41,10 +41,16 @@ public class MessageItem /// /// The time at which the message was created, in ticks /// - [DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")] [DynamoDBProperty] public long CreatedTime { get; set; } + /// + /// The time at which the message was created, in ticks. Null if the message has been dispatched. + /// + [DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")] + [DynamoDBProperty] + public long? OutstandingCreatedTime { get; set; } + /// /// The time at which the message was delivered, formatted as a string yyyy-MM-dd /// @@ -122,6 +128,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null) CharacterEncoding = message.Body.CharacterEncoding.ToString(); CreatedAt = $"{date}"; CreatedTime = date.Ticks; + OutstandingCreatedTime = date.Ticks; DeliveryTime = 0; HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); MessageId = message.Id.ToString(); @@ -163,12 +170,6 @@ public Message ConvertToMessage() return new Message(header, body); } - - public void MarkMessageDelivered(DateTime deliveredAt) - { - DeliveryTime = deliveredAt.Ticks; - DeliveredAt = $"{deliveredAt:yyyy-MM-dd}"; - } } public class MessageItemBodyConverter : IPropertyConverter @@ -176,29 +177,30 @@ public class MessageItemBodyConverter : IPropertyConverter public DynamoDBEntry ToEntry(object value) { byte[] body = value as byte[]; - if (body == null) throw new ArgumentOutOfRangeException("Expected the body to be a byte array"); + if (body == null) + throw new ArgumentOutOfRangeException("Expected the body to be a byte array"); DynamoDBEntry entry = new Primitive { Value = body, Type = DynamoDBEntryType.Binary - + }; - + return entry; } public object FromEntry(DynamoDBEntry entry) { byte[] data = Array.Empty(); - Primitive primitive = entry as Primitive; + Primitive primitive = entry as Primitive; if (primitive?.Value is byte[] bytes) data = bytes; if (primitive?.Value is string text) //for historical data that used UTF-8 strings data = Encoding.UTF8.GetBytes(text); if (primitive == null || !(primitive.Value is string || primitive.Value is byte[])) throw new ArgumentOutOfRangeException("Expected Dynamo to have stored a byte array"); - + return data; } } diff --git a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs index d8bc3e0c9e..56bc03c4bf 100644 --- a/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs +++ b/tests/Paramore.Brighter.DynamoDB.Tests/Outbox/When_there_are_outstanding_messages_in_the_outbox.cs @@ -107,6 +107,54 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics() } } + [Fact] + public async Task When_an_outstanding_message_is_dispatched_async() + { + await _dynamoDbOutbox.AddAsync(_message); + + await Task.Delay(1000); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + + await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id); + + // Give the GSI a second to catch up + await Task.Delay(1000); + + messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args); + messages.All(m => m.Id != _message.Id); + } + + [Fact] + public async Task When_an_outstanding_message_is_dispatched() + { + _dynamoDbOutbox.Add(_message); + + await Task.Delay(1000); + + var args = new Dictionary { { "Topic", "test_topic" } }; + + var messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args); + + //Other tests may leave messages, so make sure that we grab ours + var message = messages.Single(m => m.Id == _message.Id); + message.Should().NotBeNull(); + + _dynamoDbOutbox.MarkDispatched(_message.Id); + + // Give the GSI a second to catch up + await Task.Delay(1000); + + messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args); + messages.All(m => m.Id != _message.Id); + } + private Message CreateMessage(string topic) { return new Message(