-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add hive merge pipeline #33
Changes from 7 commits
544b9e9
c1b02b4
0283d69
3e3e343
dfdbe1c
2440ccd
0a1fb46
e281b53
24c40ef
44ec934
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
--- | ||
connection_string: "jdbc:mysql://mysql:3306/integration_test" | ||
hdfs_basedir: "hdfs://0.0.0.0:8020/user/hive/warehouse" | ||
source_db_user_name: "pipewrench" | ||
password_file: "file:////mount/password.file" | ||
destination_database: "default" | ||
impala_cmd: "impala-shell -i localhost -f " |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
#!/bin/bash -e | ||
pipewrench-merge --conf=tables.yml --debug_level ERROR --env=env.yml --pipeline-templates=../../templates/sqoop-parquet-hdfs-hive-merge |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
This pipeline uses hive to merge the incremental data as explained in https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
#!/bin/bash | ||
# Copyright 2017 Cargill Incorporated | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
set -e | ||
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" | ||
cd $SCRIPT_DIR | ||
# verify we can generate scripts without error | ||
sudo -u hdfs hdfs dfs -rm -r /user/hive/warehouse/* || true | ||
make -j1 integration-test-all -C output/sqoop-parquet-hdfs-hive-merge | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
#!/bin/bash | ||
set -eu | ||
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" | ||
cd $SCRIPT_DIR | ||
./generate-scripts | ||
docker-compose exec kimpala /mount/sqoop-parquet-hdfs-hive-merge/run-in-container.sh |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
--- | ||
name: "sqoop_parquet_hdfs_hive_merge" # The name of this configuration | ||
user_name: {{ source_db_user_name }} # Source database user name | ||
type_mapping: type-mapping.yml # Type mapping used for database type conversion | ||
sqoop_export_dir: {{ hdfs_basedir }}/export # Sqoop export data HDFS path | ||
sqoop_password_file: {{ password_file}} # Password file for sqoop. Must reside in HDFS | ||
sqoop_password: {{ password }} # Password for sqoop. Must reside in HDFS | ||
connection_manager: "org.apache.sqoop.manager.MySQLManager" # Connection manager fully qualified class | ||
sqoop_job_name_suffix: test # Suffix added to sqoop jobs. Can be used to differentiate environments | ||
impala_cmd: "{{ impala_cmd }}" | ||
source_database: | ||
name: "integration_test" # Source database name | ||
connection_string: {{ connection_string }} # Source database connection string. Should be kept in 'env.yml' | ||
cmd: "mysql -P 3306 -uroot -ppipewrench -h mysql <" | ||
staging_database: | ||
name: "{{ destination_database }}" # Staging database name. | ||
path: "{{ hdfs_basedir }}" # Staging database HDFS path | ||
result_database: | ||
name: "{{ destination_database }}" # Result database | ||
path: "{{ hdfs_basedir }}" # Result database HDFS path | ||
tables: | ||
- id: "titanic" # Uniquely identifies this table | ||
META_CONTACT_INFO: "[email protected]" # Contact info will be loaded into tblproperties | ||
META_LOAD_FREQUENCY: "STREAMING" # Load frequency will be loaded into tblproperties | ||
META_SECURITY_CLASSIFICATION: "OPEN" # Security classification will be loaded into tblproperties | ||
META_SOURCE: "upstream.source.location" # Source will be loaded into tblproperties | ||
source: | ||
name: "titanic" # Source table name | ||
file: ../../../../../data/Titanic.csv | ||
destination: | ||
name: "titanic" # Destination (Impala) table name | ||
split_by_column: "Id" # Sqoop split by column (--split-by) | ||
kudu: | ||
hash_by: # List of columns to hash by | ||
- Id | ||
num_partitions: 2 # Number of Kudu partitions to create | ||
check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) | ||
primary_keys: Id # List of primary keys | ||
columns: | ||
- name: "Id" # Column name in source table | ||
datatype: "int" # Column datatype in source table | ||
comment: "comment" # Column comment | ||
- name: "LastName" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "FirstName" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "PClass" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "Age" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "Sex" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "Survived" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "SexCode" | ||
datatype: "int" | ||
comment: "comment" | ||
- id: "vocab" # Uniquely identifies this table | ||
source: | ||
name: "vocab" # Source table name | ||
file: ../../../../../data/Vocab.csv | ||
destination: | ||
name: "vocab" # Destination (Impala) table name | ||
split_by_column: "Id" # Sqoop split by column (--split-by) | ||
kudu: | ||
hash_by: # List of columns to hash by | ||
- Id | ||
num_partitions: 2 # Number of Kudu partitions to create | ||
check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) | ||
primary_keys: Id # List of primary keys | ||
columns: | ||
- name: "Id" # Column name in source table | ||
datatype: "int" # Column datatype in source table | ||
comment: "comment" # Column comment | ||
- name: "Year" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "Sex" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "Education" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "Vocabulary" | ||
datatype: "int" | ||
comment: "comment" | ||
- id: "baseball" # Uniquely identifies this table | ||
source: | ||
name: "baseball" # Source table name | ||
file: ../../../../../data/Baseball.csv | ||
destination: | ||
name: "baseball" # Destination (Impala) table name | ||
split_by_column: "Id" # Sqoop split by column (--split-by) | ||
kudu: | ||
hash_by: # List of columns to hash by | ||
- Id | ||
num_partitions: 2 # Number of Kudu partitions to create | ||
check_column: Id # Incrementing timestamp of numeric column used when incrementally pulling data (sqoop --check-column) | ||
primary_keys: Id # List of primary keys | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is the right way to fix this error. The problem was that you weren't handling the list. The field is called 'primary_keys', and all other tables.yml files have a list here, so I don't think we should break that assumption. |
||
columns: | ||
- name: "Id" # Column name in source table | ||
datatype: "int" # Column datatype in source table | ||
comment: "comment" # Column comment | ||
- name: "PlayerId" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "Year" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "Stint" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "Team" | ||
datatype: "text" | ||
comment: "comment" | ||
- name: "LG" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "G" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "AB" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "R" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "H" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "X2b" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "X3b" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "HR" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "RBI" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "SB" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "CS" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "BB" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "SO" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "IBB" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "HBP" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "SH" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "SF" | ||
datatype: "int" | ||
comment: "comment" | ||
- name: "GIDP" | ||
datatype: "int" | ||
comment: "comment" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
{# Copyright 2017 Cargill Incorporated | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. #} | ||
|
||
impala-cmd={{ conf.impala_cmd }} | ||
|
||
integration-test: | ||
$(MAKE) clean | ||
$(MAKE) first-run | ||
$(MAKE) update | ||
|
||
sqoop-create: sqoop-create.sh #### Create Sqoop job | ||
./run-with-logging.sh ./sqoop-create.sh $@ | ||
|
||
sqoop-delete: ## Delete Sqoop job | ||
./run-with-logging.sh ./sqoop-delete.sh $@ | ||
|
||
sqoop-exec: ## Execute sqoop job | ||
./run-with-logging.sh ./sqoop-exec.sh $@ | ||
|
||
create-base-table: create-base-table.sql | ||
$(impala-cmd) create-base-table.sql | ||
|
||
create-incr-table: create-incr-table.sql | ||
$(impala-cmd) create-incr-table.sql | ||
|
||
create-merge-view: create-merge-view.sql | ||
$(impala-cmd) create-merge-view.sql | ||
|
||
create-report-table: create-report-table.sql | ||
$(impala-cmd) create-report-table.sql | ||
|
||
overwrite-base-table: overwrite-base-table.sql | ||
$(impala-cmd) overwrite-base-table.sql | ||
|
||
hdfs-clean: hdfs-delete.sh ## Delete parquet files from HDFS | ||
./run-with-logging.sh ./hdfs-delete.sh $@ | ||
|
||
move-1st-sqoop: move-1st-sqoop.sh | ||
./run-with-logging.sh ./move-1st-sqoop.sh $@ | ||
|
||
drop-raw-tables: drop-raw-tables.sql | ||
$(impala-cmd) drop-raw-tables.sql | ||
./run-with-logging.sh ./hdfs-delete.sh $@ | ||
|
||
hdfs-incr-clear: hdfs-incr-clear.sh | ||
./run-with-logging.sh ./hdfs-incr-clear.sh $@ | ||
|
||
drop-report-table: drop-report-table.sql | ||
$(impala-cmd) drop-report-table.sql | ||
./run-with-logging.sh ./hdfs-report-delete.sh $@ | ||
|
||
first-run: | ||
$(MAKE) create-base-table #keep this on top so that the hdfs directories are created by impala the lower lever user. Otherwise there might be access issue later. | ||
$(MAKE) sqoop-create | ||
$(MAKE) sqoop-exec | ||
$(MAKE) move-1st-sqoop | ||
$(MAKE) create-incr-table | ||
|
||
update: | ||
$(MAKE) sqoop-exec | ||
$(MAKE) create-merge-view | ||
$(MAKE) drop-report-table | ||
$(MAKE) create-report-table | ||
$(MAKE) overwrite-base-table | ||
$(MAKE) hdfs-incr-clear | ||
|
||
clean: | ||
$(MAKE) drop-raw-tables | ||
$(MAKE) drop-report-table | ||
$(MAKE) sqoop-delete | ||
|
||
targets: ## Print out a list of available targets | ||
@fgrep -h ": " $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/:.*//' | ||
|
||
help: | ||
@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
{# Copyright 2017 Cargill Incorporated | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. #} | ||
|
||
first-run-all: {%- for table in tables %} first-run-{{ table.id }} {%- endfor %} | ||
|
||
{%- for table in tables %} | ||
first-run-{{ table.id }}: | ||
$(MAKE) first-run -C {{ table.id }} | ||
{%- endfor %} | ||
|
||
update-all: {%- for table in tables %} update-{{ table.id }} {%- endfor %} | ||
|
||
{%- for table in tables %} | ||
update-{{ table.id }}: | ||
$(MAKE) update -C {{ table.id }} | ||
{%- endfor %} | ||
|
||
clean-all: | ||
{%- for table in tables %} | ||
$(MAKE) clean -C {{ table.id }} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is red here usually because it has spaces, should be changed to a tab (can only use tabes here in a Makefile) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems to be tab already: |
||
{%- endfor %} | ||
|
||
{%- for table in tables %} | ||
integration-test-{{ table.id }}: | ||
$(MAKE) integration-test -C {{ table.id }} | ||
{%- endfor %} | ||
|
||
integration-test-all: | ||
{%- for table in tables %} | ||
$(MAKE) integration-test -C {{ table.id }} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. switch to TAB There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems to be tab already: |
||
{%- endfor %} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
{# Copyright 2017 Cargill Incorporated | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. #} | ||
|
||
set sync_ddl=1; | ||
USE {{ conf.staging_database.name }}; | ||
|
||
CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_base ( | ||
{% for column in table.columns %} | ||
{{ column.name }} {{ map_datatypes(column).parquet }} COMMENT '{{ column.comment }}' | ||
{%- if not loop.last -%}, {% endif %} | ||
{%- endfor %}) | ||
STORED AS Parquet | ||
LOCATION '{{ conf.staging_database.path }}/{{ table.destination.name }}/base' | ||
TBLPROPERTIES( | ||
'parquet.compression'='SNAPPY', | ||
'SOURCE' = '{{ table.META_SOURCE }}', | ||
'SECURITY_CLASSIFICATION' = '{{ table.META_SECURITY_CLASSIFICATION }}', | ||
'LOAD_FREQUENCY' = '{{ table.META_LOAD_FREQUENCY }}', | ||
'CONTACT_INFO' = '{{ table.META_CONTACT_INFO }}' | ||
); | ||
|
||
COMPUTE STATS {{ table.destination.name }}_base |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary? What does this command have to do with the comment above it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took that line from the existing example, such as sqoop-parquet-hdfs-impala. I take it to mean cleaning the target directory before ingestion, which sounds right to me. If you are questioning the need for sudo. I'll tested it and it doesn't work w/o sudo it doesn't work w/ sudo root. It only works with sudo hdfs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok this one is fine