Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/core/Akka.Streams.Tests/Issue7794Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// -----------------------------------------------------------------------
// <copyright file="Issue7794Spec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Implementation;

/// <summary>
/// Regression spec for issue 7794: a stream fed by <c>ChannelSource.FromReader</c> must
/// finish without an exception when the underlying channel is completed.
/// </summary>
public class Issue7794Spec : AkkaSpec
{
    /// <summary>
    /// Upper bound, in milliseconds, on how long the test waits for the stream to finish.
    /// Without a bound, a stream that never completes would hang the test run instead of failing it.
    /// </summary>
    private const int CompletionTimeoutMs = 10_000;

    /// <summary>
    /// Delay, in milliseconds, before the channel writer is completed.
    /// </summary>
    private const int CompleteAfterMs = 100;

    /// <summary>Materializer used to run the stream under test.</summary>
    private ActorMaterializer Materializer { get; }

    /// <summary>
    /// Creates the spec and a materializer bound to the test actor system.
    /// </summary>
    /// <param name="helper">xUnit output sink forwarded to <see cref="AkkaSpec"/>.</param>
    public Issue7794Spec(ITestOutputHelper helper) : base(helper)
    {
        Materializer = Sys.Materializer();
    }

    /// <summary>
    /// Runs a channel-backed source into an ignoring sink, completes the channel writer
    /// without ever writing an element, and expects the materialized task to complete
    /// successfully within <see cref="CompletionTimeoutMs"/>.
    /// </summary>
    [Fact(DisplayName = "ChannelSource should not throw NRE when Channel completes")]
    public async Task Issue_7794_ChannelSource_NRE()
    {
        var options = new BoundedChannelOptions(capacity: 100)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true,
            AllowSynchronousContinuations = false
        };
        var channel = Channel.CreateBounded<Message<string, string>>(options);

        var streamRes = ChannelSource.FromReader(channel.Reader)
            .Select(e => e)
            .RunWith(Sink.Ignore<Message<string, string>>(), Materializer);

        // Complete the writer from another thread after a short delay.
        // NOTE(review): presumably the delay lets the source start waiting on the reader
        // before completion arrives - confirm against the original issue report.
        _ = Task.Run(async () =>
        {
            await Task.Delay(CompleteAfterMs);
            channel.Writer.Complete();
        });

        // Bound the wait so that a stream which never completes fails the test instead of hanging it.
        var finished = await Task.WhenAny(streamRes, Task.Delay(CompletionTimeoutMs));
        Assert.True(
            ReferenceEquals(finished, streamRes),
            $"Stream did not complete within {CompletionTimeoutMs} ms after the channel was completed");

        // Awaiting the (already finished) stream task rethrows any failure raised inside the stream.
        await streamRes;
    }

    /// <summary>
    /// Minimal reference-type payload used as the channel element type.
    /// </summary>
    /// <typeparam name="TKey">Type of <see cref="Key"/>.</typeparam>
    /// <typeparam name="TValue">Type of <see cref="Value"/>.</typeparam>
    private class Message<TKey, TValue>
    {
        public TKey Key { get; set; }
        public TValue Value { get; set; }
    }
}
17 changes: 12 additions & 5 deletions src/core/Akka.Streams/Stage/GraphStage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -885,11 +885,18 @@ public ConcurrentAsyncCallback(Action<T> handler, GraphStageLogic ownedStage)
_ownedStage = ownedStage;
_wrappedHandler = obj =>
{
if (obj is T e)
handler1(e);
else
throw new ArgumentException(
$"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
switch (obj)
{
// Always assume that T can be null and the handler will handle null values
case null:
handler1(default);
break;
case T e:
handler1(e);
break;
default:
throw new ArgumentException($"Expected {nameof(obj)} to be of type {typeof(T)}, but was {obj.GetType()}");
}
};
}

Expand Down
Loading