Add localContextPropagation to Task.Options, implement tracing Local vars #444
Conversation
…es through a Local
…es through a Local
…h trampolined execution
…dRunLoop.startAsFuture
…monix into feature/LocalContext
… traced scheduler
Codecov Report
@@ Coverage Diff @@
## master #444 +/- ##
=========================================
+ Coverage 89.21% 89.32% +0.1%
=========================================
Files 351 356 +5
Lines 9525 9609 +84
Branches 1269 1264 -5
=========================================
+ Hits 8498 8583 +85
+ Misses 1027 1026 -1 |
alexandru
left a comment
There was a problem hiding this comment.
Changes look good, but I prefer that Options parameter to have a default value, not a globally implicit one. This is because we are breaking source compatibility otherwise.
| * be used to cancel a running task | ||
| */ | ||
| def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable = | ||
| def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler, opts: Options): Cancelable = |
There was a problem hiding this comment.
The opts value needs a default parameter, otherwise we are breaking source compatibility.
A call like runOnComplete(f)(ec) should still work.
There was a problem hiding this comment.
This can be done without problems.
| */ | ||
| def runAsync(implicit s: Scheduler): CancelableFuture[A] = | ||
| TaskRunLoop.startAsFuture(this, s) | ||
| def runAsync(implicit s: Scheduler, opts: Options): CancelableFuture[A] = |
There was a problem hiding this comment.
Similarly, I want opts to have a default value, because we need calls like runAsync(ec) to still work.
So do what you did above with opts: Options = defaultOptions.
There was a problem hiding this comment.
@alexandru just a note on this. I can only define a default value for only one overloaded runAsync function. If I define it on this one, the other one can't have it and that is why I chose the implicit default options val because it broke the code in many places for the runAsync with the callback function.
I can make that change but it will affect code in reactive module, so the impact is going to be bigger.
Right now the default value is defined in the first runAsync function:
def runAsync(cb: Callback[A])(implicit s: Scheduler, opts: Options = defaultOptions): Cancelable| */ | ||
| def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] = { | ||
| val future = this.runAsync(s) | ||
| def runSyncMaybe(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] = { |
There was a problem hiding this comment.
Similarly, we need runSyncMaybe(ec) to work, so opts needs to have a default value.
There was a problem hiding this comment.
This an be done without problem.
| */ | ||
| def coeval(implicit s: Scheduler): Coeval[Either[CancelableFuture[A], A]] = | ||
| Coeval.eval(runSyncMaybe(s)) | ||
| def coeval(implicit s: Scheduler, opts: Options): Coeval[Either[CancelableFuture[A], A]] = |
There was a problem hiding this comment.
Here too, we need coeval(ec) to still work, otherwise we break source compatibility.
| */ | ||
| def foreach(f: A => Unit)(implicit s: Scheduler): CancelableFuture[Unit] = | ||
| foreachL(f).runAsync(s) | ||
| def foreach(f: A => Unit)(implicit s: Scheduler, opts: Options): CancelableFuture[Unit] = |
There was a problem hiding this comment.
Here source compatibility is maybe less important, but we need to be consistent.
| /** Converts the source `Task` to a `cats.effect.IO` value. */ | ||
| def toIO(implicit s: Scheduler): IO[A] = | ||
| TaskConversions.toIO(this)(s) | ||
| def toIO(implicit s: Scheduler, opts: Options): IO[A] = |
There was a problem hiding this comment.
We should not have this parameter in toIO, because it simply doesn't work and it's misleading the user.
IO is an equivalent data type, however we do not control its implementation. And the other option about enabling cancelable run-loops? IO is not cancelable.
So if this boundary is reached, we know that the user is not interested in local context propagation or cancelation or any future options we might have here.
| .map(_.toLowerCase) | ||
| .exists(v => v == "yes" || v == "true" || v == "1"), | ||
| localContextPropagation = | ||
| Option(System.getProperty("monix.environment.localContextPropagation", "")) |
There was a problem hiding this comment.
The System.getProperty calls should be in Platform.scala, see:
It's fine to add stuff to it because it is private[monix].
| private[eval] object TaskConversions { | ||
| /** Implementation for `Task#toIO`. */ | ||
| def toIO[A](source: Task[A])(implicit s: Scheduler): IO[A] = | ||
| def toIO[A](source: Task[A])(implicit s: Scheduler, opts: Task.Options): IO[A] = |
There was a problem hiding this comment.
As mentioned, I don't want the opts parameter in toIO.
|
@leandrob13 btw, sorry for the delay man, I've also been busy. |
…bject for JS and JVM
…/monix into leandrob13-feature/tracedOptions
|
As a status update, I took the liberty of doing some refactoring to move this ticket forward. So first of all I took a look at Twitter's own Local implementation to see how they did it. I'm actually pleased that you chose an immutable I do hope we made the right choice though. I see only benefits right now in using an immutable Map as our data structure and I hope we aren't wrong. Anyway, refactoring for greater consistency and for simplification is what I do best. So I hope you won't get upset about modifying your PR:
This PR is ready to be merged, however we've got two problems:
|
|
I have reverted the The issue is this — implicit parameters infect everything, including conversions from one type to another. For example So by requiring an For example I'm using Basically if we let In other news, I've ran benchmarks, comparing this with master. We have performance degradation since I'm also sorry for modifying your code @leandrob13, but you got me excited about this feature and it's a tough design problem at the same time, so thought it would be easier if we both worked on it for pushing it in |
|
@alexandru No worries, I am glad that this is moving forward. Everything you mentioned is about performance, API consistency and not breaking signatures so those are valid arguments. There is a style to respect and my work was based on Finagle, there is no problem for you to adapt it monix style. I am not upset about not using This is a first important step and I can assure you I'll be testing this and I hope it gets more people interested in this. Thanks for your help and I'll be waiting for the merge! |
Fixes #246.
Update:
This PR introduces the concept of
Local, inspired by Twitter's Local, which is aThreadLocalthat can be transported over asynchronous boundaries by supporting implementations — in Twitter's case being their Promise implementation.So we are introducing in
monix-execution:monix.execution.misc.Local— our own implementation ofLocal, inspired by Twitter's implementationmonix.execution.schedulers.TracingScheduler— wraps anySchedulerreference into an implementation that can transport locals over asynchronous boundariesmonix.eval.TaskLocal: the pure,Task-enabledLocalBesides
Localwhich has an implementation that literally keeps its state into aThreadLocal, the challenge is to transport these locals over asynchronous boundaries. So we've got:Taskwhich is now capable of transporting these locals over the async boundaries managed by its own run-loop, provided that it gets executed withOptions.localContextPropagationset totrue(it's set tofalseby default)TracingSchedulerwhich works forFutureand all abstractions that need anExecutionContextfor managing their async boundariesAn interesting implementation detail is that
Taskdoes not have this propagation enabled by default. This is because, for now at least, this is for users that want the propagation of locals and that know what they are doing.One way of doing that is to use
executeWithOptions:task.executeWithOptions(_.enableLocalContextPropagation) // triggers the actual execution .runAsyncAnother possibility is to use
runAsyncOpt:In effect what
ThreadLocalis for threads,TaskLocalis for tasks. Full example:This sample doesn't seem like much, but
Localis a thread-safe variable that can be used in the context ofFuture, andTaskLocalis meant for thread-safe and pure variables that can be used in the context ofTask.Original:
Follow-up PR to compare the original
TaskRunLoopimplementation with the one adapted forLocalpropagation as proposed in #429. The benchmarks are presented below.Tested on a 2.5 GHz Intel Dual Core i5, 16GB of RAM, SSD hard disk.
Benchmarks:
Master:feature/tracedOptions: