diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala index b3e5e743e2..5ea7b772df 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala @@ -12,7 +12,7 @@ package kafka.log.streamaspect import com.automq.stream.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions} -import com.automq.stream.utils.FutureUtil +import com.automq.stream.utils.{FutureUtil, Systems} import io.netty.buffer.Unpooled import kafka.log.LocalLog.CleanedFileSuffix import kafka.log._ @@ -588,8 +588,8 @@ object ElasticLog extends Logging { private val APPEND_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendTimeNanos") private val APPEND_CALLBACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendCallbackTimeNanos") private val APPEND_ACK_TIME_HIST = KafkaMetricsGroup.newHistogram("AppendAckTimeNanos") - private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8) - private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](8) + private val APPEND_CALLBACK_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](Systems.CPU_CORES * 2) + private val READ_ASYNC_EXECUTOR: Array[ExecutorService] = new Array[ExecutorService](Systems.CPU_CORES * 4) for (i <- APPEND_CALLBACK_EXECUTOR.indices) { APPEND_CALLBACK_EXECUTOR(i) = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("log-append-callback-executor-" + i, true))