From a98ebf06123ce699ea390a60ff71f0c27469fcfa Mon Sep 17 00:00:00 2001 From: Jaap Reitsma Date: Wed, 7 Oct 2020 10:47:35 +0200 Subject: [PATCH 1/4] Use a recvTimeTs value in seconds for CKAN (4 bytes max). --- .../iot/cygnus/aggregation/NGSIGenericAggregator.java | 8 ++++++++ .../cygnus/aggregation/NGSIGenericRowAggregator.java | 4 ++-- .../com/telefonica/iot/cygnus/sinks/NGSICKANSink.java | 10 +++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java index f7efe515e..195e164cb 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java @@ -694,4 +694,12 @@ public void setEnableNameMappings(boolean enableNameMappings) { */ public abstract void initialize(NGSIEvent cygnusEvent); + /** + * Returns the timestamp to use for the RECV_TIME_TS field. + * @param cygnusEvent the event to produce the timestamp for. + * @return the timestamp value to use. Default milliseconds. + */ + public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { + return cygnusEvent.getRecvTimeTs(); + } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java index da42a4c40..a53057f65 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java @@ -88,7 +88,7 @@ public void aggregate(NGSIEvent event) { LOGGER.debug("[" + getName() + "] Processing context attribute (name=" + attrName + ", type=" + attrType + ")"); // aggregate the attribute information - aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(recvTimeTs))); + aggregation.get(NGSIConstants.RECV_TIME_TS).add(new JsonPrimitive(Long.toString(getRecvTimeTsValue(event)))); aggregation.get(NGSIConstants.RECV_TIME).add(new JsonPrimitive(recvTime)); aggregation.get(NGSIConstants.FIWARE_SERVICE_PATH).add(new JsonPrimitive(getServicePathForData())); aggregation.get(NGSIConstants.ENTITY_ID).add(new JsonPrimitive(entityId)); @@ -102,7 +102,7 @@ public void aggregate(NGSIEvent event) { } // aggregate private String getName() { - return "NGSIUtils.GenericColumnAggregator"; + return "NGSIUtils.GenericRowAggregator"; } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java index 12b5bba53..4de0ecb78 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java @@ -342,9 +342,17 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { } // try catch } // truncateByTime + private class NGSICKANRowAggregator extends NGSIGenericRowAggregator { + @Override + public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { + return cygnusEvent.getRecvTimeTs() / 1000; + } + + } + protected NGSIGenericAggregator getAggregator(boolean rowAttrPersistence) { if (rowAttrPersistence) { - return new NGSIGenericRowAggregator(); + return new NGSICKANRowAggregator(); } else { return new NGSIGenericColumnAggregator(); } // if else From 4380bb658715a54918d4b8943d75c71c48e9944b Mon Sep 17 00:00:00 2001 From: Jaap Reitsma Date: Wed, 7 Oct 2020 11:03:00 +0200 Subject: [PATCH 2/4] Added dependency maven plugin for easier insight in dependencies with copy-dependencies. --- cygnus-common/pom.xml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cygnus-common/pom.xml b/cygnus-common/pom.xml index 58584212a..fc1a52ec1 100644 --- a/cygnus-common/pom.xml +++ b/cygnus-common/pom.xml @@ -230,6 +230,26 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/dependencies + false + true + false + + + + From 427f678689cc3ca81a03410ec494d77242d7ec4c Mon Sep 17 00:00:00 2001 From: Jaap Reitsma Date: Wed, 7 Oct 2020 11:15:03 +0200 Subject: [PATCH 3/4] Fixed invalid coercion of integer 0 to nested json at CKAN (2.8.5) by changing it to string 0. --- .../cygnus/aggregation/NGSIGenericAggregator.java | 10 ++++++++++ .../telefonica/iot/cygnus/sinks/NGSICKANSink.java | 14 ++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java index 195e164cb..0551fc6cd 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericAggregator.java @@ -702,4 +702,14 @@ public void setEnableNameMappings(boolean enableNameMappings) { public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { return cygnusEvent.getRecvTimeTs(); } + + /** + * Returns a possible adapted value for attribute value. + * @param attrValue The input + * @return the adapted output. Default unchanged, same object. + */ + public JsonElement adaptAttrValue(JsonElement attrValue) { + // Default: No adaptation. + return attrValue; + } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java index 4de0ecb78..9ae53b29c 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSICKANSink.java @@ -20,6 +20,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.telefonica.iot.cygnus.aggregation.NGSIGenericAggregator; import com.telefonica.iot.cygnus.aggregation.NGSIGenericColumnAggregator; import com.telefonica.iot.cygnus.aggregation.NGSIGenericRowAggregator; @@ -347,6 +348,19 @@ private class NGSICKANRowAggregator extends NGSIGenericRowAggregator { public long getRecvTimeTsValue(NGSIEvent cygnusEvent) { return cygnusEvent.getRecvTimeTs() / 1000; } + + @Override + public JsonElement adaptAttrValue(JsonElement attrValue) { + JsonElement adaptedValue = attrValue; + // JR 2020-10-7: Attempt to fix a coerce error at CKAN, not coercing 0 to the nested data type. + if (attrValue.isJsonPrimitive()) { + JsonPrimitive jp = attrValue.getAsJsonPrimitive(); + if (jp.isNumber() && "0".equals(jp.getAsString())) { + adaptedValue = new JsonPrimitive("0"); + } + } + return adaptedValue; + } } From 6151ad42483d3ac0b72bc90e639012ad73c17fac Mon Sep 17 00:00:00 2001 From: Jaap Reitsma Date: Wed, 7 Oct 2020 11:42:06 +0200 Subject: [PATCH 4/4] ... and also call the adaptation method. --- .../iot/cygnus/aggregation/NGSIGenericRowAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java index a53057f65..d1270d9cd 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/aggregation/NGSIGenericRowAggregator.java @@ -95,7 +95,7 @@ public void aggregate(NGSIEvent event) { aggregation.get(NGSIConstants.ENTITY_TYPE).add(new JsonPrimitive(entityType)); aggregation.get(NGSIConstants.ATTR_NAME).add(new JsonPrimitive(attrName)); aggregation.get(NGSIConstants.ATTR_TYPE).add(new JsonPrimitive(attrType)); - aggregation.get(NGSIConstants.ATTR_VALUE).add(attrValue); + aggregation.get(NGSIConstants.ATTR_VALUE).add(adaptAttrValue(attrValue)); aggregation.get(NGSIConstants.ATTR_MD).add(jsonAttrMetadata); } // for setAggregation(aggregation);