From 8a6e8d8b525fe737e5cda6bcc31dc88f261caf64 Mon Sep 17 00:00:00 2001 From: luocooong Date: Wed, 13 Jul 2022 20:54:51 +0800 Subject: [PATCH 1/2] DRILL-8259: Supports advanced HBase persistence storage options --- ...StorageTest.java => HBaseStorageTest.java} | 2 +- .../exec/store/hbase/DrillHBaseConstants.java | 5 + .../hbase/config/HBasePersistentStore.java | 20 +- .../config/HBasePersistentStoreProvider.java | 203 ++++++++++++++---- .../drill/hbase/HBaseRecordReaderTest.java | 4 +- .../apache/drill/hbase/HBaseTestsSuite.java | 2 +- .../drill/hbase/TestHBaseCFAsJSONString.java | 4 +- .../hbase/TestHBaseConnectionManager.java | 4 +- .../drill/hbase/TestHBaseFilterPushDown.java | 4 +- .../drill/hbase/TestHBaseProjectPushDown.java | 4 +- .../apache/drill/hbase/TestHBaseQueries.java | 4 +- .../drill/hbase/TestHBaseRegexParser.java | 4 +- .../hbase/TestHBaseRegionScanAssignments.java | 4 +- .../drill/hbase/TestHBaseTableProvider.java | 148 +++++++++---- .../TestOrderedBytesConvertFunctions.java | 4 +- 15 files changed, 307 insertions(+), 109 deletions(-) rename common/src/test/java/org/apache/drill/categories/{HbaseStorageTest.java => HBaseStorageTest.java} (96%) diff --git a/common/src/test/java/org/apache/drill/categories/HbaseStorageTest.java b/common/src/test/java/org/apache/drill/categories/HBaseStorageTest.java similarity index 96% rename from common/src/test/java/org/apache/drill/categories/HbaseStorageTest.java rename to common/src/test/java/org/apache/drill/categories/HBaseStorageTest.java index 5dfa3be925e..e116f8afce5 100644 --- a/common/src/test/java/org/apache/drill/categories/HbaseStorageTest.java +++ b/common/src/test/java/org/apache/drill/categories/HBaseStorageTest.java @@ -20,5 +20,5 @@ /** * This is a category used to mark unit tests that test the HBase storage plugin. */ -public interface HbaseStorageTest { +public interface HBaseStorageTest { } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java index 12384079f58..d6d01c0a407 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java @@ -23,6 +23,7 @@ import org.apache.drill.common.types.Types; public interface DrillHBaseConstants { + String ROW_KEY = "row_key"; SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY); @@ -38,4 +39,8 @@ public interface DrillHBaseConstants { String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table"; String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config"; + + String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config"; + + String SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG = "drill.exec.sys.store.provider.hbase.column_config"; } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index 36680f9bbfd..f579c6e2022 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.store.hbase.config; -import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY; -import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER; +import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME; +import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME; import java.io.IOException; import java.util.Iterator; @@ -70,7 +70,7 @@ public PersistentStoreMode getMode() { public boolean contains(String key) { try { Get get = new Get(row(key)); - get.addColumn(FAMILY, QUALIFIER); + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); return hbaseTable.exists(get); } catch (IOException e) { throw UserException @@ -82,13 +82,13 @@ public boolean contains(String key) { @Override public V get(String key) { - return get(key, FAMILY); + return get(key, FAMILY_NAME); } protected synchronized V get(String key, byte[] family) { try { Get get = new Get(row(key)); - get.addColumn(family, QUALIFIER); + get.addColumn(family, QUALIFIER_NAME); Result r = hbaseTable.get(get); if(r.isEmpty()){ return null; @@ -103,13 +103,13 @@ protected synchronized V get(String key, byte[] family) { @Override public void put(String key, V value) { - put(key, FAMILY, value); + put(key, FAMILY_NAME, value); } protected synchronized void put(String key, byte[] family, V value) { try { Put put = new Put(row(key)); - put.addColumn(family, QUALIFIER, bytes(value)); + put.addColumn(family, QUALIFIER_NAME, bytes(value)); hbaseTable.put(put); } catch (IOException e) { throw UserException.dataReadError(e) @@ -122,8 +122,8 @@ protected synchronized void put(String key, byte[] family, V value) { public synchronized boolean putIfAbsent(String key, V value) { try { Put put = new Put(row(key)); - put.addColumn(FAMILY, QUALIFIER, bytes(value)); - return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put); + put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value)); + return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put); } catch (IOException e) { throw UserException.dataReadError(e) .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) @@ -183,7 +183,7 @@ private class Iter implements Iterator> { Iter(int take) { try { Scan scan = new Scan(tableNameStartKey, tableNameStopKey); - scan.addColumn(FAMILY, QUALIFIER); + scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); scan.setCaching(Math.min(take, 100)); scan.setBatch(take); // set batch size scanner = hbaseTable.getScanner(scan); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java index 9893151ec56..626521a98cd 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.store.hbase.DrillHBaseConstants; @@ -27,109 +28,225 @@ import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider; +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; - public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); - static final byte[] FAMILY = Bytes.toBytes("s"); + public static final byte[] FAMILY_NAME = Bytes.toBytes("s"); + + public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d"); - static final byte[] QUALIFIER = Bytes.toBytes("d"); + private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client"; private final TableName hbaseTableName; + private Table hbaseTable; + private Configuration hbaseConf; - private Connection connection; + private final Map tableConfig; - private Table hbaseTable; + private final Map columnConfig; + private Connection connection; + + @SuppressWarnings("unchecked") public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { - @SuppressWarnings("unchecked") - final Map config = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); - this.hbaseConf = HBaseConfiguration.create(); - this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client"); - if (config != null) { - for (Map.Entry entry : config.entrySet()) { - this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); + final Map hbaseConfig = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG)) { + tableConfig = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG); + } else { + tableConfig = Maps.newHashMap(); + } + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG)) { + columnConfig = (Map) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG); + } else { + columnConfig = Maps.newHashMap(); + } + hbaseConf = HBaseConfiguration.create(); + hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, HBASE_CLIENT_ID); + if (hbaseConfig != null) { + for (Map.Entry entry : hbaseConfig.entrySet()) { + hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); } } - this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); + logger.info("Received the hbase config is {}", hbaseConfig); + if (!tableConfig.isEmpty()) { + logger.info("Received the table config is {}", tableConfig); + } + if (!columnConfig.isEmpty()) { + logger.info("Received the column config is {}", columnConfig); + } + hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); } @VisibleForTesting public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { + this.tableConfig = Maps.newHashMap(); + this.columnConfig = Maps.newHashMap(); this.hbaseConf = conf; this.hbaseTableName = TableName.valueOf(storeTableName); } - + @VisibleForTesting + public HBasePersistentStoreProvider(Map tableConfig, Map columnConfig, Configuration conf, String storeTableName) { + this.tableConfig = tableConfig; + this.columnConfig = columnConfig; + this.hbaseConf = conf; + this.hbaseTableName = TableName.valueOf(storeTableName); + } @Override public PersistentStore getOrCreateStore(PersistentStoreConfig config) throws StoreException { - switch(config.getMode()){ + switch (config.getMode()) { case BLOB_PERSISTENT: case PERSISTENT: - return new HBasePersistentStore<>(config, this.hbaseTable); - + return new HBasePersistentStore<>(config, hbaseTable); default: - throw new IllegalStateException(); + throw new IllegalStateException("Unknown persistent mode"); } } - @Override public void start() throws IOException { + // Create the column family builder + ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder + .newBuilder(FAMILY_NAME) + .setMaxVersions(1); + // Append the config to column family + verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder); + // Create the table builder + TableDescriptorBuilder tableBuilder = TableDescriptorBuilder + .newBuilder(hbaseTableName) + .setColumnFamily(columnFamilyBuilder.build()); + // Append the config to table + verifyAndSetTableConfig(tableConfig, tableBuilder); this.connection = ConnectionFactory.createConnection(hbaseConf); - try(Admin admin = connection.getAdmin()) { if (!admin.tableExists(hbaseTableName)) { - HTableDescriptor desc = new HTableDescriptor(hbaseTableName); - desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); - admin.createTable(desc); + // Go to create the table + admin.createTable(tableBuilder.build()); + logger.info("The HBase table of persistent store created : {}", hbaseTableName); } else { - HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName); - if (!desc.hasFamily(FAMILY)) { + TableDescriptor table = admin.getDescriptor(hbaseTableName); + if (!admin.isTableEnabled(hbaseTableName)) { + admin.enableTable(hbaseTableName); // In case the table is disabled + } + if (!table.hasColumnFamily(FAMILY_NAME)) { throw new DrillRuntimeException("The HBase table " + hbaseTableName + " specified as persistent store exists but does not contain column family: " - + (Bytes.toString(FAMILY))); + + (Bytes.toString(FAMILY_NAME))); } + logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName); } } this.hbaseTable = connection.getTable(hbaseTableName); } - @Override - public synchronized void close() { - if (this.hbaseTable != null) { - try { - this.hbaseTable.close(); - this.hbaseTable = null; - } catch (IOException e) { - logger.warn("Caught exception while closing HBase table.", e); + /** + * Verify the configuration of HBase table and + * add them to the table builder. + * @param config Received the table config + * @param builder HBase table builder + */ + private void verifyAndSetTableConfig(Map config, TableDescriptorBuilder builder) { + for (Map.Entry entry : config.entrySet()) { + switch (entry.getKey().toUpperCase()) { + case TableDescriptorBuilder.DURABILITY: + Durability durability = Durability.valueOf(((String) entry.getValue()).toUpperCase()); + builder.setDurability(durability); + break; + case TableDescriptorBuilder.COMPACTION_ENABLED: + builder.setCompactionEnabled((Boolean) entry.getValue()); + break; + case TableDescriptorBuilder.SPLIT_ENABLED: + builder.setSplitEnabled((Boolean) entry.getValue()); + break; + case TableDescriptorBuilder.FLUSH_POLICY: + builder.setFlushPolicyClassName((String) entry.getValue()); + break; + case TableDescriptorBuilder.SPLIT_POLICY: + builder.setRegionSplitPolicyClassName((String) entry.getValue()); + break; + case TableDescriptorBuilder.MAX_FILESIZE: + builder.setMaxFileSize((Integer) entry.getValue()); + break; + case TableDescriptorBuilder.MEMSTORE_FLUSHSIZE: + builder.setMemStoreFlushSize((Integer) entry.getValue()); + break; + default: + break; } } - if (this.connection != null && !this.connection.isClosed()) { - try { - this.connection.close(); - } catch (IOException e) { - logger.warn("Caught exception while closing HBase connection.", e); + } + + /** + * Verify the configuration of HBase column family and + * add them to the column family builder. + * @param config Received the column config + * @param builder HBase column family builder + */ + private void verifyAndSetColumnConfig(Map config, ColumnFamilyDescriptorBuilder builder) { + for (Map.Entry entry : config.entrySet()) { + switch (entry.getKey().toUpperCase()) { + case ColumnFamilyDescriptorBuilder.MAX_VERSIONS: + builder.setMaxVersions((Integer) entry.getValue()); + break; + case ColumnFamilyDescriptorBuilder.TTL: + builder.setTimeToLive((Integer) entry.getValue()); + break; + case ColumnFamilyDescriptorBuilder.COMPRESSION: + Algorithm algorithm = Algorithm.valueOf(((String) entry.getValue()).toUpperCase()); + builder.setCompressionType(algorithm); + break; + case ColumnFamilyDescriptorBuilder.BLOCKCACHE: + builder.setBlockCacheEnabled((Boolean) entry.getValue()); + break; + case ColumnFamilyDescriptorBuilder.BLOCKSIZE: + builder.setBlocksize((Integer) entry.getValue()); + break; + case ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING: + DataBlockEncoding encoding = DataBlockEncoding.valueOf(((String) entry.getValue()).toUpperCase()); + builder.setDataBlockEncoding(encoding); + break; + case ColumnFamilyDescriptorBuilder.IN_MEMORY: + builder.setInMemory((Boolean) entry.getValue()); + break; + case ColumnFamilyDescriptorBuilder.DFS_REPLICATION: + builder.setDFSReplication(((Integer) entry.getValue()).shortValue()); + break; + default: + break; } - this.connection = null; } } + @Override + public synchronized void close() { + if (hbaseTable != null) { + AutoCloseables.closeSilently(hbaseTable); + } + if (connection != null && !connection.isClosed()) { + AutoCloseables.closeSilently(connection); + } + logger.info("The HBase connection of persistent store closed."); + } } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index d183a703196..0f0f405f65e 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -17,14 +17,14 @@ */ package org.apache.drill.hbase; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.junit.Test; import org.junit.experimental.categories.Category; import static org.apache.drill.test.TestBuilder.mapOf; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class HBaseRecordReaderTest extends BaseHBaseTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 39c1e3b1f94..083b4210984 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -76,7 +76,7 @@ public class HBaseTestsSuite extends BaseTest { private static Configuration conf; - private static Connection conn; + protected static Connection conn; private static Admin admin; private static HBaseTestingUtility UTIL; diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java index 50cda8fb292..8db8f3bda82 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.List; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.rpc.user.QueryDataBatch; @@ -30,7 +30,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseCFAsJSONString extends BaseHBaseTest { private static DrillClient parent_client; diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java index 3227277c512..22290c7a427 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseConnectionManager.java @@ -17,12 +17,12 @@ */ package org.apache.drill.hbase; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseConnectionManager extends BaseHBaseTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index edba290fc39..60f516703fb 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -17,13 +17,13 @@ */ package org.apache.drill.hbase; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.PlanTestBase; import org.apache.drill.categories.SlowTest; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseFilterPushDown extends BaseHBaseTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java index 9c0c70768f7..9787aa10b13 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java @@ -17,12 +17,12 @@ */ package org.apache.drill.hbase; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseProjectPushDown extends BaseHBaseTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java index e8fa925ebf3..bc97d3a1445 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.PlanTestBase; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -34,7 +34,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseQueries extends BaseHBaseTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegexParser.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegexParser.java index 1c018298497..78aa2d2cc7a 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegexParser.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegexParser.java @@ -24,14 +24,14 @@ import java.util.regex.Pattern; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.store.hbase.HBaseRegexParser; import org.apache.drill.test.DrillTest; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseRegexParser extends DrillTest { @Test diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java index c2205422de7..f2898b6e513 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.NavigableMap; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.hbase.HBaseGroupScan; import org.apache.drill.exec.store.hbase.HBaseScanSpec; @@ -42,7 +42,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseRegionScanAssignments extends BaseHBaseTest { static final String HOST_A = "A"; static final String HOST_B = "B"; diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 592479ef1d9..1f32276d056 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -21,65 +21,141 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.categories.HbaseStorageTest; +import java.util.HashMap; +import java.util.Map; + +import org.apache.drill.categories.HBaseStorageTest; +import org.apache.drill.categories.SlowTest; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; import org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; -import org.apache.drill.categories.SlowTest; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestHBaseTableProvider extends BaseHBaseTest { private static HBasePersistentStoreProvider provider; + private static final String STORE_TABLENAME = "drill_store"; + + private static final int MAX_FILESIZE = 1073741824; + private static final int MEMSTORE_FLUSHSIZE = 536870912; + private static final int MAX_VERSIONS = 1; + private static final int TTL = 3600; + private static final int BLOCKSIZE = 102400; + private static final int DFS_REPLICATION = 1; + + private static final Map tableConfig; + private static final Map columnConfig; + + static { + // Table level + tableConfig = new HashMap<>(); + tableConfig.put(TableDescriptorBuilder.DURABILITY, Durability.ASYNC_WAL.name()); + tableConfig.put(TableDescriptorBuilder.COMPACTION_ENABLED, false); + tableConfig.put(TableDescriptorBuilder.SPLIT_ENABLED, false); + tableConfig.put(TableDescriptorBuilder.MAX_FILESIZE, MAX_FILESIZE); // 1GB + tableConfig.put(TableDescriptorBuilder.MEMSTORE_FLUSHSIZE, MEMSTORE_FLUSHSIZE); // 512 MB + // Column Family level + columnConfig = new HashMap<>(); + columnConfig.put(ColumnFamilyDescriptorBuilder.MAX_VERSIONS, MAX_VERSIONS); + columnConfig.put(ColumnFamilyDescriptorBuilder.TTL, TTL); // 1 HOUR + columnConfig.put(ColumnFamilyDescriptorBuilder.COMPRESSION, Compression.Algorithm.NONE.name()); + columnConfig.put(ColumnFamilyDescriptorBuilder.BLOCKCACHE, false); + columnConfig.put(ColumnFamilyDescriptorBuilder.BLOCKSIZE, BLOCKSIZE); + columnConfig.put(ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING, DataBlockEncoding.FAST_DIFF.name()); + columnConfig.put(ColumnFamilyDescriptorBuilder.IN_MEMORY, true); + columnConfig.put(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, DFS_REPLICATION); + } + @BeforeClass // mask HBase cluster start function public static void setUpBeforeTestHBaseTableProvider() throws Exception { - provider = new HBasePersistentStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store"); + provider = new HBasePersistentStoreProvider(tableConfig, columnConfig, storagePluginConfig.getHBaseConf(), STORE_TABLENAME); provider.start(); } + @Test + public void testStoreTableAttributes() throws Exception { + TableName tableName = TableName.valueOf(STORE_TABLENAME); + try(Admin tableAdmin = HBaseTestsSuite.conn.getAdmin()) { + assertTrue("The store table not found : " + STORE_TABLENAME, tableAdmin.tableExists(tableName)); + // Table verify + TableDescriptor tableDescriptor = tableAdmin.getDescriptor(tableName); + assertTrue("The durability must be " + Durability.ASYNC_WAL, tableDescriptor.getDurability() == Durability.ASYNC_WAL); + assertTrue("The compaction must be disabled", !tableDescriptor.isCompactionEnabled()); + assertTrue("The split must be disabled", !tableDescriptor.isSplitEnabled()); + assertTrue("The max size of hfile must be " + MAX_FILESIZE, tableDescriptor.getMaxFileSize() == MAX_FILESIZE); + assertTrue("The memstore size must be " + MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize() == MEMSTORE_FLUSHSIZE); + // Column Family verify + assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME)); + ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME); + assertTrue("The max number of versions must be " + MAX_VERSIONS, columnDescriptor.getMaxVersions() == MAX_VERSIONS); + assertTrue("The time-to-live must be " + TTL, columnDescriptor.getTimeToLive() == TTL); + // TODO native snappy* library not available + assertTrue("The algorithm of compression must be " + Algorithm.NONE, columnDescriptor.getCompressionType() == Algorithm.NONE); + assertTrue("The block cache must be disabled", columnDescriptor.isBlockCacheEnabled() == false); + assertTrue("The block size must be " + BLOCKSIZE, columnDescriptor.getBlocksize() == BLOCKSIZE); + assertTrue("The encoding of data block must be " + DataBlockEncoding.FAST_DIFF, columnDescriptor.getDataBlockEncoding() == DataBlockEncoding.FAST_DIFF); + assertTrue("The in-memory must be enabled", columnDescriptor.isInMemory()); + assertTrue("The replication of dfs must be " + DFS_REPLICATION, columnDescriptor.getDFSReplication() == DFS_REPLICATION); + } + } + @Test public void testTableProvider() throws StoreException { LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config); - PersistentStore hbaseStore = provider.getOrCreateStore( - PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build()); - hbaseStore.put("", "v0"); - hbaseStore.put("k1", "v1"); - hbaseStore.put("k2", "v2"); - hbaseStore.put("k3", "v3"); - hbaseStore.put("k4", "v4"); - hbaseStore.put("k5", "v5"); - hbaseStore.put(".test", "testValue"); - - assertEquals("v0", hbaseStore.get("")); - assertEquals("testValue", hbaseStore.get(".test")); - - assertTrue(hbaseStore.contains("")); - assertFalse(hbaseStore.contains("unknown_key")); - - assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size()); - - PersistentStore hbaseTestStore = provider.getOrCreateStore( - PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build()); - hbaseTestStore.put("", "v0"); - hbaseTestStore.put("k1", "v1"); - hbaseTestStore.put("k2", "v2"); - hbaseTestStore.put("k3", "v3"); - hbaseTestStore.put("k4", "v4"); - hbaseTestStore.put(".test", "testValue"); - - assertEquals("v0", hbaseStore.get("")); - assertEquals("testValue", hbaseStore.get(".test")); - - assertEquals(6, Lists.newArrayList(hbaseTestStore.getAll()).size()); + { + PersistentStoreConfig storeConfig = PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build(); + PersistentStore hbaseStore = provider.getOrCreateStore(storeConfig); + hbaseStore.put("", "v0"); + hbaseStore.put("k1", "v1"); + hbaseStore.put("k2", "v2"); + hbaseStore.put("k3", "v3"); + hbaseStore.put("k4", "v4"); + hbaseStore.put("k5", "v5"); + hbaseStore.put(".test", "testValue"); + + assertEquals("v0", hbaseStore.get("")); + assertEquals("testValue", hbaseStore.get(".test")); + + assertTrue(hbaseStore.contains("")); + assertFalse(hbaseStore.contains("unknown_key")); + + assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size()); + } + + { + PersistentStoreConfig storeConfig = PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build(); + PersistentStore hbaseStore = provider.getOrCreateStore(storeConfig); + hbaseStore.put("", "v0"); + hbaseStore.put("k1", "v1"); + hbaseStore.put("k2", "v2"); + hbaseStore.put("k3", "v3"); + hbaseStore.put("k4", "v4"); + hbaseStore.put(".test", "testValue"); + + assertEquals("v0", hbaseStore.get("")); + assertEquals("testValue", hbaseStore.get(".test")); + + assertEquals(6, Lists.newArrayList(hbaseStore.getAll()).size()); + } } @AfterClass diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java index b64a38614e2..5fbf0bb94cf 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestOrderedBytesConvertFunctions.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.drill.test.BaseTestQuery; -import org.apache.drill.categories.HbaseStorageTest; +import org.apache.drill.categories.HBaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.record.RecordBatchLoader; @@ -39,7 +39,7 @@ import org.apache.drill.shaded.guava.com.google.common.io.Resources; import org.junit.experimental.categories.Category; -@Category({SlowTest.class, HbaseStorageTest.class}) +@Category({SlowTest.class, HBaseStorageTest.class}) public class TestOrderedBytesConvertFunctions extends BaseTestQuery { private static final String CONVERSION_TEST_PHYSICAL_PLAN = "functions/conv/conversionTestWithPhysicalPlan.json"; From 61862516fb6dbe6451f9961abdf70cb55d1d184b Mon Sep 17 00:00:00 2001 From: luocooong Date: Mon, 25 Jul 2022 22:56:29 +0800 Subject: [PATCH 2/2] Add the column family and namespace options --- .../exec/store/hbase/DrillHBaseConstants.java | 4 +++ .../hbase/config/HBasePersistentStore.java | 17 +++++------ .../config/HBasePersistentStoreProvider.java | 28 +++++++++++++++---- .../drill/hbase/TestHBaseTableProvider.java | 4 +-- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java index d6d01c0a407..883614020b8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java @@ -36,8 +36,12 @@ public interface DrillHBaseConstants { MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY); + String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace"; + String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table"; + String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family"; + String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config"; String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config"; diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index f579c6e2022..9cf281cd477 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.hbase.config; -import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME; import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME; import java.io.IOException; @@ -48,10 +47,11 @@ public class HBasePersistentStore extends BasePersistentStore { private final String hbaseTableName; private final String tableName; + private final byte[] familyName; private final byte[] tableNameStartKey; private final byte[] tableNameStopKey; - public HBasePersistentStore(PersistentStoreConfig config, Table table) { + public HBasePersistentStore(PersistentStoreConfig config, Table table, byte[] family) { this.tableName = config.getName() + '\0'; this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00" this.tableNameStopKey = this.tableNameStartKey.clone(); @@ -59,6 +59,7 @@ public HBasePersistentStore(PersistentStoreConfig config, Table table) { this.config = config; this.hbaseTable = table; this.hbaseTableName = table.getName().getNameAsString(); + this.familyName = family; } @Override @@ -70,7 +71,7 @@ public PersistentStoreMode getMode() { public boolean contains(String key) { try { Get get = new Get(row(key)); - get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + get.addColumn(familyName, QUALIFIER_NAME); return hbaseTable.exists(get); } catch (IOException e) { throw UserException @@ -82,7 +83,7 @@ public boolean contains(String key) { @Override public V get(String key) { - return get(key, FAMILY_NAME); + return get(key, familyName); } protected synchronized V get(String key, byte[] family) { @@ -103,7 +104,7 @@ protected synchronized V get(String key, byte[] family) { @Override public void put(String key, V value) { - put(key, FAMILY_NAME, value); + put(key, familyName, value); } protected synchronized void put(String key, byte[] family, V value) { @@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) { public synchronized boolean putIfAbsent(String key, V value) { try { Put put = new Put(row(key)); - put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value)); - return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put); + put.addColumn(familyName, QUALIFIER_NAME, bytes(value)); + return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put); } catch (IOException e) { throw UserException.dataReadError(e) .message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName) @@ -183,7 +184,7 @@ private class Iter implements Iterator> { Iter(int take) { try { Scan scan = new Scan(tableNameStartKey, tableNameStopKey); - scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + scan.addColumn(familyName, QUALIFIER_NAME); scan.setCaching(Math.min(take, 100)); scan.setBatch(take); // set batch size scanner = hbaseTable.getScanner(scan); diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java index 626521a98cd..80a3f04c142 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java @@ -49,7 +49,7 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class); - public static final byte[] FAMILY_NAME = Bytes.toBytes("s"); + public static final byte[] DEFAULT_FAMILY_NAME = Bytes.toBytes("s"); public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d"); @@ -57,6 +57,8 @@ public class HBasePersistentStoreProvider extends BasePersistentStoreProvider { private final TableName hbaseTableName; + private final byte[] family; + private Table hbaseTable; private Configuration hbaseConf; @@ -94,7 +96,19 @@ public HBasePersistentStoreProvider(PersistentStoreRegistry registry) { if (!columnConfig.isEmpty()) { logger.info("Received the column config is {}", columnConfig); } - hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); + String tableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE); + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE)) { + String namespaceStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE); + hbaseTableName = TableName.valueOf(namespaceStr.concat(":").concat(tableName)); + } else { + hbaseTableName = TableName.valueOf(tableName); + } + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY)) { + String familyStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY); + family = Bytes.toBytes(familyStr); + } else { // The default name + family = DEFAULT_FAMILY_NAME; + } } @VisibleForTesting @@ -103,6 +117,7 @@ public HBasePersistentStoreProvider(Configuration conf, String storeTableName) { this.columnConfig = Maps.newHashMap(); this.hbaseConf = conf; this.hbaseTableName = TableName.valueOf(storeTableName); + this.family = DEFAULT_FAMILY_NAME; } @VisibleForTesting @@ -111,6 +126,7 @@ public HBasePersistentStoreProvider(Map tableConfig, Map PersistentStore getOrCreateStore(PersistentStoreConfig config) switch (config.getMode()) { case BLOB_PERSISTENT: case PERSISTENT: - return new HBasePersistentStore<>(config, hbaseTable); + return new HBasePersistentStore<>(config, hbaseTable, family); default: throw new IllegalStateException("Unknown persistent mode"); } @@ -128,7 +144,7 @@ public PersistentStore getOrCreateStore(PersistentStoreConfig config) public void start() throws IOException { // Create the column family builder ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder - .newBuilder(FAMILY_NAME) + .newBuilder(family) .setMaxVersions(1); // Append the config to column family verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder); @@ -149,10 +165,10 @@ public void start() throws IOException { if (!admin.isTableEnabled(hbaseTableName)) { admin.enableTable(hbaseTableName); // In case the table is disabled } - if (!table.hasColumnFamily(FAMILY_NAME)) { + if (!table.hasColumnFamily(family)) { throw new DrillRuntimeException("The HBase table " + hbaseTableName + " specified as persistent store exists but does not contain column family: " - + (Bytes.toString(FAMILY_NAME))); + + (Bytes.toString(family))); } logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName); } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 1f32276d056..443bdd0abb0 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -104,8 +104,8 @@ public void testStoreTableAttributes() throws Exception { assertTrue("The max size of hfile must be " + MAX_FILESIZE, tableDescriptor.getMaxFileSize() == MAX_FILESIZE); assertTrue("The memstore size must be " + MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize() == MEMSTORE_FLUSHSIZE); // Column Family verify - assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME)); - ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME); + assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME)); + ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME); assertTrue("The max number of versions must be " + MAX_VERSIONS, columnDescriptor.getMaxVersions() == MAX_VERSIONS); assertTrue("The time-to-live must be " + TTL, columnDescriptor.getTimeToLive() == TTL); // TODO native snappy* library not available