Skip to content

Commit

Permalink
Add option to specify max threads based on num cpus (#633)
Browse files Browse the repository at this point in the history
* Add option to specify max threads based on num cpus
  • Loading branch information
aprudhomme authored Mar 18, 2024
1 parent 978b7f6 commit 223f620
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 9 deletions.
29 changes: 29 additions & 0 deletions docs/server_configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ Example server configuration
- Size of ReplicationServer threadpool executor
- numCPUs + 1

.. list-table:: `Alternative Max Threads Config <https://github.com/Yelp/nrtsearch/blob/master/src/main/java/com/yelp/nrtsearch/server/config/ThreadPoolConfiguration.java>`_ (``threadPoolConfiguration.max*Threads.*``)
:widths: 25 10 50 25
:header-rows: 1

* - Property
- Type
- Description
- Default

* - min
- int
- Minimum number of threads
- 1

* - max
- int
- Maximum number of threads
- INT_MAX

* - multiplier
- float
- Multiplier in threads formula: (numCPUs * multiplier) + offset
- 1.0

* - offset
- int
- Offset in threads formula: (numCPUs * multiplier) + offset
- 0

.. list-table:: `Warmer Configuration <https://github.com/Yelp/nrtsearch/blob/master/src/main/java/com/yelp/nrtsearch/server/luceneserver/warming/WarmerConfig.java>`_ (``warmer.*``)
:widths: 25 10 50 25
:header-rows: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package com.yelp.nrtsearch.server.config;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.yelp.nrtsearch.server.utils.JsonUtils;
import java.util.Map;

/** Configuration for various ThreadPool Settings used in nrtsearch */
public class ThreadPoolConfiguration {

Expand Down Expand Up @@ -61,14 +65,18 @@ public class ThreadPoolConfiguration {

public ThreadPoolConfiguration(YamlConfigReader configReader) {
maxSearchingThreads =
configReader.getInteger(
"threadPoolConfiguration.maxSearchingThreads", DEFAULT_MAX_SEARCHING_THREADS);
getNumThreads(
configReader,
"threadPoolConfiguration.maxSearchingThreads",
DEFAULT_MAX_SEARCHING_THREADS);
maxSearchBufferedItems =
configReader.getInteger(
"threadPoolConfiguration.maxSearchBufferedItems", DEFAULT_MAX_SEARCH_BUFFERED_ITEMS);
maxFetchThreads =
configReader.getInteger(
"threadPoolConfiguration.maxFetchThreads", DEFAULT_MAX_FILL_FIELDS_THREADS);
getNumThreads(
configReader,
"threadPoolConfiguration.maxFetchThreads",
DEFAULT_MAX_FILL_FIELDS_THREADS);
minParallelFetchNumFields =
configReader.getInteger(
"threadPoolConfiguration.minParallelFetchNumFields",
Expand All @@ -80,15 +88,18 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
configReader.getBoolean("threadPoolConfiguration.parallelFetchByField", true);

maxIndexingThreads =
configReader.getInteger(
"threadPoolConfiguration.maxIndexingThreads", DEFAULT_MAX_INDEXING_THREADS);
getNumThreads(
configReader,
"threadPoolConfiguration.maxIndexingThreads",
DEFAULT_MAX_INDEXING_THREADS);
maxIndexingBufferedItems =
configReader.getInteger(
"threadPoolConfiguration.maxIndexingBufferedItems",
DEFAULT_MAX_INDEXING_BUFFERED_ITEMS);

maxGrpcLuceneserverThreads =
configReader.getInteger(
getNumThreads(
configReader,
"threadPoolConfiguration.maxGrpcLuceneserverThreads",
DEFAULT_MAX_GRPC_LUCENESERVER_THREADS);
maxGrpcLuceneserverBufferedItems =
Expand All @@ -97,7 +108,8 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
DEFAULT_MAX_GRPC_LUCENESERVER_BUFFERED_ITEMS);

maxGrpcReplicationserverThreads =
configReader.getInteger(
getNumThreads(
configReader,
"threadPoolConfiguration.maxGrpcReplicationserverThreads",
DEFAULT_MAX_GRPC_REPLICATIONSERVER_THREADS);
maxGrpcReplicationserverBufferedItems =
Expand All @@ -106,6 +118,59 @@ public ThreadPoolConfiguration(YamlConfigReader configReader) {
DEFAULT_MAX_GRPC_REPLICATIONSERVER_BUFFERED_ITEMS);
}

@JsonIgnoreProperties(ignoreUnknown = true)
static class ThreadsConfig {
private int min = 1;
private int max = Integer.MAX_VALUE;
private int offset = 0;
private float multiplier = 1.0f;

public void setMin(int min) {
if (min <= 0) {
throw new IllegalArgumentException("min must be >= 1");
}
this.min = min;
}

public void setMax(int max) {
if (max <= 0) {
throw new IllegalArgumentException("max must be >= 1");
}
this.max = max;
}

public void setOffset(int offset) {
this.offset = offset;
}

public void setMultiplier(float multiplier) {
this.multiplier = multiplier;
}

public int computeNumThreads() {
int threads = (int) ((Runtime.getRuntime().availableProcessors() * multiplier) + offset);
threads = Math.min(threads, max);
threads = Math.max(threads, min);
return threads;
}
}

static int getNumThreads(YamlConfigReader configReader, String key, int defaultValue) {
return configReader.get(
key,
obj -> {
if (obj instanceof Number) {
return ((Number) obj).intValue();
} else if (obj instanceof Map) {
return JsonUtils.convertValue(obj, ThreadsConfig.class).computeNumThreads();
} else {
throw new IllegalArgumentException(
"Invalid thread pool config type: " + obj.getClass() + ", key: " + key);
}
},
defaultValue);
}

public int getMaxSearchingThreads() {
return maxSearchingThreads;
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/yelp/nrtsearch/server/utils/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,16 @@ private JsonUtils() {}
public static String objectToJsonStr(Object obj) throws IOException {
return OBJECT_MAPPER.writeValueAsString(obj);
}

/**
* Convert the given java Object into a class instance.
*
* @param fromValue source object
* @param toValueType created class type
* @return instance of desired class
* @param <T> class type
*/
public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
return OBJECT_MAPPER.convertValue(fromValue, toValueType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,24 @@
*/
package com.yelp.nrtsearch.server.config;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.nio.file.Paths;
import org.junit.Test;

public class ThreadPoolConfigurationTest {

private YamlConfigReader getReaderForConfig(String config) {
return new YamlConfigReader(new ByteArrayInputStream(config.getBytes()));
}

private int cpus() {
return Runtime.getRuntime().availableProcessors();
}

@Test
public void testConfiguration() throws FileNotFoundException {
String config =
Expand All @@ -37,4 +46,122 @@ public void testConfiguration() throws FileNotFoundException {
assertEquals(
luceneServerConfiguration.getThreadPoolConfiguration().getMaxSearchBufferedItems(), 100);
}

@Test
public void testGetNumThreads_default() {
String config = "other_key: other_value";
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(100, numThreads);
}

@Test
public void testGetNumThreads_number() {
String config = "threadTest: 7";
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(7, numThreads);
}

@Test
public void testGetNumThreads_cpuDefault() {
String config = "threadTest: {}";
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(cpus(), numThreads);
}

@Test
public void testGetNumThreads_cpuMultiplier() {
String config = String.join("\n", "threadTest:", " multiplier: 2.5");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
int expectedNumThreads = (int) (cpus() * 2.5f);
assertEquals(expectedNumThreads, numThreads);
}

@Test
public void testGetNumThreads_cpuOffset() {
String config = String.join("\n", "threadTest:", " offset: 2");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
int expectedNumThreads = cpus() + 2;
assertEquals(expectedNumThreads, numThreads);
}

@Test
public void testGetNumThreads_cpuMultiplierAndOffset() {
String config = String.join("\n", "threadTest:", " offset: 2", " multiplier: 2.5");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
int expectedNumThreads = (int) (cpus() * 2.5f) + 2;
assertEquals(expectedNumThreads, numThreads);
}

@Test
public void testGetNumThreads_cpuMinDefault() {
String config = String.join("\n", "threadTest:", " multiplier: 0");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(1, numThreads);
}

@Test
public void testGetNumThreads_cpuMinSet() {
String config = String.join("\n", "threadTest:", " multiplier: 0", " min: 10");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(10, numThreads);
}

@Test
public void testGetNumThreads_cpuMinInvalid() {
String config = String.join("\n", "threadTest:", " multiplier: 0", " min: 0");
try {
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
fail();
} catch (Exception e) {
assertTrue(e.getMessage().contains("min must be >= 1"));
}
}

@Test
public void testGetNumThreads_cpuMaxDefault() {
String config = String.join("\n", "threadTest:", " multiplier: 0", " offset: 2147483647");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(2147483647, numThreads);
}

@Test
public void testGetNumThreads_cpuMaxSet() {
String config = String.join("\n", "threadTest:", " multiplier: 100", " max: 10");
int numThreads =
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
assertEquals(10, numThreads);
}

@Test
public void testGetNumThreads_cpuMaxInvalid() {
String config = String.join("\n", "threadTest:", " max: 0");
try {
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
fail();
} catch (Exception e) {
assertTrue(e.getMessage().contains("max must be >= 1"));
}
}

@Test
public void testGetNumThreads_cpuInvalidType() {
String config = "threadTest: []";
try {
ThreadPoolConfiguration.getNumThreads(getReaderForConfig(config), "threadTest", 100);
fail();
} catch (IllegalArgumentException e) {
assertEquals(
"Invalid thread pool config type: class java.util.ArrayList, key: threadTest",
e.getMessage());
}
}
}

0 comments on commit 223f620

Please sign in to comment.