diff --git a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java index dd9b8d0aa..7170ff58c 100644 --- a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java +++ b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java @@ -52,6 +52,7 @@ import org.apache.polaris.service.context.CallContextResolver; import org.apache.polaris.service.context.RealmContextResolver; import org.apache.polaris.service.ratelimiter.RateLimiter; +import org.apache.polaris.service.ratelimiter.TokenBucketFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.types.TokenType; import org.glassfish.hk2.api.Factory; @@ -90,6 +91,7 @@ public class PolarisApplicationConfig extends Configuration { private String awsSecretKey; private FileIOFactory fileIOFactory; private RateLimiter rateLimiter; + private TokenBucketFactory tokenBucketFactory; private TokenBrokerFactory tokenBrokerFactory; private AccessToken gcpAccessToken; @@ -144,6 +146,9 @@ protected void configure() { bindFactory(SupplierFactory.create(serviceLocator, config::getRateLimiter)) .to(RateLimiter.class) .ranked(OVERRIDE_BINDING_RANK); + bindFactory(SupplierFactory.create(serviceLocator, config::getTokenBucketFactory)) + .to(TokenBucketFactory.class) + .ranked(OVERRIDE_BINDING_RANK); } }; } @@ -332,6 +337,17 @@ public void setRateLimiter(@Nullable RateLimiter rateLimiter) { this.rateLimiter = rateLimiter; } + @JsonProperty("tokenBucketFactory") + private TokenBucketFactory getTokenBucketFactory() { + return tokenBucketFactory; + } + + @JsonProperty("tokenBucketFactory") + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") + public void setTokenBucketFactory(@Nullable TokenBucketFactory tokenBucketFactory) { + this.tokenBucketFactory = tokenBucketFactory; + } + public void setTaskHandler(TaskHandlerConfiguration taskHandler) { this.taskHandler = taskHandler; } diff --git a/dropwizard/service/src/main/resources/META-INF/hk2-locator/default b/dropwizard/service/src/main/resources/META-INF/hk2-locator/default index 5c9359851..1771a6ad8 100644 --- a/dropwizard/service/src/main/resources/META-INF/hk2-locator/default +++ b/dropwizard/service/src/main/resources/META-INF/hk2-locator/default @@ -81,13 +81,13 @@ contract={org.apache.polaris.service.ratelimiter.RateLimiter} name=no-op qualifier={io.smallrye.common.annotation.Identifier} -[org.apache.polaris.service.ratelimiter.TokenBucketRateLimiter]S -contract={org.apache.polaris.service.ratelimiter.RateLimiter} -name=token-bucket -qualifier={io.smallrye.common.annotation.Identifier} - [org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter]S contract={org.apache.polaris.service.ratelimiter.RateLimiter} name=realm-token-bucket qualifier={io.smallrye.common.annotation.Identifier} +[org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory]S +contract={org.apache.polaris.service.ratelimiter.TokenBucketFactory} +name=default +qualifier={io.smallrye.common.annotation.Identifier} + diff --git a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java similarity index 68% rename from dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java rename to dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java index 6bd4ad5c6..6516e7c79 100644 --- a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockRealmTokenBucketRateLimiter.java +++ b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/MockTokenBucketFactory.java @@ -21,26 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.smallrye.common.annotation.Identifier; -import java.time.Clock; import java.time.Instant; import java.time.ZoneOffset; -import org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter; +import org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory; import org.threeten.extra.MutableClock; -/** RealmTokenBucketRateLimiter with a mock clock */ -@Identifier("mock-realm-token-bucket") -public class MockRealmTokenBucketRateLimiter extends RealmTokenBucketRateLimiter { +/** TokenBucketFactory with a mock clock */ +@Identifier("mock") +public class MockTokenBucketFactory extends DefaultTokenBucketFactory { public static MutableClock CLOCK = MutableClock.of(Instant.now(), ZoneOffset.UTC); @JsonCreator - public MockRealmTokenBucketRateLimiter( - @JsonProperty("requestsPerSecond") final long requestsPerSecond, - @JsonProperty("windowSeconds") final long windowSeconds) { - super(requestsPerSecond, windowSeconds); - } - - @Override - protected Clock getClock() { - return CLOCK; + public MockTokenBucketFactory( + @JsonProperty("requestsPerSecond") long requestsPerSecond, + @JsonProperty("windowSeconds") long windowSeconds) { + super(requestsPerSecond, windowSeconds, CLOCK); } } diff --git a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java index 4689bdee7..a2dc8370a 100644 --- a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java +++ b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimiterFilterTest.java @@ -65,14 +65,15 @@ public class RateLimiterFilterTest { "server.applicationConnectors[0].port", "0"), // Bind to random port to support parallelism ConfigOverride.config("server.adminConnectors[0].port", "0"), - ConfigOverride.config("rateLimiter.type", "mock-realm-token-bucket"), + ConfigOverride.config("tokenBucketFactory.type", "mock"), ConfigOverride.config( - "rateLimiter.requestsPerSecond", String.valueOf(REQUESTS_PER_SECOND)), - ConfigOverride.config("rateLimiter.windowSeconds", String.valueOf(WINDOW_SECONDS))); + "tokenBucketFactory.requestsPerSecond", String.valueOf(REQUESTS_PER_SECOND)), + ConfigOverride.config( + "tokenBucketFactory.windowSeconds", String.valueOf(WINDOW_SECONDS))); private static String userToken; private static String realm; - private static MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK; + private static MutableClock clock = MockTokenBucketFactory.CLOCK; @BeforeAll public static void setup( diff --git a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java index 9f0aad5c4..efeee130c 100644 --- a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java +++ b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RealmTokenBucketRateLimiterTest.java @@ -18,41 +18,49 @@ */ package org.apache.polaris.service.dropwizard.ratelimiter; +import static org.apache.polaris.service.dropwizard.ratelimiter.MockTokenBucketFactory.CLOCK; + import java.time.Duration; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.service.ratelimiter.RateLimiter; +import org.apache.polaris.service.ratelimiter.DefaultTokenBucketFactory; +import org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.threeten.extra.MutableClock; /** Main unit test class for TokenBucketRateLimiter */ public class RealmTokenBucketRateLimiterTest { @Test void testDifferentBucketsDontTouch() { - RateLimiter rateLimiter = new MockRealmTokenBucketRateLimiter(10, 10); - RateLimitResultAsserter asserter = new RateLimitResultAsserter(rateLimiter); - MutableClock clock = MockRealmTokenBucketRateLimiter.CLOCK; + RealmTokenBucketRateLimiter rateLimiter = new RealmTokenBucketRateLimiter(); + rateLimiter.setTokenBucketFactory(new DefaultTokenBucketFactory(10, 10, CLOCK)); for (int i = 0; i < 202; i++) { String realm = (i % 2 == 0) ? "realm1" : "realm2"; CallContext.setCurrentContext(CallContext.of(() -> realm, null)); if (i < 200) { - asserter.canAcquire(1); + Assertions.assertTrue(rateLimiter.canProceed()); } else { - asserter.cantAcquire(); + assertCannotProceed(rateLimiter); } } - clock.add(Duration.ofSeconds(1)); + CLOCK.add(Duration.ofSeconds(1)); for (int i = 0; i < 22; i++) { String realm = (i % 2 == 0) ? "realm1" : "realm2"; CallContext.setCurrentContext(CallContext.of(() -> realm, null)); if (i < 20) { - asserter.canAcquire(1); + Assertions.assertTrue(rateLimiter.canProceed()); } else { - asserter.cantAcquire(); + assertCannotProceed(rateLimiter); } } } + + private void assertCannotProceed(RealmTokenBucketRateLimiter rateLimiter) { + for (int i = 0; i < 5; i++) { + Assertions.assertFalse(rateLimiter.canProceed()); + } + } } diff --git a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java index 454e9706c..472b79aca 100644 --- a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java +++ b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/TokenBucketRateLimiterTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.polaris.service.ratelimiter.TokenBucketRateLimiter; +import org.apache.polaris.service.ratelimiter.TokenBucket; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.threeten.extra.MutableClock; @@ -38,19 +38,18 @@ void testBasic() { MutableClock clock = MutableClock.of(Instant.now(), ZoneOffset.UTC); clock.add(Duration.ofSeconds(5)); - RateLimitResultAsserter asserter = - new RateLimitResultAsserter(new TokenBucketRateLimiter(10, 100, clock)); + TokenBucket tokenBucket = new TokenBucket(10, 100, clock); - asserter.canAcquire(100); - asserter.cantAcquire(); + assertCanAcquire(tokenBucket, 100); + assertCannotAcquire(tokenBucket); clock.add(Duration.ofSeconds(1)); - asserter.canAcquire(10); - asserter.cantAcquire(); + assertCanAcquire(tokenBucket, 10); + assertCannotAcquire(tokenBucket); clock.add(Duration.ofSeconds(10)); - asserter.canAcquire(100); - asserter.cantAcquire(); + assertCanAcquire(tokenBucket, 100); + assertCannotAcquire(tokenBucket); } /** @@ -63,9 +62,8 @@ void testConcurrent() throws InterruptedException { int numTasks = 50000; int tokensPerSecond = 10; // Can be anything above 0 - TokenBucketRateLimiter rl = - new TokenBucketRateLimiter( - tokensPerSecond, maxTokens, Clock.fixed(Instant.now(), ZoneOffset.UTC)); + TokenBucket rl = + new TokenBucket(tokensPerSecond, maxTokens, Clock.fixed(Instant.now(), ZoneOffset.UTC)); AtomicInteger numAcquired = new AtomicInteger(); CountDownLatch startLatch = new CountDownLatch(numTasks); CountDownLatch endLatch = new CountDownLatch(numTasks); @@ -95,4 +93,16 @@ void testConcurrent() throws InterruptedException { endLatch.await(); Assertions.assertEquals(maxTokens, numAcquired.get()); } + + private void assertCanAcquire(TokenBucket tokenBucket, int times) { + for (int i = 0; i < times; i++) { + Assertions.assertTrue(tokenBucket.tryAcquire()); + } + } + + private void assertCannotAcquire(TokenBucket tokenBucket) { + for (int i = 0; i < 5; i++) { + Assertions.assertFalse(tokenBucket.tryAcquire()); + } + } } diff --git a/dropwizard/service/src/test/resources/META-INF/hk2-locator/default b/dropwizard/service/src/test/resources/META-INF/hk2-locator/default index 07bd4da03..92b32e5af 100644 --- a/dropwizard/service/src/test/resources/META-INF/hk2-locator/default +++ b/dropwizard/service/src/test/resources/META-INF/hk2-locator/default @@ -21,7 +21,7 @@ contract={org.apache.polaris.service.catalog.io.FileIOFactory} name=test qualifier={io.smallrye.common.annotation.Identifier} -[org.apache.polaris.service.dropwizard.ratelimiter.MockRealmTokenBucketRateLimiter]S -contract={org.apache.polaris.service.ratelimiter.RateLimiter} -name=mock-realm-token-bucket +[org.apache.polaris.service.dropwizard.ratelimiter.MockTokenBucketFactory]S +contract={org.apache.polaris.service.ratelimiter.TokenBucketFactory} +name=mock qualifier={io.smallrye.common.annotation.Identifier} diff --git a/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml b/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml index 76db1ec1c..9d8f770e5 100644 --- a/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml +++ b/dropwizard/service/src/test/resources/polaris-server-integrationtest.yml @@ -153,8 +153,12 @@ logging: # Limits the size of request bodies sent to Polaris. -1 means no limit. maxRequestBodyBytes: 1000000 -# Limits the request rate per realm +# Limits the request rate per realm. rateLimiter: type: realm-token-bucket + +# The token bucket factory to use when using the realm-token-bucket rate limiter. +tokenBucketFactory: + type: default requestsPerSecond: 9999 windowSeconds: 10 diff --git a/polaris-server.yml b/polaris-server.yml index e88307bf6..0bc9f785f 100644 --- a/polaris-server.yml +++ b/polaris-server.yml @@ -172,3 +172,11 @@ maxRequestBodyBytes: -1 # Optional, not specifying a "rateLimiter" section also means no rate limiter rateLimiter: type: no-op + # Uncomment to use the realm-token-bucket rate limiter + # type: realm-token-bucket + +# The token bucket factory to use when using the realm-token-bucket rate limiter. +tokenBucketFactory: + type: default + requestsPerSecond: 9999 + windowSeconds: 10 diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java new file mode 100644 index 000000000..8393e8f00 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/DefaultTokenBucketFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.ratelimiter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.smallrye.common.annotation.Identifier; +import java.time.Clock; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.polaris.core.context.RealmContext; + +@Identifier("default") +public class DefaultTokenBucketFactory implements TokenBucketFactory { + + private final long requestsPerSecond; + private final long windowSeconds; + private final Clock clock; + private final Map perRealmBuckets = new ConcurrentHashMap<>(); + + @JsonCreator + public DefaultTokenBucketFactory( + @JsonProperty("requestsPerSecond") long requestsPerSecond, + @JsonProperty("windowSeconds") long windowSeconds) { + this(requestsPerSecond, windowSeconds, Clock.systemUTC()); + } + + public DefaultTokenBucketFactory(long requestsPerSecond, long windowSeconds, Clock clock) { + this.requestsPerSecond = requestsPerSecond; + this.windowSeconds = windowSeconds; + this.clock = clock; + } + + @Override + public TokenBucket getOrCreateTokenBucket(RealmContext realmContext) { + String realmId = realmContext.getRealmIdentifier(); + return perRealmBuckets.computeIfAbsent( + realmId, + k -> + new TokenBucket( + requestsPerSecond, Math.multiplyExact(requestsPerSecond, windowSeconds), clock)); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java index 7323c46b3..b20c01693 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/NoOpRateLimiter.java @@ -24,7 +24,7 @@ @Identifier("no-op") public class NoOpRateLimiter implements RateLimiter { @Override - public boolean tryAcquire() { + public boolean canProceed() { return true; } } diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java index be2017d32..da84fcf01 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiter.java @@ -26,5 +26,5 @@ public interface RateLimiter { * * @return Whether the request is allowed to proceed by the rate limiter */ - boolean tryAcquire(); + boolean canProceed(); } diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index c5ad957bf..d72def5fa 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -42,7 +42,7 @@ public RateLimiterFilter(RateLimiter rateLimiter) { /** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */ @Override public void filter(ContainerRequestContext ctx) throws IOException { - if (!rateLimiter.tryAcquire()) { + if (!rateLimiter.canProceed()) { ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); LOGGER.atDebug().log("Rate limiting request"); } diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java index 85ea54e22..0d1deb88f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RealmTokenBucketRateLimiter.java @@ -18,36 +18,19 @@ */ package org.apache.polaris.service.ratelimiter; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import io.smallrye.common.annotation.Identifier; -import java.time.Clock; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import jakarta.inject.Inject; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; /** - * Rate limiter that maps the request's realm identifier to its own TokenBucketRateLimiter, with its - * own capacity. + * Rate limiter that maps the request's realm identifier to its own TokenBucket, with its own + * capacity. */ @Identifier("realm-token-bucket") public class RealmTokenBucketRateLimiter implements RateLimiter { - private final long requestsPerSecond; - private final long windowSeconds; - private final Map perRealmLimiters; - @VisibleForTesting - @JsonCreator - public RealmTokenBucketRateLimiter( - @JsonProperty("requestsPerSecond") final long requestsPerSecond, - @JsonProperty("windowSeconds") final long windowSeconds) { - this.requestsPerSecond = requestsPerSecond; - this.windowSeconds = windowSeconds; - this.perRealmLimiters = new ConcurrentHashMap<>(); - } + @Inject protected TokenBucketFactory tokenBucketFactory; /** * This signifies that a request is being made. That is, the rate limiter should count the request @@ -56,26 +39,14 @@ public RealmTokenBucketRateLimiter( * @return Whether the request is allowed to proceed by the rate limiter */ @Override - public boolean tryAcquire() { - String key = - Optional.ofNullable(CallContext.getCurrentContext()) - .map(CallContext::getRealmContext) - .map(RealmContext::getRealmIdentifier) - .orElse(""); - - return perRealmLimiters - .computeIfAbsent( - key, - (k) -> - new TokenBucketRateLimiter( - requestsPerSecond, - Math.multiplyExact(requestsPerSecond, windowSeconds), - getClock())) + public boolean canProceed() { + return tokenBucketFactory + .getOrCreateTokenBucket(CallContext.getCurrentContext().getRealmContext()) .tryAcquire(); } @VisibleForTesting - protected Clock getClock() { - return Clock.systemUTC(); + public void setTokenBucketFactory(TokenBucketFactory tokenBucketFactory) { + this.tokenBucketFactory = tokenBucketFactory; } } diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java similarity index 82% rename from service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java rename to service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java index 2b3adb618..d610a6a01 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketRateLimiter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucket.java @@ -18,15 +18,13 @@ */ package org.apache.polaris.service.ratelimiter; -import io.smallrye.common.annotation.Identifier; import java.time.InstantSource; /** - * Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a fixed rate and has a - * maximum amount of tokens. Each successful "tryAcquire" costs 1 token. + * General-purpose Token bucket implementation. Acquires tokens at a fixed rate and has a maximum + * amount of tokens. Each successful "tryAcquire" costs 1 token. */ -@Identifier("token-bucket") -public class TokenBucketRateLimiter implements RateLimiter { +public class TokenBucket { private final double tokensPerMilli; private final long maxTokens; private final InstantSource instantSource; @@ -34,7 +32,7 @@ public class TokenBucketRateLimiter implements RateLimiter { private double tokens; private long lastTokenGenerationMillis; - public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens, InstantSource instantSource) { + public TokenBucket(long tokensPerSecond, long maxTokens, InstantSource instantSource) { this.tokensPerMilli = tokensPerSecond / 1000D; this.maxTokens = maxTokens; this.instantSource = instantSource; @@ -48,7 +46,6 @@ public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens, InstantSourc * * @return whether a token was successfully acquired and spent */ - @Override public synchronized boolean tryAcquire() { // Grant tokens for the time that has passed since our last tryAcquire() long t = instantSource.millis(); diff --git a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java similarity index 53% rename from dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java rename to service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java index e15ecbec9..4ea382ba2 100644 --- a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/ratelimiter/RateLimitResultAsserter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/TokenBucketFactory.java @@ -16,28 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.dropwizard.ratelimiter; +package org.apache.polaris.service.ratelimiter; -import org.apache.polaris.service.ratelimiter.RateLimiter; -import org.junit.jupiter.api.Assertions; +import org.apache.polaris.core.context.RealmContext; -/** Utility class for testing rate limiters. Lets you easily assert the result of tryAcquire(). */ -public class RateLimitResultAsserter { - private final RateLimiter rateLimiter; +/** Factory for creating token buckets per realm. */ +public interface TokenBucketFactory { - public RateLimitResultAsserter(RateLimiter rateLimiter) { - this.rateLimiter = rateLimiter; - } - - public void canAcquire(int times) { - for (int i = 0; i < times; i++) { - Assertions.assertTrue(rateLimiter.tryAcquire()); - } - } - - public void cantAcquire() { - for (int i = 0; i < 5; i++) { - Assertions.assertFalse(rateLimiter.tryAcquire()); - } - } + TokenBucket getOrCreateTokenBucket(RealmContext realmContext); }