From 9475326fbeba1fa2c61b32604f0d6e7667d3527c Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 25 Feb 2020 18:11:37 -0600 Subject: [PATCH 1/2] close #3879 - disable buffer pooling in DotNetty transport (#4252) --- src/core/Akka.Remote/Configuration/Remote.conf | 5 ----- src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs | 4 ++-- src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs | 2 -- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 08c1fc87bc0..e5d0c7622b7 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -432,11 +432,6 @@ akka { # i.e. how long a connect may take until it is timed out connection-timeout = 15 s - # Toggles buffer pooling on and off inside DotNetty. - # Only intended to be a work-around for users who are still running on DotNetty v0.4.6-v0.4.7 - # for the following bug: https://github.com/akkadotnet/akka.net/issues/3370 - enable-pooling = true - # If set to "" then the specified dispatcher # will be used to accept inbound connections, and perform IO. If "" then # dedicated threads will be used. diff --git a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs index 129a5e675ad..eb2e6b7fef2 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs @@ -275,7 +275,7 @@ protected Bootstrap ClientFactory(Address remoteAddress) .Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay) .Option(ChannelOption.ConnectTimeout, Settings.ConnectTimeout) .Option(ChannelOption.AutoRead, false) - .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default) + .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default) .ChannelFactory(() => Settings.EnforceIpFamily ? new TcpSocketChannel(addressFamily) : new TcpSocketChannel()) @@ -384,7 +384,7 @@ private ServerBootstrap ServerFactory() .Option(ChannelOption.TcpNodelay, Settings.TcpNoDelay) .Option(ChannelOption.AutoRead, false) .Option(ChannelOption.SoBacklog, Settings.Backlog) - .Option(ChannelOption.Allocator, Settings.EnableBufferPooling ? (IByteBufferAllocator)PooledByteBufferAllocator.Default : UnpooledByteBufferAllocator.Default) + .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default) .ChannelFactory(() => Settings.EnforceIpFamily ? new TcpServerSocketChannel(addressFamily) : new TcpServerSocketChannel()) diff --git a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs index a8dce870dd7..792ea620a61 100644 --- a/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs +++ b/src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs @@ -180,8 +180,6 @@ public override bool Write(ByteString payload) private IByteBuffer ToByteBuffer(ByteString payload) { - //TODO: optimize DotNetty byte buffer usage - // (maybe custom IByteBuffer working directly on ByteString?) var buffer = Unpooled.WrappedBuffer(payload.ToByteArray()); return buffer; } From c8cc24f456c02c3c7f837b53f3c824f528e3cd3f Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 25 Feb 2020 15:14:32 -0600 Subject: [PATCH 2/2] close #4246 - no throw inside EndpointReader (#4249) Rewrote To no longer use ConcurrentDictionary methods that can throw --- src/core/Akka.Remote/Endpoint.cs | 42 +++++++++++++++++--------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 720b10ee4b5..5d52092ae3e 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1981,31 +1981,33 @@ private void NotReading() private void SaveState() { - var key = new EndpointManager.Link(LocalAddress, RemoteAddress); - _receiveBuffers.TryGetValue(key, out var previousValue); - UpdateSavedState(key, previousValue); - } - - private EndpointManager.ResendState Merge(EndpointManager.ResendState current, - EndpointManager.ResendState oldState) - { - if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer)); - return current; - } + EndpointManager.ResendState Merge(EndpointManager.ResendState current, + EndpointManager.ResendState oldState) + { + if (current.Uid == oldState.Uid) return new EndpointManager.ResendState(_uid, oldState.Buffer.MergeFrom(current.Buffer)); + return current; + } - private void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState) - { - if (expectedState == null) + void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState) { - if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer))) + if (expectedState == null) { - UpdateSavedState(key, _receiveBuffers[key]); + if (!_receiveBuffers.TryAdd(key, new EndpointManager.ResendState(_uid, _ackedReceiveBuffer))) + { + _receiveBuffers.TryGetValue(key, out var prevValue); + UpdateSavedState(key, prevValue); + } + } + else if (!_receiveBuffers.TryUpdate(key, + Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState)) + { + _receiveBuffers.TryGetValue(key, out var prevValue); + UpdateSavedState(key, prevValue); } - } else if (!_receiveBuffers.TryUpdate(key, - Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState), expectedState)) - { - UpdateSavedState(key, _receiveBuffers[key]); } + + var k = new EndpointManager.Link(LocalAddress, RemoteAddress); + UpdateSavedState(k, !_receiveBuffers.TryGetValue(k, out var previousValue) ? null : previousValue); } private void HandleDisassociated(DisassociateInfo info)