Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added datadog metrics changes #842

Open
wants to merge 1 commit into
base: PD-257019-producer-async-fixes
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions queue/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,11 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ssm</artifactId>
</dependency>
<dependency>
<groupId>com.datadoghq</groupId>
<artifactId>java-dogstatsd-client</artifactId>
<version>2.3</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.timgroup.statsd.StatsDClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,6 +64,7 @@ abstract class AbstractQueueService implements BaseQueueService {
public static final int MAX_MESSAGE_SIZE_IN_BYTES = 30 * 1024;
private final StepFunctionService stepFunctionService;
private final ParameterStoreUtil parameterStoreUtil;
private final StatsDClient _statsDClient;

// Cache for the isExperiment value with a TTL of 5 minutes
private final Cache<String, Boolean> experimentCache = CacheBuilder.newBuilder()
Expand All @@ -80,14 +82,15 @@ abstract class AbstractQueueService implements BaseQueueService {
protected AbstractQueueService(BaseEventStore eventStore, JobService jobService,
JobHandlerRegistry jobHandlerRegistry,
JobType<MoveQueueRequest, MoveQueueResult> moveQueueJobType,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
_eventStore = eventStore;
_jobService = jobService;
_moveQueueJobType = moveQueueJobType;
this.adminService = adminService;
this.producerService = producerService;
this.stepFunctionService = stepFunctionService;
this.parameterStoreUtil = new ParameterStoreUtil();
_statsDClient = statsDClient;


registerMoveQueueJobHandler(jobHandlerRegistry);
Expand Down Expand Up @@ -232,6 +235,7 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
validateMessage(message);
events.add(message.toString());
}
_statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
}

Expand Down Expand Up @@ -268,6 +272,7 @@ public void sendAll(String queue, Collection<?> messages, boolean fromKafka) {
"Message size (" + messageByteBuffer.limit() + ") is greater than the maximum allowed (" + MAX_MESSAGE_SIZE_IN_BYTES + ") message size");
events.add(messageByteBuffer);
}
_statsDClient.recordGaugeValue("queue.messages.size", events.size(), "queue:" + queue);
builder.putAll(queue, events);
Multimap<String, ByteBuffer> eventsByChannel = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import com.timgroup.statsd.StatsDClient;

import java.time.Clock;

public class DefaultDedupQueueService extends AbstractQueueService implements DedupQueueService {
@Inject
public DefaultDedupQueueService(DedupEventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService );
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
super(eventStore, jobService, jobHandlerRegistry, MoveDedupQueueJob.INSTANCE, clock,adminService,producerService,stepFunctionService, statsDClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.google.inject.Inject;
import com.timgroup.statsd.StatsDClient;

import java.time.Clock;

public class DefaultQueueService extends AbstractQueueService implements QueueService {
@Inject
public DefaultQueueService(EventStore eventStore, JobService jobService, JobHandlerRegistry jobHandlerRegistry,
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService) {
super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService);
Clock clock, KafkaAdminService adminService, KafkaProducerService producerService, StepFunctionService stepFunctionService, StatsDClient statsDClient) {
super(eventStore, jobService, jobHandlerRegistry, MoveQueueJob.INSTANCE, clock,adminService, producerService,stepFunctionService, statsDClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.bazaarvoice.emodb.queue.core.kafka.KafkaAdminService;
import com.bazaarvoice.emodb.queue.core.kafka.KafkaProducerService;
import com.bazaarvoice.emodb.queue.core.stepfn.StepFunctionService;
import com.timgroup.statsd.StatsDClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -41,7 +42,7 @@ public void testSizeCache() {

BaseEventStore mockEventStore = mock(BaseEventStore.class);
AbstractQueueService queueService = new AbstractQueueService(mockEventStore, mock(JobService.class),
mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class)){};
mock(JobHandlerRegistry.class), mock(JobType.class), clock, mock(KafkaAdminService.class), mock(KafkaProducerService.class), mock(StepFunctionService.class), mock(StatsDClient.class)){};

// At limit=500, size estimate should be at 4800
// At limit=50, size estimate should be at 5000
Expand Down