Basic avro -> HBase loader for #11
First implementation to load variant information into HBase from avro.
Matthias Haimel committed May 21, 2015
1 parent 1aae75b commit ba7fceb
Showing 4 changed files with 283 additions and 4 deletions.
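
For context, here is a minimal sketch (not part of the diff below) of how the new conversion can be driven programmatically, assuming the -i/-t flags introduced in this commit; the input path and table name are illustrative:

import org.apache.hadoop.util.ToolRunner;
import org.opencb.hpg.bigdata.tools.converters.mr.Variant2HbaseMR;

public class LoadVariantsExample {
    public static void main(String[] args) throws Exception {
        // -i: ga4gh variant file in Avro format, -t: target HBase table (illustrative values)
        int exitCode = ToolRunner.run(new Variant2HbaseMR(),
                new String[]{"-i", "variants.avro", "-t", "VariantLoad"});
        System.exit(exitCode);
    }
}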
@@ -56,6 +56,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.opencb.hpg.bigdata.core.models.Read;
import org.ga4gh.models.ReadAlignment;
import org.ga4gh.models.Variant;
@@ -68,6 +69,7 @@
import org.opencb.hpg.bigdata.core.converters.variation.VariantAvroEncoderTask;
import org.opencb.hpg.bigdata.core.converters.variation.VariantConverterContext;
import org.opencb.hpg.bigdata.tools.converters.mr.Bam2AvroMR;
import org.opencb.hpg.bigdata.tools.converters.mr.Variant2HbaseMR;
import org.opencb.hpg.bigdata.core.io.VcfBlockIterator;
import org.opencb.hpg.bigdata.core.io.avro.AvroFileWriter;
import org.opencb.hpg.bigdata.core.io.avro.AvroWriter;
@@ -93,6 +95,7 @@ public enum Conversion {
CRAM_2_AVRO ("cram2avro", "Save CRAM file as Global Alliance for Genomics and Health (ga4gh) in Avro format"),
AVRO_2_CRAM ("avro2cram", "Save Global Alliance for Genomics and Health (ga4gh) in Avro format as CRAM file"),
VCF_2_AVRO ("vcf2avro", "Save VCF file as Global Alliance for Genomics and Health (ga4gh) in Avro format"),
VARIANT_2_HBASE ("variant2hbase", "Load ga4gh Avro variant file into HBase"),
AVRO_2_PARQUET ("avro2parquet", "Save Avro file in Parquet format"),
;

@@ -131,6 +134,7 @@ static public String getValidConversionString() {
Conversion.AVRO_2_BAM,
Conversion.CRAM_2_AVRO,
Conversion.VCF_2_AVRO,
Conversion.VARIANT_2_HBASE
// Conversion.AVRO_2_PARQUET,
};

@@ -211,6 +215,10 @@ public void execute() {
vcf2avro(convertCommandOptions.input, convertCommandOptions.output, convertCommandOptions.compression);
break;
}
case VARIANT_2_HBASE: {
variant2hbase(convertCommandOptions.input, convertCommandOptions.output);
break;
}

// case AVRO_2_PARQUET: {
// avro2parquet(ga4ghCommandOptions.input, ga4ghCommandOptions.output);
@@ -482,7 +490,15 @@ private void cram2avro(String input, String output, String codecName) throws IOE
}


private void vcf2avro(String input, String output, String compression) throws Exception {
private void variant2hbase(String input, String output) throws Exception {
// NOTE: output is currently unused; the data is loaded into the fixed HBase table "VariantLoad"
Variant2HbaseMR mr = new Variant2HbaseMR();
String[] args = new String[]{"-i", input, "-t", "VariantLoad"};
int run = ToolRunner.run(mr, args);
if (run != 0) {
throw new IllegalStateException(String.format("Variant 2 HBase finished with exit code %s!", run));
}
}

private void vcf2avro(String input, String output, String compression) throws Exception {
// String output = convertCommandOptions.output;
// String input = convertCommandOptions.input;
// String compression = convertCommandOptions.compression;
14 changes: 12 additions & 2 deletions hpg-bigdata-tools/pom.xml
@@ -66,12 +66,10 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<!-- <dependency> <groupId>org.seqdoop</groupId> <artifactId>hadoop-bam</artifactId>
<version>7.0.0</version> </dependency> -->
@@ -100,6 +98,18 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
</dependencies>


220 changes: 220 additions & 0 deletions hpg-bigdata-tools/src/main/java/org/opencb/hpg/bigdata/tools/converters/mr/Variant2HbaseMR.java
@@ -0,0 +1,220 @@
/**
* MapReduce tool to load ga4gh variants in Avro format into HBase.
*/
package org.opencb.hpg.bigdata.tools.converters.mr;

import java.io.IOException;
import java.util.List;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.ga4gh.models.Call;
import org.ga4gh.models.Variant;
import org.opencb.commons.utils.CryptoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Matthias Haimel [email protected]
*
*/
public class Variant2HbaseMR extends Mapper<AvroKey<Variant>, NullWritable, ImmutableBytesWritable, Put> implements Tool {
public final static byte[] COLUMN_FAMILY = Bytes.toBytes("d");
private final static String ROWKEY_SEPARATOR = "_";
private static final int SV_THRESHOLD = 50; // TODO update as needed

private final Logger log;
private Configuration config;

public Variant2HbaseMR() {
super();
log = LoggerFactory.getLogger(this.getClass());
}

public Logger getLog() {
return log;
}

@Override
protected void setup(
Mapper<AvroKey<Variant>, NullWritable, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
super.setup(context);
}

@Override
protected void map(
AvroKey<Variant> key,
NullWritable value,
Mapper<AvroKey<Variant>, NullWritable, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {
Variant variant = key.datum();

if(!isReference(variant)){ // is a variant (not just coverage info)
byte[] id = Bytes.toBytes(buildStorageId(variant));

Put put = new Put(id);

/* Ignore fields */
// List<CharSequence> ids = v.getAlleleIds(); // graph mode -> not supported

/* TODO fields - fine for first implementation*/
// v.getInfo()
// v.getNames()
// v.getEnd();

List<Call> calls = variant.getCalls();
for(Call call : calls){
addEntry(put,call);
}
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(id);

/* Submit data to HBase */
context.write(rowKey, put);
}
}

private boolean isReference(Variant variant) {
return null == variant.getAlternateBases() || variant.getAlternateBases().isEmpty();
}

private void addEntry(Put put, Call call) {
// use the call set id as column qualifier (call.getCallSetName() would be an alternative)
CharSequence id = call.getCallSetId();
put.addColumn(
COLUMN_FAMILY,
Bytes.toBytes(id.toString()),
Bytes.toBytes(call.toString()));
}

public String buildStorageId(Variant v) {
CharSequence chr = v.getReferenceName(); // TODO check for a "chr" prefix in the chromosome name and remove it (or expect this to be done beforehand)
StringBuilder builder = new StringBuilder(chr);
builder.append(ROWKEY_SEPARATOR);
builder.append(String.format("%012d", v.getStart()));
builder.append(ROWKEY_SEPARATOR);

if (v.getReferenceBases().length() < SV_THRESHOLD) {
builder.append(v.getReferenceBases());
} else {
builder.append(new String(CryptoUtils.encryptSha1(v.getReferenceBases().toString())));
}

builder.append(ROWKEY_SEPARATOR);

if (v.getAlternateBases().size() > 1) {
throw new NotImplementedException("More than one alternate allele at the same position is not yet supported (position " + builder.toString() + ")");
}

if(v.getAlternateBases().size() == 1){
CharSequence ab = v.getAlternateBases().get(0);
if (ab.length() < SV_THRESHOLD) {
builder.append(ab);
} else {
builder.append(new String(CryptoUtils.encryptSha1(ab.toString())));
}
}

return builder.toString();
}
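
/*
 * Illustrative example (values assumed, not from this commit): a variant on
 * reference "1" starting at 12345 with reference base "A" and alternate "T"
 * yields the row key "1_000000012345_A_T": chromosome, zero-padded start
 * position, reference and alternate bases joined by "_". Reference or
 * alternate strings of SV_THRESHOLD (50) characters or more are replaced by
 * their SHA-1 digest to keep row keys short.
 */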

@Override
public void setConf(Configuration conf) {
this.config = conf;
}

@Override
public Configuration getConf() {
return this.config;
}

@Override
public int run(String[] args) throws Exception {
String tablename = "test_table";
String inputfile = null;
for (int i = 0; i < args.length; ++i) {
// use equals() for string comparison, not ==
if ("-t".equals(args[i]))
tablename = args[++i];
if ("-i".equals(args[i]))
inputfile = args[++i];
}

setConf(HBaseConfiguration.addHbaseResources(getConf()));

Job job = Job.getInstance(getConf());
job.setJobName(this.getClass().getName() + "_" + tablename);
job.setJarByClass(this.getClass());

// input
AvroJob.setInputKeySchema(job, Variant.getClassSchema());
FileInputFormat.setInputPaths(job, new Path(inputfile));
job.setInputFormatClass(AvroKeyInputFormat.class);


// output -> Hbase
TableMapReduceUtil.initTableReducerJob(tablename, null, job);
job.setNumReduceTasks(0); // map-only job: mappers write directly to the table

// mapper
job.setMapperClass(Variant2HbaseMR.class);

// create Table if needed
createTableIfNeeded(tablename);

return job.waitForCompletion(true)?0:1;
}

/**
* Creates the HBase table if it does not exist yet.
*
* @param tablename name of the HBase table
* @throws IOException if the HBase connection or table creation fails
*/
public void createTableIfNeeded(String tablename) throws IOException {
TableName tname = TableName.valueOf(tablename);
try (
Connection con = ConnectionFactory.createConnection(getConf());
Admin admin = con.getAdmin();
) {
if(!exist(tname, admin)){
HTableDescriptor descr = new HTableDescriptor(tname);
descr.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
getLog().info(String.format("Create table '%s' in hbase!", tablename));
admin.createTable(descr);
}
}
}

private boolean exist(TableName tname, Admin admin) throws IOException {
for (TableName tn : admin.listTableNames()) {
if(tn.equals(tname)){
return true;
}
}
return false;
}
}
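
Given the layout produced by the mapper above (column family "d", one column per call set id, the serialized ga4gh Call as cell value), a loaded row can be read back with the hbase-client API this commit adds to the poms. A minimal sketch, with assumed table name and row key:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadVariantRowExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("VariantLoad"))) {
            // row key follows the buildStorageId() scheme; this one is illustrative
            Result result = table.get(new Get(Bytes.toBytes("1_000000012345_A_T")));
            for (Cell cell : result.rawCells()) {
                // qualifier = call set id, value = serialized ga4gh Call
                System.out.printf("callSet=%s call=%s%n",
                        Bytes.toString(CellUtil.cloneQualifier(cell)),
                        Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}

Keeping one qualifier per call set stores all calls for a variant in a single row, so a point Get retrieves the full genotype information for that position.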
35 changes: 34 additions & 1 deletion pom.xml
@@ -40,6 +40,8 @@
<biodata.version>0.4-SNAPSHOT</biodata.version>
<datastore.version>0.4-SNAPSHOT</datastore.version>
<commons-lib.version>3.0-SNAPSHOT</commons-lib.version>
<hbase.version>1.0.1</hbase.version>
<hadoop.version>2.6.0</hadoop.version>
<compileSource>1.8</compileSource>
<build.dir>build</build.dir>
</properties>
@@ -111,7 +113,38 @@
<artifactId>commons-lib</artifactId>
<version>${commons-lib.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<!-- <exclusions> -->
<!-- <exclusion> -->
<!-- <groupId>org.apache.hadoop</groupId> -->
<!-- <artifactId>hadoop-client</artifactId> -->
<!-- </exclusion> -->
<!-- </exclusions> -->
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<!-- General dependencies -->
<dependency>
<groupId>com.google.guava</groupId>
