Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions monix-tail/shared/src/main/scala/monix/tail/Iterant.scala
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,41 @@ sealed abstract class Iterant[F[_], A] extends Product with Serializable {
final def tail(implicit F: Sync[F]): Iterant[F, A] =
IterantTail(self)(F)

/** Applies a binary operator to a start value and all elements of
* this `Iterant`, going left to right and returns a new
* `Iterant` that emits on each step the result of the applied
* function.
*
* Similar to [[foldLeftL]], but emits the state on each
* step. Useful for modeling finite state machines.
*
* Example showing how state can be evolved and acted upon:
* {{{
* sealed trait State[+A] { def count: Int }
* case object Init extends State[Nothing] { def count = 0 }
* case class Current[A](current: A, count: Int) extends State[A]
*
* val scanned = source.scan(Init : State[A]) { (acc, a) =>
* acc match {
* case Init => Current(a, 1)
* case Current(_, count) => Current(a, count + 1)
* }
* }
*
* scanned
* .takeWhile(_.count < 10)
* .collect { case Current(a, _) => a }
* }}}
*
* @param initial is the initial state
* @param f is the function that evolves the current state
*
* @return a new iterant that emits all intermediate states being
* resulted from applying function `f`
*/
final def scan[S](initial: =>S)(f: (S, A) => S)(implicit F: Sync[F]): Iterant[F, S] =
IterantScan(self, initial, f)

/** Skips over [[Iterant.Suspend]] states, along with
* [[Iterant.NextCursor]] and [[Iterant.NextBatch]] states that
* signal empty collections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import monix.tail.Iterant
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import monix.tail.batches.BatchCursor
import monix.tail.internal.IterantUtils.signalError
import scala.util.control.NonFatal
import monix.execution.misc.NonFatal

private[tail] object IterantConcat {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package monix.tail.internal

import cats.syntax.all._
import cats.effect.Sync
import monix.execution.misc.NonFatal
import monix.tail.Iterant
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import monix.tail.internal.IterantUtils._
import scala.util.control.NonFatal

private[tail] object IterantFilter {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import cats.syntax.all._
import monix.tail.Iterant
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import scala.collection.mutable
import scala.util.control.NonFatal
import monix.execution.misc.NonFatal

private[tail] object IterantFoldLeftL {
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2014-2017 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.tail
package internal

import cats.effect.Sync
import cats.syntax.all._
import monix.execution.misc.NonFatal
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import monix.tail.batches.BatchCursor
import monix.tail.internal.IterantUtils._
import scala.collection.mutable.ArrayBuffer

private[tail] object IterantScan {
/** Implementation for `Iterant#scan`. */
def apply[F[_], A, S](fa: Iterant[F, A], initial: => S, f: (S, A) => S)
(implicit F: Sync[F]): Iterant[F, S] = {

def processCursor(state: S, cursor: BatchCursor[A], rest: F[Iterant[F, A]], stop: F[Unit]) = {
if (!cursor.hasNext())
Suspend(rest.map(loop(state)), stop)
else if (cursor.recommendedBatchSize <= 1) {
val newState = f(state, cursor.next())
val next: F[Iterant[F, A]] =
if (cursor.hasNext()) F.pure(NextCursor(cursor, rest, stop))
else rest

Next(newState, next.map(loop(newState)), stop)
} else {
val buffer = ArrayBuffer.empty[S]
var toProcess = cursor.recommendedBatchSize
var newState = state

while (toProcess > 0 && cursor.hasNext()) {
newState = f(newState, cursor.next())
buffer += newState
toProcess -= 1
}

val next: F[Iterant[F, A]] =
if (cursor.hasNext()) F.pure(NextCursor(cursor, rest, stop))
else rest

val elems = BatchCursor.fromAnyArray[S](buffer.toArray[Any])
NextCursor(elems, next.map(loop(newState)), stop)
}
}

def loop(state: S)(fa: Iterant[F, A]): Iterant[F, S] =
try fa match {
case Next(a, rest, stop) =>
val newState = f(state, a)
Next(newState, rest.map(loop(newState)), stop)

case NextCursor(cursor, rest, stop) =>
processCursor(state, cursor, rest, stop)

case NextBatch(batch, rest, stop) =>
val cursor = batch.cursor()
processCursor(state, cursor, rest, stop)

case Suspend(rest, stop) =>
Suspend(rest.map(loop(state)), stop)

case Last(a) =>
Last(f(state, a))

case halt @ Halt(_) =>
halt.asInstanceOf[Iterant[F, S]]

} catch {
case NonFatal(e) =>
signalError(fa, e)
}

// Given that `initial` is a by-name value, we have
// to suspend
val task = F.delay {
try loop(initial)(fa)
catch { case NonFatal(e) => Halt[F, S](Some(e)) }
}
Suspend(task, F.unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import monix.tail.batches.{Batch, BatchCursor}
import org.scalacheck.Arbitrary

trait ArbitraryInstances extends monix.eval.ArbitraryInstances {
def arbitraryListToIterant[F[_], A](list: List[A], idx: Int)(implicit F: Sync[F]): Iterant[F, A] = {
def arbitraryListToIterant[F[_], A](list: List[A], idx: Int, allowErrors: Boolean = true)
(implicit F: Sync[F]): Iterant[F, A] = {

def loop(list: List[A], idx: Int): Iterant[F, A] =
list match {
case Nil =>
if (math.abs(idx % 4) != 0)
if (!allowErrors || math.abs(idx % 4) != 0)
Iterant[F].haltS(None)
else
Iterant[F].haltS(Some(DummyException("arbitrary")))
Expand All @@ -39,24 +41,22 @@ trait ArbitraryInstances extends monix.eval.ArbitraryInstances {
Iterant[F].lastS(x)

case ns =>
math.abs(idx % 6) match {
case 0 =>
math.abs(idx % 14) match {
case 0 | 1 =>
Iterant[F].nextS(ns.head, F.delay(loop(ns.tail, idx+1)), F.unit)
case 1 =>
case 2 | 3 =>
Iterant[F].suspend(F.delay(loop(list, idx+1)))
case 2 =>
case 4 | 5 =>
Iterant[F].suspendS(F.delay(loop(ns, idx + 1)), F.unit)
case n @ (6 | 7 | 8) =>
val (headSeq, tail) = list.splitAt(3)
val bs = if (idx % 7 < 3) 1 else 3
val cursor = BatchCursor.fromIterator(headSeq.toVector.iterator, bs)
val cursor = BatchCursor.fromIterator(headSeq.toVector.iterator, n - 5)
Iterant[F].nextCursorS(cursor, F.delay(loop(tail, idx+1)), F.unit)
case 3 =>
Iterant[F].suspendS(F.delay(loop(ns, idx + 1)), F.unit)
case 4 =>
case n @ (9 | 10 | 11) =>
val (headSeq, tail) = list.splitAt(3)
val bs = if (idx % 7 < 3) 1 else 3
val batch = Batch.fromSeq(headSeq.toVector, bs)
Iterant[F].nextBatchS(batch, F.delay(loop(tail, idx+1)), F.unit)
case 5 =>
val batch = Batch.fromSeq(headSeq.toVector, n - 8)
Iterant[F].nextBatchS(batch, F.delay(loop(tail, idx + 1)), F.unit)
case 12 | 13 =>
Iterant[F].nextBatchS(Batch.empty, F.delay(loop(ns, idx + 1)), F.unit)
}
}
Expand Down
11 changes: 10 additions & 1 deletion monix-tail/shared/src/test/scala/monix/tail/BaseTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@

package monix.tail

import monix.execution.internal.Platform
import org.scalacheck.Test.Parameters

/** Just a marker for what we need to extend in the tests
* of `monix-tail`.
*/
trait BaseTestSuite extends monix.eval.BaseTestSuite with ArbitraryInstances
trait BaseTestSuite extends monix.eval.BaseTestSuite with ArbitraryInstances {
override lazy val checkConfig: Parameters =
Parameters.default
.withMinSuccessfulTests(if (Platform.isJVM) 200 else 20)
.withMaxDiscardRatio(if (Platform.isJVM) 5.0f else 50.0f)
.withMaxSize(24)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import scala.util.{Failure, Success}
object IterantBasicSuite extends BaseTestSuite {
test("arbitraryListToTaskStream works") { implicit s =>
check2 { (list: List[Int], i: Int) =>
val stream = arbitraryListToIterant[Task, Int](list, math.abs(i % 4)).onErrorIgnore
val stream = arbitraryListToIterant[Task, Int](list, math.abs(i % 4), allowErrors = false)
stream.toListL <-> Task.now(list)
}
}

test("arbitraryListToCoevalStream") { implicit s =>
check2 { (list: List[Int], i: Int) =>
val stream = arbitraryListToIterant[Coeval, Int](list, math.abs(i % 4)).onErrorIgnore
val stream = arbitraryListToIterant[Coeval, Int](list, math.abs(i % 4), allowErrors = false)
stream.toListL <-> Coeval.now(list)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object IterantDropWhileSuite extends BaseTestSuite {

test("Iterant.dropWhile equivalence with List.dropWhile") { implicit s =>
check3 { (list: List[Int], idx: Int, p: Int => Boolean) =>
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1).onErrorIgnore
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1, allowErrors = false)
stream.dropWhile(p).toListL <-> stream.toListL.map(dropFromList(p))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ object IterantFlatMapSuite extends BaseTestSuite {
check2 { (l: List[Int], idx: Int) =>
val dummy = DummyException("dummy")
val list = if (l.isEmpty) List(1) else l
val source = arbitraryListToIterant[Coeval, Int](list, idx).onErrorIgnore
val source = arbitraryListToIterant[Coeval, Int](list, idx, allowErrors = false)
val received = source.flatMap(_ => Iterant[Coeval].raiseError[Int](dummy))
received <-> Iterant[Coeval].haltS[Int](Some(dummy))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import monix.eval.Coeval
object IterantHeadOptionSuite extends BaseTestSuite {
test("Iterant.headOptionL <-> List.headOption") { _ =>
check2 { (list: List[Int], idx: Int) =>
val iter = arbitraryListToIterant[Coeval, Int](list, math.abs(idx % 4)).onErrorIgnore
val iter = arbitraryListToIterant[Coeval, Int](list, math.abs(idx % 4), allowErrors = false)
iter.headOptionL <-> Coeval.now(list.headOption)
}
}

test("Iterant.headOption <-> List.headOption") { _ =>
check2 { (list: List[Int], idx: Int) =>
val iter = arbitraryListToIterant[Coeval, Int](list, math.abs(idx % 4)).onErrorIgnore
val iter = arbitraryListToIterant[Coeval, Int](list, math.abs(idx % 4), allowErrors = false)
iter.headOptionL.value == list.headOption
}
}
Expand Down
58 changes: 58 additions & 0 deletions monix-tail/shared/src/test/scala/monix/tail/IterantScanSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2014-2017 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.tail

import monix.eval.Coeval
import monix.execution.exceptions.DummyException

object IterantScanSuite extends BaseTestSuite {
test("scan evolves state") { implicit s =>
check1 { (source: Iterant[Coeval, Int]) =>
sealed trait State[+A] { def count: Int }
case object Init extends State[Nothing] { def count = 0 }
case class Current[A](current: A, count: Int) extends State[A]

val scanned = source.scan(Init : State[Int]) { (acc, a) =>
acc match {
case Init => Current(a, 1)
case Current(_, count) => Current(a, count + 1)
}
}

val fa = scanned
.takeWhile(_.count < 10)
.collect { case Current(a, _) => a }

fa.toListL <-> source.take(10).toListL.map(_.take(9))
}
}

test("scan protects against exceptions initial") { implicit s =>
val dummy = DummyException("dummy")
val fa = Iterant[Coeval].of(1, 2, 3)
val r = fa.scan((throw dummy) : Int)((_, e) => e).attempt.toListL
assertEquals(r.value, List(Left(dummy)))
}

test("scan protects against exceptions in f") { implicit s =>
val dummy = DummyException("dummy")
val fa = Iterant[Coeval].of(1, 2, 3)
val r = fa.scan(0)((_, _) => throw dummy).attempt.toListL
assertEquals(r.value, List(Left(dummy)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object IterantTakeSuite extends BaseTestSuite {

test("Iterant[Task].take equivalence with List.take") { implicit s =>
check3 { (list: List[Int], idx: Int, nr: Int) =>
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1).onErrorIgnore
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1, allowErrors = false)
val length = list.length
val n =
if (nr == 0) 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object IterantTakeWhileSuite extends BaseTestSuite {

test("Iterant[Task].takeWhile equivalence with List.takeWhile") { implicit s =>
check3 { (list: List[Int], idx: Int, p: Int => Boolean) =>
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1)
val stream = arbitraryListToIterant[Task, Int](list, math.abs(idx) + 1, allowErrors = false)
stream.takeWhile(p).toListL <-> stream.toListL.map(_.takeWhile(p))
}
}
Expand All @@ -65,8 +65,7 @@ object IterantTakeWhileSuite extends BaseTestSuite {
test("Iterant[Coeval].takeWhile triggers early stop") { implicit s =>
check2 { (list: List[Int], idx: Int) =>
val cancelable = BooleanCancelable()
val stream = arbitraryListToIterant[Coeval, Int](list, math.abs(idx) + 1)
.onErrorIgnore
val stream = arbitraryListToIterant[Coeval, Int](list, math.abs(idx) + 1, allowErrors = false)
.doOnEarlyStop(Coeval.eval(cancelable.cancel()))

stream.takeWhile(_ => false).toListL.value == Nil &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ object IterantZipMapSuite extends BaseTestSuite {
}

test("Iterant.zipMap equivalence with List.zip") { implicit s =>
check3 { (stream1: Iterant[Task, Int], stream2: Iterant[Task, Int], f: (Int, Int) => Long) =>
check5 { (list1: List[Int], idx1: Int, list2: List[Int], idx2: Int, f: (Int, Int) => Long) =>
val stream1 = arbitraryListToIterant[Coeval, Int](list1, math.abs(idx1) + 1, allowErrors = false)
val stream2 = arbitraryListToIterant[Coeval, Int](list2, math.abs(idx2) + 1, allowErrors = false)

val received = stream1.zipMap(stream2)(f).toListL
val expected = Task.zipMap2(stream1.toListL, stream2.toListL)((l1, l2) => l1.zip(l2).map { case (a,b) => f(a,b) })
val expected = Coeval(list1.zip(list2).map { case (a,b) => f(a,b) })
received <-> expected
}
}
Expand Down