Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ Jisoo Park
https://github.com/guersam

Dawid Dworak
https://github.com/ddworak

Ryo Fukumuro
https://github.com/rfkm
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ class AsyncStateActionObservable[S,A](seed: => S, f: S => Task[(A,S)]) extends O
}

/** Drives the stream: evaluates `f` on the current state, pushes the
  * produced element downstream and recurses with the new state for as
  * long as the subscriber answers `Continue`.
  *
  * @param subscriber the downstream observer receiving generated elements
  * @param state the state fed to `f` on this iteration
  * @return a task that finishes when the subscriber answers `Stop`, or
  *         after a failure of the task built by `f` has been signaled
  *         via `subscriber.onError`
  */
def loop(subscriber: Subscriber[A], state: S): Task[Unit] =
  try f(state).transformWith(
    { case (a, newState) =>
      // Wait for the subscriber's acknowledgement before deciding
      // whether to generate the next element.
      Task.fromFuture(subscriber.onNext(a)).flatMap {
        case Continue => loop(subscriber, newState)
        case Stop => Task.unit
      }
    },
    { ex =>
      // The task returned by `f` failed: report it downstream and end
      // the loop instead of propagating the failure through the task.
      subscriber.onError(ex)
      Task.unit
    }
  ) catch {
    // An exception thrown synchronously while evaluating `f(state)`
    // is turned into a failed task.
    case NonFatal(ex) =>
      Task.raiseError(ex)
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import monix.eval.Task
import monix.execution.Ack.Continue
import monix.execution.internal.Platform
import monix.execution.ExecutionModel.AlwaysAsyncExecution
import monix.execution.exceptions.DummyException
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable
import monix.reactive.observers.Subscriber

import scala.util.Failure

object AsyncStateActionObservableSuite extends TestSuite[TestScheduler] {
def setup() = TestScheduler()
def tearDown(s: TestScheduler): Unit = {
Expand Down Expand Up @@ -88,6 +91,15 @@ object AsyncStateActionObservableSuite extends TestSuite[TestScheduler] {
assert(!wasCompleted)
}

// A state action whose task fails with `dummy` must make the future
// returned by `runAsyncGetFirst` complete with that same failure.
test("should protect against user code errors") { implicit s =>
  val dummy = DummyException("dummy")
  val source = Observable.fromAsyncStateAction(intError(dummy))(s.currentTimeMillis())
  val result = source.runAsyncGetFirst

  // Run everything scheduled on the test scheduler before inspecting the result.
  s.tick()
  assertEquals(result.value, Some(Failure(dummy)))
}

test("should respect the ExecutionModel") { scheduler =>
implicit val s = scheduler.withExecutionModel(AlwaysAsyncExecution)

Expand All @@ -109,6 +121,7 @@ object AsyncStateActionObservableSuite extends TestSuite[TestScheduler] {

/** State action that wraps `int(seed)` in `Task(...)`. */
def intAsync(seed: Long): Task[(Int, Long)] =
  Task(int(seed))
/** State action that wraps `int(seed)` in `Task.now(...)`. */
def intNow(seed: Long): Task[(Int, Long)] =
  Task.now(int(seed))
/** State action that always fails with `ex`. The `seed` argument is
  * ignored; it is only there so the method has the same shape as the
  * other state actions.
  */
def intError(ex: Throwable)(seed: Long): Task[(Int, Long)] =
  Task.raiseError[(Int, Long)](ex)

def int(seed: Long): (Int, Long) = {
// `&` is bitwise AND. We use the current seed to generate a new seed.
Expand Down