Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions benchmarks/run-benchmark
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,5 @@ name = ARGV[0]
date = Time.now.strftime("%Y-%m-%d")

exec("mkdir -p benchmarks/results/#{date}")
exec(
"sbt " +
"benchmarksPrev/clean benchmarksNext/clean " +
"'benchmarksPrev/jmh:run -o ../results/#{date}/#{name}-Prev.txt -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.#{name}' " +
"'benchmarksNext/jmh:run -o ../results/#{date}/#{name}-Next.txt -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.#{name}'"
)
exec("sbt benchmarksNext/clean 'benchmarksNext/jmh:run -o ../results/#{date}/#{name}-Next.txt -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.#{name}'")
exec("sbt benchmarksPrev/clean 'benchmarksPrev/jmh:run -o ../results/#{date}/#{name}-Prev.txt -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.#{name}'")
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ lazy val unidocSettings = Seq(

// Exclude monix.*.internal from ScalaDoc
sources in (ScalaUnidoc, unidoc) ~= (_ filterNot { file =>
file.getCanonicalPath matches "^.*monix\\.[^.]+\\.internal.*$"
// Exclude all internal Java files from documentation
file.getCanonicalPath matches "^.*monix.+?internal.*?\\.java$"
}),

scalacOptions in (ScalaUnidoc, unidoc) +=
Expand Down Expand Up @@ -394,7 +395,7 @@ lazy val benchmarksPrev = project.in(file("benchmarks/vprev"))
.settings(sharedSettings)
.settings(doNotPublishArtifact)
.settings(
libraryDependencies += "io.monix" %% "monix-reactive" % "3.0.0-M2"
libraryDependencies += "io.monix" %% "monix-reactive" % "3.0.0-d357cb1"
)

lazy val benchmarksNext = project.in(file("benchmarks/vnext"))
Expand Down
6 changes: 3 additions & 3 deletions monix-eval/shared/src/main/scala/monix/eval/Callback.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ object Callback {
if (isActive) {
isActive = false
try underlying.onSuccess(value) catch {
case NonFatal(ex) =>
case ex if NonFatal(ex) =>
r.reportFailure(ex)
}
}
Expand All @@ -160,7 +160,7 @@ object Callback {
if (isActive) {
isActive = false
try underlying.onError(ex) catch {
case NonFatal(err) =>
case err if NonFatal(err) =>
r.reportFailure(ex)
r.reportFailure(err)
}
Expand All @@ -172,7 +172,7 @@ object Callback {

def onSuccess(value: B): Unit =
try underlying.onSuccess(f(value)) catch {
case NonFatal(err) =>
case err if NonFatal(err) =>
underlying.onError(err)
}

Expand Down
10 changes: 5 additions & 5 deletions monix-eval/shared/src/main/scala/monix/eval/Coeval.scala
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ object Coeval extends CoevalInstancesLevel0 {
/** Promotes a non-strict value to a [[Coeval.Eager]]. */
def apply[A](f: => A): Eager[A] =
try Now(f) catch {
case NonFatal(ex) => Error(ex)
case ex if NonFatal(ex) => Error(ex)
}

/** Builds an [[Coeval.Eager Eager]] from a `scala.util.Try` */
Expand Down Expand Up @@ -942,7 +942,7 @@ object Coeval extends CoevalInstancesLevel0 {
try {
Now(thunk())
} catch {
case NonFatal(ex) => Error(ex)
case ex if NonFatal(ex) => Error(ex)
} finally {
// GC relief
thunk = null
Expand All @@ -969,11 +969,11 @@ object Coeval extends CoevalInstancesLevel0 {
override def apply(): A = f()

override def run: Eager[A] =
try Now(f()) catch { case NonFatal(e) => Error(e) }
try Now(f()) catch { case e if NonFatal(e) => Error(e) }
override def runAttempt: Either[Throwable, A] =
try Right(f()) catch { case NonFatal(e) => Left (e) }
try Right(f()) catch { case e if NonFatal(e) => Left (e) }
override def runTry: Try[A] =
try Success(f()) catch { case NonFatal(e) => Failure(e) }
try Success(f()) catch { case e if NonFatal(e) => Failure(e) }
}

/** Internal state, the result of [[Coeval.defer]] */
Expand Down
70 changes: 35 additions & 35 deletions monix-eval/shared/src/main/scala/monix/eval/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ import scala.util.{Failure, Success, Try}
* @define optionsDesc a set of [[monix.eval.Task.Options Options]]
* that determine the behavior of Task's run-loop.
*/
sealed abstract class Task[+A] extends Serializable { self =>
sealed abstract class Task[+A] extends Serializable {
import monix.eval.Task._

/** $runAsyncDesc
Expand All @@ -300,7 +300,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* a running task.
*/
def runAsync(implicit s: Scheduler): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s, defaultOptions)
TaskRunLoop.startFuture(this, s, defaultOptions)

/** $runAsyncDesc
*
Expand All @@ -309,7 +309,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @return $cancelableDesc
*/
def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
TaskRunLoop.startLightWithCallback(self, s, defaultOptions, cb)
TaskRunLoop.startLight(this, s, defaultOptions, cb)

/** $runAsyncOptDesc
*
Expand All @@ -318,7 +318,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @return $cancelableDesc
*/
def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
TaskRunLoop.startAsFuture(this, s, opts)
TaskRunLoop.startFuture(this, s, opts)

/** $runAsyncOptDesc
*
Expand All @@ -328,7 +328,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @return $cancelableDesc
*/
def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
TaskRunLoop.startLightWithCallback(self, s, opts, cb)
TaskRunLoop.startLight(this, s, opts, cb)

/** Similar to Scala's `Future#onComplete`, this method triggers
* the evaluation of a `Task` and invokes the given callback whenever
Expand Down Expand Up @@ -435,7 +435,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* source with a customizable trigger.
*/
final def delayExecution(timespan: FiniteDuration): Task[A] =
TaskDelayExecution(self, timespan)
TaskDelayExecution(this, timespan)

/** Returns a task that waits for the specified `trigger` to succeed
* before mirroring the result of the source.
Expand All @@ -454,7 +454,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* source with a simple timespan
*/
final def delayExecutionWith(trigger: Task[Any]): Task[A] =
TaskDelayExecutionWith(self, trigger)
TaskDelayExecutionWith(this, trigger)

/** Returns a task that executes the source immediately on `runAsync`,
* but before emitting the `onSuccess` result for the specified
Expand All @@ -467,7 +467,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* delay strategies depending on the signaled result.
*/
final def delayResult(timespan: FiniteDuration): Task[A] =
TaskDelayResult(self, timespan)
TaskDelayResult(this, timespan)

/** Returns a task that executes the source immediately on `runAsync`,
* but with the result delayed by the specified `selector`.
Expand All @@ -492,7 +492,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @see [[delayResult]] for delaying with a simple timeout
*/
final def delayResultBySelector[B](selector: A => Task[B]): Task[A] =
TaskDelayResultBySelector(self, selector)
TaskDelayResultBySelector(this, selector)

/** Overrides the default [[monix.execution.Scheduler Scheduler]],
* possibly forcing an asynchronous boundary before execution
Expand Down Expand Up @@ -631,7 +631,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* boundary on execution
*/
final def executeOn(s: Scheduler, forceAsync: Boolean = true): Task[A] =
TaskExecuteOn(self, s, forceAsync)
TaskExecuteOn(this, s, forceAsync)

/** Mirrors the given source `Task`, but upon execution ensure
* that evaluation forks into a separate (logical) thread.
Expand All @@ -657,7 +657,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* start the run-loop in `runAsync`.
*/
final def executeWithFork: Task[A] =
Task.shift.flatMap(_ => self)
Task.shift.flatMap(_ => this)

/** Returns a new task that will execute the source with a different
* [[monix.execution.ExecutionModel ExecutionModel]].
Expand All @@ -675,7 +675,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* with which the source will get evaluated on `runAsync`
*/
final def executeWithModel(em: ExecutionModel): Task[A] =
TaskExecuteWithModel(self, em)
TaskExecuteWithModel(this, em)

/** Returns a new task that will execute the source with a different
* set of [[Task.Options Options]].
Expand All @@ -692,7 +692,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* upon `runAsync`
*/
final def executeWithOptions(f: Options => Options): Task[A] =
TaskExecuteWithOptions(self, f)
TaskExecuteWithOptions(this, f)

/** Introduces an asynchronous boundary at the current stage in the
* asynchronous processing pipeline.
Expand Down Expand Up @@ -724,7 +724,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* scheduler.
*/
final def asyncBoundary: Task[A] =
self.flatMap(r => Task.shift.map(_ => r))
this.flatMap(r => Task.shift.map(_ => r))

/** Introduces an asynchronous boundary at the current stage in the
* asynchronous processing pipeline, making processing to jump on
Expand Down Expand Up @@ -759,7 +759,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* @param s is the scheduler triggering the asynchronous boundary
*/
final def asyncBoundary(s: Scheduler): Task[A] =
self.flatMap(a => Task.shift(s).map(_ => a))
this.flatMap(a => Task.shift(s).map(_ => a))

/** Returns a new task that upon evaluation will execute the given
* function for the generated element, transforming the source into
Expand All @@ -769,7 +769,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* obviously nothing gets executed at this point.
*/
final def foreachL(f: A => Unit): Task[Unit] =
self.map { a => f(a); () }
this.map { a => f(a); () }

/** Triggers the evaluation of the source, executing the given
* function for the generated element.
Expand Down Expand Up @@ -836,7 +836,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* canceled prematurely
*/
final def doOnCancel(callback: Task[Unit]): Task[A] =
TaskDoOnCancel(self, callback)
TaskDoOnCancel(this, callback)

/** Creates a new [[Task]] that will expose any triggered error from
* the source.
Expand All @@ -846,7 +846,7 @@ sealed abstract class Task[+A] extends Serializable { self =>

/** Dematerializes the source's result from a `Try`. */
final def dematerialize[B](implicit ev: A <:< Try[B]): Task[B] =
self.asInstanceOf[Task[Try[B]]].flatMap(fromTry)
this.asInstanceOf[Task[Try[B]]].flatMap(fromTry)

/** Creates a new task that will try recovering from an error by
* matching it with another task using the given partial function.
Expand Down Expand Up @@ -874,7 +874,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* task until the function returns true.
*/
final def restartUntil(p: (A) => Boolean): Task[A] =
self.flatMap(a => if (p(a)) now(a) else self.restartUntil(p))
this.flatMap(a => if (p(a)) now(a) else this.restartUntil(p))

/** Creates a new task that in case of error will retry executing the
* source again and again, until it succeeds.
Expand All @@ -883,8 +883,8 @@ sealed abstract class Task[+A] extends Serializable { self =>
* will be `maxRetries + 1`.
*/
final def onErrorRestart(maxRetries: Long): Task[A] =
self.onErrorHandleWith(ex =>
if (maxRetries > 0) self.onErrorRestart(maxRetries-1)
this.onErrorHandleWith(ex =>
if (maxRetries > 0) this.onErrorRestart(maxRetries-1)
else raiseError(ex))

/** Creates a new task that in case of error will retry executing the
Expand All @@ -894,7 +894,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* will be `maxRetries + 1`.
*/
final def onErrorRestartIf(p: Throwable => Boolean): Task[A] =
self.onErrorHandleWith(ex => if (p(ex)) self.onErrorRestartIf(p) else raiseError(ex))
this.onErrorHandleWith(ex => if (p(ex)) this.onErrorRestartIf(p) else raiseError(ex))

/** Creates a new task that will handle any matching throwable that
* this task might emit.
Expand Down Expand Up @@ -923,18 +923,18 @@ sealed abstract class Task[+A] extends Serializable { self =>
* successful results
*/
final def memoize: Task[A] =
self match {
this match {
case Now(_) | Error(_) =>
self
this
case Eval(f) =>
f match {
case _:Coeval.Once[_] => self
case _:Coeval.Once[_] => this
case _ =>
val coeval = Coeval.Once(f)
Eval(coeval)
}
case ref: MemoizeSuspend[_] if ref.isCachingAll =>
self
this
case other =>
new MemoizeSuspend[A](() => other, cacheErrors = true)
}
Expand All @@ -950,14 +950,14 @@ sealed abstract class Task[+A] extends Serializable { self =>
* results and failures
*/
final def memoizeOnSuccess: Task[A] =
self match {
this match {
case Now(_) | Error(_) =>
self
this
case Eval(f) =>
val lf = LazyOnSuccess(f)
if (lf eq f) self else Eval(lf)
if (lf eq f) this else Eval(lf)
case _: MemoizeSuspend[_] =>
self
this
case other =>
new MemoizeSuspend[A](() => other, cacheErrors = false)
}
Expand All @@ -973,7 +973,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* Reactive Streams specification.
*/
final def toReactivePublisher(implicit s: Scheduler): org.reactivestreams.Publisher[A @uV] =
TaskToReactivePublisher[A](self)(s)
TaskToReactivePublisher[A](this)(s)

/** Returns a Task that mirrors the source Task but that triggers a
* `TimeoutException` in case the given duration passes without the
Expand All @@ -987,7 +987,7 @@ sealed abstract class Task[+A] extends Serializable { self =>
* source emitting any item.
*/
final def timeoutTo[B >: A](after: FiniteDuration, backup: Task[B]): Task[B] =
Task.chooseFirstOf(self, Task.unit.delayExecution(after)).flatMap {
Task.chooseFirstOf(this, Task.unit.delayExecution(after)).flatMap {
case Left(((a, futureB))) =>
futureB.cancel()
Task.now(a)
Expand Down Expand Up @@ -2151,7 +2151,7 @@ object Task extends TaskInstancesLevel1 {
private[eval] final class MemoizeSuspend[A](
f: () => Task[A],
private[eval] val cacheErrors: Boolean)
extends Task[A] { self =>
extends Task[A] {

private[eval] var thunk: () => Task[A] = f
private[eval] val state = Atomic(null : AnyRef)
Expand Down Expand Up @@ -2219,7 +2219,7 @@ object Task extends TaskInstancesLevel1 {
def unsafeStartTrampolined[A](source: Task[A], context: Context, cb: Callback[A]): Unit =
context.scheduler.execute(new TrampolinedRunnable {
def run(): Unit =
TaskRunLoop.startWithCallback(source, context, cb, null, null, context.frameRef())
TaskRunLoop.startFull(source, context, cb, null, null, context.frameRef())
})

/** Unsafe utility - starts the execution of a Task, by providing
Expand All @@ -2231,7 +2231,7 @@ object Task extends TaskInstancesLevel1 {
* what you're doing. Prefer [[Task.runAsync(cb* Task.runAsync]].
*/
def unsafeStartNow[A](source: Task[A], context: Context, cb: Callback[A]): Unit =
TaskRunLoop.startWithCallback(source, context, cb, null, null, context.frameRef())
TaskRunLoop.startFull(source, context, cb, null, null, context.frameRef())

private[this] final val neverRef: Async[Nothing] =
Async((_,_) => ())
Expand Down
Loading