Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ final class ConcurrentQueue[F[_], A] private (
*/
@UnsafeProtocol
def tryPoll: F[Option[A]] = tryPollRef
private[this] val tryPollRef =
F.delay(Option(tryPollUnsafe()))

/** Fetches a value from the queue, or if the queue is empty it awaits
* asynchronously until a value is made available.
Expand Down Expand Up @@ -355,10 +357,6 @@ final class ConcurrentQueue[F[_], A] private (
private[this] val offerTest: Boolean => Boolean = x => x
private[this] val offerMap: Boolean => Unit = _ => ()

/** Cached implementation for [[tryPoll]]. */
private[this] val tryPollRef =
F.delay(Option(tryPollUnsafe()))

private def toSeq(buffer: ArrayBuffer[A]): Seq[A] =
buffer.toArray[Any].toSeq.asInstanceOf[Seq[A]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[monix] object Platform {
*
* It's always a power of 2, because then for
* applying the modulo operation we can just do:
*
* {{{
* val modulus = Platform.recommendedBatchSize - 1
* // ...
Expand All @@ -51,6 +52,13 @@ private[monix] object Platform {
*/
final val recommendedBatchSize: Int = 512

/** Recommended chunk size in unbounded buffer implementations that are chunked,
* or in chunked streams.
*
* Should be a power of 2.
*/
final val recommendedBufferChunkSize: Int = 128

/**
* Auto cancelable run loops are set to `false` if Monix
* is running on top of Scala.js.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,36 @@ private[monix] object Platform {
.getOrElse(1024)
}

/** Recommended chunk size in unbounded buffer implementations that are chunked,
* or in chunked streaming.
*
* Examples:
*
* - the default when no `chunkSizeHint` is specified in
* [[monix.execution.BufferCapacity.Unbounded BufferCapacity.Unbounded]]
* - the chunk size used in
* [[monix.reactive.OverflowStrategy.Unbounded OverflowStrategy.Unbounded]]
* - the default in
* [[monix.tail.Iterant.fromConsumer Iterant.fromConsumer]] or in
* [[monix.tail.Iterant.fromConsumer Iterant.fromChannel]]
*
* Can be configured by setting Java properties:
*
* <pre>
* java -Dmonix.environment.bufferChunkSize=128 \
* ...
* </pre>
*
* Should be a power of 2 or it gets rounded to one.
*/
val recommendedBufferChunkSize: Int = {
Option(System.getProperty("monix.environment.bufferChunkSize", ""))
.filter(s => s != null && s.nonEmpty)
.flatMap(s => Try(s.toInt).toOption)
.map(math.nextPowerOf2)
.getOrElse(256)
}

/** Default value for auto cancelable loops, set to `false`.
*
* On top of the JVM the default can be overridden by setting the following
Expand All @@ -82,10 +112,11 @@ private[monix] object Platform {
* which now recommends for this default to be `true` due to the design
* of its type classes.
*/
val autoCancelableRunLoops: Boolean =
val autoCancelableRunLoops: Boolean = {
Option(System.getProperty("monix.environment.autoCancelableRunLoops", ""))
.map(_.toLowerCase)
.forall(v => v != "no" && v != "false" && v != "0")
}

/**
* Default value for local context propagation loops is set to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private[internal] abstract class FromCircularQueue[A](queue: ConcurrentCircularA
def fenceOffer(): Unit
def fencePoll(): Unit

final def isEmpty: Boolean =
queue.isEmpty

final def offer(elem: A): Int =
if (queue.offer(elem)) 0 else 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ private[internal] class FromJavaQueue[A](queue: util.Queue[A])
final def fenceOffer(): Unit = ()
final def fencePoll(): Unit = ()

final def isEmpty: Boolean =
queue.isEmpty

final def offer(elem: A): Int =
if (queue.offer(elem)) 0 else 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ private[internal] abstract class FromMessagePassingQueue[A](queue: MessagePassin
def fenceOffer(): Unit
def fencePoll(): Unit

final def isEmpty: Boolean =
queue.isEmpty
final def offer(elem: A): Int =
if (queue.offer(elem)) 0 else 1
final def poll(): A =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private[internal] trait LowLevelConcurrentQueueBuilders {
* Builds an bounded `ConcurrentQueue` reference.
*/
private def unbounded[A](chunkSize: Option[Int], ct: ChannelType, fenced: Boolean): LowLevelConcurrentQueue[A] = {
val chunk = chunkSize.getOrElse(Platform.recommendedBatchSize)
val chunk = chunkSize.getOrElse(Platform.recommendedBufferChunkSize)

if (UnsafeAccess.IS_OPENJDK_COMPATIBLE) {
// Support for memory fences in Unsafe is only available in Java 8+
if (UnsafeAccess.HAS_JAVA8_INTRINSICS || !fenced) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,15 @@ object ChannelType {
* Enumeration for describing the type of the producer, with two
* possible values:
*
* - [[MultiProducer]]
* - [[MultiProducer]] (default)
* - [[SingleProducer]]
*
* This is often used to optimize the underlying buffer used.
* The multi-producer option is the safe default and specifies
* that multiple producers (threads, actors, etc) can push events
* concurrently, whereas the single-producer option specifies that
* a single producer can (sequentially) push events and can be used
* as an (unsafe) optimization.
*/
sealed abstract class ProducerSide(val value: String)
extends Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monix.execution.internal.collection

private[monix] trait LowLevelConcurrentQueue[A] extends Serializable {
def isEmpty: Boolean
def offer(a: A): Int
def poll(): A
def drainToBuffer(buffer: scala.collection.mutable.Buffer[A], limit: Int): Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package monix.reactive.observers.buffers

import monix.execution.ChannelType
import monix.execution.ChannelType.MultiProducer
import monix.reactive.OverflowStrategy
import monix.reactive.OverflowStrategy._
import monix.reactive.observers.{BufferedSubscriber, Subscriber}

private[observers] trait BuildersImpl { self: BufferedSubscriber.type =>
def apply[A](subscriber: Subscriber[A], bufferPolicy: OverflowStrategy[A]): Subscriber[A] = {
def apply[A](subscriber: Subscriber[A], bufferPolicy: OverflowStrategy[A], producerType: ChannelType.ProducerSide = MultiProducer): Subscriber[A] = {
bufferPolicy match {
case Unbounded =>
SyncBufferedSubscriber.unbounded(subscriber)
Expand All @@ -48,7 +50,7 @@ private[observers] trait BuildersImpl { self: BufferedSubscriber.type =>
}
}

def synchronous[A](subscriber: Subscriber[A], bufferPolicy: OverflowStrategy.Synchronous[A]): Subscriber.Sync[A] = {
def synchronous[A](subscriber: Subscriber[A], bufferPolicy: OverflowStrategy.Synchronous[A], producerType: ChannelType.ProducerSide = MultiProducer): Subscriber.Sync[A] = {
bufferPolicy match {
case Unbounded =>
SyncBufferedSubscriber.unbounded(subscriber)
Expand All @@ -72,6 +74,6 @@ private[observers] trait BuildersImpl { self: BufferedSubscriber.type =>
}
}

def batched[A](underlying: Subscriber[List[A]], bufferSize: Int): Subscriber[A] =
def batched[A](underlying: Subscriber[List[A]], bufferSize: Int, producerType: ChannelType.ProducerSide = MultiProducer): Subscriber[A] =
BatchedBufferedSubscriber(underlying, bufferSize)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package monix.reactive.observers.buffers

import monix.eval.Coeval
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.execution.internal.collection.{JSArrayQueue, _}

import scala.util.control.NonFatal
import monix.execution.exceptions.BufferOverflowException
import monix.reactive.observers.{BufferedSubscriber, Subscriber}
Expand All @@ -31,7 +33,7 @@ import scala.util.{Failure, Success}
* [[monix.reactive.OverflowStrategy.DropNew DropNew]] overflow strategy.
*/
private[observers] final class SyncBufferedSubscriber[-A] private
(out: Subscriber[A], queue: EvictingQueue[A], onOverflow: Long => Option[A] = null)
(out: Subscriber[A], queue: EvictingQueue[A], onOverflow: Long => Coeval[Option[A]] = null)
extends BufferedSubscriber[A] with Subscriber.Sync[A] {

implicit val scheduler = out.scheduler
Expand Down Expand Up @@ -163,7 +165,7 @@ private[observers] final class SyncBufferedSubscriber[-A] private
if (onOverflow == null || droppedCount == 0)
null.asInstanceOf[A]
else {
val msg = onOverflow(droppedCount) match {
val msg = onOverflow(droppedCount).value() match {
case Some(value) => value
case None => null.asInstanceOf[A]
}
Expand Down Expand Up @@ -280,7 +282,7 @@ private[monix] object SyncBufferedSubscriber {
* for the [[monix.reactive.OverflowStrategy.DropNew DropNew]]
* overflow strategy.
*/
def dropNewAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = {
def dropNewAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Coeval[Option[A]]): Subscriber.Sync[A] = {
require(bufferSize > 1, "bufferSize must be strictly higher than 1")
val buffer = JSArrayQueue.bounded[A](bufferSize)
new SyncBufferedSubscriber[A](underlying, buffer, onOverflow)
Expand All @@ -303,7 +305,7 @@ private[monix] object SyncBufferedSubscriber {
* overflow strategy, with signaling of the number of events that
* were dropped.
*/
def dropOldAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = {
def dropOldAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Coeval[Option[A]]): Subscriber.Sync[A] = {
require(bufferSize > 1, "bufferSize must be strictly higher than 1")
val buffer = DropHeadOnOverflowQueue[AnyRef](bufferSize).asInstanceOf[EvictingQueue[A]]
new SyncBufferedSubscriber[A](underlying, buffer, onOverflow)
Expand All @@ -326,7 +328,7 @@ private[monix] object SyncBufferedSubscriber {
* overflow strategy, with signaling of the number of events that
* were dropped.
*/
def clearBufferAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Option[A]): Subscriber.Sync[A] = {
def clearBufferAndSignal[A](underlying: Subscriber[A], bufferSize: Int, onOverflow: Long => Coeval[Option[A]]): Subscriber.Sync[A] = {
require(bufferSize > 1, "bufferSize must be strictly higher than 1")
val buffer = DropAllOnOverflowQueue[AnyRef](bufferSize).asInstanceOf[EvictingQueue[A]]
new SyncBufferedSubscriber[A](underlying, buffer, onOverflow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package monix.reactive.observers.buffers

import monix.execution.Ack
import monix.execution.{Ack, ChannelType}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.BufferCapacity.Unbounded
import monix.execution.ChannelType._
import monix.execution.atomic.Atomic
import monix.execution.atomic.PaddingStrategy.LeftRight256
import monix.execution.internal.math
import monix.execution.internal.collection.LowLevelConcurrentQueue
import monix.execution.internal.{Platform, math}

import scala.util.control.NonFatal
import monix.reactive.observers.{BufferedSubscriber, Subscriber}

Expand All @@ -33,7 +37,7 @@ import scala.util.{Failure, Success}
* [[BatchedBufferedSubscriber]].
*/
private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A,R]
(out: Subscriber[R], _bufferSize: Int)
(out: Subscriber[R], _bufferSize: Int, pt: ChannelType.ProducerSide)
extends CommonBufferMembers with BufferedSubscriber[A] {

require(_bufferSize > 0, "bufferSize must be a strictly positive number")
Expand All @@ -42,8 +46,12 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A,R]
private[this] val em = out.scheduler.executionModel
implicit final val scheduler = out.scheduler

protected final val queue: ConcurrentQueue[A] =
ConcurrentQueue.unbounded()
protected final val queue: LowLevelConcurrentQueue[A] =
LowLevelConcurrentQueue(
Unbounded(Some(scala.math.min(Platform.recommendedBufferChunkSize, bufferSize))),
ChannelType.assemble(pt, SingleConsumer),
fenced = false
)

private[this] val itemsToPush =
Atomic.withPadding(0, LeftRight256)
Expand All @@ -64,7 +72,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A,R]
case Some(v) => v
}

backPressured.get match {
backPressured.get() match {
case null =>
if (toPush < bufferSize) {
queue.offer(elem)
Expand Down Expand Up @@ -180,7 +188,7 @@ private[observers] abstract class AbstractBackPressuredBufferedSubscriber[A,R]
private final def fastLoop(prevAck: Future[Ack], lastProcessed: Int, startIndex: Int): Unit = {
def stopStreaming(): Unit = {
downstreamIsComplete = true
val bp = backPressured.get
val bp = backPressured.get()
if (bp != null) bp.success(Stop)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package monix.reactive.observers.buffers

import monix.execution.ChannelType
import monix.reactive.observers.Subscriber

/** A `BufferedSubscriber` implementation for the
* [[monix.reactive.OverflowStrategy.BackPressure BackPressure]]
* buffer overflow strategy.
*/
private[observers] final class BackPressuredBufferedSubscriber[A] private
(out: Subscriber[A], _bufferSize: Int)
extends AbstractBackPressuredBufferedSubscriber[A,A](out, _bufferSize) {
(out: Subscriber[A], _bufferSize: Int, pt: ChannelType.ProducerSide)
extends AbstractBackPressuredBufferedSubscriber[A,A](out, _bufferSize, pt) {

@volatile protected var p50, p51, p52, p53, p54, p55, p56, p57 = 5
@volatile protected var q50, q51, q52, q53, q54, q55, q56, q57 = 5
Expand All @@ -38,6 +39,11 @@ private[observers] final class BackPressuredBufferedSubscriber[A] private
}

private[observers] object BackPressuredBufferedSubscriber {
def apply[A](underlying: Subscriber[A], bufferSize: Int): BackPressuredBufferedSubscriber[A] =
new BackPressuredBufferedSubscriber[A](underlying, bufferSize)
}
def apply[A](
underlying: Subscriber[A],
bufferSize: Int,
producerType: ChannelType.ProducerSide): BackPressuredBufferedSubscriber[A] = {

new BackPressuredBufferedSubscriber[A](underlying, bufferSize, producerType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@

package monix.reactive.observers.buffers

import monix.execution.ChannelType
import monix.execution.internal.Platform
import monix.reactive.observers.Subscriber

import scala.collection.mutable.ListBuffer

/** A `BufferedSubscriber` implementation for the
* [[monix.reactive.OverflowStrategy.BackPressure BackPressured]]
* buffer overflowStrategy that sends events in bundles.
*/
private[monix] final class BatchedBufferedSubscriber[A] private
(out: Subscriber[List[A]], _bufferSize: Int)
(out: Subscriber[List[A]], _bufferSize: Int, pt: ChannelType.ProducerSide)
extends AbstractBackPressuredBufferedSubscriber[A, ListBuffer[A]](
subscriberBufferToList(out), _bufferSize) { self =>
subscriberBufferToList(out),
_bufferSize,
pt
) { self =>

@volatile protected var p50, p51, p52, p53, p54, p55, p56, p57 = 5
@volatile protected var q50, q51, q52, q53, q54, q55, q56, q57 = 5
Expand All @@ -46,6 +51,9 @@ private[monix] final class BatchedBufferedSubscriber[A] private

private[monix] object BatchedBufferedSubscriber {
/** Builder for [[BatchedBufferedSubscriber]] */
def apply[A](underlying: Subscriber[List[A]], bufferSize: Int): BatchedBufferedSubscriber[A] =
new BatchedBufferedSubscriber[A](underlying, bufferSize)
}
def apply[A](
underlying: Subscriber[List[A]],
bufferSize: Int,
producerType: ChannelType.ProducerSide): BatchedBufferedSubscriber[A] =
new BatchedBufferedSubscriber[A](underlying, bufferSize, producerType)
}
Loading