From 220801b652633265d93693afb8d85688540e0e4d Mon Sep 17 00:00:00 2001 From: Ajinkya Date: Fri, 10 Jan 2025 14:53:24 +0530 Subject: [PATCH] added threat detection backend service --- .github/workflows/prod.yml | 34 ++- .github/workflows/staging.yml | 19 +- apps/threat-detection-backend/.gitignore | 1 + apps/threat-detection-backend/Dockerfile | 4 + apps/threat-detection-backend/pom.xml | 210 ++++++++++++++++ .../akto/threat/backend/BackendVerticle.java | 81 ++++++ .../java/com/akto/threat/backend/Main.java | 88 +++++++ .../threat/backend/client/IPLookupClient.java | 26 ++ .../threat/backend/constants/KafkaTopic.java | 7 + .../backend/constants/MongoDBCollection.java | 9 + .../AggregateSampleMaliciousEventModel.java | 138 +++++++++++ .../backend/db/MaliciousEventModel.java | 231 ++++++++++++++++++ .../AuthenticationInterceptor.java | 69 ++++++ .../backend/interceptors/Constants.java | 16 ++ .../akto/threat/backend/router/ARouter.java | 9 + .../backend/router/DashboardRouter.java | 138 +++++++++++ .../backend/router/ThreatDetectionRouter.java | 43 ++++ .../service/MaliciousEventService.java | 200 +++++++++++++++ .../backend/service/ThreatActorService.java | 140 +++++++++++ .../backend/service/ThreatApiService.java | 152 ++++++++++++ .../backend/tasks/FlushMessagesToDB.java | 134 ++++++++++ .../akto/threat/backend/utils/KafkaUtils.java | 19 ++ libs/utils/pom.xml | 2 +- .../src/main/java/com/akto/kafka/Kafka.java | 127 ++++++---- .../main/java/com/akto/kafka/KafkaConfig.java | 94 +++++++ .../com/akto/kafka/KafkaConsumerConfig.java | 44 ++++ .../com/akto/kafka/KafkaProducerConfig.java | 44 ++++ .../main/java/com/akto/kafka/Serializer.java | 26 ++ .../dashboard_service/v1/service.proto | 33 ++- 29 files changed, 2067 insertions(+), 71 deletions(-) create mode 100644 apps/threat-detection-backend/.gitignore create mode 100644 apps/threat-detection-backend/Dockerfile create mode 100644 apps/threat-detection-backend/pom.xml create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/BackendVerticle.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/client/IPLookupClient.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/KafkaTopic.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/MongoDBCollection.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/AggregateSampleMaliciousEventModel.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/MaliciousEventModel.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/AuthenticationInterceptor.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/Constants.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ARouter.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/DashboardRouter.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ThreatDetectionRouter.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatApiService.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java create mode 100644 apps/threat-detection-backend/src/main/java/com/akto/threat/backend/utils/KafkaUtils.java create mode 100644 libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java create mode 100644 libs/utils/src/main/java/com/akto/kafka/KafkaConsumerConfig.java create mode 100644 libs/utils/src/main/java/com/akto/kafka/KafkaProducerConfig.java create mode 100644 libs/utils/src/main/java/com/akto/kafka/Serializer.java diff --git a/.github/workflows/prod.yml b/.github/workflows/prod.yml index c803f18f68..9923e5b91e 100644 --- a/.github/workflows/prod.yml +++ b/.github/workflows/prod.yml @@ -23,10 +23,15 @@ on: type: boolean default: true description: Internal - protection: + threat_detection: type: boolean default: true - description: Protection + description: Threat Detection Client + + threat_detection_backend: + type: boolean + default: true + description: Threat Deteection Backend # A workflow run is made up of one or more jobs that can run sequentially or in parallel jobs: @@ -37,12 +42,12 @@ jobs: # Steps represent a sequence of tasks that will be executed as part of the job steps: - # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it - uses: actions/checkout@v2 - uses: actions/setup-java@v2 with: - distribution: 'adopt' - java-version: '8' + distribution: "adopt" + java-version: "8" architecture: x64 - uses: actions/setup-node@v2 with: @@ -63,6 +68,14 @@ jobs: wget -O filetypes.json https://raw.githubusercontent.com/akto-api-security/akto/master/pii-types/filetypes.json wget -O automated_api_groups.csv https://raw.githubusercontent.com/akto-api-security/akto/master/automated-api-groups/automated-api-groups.csv + + - name: Create maxmind directory + run: mkdir -p ./apps/threat-detection-backend/src/main/resources/maxmind + - name: Download Maxmind Country database + working-directory: ./apps/threat-detection-backend/src/main/resources/maxmind + run: | + wget -O Geo-Country.mmdb https://raw.githubusercontent.com/akto-api-security/tests-library/refs/heads/master/resources/Geo-Country.mmdb + - name: Prepare Dashboard polaris UI working-directory: ./apps/dashboard/web/polaris_web run: npm install && export RELEASE_VERSION=${{github.event.inputs.release_version}} && npm run build @@ -125,6 +138,15 @@ jobs: echo "::set-output name=image::$ECR_REGISTRY/akto-internal:$IMAGE_TAG" fi + if [[ "${{ github.event.inputs.threat_detection}}" == "true" ]]; then + cd apps/threat-detection + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection:$IMAGE_TAG . --push + fi + + if [[ "${{ github.event.inputs.threat_detection_backend}}" == "true" ]]; then + cd ../threat-detection-backend + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection-backend:$IMAGE_TAG . --push + fi - name: Configure AWS Credentials for ECR uses: aws-actions/configure-aws-credentials@v1 with: @@ -135,7 +157,7 @@ jobs: id: login-ecr uses: aws-actions/amazon-ecr-login@v1 with: - mask-password: 'true' + mask-password: "true" registry-type: public - name: Push git tag diff --git a/.github/workflows/staging.yml b/.github/workflows/staging.yml index 9ec00fc6fd..50f383996a 100644 --- a/.github/workflows/staging.yml +++ b/.github/workflows/staging.yml @@ -43,9 +43,13 @@ jobs: wget -O general.json https://raw.githubusercontent.com/akto-api-security/pii-types/master/general.json wget -O fintech.json https://raw.githubusercontent.com/akto-api-security/akto/master/pii-types/fintech.json wget -O filetypes.json https://raw.githubusercontent.com/akto-api-security/akto/master/pii-types/filetypes.json - - name: Prepare Dashboard polaris UI - working-directory: ./apps/dashboard/web/polaris_web - run: npm install && export RELEASE_VERSION=${{steps.docker_tag.outputs.IMAGE_TAG}} && npm run build + - name: Create maxmind directory + run: mkdir -p ./apps/threat-detection-backend/src/main/resources/maxmind + - name: Download Maxmind Country database + working-directory: ./apps/threat-detection-backend/src/main/resources/maxmind + run: | + wget -O Geo-Country.mmdb https://raw.githubusercontent.com/akto-api-security/tests-library/refs/heads/master/resources/Geo-Country.mmdb + - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v1 with: @@ -53,10 +57,7 @@ jobs: aws-secret-access-key: ${{secrets.AWS_SECRET_ACCESS_KEY}} aws-region: ap-south-1 - - name: Deploy polaris site to S3 bucket - run: aws s3 sync ./apps/dashboard/web/polaris_web/web/dist s3://dashboard-on-cdn/polaris_web/${{steps.docker_tag.outputs.IMAGE_TAG}}/dist --delete - - - run: mvn package -Dakto-image-tag=${{ github.event.inputs.Tag }} -Dakto-build-time=$(eval "date +%s") -Dakto-release-version=${{steps.docker_tag.outputs.IMAGE_TAG}} + - run: mvn package -Dakto-image-tag=${{ github.event.inputs.Tag }} -Dakto-build-time=$(eval "date +%s") -Dakto-release-version=${{steps.docker_tag.outputs.IMAGE_TAG}} -DskipTests - name: DockerHub login env: DOCKER_USERNAME: ${{secrets.DOCKER_USERNAME}} @@ -88,6 +89,10 @@ jobs: docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-internal:$IMAGE_TAG . --push cd ../source-code-analyser docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/source-code-analyser:$IMAGE_TAG . --push + cd ../threat-detection + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection:$IMAGE_TAG . --push + cd ../threat-detection-backend + docker buildx build --platform linux/arm64/v8,linux/amd64 -t $ECR_REGISTRY/akto-threat-detection-backend:$IMAGE_TAG . --push - name: Set up JDK 11 uses: actions/setup-java@v1 diff --git a/apps/threat-detection-backend/.gitignore b/apps/threat-detection-backend/.gitignore new file mode 100644 index 0000000000..1ffb9e7e9d --- /dev/null +++ b/apps/threat-detection-backend/.gitignore @@ -0,0 +1 @@ +*.mmdb \ No newline at end of file diff --git a/apps/threat-detection-backend/Dockerfile b/apps/threat-detection-backend/Dockerfile new file mode 100644 index 0000000000..e2ec8f2cbf --- /dev/null +++ b/apps/threat-detection-backend/Dockerfile @@ -0,0 +1,4 @@ +FROM openjdk +WORKDIR /app +COPY ./target/threat-detection-backend-1.0-SNAPSHOT-jar-with-dependencies.jar /app/threat-detection-backend-1.0-SNAPSHOT-jar-with-dependencies.jar +CMD "java" "-XX:+ExitOnOutOfMemoryError" "-jar" "/app/threat-detection-backend-1.0-SNAPSHOT-jar-with-dependencies.jar" \ No newline at end of file diff --git a/apps/threat-detection-backend/pom.xml b/apps/threat-detection-backend/pom.xml new file mode 100644 index 0000000000..b7e4129a9c --- /dev/null +++ b/apps/threat-detection-backend/pom.xml @@ -0,0 +1,210 @@ + + + 4.0.0 + + + 9.4.27.v20200227 + 4.5.11 + + + + com.akto.apps + apps + ${revision} + + + com.akto.apps.threat-detection-backend + threat-detection-backend + jar + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + com.akto.libs.dao + dao + ${project.version} + + + com.akto.libs.utils + utils + ${project.version} + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.jetbrains + annotations + RELEASE + compile + + + org.junit.jupiter + junit-jupiter-api + 5.4.2 + test + + + com.akto.libs.utils + utils + test-jar + ${project.version} + test + + + com.akto.libs.protobuf + protobuf + 1.0-SNAPSHOT + compile + + + + io.vertx + vertx-web + ${vertex.version} + + + + io.vertx + vertx-core + ${vertex.version} + + + + + com.maxmind.geoip2 + geoip2 + 2.15.0 + + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + com.akto.threat.backend.Main + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 8 + 8 + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.0.1 + + + copy-dependencies + package + + copy-dependencies + + + + + + + src/main/java + src/test/java + + + src/main/resources + + + + + + + normal + + true + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + package + + single + + + + + + com.akto.threat.backend.Main + + + + + jar-with-dependencies + + + + + + + + + + + devcontainer + + + + org.apache.maven.plugins + maven-jar-plugin + + api-threat-detection-1.0-SNAPSHOT-jar-with-dependencies + + + true + com.akto.threat.backend.Main + dependency-jars/ + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/dependency-jars/ + + + + + + + + + + \ No newline at end of file diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/BackendVerticle.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/BackendVerticle.java new file mode 100644 index 0000000000..5b4355b7f0 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/BackendVerticle.java @@ -0,0 +1,81 @@ +package com.akto.threat.backend; + +import com.akto.threat.backend.interceptors.AuthenticationInterceptor; +import com.akto.threat.backend.router.DashboardRouter; +import com.akto.threat.backend.router.ThreatDetectionRouter; +import com.akto.threat.backend.service.MaliciousEventService; +import com.akto.threat.backend.service.ThreatActorService; +import com.akto.threat.backend.service.ThreatApiService; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Vertx; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.handler.BodyHandler; + +public class BackendVerticle extends AbstractVerticle { + + private final MaliciousEventService maliciousEventService; + private final ThreatActorService threatActorService; + private final ThreatApiService threatApiService; + + public BackendVerticle( + MaliciousEventService maliciousEventService, + ThreatActorService threatActorService, + ThreatApiService threatApiService) { + this.maliciousEventService = maliciousEventService; + this.threatActorService = threatActorService; + this.threatApiService = threatApiService; + } + + @Override + public void start() { + Vertx vertx = Vertx.vertx(); + + // Create the router + Router router = Router.router(vertx); + + Router api = Router.router(vertx); + + api.route().handler(BodyHandler.create()); + api.route().handler(new AuthenticationInterceptor()); + + Router dashboardRouter = + new DashboardRouter(maliciousEventService, threatActorService, threatApiService) + .setup(vertx); + Router threatDetectionRouter = new ThreatDetectionRouter(maliciousEventService).setup(vertx); + + api.route("/dashboard/*").subRouter(dashboardRouter); + api.route("/threat_detection/*").subRouter(threatDetectionRouter); + + router.route("/api/*").subRouter(api); + + // Start the HTTP server + + router.route("/health").handler(ctx -> ctx.response().setStatusCode(200).end("OK")); + + // 404 handler + router + .route() + .handler( + rc -> { + rc.response().setStatusCode(404).end("404 - Not Found: " + rc.request().uri()); + }); + + int port = + Integer.parseInt( + System.getenv().getOrDefault("THREAT_DETECTION_BACKEND_SERVER_PORT", "9090")); + + vertx + .createHttpServer() + .requestHandler(router) + .listen(port) + .onSuccess( + server -> { + System.out.println("HTTP server started on port " + port); + }) + .onFailure( + err -> { + System.err.println("Failed to start HTTP server: " + err.getMessage()); + System.exit(1); + }); + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java new file mode 100644 index 0000000000..44d546378a --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/Main.java @@ -0,0 +1,88 @@ +package com.akto.threat.backend; + +import static org.bson.codecs.configuration.CodecRegistries.fromProviders; +import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; + +import com.akto.DaoInit; +import com.akto.kafka.KafkaConfig; +import com.akto.kafka.KafkaConsumerConfig; +import com.akto.kafka.KafkaProducerConfig; +import com.akto.kafka.Serializer; +import com.akto.threat.backend.client.IPLookupClient; +import com.akto.threat.backend.service.MaliciousEventService; +import com.akto.threat.backend.service.ThreatActorService; +import com.akto.threat.backend.service.ThreatApiService; +import com.akto.threat.backend.tasks.FlushMessagesToDB; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import org.apache.commons.io.IOUtils; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.codecs.pojo.PojoCodecProvider; + +public class Main { + public static void main(String[] args) throws Exception { + + DaoInit.init(new ConnectionString(System.getenv("AKTO_MONGO_CONN"))); + + ConnectionString connectionString = + new ConnectionString(System.getenv("AKTO_THREAT_PROTECTION_MONGO_CONN")); + CodecRegistry pojoCodecRegistry = + fromProviders(PojoCodecProvider.builder().automatic(true).build()); + CodecRegistry codecRegistry = + fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), pojoCodecRegistry); + MongoClientSettings clientSettings = + MongoClientSettings.builder() + .readPreference(ReadPreference.secondary()) + .writeConcern(WriteConcern.ACKNOWLEDGED) + .applyConnectionString(connectionString) + .codecRegistry(codecRegistry) + .build(); + + MongoClient threatProtectionMongo = MongoClients.create(clientSettings); + KafkaConfig internalKafkaConfig = + KafkaConfig.newBuilder() + .setBootstrapServers(System.getenv("THREAT_EVENTS_KAFKA_BROKER_URL")) + .setGroupId("akto.threat_protection.flush_db") + .setConsumerConfig( + KafkaConsumerConfig.newBuilder() + .setMaxPollRecords(100) + .setPollDurationMilli(100) + .build()) + .setProducerConfig( + KafkaProducerConfig.newBuilder().setBatchSize(100).setLingerMs(1000).build()) + .setKeySerializer(Serializer.STRING) + .setValueSerializer(Serializer.STRING) + .build(); + + IPLookupClient ipLookupClient = new IPLookupClient(getMaxmindFile()); + + new FlushMessagesToDB(internalKafkaConfig, threatProtectionMongo).run(); + + MaliciousEventService maliciousEventService = + new MaliciousEventService(internalKafkaConfig, threatProtectionMongo, ipLookupClient); + + ThreatActorService threatActorService = new ThreatActorService(threatProtectionMongo); + ThreatApiService threatApiService = new ThreatApiService(threatProtectionMongo); + + new BackendVerticle(maliciousEventService, threatActorService, threatApiService).start(); + } + + private static File getMaxmindFile() throws IOException { + File maxmindTmpFile = File.createTempFile("tmp-geo-country", ".mmdb"); + maxmindTmpFile.deleteOnExit(); + + try (FileOutputStream fos = new FileOutputStream(maxmindTmpFile)) { + IOUtils.copy( + Main.class.getClassLoader().getResourceAsStream("maxmind/Geo-Country.mmdb"), fos); + } + + return maxmindTmpFile; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/client/IPLookupClient.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/client/IPLookupClient.java new file mode 100644 index 0000000000..10b64166ab --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/client/IPLookupClient.java @@ -0,0 +1,26 @@ +package com.akto.threat.backend.client; + +import com.maxmind.geoip2.DatabaseReader; +import com.maxmind.geoip2.model.CountryResponse; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Optional; + +public class IPLookupClient { + private final DatabaseReader db; + + public IPLookupClient(File dbFile) throws IOException { + this.db = new DatabaseReader.Builder(dbFile).build(); + } + + public Optional getCountryISOCodeGivenIp(String ip) { + try { + InetAddress ipAddr = InetAddress.getByName(ip); + CountryResponse resp = db.country(ipAddr); + return Optional.of(resp.getCountry().getIsoCode()); + } catch (Exception e) { + return Optional.empty(); + } + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/KafkaTopic.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/KafkaTopic.java new file mode 100644 index 0000000000..3f056fc39a --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/KafkaTopic.java @@ -0,0 +1,7 @@ +package com.akto.threat.backend.constants; + +public class KafkaTopic { + public static class ThreatDetection { + public static final String INTERNAL_DB_MESSAGES = "akto.threat_detection.internal_db_messages"; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/MongoDBCollection.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/MongoDBCollection.java new file mode 100644 index 0000000000..879af94557 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/constants/MongoDBCollection.java @@ -0,0 +1,9 @@ +package com.akto.threat.backend.constants; + +public class MongoDBCollection { + public static class ThreatDetection { + public static final String MALICIOUS_EVENTS = "malicious_events"; + public static final String AGGREGATE_SAMPLE_MALICIOUS_REQUESTS = + "aggregate_sample_malicious_requests"; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/AggregateSampleMaliciousEventModel.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/AggregateSampleMaliciousEventModel.java new file mode 100644 index 0000000000..cc5f876d8b --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/AggregateSampleMaliciousEventModel.java @@ -0,0 +1,138 @@ +package com.akto.threat.backend.db; + +import com.akto.dto.type.URLMethods.Method; +import java.util.UUID; + +public class AggregateSampleMaliciousEventModel { + + private String id; + private String filterId; + private String actor; + private String ip; + private String url; + private String country; + private Method method; + private String orig; + private int apiCollectionId; + private long requestTime; + + public AggregateSampleMaliciousEventModel() {} + + private AggregateSampleMaliciousEventModel(Builder builder) { + this.id = UUID.randomUUID().toString(); + this.filterId = builder.filterId; + this.actor = builder.actor; + this.ip = builder.ip; + this.country = builder.country; + this.method = builder.method; + this.orig = builder.orig; + this.requestTime = builder.requestTime; + this.url = builder.url; + this.apiCollectionId = builder.apiCollectionId; + } + + public static class Builder { + public int apiCollectionId; + private String filterId; + private String actor; + private String ip; + private String country; + private String url; + private Method method; + private String orig; + private long requestTime; + + public Builder setFilterId(String filterId) { + this.filterId = filterId; + return this; + } + + public Builder setActor(String actor) { + this.actor = actor; + return this; + } + + public Builder setIp(String ip) { + this.ip = ip; + return this; + } + + public Builder setCountry(String country) { + this.country = country; + return this; + } + + public Builder setUrl(String url) { + this.url = url; + return this; + } + + public Builder setMethod(Method method) { + this.method = method; + return this; + } + + public Builder setOrig(String orig) { + this.orig = orig; + return this; + } + + public Builder setRequestTime(long requestTime) { + this.requestTime = requestTime; + return this; + } + + public Builder setApiCollectionId(int apiCollectionId) { + this.apiCollectionId = apiCollectionId; + return this; + } + + public AggregateSampleMaliciousEventModel build() { + return new AggregateSampleMaliciousEventModel(this); + } + } + + public String getId() { + return id; + } + + public String getFilterId() { + return filterId; + } + + public String getActor() { + return actor; + } + + public String getIp() { + return ip; + } + + public String getUrl() { + return url; + } + + public String getCountry() { + return country; + } + + public Method getMethod() { + return method; + } + + public String getOrig() { + return orig; + } + + public long getRequestTime() { + return requestTime; + } + + public int getApiCollectionId() { + return apiCollectionId; + } + + public static Builder newBuilder() { + return new Builder(); + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/MaliciousEventModel.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/MaliciousEventModel.java new file mode 100644 index 0000000000..83aeb1c148 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/db/MaliciousEventModel.java @@ -0,0 +1,231 @@ +package com.akto.threat.backend.db; + +import com.akto.dto.type.URLMethods; +import java.util.UUID; + +public class MaliciousEventModel { + + private String id; + private String filterId; + private String actor; + private String latestApiIp; + private String latestApiEndpoint; + private String country; + private URLMethods.Method latestApiMethod; + private String latestApiOrig; + private long detectedAt; + private int latestApiCollectionId; + private EventType eventType; + private String category; + private String subCategory; + + public enum EventType { + SINGLE, + AGGREGATED + } + + public MaliciousEventModel() {} + + private MaliciousEventModel(Builder builder) { + this.id = UUID.randomUUID().toString(); + this.filterId = builder.filterId; + this.actor = builder.actor; + this.latestApiIp = builder.latestApiIp; + this.country = builder.country; + this.latestApiEndpoint = builder.latestApiEndpoint; + this.latestApiMethod = builder.latestApiMethod; + this.latestApiOrig = builder.latestApiOrig; + this.latestApiCollectionId = builder.latestApiCollectionId; + this.detectedAt = builder.detectedAt; + this.eventType = builder.eventType; + this.category = builder.category; + this.subCategory = builder.subCategory; + } + + public static class Builder { + public EventType eventType; + private String filterId; + private String actor; + private String latestApiIp; + private String country; + private String latestApiEndpoint; + private URLMethods.Method latestApiMethod; + private String latestApiOrig; + private int latestApiCollectionId; + private long detectedAt; + private String category; + private String subCategory; + + public Builder setFilterId(String filterId) { + this.filterId = filterId; + return this; + } + + public Builder setActor(String actor) { + this.actor = actor; + return this; + } + + public Builder setLatestApiIp(String ip) { + this.latestApiIp = ip; + return this; + } + + public Builder setCountry(String country) { + this.country = country; + return this; + } + + public Builder setLatestApiEndpoint(String latestApiEndpoint) { + this.latestApiEndpoint = latestApiEndpoint; + return this; + } + + public Builder setLatestApiMethod(URLMethods.Method latestApiMethod) { + this.latestApiMethod = latestApiMethod; + return this; + } + + public Builder setLatestApiOrig(String latestApiOrig) { + this.latestApiOrig = latestApiOrig; + return this; + } + + public Builder setDetectedAt(long detectedAt) { + this.detectedAt = detectedAt; + return this; + } + + public Builder setLatestApiCollectionId(int latestApiCollectionId) { + this.latestApiCollectionId = latestApiCollectionId; + return this; + } + + public Builder setEventType(EventType eventType) { + this.eventType = eventType; + return this; + } + + public Builder setCategory(String category) { + this.category = category; + return this; + } + + public Builder setSubCategory(String subCategory) { + this.subCategory = subCategory; + return this; + } + + public MaliciousEventModel build() { + return new MaliciousEventModel(this); + } + } + + public String getId() { + return id; + } + + public String getFilterId() { + return filterId; + } + + public String getActor() { + return actor; + } + + public String getLatestApiIp() { + return latestApiIp; + } + + public String getLatestApiEndpoint() { + return latestApiEndpoint; + } + + public String getCountry() { + return country; + } + + public URLMethods.Method getLatestApiMethod() { + return latestApiMethod; + } + + public String getLatestApiOrig() { + return latestApiOrig; + } + + public long getDetectedAt() { + return detectedAt; + } + + public int getLatestApiCollectionId() { + return latestApiCollectionId; + } + + public EventType getEventType() { + return eventType; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public void setId(String id) { + this.id = id; + } + + public void setFilterId(String filterId) { + this.filterId = filterId; + } + + public void setActor(String actor) { + this.actor = actor; + } + + public void setLatestApiIp(String ip) { + this.latestApiIp = ip; + } + + public void setLatestApiEndpoint(String latestApiEndpoint) { + this.latestApiEndpoint = latestApiEndpoint; + } + + public void setCountry(String country) { + this.country = country; + } + + public void setLatestApiMethod(URLMethods.Method latestApiMethod) { + this.latestApiMethod = latestApiMethod; + } + + public void setLatestApiOrig(String latestApiOrig) { + this.latestApiOrig = latestApiOrig; + } + + public void setDetectedAt(long detectedAt) { + this.detectedAt = detectedAt; + } + + public void setLatestApiCollectionId(int latestApiCollectionId) { + this.latestApiCollectionId = latestApiCollectionId; + } + + public void setEventType(EventType eventType) { + this.eventType = eventType; + } + + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + public String getSubCategory() { + return subCategory; + } + + public void setSubCategory(String subCategory) { + this.subCategory = subCategory; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/AuthenticationInterceptor.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/AuthenticationInterceptor.java new file mode 100644 index 0000000000..f17f7bdd97 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/AuthenticationInterceptor.java @@ -0,0 +1,69 @@ +package com.akto.threat.backend.interceptors; + +import com.akto.dao.ConfigsDao; +import com.akto.dto.Config; +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jws; +import io.jsonwebtoken.Jwts; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; +import java.io.IOException; +import java.security.KeyFactory; +import java.security.NoSuchAlgorithmException; +import java.security.PublicKey; +import java.security.spec.InvalidKeySpecException; +import java.security.spec.X509EncodedKeySpec; +import java.util.Base64; + +public class AuthenticationInterceptor implements Handler { + + private static PublicKey getPublicKey() + throws NoSuchAlgorithmException, InvalidKeySpecException, IOException { + Config.HybridSaasConfig config; + try { + config = + (Config.HybridSaasConfig) + ConfigsDao.instance.findOne("_id", Config.ConfigType.HYBRID_SAAS.name()); + } catch (Exception e) { + System.out.println(e); + throw e; + } + String rsaPublicKey = config.getPublicKey(); + + rsaPublicKey = rsaPublicKey.replace("-----BEGIN PUBLIC KEY-----", ""); + rsaPublicKey = rsaPublicKey.replace("-----END PUBLIC KEY-----", ""); + rsaPublicKey = rsaPublicKey.replace("\n", ""); + byte[] decoded = Base64.getDecoder().decode(rsaPublicKey); + X509EncodedKeySpec keySpec = new X509EncodedKeySpec(decoded); + KeyFactory kf = KeyFactory.getInstance("RSA"); + + try { + return kf.generatePublic(keySpec); + } catch (Exception e) { + System.out.println(e); + throw e; + } + } + + @Override + public void handle(RoutingContext context) { + String token = context.request().getHeader("Authorization"); + if (token == null || !token.startsWith("Bearer ")) { + context.response().setStatusCode(401).end("Missing or Invalid Authorization header"); + return; + } + + token = token.substring(7); + + try { + PublicKey publicKey = getPublicKey(); + Jws claims = + Jwts.parserBuilder().setSigningKey(publicKey).build().parseClaimsJws(token); + int accountId = (int) claims.getBody().get("accountId"); + context.put("accountId", accountId + ""); + context.next(); + } catch (Exception e) { + context.response().setStatusCode(401).end("Missing or Invalid Authorization header"); + } + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/Constants.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/Constants.java new file mode 100644 index 0000000000..7c5eb033ea --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/interceptors/Constants.java @@ -0,0 +1,16 @@ +package com.akto.threat.backend.interceptors; + +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; + +import io.grpc.Context; +import io.grpc.Metadata; + +public class Constants { + public static final Metadata.Key AUTHORIZATION_METADATA_KEY = + Metadata.Key.of("Authorization", ASCII_STRING_MARSHALLER); + public static final Context.Key ACCOUNT_ID_CONTEXT_KEY = Context.key("accountId"); + + private Constants() { + throw new AssertionError(); + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ARouter.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ARouter.java new file mode 100644 index 0000000000..b4e54e8109 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ARouter.java @@ -0,0 +1,9 @@ +package com.akto.threat.backend.router; + +import io.vertx.core.Vertx; +import io.vertx.ext.web.Router; + +public interface ARouter { + + Router setup(Vertx vertx); +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/DashboardRouter.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/DashboardRouter.java new file mode 100644 index 0000000000..03705b6b4a --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/DashboardRouter.java @@ -0,0 +1,138 @@ +package com.akto.threat.backend.router; + +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.FetchAlertFiltersRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListMaliciousRequestsRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatActorsRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatApiRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatActorByCountryRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatCategoryWiseCountRequest; +import com.akto.proto.utils.ProtoMessageUtils; +import com.akto.threat.backend.service.MaliciousEventService; +import com.akto.threat.backend.service.ThreatActorService; +import com.akto.threat.backend.service.ThreatApiService; +import io.vertx.core.Vertx; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Router; + +public class DashboardRouter implements ARouter { + + private final MaliciousEventService dsService; + private final ThreatActorService threatActorService; + private final ThreatApiService threatApiService; + + public DashboardRouter( + MaliciousEventService dsService, + ThreatActorService threatActorService, + ThreatApiService threatApiService + ) { + this.dsService = dsService; + this.threatActorService = threatActorService; + this.threatApiService = threatApiService; + } + + @Override + public Router setup(Vertx vertx) { + Router router = Router.router(vertx); + + router + .get("/fetch_filters") + .blockingHandler(ctx -> { + ProtoMessageUtils.toString( + dsService.fetchAlertFilters( + ctx.get("accountId"), + FetchAlertFiltersRequest.newBuilder().build() + ) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + router + .post("/list_malicious_requests") + .blockingHandler(ctx -> { + RequestBody reqBody = ctx.body(); + ListMaliciousRequestsRequest req = ProtoMessageUtils.< + ListMaliciousRequestsRequest + >toProtoMessage( + ListMaliciousRequestsRequest.class, + reqBody.asString() + ).orElse(null); + + if (req == null) { + ctx.response().setStatusCode(400).end("Invalid request"); + return; + } + + ProtoMessageUtils.toString( + dsService.listMaliciousRequests(ctx.get("accountId"), req) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + router + .post("/list_threat_actors") + .blockingHandler(ctx -> { + RequestBody reqBody = ctx.body(); + ListThreatActorsRequest req = ProtoMessageUtils.< + ListThreatActorsRequest + >toProtoMessage( + ListThreatActorsRequest.class, + reqBody.asString() + ).orElse(null); + + if (req == null) { + ctx.response().setStatusCode(400).end("Invalid request"); + return; + } + + ProtoMessageUtils.toString( + threatActorService.listThreatActors( + ctx.get("accountId"), + req + ) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + router + .post("/list_threat_apis") + .blockingHandler(ctx -> { + RequestBody reqBody = ctx.body(); + ListThreatApiRequest req = ProtoMessageUtils.< + ListThreatApiRequest + >toProtoMessage( + ListThreatApiRequest.class, + reqBody.asString() + ).orElse(null); + + if (req == null) { + ctx.response().setStatusCode(400).end("Invalid request"); + return; + } + + ProtoMessageUtils.toString( + threatApiService.listThreatApis(ctx.get("accountId"), req) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + router + .get("/get_actors_count_per_country") + .blockingHandler(ctx -> { + ProtoMessageUtils.toString( + threatActorService.getThreatActorByCountry( + ctx.get("accountId"), + ThreatActorByCountryRequest.newBuilder().build() + ) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + router + .get("/get_subcategory_wise_count") + .blockingHandler(ctx -> { + ProtoMessageUtils.toString( + threatApiService.getSubCategoryWiseCount( + ctx.get("accountId"), + ThreatCategoryWiseCountRequest.newBuilder().build() + ) + ).ifPresent(s -> ctx.response().setStatusCode(200).end(s)); + }); + + return router; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ThreatDetectionRouter.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ThreatDetectionRouter.java new file mode 100644 index 0000000000..67deec84dc --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/router/ThreatDetectionRouter.java @@ -0,0 +1,43 @@ +package com.akto.threat.backend.router; + +import com.akto.proto.generated.threat_detection.service.malicious_alert_service.v1.RecordMaliciousEventRequest; +import com.akto.proto.utils.ProtoMessageUtils; +import com.akto.threat.backend.service.MaliciousEventService; +import io.vertx.core.Vertx; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Router; + +public class ThreatDetectionRouter implements ARouter { + + private final MaliciousEventService maliciousEventService; + + public ThreatDetectionRouter(MaliciousEventService maliciousEventService) { + this.maliciousEventService = maliciousEventService; + } + + @Override + public Router setup(Vertx vertx) { + Router router = Router.router(vertx); + + router + .post("/record_malicious_event") + .blockingHandler( + ctx -> { + RequestBody reqBody = ctx.body(); + RecordMaliciousEventRequest req = + ProtoMessageUtils.toProtoMessage( + RecordMaliciousEventRequest.class, reqBody.asString()) + .orElse(null); + + if (req == null) { + ctx.response().setStatusCode(400).end("Invalid request"); + return; + } + + maliciousEventService.recordMaliciousEvent(ctx.get("accountId"), req); + ctx.response().setStatusCode(202).end(); + }); + + return router; + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java new file mode 100644 index 0000000000..7bd58e6888 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/MaliciousEventService.java @@ -0,0 +1,200 @@ +package com.akto.threat.backend.service; + +import com.akto.dto.type.URLMethods; +import com.akto.kafka.Kafka; +import com.akto.kafka.KafkaConfig; +import com.akto.proto.generated.threat_detection.message.malicious_event.event_type.v1.EventType; +import com.akto.proto.generated.threat_detection.message.malicious_event.v1.MaliciousEventMessage; +import com.akto.proto.generated.threat_detection.message.sample_request.v1.SampleMaliciousRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.FetchAlertFiltersRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.FetchAlertFiltersResponse; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListMaliciousRequestsRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListMaliciousRequestsResponse; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.TimeRangeFilter; +import com.akto.proto.generated.threat_detection.service.malicious_alert_service.v1.RecordMaliciousEventRequest; +import com.akto.threat.backend.client.IPLookupClient; +import com.akto.threat.backend.constants.KafkaTopic; +import com.akto.threat.backend.constants.MongoDBCollection; +import com.akto.threat.backend.db.AggregateSampleMaliciousEventModel; +import com.akto.threat.backend.db.MaliciousEventModel; +import com.akto.threat.backend.utils.KafkaUtils; +import com.mongodb.client.DistinctIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.bson.Document; +import org.bson.conversions.Bson; + +public class MaliciousEventService { + + private final Kafka kafka; + private final MongoClient mongoClient; + private final IPLookupClient ipLookupClient; + + public MaliciousEventService( + KafkaConfig kafkaConfig, MongoClient mongoClient, IPLookupClient ipLookupClient) { + this.kafka = new Kafka(kafkaConfig); + this.mongoClient = mongoClient; + this.ipLookupClient = ipLookupClient; + } + + public void recordMaliciousEvent(String accountId, RecordMaliciousEventRequest request) { + MaliciousEventMessage evt = request.getMaliciousEvent(); + String actor = evt.getActor(); + String filterId = evt.getFilterId(); + + EventType eventType = evt.getEventType(); + + MaliciousEventModel.EventType maliciousEventType = + EventType.EVENT_TYPE_AGGREGATED.equals(eventType) + ? MaliciousEventModel.EventType.AGGREGATED + : MaliciousEventModel.EventType.SINGLE; + + MaliciousEventModel maliciousEventModel = + MaliciousEventModel.newBuilder() + .setDetectedAt(evt.getDetectedAt()) + .setActor(actor) + .setFilterId(filterId) + .setLatestApiEndpoint(evt.getLatestApiEndpoint()) + .setLatestApiMethod(URLMethods.Method.fromString(evt.getLatestApiMethod())) + .setLatestApiOrig(evt.getLatestApiPayload()) + .setLatestApiCollectionId(evt.getLatestApiCollectionId()) + .setEventType(maliciousEventType) + .setLatestApiIp(evt.getLatestApiIp()) + .setCountry( + this.ipLookupClient.getCountryISOCodeGivenIp(evt.getLatestApiIp()).orElse("")) + .setCategory(evt.getCategory()) + .setSubCategory(evt.getSubCategory()) + .build(); + + if (MaliciousEventModel.EventType.AGGREGATED.equals(maliciousEventType)) { + List events = new ArrayList<>(); + for (SampleMaliciousRequest sampleReq : request.getSampleRequestsList()) { + events.add( + AggregateSampleMaliciousEventModel.newBuilder() + .setActor(actor) + .setIp(sampleReq.getIp()) + .setUrl(sampleReq.getUrl()) + .setMethod(URLMethods.Method.fromString(sampleReq.getMethod())) + .setOrig(sampleReq.getPayload()) + .setRequestTime(sampleReq.getTimestamp()) + .setApiCollectionId(sampleReq.getApiCollectionId()) + .setFilterId(filterId) + .build()); + } + + this.kafka.send( + KafkaUtils.generateMsg( + events, + MongoDBCollection.ThreatDetection.AGGREGATE_SAMPLE_MALICIOUS_REQUESTS, + accountId), + KafkaTopic.ThreatDetection.INTERNAL_DB_MESSAGES); + } + + this.kafka.send( + KafkaUtils.generateMsg( + maliciousEventModel, MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, accountId), + KafkaTopic.ThreatDetection.INTERNAL_DB_MESSAGES); + } + + private static Set findDistinctFields( + MongoCollection coll, String fieldName, Class tClass, Bson filters) { + DistinctIterable r = coll.distinct(fieldName, filters, tClass); + Set result = new HashSet<>(); + MongoCursor cursor = r.cursor(); + while (cursor.hasNext()) { + result.add(cursor.next()); + } + return result; + } + + public FetchAlertFiltersResponse fetchAlertFilters( + String accountId, FetchAlertFiltersRequest request) { + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection("malicious_events", MaliciousEventModel.class); + + Set actors = + MaliciousEventService.findDistinctFields( + coll, "actor", String.class, Filters.empty()); + Set urls = + MaliciousEventService.findDistinctFields( + coll, "latestApiEndpoint", String.class, Filters.empty()); + + return FetchAlertFiltersResponse.newBuilder().addAllActors(actors).addAllUrls(urls).build(); + } + + public ListMaliciousRequestsResponse listMaliciousRequests( + String accountId, ListMaliciousRequestsRequest request) { + int limit = request.getLimit(); + int skip = request.hasSkip() ? request.getSkip() : 0; + Map sort = request.getSortMap(); + ListMaliciousRequestsRequest.Filter filter = request.getFilter(); + + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection( + MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, MaliciousEventModel.class); + + Document query = new Document(); + if (!filter.getActorsList().isEmpty()) { + query.append("actor", new Document("$in", filter.getActorsList())); + } + + if (!filter.getUrlsList().isEmpty()) { + query.append("latestApiEndpoint", new Document("$in", filter.getUrlsList())); + } + + if (!filter.getIpsList().isEmpty()) { + query.append("latestApiIp", new Document("$in", filter.getIpsList())); + } + + if (filter.hasDetectedAtTimeRange()) { + TimeRangeFilter timeRange = filter.getDetectedAtTimeRange(); + long start = timeRange.hasStart() ? timeRange.getStart() : 0; + long end = timeRange.hasEnd() ? timeRange.getEnd() : Long.MAX_VALUE; + + query.append("detectedAt", new Document("$gte", start).append("$lte", end)); + } + + long total = coll.countDocuments(query); + try (MongoCursor cursor = + coll.find(query) + .sort(new Document("detectedAt", sort.getOrDefault("detectedAt", -1))) + .skip(skip) + .limit(limit) + .cursor()) { + List maliciousEvents = new ArrayList<>(); + while (cursor.hasNext()) { + MaliciousEventModel evt = cursor.next(); + maliciousEvents.add( + ListMaliciousRequestsResponse.MaliciousEvent.newBuilder() + .setActor(evt.getActor()) + .setFilterId(evt.getFilterId()) + .setFilterId(evt.getFilterId()) + .setId(evt.getId()) + .setIp(evt.getLatestApiIp()) + .setCountry(evt.getCountry()) + .setPayload(evt.getLatestApiOrig()) + .setEndpoint(evt.getLatestApiEndpoint()) + .setMethod(evt.getLatestApiMethod().name()) + .setDetectedAt(evt.getDetectedAt()) + .setCategory(evt.getCategory()) + .setSubCategory(evt.getSubCategory()) + .build()); + } + return ListMaliciousRequestsResponse.newBuilder() + .setTotal(total) + .addAllMaliciousEvents(maliciousEvents) + .build(); + } + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java new file mode 100644 index 0000000000..31495f3ec0 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatActorService.java @@ -0,0 +1,140 @@ +package com.akto.threat.backend.service; + +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatActorResponse; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatActorsRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatActorByCountryRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatActorByCountryResponse; +import com.akto.threat.backend.constants.MongoDBCollection; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.bson.Document; + +public class ThreatActorService { + + private final MongoClient mongoClient; + + public ThreatActorService(MongoClient mongoClient) { + this.mongoClient = mongoClient; + } + + public ListThreatActorResponse listThreatActors( + String accountId, ListThreatActorsRequest request) { + int skip = request.hasSkip() ? request.getSkip() : 0; + int limit = request.getLimit(); + Map sort = request.getSortMap(); + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection(MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, Document.class); + + ListThreatActorsRequest.Filter filter = request.getFilter(); + + List base = new ArrayList<>(); + + Document match = new Document(); + + if (!filter.getActorsList().isEmpty()) { + match.append("actor", new Document("$in", filter.getActorsList())); + } + + if (!filter.getLatestIpsList().isEmpty()) { + match.append("latestApiIp", new Document("$in", filter.getLatestIpsList())); + } + + if (filter.hasDetectedAtTimeRange()) { + long start = filter.getDetectedAtTimeRange().getStart(); + long end = filter.getDetectedAtTimeRange().getEnd(); + match.append("detectedAt", new Document("$gte", start).append("$lte", end)); + } + + if (!match.isEmpty()) { + base.add(new Document("$match", match)); + } + + base.add(new Document("$sort", new Document("detectedAt", -1))); + base.add( + new Document( + "$group", + new Document("_id", "$actor") + .append("latestApiEndpoint", new Document("$last", "$latestApiEndpoint")) + .append("latestApiMethod", new Document("$last", "$latestApiMethod")) + .append("latestApiIp", new Document("$last", "$latestApiIp")) + .append("country", new Document("$last", "$country")) + .append("discoveredAt", new Document("$last", "$detectedAt")))); + + List countPipeline = new ArrayList<>(base); + countPipeline.add(new Document("$count", "total")); + + Document result = coll.aggregate(countPipeline).first(); + long total = result != null ? result.getInteger("total", 0) : 0; + + List pipeline = new ArrayList<>(base); + + pipeline.add(new Document("$skip", skip)); + pipeline.add(new Document("$limit", limit)); + + pipeline.add( + new Document( + "$sort", new Document("discoveredAt", sort.getOrDefault("discoveredAt", -1)))); // sort + + List actors = new ArrayList<>(); + try (MongoCursor cursor = coll.aggregate(pipeline).cursor()) { + while (cursor.hasNext()) { + Document doc = cursor.next(); + actors.add( + ListThreatActorResponse.ThreatActor.newBuilder() + .setId(doc.getString("_id")) + .setLatestApiEndpoint(doc.getString("latestApiEndpoint")) + .setLatestApiMethod(doc.getString("latestApiMethod")) + .setLatestApiIp(doc.getString("latestApiIp")) + .setDiscoveredAt(doc.getLong("discoveredAt")) + .setCountry(doc.getString("country")) + .build()); + } + } + + return ListThreatActorResponse.newBuilder().addAllActors(actors).setTotal(total).build(); + } + + public ThreatActorByCountryResponse getThreatActorByCountry( + String accountId, ThreatActorByCountryRequest request) { + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection(MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, Document.class); + + List pipeline = new ArrayList<>(); + pipeline.add( + new Document("$sort", new Document("country", 1).append("detectedAt", -1))); // sort + pipeline.add( + new Document( + "$group", + new Document("_id", "$country") + .append("distinctActors", new Document("$addToSet", "$actor")))); + + pipeline.add( + new Document( + "$addFields", new Document("actorsCount", new Document("$size", "$distinctActors")))); + + pipeline.add(new Document("$sort", new Document("actorsCount", -1))); // sort + + List actorsByCountryCount = new ArrayList<>(); + + try (MongoCursor cursor = coll.aggregate(pipeline).cursor()) { + while (cursor.hasNext()) { + Document doc = cursor.next(); + actorsByCountryCount.add( + ThreatActorByCountryResponse.CountryCount.newBuilder() + .setCode(doc.getString("_id")) + .setCount(doc.getInteger("actorsCount", 0)) + .build()); + } + } + + return ThreatActorByCountryResponse.newBuilder().addAllCountries(actorsByCountryCount).build(); + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatApiService.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatApiService.java new file mode 100644 index 0000000000..b2dbc258a9 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/service/ThreatApiService.java @@ -0,0 +1,152 @@ +package com.akto.threat.backend.service; + +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatApiRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ListThreatApiResponse; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatCategoryWiseCountRequest; +import com.akto.proto.generated.threat_detection.service.dashboard_service.v1.ThreatCategoryWiseCountResponse; +import com.akto.threat.backend.constants.MongoDBCollection; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.bson.Document; + +public class ThreatApiService { + + private final MongoClient mongoClient; + + public ThreatApiService(MongoClient mongoClient) { + this.mongoClient = mongoClient; + } + + public ListThreatApiResponse listThreatApis(String accountId, ListThreatApiRequest request) { + int skip = request.hasSkip() ? request.getSkip() : 0; + int limit = request.getLimit(); + Map sort = request.getSortMap(); + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection(MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, Document.class); + + List base = new ArrayList<>(); + ListThreatApiRequest.Filter filter = request.getFilter(); + + Document match = new Document(); + if (!filter.getMethodsList().isEmpty()) { + match.append("latestApiMethod", new Document("$in", filter.getMethodsList())); + } + + if (!filter.getUrlsList().isEmpty()) { + match.append("latestApiEndpoint", new Document("$in", filter.getUrlsList())); + } + + if (filter.hasDetectedAtTimeRange()) { + long start = filter.getDetectedAtTimeRange().getStart(); + long end = filter.getDetectedAtTimeRange().getEnd(); + match.append("detectedAt", new Document("$gte", start).append("$lte", end)); + } + + if (!match.isEmpty()) { + base.add(new Document("$match", match)); + } + + base.add(new Document("$sort", new Document("detectedAt", -1))); // sort + base.add( + new Document( + "$group", + new Document( + "_id", + new Document("endpoint", "$latestApiEndpoint") + .append("method", "$latestApiMethod")) + .append("discoveredAt", new Document("$last", "$detectedAt")) + .append("distinctActors", new Document("$addToSet", "$actor")) + .append("requestsCount", new Document("$sum", 1)))); + base.add( + new Document( + "$addFields", new Document("actorsCount", new Document("$size", "$distinctActors")))); + + List countPipeline = new ArrayList<>(base); + countPipeline.add(new Document("$count", "total")); + + Document result = coll.aggregate(countPipeline).first(); + long total = result != null ? result.getInteger("total", 0) : 0; + + List pipeline = new ArrayList<>(base); + + pipeline.add(new Document("$project", new Document("distinctActors", 0))); + pipeline.add(new Document("$skip", skip)); + pipeline.add(new Document("$limit", limit)); + // add sort + pipeline.add( + new Document( + "$sort", + new Document("discoveredAt", sort.getOrDefault("discoveredAt", -1)) + .append("requestsCount", sort.getOrDefault("requestsCount", -1)) + .append("actorsCount", sort.getOrDefault("actorsCount", -1)))); + + List apis = new ArrayList<>(); + try (MongoCursor cursor = coll.aggregate(pipeline).cursor()) { + while (cursor.hasNext()) { + Document doc = cursor.next(); + Document agg = (Document) doc.get("_id"); + apis.add( + ListThreatApiResponse.ThreatApi.newBuilder() + .setEndpoint(agg.getString("endpoint")) + .setMethod(agg.getString("method")) + .setDiscoveredAt(doc.getLong("discoveredAt")) + .setActorsCount(doc.getInteger("actorsCount", 0)) + .setRequestsCount(doc.getInteger("requestsCount", 0)) + .build()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + return ListThreatApiResponse.newBuilder().addAllApis(apis).setTotal(total).build(); + } + + public ThreatCategoryWiseCountResponse getSubCategoryWiseCount( + String accountId, ThreatCategoryWiseCountRequest req) { + MongoCollection coll = + this.mongoClient + .getDatabase(accountId) + .getCollection(MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS, Document.class); + + List pipeline = new ArrayList<>(); + pipeline.add( + new Document("$sort", new Document("category", 1).append("detectedAt", -1))); // sort + pipeline.add( + new Document( + "$group", + new Document( + "_id", + new Document("category", "$category").append("subCategory", "$subCategory")) + .append("count", new Document("$sum", 1)))); + + pipeline.add( + new Document( + "$sort", + new Document("category", -1).append("subCategory", -1).append("count", -1))); // sort + + List categoryWiseCounts = new ArrayList<>(); + + try (MongoCursor cursor = coll.aggregate(pipeline).cursor()) { + while (cursor.hasNext()) { + Document doc = cursor.next(); + Document agg = (Document) doc.get("_id"); + categoryWiseCounts.add( + ThreatCategoryWiseCountResponse.SubCategoryCount.newBuilder() + .setCategory(agg.getString("category")) + .setSubCategory(agg.getString("subCategory")) + .setCount(doc.getInteger("count", 0)) + .build()); + } + } + + return ThreatCategoryWiseCountResponse.newBuilder() + .addAllCategoryWiseCounts(categoryWiseCounts) + .build(); + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java new file mode 100644 index 0000000000..1faf433e77 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/tasks/FlushMessagesToDB.java @@ -0,0 +1,134 @@ +package com.akto.threat.backend.tasks; + +import com.akto.kafka.KafkaConfig; +import com.akto.runtime.utils.Utils; +import com.akto.threat.backend.constants.KafkaTopic; +import com.akto.threat.backend.constants.MongoDBCollection; +import com.akto.threat.backend.db.AggregateSampleMaliciousEventModel; +import com.akto.threat.backend.db.MaliciousEventModel; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.mongodb.client.MongoClient; +import com.mongodb.client.model.BulkWriteOptions; +import com.mongodb.client.model.InsertOneModel; +import com.mongodb.client.model.WriteModel; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; + +public class FlushMessagesToDB { + + private final KafkaConsumer kafkaConsumer; + private final KafkaConfig kafkaConfig; + private final MongoClient mClient; + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final Gson gson = new Gson(); + + public FlushMessagesToDB(KafkaConfig kafkaConfig, MongoClient mongoClient) { + String kafkaBrokerUrl = kafkaConfig.getBootstrapServers(); + + Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerUrl); + properties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getKeySerializer().getDeserializer()); + properties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + kafkaConfig.getValueSerializer().getDeserializer()); + properties.put( + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + kafkaConfig.getConsumerConfig().getMaxPollRecords()); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + this.kafkaConsumer = new KafkaConsumer<>(properties); + this.kafkaConfig = kafkaConfig; + + this.mClient = mongoClient; + } + + public void run() { + ExecutorService pollingExecutor = Executors.newSingleThreadExecutor(); + this.kafkaConsumer.subscribe( + Collections.singletonList(KafkaTopic.ThreatDetection.INTERNAL_DB_MESSAGES)); + + pollingExecutor.execute( + () -> { + // Poll data from Kafka topic + while (true) { + ConsumerRecords records = + kafkaConsumer.poll( + Duration.ofMillis(this.kafkaConfig.getConsumerConfig().getPollDurationMilli())); + if (records.isEmpty()) { + continue; + } + + processRecords(records); + + if (!records.isEmpty()) { + kafkaConsumer.commitSync(); + } + } + }); + } + + private void processRecords(ConsumerRecords records) { + records.forEach( + r -> { + try { + String message = r.value(); + writeMessage(message); + } catch (JsonProcessingException ex) { + ex.printStackTrace(); + } catch (Exception ex) { + ex.printStackTrace(); + } + }); + } + + private void writeMessage(String message) throws JsonProcessingException { + Map json = gson.fromJson(message, Map.class); + String eventType = (String) json.get("eventType"); + String payload = (String) json.get("payload"); + String accountId = (String) json.get("accountId"); + + switch (eventType) { + case MongoDBCollection.ThreatDetection.AGGREGATE_SAMPLE_MALICIOUS_REQUESTS: + List> bulkUpdates = new ArrayList<>(); + List events = + mapper.readValue( + payload, new TypeReference>() {}); + events.forEach( + event -> { + bulkUpdates.add(new InsertOneModel<>(event)); + }); + + this.mClient + .getDatabase(accountId + "") + .getCollection(eventType, AggregateSampleMaliciousEventModel.class) + .bulkWrite(bulkUpdates, new BulkWriteOptions().ordered(false)); + break; + + case MongoDBCollection.ThreatDetection.MALICIOUS_EVENTS: + MaliciousEventModel event = + mapper.readValue(payload, new TypeReference() {}); + this.mClient + .getDatabase(accountId + "") + .getCollection(eventType, MaliciousEventModel.class) + .insertOne(event); + break; + default: + throw new IllegalArgumentException("Invalid event type"); + } + } +} diff --git a/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/utils/KafkaUtils.java b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/utils/KafkaUtils.java new file mode 100644 index 0000000000..afaefdd802 --- /dev/null +++ b/apps/threat-detection-backend/src/main/java/com/akto/threat/backend/utils/KafkaUtils.java @@ -0,0 +1,19 @@ +package com.akto.threat.backend.utils; + +import com.google.gson.Gson; +import com.mongodb.BasicDBObject; + +public class KafkaUtils { + + private static final Gson gson = new Gson(); + + public static String generateMsg(Object writes, String eventType, String accountId) { + BasicDBObject obj = new BasicDBObject(); + obj.put("eventType", eventType); + String payloadStr = gson.toJson(writes); + obj.put("payload", payloadStr); + obj.put("accountId", accountId); + + return obj.toString(); + } +} diff --git a/libs/utils/pom.xml b/libs/utils/pom.xml index 38084f0cbd..ef81986018 100644 --- a/libs/utils/pom.xml +++ b/libs/utils/pom.xml @@ -108,7 +108,7 @@ com.graphql-java graphql-java - 20.9 + 20.0 diff --git a/libs/utils/src/main/java/com/akto/kafka/Kafka.java b/libs/utils/src/main/java/com/akto/kafka/Kafka.java index 95a143987c..87e178434e 100644 --- a/libs/utils/src/main/java/com/akto/kafka/Kafka.java +++ b/libs/utils/src/main/java/com/akto/kafka/Kafka.java @@ -1,7 +1,6 @@ package com.akto.kafka; import org.apache.kafka.clients.producer.*; -import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,68 +8,88 @@ import java.util.Properties; public class Kafka { - private static final Logger logger = LoggerFactory.getLogger(Kafka.class); - private KafkaProducer producer; - public boolean producerReady; + private static final Logger logger = LoggerFactory.getLogger(Kafka.class); + private KafkaProducer producer; + public boolean producerReady; - public Kafka(String brokerIP, int lingerMS, int batchSize) { - producerReady = false; - try { - setProducer(brokerIP, lingerMS, batchSize); - } catch (Exception e) { - e.printStackTrace(); - } + public Kafka(KafkaConfig kafkaConfig) { + this( + kafkaConfig.getBootstrapServers(), + kafkaConfig.getProducerConfig().getLingerMs(), + kafkaConfig.getProducerConfig().getBatchSize(), + kafkaConfig.getKeySerializer(), + kafkaConfig.getValueSerializer()); + } + + public Kafka( + String brokerIP, + int lingerMS, + int batchSize, + Serializer keySerializer, + Serializer valueSerializer) { + producerReady = false; + try { + setProducer(brokerIP, lingerMS, batchSize, keySerializer, valueSerializer); + } catch (Exception e) { + e.printStackTrace(); } + } - public void send(String message,String topic) { - if (!this.producerReady) return; + public Kafka(String brokerIP, int lingerMS, int batchSize) { + this(brokerIP, 0, 0, Serializer.STRING, Serializer.STRING); + } - ProducerRecord record = new ProducerRecord<>(topic,message); - producer.send(record, new DemoProducerCallback()); - } + public void send(String message, String topic) { + if (!this.producerReady) return; - public void close() { - this.producerReady = false; - producer.close(Duration.ofMillis(0)); // close immediately - } + ProducerRecord record = new ProducerRecord<>(topic, message); + producer.send(record, new DemoProducerCallback()); + } - private void setProducer(String brokerIP, int lingerMS, int batchSize) { - if (producer != null) close(); // close existing producer connection + public void close() { + this.producerReady = false; + producer.close(Duration.ofMillis(0)); // close immediately + } - int requestTimeoutMs = 5000; - Properties kafkaProps = new Properties(); - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); - kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); - kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); - kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); - kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs); - producer = new KafkaProducer(kafkaProps); + private void setProducer( + String brokerIP, + int lingerMS, + int batchSize, + Serializer keySerializer, + Serializer valueSerializer) { + if (producer != null) close(); // close existing producer connection - // test if connection successful by sending a test message in a blocking way - // calling .get() blocks the thread till we receive a message - // if any error then close the connection - ProducerRecord record = new ProducerRecord<>("akto.misc", "ping"); - try { - producer.send(record).get(); - producerReady = true; - } catch (Exception ignored) { - close(); - } - } + int requestTimeoutMs = 5000; + Properties kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerIP); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getSerializer()); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getSerializer()); + kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); + kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMS); + kafkaProps.put(ProducerConfig.RETRIES_CONFIG, 0); + kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); + kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, lingerMS + requestTimeoutMs); + producer = new KafkaProducer(kafkaProps); - private class DemoProducerCallback implements Callback { - @Override - public void onCompletion(RecordMetadata recordMetadata, Exception e) { - if (e != null) { - Kafka.this.close(); - logger.error("onCompletion error: " + e.getMessage()); - } - } + // test if connection successful by sending a test message in a blocking way + // calling .get() blocks the thread till we receive a message + // if any error then close the connection + ProducerRecord record = new ProducerRecord<>("akto.misc", "ping"); + try { + producer.send(record).get(); + producerReady = true; + } catch (Exception ignored) { + close(); } + } + private class DemoProducerCallback implements Callback { + @Override + public void onCompletion(RecordMetadata recordMetadata, Exception e) { + if (e != null) { + Kafka.this.close(); + logger.error("onCompletion error: " + e.getMessage()); + } + } + } } - - diff --git a/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java b/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java new file mode 100644 index 0000000000..9e486518d1 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/kafka/KafkaConfig.java @@ -0,0 +1,94 @@ +package com.akto.kafka; + +public class KafkaConfig { + private final String bootstrapServers; + private final String groupId; + private final KafkaConsumerConfig consumerConfig; + private final KafkaProducerConfig producerConfig; + private final Serializer keySerializer; + private final Serializer valueSerializer; + + public static class Builder { + private String bootstrapServers; + private String groupId; + private KafkaConsumerConfig consumerConfig; + private KafkaProducerConfig producerConfig; + private Serializer keySerializer; + private Serializer valueSerializer; + + private Builder() {} + + public Builder setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return this; + } + + public Builder setGroupId(String groupId) { + this.groupId = groupId; + return this; + } + + public Builder setConsumerConfig(KafkaConsumerConfig consumerConfig) { + this.consumerConfig = consumerConfig; + return this; + } + + public Builder setProducerConfig(KafkaProducerConfig producerConfig) { + this.producerConfig = producerConfig; + return this; + } + + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + + public KafkaConfig build() { + return new KafkaConfig(this); + } + } + + private KafkaConfig(Builder builder) { + this.bootstrapServers = builder.bootstrapServers; + this.groupId = builder.groupId; + this.consumerConfig = builder.consumerConfig; + this.producerConfig = builder.producerConfig; + + this.keySerializer = builder.keySerializer == null ? Serializer.STRING : builder.keySerializer; + this.valueSerializer = + builder.valueSerializer == null ? Serializer.STRING : builder.valueSerializer; + } + + public String getBootstrapServers() { + return bootstrapServers; + } + + public String getGroupId() { + return groupId; + } + + public KafkaConsumerConfig getConsumerConfig() { + return consumerConfig; + } + + public KafkaProducerConfig getProducerConfig() { + return producerConfig; + } + + public Serializer getKeySerializer() { + return keySerializer; + } + + public Serializer getValueSerializer() { + return valueSerializer; + } + + public static Builder newBuilder() { + return new Builder(); + } +} diff --git a/libs/utils/src/main/java/com/akto/kafka/KafkaConsumerConfig.java b/libs/utils/src/main/java/com/akto/kafka/KafkaConsumerConfig.java new file mode 100644 index 0000000000..6feddce794 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/kafka/KafkaConsumerConfig.java @@ -0,0 +1,44 @@ +package com.akto.kafka; + +public class KafkaConsumerConfig { + private final int maxPollRecords; + private final int pollDurationMilli; + + public static class Builder { + private int maxPollRecords; + private int pollDurationMilli; + + private Builder() {} + + public Builder setMaxPollRecords(int maxPollRecords) { + this.maxPollRecords = maxPollRecords; + return this; + } + + public Builder setPollDurationMilli(int pollDurationMilli) { + this.pollDurationMilli = pollDurationMilli; + return this; + } + + public KafkaConsumerConfig build() { + return new KafkaConsumerConfig(this); + } + } + + public KafkaConsumerConfig(Builder builder) { + this.maxPollRecords = builder.maxPollRecords; + this.pollDurationMilli = builder.pollDurationMilli; + } + + public int getMaxPollRecords() { + return maxPollRecords; + } + + public int getPollDurationMilli() { + return pollDurationMilli; + } + + public static Builder newBuilder() { + return new Builder(); + } +} diff --git a/libs/utils/src/main/java/com/akto/kafka/KafkaProducerConfig.java b/libs/utils/src/main/java/com/akto/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000000..ca999ee0a6 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/kafka/KafkaProducerConfig.java @@ -0,0 +1,44 @@ +package com.akto.kafka; + +public class KafkaProducerConfig { + private final int lingerMs; + private final int batchSize; + + public static class Builder { + private int lingerMs; + private int batchSize; + + public Builder() {} + + public Builder setLingerMs(int lingerMs) { + this.lingerMs = lingerMs; + return this; + } + + public Builder setBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public KafkaProducerConfig build() { + return new KafkaProducerConfig(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public int getLingerMs() { + return lingerMs; + } + + public int getBatchSize() { + return batchSize; + } + + public KafkaProducerConfig(Builder builder) { + this.lingerMs = builder.lingerMs; + this.batchSize = builder.batchSize; + } +} diff --git a/libs/utils/src/main/java/com/akto/kafka/Serializer.java b/libs/utils/src/main/java/com/akto/kafka/Serializer.java new file mode 100644 index 0000000000..0e917dd645 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/kafka/Serializer.java @@ -0,0 +1,26 @@ +package com.akto.kafka; + +public enum Serializer { + STRING( + "org.apache.kafka.common.serialization.StringSerializer", + "org.apache.kafka.common.serialization.StringDeserializer"), + BYTE_ARRAY( + "org.apache.kafka.common.serialization.ByteArraySerializer", + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + private final String serializer; + private final String deserializer; + + Serializer(String serializer, String deSerializer) { + this.serializer = serializer; + this.deserializer = deSerializer; + } + + public String getDeserializer() { + return deserializer; + } + + public String getSerializer() { + return serializer; + } +} diff --git a/protobuf/threat_detection/service/dashboard_service/v1/service.proto b/protobuf/threat_detection/service/dashboard_service/v1/service.proto index 983cf18be7..a3f31b1c20 100644 --- a/protobuf/threat_detection/service/dashboard_service/v1/service.proto +++ b/protobuf/threat_detection/service/dashboard_service/v1/service.proto @@ -25,14 +25,27 @@ message ListMaliciousRequestsResponse { string sub_category = 13; } repeated MaliciousEvent malicious_events = 1; - int32 total = 2; + uint64 total = 2; +} + +message TimeRangeFilter { + optional uint64 start = 1; + optional uint64 end = 2; } message ListMaliciousRequestsRequest { + message Filter { + repeated string actors = 1; + repeated string urls = 2; + repeated string ips = 3; + optional TimeRangeFilter detected_at_time_range = 4; + } + // The number of alerts to return optional uint32 skip = 1; uint32 limit = 2; map sort = 3; + Filter filter = 4; } message FetchAlertFiltersRequest {} @@ -43,9 +56,16 @@ message FetchAlertFiltersResponse { } message ListThreatActorsRequest { + message Filter { + repeated string actors = 1; + repeated string latest_ips = 2; + optional TimeRangeFilter detected_at_time_range = 3; + } + optional uint32 skip = 1; uint32 limit = 2; map sort = 3; + Filter filter = 4; } message ListThreatActorResponse { @@ -58,13 +78,20 @@ message ListThreatActorResponse { string country = 6; } repeated ThreatActor actors = 1; - int32 total = 2; + uint64 total = 2; } message ListThreatApiRequest { + message Filter { + repeated string urls = 1; + repeated string methods = 2; + optional TimeRangeFilter detected_at_time_range = 3; + } + optional uint32 skip = 1; uint32 limit = 2; map sort = 3; + Filter filter = 4; } message ListThreatApiResponse { @@ -77,7 +104,7 @@ message ListThreatApiResponse { } repeated ThreatApi apis = 1; - int32 total = 2; + uint64 total = 2; } message ThreatActorByCountryRequest {