Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Configuration;
using Xunit;
using static Akka.Actor.FSMBase;
Expand Down Expand Up @@ -445,6 +446,17 @@ public void PersistentFSM_must_save_periodical_snapshots_if_enablesnapshotafter(
Shutdown(sys2);
}
}

[Fact]
public async Task PersistentFSM_must_pass_latest_statedata_to_AndThen()
{
var actor = Sys.ActorOf(Props.Create(() => new AndThenTestActor()));

var response1 = await actor.Ask<AndThenTestActor.Data>(new AndThenTestActor.Command("Message 1")).ConfigureAwait(true);
var response2 = await actor.Ask<AndThenTestActor.Data>(new AndThenTestActor.Command("Message 2")).ConfigureAwait(true);
Assert.Equal("Message 1", response1.Value);
Assert.Equal("Message 2", response2.Value);
}

internal class WebStoreCustomerFSM : PersistentFSM<IUserState, IShoppingCart, IDomainEvent>
{
Expand Down Expand Up @@ -1016,4 +1028,92 @@ public static Props Props(IActorRef probe)
return Actor.Props.Create(() => new SnapshotFSM(probe));
}
}
#region AndThen receiving latest data

public class AndThenTestActor : PersistentFSM<AndThenTestActor.IState, AndThenTestActor.Data, AndThenTestActor.IEvent>
{
public override string PersistenceId => "PersistentFSMSpec.AndThenTestActor";

public AndThenTestActor()
{
StartWith(Init.Instance, new Data());
When(Init.Instance, (evt, state) =>
{
switch (evt.FsmEvent)
{
case Command cmd:
return Stay()
.Applying(new CommandReceived(cmd.Value))
.AndThen(data =>
{
// NOTE At this point, I'd expect data to be the value returned by Apply
Sender.Tell(data, Self);
});
default:
return Stay();
}
});
}

protected override Data ApplyEvent(IEvent domainEvent, Data currentData)
{
switch (domainEvent)
{
case CommandReceived cmd:
return new Data(cmd.Value);
default:
return currentData;
}
}


public interface IState : PersistentFSM.IFsmState
{
}

public class Init : IState
{
public static readonly Init Instance = new Init();
public string Identifier => "Init";
}

public class Data
{
public Data()
{
}

public Data(string value)
{
Value = value;
}

public string Value { get; }
}

public interface IEvent
{
}

public class CommandReceived : IEvent
{
public CommandReceived(string value)
{
Value = value;
}

public string Value { get; }
}

public class Command
{
public Command(string value)
{
Value = value;
}

public string Value { get; }
}
}
#endregion
}
3 changes: 2 additions & 1 deletion src/core/Akka.Persistence/Fsm/PersistentFSM.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ void ApplyStateOnLastHandler()
handlersExecutedCounter++;
if (handlersExecutedCounter == eventsToPersist.Count)
{
base.ApplyState(nextState.Using(nextData));
nextState = nextState.Using(nextData);
Copy link
Member

@ismaelhamed ismaelhamed Nov 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a change in behavior, since nextState is actually set in the MakeTransition method? Looking at the JVM, what we're missing is:

base.ApplyState(nextState.Copy(stateData: nextData));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must be missing something. Here's what's happening now in ApplyStateOnLastHandler which is called after the persistence is completed.

void ApplyStateOnLastHandler()
{
handlersExecutedCounter++;
if (handlersExecutedCounter == eventsToPersist.Count)
{
base.ApplyState(nextState.Using(nextData));
CurrentStateTimeout = nextState.Timeout;
nextState.AfterTransitionDo?.Invoke(nextState.StateData);
if (doSnapshot)
{
Log.Info($"Saving snapshot, sequence number [{SnapshotSequenceNr}]");
SaveStateSnapshot();
}
}
}

With my comments:

 void ApplyStateOnLastHandler()
{
    handlersExecutedCounter++;
    if (handlersExecutedCounter == eventsToPersist.Count)
    {
        // Using creates a COPY of nextState with stateData set to nextData
        // nextState still refers to the OLD version WITHOUT the NEW nextData being set
        base.ApplyState(nextState.Using(nextData));

        // uses OLD nextState, but that doesn't matter, as Timeout hasn't changed        
        CurrentStateTimeout = nextState.Timeout;

        // HERE's the issue and the actual bug
        // The "AfterTransitionDo" callback (ie, AndThen) - if present - is
        // called with the StateData Property of the OLD, UNCHANGED state.
        // That is, this StateData is always lagging one version behind.        
        nextState.AfterTransitionDo?.Invoke(nextState.StateData);

        // the rest is not relevant
        if (doSnapshot)
        {
            Log.Info($"Saving snapshot, sequence number [{SnapshotSequenceNr}]");
            SaveStateSnapshot();
        }
    }
}

This issue was not discovered using the current set of unit tests, but it is very easily to show (eg. in the shopping cart example) that the current version is broken.

Please try it for yourself. Here's a snippet from the specs:

When(LookingAround.Instance, (evt, state) =>
{
    // NB: When in this state, the cart is empty
    if (evt.FsmEvent is AddItem addItem)
    {
        // visitor adds an item to the EMPTY cart
        return GoTo(Shopping.Instance)
            .Applying(new ItemAdded(addItem.Item))
            .ForMax(TimeSpan.FromSeconds(1))
            .AndThen(cart => /* what to you expect cart to be? Empty or non-empty? */);
    }
    [...]
});

The issue is at the "/* what to you expect cart to be? Empty or non-empty? */" part. You'd expect the cart to be non-empty, but it isn't. It's still the OLD, EMPTY version.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I checked, and line #136 needs to be fixed too:

nextState.AfterTransitionDo?.Invoke(StateData);

With these two changes, it works as expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should actually work without. To me, it's either-or. My PR is one solution. The other one is nextState.AfterTransitionDo?.Invoke(nextData); which fixes the issue, too, but for my personal taste is the less clean approach. Ymmv.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, the jvm version uses the second approach. I'll stick to that and will update the PR asap.

base.ApplyState(nextState);
CurrentStateTimeout = nextState.Timeout;
nextState.AfterTransitionDo?.Invoke(nextState.StateData);
if (doSnapshot)
Expand Down