Merged
74 commits
4cf1d06
Added Local implementation
leandrob13 Sep 11, 2017
ef3c3e7
Added Local tests
leandrob13 Sep 11, 2017
310fbf5
Added TracingContext and TracingContextCompanion
leandrob13 Sep 11, 2017
870a473
Added CorrelationId as a TracingContext to enable propagation of valu…
leandrob13 Sep 11, 2017
65a2c04
Added CorrelationId as a TracingContext to enable propagation of valu…
leandrob13 Sep 11, 2017
aa96ffe
Added Local implementation for JS using ThreadLocal in executionJS
leandrob13 Sep 11, 2017
530bf59
Added TaskTracedRunLoop which helps to propagate local context throug…
leandrob13 Sep 11, 2017
4a0e16e
Added runAsyncTraced in Task so it can propagate local with TaskTrace…
leandrob13 Sep 11, 2017
714a8cd
Added TracingScheduler for propagating locals through async boundaries
leandrob13 Sep 11, 2017
74b4553
Added CorrelationId tests
leandrob13 Sep 11, 2017
67981c7
Merge branch 'master' into feature/LocalContext
leandrob13 Sep 18, 2017
28731c0
Merge branch 'master' of https://github.com/monix/monix into feature/…
leandrob13 Sep 20, 2017
c927b8e
Deleted unused val and reformatted TaskTracedRunLoop
leandrob13 Sep 21, 2017
26ba702
Merge branch 'feature/LocalContext' of https://github.com/leandrob13/…
leandrob13 Sep 21, 2017
3f12360
Changed the registration id from string UUID to a Local.Key class
leandrob13 Sep 21, 2017
82e8200
Removed Option usage from Local functions
leandrob13 Sep 21, 2017
48ddf5c
Updated js/Local with latest changes made to jvm version
leandrob13 Sep 21, 2017
b382918
Added traced scheduler to Scheduler.Implicits
leandrob13 Sep 21, 2017
93bf983
Reformatted code, removed unnecessary executeWithTrace, moved implicit…
leandrob13 Sep 21, 2017
df03bbb
Fixed tests
leandrob13 Sep 21, 2017
bf4a090
Fixed compile errors
leandrob13 Sep 21, 2017
38bc2b6
Made CorrelationId final and improved scaladoc
leandrob13 Sep 21, 2017
21abac6
Improved scaladoc for Local
leandrob13 Sep 24, 2017
c44679c
Improved scaladoc for TracingContext
leandrob13 Sep 24, 2017
92692f7
Added traced function to monix.execution.Scheduler
leandrob13 Sep 25, 2017
2649c92
Added scaladoc to TracingScheduler
leandrob13 Sep 25, 2017
6b3214a
Added TracingSchedulerSuite
leandrob13 Sep 25, 2017
b385cbf
Merge branch 'master' of https://github.com/monix/monix into feature/…
leandrob13 Sep 25, 2017
2467e73
Merge branch 'master' into feature/LocalContext
leandrob13 Sep 25, 2017
1bb6f00
Merge branch 'master' into feature/LocalContext
alexandru Sep 27, 2017
52f8553
Merge branch 'master' into feature/LocalContext
leandrob13 Sep 27, 2017
6116b2d
Merge branch 'master' of https://github.com/monix/monix into feature/…
leandrob13 Sep 28, 2017
5a7f05c
Merge branch 'feature/LocalContext' of https://github.com/leandrob13/…
leandrob13 Sep 30, 2017
d42405f
Added localContextPropagation option to Task.Options
leandrob13 Sep 30, 2017
341678a
Added Local propagation to TaskRunLoop and TaskExecuteWithModel
leandrob13 Oct 1, 2017
afd3d4d
Removed asyncTraced from Task functions
leandrob13 Oct 1, 2017
5150117
Fixed tests
leandrob13 Oct 1, 2017
fb4c636
Merge branch 'feature/tracedOptions' of https://github.com/leandrob13…
leandrob13 Oct 5, 2017
5754577
Merge branch 'master' of https://github.com/monix/monix into feature/…
leandrob13 Oct 5, 2017
0b401e5
Removed the parameter passed local from functions of TaskRunLoop, man…
leandrob13 Oct 5, 2017
737c7cc
Removed any reference to Local.getContext from Task, managed now in T…
leandrob13 Oct 5, 2017
297c20e
Fixed CorrelationIdSuite tests
leandrob13 Oct 5, 2017
4f71d1e
Merge branch 'master' into feature/tracedOptions
alexandru Oct 8, 2017
f86097e
Polished some minor details in TaskRunLoop
leandrob13 Oct 9, 2017
fe7a96b
Added and fixed tests in CorrelationIdSuite
leandrob13 Oct 9, 2017
f5401cc
Extended Serializable to Local.Key and Local.LocalContext
leandrob13 Oct 9, 2017
438a33e
Moved Local to execution shared files
leandrob13 Oct 9, 2017
2f8e4a5
Replaced the Runner instance with ShiftedRunnable in TracingScheduler
leandrob13 Oct 11, 2017
d25de07
Added globalTuning @define in Implicits scaladoc
leandrob13 Oct 11, 2017
9371ba6
Added Task.Options as an implicit argument in runAsync, added the par…
leandrob13 Oct 12, 2017
829bde3
Fixed CorrelationIdSuite tests
leandrob13 Oct 12, 2017
9fb17aa
Made defaultOptions an implicit val in Task companion object to help …
leandrob13 Oct 14, 2017
448cbdb
removed the default Task.Options implicit value from overloaded runAsync
leandrob13 Oct 14, 2017
82f4914
Fixed tests by removing explicit scheduler parameter passing
leandrob13 Oct 14, 2017
dc8822b
Added Task.Options param to first runAsync function and its default value
leandrob13 Oct 15, 2017
5d759bf
Added scaladoc for options in TaskApp
leandrob13 Oct 15, 2017
6a83959
Passed Task.defaultOptions to runAsync main call in js.TaskApp
leandrob13 Oct 15, 2017
aa32903
Reverted use of defaultOptions as implicit
leandrob13 Oct 28, 2017
04acd14
Defined default implicit values for runAsync usages with callback in …
leandrob13 Oct 28, 2017
dcc0d04
Added default implicit value for Options in IterantToReactivePublisher
leandrob13 Oct 28, 2017
b803288
Fixed tests
leandrob13 Oct 28, 2017
dbed73f
Moved the System.getProperties calls for defaultOptions to Platform o…
leandrob13 Oct 28, 2017
db2d324
Merge branch 'master' of https://github.com/monix/monix into feature/…
leandrob13 Oct 28, 2017
fd65835
Merge branch 'feature/tracedOptions' of https://github.com/leandrob13…
alexandru Nov 12, 2017
3b292e3
Refactoring, ScalaDoc, add TaskLocal
alexandru Nov 12, 2017
babf5c8
Add Leandro Bolivar to AUTHORS
alexandru Nov 12, 2017
54c5209
Fix ScalaDoc
alexandru Nov 12, 2017
73699b8
Merge branch 'master' into feature/tracedOptions
alexandru Nov 13, 2017
e05666f
Revert signature of runAsync to not take implicit Options
alexandru Nov 13, 2017
27e750f
Make defaultOptions non-implicit
alexandru Nov 13, 2017
f5bcf58
Fix TaskApp, with tests
alexandru Nov 13, 2017
b0aae74
Fix ScalaDoc in TaskLocal
alexandru Nov 13, 2017
3c4c5cd
Re-add Task.runAsyncOpt
alexandru Nov 13, 2017
94e8be8
Final cleanups
alexandru Nov 13, 2017
3 changes: 3 additions & 0 deletions AUTHORS
@@ -27,3 +27,6 @@ https://github.com/avasil

A. Alonso Dominguez
https://github.com/alonsodomin

Leandro Bolivar
https://github.com/leandrob13
2 changes: 1 addition & 1 deletion build.sbt
@@ -394,7 +394,7 @@ lazy val benchmarksPrev = project.in(file("benchmarks/vprev"))
.settings(sharedSettings)
.settings(doNotPublishArtifact)
.settings(
libraryDependencies += "io.monix" %% "monix-reactive" % "2.3.0"
libraryDependencies += "io.monix" %% "monix-reactive" % "2.3.2"
)

lazy val benchmarksNext = project.in(file("benchmarks/vnext"))
10 changes: 9 additions & 1 deletion monix-eval/js/src/main/scala/monix/eval/TaskApp.scala
@@ -47,8 +47,16 @@ trait TaskApp {
protected val scheduler: Coeval[Scheduler] =
Coeval.evalOnce(Scheduler.global)

/** [[monix.eval.Task.Options Options]] for executing the
* [[Task]] action. The default value is defined in
* [[monix.eval.Task.defaultOptions defaultOptions]],
* but can be overridden.
*/
protected val options: Coeval[Task.Options] =
Coeval.evalOnce(Task.defaultOptions)

@JSExport
final def main(args: Array[String]): Unit = {
run(args).runAsync(scheduler.value)
run(args).runAsyncOpt(scheduler.value, options.value)
}
}
28 changes: 28 additions & 0 deletions monix-eval/js/src/test/scala/monix/eval/TaskAppSuite.scala
@@ -18,7 +18,9 @@
package monix.eval

import minitest.SimpleTestSuite
import monix.eval.Task.Options
import monix.execution.schedulers.TestScheduler
import scala.concurrent.Promise

object TaskAppSuite extends SimpleTestSuite {
test("runl works") {
@@ -47,4 +49,30 @@ object TaskAppSuite extends SimpleTestSuite {
app.main(Array.empty); testS.tick()
assertEquals(wasExecuted, true)
}

testAsync("options are configurable") {
import monix.execution.Scheduler.Implicits.global

val opts = Task.defaultOptions
assert(!opts.localContextPropagation, "!opts.localContextPropagation")
val opts2 = opts.enableLocalContextPropagation
assert(opts2.localContextPropagation, "opts2.localContextPropagation")

val p = Promise[Options]()
val exposeOpts =
Task.unsafeCreate[Task.Options] { (ctx, cb) =>
cb.onSuccess(ctx.options)
}

val app = new TaskApp {
override val options = Coeval(opts2)
override def runc =
exposeOpts.map { x => p.success(x) }
}

app.main(Array.empty)
for (r <- p.future) yield {
assertEquals(r, opts2)
}
}
}
10 changes: 9 additions & 1 deletion monix-eval/jvm/src/main/scala/monix/eval/TaskApp.scala
@@ -47,8 +47,16 @@ trait TaskApp {
protected val scheduler: Coeval[Scheduler] =
Coeval.evalOnce(Scheduler.global)

/** [[monix.eval.Task.Options Options]] for executing the
* [[Task]] action. The default value is defined in
* [[monix.eval.Task.defaultOptions defaultOptions]],
* but can be overridden.
*/
protected val options: Coeval[Task.Options] =
Coeval.evalOnce(Task.defaultOptions)

final def main(args: Array[String]): Unit = {
val f = run(args).runAsync(scheduler.value)
val f = run(args).runAsyncOpt(scheduler.value, options.value)
Await.result(f, Duration.Inf)
}
}
28 changes: 28 additions & 0 deletions monix-eval/jvm/src/test/scala/monix/eval/TaskAppSuite.scala
@@ -18,6 +18,8 @@
package monix.eval

import minitest.SimpleTestSuite
import monix.eval.Task.Options
import scala.concurrent.Promise

object TaskAppSuite extends SimpleTestSuite {
test("runl works") {
@@ -40,4 +42,30 @@ object TaskAppSuite extends SimpleTestSuite {
app.main(Array.empty)
assert(wasExecuted, "wasExecuted")
}

testAsync("options are configurable") {
import monix.execution.Scheduler.Implicits.global

val opts = Task.defaultOptions
assert(!opts.localContextPropagation, "!opts.localContextPropagation")
val opts2 = opts.enableLocalContextPropagation
assert(opts2.localContextPropagation, "opts2.localContextPropagation")

val p = Promise[Options]()
val exposeOpts =
Task.unsafeCreate[Task.Options] { (ctx, cb) =>
cb.onSuccess(ctx.options)
}

val app = new TaskApp {
override val options = Coeval(opts2)
override def runc =
exposeOpts.map { x => p.success(x) }
}

app.main(Array.empty)
for (r <- p.future) yield {
assertEquals(r, opts2)
}
}
}
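
The test above only checks that the `localContextPropagation` flag reaches the run-loop's context. As a rough illustration of what propagating a local across an async boundary means, here is a minimal, standard-library-only sketch. It is not the Monix implementation: `correlationId`, `traced` and `readOnPool` are hypothetical names, and a plain `ThreadLocal` stands in for `monix.execution.misc.Local`.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

object LocalPropagationSketch {
  // Hypothetical stand-in for a Local variable: a plain ThreadLocal.
  private val correlationId = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  /** Captures the caller's value now and restores it around `r`
    * on whichever thread eventually runs it.
    */
  def traced(r: Runnable): Runnable = {
    val captured = correlationId.get()
    new Runnable {
      def run(): Unit = {
        val previous = correlationId.get()
        correlationId.set(captured)
        try r.run()
        finally correlationId.set(previous)
      }
    }
  }

  /** Sets the value on the calling thread, then reads it from a pool thread. */
  def readOnPool(wrap: Runnable => Runnable): Option[String] = {
    val pool = Executors.newSingleThreadExecutor()
    val seen = new AtomicReference[Option[String]](None)
    try {
      correlationId.set(Some("req-1"))
      val task = wrap(new Runnable {
        def run(): Unit = seen.set(correlationId.get())
      })
      pool.submit(task).get() // block until the pool thread has run the task
      seen.get()
    } finally {
      correlationId.remove()
      pool.shutdown()
    }
  }
}
```

Calling `readOnPool(r => r)` submits the task unwrapped, so the pool thread sees only the thread-local's initial value; calling `readOnPool(r => LocalPropagationSketch.traced(r))` carries the caller's value over to the pool thread.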
151 changes: 99 additions & 52 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
@@ -27,6 +27,7 @@ import monix.execution.atomic.Atomic
import monix.execution.cancelables.StackedCancelable
import monix.execution.internal.Platform
import monix.execution.misc.{NonFatal, ThreadLocal}
import monix.execution.schedulers.TrampolinedRunnable

import scala.annotation.unchecked.{uncheckedVariance => uV}
import scala.collection.generic.CanBuildFrom
@@ -260,59 +261,87 @@ import scala.util.{Failure, Success, Try}
* the injected `ExecutionModel`. If you want a different behavior,
* you need to execute the `Task` reference with a different scheduler.
*
* @define runAsyncOptDesc Triggers the asynchronous execution,
* much like normal `runAsync`, but includes the ability
* to specify [[monix.eval.Task.Options Options]] that
* can modify the behavior of the run-loop.
*
* @define runAsyncDesc Triggers the asynchronous execution.
*
* Without invoking `runAsync` on a `Task`, nothing
* gets evaluated, as a `Task` has lazy behavior.
*
* @define schedulerDesc is an injected
* [[monix.execution.Scheduler Scheduler]] that gets used
* whenever asynchronous boundaries are needed when
* evaluating the task
*
* @define callbackDesc is a callback that will be invoked upon
* completion
*
* @define cancelableDesc a [[monix.execution.Cancelable Cancelable]]
* that can be used to cancel a running task
*
* @define optionsDesc a set of [[monix.eval.Task.Options Options]]
* that determine the behavior of Task's run-loop.
*/
sealed abstract class Task[+A] extends Serializable { self =>
import monix.eval.Task._

/** $runAsyncDesc
*
* @param cb is a callback that will be invoked upon completion.
*
* @param s is an injected [[monix.execution.Scheduler Scheduler]]
* that gets used whenever asynchronous boundaries are needed
* when evaluating the task
*
* @return a [[monix.execution.Cancelable Cancelable]] that can
* be used to cancel a running task
* @return a [[monix.execution.CancelableFuture CancelableFuture]]
* that can be used to extract the result or to cancel
* a running task.
*/
def runAsync(implicit s: Scheduler): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s, defaultOptions)

/** $runAsyncDesc
*
* @param cb $callbackDesc
* @param s $schedulerDesc
* @return $cancelableDesc
*/
def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
TaskRunLoop.startLightWithCallback(self, s, cb)
TaskRunLoop.startLightWithCallback(self, s, cb, defaultOptions)

/** $runAsyncOptDesc
*
* @param s $schedulerDesc
* @param opts $optionsDesc
* @return $cancelableDesc
*/
def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s, opts)

/** $runAsyncOptDesc
*
* @param cb $callbackDesc
* @param s $schedulerDesc
* @param opts $optionsDesc
* @return $cancelableDesc
*/
def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
TaskRunLoop.startLightWithCallback(self, s, cb, opts)

/** Similar to Scala's `Future#onComplete`, this method triggers
* the evaluation of a `Task` and invokes the given callback whenever
* the result is available.
*
* @param f is a callback that will be invoked upon completion.
*
* @param s is an injected [[monix.execution.Scheduler Scheduler]]
* that gets used whenever asynchronous boundaries are needed
* when evaluating the task
*
* @return a [[monix.execution.Cancelable Cancelable]] that can
* be used to cancel a running task
* @param f $callbackDesc
* @param s $schedulerDesc
* @return $cancelableDesc
*/
def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =
runAsync(new Callback[A] {
def onSuccess(value: A): Unit = f(Success(value))
def onError(ex: Throwable): Unit = f(Failure(ex))
})

/** $runAsyncDesc
*
* @param s is an injected [[monix.execution.Scheduler Scheduler]]
* that gets used whenever asynchronous boundaries are needed
* when evaluating the task
*
* @return a [[monix.execution.CancelableFuture CancelableFuture]]
* that can be used to extract the result or to cancel
* a running task.
*/
def runAsync(implicit s: Scheduler): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s)
})(s)

/** Tries to execute the source synchronously.
*
@@ -1833,13 +1862,22 @@ object Task extends TaskInstancesLevel1 {
type FrameIndex = Int

/** Set of options for customizing the task's behavior.
*
* See [[Task.defaultOptions]] for the default `Options` instance
* used by [[Task!.runAsync(implicit* .runAsync]].
*
* @param autoCancelableRunLoops should be set to `true` in
* case you want `flatMap` driven loops to be
* auto-cancelable. Defaults to `false`.
*
* @param localContextPropagation should be set to `true` in
* case you want the [[monix.execution.misc.Local Local]]
* variables to be propagated on async boundaries.
* Defaults to `false`.
*/
final case class Options(
autoCancelableRunLoops: Boolean) {
autoCancelableRunLoops: Boolean,
localContextPropagation: Boolean) {

/** Creates a new set of options from the source, but with
* the [[autoCancelableRunLoops]] value set to `true`.
@@ -1852,34 +1890,42 @@ object Task extends TaskInstancesLevel1 {
*/
def disableAutoCancelableRunLoops: Options =
copy(autoCancelableRunLoops = false)

/** Creates a new set of options from the source, but with
* the [[localContextPropagation]] value set to `true`.
*/
def enableLocalContextPropagation: Options =
copy(localContextPropagation = true)

/** Creates a new set of options from the source, but with
* the [[localContextPropagation]] value set to `false`.
*/
def disableLocalContextPropagation: Options =
Review comment (Member): We need to indicate that false is the default.

copy(localContextPropagation = false)
}

/** Default [[Options]] to use for [[Task]] evaluation,
* thus:
*
* - `autoCancelableRunLoops` is `false` by default
* - `localContextPropagation` is `false` by default
*
* On top of the JVM the default can be overridden by
* setting the following system properties:
*
* - `monix.environment.autoCancelableRunLoops`
* (`true`, `yes` or `1` for enabling)
*
* - `monix.environment.localContextPropagation`
* (`true`, `yes` or `1` for enabling)
*
* @see [[Task.Options]]
*/
val defaultOptions: Options = {
if (Platform.isJS)
// $COVERAGE-OFF$
Options(autoCancelableRunLoops = false)
// $COVERAGE-ON$
else
Options(
autoCancelableRunLoops =
Option(System.getProperty("monix.environment.autoCancelableRunLoops", ""))
.map(_.toLowerCase)
.exists(v => v == "yes" || v == "true" || v == "1")
)
}
val defaultOptions: Options =
Options(
autoCancelableRunLoops = Platform.autoCancelableRunLoops,
localContextPropagation = Platform.localContextPropagation
)

/** A reference that boxes a [[FrameIndex]] possibly using a thread-local.
*
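
The `Options` changes in this hunk can be tried out in isolation. The sketch below is a standalone model that does not import Monix: the `Options` case class copies only the fields and toggles relevant here, and `flag` and `optionsFrom` are hypothetical helpers. The parsing rule mirrors the inline `System.getProperty` code that this diff removes from `Task.scala`; how `Platform.autoCancelableRunLoops` and `Platform.localContextPropagation` are actually implemented is not shown on this page, so treat that part as an assumption.

```scala
import java.util.Properties

object DefaultOptionsSketch {
  // Standalone model of Task.Options as extended in this diff.
  final case class Options(
    autoCancelableRunLoops: Boolean,
    localContextPropagation: Boolean) {

    def enableLocalContextPropagation: Options =
      copy(localContextPropagation = true)

    def disableLocalContextPropagation: Options =
      copy(localContextPropagation = false)
  }

  // Hypothetical helper: "true", "yes" or "1" (case-insensitive) enables a flag;
  // a missing property or any other value leaves it disabled.
  def flag(props: Properties, key: String): Boolean =
    Option(props.getProperty(key, ""))
      .map(_.toLowerCase)
      .exists(v => v == "yes" || v == "true" || v == "1")

  // Hypothetical helper: builds the defaults from a set of properties,
  // using the property names documented in the scaladoc above.
  def optionsFrom(props: Properties): Options =
    Options(
      autoCancelableRunLoops =
        flag(props, "monix.environment.autoCancelableRunLoops"),
      localContextPropagation =
        flag(props, "monix.environment.localContextPropagation")
    )
}
```

Passing `System.getProperties` to `optionsFrom` would read the flags set with `-D` on the JVM command line; passing a hand-built `Properties` keeps the parsing testable without touching global state.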
@@ -2000,11 +2046,11 @@ object Task extends TaskInstancesLevel1 {

object Context {
/** Initialize fresh [[Context]] reference. */
def apply(s: Scheduler): Context = {
def apply(s: Scheduler, opts: Options): Context = {
val conn = StackedCancelable()
val em = s.executionModel
val frameRef = FrameIndexRef(em)
Context(s, conn, frameRef, defaultOptions)
Context(s, conn, frameRef, opts)
}
}
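
The change to `Context.apply` matters because the context is where the run-loop reads its options: with the old one-argument version, a fresh context always carried `defaultOptions`. The following sketch models that data flow under heavy simplification. `MiniTask`, `runOpt` and `runDefault` are hypothetical names, and the model's context holds only the options, whereas the constructor in the diff also takes a scheduler, a cancelable and a frame index reference.

```scala
object RunOptionsSketch {
  // Standalone model of Task.Options as extended in this diff.
  final case class Options(
    autoCancelableRunLoops: Boolean,
    localContextPropagation: Boolean)

  val defaultOptions: Options =
    Options(autoCancelableRunLoops = false, localContextPropagation = false)

  // Simplified context: only the options are modelled here.
  final case class Context(options: Options)

  // Hypothetical miniature task: a function from the run context to a result.
  final case class MiniTask[A](run: Context => A) {
    // Plays the role of runAsyncOpt: the caller's options reach the context.
    def runOpt(opts: Options): A = run(Context(opts))

    // Plays the role of runAsync: same path, with the default options.
    def runDefault(): A = runOpt(defaultOptions)
  }

  // Mirrors `exposeOpts` from TaskAppSuite: returns the options it was run with.
  val exposeOpts: MiniTask[Options] = MiniTask(ctx => ctx.options)
}
```

Running `exposeOpts` through `runOpt` with a modified copy of the defaults returns that copy, which is the property the `options are configurable` test checks against the real `Task`.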

@@ -2160,9 +2206,10 @@ object Task extends TaskInstancesLevel1 {
* and `Task.fork`.
*/
def unsafeStartTrampolined[A](source: Task[A], context: Context, cb: Callback[A]): Unit =
context.scheduler.executeTrampolined { () =>
TaskRunLoop.startWithCallback(source, context, cb, null, null, context.frameRef())
}
context.scheduler.execute(new TrampolinedRunnable {
def run(): Unit =
TaskRunLoop.startWithCallback(source, context, cb, null, null, context.frameRef())
})

/** Unsafe utility - starts the execution of a Task, by providing
* the needed [[monix.execution.Scheduler Scheduler]],
@@ -2180,25 +2227,25 @@ object Task extends TaskInstancesLevel1 {

/** Internal, reusable reference. */
private final val nowConstructor: (Any => Task[Nothing]) =
((a: Any) => new Now(a)).asInstanceOf[Any => Task[Nothing]]
((a: Any) => Now(a)).asInstanceOf[Any => Task[Nothing]]
/** Internal, reusable reference. */
private final val raiseConstructor: (Throwable => Task[Nothing]) =
e => new Error(e)
e => Error(e)

/** Used as optimization by [[Task.attempt]]. */
private object AttemptTask extends Transformation[Any, Task[Either[Throwable, Any]]] {
override def apply(a: Any): Task[Either[Throwable, Any]] =
new Now(Right(a))
Now(Right(a))
override def error(e: Throwable): Task[Either[Throwable, Any]] =
new Now(Left(e))
Now(Left(e))
}

/** Used as optimization by [[Task.materialize]]. */
private object MaterializeTask extends Transformation[Any, Task[Try[Any]]] {
override def apply(a: Any): Task[Try[Any]] =
new Now(Success(a))
Now(Success(a))
override def error(e: Throwable): Task[Try[Any]] =
new Now(Failure(e))
Now(Failure(e))
}
}
