From 8f206e4d88f809336541cf754f5c13379d113d3b Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sat, 27 Dec 2014 20:08:26 +0200 Subject: [PATCH] more Akka.NET features reflected in F# API, created README file, ICanWatch interface to expose Watch/Unwatch feature --- src/core/Akka.FSharp/Akka.FSharp.fsproj | 1 + src/core/Akka.FSharp/FsApi.fs | 113 ++++++++--- src/core/Akka.FSharp/README.md | 238 ++++++++++++++++++++++++ src/core/Akka/Actor/IActorContext.cs | 10 +- src/core/Akka/Actor/Inbox.Actor.cs | 3 +- src/core/Akka/Actor/Inbox.cs | 32 +++- 6 files changed, 360 insertions(+), 37 deletions(-) create mode 100644 src/core/Akka.FSharp/README.md diff --git a/src/core/Akka.FSharp/Akka.FSharp.fsproj b/src/core/Akka.FSharp/Akka.FSharp.fsproj index f21a6559dd5..236dff2aec6 100644 --- a/src/core/Akka.FSharp/Akka.FSharp.fsproj +++ b/src/core/Akka.FSharp/Akka.FSharp.fsproj @@ -81,6 +81,7 @@ + diff --git a/src/core/Akka.FSharp/FsApi.fs b/src/core/Akka.FSharp/FsApi.fs index 7d85232cf96..a8bf6445c5b 100644 --- a/src/core/Akka.FSharp/FsApi.fs +++ b/src/core/Akka.FSharp/FsApi.fs @@ -36,6 +36,7 @@ type IO<'T> = [] type Actor<'Message> = inherit ActorRefFactory + inherit ICanWatch /// /// Explicitly retrieves next incoming message from the mailbox. @@ -218,21 +219,23 @@ type ActorBuilder() = open Microsoft.FSharp.Quotations open Microsoft.FSharp.Linq.QuotationEvaluation -type FunActor<'Message, 'Returned>(actor : Actor<'Message> -> Cont<'Message, 'Returned>) as self = +type FunActor<'Message, 'Returned>(actor : Actor<'Message> -> Cont<'Message, 'Returned>) as this = inherit Actor() let mutable state = - let self' = self.Self + let self' = this.Self let context = UntypedActor.Context :> IActorContext actor { new Actor<'Message> with member __.Receive() = Input member __.Self = self' member __.Context = context - member __.Sender() = self.Sender() - member __.Unhandled msg = self.Unhandled msg + member __.Sender() = this.Sender() + member __.Unhandled msg = this.Unhandled msg member __.ActorOf(props, name) = context.ActorOf(props, name) member __.ActorSelection(path : string) = context.ActorSelection(path) member __.ActorSelection(path : ActorPath) = context.ActorSelection(path) + member __.Watch(aref:ActorRef) = context.Watch aref + member __.Unwatch(aref:ActorRef) = context.Unwatch aref member __.Log = lazy (Akka.Event.Logging.GetLogger(context)) } new(actor : Expr -> Cont<'Message, 'Returned>>) = FunActor(actor.Compile () ()) @@ -294,10 +297,19 @@ module Serialization = use stream = new System.IO.MemoryStream(bytes) fsp.Deserialize(t, stream) +[] module Configuration = + + /// Parses provided HOCON string into a valid Akka configuration object. let parse = Akka.Configuration.ConfigurationFactory.ParseString + + /// Returns default Akka configuration. let defaultConfig = Akka.Configuration.ConfigurationFactory.Default + /// Loads Akka configuration from the project's .config file. + let load = Akka.Configuration.ConfigurationFactory.Load + +[] module Strategy = /// /// Returns a supervisor strategy appliable only to child actor which faulted during execution. @@ -316,21 +328,21 @@ module Strategy = upcast OneForOneStrategy(Nullable retries, Nullable timeout, System.Func<_, _>(decider)) /// - /// Returns a supervisor strategy appliable only each supervised actor when any of them had faulted during execution. + /// Returns a supervisor strategy appliable to each supervised actor when any of them had faulted during execution. /// /// Used to determine a actor behavior response depending on exception occurred. let allForOne (decider : exn -> Directive) : SupervisorStrategy = upcast AllForOneStrategy(System.Func<_, _>(decider)) /// - /// Returns a supervisor strategy appliable only each supervised actor when any of them had faulted during execution. + /// Returns a supervisor strategy appliable to each supervised actor when any of them had faulted during execution. /// /// Defines a number of times, an actor could be restarted. If it's a negative value, there is not limit. /// Defines time window for number of retries to occur. /// Used to determine a actor behavior response depending on exception occurred. let allForOne2 (retries : int) (timeout : TimeSpan) (decider : exn -> Directive) : SupervisorStrategy = upcast AllForOneStrategy(Nullable retries, Nullable timeout, System.Func<_, _>(decider)) - + module System = /// Creates an actor system with remote deployment serialization enabled. let create (name : string) (config : Configuration.Config) : ActorSystem = @@ -339,20 +351,7 @@ module System = system.Serialization.AddSerializer(serializer) system.Serialization.AddSerializationMap(typeof, serializer) system - -type SpawnParams = - { Deploy : Deploy option - Router : Akka.Routing.RouterConfig option - SupervisorStrategy : SupervisorStrategy option - Dispatcher : string option - Mailbox : string option } - static member empty = - { Deploy = None - Router = None - SupervisorStrategy = None - Dispatcher = None - Mailbox = None } - + type SpawnOption = | Deploy of Deploy | Router of Akka.Routing.RouterConfig @@ -481,6 +480,74 @@ let asyncReceive (i : Inbox) : Async<'Message option> = } /// -/// Orders inbox to watch an actor targeted by provided . +/// Orders a to monitor an actor targeted by provided . +/// When an actor refered by subject dies, a watcher should receive a message. +/// +let monitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef = watcher.Watch subject + +/// +/// Orders a to stop monitoring an actor refered by provided . +/// +let demonitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef = watcher.Unwatch subject + +/// +/// Subscribes an actor reference to target channel of the provided event stream. +/// +let sub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool = eventStream.Subscribe(ref, channel) + +/// +/// Unubscribes an actor reference from target channel of the provided event stream. +/// +let unsub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool = eventStream.Unsubscribe(ref, channel) + +/// +/// Publishes an event on the provided event stream. Event channel is resolved from event's type. +/// +let pub (event: 'Event) (eventStream: Akka.Event.EventStream) : unit = eventStream.Publish event + +let private taskContinuation (task: System.Threading.Tasks.Task) : unit = + match task.IsFaulted with + | true -> raise task.Exception + | _ -> () + +/// +/// Schedules a function to be invoked repeatedly in the provided time intervals. +/// +/// Initial delay to first function call. +/// Interval. +/// Function called by the scheduler. +/// +let schedule (after: TimeSpan) (every: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async = + let action = Action fn + Async.AwaitTask (scheduler.Schedule(after, every, action).ContinueWith taskContinuation) + +/// +/// Schedules a single function call using specified sheduler. +/// +/// Delay before calling the function. +/// Function called by the scheduler. +/// +let scheduleOnce (after: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async = + let action = Action fn + Async.AwaitTask (scheduler.ScheduleOnce(after, action).ContinueWith taskContinuation) + +/// +/// Schedules a to be sent to the provided in specified time intervals. +/// +/// Initial delay to first function call. +/// Interval. +/// Message to be sent to the receiver by the scheduler. +/// Message receiver. +/// +let scheduleTell (after: TimeSpan) (every: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async = + Async.AwaitTask (scheduler.Schedule(after, every, receiver, message).ContinueWith taskContinuation) + +/// +/// Schedules a single send to the provided . /// -let inline watch (actorRef : ActorRef) (i : Inbox) : unit = i.Watch actorRef +/// Delay before sending a message. +/// Message to be sent to the receiver by the scheduler. +/// Message receiver. +/// +let scheduleTellOnce (after: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async = + Async.AwaitTask (scheduler.ScheduleOnce(after, receiver, message).ContinueWith taskContinuation) diff --git a/src/core/Akka.FSharp/README.md b/src/core/Akka.FSharp/README.md new file mode 100644 index 00000000000..0a3213a1bd1 --- /dev/null +++ b/src/core/Akka.FSharp/README.md @@ -0,0 +1,238 @@ +# Akka.NET F# API + +### Actor system and configuration + +Unlike default (C#) actor system, F#-aware systems should be created using `Akka.FSharp.System.create` function. This function differs from it's C# equivalent by providing additional F#-specific features - i.e. serializers allowing to serialize F# quotations for remote deployment process. + +Example: + + open Akka.FSharp + use system = System.create "my-system" (Configuration.load()) + +F# also gives you it's own actor system Configuration module with support of following functions: + +- `defaultConfig() : Config` - returns default F# Akka configuration. +- `parse(hoconString : string) : Config` - parses a provided Akka configuration string. +- `load() : Config` - loads an Akka configuration found inside current project's *.config* file. + +### Creating actors with `actor` computation expression + +Unlike C# actors, which represent object oriented nature of the language, F# is able to define an actor's logic in more functional way. It is done by using `actor` computation expression. In most of the cases, an expression inside `actor` is expected to be represented as self-invoking recursive function - also invoking an other functions while maintaining recursive cycle is allowed, i.e. to change actor's behavior or even to create more advanced constructs like Finite State Machines. + +It's important to remember, that each actor returning point should point to the next recursive function call - any other value returned will result in stopping current actor (see: [Actor Lifecycle](http://akkadotnet.github.io/wiki/Actor%20lifecycle)). + +Example: + + let aref = + spawn system "my-actor" + (fun mailbox -> + let rec loop() = actor { + let! message = mailbox.Receive() + // handle an incoming message + return! loop() + } + loop()) + +Since construct used in an example above is quite popular, you may also use following shorthand functions to define message handler's behavior: + +- `actorOf (fn : 'Message -> unit) (mailbox : Actor<'Message>) : Cont<'Message, 'Returned>` - uses a function, which takes a message as the only parameter. Mailbox parameter is injected by spawning functions. +- `actorOf2 (fn : Actor<'Message> -> 'Message -> unit) (mailbox : Actor<'Message>) : Cont<'Message, 'Returned>` - uses a function, which takes both the message and an Actor instance as the parameters. Mailbox parameter is injected by spawning functions. + +Example: + + let handleMessage (mailbox: Actor<'a>) msg = + match msg with + | Some x -> printf "%A" x + | None -> () + + let aref = spawn system "my-actor" (actorOf2 handleMessage) + let blackHole = spawn system "black-hole" (actorOf (fun msg -> ())) + +#### Spawning actors + +Paragraph above already has shown, how actors may be created with help of the spawning function. There are several spawning function, which may be used to instantiate actors: + +- `spawn (actorFactory : ActorRefFactory) (name : string) (f : Actor<'Message> -> Cont<'Message, 'Returned>) : ActorRef` - spawns an actor using specified actor computation expression. The actor can only be used locally. +- `spawnOpt (actorFactory : ActorRefFactory) (name : string) (f : Actor<'Message> -> Cont<'Message, 'Returned>) (options : SpawnOption list) : ActorRef` - spawns an actor using specified actor computation expression, with custom spawn option settings. The actor can only be used locally. +- `spawne (actorFactory : ActorRefFactory) (name : string) (expr : Expr -> Cont<'Message, 'Returned>>) (options : SpawnOption list) : ActorRef` - spawns an actor using specified actor computation expression, using an Expression AST. The actor code can be deployed remotely. + +All of these functions may be used with either actor system or actor itself. In the first case spawned actor will be placed under */user* root guardian of the current actor system hierarchy. In second option spawned actor will become child of the actor used as [actorFactory] parameter of the spawning function. + +### Actor spawning options + +To be able to specifiy more precise actor creation behavior, you may use `spawnOpt` and `spawne` methods, both taking a list of `SpawnOption` values. Each specific option should be present only once in the collection. When a conflict occurs (more than one option of specified type has been found), the latest value found inside the list will be chosen. + +- `SpawnOption.Deploy(Akka.Actor.Deploy)` - defines deployment strategy for created actors (see: Deploy). This option may be used along with `spawne` function to enable remote actors deployment. +- `SpawnOption.Router(Akka.Routing.RouterConfig)` - defines an actor to be a router as well as it's routing specifics (see: [Routing](http://akkadotnet.github.io/wiki/Routing)). +- `SpawnOption.SupervisiorStrategy(Akka.Actor.SupervisiorStrategy)` - defines a supervisor strategy of the current actor. It will affect it's children (see: [Supervision](http://akkadotnet.github.io/wiki/Supervision)). +- `SpawnOption.Dispatcher(string)` - defines a type of the dispatcher used for resources management for the created actors. (See: [Dispatchers](http://akkadotnet.github.io/wiki/Dispatchers)) +- `SpawnOption.Mailbox(string)` - defines a type of the mailbox used for the created actors. (See: [Mailboxes](http://akkadotnet.github.io/wiki/Mailbox)) + +Example (deploy actor remotely): + + open Akka.Actor + let remoteRef = + spawne system "my-actor" <@ actorOf myFunc @> + [SpawnOption.Deploy (Deploy(RemoteScope(Address.Parse "akka.tcp://remote-system@127.0.0.1:9000/")))] + +### Ask and tell operators + +While you may use traditional `ActorRef.Tell` and `ActorRef.Ask` methods, it's more convenient to use dedicated ` printfn "%A said %s" (mailbox.Self.Path) m)) + aref bool) (i : Inbox) : 'Message option` - receives a next message sent to the inbox, which satisfies provided predicate. This is a blocking operation. Returns `None` if timeout occurred or message is incompatible with expected response type. +- `asyncReceive (i : Inbox) : Async<'Message option>` - Awaits in async block for a next message sent to the inbox. Returns `None` if message is incompatible with expected response type. + +Inboxes may be configured to accept a limited number of incoming messages (default is 1000): + + akka { + actor { + inbox { + inbox-size = 30 + } + } + } + +### Monitoring + +Actors and Inboxes may be used to monitor lifetime of other actors. This is done by `monitor`/`demonitor` functions: + +- `monitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef` - starts monitoring a referred actor. +- `demonitor (subject: ActorRef) (watcher: ICanWatch) : ActorRef` - stops monitoring of the previously monitored actor. + +Monitored actors will automatically send a `Terminated` message to their watchers when they die. + +### Actor supervisor strategies + +Actors have a place in their system's hierarchy trees. To manage failures done by the child actors, their parents/supervisors may decide to use specific supervisor strategies (see: [Supervision](http://akkadotnet.github.io/wiki/Supervision)) in order to react to the specific types of errors. In F# this may be configured using functions of the `Strategy` module: + +- `Strategy.oneForOne (decider : exn -> Directive) : SupervisorStrategy` - returns a supervisor strategy applicable only to child actor which faulted during execution. +- `Strategy.oneForOne2 (retries : int) (timeout : TimeSpan) (decider : exn -> Directive) : SupervisorStrategy` - returns a supervisor strategy applicable only to child actor which faulted during execution. [retries] param defines a number of times, an actor could be restarted. If it's a negative value, there is not limit. [timeout] param defines a time window for number of retries to occur. +- `Strategy.allForOne (decider : exn -> Directive) : SupervisorStrategy` - returns a supervisor strategy applicable to each supervised actor when any of them had faulted during execution. +- `Strategy.allForOne2 (retries : int) (timeout : TimeSpan) (decider : exn -> Directive) : SupervisorStrategy` - returns a supervisor strategy applicable to each supervised actor when any of them had faulted during execution. [retries] param defines a number of times, an actor could be restarted. If it's a negative value, there is not limit. [timeout] param defines a time window for number of retries to occur. + +Example: + + let aref = + spawnOpt system "my-actor" (actorOf myFunc) + [ SpawnOption.SupervisorStrategy (Strategy.oneForOne (fun error -> + match error with + | :? ArithmeticException -> Directive.Escalate + | _ -> SupervisorStrategy.DefaultDecider error )) ] + +### Publish/Subscribe support + +While you may use built-in set of the event stream methods (see: Event Streams), there is an option of using dedicated F# API functions: + +- `sub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool` - subscribes an actor reference to target channel of the provided event stream. Channels are associated with specific types of a message emitted by the publishers. +- `unsub (channel: System.Type) (ref: ActorRef) (eventStream: Akka.Event.EventStream) : bool` - unsubscribes an actor reference from target channel of the provided event stream. +- `pub (event: 'Event) (eventStream: Akka.Event.EventStream) : unit` - publishes an event on the provided event stream. Event channel is resolved from event's type. + +Example: + + type Message = + | Subscribe + | Unsubscribe + | Msg of ActorRef * string + + let subscriber = + spawn system "subscriber" + (actorOf2 (fun mailbox msg -> + let eventStream = mailbox.Context.System.EventStream + match msg with + | Msg (sender, content) -> printfn "%A says %s" (sender.Path) content + | Subscribe -> sub typeof mailbox.Self eventStream |> ignore + | Unsubscribe -> unsub typeof mailbox.Self eventStream |> ignore )) + + let publisher = + spawn system "publisher" + (actorOf2 (fun mailbox msg -> + pub msg mailbox.Context.System.EventStream)) + + subscriber actor scheduling - you may decide to schedule job in form of a function or a message automatically sent to target actor reference. + +F# API provides following scheduling functions: + +- `schedule (after: TimeSpan) (every: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async` - [cyclic, function] schedules a function to be called by the scheduler repeatedly. +- `scheduleOnce (after: TimeSpan) (fn: unit -> unit) (scheduler: Scheduler): Async` - [single, function] schedules a function to be called only once by the scheduler. +- `scheduleTell (after: TimeSpan) (every: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async` - [cyclic, message] schedules a message to be sent to the target actor reference by the scheduler repeatedly. +- `scheduleTellOnce (after: TimeSpan) (message: 'Message) (receiver: ActorRef) (scheduler: Scheduler): Async` - [single, message] schedules a message to be sent only once to the target actor reference, by the scheduler. + +### Logging + +F# API supports two groups of logging functions - one that operates directly on strings and second (which may be recognized by *f* suffix in function names) which operates using F# string formating features. Major difference is performance - first one is less powerful, but it's also faster than the second one. + +Both groups support logging on various levels (DEBUG, <default> INFO, WARNING and ERROR). Actor system's logging level may be managed through configuration, i.e.: + + akka { + actor { + # collection of loggers used inside actor system, specified by fully-qualified type name + loggers = [ "Akka.Event.DefaultLogger, Akka" ] + + # Options: OFF, ERROR, WARNING, INFO, DEBUG + logLevel = "DEBUG" + } + } + +F# API provides following logging methods: + +- `log (level : LogLevel) (mailbox : Actor<'Message>) (msg : string) : unit`, `logf (level : LogLevel) (mailbox : Actor<'Message>) (format:StringFormat<'T, 'Result>) : 'T` - both functions takes an `Akka.Event.LogLevel` enum parameter to specify log level explicitly. +- `logDebug`, `logDebugf` - message will be logged at Debug level. +- `logInfo`, `logInfof` - message will be logged at Info level. +- `logWarning`, `logWarningf` - message will be logged at Warning level. +- `logError`, `logError` - message will be logged at Error level. +- `logException (mailbox: Actor<'a>) (e : exn) : unit` - this function logs a message from provided `System.Exception` object at the Error level. + +### Interop with Task Parallel Library + +Since both TPL an Akka frameworks can be used for parallel processing, sometimes they need to work both inside the same application. + +To operate directly between `Async` results and actors, use `pipeTo` function (and it's abbreviations in form of `` operators) to inform actor about tasks ending their processing pipelines. Piping functions used on tasks will move async result directly to the mailbox of a target actor. + +Example: + + open System.IO + let handler (mailbox: Actor) msg = + match box msg with + | :? FileInfo as fi -> + let reader = new StreamReader(fi.OpenRead()) + reader.AsyncReadToEnd() |!> mailbox.Self + | :? string as content -> + printfn "File content: %s" content + | _ -> mailbox.Unhandled() + + let aref = spawn system "my-actor" (actorOf2 handler) + aref GetChildren(); - ActorRef Watch(ActorRef subject); - ActorRef Unwatch(ActorRef subject); /// /// diff --git a/src/core/Akka/Actor/Inbox.Actor.cs b/src/core/Akka/Actor/Inbox.Actor.cs index c7f3cb6987a..3f29b154400 100644 --- a/src/core/Akka/Actor/Inbox.Actor.cs +++ b/src/core/Akka/Actor/Inbox.Actor.cs @@ -78,7 +78,7 @@ public bool MessagePredicate(object msg) protected override bool Receive(object message) { - PatternMatch.Match(message) + message.Match() .With(get => { if (_messages.Count == 0) @@ -112,6 +112,7 @@ protected override bool Receive(object message) } }) .With(sw => Context.Watch(sw.Target)) + .With(sw => Context.Unwatch(sw.Target)) .With(() => { var now = DateTime.Now; diff --git a/src/core/Akka/Actor/Inbox.cs b/src/core/Akka/Actor/Inbox.cs index 3dd677c1da5..fd4b6da5755 100644 --- a/src/core/Akka/Actor/Inbox.cs +++ b/src/core/Akka/Actor/Inbox.cs @@ -62,6 +62,17 @@ public StartWatch(ActorRef target) public ActorRef Target { get; private set; } } + internal struct StopWatch + { + public StopWatch(ActorRef target) + : this() + { + Target = target; + } + + public ActorRef Target { get; private set; } + } + internal struct Kick { } // LinkedList wrapper instead of Queue? While it's used for queueing, however I expect a lot of churn around @@ -177,7 +188,7 @@ public int Compare(IQuery x, IQuery y) /// It can watch other actors lifecycle and contains inner actor, which could be passed /// as reference to other actors. /// - public interface Inboxable + public interface Inboxable : ICanWatch { /// /// Get a reference to internal actor. It may be for example registered in event stream. @@ -211,12 +222,6 @@ public interface Inboxable /// object ReceiveWhere(Predicate predicate, TimeSpan timeout); - /// - /// Makes internal actor watch a provided actor. - /// If it terminates a message will be received. - /// - void Watch(ActorRef target); - /// /// Makes an internal actor act as a proxy of given , /// which will be send to given actor. It means, @@ -253,14 +258,21 @@ private Inbox(TimeSpan defaultTimeout, ActorRef receiver, ActorSystem system) } public ActorRef Receiver { get; private set; } - + /// /// Make the inbox’s actor watch the actor such that /// reception of the message can then be awaited. /// - public void Watch(ActorRef target) + public ActorRef Watch(ActorRef subject) + { + Receiver.Tell(new StartWatch(subject)); + return subject; + } + + public ActorRef Unwatch(ActorRef subject) { - Receiver.Tell(new StartWatch(target)); + Receiver.Tell(new StopWatch(subject)); + return subject; } public void Send(ActorRef actorRef, object msg)