From 29fe525bd0382938e1d3d92957fd7c018ec761c1 Mon Sep 17 00:00:00 2001 From: Bowen Ding Date: Mon, 1 Jun 2020 00:59:45 +0800 Subject: [PATCH 1/2] DRILL-7745: Add storage plugin for IPFS --- .../drill/categories/IPFSStorageTest.java | 27 ++ .../client/src/protobuf/UserBitShared.pb.cc | 15 +- .../client/src/protobuf/UserBitShared.pb.h | 5 +- contrib/pom.xml | 1 + contrib/storage-ipfs/README.md | 99 ++++ contrib/storage-ipfs/pom.xml | 87 ++++ .../drill/exec/store/ipfs/IPFSCompat.java | 317 ++++++++++++ .../drill/exec/store/ipfs/IPFSContext.java | 99 ++++ .../drill/exec/store/ipfs/IPFSGroupScan.java | 454 ++++++++++++++++++ .../drill/exec/store/ipfs/IPFSHelper.java | 345 +++++++++++++ .../drill/exec/store/ipfs/IPFSJSONReader.java | 116 +++++ .../drill/exec/store/ipfs/IPFSPeer.java | 98 ++++ .../exec/store/ipfs/IPFSScanBatchCreator.java | 110 +++++ .../drill/exec/store/ipfs/IPFSScanSpec.java | 234 +++++++++ .../exec/store/ipfs/IPFSSchemaFactory.java | 108 +++++ .../exec/store/ipfs/IPFSStoragePlugin.java | 87 ++++ .../store/ipfs/IPFSStoragePluginConfig.java | 251 ++++++++++ .../drill/exec/store/ipfs/IPFSSubScan.java | 187 ++++++++ .../resources/bootstrap-storage-plugins.json | 29 ++ .../src/main/resources/drill-module.conf | 27 ++ .../drill/exec/store/ipfs/IPFSTestBase.java | 69 +++ .../exec/store/ipfs/IPFSTestConstants.java | 70 +++ .../store/ipfs/IPFSTestDataGenerator.java | 75 +++ .../drill/exec/store/ipfs/IPFSTestSuit.java | 60 +++ .../exec/store/ipfs/TestIPFSGroupScan.java | 178 +++++++ .../exec/store/ipfs/TestIPFSQueries.java | 106 ++++ .../exec/store/ipfs/TestIPFSScanSpec.java | 76 +++ .../resources/bootstrap-storage-plugins.json | 29 ++ .../src/test/resources/chunked-json-1.json | 12 + .../src/test/resources/chunked-json-2.json | 12 + .../src/test/resources/chunked-json-3.json | 6 + .../src/test/resources/simple.json | 1 + distribution/pom.xml | 5 + distribution/src/assemble/component.xml | 1 + .../drill/exec/proto/UserBitShared.java | 21 +- 
.../src/main/protobuf/UserBitShared.proto | 1 + 36 files changed, 3403 insertions(+), 15 deletions(-) create mode 100644 common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java create mode 100644 contrib/storage-ipfs/README.md create mode 100644 contrib/storage-ipfs/pom.xml create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java create mode 100644 contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java create mode 100644 contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json create mode 100644 contrib/storage-ipfs/src/main/resources/drill-module.conf create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java create mode 100644 
contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java create mode 100644 contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java create mode 100644 contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json create mode 100644 contrib/storage-ipfs/src/test/resources/chunked-json-1.json create mode 100644 contrib/storage-ipfs/src/test/resources/chunked-json-2.json create mode 100644 contrib/storage-ipfs/src/test/resources/chunked-json-3.json create mode 100644 contrib/storage-ipfs/src/test/resources/simple.json diff --git a/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java b/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java new file mode 100644 index 00000000000..80298c9081e --- /dev/null +++ b/common/src/test/java/org/apache/drill/categories/IPFSStorageTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.categories; + +/** + * This is a category used to mark unit tests that test the IPFS storage plugin. + */ +public interface IPFSStorageTest { +} + diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc index 6dbd6259ff2..ae97caca310 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc @@ -956,7 +956,7 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" - "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper" + "\032\n\026CANCELLATION_REQUESTED\020\006*\261\013\n\020CoreOper" "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" @@ -992,11 +992,11 @@ const char descriptor_table_protodef_UserBitShared_2eproto[] PROTOBUF_SECTION_VA "CAN\020\?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" "NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S" - "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat" - "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020" - "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013" - "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p" - "rotoB\rUserBitSharedH\001" + "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F\022\021\n\rIPFS_SUB" + 
"_SCAN\020G*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022" + "\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n" + "\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org." + "apache.drill.exec.protoB\rUserBitSharedH\001" ; static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_UserBitShared_2eproto_deps[3] = { &::descriptor_table_Coordination_2eproto, @@ -1030,7 +1030,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_Use static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_UserBitShared_2eproto_once; static bool descriptor_table_UserBitShared_2eproto_initialized = false; const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_UserBitShared_2eproto = { - &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5821, + &descriptor_table_UserBitShared_2eproto_initialized, descriptor_table_protodef_UserBitShared_2eproto, "UserBitShared.proto", 5840, &descriptor_table_UserBitShared_2eproto_once, descriptor_table_UserBitShared_2eproto_sccs, descriptor_table_UserBitShared_2eproto_deps, 22, 3, schemas, file_default_instances, TableStruct_UserBitShared_2eproto::offsets, file_level_metadata_UserBitShared_2eproto, 22, file_level_enum_descriptors_UserBitShared_2eproto, file_level_service_descriptors_UserBitShared_2eproto, @@ -1269,6 +1269,7 @@ bool CoreOperatorType_IsValid(int value) { case 68: case 69: case 70: + case 71: return true; default: return false; diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h index ae87641a518..f5ebf2adebc 100644 --- a/contrib/native/client/src/protobuf/UserBitShared.pb.h +++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h @@ -392,11 +392,12 @@ enum CoreOperatorType : int { METADATA_CONTROLLER = 67, DRUID_SUB_SCAN = 68, SPSS_SUB_SCAN = 69, - HTTP_SUB_SCAN 
= 70 + HTTP_SUB_SCAN = 70, + IPFS_SUB_SCAN = 71 }; bool CoreOperatorType_IsValid(int value); constexpr CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER; -constexpr CoreOperatorType CoreOperatorType_MAX = HTTP_SUB_SCAN; +constexpr CoreOperatorType CoreOperatorType_MAX = IPFS_SUB_SCAN; constexpr int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1; const ::PROTOBUF_NAMESPACE_ID::EnumDescriptor* CoreOperatorType_descriptor(); diff --git a/contrib/pom.xml b/contrib/pom.xml index 3fb86471357..0bcfef54a20 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -56,6 +56,7 @@ storage-kudu storage-opentsdb storage-http + storage-ipfs storage-druid diff --git a/contrib/storage-ipfs/README.md b/contrib/storage-ipfs/README.md new file mode 100644 index 00000000000..dd86a70a2ee --- /dev/null +++ b/contrib/storage-ipfs/README.md @@ -0,0 +1,99 @@ +# Drill Storage Plugin for IPFS (Minerva) + +## Contents + +0. [Introduction](#introduction) +1. [Configuration](#configuration) +2. [Usage Notes](#usage-notes) + +## Introduction + +Minerva is a storage plugin of Drill that connects IPFS's decentralized storage and Drill's flexible query engine. Any data file stored on IPFS can be easily accessed from Drill's query interface, just like a file stored on a local disk. Moreover, with Drill's capability of distributed execution, other instances that are also running Minerva can help accelerate the execution: the data stays where it is, and the queries go to the most suitable nodes that store the data locally, and from there the operations can be performed most efficiently. + +## Configuration + +1. Set Drill hostname to the IP address of the node to run Drill: + + Edit file `conf/drill-env.sh` and change the environment variable `DRILL_HOST_NAME` to the IP address of the node. Use private or global addresses, depending on whether you plan to run it in a private cluster or on the open Internet. + +2.
Configure the IPFS storage plugin: + + The default configuration of the IPFS storage plugin is located at `src/main/resources/bootstrap-storage-plugins.json`: + + ``` + "ipfs" : { + "type":"ipfs", + "host": "127.0.0.1", + "port": 5001, + "max-nodes-per-leaf": 3, + "ipfs-timeouts": { + "find-provider": 4, + "find-peer-info": 4, + "fetch-data": 5 + }, + "ipfs-caches": { + "peer": {"size": 100, "ttl": 600}, + "provider": {"size": 1000, "ttl": 600} + }, + "groupscan-worker-threads": 50, + "formats": null, + "enabled": true + } + ``` + + where + + `host` and `port` are the host and API port where your IPFS daemon will be listening. Change them so that they match the configuration of your IPFS instance. + + `max-nodes-per-leaf` controls how many provider nodes will be considered when the query is being planned. A larger value increases the parallelization width but typically takes longer to find enough providers from DHT resolution. A smaller value does the opposite. + + `ipfs-timeouts` sets the maximum amount of time in seconds for various time-consuming operations: + + * `find-provider` is the time allowed to do DHT queries to find providers. + * `find-peer-info` is the time allowed to resolve the network addresses of the providers. + * `fetch-data` is the time the actual transmission is allowed to take. + + `ipfs-caches` controls the size and TTL in seconds of cache entries of various caches used to accelerate query execution: + + * `peer` cache caches peer addresses. + * `provider` cache caches which providers provide a particular IPFS object. + + `groupscan-worker-threads` limits the number of worker threads when the planner communicates with the IPFS daemon to resolve providers and peer info. + + `formats` specifies the formats of the files. It is unimplemented for now and does nothing. + +3. Configure IPFS + + Start the IPFS daemon first.
+ + Set a Drill-ready flag on the node: + + ``` + $ IPFS_NULL_OBJECT=$(ipfs object new) + $ ipfs object patch add-link $IPFS_NULL_OBJECT "drill-ready" $IPFS_NULL_OBJECT + QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA + $ ipfs name publish /ipfs/QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA + Published to : /ipfs/QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA + ``` + + This flag indicates that an IPFS node is also capable of handling Drill queries, and the planner will consider it when scheduling a query for distributed execution. A node without this flag will be ignored. + + Also, pin the flag so that it will stick on your node: + + ``` + $ ipfs pin add -r QmeXLv7D5uV2uXHejg7St7mSXDtqwTw8LuywSBawSxy5iA + ``` + +## Usage Notes + +1. Compatible data formats + + Currently only JSON files are supported by this storage plugin. + +2. Add datasets to IPFS + + IPFS provides the `ipfs add` command to conveniently add a file to IPFS. Unfortunately that command does not split data files into chunks on line boundaries. Use [this script](https://gist.github.com/dbw9580/250e52a54e39a34083f815dea34a89e0) to do proper chunking and add files to IPFS. + +3. Timeout exceptions + + IPFS operations can be time-consuming, and sometimes an operation can take forever (e.g. querying the DHT for a non-existent object). Adjust the timeout values in the config to avoid most timeout exceptions.
diff --git a/contrib/storage-ipfs/pom.xml b/contrib/storage-ipfs/pom.xml new file mode 100644 index 00000000000..711f071817c --- /dev/null +++ b/contrib/storage-ipfs/pom.xml @@ -0,0 +1,87 @@ + + + + + drill-contrib-parent + org.apache.drill.contrib + 1.18.0-SNAPSHOT + + 4.0.0 + + drill-ipfs-storage + contrib/ipfs-storage-plugin + + + **/IPFSTestSuit.class + + + + + org.apache.drill.exec + drill-java-exec + ${project.version} + + + + com.github.ipfs + java-ipfs-http-client + v1.3.3 + + + + + org.apache.drill.exec + drill-java-exec + tests + ${project.version} + test + + + + org.apache.drill + drill-common + tests + ${project.version} + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 0 + + ${ipfs.TestSuite} + + + **/TestIPFSQueries.java + + + + + + \ No newline at end of file diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java new file mode 100644 index 00000000000..aca5b109162 --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSCompat.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.api.IPFS; +import io.ipfs.api.JSONParser; +import io.ipfs.multihash.Multihash; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; + +/** + * Compatibility fixes for java-ipfs-http-client library + */ +public class IPFSCompat { + public final String host; + public final int port; + private final String version; + public final String protocol; + public final int readTimeout; + public static final int DEFAULT_READ_TIMEOUT = 0; + + public final DHT dht = new DHT(); + public final Name name = new Name(); + + public IPFSCompat(IPFS ipfs) { + this(ipfs.host, ipfs.port); + } + + public IPFSCompat(String host, int port) { + this(host, port, "/api/v0/", false, DEFAULT_READ_TIMEOUT); + } + + public IPFSCompat(String host, int port, String version, boolean ssl, int readTimeout) { + this.host = host; + this.port = port; + + if (ssl) { + this.protocol = "https"; + } else { + this.protocol = "http"; + } + + this.version = version; + this.readTimeout = readTimeout; + } + + /** + * Resolve names to IPFS CIDs. + * See resolve in IPFS doc. 
+ * + * @param scheme the scheme of the name to resolve, usually IPFS or IPNS + * @param path the path to the object + * @param recursive whether recursively resolve names until it is a IPFS CID + * @return a Map of JSON object, with the result as the value of key "Path" + */ + public Map resolve(String scheme, String path, boolean recursive) { + AtomicReference ret = new AtomicReference<>(); + getObjectStream( + "resolve?arg=/" + scheme + "/" + path + "&r=" + recursive, + res -> { + ret.set((Map) res); + return true; + }, + err -> { + throw new RuntimeException(err); + } + ); + return ret.get(); + } + + /** + * As defined in https://github.com/libp2p/go-libp2p-core/blob/b77fd280f2bfcce22f10a000e8e1d9ec53c47049/routing/query.go#L16 + */ + public enum DHTQueryEventType { + // Sending a query to a peer. + SendingQuery, + // Got a response from a peer. + PeerResponse, + // Found a "closest" peer (not currently used). + FinalPeer, + // Got an error when querying. + QueryError, + // Found a provider. + Provider, + // Found a value. + Value, + // Adding a peer to the query. + AddingPeer, + // Dialing a peer. + DialingPeer + } + + public class DHT { + /** + * Find internet addresses of a given peer. + * See dht/findpeer in IPFS doc. 
+ * + * @param id the id of the peer to query + * @param timeout timeout value in seconds + * @param executor executor + * @return List of Multiaddresses of the peer + */ + public List findpeerListTimeout(Multihash id, int timeout, ExecutorService executor) { + AtomicReference> ret = new AtomicReference<>(new ArrayList<>()); + timeLimitedExec( + "dht/findpeer?arg=" + id, + timeout, + res -> { + Map peer = (Map) res; + if (peer == null) { + return false; + } + if ((int) peer.get("Type") != DHTQueryEventType.FinalPeer.ordinal()) { + return false; + } + List responses = (List) peer.get("Responses"); + if (responses == null || responses.size() == 0) { + return false; + } + // FinalPeer responses have exactly one response + Map> response = responses.get(0); + if (response == null) { + return false; + } + List addrs = response.get("Addrs"); + + ret.set(addrs); + return true; + }, + err -> { + if (!(err instanceof TimeoutException)) { + throw new RuntimeException(err); + } + }, + executor + ); + if (ret.get().size() > 0) { + return ret.get(); + } else { + return Collections.emptyList(); + } + } + + /** + * Find providers of a given CID. + * See dht/findprovs in IPFS doc. 
+ * + * @param id the CID of the IPFS object + * @param timeout timeout value in seconds + * @param executor executor + * @return List of Multihash of providers of the object + */ + public List findprovsListTimeout(Multihash id, int maxPeers, int timeout, ExecutorService executor) { + AtomicReference> ret = new AtomicReference<>(new ArrayList<>()); + timeLimitedExec( + "dht/findprovs?arg=" + id + "&n=" + maxPeers, + timeout, + res -> { + Map peer = (Map) res; + if (peer == null) { + return false; + } + if ((int) peer.get("Type") != DHTQueryEventType.Provider.ordinal()) { + return false; + } + List responses = (List) peer.get("Responses"); + if (responses == null || responses.size() == 0) { + return false; + } + // One Provider message contains only one provider + Map response = responses.get(0); + if (response == null) { + return false; + } + String providerID = response.get("ID"); + + ret.get().add(providerID); + return ret.get().size() >= maxPeers; + }, + err -> { + if (!(err instanceof TimeoutException)) { + throw new RuntimeException(err); + } + }, + executor + ); + if (ret.get().size() > 0) { + return ret.get(); + } else { + return Collections.emptyList(); + } + } + } + + public class Name { + /** + * Resolve a IPNS name. + * See name/resolve in IPFS doc. 
+ * + * @param hash the IPNS name to resolve + * @param timeout timeout value in seconds + * @param executor executor + * @return a Multihash of resolved name + */ + public Optional resolve(Multihash hash, int timeout, ExecutorService executor) { + AtomicReference ret = new AtomicReference<>(); + timeLimitedExec( + "name/resolve?arg=" + hash, + timeout, + res -> { + Map peer = (Map) res; + if (peer != null) { + ret.set((String) peer.get(("Path"))); + return true; + } + return false; + }, + err -> { + if (!(err instanceof TimeoutException)) { + throw new RuntimeException(err); + } + }, + executor + ); + return Optional.ofNullable(ret.get()); + } + } + + private void timeLimitedExec(String path, int timeout, Predicate processor, Consumer error, + ExecutorService executor) { + CompletableFuture f = CompletableFuture.runAsync( + () -> getObjectStream(path, processor, error), + executor + ); + try { + f.get(timeout, TimeUnit.SECONDS); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + f.cancel(true); + error.accept(e); + } + } + + private void getObjectStream(String path, Predicate processor, Consumer error) { + byte LINE_FEED = (byte) 10; + + try { + InputStream in = getStream(path); + ByteArrayOutputStream resp = new ByteArrayOutputStream(); + + byte[] buf = new byte[4096]; + int r; + while ((r = in.read(buf)) >= 0) { + resp.write(buf, 0, r); + if (buf[r - 1] == LINE_FEED) { + try { + boolean done = processor.test(JSONParser.parse(resp.toString())); + if (done) { + break; + } + resp.reset(); + } catch (IllegalStateException e) { + in.close(); + resp.close(); + error.accept(e); + } + } + } + in.close(); + resp.close(); + } catch (IOException e) { + error.accept(e); + } + } + + private InputStream getStream(String path) throws IOException { + URL target = new URL(protocol, host, port, version + path); + HttpURLConnection conn = (HttpURLConnection) target.openConnection(); + conn.setRequestMethod("POST"); + 
conn.setRequestProperty("Content-Type", "application/json"); + conn.setReadTimeout(readTimeout); + return conn.getInputStream(); + } +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java new file mode 100644 index 00000000000..b7cbf071a4a --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.api.IPFS; +import io.ipfs.multihash.Multihash; +import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSCacheType; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader; +import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class IPFSContext { + private final IPFS ipfsClient; + private final IPFSHelper ipfsHelper; + private final IPFSPeer myself; + private final IPFSStoragePluginConfig storagePluginConfig; + private final IPFSStoragePlugin storagePlugin; + private final LoadingCache ipfsPeerCache; + private final LoadingCache> providerCache; + + public IPFSContext(IPFSStoragePluginConfig config, IPFSStoragePlugin plugin) throws IOException { + this.ipfsClient = new IPFS(config.getHost(), config.getPort()); + this.ipfsHelper = new IPFSHelper(ipfsClient, Executors.newCachedThreadPool()); + ipfsHelper.setMaxPeersPerLeaf(config.getMaxNodesPerLeaf()); + ipfsHelper.setTimeouts(config.getIPFSTimeouts()); + this.storagePlugin = plugin; + this.storagePluginConfig = config; + + this.myself = ipfsHelper.getMyself(); + this.ipfsPeerCache = CacheBuilder.newBuilder() + .maximumSize(config.getIPFSCache(IPFSCacheType.PEER).size) + .refreshAfterWrite(config.getIPFSCache(IPFSCacheType.PEER).ttl, TimeUnit.SECONDS) + .build(new CacheLoader() { + @Override + public IPFSPeer load(Multihash key) { + return new IPFSPeer(getIPFSHelper(), key); + } + }); + this.providerCache = CacheBuilder.newBuilder() + .maximumSize(config.getIPFSCache(IPFSCacheType.PROVIDER).size) + .refreshAfterWrite(config.getIPFSCache(IPFSCacheType.PROVIDER).ttl, TimeUnit.SECONDS) + .build(new CacheLoader>() { + @Override + public List load(Multihash key) { + return 
ipfsHelper.findprovsTimeout(key); + } + }); + } + + public IPFS getIPFSClient() { + return ipfsClient; + } + + public IPFSHelper getIPFSHelper() { + return ipfsHelper; + } + + public IPFSPeer getMyself() { + return myself; + } + + public IPFSStoragePlugin getStoragePlugin() { + return storagePlugin; + } + + public IPFSStoragePluginConfig getStoragePluginConfig() { + return storagePluginConfig; + } + + public LoadingCache getIPFSPeerCache() { + return ipfsPeerCache; + } + + public LoadingCache> getProviderCache() { + return providerCache; + } +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java new file mode 100644 index 00000000000..f68749d85b9 --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.cid.Cid;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/**
+ * Group scan of the IPFS storage plugin.
+ * <p>
+ * When constructed, the scan expands the target hash of its scan spec into leaf
+ * objects, chooses a host address for every leaf and records one {@link IPFSWork}
+ * unit per leaf. Hosts that are not among the coordinator's available endpoints
+ * are registered as new endpoints using the default Drill ports.
+ */
+@JsonTypeName("ipfs-scan")
+public class IPFSGroupScan extends AbstractGroupScan {
+  private static final Logger logger = LoggerFactory.getLogger(IPFSGroupScan.class);
+  private final IPFSContext ipfsContext;
+  private final IPFSScanSpec ipfsScanSpec;
+  private final IPFSStoragePluginConfig config;
+  private List<SchemaPath> columns;
+
+  /** Size reported for every work unit; the real leaf size is not used yet (see DRILL-7755 below). */
+  private static final long DEFAULT_NODE_SIZE = 1000L;
+  public static final int DEFAULT_USER_PORT = 31010;
+  public static final int DEFAULT_CONTROL_PORT = 31011;
+  public static final int DEFAULT_DATA_PORT = 31012;
+  public static final int DEFAULT_HTTP_PORT = 8047;
+
+  /** Work units per minor fragment id, filled in by {@link #applyAssignments(List)}. */
+  private ListMultimap<Integer, IPFSWork> assignments;
+  private List<IPFSWork> ipfsWorkList = Lists.newArrayList();
+  /** Work units keyed by the address of the endpoint they were attached to. */
+  private ListMultimap<String, IPFSWork> endpointWorksMap;
+  private List<EndpointAffinity> affinities;
+
+  /**
+   * Deserialization constructor: resolves the plugin from the registry and delegates
+   * to {@link #IPFSGroupScan(IPFSContext, IPFSScanSpec, List)}.
+   */
+  @JsonCreator
+  public IPFSGroupScan(@JsonProperty("IPFSScanSpec") IPFSScanSpec ipfsScanSpec,
+                       @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig,
+                       @JsonProperty("columns") List<SchemaPath> columns,
+                       @JacksonInject StoragePluginRegistry pluginRegistry) {
+    this(
+        pluginRegistry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class).getIPFSContext(),
+        ipfsScanSpec,
+        columns
+    );
+  }
+
+  /**
+   * Creates the scan and immediately expands the scan spec into work units.
+   *
+   * @param ipfsContext  context of the owning plugin
+   * @param ipfsScanSpec what to scan
+   * @param columns      projected columns; {@code null} or empty means all columns
+   * @throws UserException if the expansion of the target hash fails
+   */
+  public IPFSGroupScan(IPFSContext ipfsContext,
+                       IPFSScanSpec ipfsScanSpec,
+                       List<SchemaPath> columns) {
+    super((String) null);
+    this.ipfsContext = ipfsContext;
+    this.ipfsScanSpec = ipfsScanSpec;
+    this.config = ipfsContext.getStoragePluginConfig();
+    logger.debug("GroupScan constructor called with columns {}", columns);
+    this.columns = columns == null || columns.isEmpty() ? ALL_COLUMNS : columns;
+    init();
+  }
+
+  /**
+   * Expands the target hash into leaves and creates one work unit per leaf, attached
+   * to the endpoint whose address matches the host chosen for that leaf.
+   */
+  private void init() {
+    IPFSHelper ipfsHelper = ipfsContext.getIPFSHelper();
+    endpointWorksMap = ArrayListMultimap.create();
+
+    Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper);
+    try {
+      Map<Multihash, String> leafAddrMap = getLeafAddrMappings(topHash);
+      logger.debug("Iterating on {} leaves...", leafAddrMap.size());
+      ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
+      for (Map.Entry<Multihash, String> leafAddr : leafAddrMap.entrySet()) {
+        // The endpoint list is re-read for every leaf on purpose: an endpoint registered
+        // for an earlier leaf may then be found for a later leaf on the same host.
+        DrillbitEndpoint ep = findOrRegisterEndpoint(coordinator, leafAddr.getValue());
+
+        IPFSWork work = new IPFSWork(leafAddr.getKey());
+        logger.debug("added endpoint {} to work {}", ep.getAddress(), work);
+        work.getByteMap().add(ep, DEFAULT_NODE_SIZE);
+        work.setOnEndpoint(ep);
+        endpointWorksMap.put(ep.getAddress(), work);
+        ipfsWorkList.add(work);
+      }
+    } catch (Exception e) {
+      throw UserException
+          .planError(e)
+          .message("Exception during initialization of IPFS GroupScan")
+          .build(logger);
+    }
+  }
+
+  /**
+   * Returns the available endpoint whose address equals {@code peerHostname}, or builds
+   * and registers a new one with the default ports when there is none.
+   */
+  private DrillbitEndpoint findOrRegisterEndpoint(ClusterCoordinator coordinator, String peerHostname) {
+    Optional<DrillbitEndpoint> existing = coordinator.getAvailableEndpoints()
+        .stream()
+        .filter(a -> a.getAddress().equals(peerHostname))
+        .findAny();
+    if (existing.isPresent()) {
+      DrillbitEndpoint ep = existing.get();
+      logger.debug("Using existing endpoint {}", ep.getAddress());
+      return ep;
+    }
+
+    logger.debug("created new endpoint on the fly {}", peerHostname);
+    //DRILL-7754: read ports & version info from IPFS instead of hard-coded
+    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder()
+        .setAddress(peerHostname)
+        .setUserPort(DEFAULT_USER_PORT)
+        .setControlPort(DEFAULT_CONTROL_PORT)
+        .setDataPort(DEFAULT_DATA_PORT)
+        .setHttpPort(DEFAULT_HTTP_PORT)
+        .setVersion(DrillVersionInfo.getVersion())
+        .setState(DrillbitEndpoint.State.ONLINE)
+        .build();
+    //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
+    coordinator.register(ep);
+    return ep;
+  }
+
+  /**
+   * Recursively expands {@code topHash} into its leaves on a dedicated fork/join pool.
+   *
+   * @param topHash root of the tree to expand
+   * @return mapping of leaf hash to the host address chosen for that leaf
+   */
+  Map<Multihash, String> getLeafAddrMappings(Multihash topHash) {
+    logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash);
+    Stopwatch watch = Stopwatch.createStarted();
+    ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads());
+    try {
+      IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, false, ipfsContext);
+      Map<Multihash, String> leafAddrMap = forkJoinPool.invoke(topTask);
+      logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS));
+      return leafAddrMap;
+    } finally {
+      // The pool is private to this call; without a shutdown its worker threads
+      // would linger after every planning pass.
+      forkJoinPool.shutdown();
+    }
+  }
+
+  /** Copy constructor; shares the work lists with {@code that}. */
+  private IPFSGroupScan(IPFSGroupScan that) {
+    super(that);
+    this.ipfsContext = that.ipfsContext;
+    this.ipfsScanSpec = that.ipfsScanSpec;
+    this.config = that.config;
+    this.assignments = that.assignments;
+    this.ipfsWorkList = that.ipfsWorkList;
+    this.endpointWorksMap = that.endpointWorksMap;
+    this.columns = that.columns;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @JsonIgnore
+  public IPFSStoragePlugin getStoragePlugin() {
+    return ipfsContext.getStoragePlugin();
+  }
+
+  // NOTE(review): the creator above reads "IPFSScanSpec" and "IPFSStoragePluginConfig";
+  // confirm the property names produced by the getters of this class match them.
+  @JsonProperty
+  public IPFSScanSpec getIPFSScanSpec() {
+    return ipfsScanSpec;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (affinities == null) {
+      affinities = AffinityCreator.getAffinityMap(ipfsWorkList);
+    }
+    return affinities;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    DrillbitEndpoint myself = ipfsContext.getStoragePlugin().getContext().getEndpoint();
+    int width;
+    if (endpointWorksMap.containsKey(myself.getAddress())) {
+      // the foreman is also going to be a minor fragment worker under a UnionExchange operator
+      width = ipfsWorkList.size();
+    } else {
+      // the foreman does not hold data, so we have to force parallelization
+      // to make sure there is a UnionExchange operator
+      width = ipfsWorkList.size() + 1;
+    }
+    logger.debug("getMaxParallelizationWidth: {}", width);
+    return width;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    logger.debug("Applying assignments: endpointWorksMap = {}", endpointWorksMap);
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, ipfsWorkList);
+  }
+
+  /**
+   * Builds the sub scan of one minor fragment from the work units assigned to it.
+   * {@link #applyAssignments(List)} must have been called before.
+   */
+  @Override
+  public IPFSSubScan getSpecificScan(int minorFragmentId) {
+    logger.debug("getSpecificScan: minorFragmentId = {}", minorFragmentId);
+    List<IPFSWork> workList = assignments.get(minorFragmentId);
+    List<Multihash> scanSpecList = Lists.newArrayList();
+    if (workList != null) {
+      logger.debug("workList.size(): {}", workList.size());
+
+      for (IPFSWork work : workList) {
+        scanSpecList.add(work.getPartialRootHash());
+      }
+    }
+
+    return new IPFSSubScan(ipfsContext, scanSpecList, ipfsScanSpec.getFormatExtension(), columns);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    // Rough estimate. Multiply as long: the int product overflows for large endpoint counts.
+    long recordCount = 100_000L * endpointWorksMap.size();
+    return new ScanStats(ScanStats.GroupScanProperty.NO_EXACT_ROW_COUNT, recordCount, 1, recordCount);
+  }
+
+  @Override
+  public IPFSGroupScan clone(List<SchemaPath> columns) {
+    logger.debug("IPFSGroupScan clone {}", columns);
+    IPFSGroupScan cloned = new IPFSGroupScan(this);
+    cloned.columns = columns;
+    return cloned;
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    logger.debug("getNewWithChildren called");
+    return new IPFSGroupScan(this);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+        .field("scan spec", ipfsScanSpec)
+        .field("columns", columns)
+        .toString();
+  }
+
+  /** One unit of work: a single leaf object identified by its hash. */
+  private static class IPFSWork implements CompleteWork {
+    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
+    private final Multihash partialRoot;
+    private DrillbitEndpoint onEndpoint = null;
+
+
+    public IPFSWork(String root) {
+      this.partialRoot = Cid.decode(root);
+    }
+
+    public IPFSWork(Multihash root) {
+      this.partialRoot = root;
+    }
+
+    public Multihash getPartialRootHash() {
+      return partialRoot;
+    }
+
+    public void setOnEndpoint(DrillbitEndpoint endpointAddress) {
+      this.onEndpoint = endpointAddress;
+    }
+
+    @Override
+    public long getTotalBytes() {
+      return DEFAULT_NODE_SIZE;
+    }
+
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
+    }
+
+    /** All work units report the same size, so they all compare as equal. */
+    @Override
+    public int compareTo(CompleteWork o) {
+      return 0;
+    }
+
+    @Override
+    public String toString() {
+      return new PlanStringBuilder(this)
+          .field("partial root", partialRoot)
+          .toString();
+    }
+  }
+
+  /**
+   * Fork/join task with two modes. For an object hash ({@code isProvider == false}) it
+   * expands the object and returns a map from leaf hash to the host chosen for that leaf.
+   * For a peer id ({@code isProvider == true}) it returns a single entry that maps the
+   * peer id to its Drillbit address, or to {@code null} when none is known.
+   */
+  //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
+  static class IPFSTreeFlattener extends RecursiveTask<Map<Multihash, String>> {
+    private final Multihash hash;
+    private final boolean isProvider;
+    private final Map<Multihash, String> ret = new LinkedHashMap<>();
+    private final IPFSPeer myself;
+    private final IPFSHelper helper;
+    private final LoadingCache<Multihash, IPFSPeer> peerCache;
+    private final LoadingCache<Multihash, List<Multihash>> providerCache;
+
+    public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) {
+      this(
+          hash,
+          isProvider,
+          context.getMyself(),
+          context.getIPFSHelper(),
+          context.getIPFSPeerCache(),
+          context.getProviderCache()
+      );
+    }
+
+    IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSPeer myself, IPFSHelper ipfsHelper,
+                      LoadingCache<Multihash, IPFSPeer> peerCache,
+                      LoadingCache<Multihash, List<Multihash>> providerCache) {
+      this.hash = hash;
+      this.isProvider = isProvider;
+      this.myself = myself;
+      this.helper = ipfsHelper;
+      this.peerCache = peerCache;
+      this.providerCache = providerCache;
+    }
+
+    /** Creates a sub task that shares helper and caches with {@code reference}. */
+    public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash, boolean isProvider) {
+      this(hash, isProvider, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
+    }
+
+    @Override
+    public Map<Multihash, String> compute() {
+      try {
+        if (isProvider) {
+          IPFSPeer peer = peerCache.getUnchecked(hash);
+          ret.put(hash, peer.getDrillbitAddress().orElse(null));
+          return ret;
+        }
+
+        MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash);
+        if (metaOrSimpleNode.links.size() > 0) {
+          logger.debug("{} is a meta node", hash);
+          //DRILL-7755: do something useful with leaf size, e.g. hint Drill about operation costs
+          List<Multihash> intermediates = metaOrSimpleNode.links.stream()
+              .map(x -> x.hash)
+              .collect(Collectors.toList());
+
+          // Fork every child but the first; the first one is computed on this thread.
+          ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
+          for (Multihash intermediate : intermediates.subList(1, intermediates.size())) {
+            builder.add(new IPFSTreeFlattener(this, intermediate, false));
+          }
+          ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
+          subtasks.forEach(IPFSTreeFlattener::fork);
+
+          IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0), false);
+          ret.putAll(first.compute());
+          // Join in reverse order of forking.
+          subtasks.reverse().forEach(
+              subtask -> ret.putAll(subtask.join())
+          );
+        } else {
+          logger.debug("{} is a simple node", hash);
+          List<IPFSPeer> providers = providerCache.getUnchecked(hash).stream()
+              .map(peerCache::getUnchecked)
+              .collect(Collectors.toList());
+          providers = providers.stream()
+              .filter(IPFSPeer::isDrillReady)
+              .collect(Collectors.toList());
+          if (providers.isEmpty()) {
+            logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
+            providers.add(myself);
+          }
+
+          logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);
+          ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
+          for (IPFSPeer provider : providers.subList(1, providers.size())) {
+            builder.add(new IPFSTreeFlattener(this, provider.getId(), true));
+          }
+          ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
+          subtasks.forEach(IPFSTreeFlattener::fork);
+
+          List<String> possibleAddrs = new ArrayList<>();
+          Multihash firstProvider = providers.get(0).getId();
+          IPFSTreeFlattener firstTask = new IPFSTreeFlattener(this, firstProvider, true);
+          String firstAddr = firstTask.compute().get(firstProvider);
+          if (firstAddr != null) {
+            possibleAddrs.add(firstAddr);
+          }
+
+          subtasks.reverse().forEach(
+              subtask -> {
+                String addr = subtask.join().get(subtask.hash);
+                if (addr != null) {
+                  possibleAddrs.add(addr);
+                }
+              }
+          );
+
+          if (possibleAddrs.isEmpty()) {
+            logger.error("All attempts to find an appropriate provider address for {} have failed", hash);
+            throw UserException
+                .planError()
+                .message("No address found for any provider for leaf " + hash)
+                .build(logger);
+          } else {
+            //DRILL-7753: better peer selection algorithm
+            Random random = new Random();
+            String chosenAddr = possibleAddrs.get(random.nextInt(possibleAddrs.size()));
+            ret.clear();
+            ret.put(hash, chosenAddr);
+            logger.debug("Got peer host {} for leaf {}", chosenAddr, hash);
+          }
+        }
+      } catch (IOException e) {
+        throw UserException.planError(e).message("Exception during planning").build(logger);
+      }
+      return ret;
+    }
+  }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
new file mode 100644
index 00000000000..6025752ff99
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSHelper.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.api.MerkleNode;
+import io.ipfs.cid.Cid;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.bouncycastle.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FETCH_DATA;
+import static org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+/**
+ * Helper class with some utilities that are specific to Drill with an IPFS storage.
+ * <p>
+ * Most operations are submitted to the supplied executor service and bounded by the
+ * timeouts (in seconds) configured through {@link #setTimeouts(Map)}.
+ *
+ * DRILL-7778: refactor to support CIDv1
+ */
+public class IPFSHelper {
+  private static final Logger logger = LoggerFactory.getLogger(IPFSHelper.class);
+
+  public static final String IPFS_NULL_OBJECT_HASH = "QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n";
+  public static final Multihash IPFS_NULL_OBJECT = Cid.decode(IPFS_NULL_OBJECT_HASH);
+
+  private ExecutorService executorService;
+  private final IPFS client;
+  private final IPFSCompat clientCompat;
+  private IPFSPeer myself;
+  private int maxPeersPerLeaf;
+  private Map<IPFSTimeOut, Integer> timeouts;
+
+  public IPFSHelper(IPFS ipfs) {
+    this.client = ipfs;
+    this.clientCompat = new IPFSCompat(ipfs);
+  }
+
+  public IPFSHelper(IPFS ipfs, ExecutorService executorService) {
+    this(ipfs);
+    this.executorService = executorService;
+  }
+
+  public void setTimeouts(Map<IPFSTimeOut, Integer> timeouts) {
+    this.timeouts = timeouts;
+  }
+
+  public void setMyself(IPFSPeer myself) {
+    this.myself = myself;
+  }
+
+  /**
+   * Set maximum number of providers per leaf node. The more providers, the more time it takes to do DHT queries, while
+   * it is more likely we can find an optimal peer.
+   *
+   * @param maxPeersPerLeaf max number of providers to search per leaf node
+   */
+  public void setMaxPeersPerLeaf(int maxPeersPerLeaf) {
+    this.maxPeersPerLeaf = maxPeersPerLeaf;
+  }
+
+  public IPFS getClient() {
+    return client;
+  }
+
+  public IPFSCompat getClientCompat() {
+    return clientCompat;
+  }
+
+  /**
+   * Looks up the providers of an object.
+   *
+   * @param id hash of the object
+   * @return ids of at most {@code maxPeersPerLeaf} providers
+   */
+  public List<Multihash> findprovsTimeout(Multihash id) {
+    List<String> providers = clientCompat.dht.findprovsListTimeout(
+        id, maxPeersPerLeaf, timeouts.get(IPFSTimeOut.FIND_PROV), executorService);
+
+    return providers.stream().map(Cid::decode).collect(Collectors.toList());
+  }
+
+  /**
+   * Looks up the advertised addresses of a peer.
+   *
+   * @param peerId id of the peer
+   * @return the peer's multiaddresses; the addresses of the local peer are taken
+   *         from the cached {@code myself} instead of being queried
+   */
+  public List<MultiAddress> findpeerTimeout(Multihash peerId) {
+    // trying to resolve addresses of a node itself will always hang
+    // so we treat it specially
+    // (myself may not have been set yet, in which case a DHT lookup is the only option)
+    if (myself != null && peerId.equals(myself.getId())) {
+      return myself.getMultiAddresses();
+    }
+
+    List<String> addrs = clientCompat.dht.findpeerListTimeout(
+        peerId, timeouts.get(IPFSTimeOut.FIND_PEER_INFO), executorService);
+    return addrs.stream()
+        .filter(addr -> !addr.isEmpty())
+        .map(MultiAddress::new)
+        .collect(Collectors.toList());
+  }
+
+  public byte[] getObjectDataTimeout(Multihash object) throws IOException {
+    return timedFailure(client.object::data, object, timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  public MerkleNode getObjectLinksTimeout(Multihash object) throws IOException {
+    return timedFailure(client.object::links, object, timeouts.get(IPFSTimeOut.FETCH_DATA));
+  }
+
+  /**
+   * Returns the local peer, querying the IPFS daemon for its id and addresses on first use.
+   *
+   * @return the local peer
+   * @throws IOException if the id query fails
+   */
+  public IPFSPeer getMyself() throws IOException {
+    if (this.myself != null) {
+      return this.myself;
+    }
+
+    Map<?, ?> res = timedFailure(client::id, timeouts.get(FIND_PEER_INFO));
+    Multihash myID = Cid.decode((String) res.get("ID"));
+    // Rule out any non-local addresses as they might be NAT-ed external
+    // addresses that are not always reachable from the inside.
+    // But is it safe to assume IPFS always listens on loopback and local addresses?
+    @SuppressWarnings("unchecked")
+    List<String> advertised = (List<String>) res.get("Addresses");
+    List<MultiAddress> myAddrs = advertised
+        .stream()
+        .map(MultiAddress::new)
+        .filter(addr -> {
+          try {
+            InetAddress inetAddress = InetAddress.getByName(addr.getHost());
+            if (inetAddress instanceof Inet6Address) {
+              return false;
+            }
+            return inetAddress.isSiteLocalAddress()
+                || inetAddress.isLinkLocalAddress()
+                || inetAddress.isLoopbackAddress();
+          } catch (UnknownHostException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+    this.myself = new IPFSPeer(this, myID, myAddrs);
+
+    return this.myself;
+  }
+
+  /**
+   * Resolves a path to the hash it points to.
+   *
+   * @param prefix    path prefix passed on to the IPFS resolve call
+   * @param path      path to resolve
+   * @param recursive whether to resolve recursively
+   * @return the resolved hash, or {@code null} if the reply carries no usable path
+   */
+  public Multihash resolve(String prefix, String path, boolean recursive) {
+    Map<String, String> result = timedFailure(
+        (args) -> clientCompat.resolve((String) args.get(0), (String) args.get(1), (boolean) args.get(2)),
+        ImmutableList.<Object>of(prefix, path, recursive),
+        timeouts.get(IPFSTimeOut.FIND_PEER_INFO)
+    );
+    String resolvedPath = result.get("Path");
+    if (resolvedPath == null) {
+      return null;
+    }
+
+    // the path returned is of form /ipfs/Qma..., i.e. ["", "ipfs", "Qma..."] after splitting
+    String[] parts = resolvedPath.split("/");
+    if (parts.length < 3) {
+      return null;
+    }
+    return Cid.decode(parts[2]);
+  }
+
+  @FunctionalInterface
+  public interface ThrowingFunction<T, R, E extends Exception> {
+    R apply(final T in) throws E;
+  }
+
+  @FunctionalInterface
+  public interface ThrowingSupplier<R, E extends Exception> {
+    R get() throws E;
+  }
+
+  /**
+   * Execute a time-critical operation op within time timeout. Causes the query to fail completely
+   * if the operation times out.
+   *
+   * @param op      a Function that represents the operation to perform
+   * @param in      the parameter for op
+   * @param timeout consider the execution has timed out after this amount of time in seconds
+   * @param <T>     Input type
+   * @param <R>     Return type
+   * @param <E>     Type of checked exception op throws
+   * @return R the result of the operation
+   * @throws E when the function throws an E
+   */
+  public <T, R, E extends Exception> R timedFailure(ThrowingFunction<T, R, E> op, T in, int timeout) throws E {
+    Callable<R> task = () -> op.apply(in);
+    return timedFailure(task, timeout, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Same as {@link #timedFailure(ThrowingFunction, Object, int)} for an operation without input.
+   *
+   * @param op      the operation to perform
+   * @param timeout consider the execution has timed out after this amount of time in seconds
+   * @param <R>     Return type
+   * @param <E>     Type of checked exception op throws
+   * @return R the result of the operation
+   * @throws E when the supplier throws an E
+   */
+  public <R, E extends Exception> R timedFailure(ThrowingSupplier<R, E> op, int timeout) throws E {
+    Callable<R> task = op::get;
+    return timedFailure(task, timeout, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Runs {@code task} on the executor service and waits for its result. A timeout,
+   * cancellation or interruption is turned into a {@link UserException}.
+   */
+  @SuppressWarnings("unchecked")
+  private <R, E extends Exception> R timedFailure(Callable<R> task, int timeout, TimeUnit timeUnit) throws E {
+    Future<R> res = executorService.submit(task);
+    try {
+      return res.get(timeout, timeUnit);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof Error) {
+        // E erases to Exception, so casting an Error would fail with ClassCastException
+        throw (Error) cause;
+      }
+      // the task can only throw E or an unchecked exception
+      throw (E) cause;
+    } catch (TimeoutException e) {
+      // do not leave the abandoned task running on the shared executor
+      res.cancel(true);
+      throw UserException.executionError(e).message("IPFS operation timed out").build(logger);
+    } catch (CancellationException e) {
+      throw UserException.executionError(e).message("IPFS operation was cancelled or interrupted").build(logger);
+    } catch (InterruptedException e) {
+      res.cancel(true);
+      // keep the interrupt visible to the caller's thread
+      Thread.currentThread().interrupt();
+      throw UserException.executionError(e).message("IPFS operation was cancelled or interrupted").build(logger);
+    }
+  }
+
+  /*
+   * DRILL-7753: implement a more advanced algorithm that picks optimal addresses. Maybe check reachability, latency
+   * and bandwidth?
+   */
+
+  /**
+   * Choose a peer's network address from its advertised Multiaddresses.
+   * Prefer globally routable address over local addresses.
+   *
+   * @param peerAddrs Multiaddresses obtained from IPFS.DHT.findprovs
+   * @return network address
+   */
+  public static Optional<String> pickPeerHost(List<MultiAddress> peerAddrs) {
+    String localAddr = null;
+    for (MultiAddress addr : peerAddrs) {
+      String host = addr.getHost();
+      try {
+        InetAddress inetAddress = InetAddress.getByName(host);
+        if (inetAddress instanceof Inet6Address) {
+          // ignore IPv6 addresses
+          continue;
+        }
+        if (inetAddress.isSiteLocalAddress() || inetAddress.isLinkLocalAddress()) {
+          localAddr = host;
+        } else {
+          return Optional.of(host);
+        }
+      } catch (UnknownHostException ignored) {
+        // an address that cannot be resolved is simply not a candidate
+      }
+    }
+
+    return Optional.ofNullable(localAddr);
+  }
+
+  public Optional<String> getPeerDrillHostname(Multihash peerId) {
+    return getPeerData(peerId, "drill-hostname").map(Strings::fromByteArray);
+  }
+
+  /**
+   * Check if an IPFS peer is also running a Drillbit so that it can be used to execute a part of a query.
+   *
+   * @param peerId the id of the peer
+   * @return if the peer is Drill-ready
+   */
+  public boolean isDrillReady(Multihash peerId) {
+    try {
+      return getPeerData(peerId, "drill-ready").isPresent();
+    } catch (RuntimeException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Finds the hash of the link named {@code drill-data} under a peer's ID.
+   *
+   * @param peerId the peer's ID
+   * @return hash of the link, or empty if the peer has no such link
+   */
+  public Optional<Multihash> getIPNSDataHash(Multihash peerId) {
+    Optional<List<MerkleNode>> links = getPeerLinks(peerId);
+    if (!links.isPresent()) {
+      return Optional.empty();
+    }
+
+    return links.get().stream()
+        .filter(l -> l.name.equals(Optional.of("drill-data")))
+        .findFirst()
+        .map(l -> l.hash);
+  }
+
+  /**
+   * Get from IPFS data under a peer's ID, i.e. the data identified by /ipfs/{ID}/key.
+   *
+   * @param peerId the peer's ID
+   * @param key    key
+   * @return data in bytes
+   */
+  private Optional<byte[]> getPeerData(Multihash peerId, String key) {
+    Optional<List<MerkleNode>> links = getPeerLinks(peerId);
+    if (!links.isPresent()) {
+      return Optional.empty();
+    }
+
+    for (MerkleNode link : links.get()) {
+      if (link.name.equals(Optional.of(key))) {
+        try {
+          byte[] result = timedFailure(client.object::data, link.hash, timeouts.get(FETCH_DATA));
+          return Optional.of(result);
+        } catch (IOException e) {
+          return Optional.empty();
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Get all the links under a peer's ID.
+   *
+   * @param peerId peer's ID
+   * @return List of links
+   */
+  private Optional<List<MerkleNode>> getPeerLinks(Multihash peerId) {
+    try {
+      Optional<String> optionalPath = clientCompat.name.resolve(peerId, timeouts.get(FIND_PEER_INFO), executorService);
+      if (!optionalPath.isPresent()) {
+        return Optional.empty();
+      }
+      String path = optionalPath.get().substring(6); // path starts with /ipfs/Qm...
+
+      List<MerkleNode> links = timedFailure(
+          client.object::get,
+          Cid.decode(path),
+          timeouts.get(FETCH_DATA)
+      ).links;
+      if (links.size() > 0) {
+        return Optional.of(links);
+      }
+    } catch (IOException ignored) {
+      // best effort: a peer whose links cannot be fetched is treated as having none
+    }
+    return Optional.empty();
+  }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java
new file mode 100644
index 00000000000..ec9584c7c10
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Reader that fetches the data of one IPFS object and parses it as JSON.
+ */
+public class IPFSJSONReader implements ManagedReader<SchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(IPFSJSONReader.class);
+  private final IPFSContext ipfsContext;
+  private final Multihash block;
+  private JsonLoader jsonLoader;
+
+  /**
+   * @param ipfsContext context of the owning plugin, used to reach the IPFS helper
+   * @param block       hash of the object to read
+   */
+  public IPFSJSONReader(IPFSContext ipfsContext, Multihash block) {
+    this.ipfsContext = ipfsContext;
+    this.block = block;
+  }
+
+  /**
+   * Fetches the object data, keeps the text between the first '{' and the last '}'
+   * and sets up a JSON loader over it.
+   *
+   * @param negotiator schema negotiator supplied by the scan framework
+   * @return always {@code true}
+   * @throws UserException if the data cannot be fetched or contains no JSON object
+   */
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("hash", block.toString());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
+
+    IPFSHelper helper = ipfsContext.getIPFSHelper();
+
+    byte[] rawDataBytes;
+    if (block.equals(IPFSHelper.IPFS_NULL_OBJECT)) {
+      // An empty ipfs object, but an empty string will make Jackson ObjectMapper fail
+      // so treat it specially
+      rawDataBytes = "[{}]".getBytes(StandardCharsets.UTF_8);
+    } else {
+      try {
+        rawDataBytes = helper.getObjectDataTimeout(block);
+      } catch (final IOException e) {
+        throw UserException
+            .dataReadError(e)
+            .message("Failed to retrieve data from IPFS block")
+            .addContext("Error message", e.getMessage())
+            .addContext(errorContext)
+            .build(logger);
+      }
+    }
+
+    // Decode with a fixed charset: the result must not depend on the platform default.
+    String rootJson = new String(rawDataBytes, StandardCharsets.UTF_8);
+    int start = rootJson.indexOf("{");
+    int end = rootJson.lastIndexOf("}");
+    if (start < 0 || end < start) {
+      // Without this check substring() below fails with StringIndexOutOfBoundsException.
+      throw UserException
+          .dataReadError()
+          .message("IPFS block does not contain a JSON object")
+          .addContext(errorContext)
+          .build(logger);
+    }
+    rootJson = rootJson.substring(start, end + 1);
+    InputStream inStream = new ByteArrayInputStream(rootJson.getBytes(StandardCharsets.UTF_8));
+
+    try {
+      jsonLoader = new JsonLoaderImpl.JsonLoaderBuilder()
+          .resultSetLoader(negotiator.build())
+          .standardOptions(negotiator.queryOptions())
+          .errorContext(errorContext)
+          .fromStream(inStream)
+          .build();
+    } catch (Throwable t) {
+
+      // Paranoia: ensure stream is closed if anything goes wrong.
+      // After this, the JSON loader will close the stream.
+      AutoCloseables.closeSilently(inStream);
+      throw t;
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    return jsonLoader.readBatch();
+  }
+
+  @Override
+  public void close() {
+    if (jsonLoader != null) {
+      jsonLoader.close();
+      jsonLoader = null;
+    }
+  }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
new file mode 100644
index 00000000000..d59b0a057d2
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSPeer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An IPFS peer identified by its id. Whether the peer is Drill-ready and which
+ * address its Drillbit can be reached at are looked up through the helper on
+ * first use and remembered afterwards.
+ */
+public class IPFSPeer {
+  private final IPFSHelper helper;
+
+  private final Multihash id;
+  private List<MultiAddress> addrs;
+  private boolean isDrillReady;
+  private boolean isDrillReadyChecked = false;
+  private Optional<String> drillbitAddress = Optional.empty();
+  private boolean drillbitAddressChecked = false;
+
+
+  /** Creates a peer whose addresses and Drill readiness are resolved lazily. */
+  public IPFSPeer(IPFSHelper helper, Multihash id) {
+    this.helper = helper;
+    this.id = id;
+  }
+
+  /** Creates a peer with known addresses; Drill readiness is checked right away. */
+  IPFSPeer(IPFSHelper helper, Multihash id, List<MultiAddress> addrs) {
+    this.helper = helper;
+    this.id = id;
+    this.addrs = addrs;
+    this.isDrillReady = helper.isDrillReady(id);
+    this.isDrillReadyChecked = true;
+    this.drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+    this.drillbitAddressChecked = true;
+  }
+
+  public boolean isDrillReady() {
+    if (!isDrillReadyChecked) {
+      isDrillReady = helper.isDrillReady(id);
+      isDrillReadyChecked = true;
+    }
+    return isDrillReady;
+  }
+
+  public boolean hasDrillbitAddress() {
+    return getDrillbitAddress().isPresent();
+  }
+
+  public Optional<String> getDrillbitAddress() {
+    findDrillbitAddress();
+    return drillbitAddress;
+  }
+
+  public List<MultiAddress> getMultiAddresses() {
+    findDrillbitAddress();
+    return addrs;
+  }
+
+  public Multihash getId() {
+    return id;
+  }
+
+
+  /** Looks up the peer's addresses once and picks the Drillbit address from them. */
+  private void findDrillbitAddress() {
+    if (!drillbitAddressChecked) {
+      addrs = helper.findpeerTimeout(id);
+      drillbitAddress = IPFSHelper.pickPeerHost(addrs);
+      drillbitAddressChecked = true;
+    }
+  }
+
+  /** Two peers are equal when their ids are equal, consistent with {@link #hashCode()}. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof IPFSPeer)) {
+      return false;
+    }
+    return id.equals(((IPFSPeer) obj).id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("IPFSPeer(%s)", id.toBase58());
+  }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java
new file mode 100644
index 00000000000..36a13c39492
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanBatchCreator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Creates the scan operator for an {@link IPFSSubScan}: one {@link IPFSJSONReader}
+ * per hash in the sub scan's spec list.
+ */
+public class IPFSScanBatchCreator implements BatchCreator<IPFSSubScan> {
+  private static final Logger logger = LoggerFactory.getLogger(IPFSScanBatchCreator.class);
+
+  /**
+   * @param context  fragment context
+   * @param subScan  the sub scan to execute
+   * @param children must be empty, a scan has no inputs
+   * @return the scan operator
+   * @throws ExecutionSetupException if building the operator fails for any reason
+   *                                 other than a {@link UserException}
+   */
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, IPFSSubScan subScan, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    logger.debug("subScanSpecList.size = {}", subScan.getIPFSSubScanSpecList().size());
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /** Configures projection, user name, error context, reader factory and null type of the scan. */
+  private ScanFrameworkBuilder createBuilder(OptionManager options, IPFSSubScan subScan) {
+    ManagedScanFramework.ScanFrameworkBuilder builder = new ManagedScanFramework.ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
+
+    // Provide custom error context
+    builder.errorContext(
+        new ChildErrorContext(builder.errorContext()) {
+          @Override
+          public void addContext(UserException.Builder errorBuilder) {
+            // keep whatever the parent context contributes, as IPFSJSONReader does
+            super.addContext(errorBuilder);
+            errorBuilder.addContext("Plugin", subScan.getIPFSContext().getStoragePlugin().getName());
+          }
+        });
+
+    // Reader
+    ManagedScanFramework.ReaderFactory readerFactory = new IPFSJSONReaderFactory(subScan);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
+  }
+
+  /** Hands out one reader per hash of the sub scan, in list order. */
+  private static class IPFSJSONReaderFactory implements ReaderFactory {
+
+    private final IPFSSubScan subScan;
+    private int count;
+
+    public IPFSJSONReaderFactory(IPFSSubScan subScan) {
+      this.subScan = subScan;
+      this.count = 0;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) {
+    }
+
+    /** Returns the reader for the next hash, or {@code null} when all hashes were handed out. */
+    @Override
+    public ManagedReader<? extends SchemaNegotiator> next() {
+
+      List<Multihash> scanSpecList = subScan.getIPFSSubScanSpecList();
+      if (count < scanSpecList.size()) {
+        Multihash block = scanSpecList.get(count);
+        count++;
+        return new IPFSJSONReader(subScan.getIPFSContext(), block);
+      } else {
+        return null;
+      }
+    }
+  }
+}
diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
new file mode 100644
index 00000000000..2ca3a06d0c7
--- /dev/null
+++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.ipfs.multihash.Multihash; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.InvalidParameterException; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +@JsonTypeName("IPFSScanSpec") +public class IPFSScanSpec { + private static final Logger logger = LoggerFactory.getLogger(IPFSScanSpec.class); + + public enum Prefix { + @JsonProperty("ipfs") + IPFS("ipfs"), + @JsonProperty("ipns") + IPNS("ipns"); + + @JsonProperty("prefix") + private final String name; + + Prefix(String prefix) { + this.name = prefix; + } + + @Override + public String toString() { + return this.name; + } + + @JsonCreator + public static Prefix of(String what) { + switch (what) { + case "ipfs": + return IPFS; + case "ipns": + return IPNS; + default: + throw new 
InvalidParameterException("Unsupported prefix: " + what); + } + } + } + + public enum Format { + @JsonProperty("json") + JSON("json"), + @JsonProperty("csv") + CSV("csv"); + + @JsonProperty("format") + private final String name; + + Format(String prefix) { + this.name = prefix; + } + + @Override + public String toString() { + return this.name; + } + + @JsonCreator + public static Format of(String what) { + switch (what) { + case "json": + return JSON; + case "csv": + return CSV; + default: + throw new InvalidParameterException("Unsupported format: " + what); + } + } + } + + public static Set formats = ImmutableSet.of("json", "csv"); + private Prefix prefix; + private String path; + private Format formatExtension; + private final IPFSContext ipfsContext; + + @JsonCreator + public IPFSScanSpec(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig, + @JsonProperty("prefix") Prefix prefix, + @JsonProperty("format") Format format, + @JsonProperty("path") String path) { + this.ipfsContext = registry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class).getIPFSContext(); + this.prefix = prefix; + this.formatExtension = format; + this.path = path; + } + + public IPFSScanSpec(IPFSContext ipfsContext, String path) { + this.ipfsContext = ipfsContext; + parsePath(path); + } + + private void parsePath(String path) { + // IPFS CIDs can be encoded in various bases, see https://github.com/multiformats/multibase/blob/master/multibase.csv + // Base64-encoded CIDs should not be present in a path since it can contain the '/' character. + // [a-zA-Z0-9] should be enough to cover the other bases. + Pattern tableNamePattern = Pattern.compile("^/(ipfs|ipns)/([a-zA-Z0-9]+(/[^#]+)*)(?:#(\\w+))?$"); + Matcher matcher = tableNamePattern.matcher(path); + if (!matcher.matches()) { + throw UserException + .validationError() + .message("Invalid IPFS path in query string. 
Use paths of pattern " + + "`/scheme/hashpath#format`, where scheme:= \"ipfs\"|\"ipns\", " + + "hashpath:= HASH [\"/\" path], HASH is IPFS Base58 encoded hash, " + + "path:= TEXT [\"/\" path], format:= \"json\"|\"csv\"") + .build(logger); + } + + String prefix = matcher.group(1); + String hashPath = matcher.group(2); + String formatExtension = matcher.group(4); + if (formatExtension == null) { + formatExtension = "_FORMAT_OMITTED_"; + } + + logger.debug("prefix {}, hashPath {}, format {}", prefix, hashPath, formatExtension); + + this.path = hashPath; + this.prefix = Prefix.of(prefix); + try { + this.formatExtension = Format.of(formatExtension); + } catch (InvalidParameterException e) { + //if format is omitted or not valid, try to resolve it from a supported file extension in the path + Pattern fileExtensionPattern = Pattern.compile("^.*\\.(json|csv)$"); // keep in sync with Format; other extensions fall through to the validation error below + Matcher fileExtensionMatcher = fileExtensionPattern.matcher(hashPath); + if (fileExtensionMatcher.matches()) { + this.formatExtension = Format.of(fileExtensionMatcher.group(1)); + logger.debug("extracted format from query: {}", this.formatExtension); + } else { + logger.debug("failed to extract format from path: {}", hashPath); + throw UserException + .validationError() + .message("File format is missing and cannot be extracted from query: %s. " + + "Please specify file format explicitly by appending `#csv` or `#json`, etc, to the IPFS path.", hashPath) + .build(logger); + } + } + } + + /** + * Resolve target hash from IPFS/IPNS paths. + * e.g. 
/ipfs/hash/path/file will be resolved to /ipfs/file_hash + * + * @param helper IPFS helper + * @return the resolved target hash + */ + @JsonProperty + public Multihash getTargetHash(IPFSHelper helper) { + try { + Multihash topHash = helper.resolve(prefix.toString(), path, true); + if (topHash == null) { + throw UserException.validationError().message("Non-existent IPFS path: %s", toString()).build(logger); + } + return topHash; + } catch (Exception e) { + throw UserException + .executionError(e) + .message("Unable to resolve IPFS path; is it a valid IPFS path?") + .build(logger); + } + } + + @JsonProperty + public Prefix getPrefix() { + return prefix; + } + + @JsonProperty + public String getPath() { + return path; + } + + @JsonProperty + public Format getFormatExtension() { + return formatExtension; + } + + @JsonIgnore + public IPFSContext getIPFSContext() { + return ipfsContext; + } + + @JsonProperty("IPFSStoragePluginConfig") + public IPFSStoragePluginConfig getIPFSStoragePluginConfig() { + return ipfsContext.getStoragePluginConfig(); + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("prefix", prefix) + .field("path", path) + .field("format", formatExtension) + .toString(); + } +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java new file mode 100644 index 00000000000..1261896123a --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSchemaFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.store.AbstractSchema; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.SchemaFactory; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; + +public class IPFSSchemaFactory implements SchemaFactory { + private static final Logger logger = LoggerFactory.getLogger(IPFSSchemaFactory.class); + + final String schemaName; + final IPFSContext context; + + public IPFSSchemaFactory(IPFSContext context, String name) { + this.context = context; + this.schemaName = name; + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { + logger.debug("registerSchemas {}", schemaName); + IPFSTables schema = new IPFSTables(schemaName); + SchemaPlus hPlus = parent.add(schemaName, schema); + schema.setHolder(hPlus); + } + + class IPFSTables extends AbstractSchema { + private final Set tableNames = 
Sets.newHashSet(); + private final ConcurrentMap tables = new ConcurrentSkipListMap<>(); // natural (case-sensitive) order: IPFS paths are case-sensitive + + public IPFSTables(String name) { + super(ImmutableList.of(), name); + tableNames.add(name); + } + + public void setHolder(SchemaPlus plusOfThis) { + } + + @Override + public String getTypeName() { + return IPFSStoragePluginConfig.NAME; + } + + @Override + public Set getTableNames() { + return Collections.emptySet(); + } + + @Override + public Table getTable(String tableName) { + //DRILL-7766: handle placeholder table name when the table is yet to be created + logger.debug("getTable in IPFSTables {}", tableName); + if (tableName.equals("create")) { + return null; + } + + IPFSScanSpec spec = new IPFSScanSpec(context, tableName); + return tables.computeIfAbsent(tableName, // cache per requested IPFS path, not per schema name + n -> new DynamicDrillTable(context.getStoragePlugin(), schemaName, spec)); + } + + @Override + public AbstractSchema getSubSchema(String name) { + return null; + } + + @Override + public Set getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public boolean isMutable() { + logger.debug("IPFS Schema isMutable called"); + return true; + } + } +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java new file mode 100644 index 00000000000..96eec770d08 --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class IPFSStoragePlugin extends AbstractStoragePlugin { + private static final Logger logger = LoggerFactory.getLogger(IPFSStoragePlugin.class); + + private final IPFSContext ipfsContext; + private final IPFSStoragePluginConfig pluginConfig; + private final IPFSSchemaFactory schemaFactory; + + public IPFSStoragePlugin(IPFSStoragePluginConfig config, DrillbitContext context, String name) throws IOException { + super(context, name); + this.ipfsContext = new IPFSContext(config, this); + this.schemaFactory = new IPFSSchemaFactory(this.ipfsContext, name); + this.pluginConfig = config; + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return true; + } + + @Override + public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + return getPhysicalScan(userName, selection, (List) null); + } + + @Override + public IPFSGroupScan getPhysicalScan(String userName, 
JSONOptions selection, List columns) throws IOException { + logger.debug("IPFSStoragePlugin before getPhysicalScan"); + IPFSScanSpec spec = selection.getListWith(new ObjectMapper(), new TypeReference() {}); + logger.debug("IPFSStoragePlugin getPhysicalScan with selection {}, columns {}", selection, columns); + return new IPFSGroupScan(ipfsContext, spec, columns); + } + + @Override + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { + schemaFactory.registerSchemas(schemaConfig, parent); + } + + @Override + public IPFSStoragePluginConfig getConfig() { + return pluginConfig; + } + + public IPFSContext getIPFSContext() { + return ipfsContext; + } +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java new file mode 100644 index 00000000000..c34466f7198 --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.StoragePluginConfigBase; +import org.apache.drill.shaded.guava.com.google.common.base.Objects; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +import java.security.InvalidParameterException; +import java.util.Map; + +@JsonTypeName(IPFSStoragePluginConfig.NAME) +public class IPFSStoragePluginConfig extends StoragePluginConfigBase { + public static final String NAME = "ipfs"; + + @JsonProperty + private final String host; + + @JsonProperty + private final int port; + + @JsonProperty("max-nodes-per-leaf") + private final int maxNodesPerLeaf; + + @JsonProperty("ipfs-timeouts") + private final Map ipfsTimeouts; + + @JsonProperty("ipfs-caches") + private final Map ipfsCaches; + + @JsonIgnore + public static final Map ipfsTimeoutDefaults = ImmutableMap.of( + IPFSTimeOut.FIND_PROV, 4, + IPFSTimeOut.FIND_PEER_INFO, 4, + IPFSTimeOut.FETCH_DATA, 6 + ); + + @JsonIgnore + public static final Map ipfsCacheDefaults = ImmutableMap.of( + IPFSCacheType.PEER, new IPFSCache(1000, 600), + IPFSCacheType.PROVIDER, new IPFSCache(1000, 600) + ); + + public enum IPFSTimeOut { + @JsonProperty("find-provider") + FIND_PROV("find-provider"), + @JsonProperty("find-peer-info") + FIND_PEER_INFO("find-peer-info"), + @JsonProperty("fetch-data") + FETCH_DATA("fetch-data"); + + @JsonProperty("type") + private final String which; + + IPFSTimeOut(String which) { + this.which = which; + } + + @JsonCreator + public static IPFSTimeOut of(String which) { + switch 
(which) { + case "find-provider": + return FIND_PROV; + case "find-peer-info": + return FIND_PEER_INFO; + case "fetch-data": + return FETCH_DATA; + default: + throw new InvalidParameterException("Unknown key for IPFS timeout config entry: " + which); + } + } + + @Override + public String toString() { + return this.which; + } + } + + public enum IPFSCacheType { + @JsonProperty("peer") + PEER("peer"), + @JsonProperty("provider") + PROVIDER("provider"); + + @JsonProperty("type") + private final String which; + + IPFSCacheType(String which) { + this.which = which; + } + + @JsonCreator + public static IPFSCacheType of(String which) { + switch (which) { + case "peer": + return PEER; + case "provider": + return PROVIDER; + default: + throw new InvalidParameterException("Unknown key for cache config entry: " + which); + } + } + + @Override + public String toString() { + return this.which; + } + } + + public static class IPFSCache { + @JsonProperty + public final int size; + @JsonProperty + public final int ttl; + + @JsonCreator + public IPFSCache(@JsonProperty("size") int size, @JsonProperty("ttl") int ttl) { + Preconditions.checkArgument(size >= 0 && ttl > 0); + this.size = size; + this.ttl = ttl; + } + } + + @JsonProperty("groupscan-worker-threads") + private final int numWorkerThreads; + + @JsonProperty + private final Map formats; + + @JsonCreator + public IPFSStoragePluginConfig( + @JsonProperty("host") String host, + @JsonProperty("port") int port, + @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf, + @JsonProperty("ipfs-timeouts") Map ipfsTimeouts, + @JsonProperty("ipfs-caches") Map ipfsCaches, + @JsonProperty("groupscan-worker-threads") int numWorkerThreads, + @JsonProperty("formats") Map formats) { + this.host = host; + this.port = port; + this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? 
maxNodesPerLeaf : 1; + this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults); + this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults); + this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1; + this.formats = formats; + } + + private static Map applyDefaultMap(Map supplied, Map defaults) { + Map ret; + if (supplied == null) { + ret = defaults; + } else { + ret = Maps.newHashMap(); + supplied.forEach(ret::put); + defaults.forEach(ret::putIfAbsent); + } + return ret; + } + + @JsonProperty + public String getHost() { + return host; + } + + @JsonProperty + public int getPort() { + return port; + } + + @JsonProperty("max-nodes-per-leaf") + public int getMaxNodesPerLeaf() { + return maxNodesPerLeaf; + } + + @JsonIgnore + public int getIPFSTimeout(IPFSTimeOut which) { + return ipfsTimeouts.get(which); + } + + @JsonIgnore + public IPFSCache getIPFSCache(IPFSCacheType which) { + return ipfsCaches.get(which); + } + + @JsonProperty("ipfs-timeouts") + public Map getIPFSTimeouts() { + return ipfsTimeouts; + } + + @JsonProperty("ipfs-caches") + public Map getIPFSCaches() { + return ipfsCaches; + } + + @JsonProperty("groupscan-worker-threads") + public int getNumWorkerThreads() { + return numWorkerThreads; + } + + @JsonProperty + public Map getFormats() { + return formats; + } + + @Override + public int hashCode() { + return Objects.hashCode(host, port, maxNodesPerLeaf, ipfsTimeouts, ipfsCaches, numWorkerThreads, formats); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + IPFSStoragePluginConfig other = (IPFSStoragePluginConfig) obj; + return Objects.equal(formats, other.formats) + && Objects.equal(host, other.host) + && Objects.equal(ipfsTimeouts, other.ipfsTimeouts) + && Objects.equal(ipfsCaches, other.ipfsCaches) // NOTE(review): IPFSCache defines no equals(), so cache values compare by identity + && port == other.port + && maxNodesPerLeaf == other.maxNodesPerLeaf + && numWorkerThreads == other.numWorkerThreads; + 
} +} diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java new file mode 100644 index 00000000000..c34840f1f65 --- /dev/null +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.ipfs.cid.Cid; +import io.ipfs.multihash.Multihash; +import org.apache.drill.common.PlanStringBuilder; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + + +@JsonTypeName("ipfs-sub-scan") +public class IPFSSubScan extends AbstractBase implements SubScan { + private final IPFSContext ipfsContext; + private final List ipfsSubScanSpecList; + private final IPFSScanSpec.Format format; + private final List columns; + + + @JsonCreator + public IPFSSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("IPFSStoragePluginConfig") IPFSStoragePluginConfig ipfsStoragePluginConfig, + 
@JsonProperty("IPFSSubScanSpec") @JsonDeserialize(using = MultihashDeserializer.class) List ipfsSubScanSpecList, + @JsonProperty("format") IPFSScanSpec.Format format, + @JsonProperty("columns") List columns + ) { + super((String) null); + IPFSStoragePlugin plugin = registry.resolve(ipfsStoragePluginConfig, IPFSStoragePlugin.class); + ipfsContext = plugin.getIPFSContext(); + this.ipfsSubScanSpecList = ipfsSubScanSpecList; + this.format = format; + this.columns = columns; + } + + public IPFSSubScan(IPFSContext ipfsContext, List ipfsSubScanSpecList, IPFSScanSpec.Format format, List columns) { + super((String) null); + this.ipfsContext = ipfsContext; + this.ipfsSubScanSpecList = ipfsSubScanSpecList; + this.format = format; + this.columns = columns; + } + + @JsonIgnore + public IPFSContext getIPFSContext() { + return ipfsContext; + } + + @JsonProperty("IPFSStoragePluginConfig") + public IPFSStoragePluginConfig getIPFSStoragePluginConfig() { + return ipfsContext.getStoragePluginConfig(); + } + + @JsonProperty("columns") + public List getColumns() { + return columns; + } + + @JsonProperty("format") + public IPFSScanSpec.Format getFormat() { + return format; + } + + @Override + public String toString() { + return new PlanStringBuilder(this) + .field("scan spec", ipfsSubScanSpecList) + .field("format", format) + .field("columns", columns) + .toString(); + } + + @JsonSerialize(using = MultihashSerializer.class) + @JsonProperty("IPFSSubScanSpec") + public List getIPFSSubScanSpecList() { + return ipfsSubScanSpecList; + } + + @Override + public T accept( + PhysicalVisitor physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + @Override + public Iterator iterator() { + return ImmutableSet.of().iterator(); + } + + @Override + public int getOperatorType() { + return UserBitShared.CoreOperatorType.IPFS_SUB_SCAN_VALUE; + } + + @Override + public boolean isExecutable() { + return false; + } + + @Override + public PhysicalOperator 
getNewWithChildren(List children) { + return new IPFSSubScan(ipfsContext, ipfsSubScanSpecList, format, columns); + } + + public static class IPFSSubScanSpec { + private final String targetHash; + + @JsonCreator + public IPFSSubScanSpec(@JsonProperty("targetHash") String targetHash) { + this.targetHash = targetHash; + } + + @JsonProperty + public String getTargetHash() { + return targetHash; + } + } + + static class MultihashSerializer extends JsonSerializer> { + + @Override + public void serialize(List value, JsonGenerator jgen, + SerializerProvider provider) throws IOException { + jgen.writeStartArray(); + for (Multihash hash : value) { + jgen.writeString(hash.toString()); + } + jgen.writeEndArray(); + } + } + + static class MultihashDeserializer extends JsonDeserializer> { + @Override + public List deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException { + assert jp.currentToken() == JsonToken.START_ARRAY; + + List multihashList = new ArrayList<>(); + while (jp.nextToken() != JsonToken.END_ARRAY) { + String hash = jp.getValueAsString(); + multihashList.add(Cid.decode(hash)); + } + return multihashList; + } + } +} diff --git a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json new file mode 100644 index 00000000000..54d8427aa6e --- /dev/null +++ b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json @@ -0,0 +1,29 @@ +{ + "storage": { + "ipfs": { + "type": "ipfs", + "host": "127.0.0.1", + "port": 5001, + "max-nodes-per-leaf": 3, + "ipfs-timeouts": { + "find-provider": 4, + "find-peer-info": 4, + "fetch-data": 5 + }, + "ipfs-caches": { + "peer": { + "size": 100, + "ttl": 600 + }, + "provider": { + "size": 1000, + "ttl": 600 + } + }, + "groupscan-worker-threads": 50, + "formats": null, + "enabled": false + } + } +} +

 \ No newline at end of file diff --git a/contrib/storage-ipfs/src/main/resources/drill-module.conf b/contrib/storage-ipfs/src/main/resources/drill-module.conf new file mode 100644 index 00000000000..f664e6dd257 --- /dev/null +++ b/contrib/storage-ipfs/src/main/resources/drill-module.conf @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file tells Drill to consider this module when class path scanning. +# This file can also include any supplementary configuration information. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + +drill: { + classpath.scanning: { + packages += "org.apache.drill.exec.store.ipfs" + } +} \ No newline at end of file diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java new file mode 100644 index 00000000000..dc6b05bf674 --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestBase.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IPFSTestBase extends ClusterTest { + private static final Logger logger = LoggerFactory.getLogger(IPFSTestBase.class); + private static StoragePluginRegistry pluginRegistry; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); + builder.configProperty(ExecConstants.INITIAL_BIT_PORT, IPFSGroupScan.DEFAULT_CONTROL_PORT) + .configProperty(ExecConstants.INITIAL_DATA_PORT, IPFSGroupScan.DEFAULT_DATA_PORT) + .configProperty(ExecConstants.INITIAL_USER_PORT, IPFSGroupScan.DEFAULT_USER_PORT) + .configProperty(ExecConstants.DRILL_PORT_HUNT, false) + .configProperty(ExecConstants.ALLOW_LOOPBACK_ADDRESS_BINDING, true) + .clusterSize(1) + .withLocalZk(); + startCluster(builder); + pluginRegistry = cluster.drillbit().getContext().getStorage(); + + 
IPFSTestSuit.initIPFS(); + initIPFSStoragePlugin(); + } + + private static void initIPFSStoragePlugin() throws Exception { + pluginRegistry + .put( + IPFSStoragePluginConfig.NAME, + IPFSTestSuit.getIpfsStoragePluginConfig()); + } + + @AfterClass + public static void tearDownIPFSTestBase() throws StoragePluginRegistry.PluginException { + if (pluginRegistry != null) { + pluginRegistry.remove(IPFSStoragePluginConfig.NAME); + } else { + logger.warn("Plugin Registry was null"); + } + } +} \ No newline at end of file diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java new file mode 100644 index 00000000000..bd1a1ca2d39 --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestConstants.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.cid.Cid; +import io.ipfs.multiaddr.MultiAddress; +import io.ipfs.multihash.Multihash; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface IPFSTestConstants { + String MOCK_NODE_ID_STRING = "QmP14kRKf1mR6LAYgfuuMirscgZNYbzMCHQ1ebe4bBKdah"; + Multihash MOCK_NODE_ID_MULTIHASH = Multihash.fromBase58(MOCK_NODE_ID_STRING); + String MOCK_NODE_ADDR = "127.0.0.1"; + int MOCK_NODE_IPFS_SWARM_PORT = 4001; + int MOCK_NODE_IPFS_API_PORT = 5001; + List<String> MOCK_NODE_ADDRS = ImmutableList.of( + String.format("/ip4/%s/tcp/%d/ipfs/%s", MOCK_NODE_ADDR, MOCK_NODE_IPFS_SWARM_PORT, MOCK_NODE_ID_STRING) + ); + List<MultiAddress> MOCK_NODE_MULTIADDRS = MOCK_NODE_ADDRS.stream().map(MultiAddress::new).collect(Collectors.toList()); + + String SIMPLE_DATASET_HASH_STRING = "QmcbeavnEofA6NjG7vkpe1yLJo6En6ML4JnDooDn1BbKmR"; + Multihash SIMPLE_DATASET_MULTIHASH = Multihash.fromBase58(SIMPLE_DATASET_HASH_STRING); + Cid SIMPLE_DATASET_CID_V1 = Cid.build(1, Cid.Codec.DagProtobuf, SIMPLE_DATASET_MULTIHASH); + String SIMPLE_DATASET_CID_V1_STRING = SIMPLE_DATASET_CID_V1.toString(); + + /** + * Chunked dataset layout: + * top object: QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo + * +-- 1 QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi chunked-json-1.json (162 bytes) + * +-- 2 QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP chunked-json-2.json (159 bytes) + * +-- 3 QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd chunked-json-3.json (89 bytes) + */ + String CHUNKED_DATASET_HASH_STRING = "QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo"; + Multihash CHUNKED_DATASET_MULTIHASH = Multihash.fromBase58(CHUNKED_DATASET_HASH_STRING); + Map<String, Multihash> CHUNKS_MULTIHASH = ImmutableMap.of( + "chunked-json-1.json", 
Multihash.fromBase58("QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi"), + "chunked-json-2.json", Multihash.fromBase58("QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP"), + "chunked-json-3.json", Multihash.fromBase58("QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd") + ); + + static String getQueryPath(Multihash dataset) { + return IPFSTestConstants.getQueryPath(dataset.toString()); + } + + static String getQueryPath(String dataset) { + return String.format("/ipfs/%s#json", dataset); + } +} diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java new file mode 100644 index 00000000000..9304159bb53 --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestDataGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.api.IPFS; +import io.ipfs.api.MerkleNode; +import io.ipfs.multihash.Multihash; +import org.apache.drill.shaded.guava.com.google.common.io.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Optional; + +public class IPFSTestDataGenerator { + private static final Logger logger = LoggerFactory.getLogger(IPFSTestDataGenerator.class); + + public static Multihash importSimple(IPFSStoragePluginConfig config) { + try { + final IPFS client = new IPFS(config.getHost(), config.getPort()); + File testFile = new File(Resources.getResource("simple.json").toURI()); + return addObject(client, Files.readAllBytes(testFile.toPath())); + } catch (Exception e) { + logger.error("Failed to import data", e); + return null; + } + } + + public static Multihash importChunked(IPFSStoragePluginConfig config) { + try { + final IPFS client = new IPFS(config.getHost(), config.getPort()); + + Multihash base = IPFSHelper.IPFS_NULL_OBJECT; + for (int i = 1; i <= 3; i++) { + File testFile = new File(Resources.getResource(String.format("chunked-json-%d.json", i)).toURI()); + Multihash chunk = addObject(client, Files.readAllBytes(testFile.toPath())); + base = addLink(client, base, String.format("%d", i), chunk); + } + + return base; + } catch (Exception e) { + logger.error("Failed to import data", e); + return null; + } + } + + private static Multihash addObject(IPFS client, byte[] data) throws IOException { + MerkleNode node = client.object.patch(IPFSHelper.IPFS_NULL_OBJECT, "set-data", Optional.of(data), Optional.empty(), Optional.empty()); + return node.hash; + } + + private static Multihash addLink(IPFS client, Multihash base, String name, Multihash referent) throws IOException { + MerkleNode node = client.object.patch(base, "add-link", Optional.empty(), Optional.of(name), Optional.of(referent)); + return 
node.hash; + } +} diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java new file mode 100644 index 00000000000..a2be3184b3e --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/IPFSTestSuit.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.drill.exec.store.ipfs; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.drill.categories.IPFSStorageTest; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.shaded.guava.com.google.common.io.Resources; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; + +@RunWith(Suite.class) +@Suite.SuiteClasses({TestIPFSQueries.class, TestIPFSGroupScan.class, TestIPFSScanSpec.class}) +@Category({SlowTest.class, IPFSStorageTest.class}) +public class IPFSTestSuit { + private static final Logger logger = LoggerFactory.getLogger(IPFSTestSuit.class); + + private static final ObjectMapper mapper = new ObjectMapper(); + + private static IPFSStoragePluginConfig ipfsStoragePluginConfig = null; + + @BeforeClass + public static void initIPFS() { + try { + JsonNode storagePluginJson = mapper.readTree(new File(Resources.getResource("bootstrap-storage-plugins.json").toURI())); + ipfsStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("ipfs"), IPFSStoragePluginConfig.class); + ipfsStoragePluginConfig.setEnabled(true); + } catch (Exception e) { + logger.error("Error initializing IPFS ", e); + } + } + + public static IPFSStoragePluginConfig getIpfsStoragePluginConfig() { + return ipfsStoragePluginConfig; + } +} diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java new file mode 100644 index 00000000000..f944aef03c4 --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.api.IPFS; +import io.ipfs.api.JSONParser; +import io.ipfs.api.MerkleNode; +import io.ipfs.multiaddr.MultiAddress; +import io.ipfs.multihash.Multihash; +import org.apache.drill.categories.IPFSStorageTest; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder; +import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; +import org.apache.drill.shaded.guava.com.google.common.io.Resources; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category({SlowTest.class, IPFSStorageTest.class}) +public class TestIPFSGroupScan extends IPFSTestBase implements IPFSTestConstants { + @Mock + private IPFS ipfs; + @Mock + 
private IPFSCompat ipfsCompat; + @Mock + private IPFSHelper ipfsHelper; + @Mock + private IPFSStoragePlugin plugin; + @Mock + private IPFSPeer myself; + + @Before + public void before() { + + ipfs = Mockito.mock(IPFS.class); + ipfsCompat = Mockito.mock(IPFSCompat.class); + ipfsHelper = Mockito.mock(IPFSHelper.class); + plugin = Mockito.mock(IPFSStoragePlugin.class); + + try { + IPFSStoragePluginConfig config = IPFSTestSuit.getIpfsStoragePluginConfig(); + + Mockito.when(ipfs.id()).thenReturn(ImmutableMap.of( + "ID", MOCK_NODE_ID_STRING, + "Addresses", MOCK_NODE_ADDRS + )); + + IPFSContext context = Mockito.mock(IPFSContext.class); + myself = getMockedIPFSPeer( + MOCK_NODE_ID_MULTIHASH, + MOCK_NODE_MULTIADDRS, + true, + Optional.of(MOCK_NODE_ADDR) + ); + + Mockito.when(plugin.getConfig()).thenReturn(config); + Mockito.when(plugin.getIPFSContext()).thenReturn(context); + Mockito.when(plugin.getContext()).thenReturn(cluster.drillbit().getContext()); + Mockito.when(context.getMyself()).thenReturn(myself); + Mockito.when(context.getIPFSHelper()).thenReturn(ipfsHelper); + Mockito.when(context.getStoragePlugin()).thenReturn(plugin); + Mockito.when(context.getStoragePluginConfig()).thenReturn(config); + Mockito.when(context.getIPFSClient()).thenReturn(ipfs); + Mockito.when(context.getIPFSPeerCache()).thenReturn( + CacheBuilder.newBuilder() + .maximumSize(1) + .build(CacheLoader.from(key -> { + if (myself.getId().equals(key)) { + return myself; + } else { + return null; + } + }) + )); + Mockito.when(context.getProviderCache()).thenReturn( + CacheBuilder.newBuilder() + .maximumSize(1) + .build(CacheLoader.from(key -> ImmutableList.of(myself.getId()))) + ); + Mockito.when(ipfsHelper.getClient()).thenReturn(ipfs); + } catch (IOException e) { + fail(e.getMessage()); + } + } + + @Test + public void testSimpleDatasetWithNoAnyOtherProviders() { + try { + Mockito.when(ipfsHelper.getObjectLinksTimeout(Mockito.any(Multihash.class))).thenReturn(new 
MerkleNode(SIMPLE_DATASET_HASH_STRING)); + Mockito.when(ipfsHelper.findprovsTimeout(Mockito.any(Multihash.class))).thenReturn(ImmutableList.of(MOCK_NODE_ID_MULTIHASH)); + //called in IPFSScanSpec.getTargetHash + Mockito.when(ipfsHelper.resolve(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean())).thenReturn(SIMPLE_DATASET_MULTIHASH); + Mockito.when(ipfsHelper.isDrillReady(Mockito.any(Multihash.class))).thenReturn(true); + Mockito.when(ipfsHelper.findpeerTimeout(Mockito.any(Multihash.class))).thenReturn(MOCK_NODE_MULTIADDRS); + + File simpleDataset = new File(Resources.getResource("simple.json").toURI()); + byte[] contents = Files.readAllBytes(simpleDataset.toPath()); + Mockito.when(ipfsHelper.getObjectDataTimeout(Mockito.any(Multihash.class))).thenReturn(contents); + + IPFSContext context = plugin.getIPFSContext(); + IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); + Map map = groupScan.getLeafAddrMappings(SIMPLE_DATASET_MULTIHASH); + assertEquals(map.keySet().size(), 1); + assertEquals(map.get(SIMPLE_DATASET_MULTIHASH), MOCK_NODE_ADDR); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testChunkedDatasetWithNoAnyOtherProviders() { + try { + Mockito.when(ipfsHelper.getObjectLinksTimeout(CHUNKED_DATASET_MULTIHASH)).thenReturn(MerkleNode.fromJSON(JSONParser.parse("{\"Hash\":\"QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo\",\"Links\":[{\"Name\":\"1\",\"Hash\":\"QmSmDFd1GcLPyYtscdtkBCj7gbNKiJ8MkaBPEFMz9orPEi\",\"Size\":162},{\"Name\":\"2\",\"Hash\":\"QmQVBWTZ7MZjwHv5q9qG3zLzczsh8PGAVRWhF2gKsrj1hP\",\"Size\":159},{\"Name\":\"3\",\"Hash\":\"QmY8ghdB3mwdUAdBmft3bdgzPVcq8bCvtqTRd9wu3LjyTd\",\"Size\":89}]}\n"))); + Mockito.when(ipfsHelper.findprovsTimeout(Mockito.any(Multihash.class))).thenReturn(ImmutableList.of(MOCK_NODE_ID_MULTIHASH)); + Mockito.when(ipfsHelper.resolve(Mockito.anyString(), Mockito.anyString(), 
Mockito.anyBoolean())).thenReturn(CHUNKED_DATASET_MULTIHASH); + Mockito.when(ipfsHelper.isDrillReady(Mockito.any(Multihash.class))).thenReturn(true); + Mockito.when(ipfsHelper.findpeerTimeout(Mockito.any(Multihash.class))).thenReturn(MOCK_NODE_MULTIADDRS); + for (Map.Entry entry : CHUNKS_MULTIHASH.entrySet()) { + File chunkFile = new File(Resources.getResource(entry.getKey()).toURI()); + Mockito.when(ipfsHelper.getObjectDataTimeout(entry.getValue())).thenReturn(Files.readAllBytes(chunkFile.toPath())); + Mockito.when(ipfsHelper.getObjectLinksTimeout(entry.getValue())).thenReturn(new MerkleNode(entry.getValue().toBase58())); + } + + IPFSContext context = plugin.getIPFSContext(); + IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); + Map map = groupScan.getLeafAddrMappings(CHUNKED_DATASET_MULTIHASH); + assertEquals(map.keySet().size(), 3); + for (Map.Entry entry : CHUNKS_MULTIHASH.entrySet()) { + assertEquals(map.get(entry.getValue()), MOCK_NODE_ADDR); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + + private IPFSPeer getMockedIPFSPeer(Multihash multihashId, List addrs, boolean isDrillReady, + Optional drillbitAddress) { + IPFSPeer peer = Mockito.mock(IPFSPeer.class); + Mockito.when(peer.getId()).thenReturn(multihashId); + Mockito.when(peer.getMultiAddresses()).thenReturn(addrs); + Mockito.when(peer.getDrillbitAddress()).thenReturn(drillbitAddress); + Mockito.when(peer.hasDrillbitAddress()).thenReturn(drillbitAddress.isPresent()); + Mockito.when(peer.toString()).thenReturn(multihashId.toBase58()); + + return peer; + } +} diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java new file mode 100644 index 00000000000..b1fdda7f50b --- /dev/null +++ 
b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSQueries.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + +import io.ipfs.multihash.Multihash; +import org.apache.drill.categories.IPFSStorageTest; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.fail; + +@Ignore("Requires running local IPFS daemon") +@Category({SlowTest.class, IPFSStorageTest.class}) +public class TestIPFSQueries extends IPFSTestBase implements IPFSTestConstants { + + @Before + public void checkDrillbitPorts() { + CoordinationProtos.DrillbitEndpoint ep = cluster.drillbit().getRegistrationHandle().getEndPoint(); + int controlPort = ep.getControlPort(); + int userPort = ep.getUserPort(); + int dataPort = ep.getDataPort(); + if (controlPort != IPFSGroupScan.DEFAULT_CONTROL_PORT + || userPort != IPFSGroupScan.DEFAULT_USER_PORT + || dataPort != IPFSGroupScan.DEFAULT_DATA_PORT) { + //DRILL-7754 handle non-default ports + 
fail(String.format("Drill bound to non-default ports: %d, %d, %d", controlPort, userPort, dataPort)); + } + } + + @Test + public void testNullQuery() throws Exception { + + testBuilder() + .sqlQuery(getSelectStar(IPFSHelper.IPFS_NULL_OBJECT)) + .unOrdered() + .expectsNumRecords(1) + .go(); + } + + @Test + public void testSimple() throws Exception { + Multihash dataset = IPFSTestDataGenerator.importSimple(IPFSTestSuit.getIpfsStoragePluginConfig()); + if (null == dataset) { + fail(); + } + + testBuilder() + .sqlQuery(getSelectStar(dataset)) + .unOrdered() + .expectsNumRecords(1) + .go(); + } + + @Test + public void testSimpleCIDv1() throws Exception { + Multihash dataset = IPFSTestDataGenerator.importSimple(IPFSTestSuit.getIpfsStoragePluginConfig()); + if (null == dataset) { + fail(); + } + + testBuilder() + .sqlQuery(getSelectStar(SIMPLE_DATASET_CID_V1)) + .unOrdered() + .expectsNumRecords(1) + .go(); + } + + @Test + public void testChunked() throws Exception { + Multihash dataset = IPFSTestDataGenerator.importChunked(IPFSTestSuit.getIpfsStoragePluginConfig()); + if (null == dataset) { + fail(); + } + + testBuilder() + .sqlQuery(getSelectStar(dataset)) + .unOrdered() + .expectsNumRecords(5) + .go(); + } + + private static String getSelectStar(Multihash dataset) { + return String.format("SELECT * FROM ipfs.`%s`", IPFSTestConstants.getQueryPath(dataset)); + } +} diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java new file mode 100644 index 00000000000..f163e0de69c --- /dev/null +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSScanSpec.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.drill.exec.store.ipfs; + +import org.apache.drill.categories.IPFSStorageTest; +import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.exceptions.UserException; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + +@Category({SlowTest.class, IPFSStorageTest.class}) +public class TestIPFSScanSpec extends IPFSTestBase implements IPFSTestConstants { + @Mock + private IPFSContext context; + + @Before + public void before() { + context = Mockito.mock(IPFSContext.class); + } + + @Test + public void testSimpleDatasetPath() { + IPFSScanSpec spec = new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)); + assertEquals(spec.getPath(), SIMPLE_DATASET_HASH_STRING); + } + + @Test(expected = UserException.class) + public void testInvalidPathWithBadPrefix() { + IPFSScanSpec spec = new IPFSScanSpec(context, "/root/data/dataset.json"); + } + + @Test(expected = UserException.class) + public void testInvalidPathWithNoExtension() { + IPFSScanSpec spec = new IPFSScanSpec(context, String.format("/ipfs/%s", SIMPLE_DATASET_HASH_STRING)); + } + + @Test + public void testPathWithCIDv1() { + IPFSScanSpec spec = new IPFSScanSpec(context, 
IPFSTestConstants.getQueryPath(SIMPLE_DATASET_CID_V1_STRING)); + assertEquals(spec.getPath(), SIMPLE_DATASET_CID_V1_STRING); + } + + @Test + public void testChunkedDatasetPath() { + IPFSScanSpec spec = new IPFSScanSpec(context, String.format("/ipfs/%s/1#json", CHUNKED_DATASET_HASH_STRING)); + assertEquals(spec.getPath(), CHUNKED_DATASET_HASH_STRING + "/1"); + } + + @Test + public void testDataFileWithExplicitExtensionName() { + IPFSScanSpec spec = new IPFSScanSpec(context, "/ipfs/QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo/1.json"); + assertEquals(spec.getPath(), "QmSeX1YAGWMXoPrgeKBTq2Be6NdRzTVESeeWyt7mQFuvzo/1.json"); + } +} diff --git a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json new file mode 100644 index 00000000000..93435666872 --- /dev/null +++ b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json @@ -0,0 +1,29 @@ +{ + "storage": { + "ipfs": { + "type": "ipfs", + "host": "127.0.0.1", + "port": 5001, + "max-nodes-per-leaf": 1, + "ipfs-timeouts": { + "find-provider": 1, + "find-peer-info": 1, + "fetch-data": 1 + }, + "ipfs-caches": { + "peer": { + "size": 2, + "ttl": 30 + }, + "provider": { + "size": 2, + "ttl": 30 + } + }, + "groupscan-worker-threads": 5, + "formats": null, + "enabled": true + } + } +} +

 \ No newline at end of file diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-1.json b/contrib/storage-ipfs/src/test/resources/chunked-json-1.json new file mode 100644 index 00000000000..d9b444e3a7f --- /dev/null +++ b/contrib/storage-ipfs/src/test/resources/chunked-json-1.json @@ -0,0 +1,12 @@ +{ + "name": "Alice", + "job": "artist", + "age": 30, + "sex": "female" +} +{ + "name": "Bob", + "job": "butcher", + "age": 45, + "sex": "male" +} diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-2.json b/contrib/storage-ipfs/src/test/resources/chunked-json-2.json new file mode 100644 index 00000000000..a92f47043f4 --- /dev/null +++ b/contrib/storage-ipfs/src/test/resources/chunked-json-2.json @@ -0,0 +1,12 @@ +{ + "name": "Cecil", + "job": "cook", + "age": 32, + "sex": "male" +} +{ + "name": "David", + "job": "doctor", + "age": 28, + "sex": "male" +} diff --git a/contrib/storage-ipfs/src/test/resources/chunked-json-3.json b/contrib/storage-ipfs/src/test/resources/chunked-json-3.json new file mode 100644 index 00000000000..00a15d34d45 --- /dev/null +++ b/contrib/storage-ipfs/src/test/resources/chunked-json-3.json @@ -0,0 +1,6 @@ +{ + "name": "Elizabeth", + "job": "engineer", + "age": 23, + "sex": "female" +} diff --git a/contrib/storage-ipfs/src/test/resources/simple.json b/contrib/storage-ipfs/src/test/resources/simple.json new file mode 100644 index 00000000000..463b47c0831 --- /dev/null +++ b/contrib/storage-ipfs/src/test/resources/simple.json @@ -0,0 +1 @@ +{"name": "Alice", "job": "artist", "age": 30, "sex": "female"} diff --git a/distribution/pom.xml b/distribution/pom.xml index 3f5eeb081bd..26131bd717f 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -301,6 +301,11 @@ drill-opentsdb-storage ${project.version} + + org.apache.drill.contrib + drill-ipfs-storage + ${project.version} + org.apache.drill.contrib drill-mongo-storage diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml 
index b9a2fce4912..f615453b6e3 100644 --- a/distribution/src/assemble/component.xml +++ b/distribution/src/assemble/component.xml @@ -53,6 +53,7 @@ org.apache.drill.contrib:drill-storage-kafka:jar org.apache.drill.contrib:drill-storage-http:jar org.apache.drill.contrib:drill-opentsdb-storage:jar + org.apache.drill.contrib:drill-ipfs-storage:jar org.apache.drill.contrib:drill-udfs:jar org.apache.drill.contrib:drill-druid-storage:jar diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 01a51f07e67..ec8ac31c869 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -697,6 +697,10 @@ public enum CoreOperatorType * HTTP_SUB_SCAN = 70; */ HTTP_SUB_SCAN(70), + /** + * IPFS_SUB_SCAN = 71; + */ + IPFS_SUB_SCAN(71), ; /** @@ -983,6 +987,10 @@ public enum CoreOperatorType * HTTP_SUB_SCAN = 70; */ public static final int HTTP_SUB_SCAN_VALUE = 70; + /** + * IPFS_SUB_SCAN = 71; + */ + public static final int IPFS_SUB_SCAN_VALUE = 71; public final int getNumber() { @@ -1076,6 +1084,7 @@ public static CoreOperatorType forNumber(int value) { case 68: return DRUID_SUB_SCAN; case 69: return SPSS_SUB_SCAN; case 70: return HTTP_SUB_SCAN; + case 71: return IPFS_SUB_SCAN; default: return null; } } @@ -29055,7 +29064,7 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" + "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014" + "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + - "\032\n\026CANCELLATION_REQUESTED\020\006*\236\013\n\020CoreOper" + + "\032\n\026CANCELLATION_REQUESTED\020\006*\261\013\n\020CoreOper" + "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + 
"_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + @@ -29091,11 +29100,11 @@ public org.apache.drill.exec.proto.UserBitShared.SaslMessage getDefaultInstanceF "CAN\020?\022\022\n\016EXCEL_SUB_SCAN\020@\022\020\n\014SHP_SUB_SCA" + "N\020A\022\024\n\020METADATA_HANDLER\020B\022\027\n\023METADATA_CO" + "NTROLLER\020C\022\022\n\016DRUID_SUB_SCAN\020D\022\021\n\rSPSS_S" + - "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F*g\n\nSaslStat" + - "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020" + - "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013" + - "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p" + - "rotoB\rUserBitSharedH\001" + "UB_SCAN\020E\022\021\n\rHTTP_SUB_SCAN\020F\022\021\n\rIPFS_SUB" + + "_SCAN\020G*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022" + + "\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n" + + "\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org." + + "apache.drill.exec.protoB\rUserBitSharedH\001" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index f7b7b02d1b6..ad96911613c 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -382,6 +382,7 @@ enum CoreOperatorType { DRUID_SUB_SCAN = 68; SPSS_SUB_SCAN = 69; HTTP_SUB_SCAN = 70; + IPFS_SUB_SCAN = 71; } /* Registry that contains list of jars, each jar contains its name and list of function signatures. From 0c34b569c06d2c0fecc5c132f20704e97f6163b6 Mon Sep 17 00:00:00 2001 From: Bowen Ding Date: Sat, 5 Sep 2020 23:47:57 +0800 Subject: [PATCH 2/2] Add manual switch to allow non-distributed execution. 
--- .../drill/exec/store/ipfs/IPFSGroupScan.java | 240 ++++++++++-------- .../store/ipfs/IPFSStoragePluginConfig.java | 13 +- .../resources/bootstrap-storage-plugins.json | 1 + .../exec/store/ipfs/TestIPFSGroupScan.java | 8 +- .../resources/bootstrap-storage-plugins.json | 1 + 5 files changed, 150 insertions(+), 113 deletions(-) diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java index f68749d85b9..a9534e95749 100644 --- a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java @@ -55,13 +55,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -116,34 +118,21 @@ private void init() { Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper); try { - Map leafAddrMap = getLeafAddrMappings(topHash); - logger.debug("Iterating on {} leaves...", leafAddrMap.size()); - ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator(); - for (Multihash leaf : leafAddrMap.keySet()) { - String peerHostname = leafAddrMap.get(leaf); + Map leafPeerMap = getLeafPeerMappings(topHash); + logger.debug("Iterating on {} leaves...", leafPeerMap.size()); - Optional oep = coordinator.getAvailableEndpoints() - .stream() - .filter(a -> a.getAddress().equals(peerHostname)) - .findAny(); + ClusterCoordinator coordinator = 
ipfsContext.getStoragePlugin().getContext().getClusterCoordinator(); + for (Multihash leaf : leafPeerMap.keySet()) { DrillbitEndpoint ep; - if (oep.isPresent()) { - ep = oep.get(); - logger.debug("Using existing endpoint {}", ep.getAddress()); + if (config.isDistributedMode()) { + String peerHostname = leafPeerMap + .get(leaf) + .getDrillbitAddress() + .orElseThrow(() -> new RuntimeException("Chosen IPFS peer does not have drillbit address")); + ep = registerEndpoint(coordinator, peerHostname); } else { - logger.debug("created new endpoint on the fly {}", peerHostname); - //DRILL-7754: read ports & version info from IPFS instead of hard-coded - ep = DrillbitEndpoint.newBuilder() - .setAddress(peerHostname) - .setUserPort(DEFAULT_USER_PORT) - .setControlPort(DEFAULT_CONTROL_PORT) - .setDataPort(DEFAULT_DATA_PORT) - .setHttpPort(DEFAULT_HTTP_PORT) - .setVersion(DrillVersionInfo.getVersion()) - .setState(DrillbitEndpoint.State.ONLINE) - .build(); - //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed? 
- ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep); + // the foreman is used to execute the plan + ep = ipfsContext.getStoragePlugin().getContext().getEndpoint(); } IPFSWork work = new IPFSWork(leaf); @@ -161,15 +150,56 @@ private void init() { } } - Map getLeafAddrMappings(Multihash topHash) { + private DrillbitEndpoint registerEndpoint(ClusterCoordinator coordinator, String peerHostname) { + Optional oep = coordinator.getAvailableEndpoints() + .stream() + .filter(ep -> ep.getAddress().equals(peerHostname)) + .findAny(); + DrillbitEndpoint ep; + if (oep.isPresent()) { + ep = oep.get(); + logger.debug("Using existing endpoint {}", ep.getAddress()); + } else { + logger.debug("created new endpoint on the fly {}", peerHostname); + //DRILL-7754: read ports & version info from IPFS instead of hard-coded + ep = DrillbitEndpoint.newBuilder() + .setAddress(peerHostname) + .setUserPort(DEFAULT_USER_PORT) + .setControlPort(DEFAULT_CONTROL_PORT) + .setDataPort(DEFAULT_DATA_PORT) + .setHttpPort(DEFAULT_HTTP_PORT) + .setVersion(DrillVersionInfo.getVersion()) + .setState(DrillbitEndpoint.State.ONLINE) + .build(); + //DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed? 
+ ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep); + } + + return ep; + } + + Map getLeafPeerMappings(Multihash topHash) { logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash); Stopwatch watch = Stopwatch.createStarted(); ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads()); - IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, false, ipfsContext); - Map leafAddrMap = forkJoinPool.invoke(topTask); + IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, ipfsContext); + List leaves = forkJoinPool.invoke(topTask); logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS)); - return leafAddrMap; + logger.debug("Start to resolve providers"); + watch.reset().start(); + Map leafPeerMap; + if (config.isDistributedMode()) { + leafPeerMap = forkJoinPool.invoke(new IPFSProviderResolver(leaves, ipfsContext)); + } else { + leafPeerMap = new HashMap<>(); + for (Multihash leaf : leaves) { + leafPeerMap.put(leaf, ipfsContext.getMyself()); + } + } + logger.debug("Took {} ms to resolve providers", watch.elapsed(TimeUnit.MILLISECONDS)); + + return leafPeerMap; } private IPFSGroupScan(IPFSGroupScan that) { @@ -330,50 +360,93 @@ public String toString() { } } - //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree - static class IPFSTreeFlattener extends RecursiveTask> { - private final Multihash hash; - private final boolean isProvider; - private final Map ret = new LinkedHashMap<>(); + static class IPFSProviderResolver extends RecursiveTask> { + private final List leaves; + private final Map ret = new LinkedHashMap<>(); private final IPFSPeer myself; private final IPFSHelper helper; private final LoadingCache peerCache; private final LoadingCache> providerCache; - public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) { + public IPFSProviderResolver(List leaves, IPFSContext context) { + this(leaves, 
context.getMyself(), context.getIPFSHelper(), context.getIPFSPeerCache(), context.getProviderCache()); + } + + public IPFSProviderResolver(IPFSProviderResolver reference, List leaves) { + this(leaves, reference.myself, reference.helper, reference.peerCache, reference.providerCache); + } + + IPFSProviderResolver(List leaves, IPFSPeer myself, IPFSHelper helper, LoadingCache peerCache, LoadingCache> providerCache) { + this.leaves = leaves; + this.myself = myself; + this.helper = helper; + this.peerCache = peerCache; + this.providerCache = providerCache; + } + + @Override + protected Map compute() { + int totalLeaves = leaves.size(); + if (totalLeaves == 1) { + Multihash hash = leaves.get(0); + List providers = providerCache.getUnchecked(hash).parallelStream() + .map(peerCache::getUnchecked) + .filter(IPFSPeer::isDrillReady) + .filter(IPFSPeer::hasDrillbitAddress) + .collect(Collectors.toList()); + if (providers.size() < 1) { + logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash); + providers.add(myself); + } + logger.debug("Got {} providers for {} from IPFS", providers.size(), hash); + + //DRILL-7753: better peer selection algorithm + Random random = new Random(); + IPFSPeer chosenPeer = providers.get(random.nextInt(providers.size())); + ret.put(hash, chosenPeer); + logger.debug("Use peer {} for leaf {}", chosenPeer, hash); + return ret; + } + + int firstHalf = totalLeaves / 2; + ImmutableList resolvers = ImmutableList.of( + new IPFSProviderResolver(this, leaves.subList(0, firstHalf)), + new IPFSProviderResolver(this, leaves.subList(firstHalf, totalLeaves)) + ); + resolvers.forEach(ForkJoinTask::fork); + resolvers.reverse().forEach(resolver -> ret.putAll(resolver.join())); + return ret; + } + } + + //DRILL-7756: detect and warn about loops/recursions in case of a malformed tree + static class IPFSTreeFlattener extends RecursiveTask> { + private final Multihash hash; + private final List ret = new LinkedList<>(); + private 
final IPFSPeer myself; + private final IPFSHelper helper; + + public IPFSTreeFlattener(Multihash hash, IPFSContext context) { this( hash, - isProvider, context.getMyself(), - context.getIPFSHelper(), - context.getIPFSPeerCache(), - context.getProviderCache() + context.getIPFSHelper() ); } - IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSPeer myself, IPFSHelper ipfsHelper, - LoadingCache peerCache, LoadingCache> providerCache) { + IPFSTreeFlattener(Multihash hash, IPFSPeer myself, IPFSHelper ipfsHelper) { this.hash = hash; - this.isProvider = isProvider; this.myself = myself; this.helper = ipfsHelper; - this.peerCache = peerCache; - this.providerCache = providerCache; } - public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash, boolean isProvider) { - this(hash, isProvider, reference.myself, reference.helper, reference.peerCache, reference.providerCache); + public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash) { + this(hash, reference.myself, reference.helper); } @Override - public Map compute() { + public List compute() { try { - if (isProvider) { - IPFSPeer peer = peerCache.getUnchecked(hash); - ret.put(hash, peer.getDrillbitAddress().orElse(null)); - return ret; - } - MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash); if (metaOrSimpleNode.links.size() > 0) { logger.debug("{} is a meta node", hash); @@ -382,68 +455,19 @@ public Map compute() { ImmutableList.Builder builder = ImmutableList.builder(); for (Multihash intermediate : intermediates.subList(1, intermediates.size())) { - builder.add(new IPFSTreeFlattener(this, intermediate, false)); + builder.add(new IPFSTreeFlattener(this, intermediate)); } ImmutableList subtasks = builder.build(); subtasks.forEach(IPFSTreeFlattener::fork); - IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0), false); - ret.putAll(first.compute()); + IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0)); + ret.addAll(first.compute()); 
subtasks.reverse().forEach( - subtask -> ret.putAll(subtask.join()) + subtask -> ret.addAll(subtask.join()) ); } else { logger.debug("{} is a simple node", hash); - List providers = providerCache.getUnchecked(hash).stream() - .map(peerCache::getUnchecked) - .collect(Collectors.toList()); - providers = providers.stream() - .filter(IPFSPeer::isDrillReady) - .collect(Collectors.toList()); - if (providers.size() < 1) { - logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash); - providers.add(myself); - } - - logger.debug("Got {} providers for {} from IPFS", providers.size(), hash); - ImmutableList.Builder builder = ImmutableList.builder(); - for (IPFSPeer provider : providers.subList(1, providers.size())) { - builder.add(new IPFSTreeFlattener(this, provider.getId(), true)); - } - ImmutableList subtasks = builder.build(); - subtasks.forEach(IPFSTreeFlattener::fork); - - List possibleAddrs = new ArrayList<>(); - Multihash firstProvider = providers.get(0).getId(); - IPFSTreeFlattener firstTask = new IPFSTreeFlattener(this, firstProvider, true); - String firstAddr = firstTask.compute().get(firstProvider); - if (firstAddr != null) { - possibleAddrs.add(firstAddr); - } - - subtasks.reverse().forEach( - subtask -> { - String addr = subtask.join().get(subtask.hash); - if (addr != null) { - possibleAddrs.add(addr); - } - } - ); - - if (possibleAddrs.size() < 1) { - logger.error("All attempts to find an appropriate provider address for {} have failed", hash); - throw UserException - .planError() - .message("No address found for any provider for leaf " + hash) - .build(logger); - } else { - //DRILL-7753: better peer selection algorithm - Random random = new Random(); - String chosenAddr = possibleAddrs.get(random.nextInt(possibleAddrs.size())); - ret.clear(); - ret.put(hash, chosenAddr); - logger.debug("Got peer host {} for leaf {}", chosenAddr, hash); - } + ret.add(hash); } } catch (IOException e) { throw 
UserException.planError(e).message("Exception during planning").build(logger); diff --git a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java index c34466f7198..f0b358e5b3a 100644 --- a/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java +++ b/contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java @@ -46,6 +46,9 @@ public class IPFSStoragePluginConfig extends StoragePluginConfigBase { @JsonProperty("max-nodes-per-leaf") private final int maxNodesPerLeaf; + @JsonProperty("distributed-mode") + private final boolean distributedMode; + @JsonProperty("ipfs-timeouts") private final Map ipfsTimeouts; @@ -156,6 +159,7 @@ public IPFSStoragePluginConfig( @JsonProperty("host") String host, @JsonProperty("port") int port, @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf, + @JsonProperty("distributed-mode") boolean distributedMode, @JsonProperty("ipfs-timeouts") Map ipfsTimeouts, @JsonProperty("ipfs-caches") Map ipfsCaches, @JsonProperty("groupscan-worker-threads") int numWorkerThreads, @@ -163,6 +167,7 @@ public IPFSStoragePluginConfig( this.host = host; this.port = port; this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1; + this.distributedMode = distributedMode; this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults); this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults); this.numWorkerThreads = numWorkerThreads > 0 ? 
numWorkerThreads : 1; @@ -196,6 +201,11 @@ public int getMaxNodesPerLeaf() { return maxNodesPerLeaf; } + @JsonProperty("distributed-mode") + public boolean isDistributedMode() { + return distributedMode; + } + @JsonIgnore public int getIPFSTimeout(IPFSTimeOut which) { return ipfsTimeouts.get(which); @@ -228,7 +238,7 @@ public Map getFormats() { @Override public int hashCode() { - return Objects.hashCode(host, port, maxNodesPerLeaf, ipfsTimeouts, ipfsCaches, formats); + return Objects.hashCode(host, port, maxNodesPerLeaf, distributedMode, ipfsTimeouts, ipfsCaches, formats); } @Override @@ -246,6 +256,7 @@ public boolean equals(Object obj) { && Objects.equal(ipfsCaches, other.ipfsTimeouts) && port == other.port && maxNodesPerLeaf == other.maxNodesPerLeaf + && distributedMode == other.distributedMode && numWorkerThreads == other.numWorkerThreads; } } diff --git a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json index 54d8427aa6e..74bca8e6f7f 100644 --- a/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json @@ -5,6 +5,7 @@ "host": "127.0.0.1", "port": 5001, "max-nodes-per-leaf": 3, + "distributed-mode": false, "ipfs-timeouts": { "find-provider": 4, "find-peer-info": 4, diff --git a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java index f944aef03c4..489d0cf4c29 100644 --- a/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java +++ b/contrib/storage-ipfs/src/test/java/org/apache/drill/exec/store/ipfs/TestIPFSGroupScan.java @@ -130,9 +130,9 @@ public void testSimpleDatasetWithNoAnyOtherProviders() { IPFSContext context = plugin.getIPFSContext(); IPFSGroupScan groupScan = new IPFSGroupScan(context, new 
IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); - Map map = groupScan.getLeafAddrMappings(SIMPLE_DATASET_MULTIHASH); + Map map = groupScan.getLeafPeerMappings(SIMPLE_DATASET_MULTIHASH); assertEquals(map.keySet().size(), 1); - assertEquals(map.get(SIMPLE_DATASET_MULTIHASH), MOCK_NODE_ADDR); + assertEquals(map.get(SIMPLE_DATASET_MULTIHASH), myself); } catch (Exception e) { fail(e.getMessage()); } @@ -154,10 +154,10 @@ public void testChunkedDatasetWithNoAnyOtherProviders() { IPFSContext context = plugin.getIPFSContext(); IPFSGroupScan groupScan = new IPFSGroupScan(context, new IPFSScanSpec(context, IPFSTestConstants.getQueryPath(SIMPLE_DATASET_MULTIHASH)), null); - Map map = groupScan.getLeafAddrMappings(CHUNKED_DATASET_MULTIHASH); + Map map = groupScan.getLeafPeerMappings(CHUNKED_DATASET_MULTIHASH); assertEquals(map.keySet().size(), 3); for (Map.Entry entry : CHUNKS_MULTIHASH.entrySet()) { - assertEquals(map.get(entry.getValue()), MOCK_NODE_ADDR); + assertEquals(map.get(entry.getValue()), myself); } } catch (Exception e) { fail(e.getMessage()); diff --git a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json index 93435666872..9d7635606d3 100644 --- a/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-ipfs/src/test/resources/bootstrap-storage-plugins.json @@ -5,6 +5,7 @@ "host": "127.0.0.1", "port": 5001, "max-nodes-per-leaf": 1, + "distributed-mode": false, "ipfs-timeouts": { "find-provider": 1, "find-peer-info": 1,