Skip to content

Commit

Permalink
support jdk9 forkjoinpool maximum-pool-size (#485)
Browse files Browse the repository at this point in the history
* support jdk9 forkjoinpool maximum-pool-size

Update PekkoJdk9ForkJoinPool.scala

Update ForkJoinPoolConstants.scala

scala 2.12 compile issue

review comments

use methodhandle

review comments

* refactor imports

Update PekkoJdk9ForkJoinPool.scala
  • Loading branch information
pjfanning authored Mar 30, 2024
1 parent 9e54e46 commit a2835b0
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 6 deletions.
4 changes: 4 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,10 @@ pekko {
# Setting to "FIFO" to use queue like peeking mode which "poll" or "LIFO" to use stack
# like peeking mode which "pop".
task-peeking-mode = "FIFO"

# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
maximum-pool-size = 32767
}

# This will be used if you have set "executor = "thread-pool-executor""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ForkJoinExecutorConfigurator.PekkoForkJoinTask

import java.util.concurrent.{ ForkJoinPool, ForkJoinTask, TimeUnit }

/**
* INTERNAL PEKKO USAGE ONLY
*
* An alternative version of [[ForkJoinExecutorConfigurator.PekkoForkJoinPool]]
* that supports the `maximumPoolSize` feature available in [[java.util.concurrent.ForkJoinPool]] in JDK9+.
*/
@InternalApi
private[dispatch] final class PekkoJdk9ForkJoinPool(
parallelism: Int,
threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
maximumPoolSize: Int,
unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
asyncMode: Boolean)
extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode,
0, maximumPoolSize, 1, null, ForkJoinPoolConstants.DefaultKeepAliveMillis, TimeUnit.MILLISECONDS)
with LoadMetrics {

override def execute(r: Runnable): Unit =
if (r ne null)
super.execute(
(if (r.isInstanceOf[ForkJoinTask[_]]) r else new PekkoForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
else
throw new NullPointerException("Runnable was null")

def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@

package org.apache.pekko.dispatch

import com.typesafe.config.Config

import java.lang.invoke.{ MethodHandle, MethodHandles, MethodType }
import java.util.concurrent.{ ExecutorService, ForkJoinPool, ForkJoinTask, ThreadFactory }
import scala.util.Try

import com.typesafe.config.Config
import org.apache.pekko.util.JavaVersion

object ForkJoinExecutorConfigurator {

Expand Down Expand Up @@ -84,12 +88,41 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
class ForkJoinExecutorServiceFactory(
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean)
val asyncMode: Boolean,
val maxPoolSize: Int)
extends ExecutorServiceFactory {

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)

private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption

private lazy val pekkoJdk9ForkJoinPoolHandleOpt: Option[MethodHandle] = {
if (JavaVersion.majorVersion == 8) {
None
} else {
pekkoJdk9ForkJoinPoolClassOpt.map { clz =>
val methodHandleLookup = MethodHandles.lookup()
val mt = MethodType.methodType(classOf[Unit], classOf[Int],
classOf[ForkJoinPool.ForkJoinWorkerThreadFactory],
classOf[Int], classOf[Thread.UncaughtExceptionHandler], classOf[Boolean])
methodHandleLookup.findConstructor(clz, mt)
}
}
}

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)
def createExecutorService: ExecutorService =
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)

def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invokeExact(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
}

final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
Expand All @@ -115,6 +148,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode)
asyncMode,
config.getInt("maximum-pool-size"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

private[dispatch] object ForkJoinPoolConstants {
final val MaxCap: Int = 0x7FFF // 32767
final val DefaultKeepAliveMillis: Long = 60000
}
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val actor = pekkoModule("actor")
.settings(AddMetaInfLicenseFiles.actorSettings)
.settings(VersionGenerator.settings)
.settings(serialversionRemoverPluginSettings)
.enablePlugins(BoilerplatePlugin, SbtOsgi)
.enablePlugins(BoilerplatePlugin, SbtOsgi, Jdk9)

lazy val actorTests = pekkoModule("actor-tests")
.dependsOn(testkit % "compile->compile;test->test", actor)
Expand Down

0 comments on commit a2835b0

Please sign in to comment.