fix: update support for postgres collection names with schema identifiers (#181)

* fix: update support for postgres collection names with schema identifiers

* build: update jackson suppression

* fix: update postgres metric to include schema

* docs: add comment for table metadata

* docs: update comments to describe default schema, remove resolved fixme
aaron-steinfeld authored Nov 2, 2023
1 parent c87d82f commit 5822f1a
Showing 13 changed files with 376 additions and 183 deletions.
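
The first commit bullet above is the heart of the change: collection names are now routed through a dedicated PostgresTableIdentifier instead of being used as raw strings, so a name such as "schema.myTest" addresses the table "myTest" inside the Postgres schema "schema", while an unqualified name keeps the connection's default schema. The identifier class itself is among the 13 changed files but is not expanded in this view, so the sketch below is only an approximation of the behaviour the diff implies; its name, fields, parsing rule, and lack of quoting are assumptions, not the actual implementation.

import java.util.Optional;

// Hypothetical sketch of a schema-aware table identifier; the real PostgresTableIdentifier
// added in this pull request is not shown in this excerpt and may differ (for example, it may
// quote or validate identifiers).
final class PostgresTableIdentifierSketch {
  private final Optional<String> schema;
  private final String table;

  private PostgresTableIdentifierSketch(Optional<String> schema, String table) {
    this.schema = schema;
    this.table = table;
  }

  // Assumed parsing rule: "schema.myTest" -> schema "schema", table "myTest";
  // a plain name such as "mytest" is left to resolve against the default schema.
  static PostgresTableIdentifierSketch parse(String collectionName) {
    int dot = collectionName.indexOf('.');
    return dot < 0
        ? new PostgresTableIdentifierSketch(Optional.empty(), collectionName)
        : new PostgresTableIdentifierSketch(
            Optional.of(collectionName.substring(0, dot)), collectionName.substring(dot + 1));
  }

  // The call sites below interpolate the identifier straight into SQL, so whatever the real
  // class returns from toString() must be a valid table reference.
  @Override
  public String toString() {
    return schema.map(s -> s + "." + table).orElse(table);
  }
}

Because PostgresCollection builds its SQL with String.format and StringBuilder.append (see the hunks further down), replacing the collectionName field with this identifier is enough to make every generated statement schema-qualified.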
@@ -1,5 +1,9 @@
package org.hypertrace.core.documentstore.postgres;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -23,6 +27,7 @@
import org.hypertrace.core.documentstore.Key;
import org.hypertrace.core.documentstore.Query;
import org.hypertrace.core.documentstore.SingleValueKey;
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
import org.hypertrace.core.documentstore.utils.Utils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -93,8 +98,8 @@ public void testInitWithDatabase() {

try {
DatabaseMetaData metaData = datastore.getPostgresClient().getMetaData();
Assertions.assertEquals(connectionUrl + database, metaData.getURL());
Assertions.assertEquals(user, metaData.getUserName());
assertEquals(connectionUrl + database, metaData.getURL());
assertEquals(user, metaData.getUserName());
} catch (SQLException e) {
System.out.println("Exception executing init test with user and password");
Assertions.fail();
@@ -108,7 +113,7 @@ public void testUpsertAndReturn() throws IOException {
Document resultDocument =
collection.upsertAndReturn(new SingleValueKey("default", "testKey"), document);

Assertions.assertEquals(document.toJson(), resultDocument.toJson());
assertEquals(document.toJson(), resultDocument.toJson());
}

@Test
@@ -148,16 +153,12 @@ public void test_getJsonNodeAtPath() throws Exception {
JsonNode expectedRootNode2 = objectMapper.readTree(outputNode2);
PostgresCollection collection = (PostgresCollection) datastore.getCollection(COLLECTION_NAME);
try {
Assertions.assertEquals(
collection.getJsonNodeAtPath(path1, rootNode, true), expectedRootNode1);
Assertions.assertEquals(
collection.getJsonNodeAtPath(path2, rootNode, true), expectedRootNode2);
Assertions.assertEquals(collection.getJsonNodeAtPath(null, rootNode, true), rootNode);
Assertions.assertEquals(
collection.getJsonNodeAtPath(path1, rootNode, false), expectedRootNode1);
Assertions.assertEquals(
collection.getJsonNodeAtPath(path2, rootNode, false), expectedRootNode2);
Assertions.assertEquals(collection.getJsonNodeAtPath(null, rootNode, false), rootNode);
assertEquals(collection.getJsonNodeAtPath(path1, rootNode, true), expectedRootNode1);
assertEquals(collection.getJsonNodeAtPath(path2, rootNode, true), expectedRootNode2);
assertEquals(collection.getJsonNodeAtPath(null, rootNode, true), rootNode);
assertEquals(collection.getJsonNodeAtPath(path1, rootNode, false), expectedRootNode1);
assertEquals(collection.getJsonNodeAtPath(path2, rootNode, false), expectedRootNode2);
assertEquals(collection.getJsonNodeAtPath(null, rootNode, false), rootNode);
} catch (Exception e) {
System.out.println("Created path is not right");
Assertions.fail();
@@ -187,35 +188,59 @@ public void testBulkUpsertAndReturn() throws IOException {
while (iterator.hasNext()) {
documents.add(iterator.next());
}
Assertions.assertEquals(6, documents.size());
assertEquals(6, documents.size());

{
// empty query returns all the documents
Query query = new Query();
Assertions.assertEquals(6, collection.total(query));
assertEquals(6, collection.total(query));
}

{
Query query = new Query();
query.setFilter(Filter.eq("name", "Bob"));
Assertions.assertEquals(2, collection.total(query));
assertEquals(2, collection.total(query));
}

{
// limit should not affect the total
Query query = new Query();
query.setFilter(Filter.eq("name", "Bob"));
query.setLimit(1);
Assertions.assertEquals(2, collection.total(query));
assertEquals(2, collection.total(query));
}
}

@Test
public void testDrop() {
Collection collection = datastore.getCollection(COLLECTION_NAME);

Assertions.assertTrue(datastore.listCollections().contains("postgres." + COLLECTION_NAME));
assertTrue(datastore.listCollections().contains("postgres." + COLLECTION_NAME));
collection.drop();
Assertions.assertFalse(datastore.listCollections().contains("postgres." + COLLECTION_NAME));
}

@Test
void testBasicSchemaTableOps() throws IOException {
Collection collection = datastore.getCollection("schema.myTest");
assertTrue(datastore.listCollections().contains("postgres.schema.myTest"));
Document document = Utils.createDocument("foo1", "bar1");
Document resultDocument =
collection.createOrReplaceAndReturn(new SingleValueKey("default", "testKey"), document);
assertEquals(
"bar1", new ObjectMapper().readTree(resultDocument.toJson()).get("foo1").textValue());

// Going to the same table without the schema should not get any data
assertFalse(
datastore
.getCollection(COLLECTION_NAME)
.aggregate(
org.hypertrace.core.documentstore.query.Query.builder()
.addSelection(IdentifierExpression.of("foo1"))
.build())
.hasNext());

assertTrue(datastore.deleteCollection("schema.myTest"));
assertFalse(datastore.listCollections().contains("postgres.schema.myTest"));
}
}
@@ -25,7 +25,7 @@ public class PostgresDocStoreMetricProvider extends BaseDocStoreMetricProviderIm
"num.active.postgres.connections";
private static final String APP_NAME_LABEL = "app_name";
private static final String APPLICATION_COLUMN_NAME = "application_name";
private static final String PG_STAT_ACTIVITY_TABLE = "pg_stat_activity";
private static final String PG_STAT_ACTIVITY_TABLE = "pg_catalog.pg_stat_activity";

private final String applicationNameInCurrentConnection;

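
The one-line change above schema-qualifies the catalog view used by the connection metric, so the lookup no longer depends on the session's search_path. Below is a rough illustration of the kind of statement that constant feeds; the actual query in PostgresDocStoreMetricProvider is not expanded in this view, so the column and filter shape are assumptions.

// Illustrative fragment only; the real SQL lives in PostgresDocStoreMetricProvider.
// An unqualified "pg_stat_activity" resolves via the session search_path, whereas
// "pg_catalog.pg_stat_activity" is unambiguous regardless of schema settings.
String pgStatActivityTable = "pg_catalog.pg_stat_activity";
String countActiveConnectionsSql =
    String.format(
        "SELECT COUNT(*) FROM %s WHERE %s = ?", pgStatActivityTable, "application_name");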
@@ -98,16 +98,21 @@ public class PostgresCollection implements Collection {
CloseableIterator.emptyIterator();

private final PostgresClient client;
private final String collectionName;
private final PostgresTableIdentifier tableIdentifier;
private final PostgresSubDocumentUpdater subDocUpdater;
private final PostgresQueryExecutor queryExecutor;
private final UpdateValidator updateValidator;

public PostgresCollection(final PostgresClient client, final String collectionName) {
this(client, PostgresTableIdentifier.parse(collectionName));
}

PostgresCollection(final PostgresClient client, final PostgresTableIdentifier tableIdentifier) {
this.client = client;
this.collectionName = collectionName;
this.subDocUpdater = new PostgresSubDocumentUpdater(new PostgresQueryBuilder(collectionName));
this.queryExecutor = new PostgresQueryExecutor(collectionName);
this.tableIdentifier = tableIdentifier;
this.subDocUpdater =
new PostgresSubDocumentUpdater(new PostgresQueryBuilder(this.tableIdentifier));
this.queryExecutor = new PostgresQueryExecutor(this.tableIdentifier);
this.updateValidator = new CommonUpdateValidator();
}

Expand Down Expand Up @@ -369,7 +374,7 @@ private BulkUpdateSubDocsInternalResult bulkUpdateSubDocsInternal(
String updateSubDocSQL =
String.format(
"UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s = ?",
collectionName, DOCUMENT, DOCUMENT, ID);
tableIdentifier, DOCUMENT, DOCUMENT, ID);
try {
PreparedStatement preparedStatement =
client.getConnection().prepareStatement(updateSubDocSQL);
@@ -429,7 +434,7 @@ public CloseableIterator<Document> search(Query query) {
private CloseableIterator<Document> search(Query query, boolean removeDocumentId) {
String selection = PostgresQueryParser.parseSelections(query.getSelections());
StringBuilder sqlBuilder =
new StringBuilder(String.format("SELECT %s FROM ", selection)).append(collectionName);
new StringBuilder(String.format("SELECT %s FROM ", selection)).append(tableIdentifier);

String filters = null;
Params.Builder paramsBuilder = Params.newBuilder();
@@ -501,7 +506,7 @@ public Optional<Document> update(
try (final Connection connection = client.getPooledConnection()) {
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser parser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
collectionName, query);
tableIdentifier, query);
final String selectQuery = parser.buildSelectQueryForUpdate();

try (final PreparedStatement preparedStatement =
@@ -604,7 +609,7 @@ public CloseableIterator<Document> bulkUpdate(
public long count(org.hypertrace.core.documentstore.query.Query query) {
org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser queryParser =
new org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser(
collectionName, query);
tableIdentifier, query);
String subQuery = queryParser.parse();
String sqlQuery = String.format("SELECT COUNT(*) FROM (%s) p(count)", subQuery);
try {
@@ -623,7 +628,7 @@ public long count(org.hypertrace.core.documentstore.query.Query query) {

@Override
public boolean delete(Key key) {
String deleteSQL = String.format("DELETE FROM %s WHERE %s = ?", collectionName, ID);
String deleteSQL = String.format("DELETE FROM %s WHERE %s = ?", tableIdentifier, ID);
try (PreparedStatement preparedStatement = client.getConnection().prepareStatement(deleteSQL)) {
preparedStatement.setString(1, key.toString());
preparedStatement.executeUpdate();
@@ -639,10 +644,10 @@ public boolean delete(Filter filter) {
if (filter == null) {
throw new UnsupportedOperationException("Filter must be provided");
}
StringBuilder sqlBuilder = new StringBuilder("DELETE FROM ").append(collectionName);
StringBuilder sqlBuilder = new StringBuilder("DELETE FROM ").append(tableIdentifier);
Params.Builder paramsBuilder = Params.newBuilder();
String filters = PostgresQueryParser.parseFilter(filter, paramsBuilder);
LOGGER.debug("Sending query to PostgresSQL: {} : {}", collectionName, filters);
LOGGER.debug("Sending query to PostgresSQL: {} : {}", tableIdentifier, filters);
if (filters == null) {
throw new UnsupportedOperationException("Parsed filter is invalid");
}
@@ -667,7 +672,7 @@ public BulkDeleteResult delete(Set<Key> keys) {
String deleteSQL =
new StringBuilder("DELETE FROM")
.append(space)
.append(collectionName)
.append(tableIdentifier)
.append(" WHERE ")
.append(ID)
.append(" IN ")
@@ -688,7 +693,7 @@ public boolean deleteSubDoc(Key key, String subDocPath) {
public boolean deleteSubDoc(Key key, String subDocPath) {
String deleteSubDocSQL =
String.format(
"UPDATE %s SET %s=%s #- ?::text[] WHERE %s=?", collectionName, DOCUMENT, DOCUMENT, ID);
"UPDATE %s SET %s=%s #- ?::text[] WHERE %s=?", tableIdentifier, DOCUMENT, DOCUMENT, ID);
String jsonSubDocPath = formatSubDocPath(subDocPath);

try (PreparedStatement preparedStatement =
@@ -711,7 +716,7 @@ public boolean deleteSubDoc(Key key, String subDocPath) {

@Override
public boolean deleteAll() {
String deleteSQL = String.format("DELETE FROM %s", collectionName);
String deleteSQL = String.format("DELETE FROM %s", tableIdentifier);
try (PreparedStatement preparedStatement = client.getConnection().prepareStatement(deleteSQL)) {
preparedStatement.executeUpdate();
return true;
@@ -723,7 +728,7 @@ public boolean deleteAll() {

@Override
public long count() {
String countSQL = String.format("SELECT COUNT(*) FROM %s", collectionName);
String countSQL = String.format("SELECT COUNT(*) FROM %s", tableIdentifier);
long count = -1;
try (PreparedStatement preparedStatement = client.getConnection().prepareStatement(countSQL)) {
ResultSet resultSet = preparedStatement.executeQuery();
@@ -739,7 +744,7 @@ public long count() {
@Override
public long total(Query query) {
StringBuilder totalSQLBuilder =
new StringBuilder("SELECT COUNT(*) FROM ").append(collectionName);
new StringBuilder("SELECT COUNT(*) FROM ").append(tableIdentifier);
Params.Builder paramsBuilder = Params.newBuilder();

long count = -1;
@@ -806,7 +811,7 @@ public CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Do
query =
new StringBuilder("SELECT * FROM")
.append(space)
.append(collectionName)
.append(tableIdentifier)
.append(" WHERE ")
.append(ID)
.append(" IN ")
@@ -836,12 +841,12 @@ public CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Do

@Override
public void drop() {
String dropTableSQL = String.format("DROP TABLE IF EXISTS %s", collectionName);
String dropTableSQL = String.format("DROP TABLE IF EXISTS %s", tableIdentifier);
try (PreparedStatement preparedStatement =
client.getConnection().prepareStatement(dropTableSQL)) {
preparedStatement.executeUpdate();
} catch (SQLException e) {
LOGGER.error("Exception deleting table name: {}", collectionName);
LOGGER.error("Exception deleting table name: {}", tableIdentifier);
}
}

@@ -1134,7 +1139,7 @@ private long updateLastModifiedTime(Set<Key> keys) {
String updateSubDocSQL =
String.format(
"UPDATE %s SET %s=jsonb_set(%s, '{lastUpdatedTime}'::text[], ?::jsonb) WHERE %s=?",
collectionName, DOCUMENT, DOCUMENT, ID);
tableIdentifier, DOCUMENT, DOCUMENT, ID);
long now = System.currentTimeMillis();
try {
PreparedStatement preparedStatement =
@@ -1173,20 +1178,19 @@ public Optional<Document> getFirstDocument(final CloseableIterator<Document> ite

private String getInsertSQL() {
return String.format(
"INSERT INTO %s (%s,%s) VALUES( ?, ? :: jsonb)",
collectionName, ID, DOCUMENT, ID, DOCUMENT);
"INSERT INTO %s (%s,%s) VALUES( ?, ? :: jsonb)", tableIdentifier, ID, DOCUMENT);
}

private String getUpdateSQL() {
return String.format(
"UPDATE %s SET (%s, %s) = ( ?, ? :: jsonb) ", collectionName, ID, DOCUMENT, ID, DOCUMENT);
"UPDATE %s SET (%s, %s) = ( ?, ? :: jsonb) ", tableIdentifier, ID, DOCUMENT);
}

private String getUpsertSQL() {
return String.format(
"INSERT INTO %s (%s,%s) VALUES( ?, ? :: jsonb) ON CONFLICT(%s) DO UPDATE SET %s = "
+ "?::jsonb ",
collectionName, ID, DOCUMENT, ID, DOCUMENT);
tableIdentifier, ID, DOCUMENT, ID, DOCUMENT);
}
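
To make the effect of the identifier concrete, the snippet below replays the upsert template from getUpsertSQL() with a schema-qualified table. The table name "reporting.orders" is hypothetical, and the column names "id" and "document" merely stand in for the ID and DOCUMENT constants, whose values are not shown in this diff.

// Illustrative only: the template string is copied verbatim from getUpsertSQL() above.
public class UpsertSqlExample {
  public static void main(String[] args) {
    String upsertSql =
        String.format(
            "INSERT INTO %s (%s,%s) VALUES( ?, ? :: jsonb) ON CONFLICT(%s) DO UPDATE SET %s = "
                + "?::jsonb ",
            "reporting.orders", "id", "document", "id", "document");
    System.out.println(upsertSql);
    // Prints (one line): INSERT INTO reporting.orders (id,document) VALUES( ?, ? :: jsonb) ON CONFLICT(id) DO UPDATE SET document = ?::jsonb
  }
}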

private String getCreateOrReplaceSQL() {
@@ -1199,14 +1203,14 @@ private String getCreateOrReplaceSQL() {
+ "%s = jsonb_set(?::jsonb, '{%s}', %s.%s->'%s'), "
+ "%s = NOW() "
+ "RETURNING %s = NOW() AS %s",
collectionName,
tableIdentifier,
ID,
DOCUMENT,
CREATED_AT,
ID,
DOCUMENT,
DocStoreConstants.CREATED_TIME,
collectionName,
tableIdentifier,
DOCUMENT,
DocStoreConstants.CREATED_TIME,
UPDATED_AT,
@@ -1224,14 +1228,14 @@ private String getCreateOrReplaceAndReturnSQL() {
+ "%s = jsonb_set(?::jsonb, '{%s}', %s.%s->'%s'), "
+ "%s = NOW() "
+ "RETURNING %s::text AS %s, %s AS %s, %s AS %s",
collectionName,
tableIdentifier,
ID,
DOCUMENT,
CREATED_AT,
ID,
DOCUMENT,
DocStoreConstants.CREATED_TIME,
collectionName,
tableIdentifier,
DOCUMENT,
DocStoreConstants.CREATED_TIME,
UPDATED_AT,
@@ -1246,7 +1250,7 @@ private String getSubDocUpdateQuery() {
private String getSubDocUpdateQuery() {
return String.format(
"UPDATE %s SET %s=jsonb_set(%s, ?::text[], ?::jsonb) WHERE %s=?",
collectionName, DOCUMENT, DOCUMENT, ID);
tableIdentifier, DOCUMENT, DOCUMENT, ID);
}

static class PostgresResultIterator implements CloseableIterator<Document> {