Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrity integration #10

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/main/java/org/archive/hadoop/jobs/WEATGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.archive.extract.ExtractorOutput;
import org.archive.extract.ProducerUtils;
import org.archive.extract.ResourceFactoryMapper;
import org.archive.extract.RealCDXExtractorOutput;
import org.archive.extract.WATExtractorOutput;
import org.archive.extract.WETExtractorOutput;
import org.archive.format.json.JSONUtils;
Expand Down Expand Up @@ -54,6 +55,9 @@ public class WEATGenerator extends Configured implements Tool {

public static final Log LOG = LogFactory.getLog(WEATGenerator.class);

// TODO: modify this class to also output cdx files. Look at hadoop plugins for fetch and/or other classes in this project for hints.
// TODO: new property for cdx paths or something is likely needed, same as fetch...

public static class WEATGeneratorMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
private JobConf jobConf;

Expand Down Expand Up @@ -93,17 +97,25 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
String inputBasename = inputPath.getName();
String watOutputBasename = "";
String wetOutputBasename = "";
String cdxWatOutputBasename = "";
String cdxWetOutputBasename = "";

if(path.endsWith(".gz")) {
watOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wat.gz";
wetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wet.gz";
cdxWatOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwat.gz";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This results in:

name.warc.gz      name.cdx.gz
name.warc.wat.gz  name.warc.wat.cdxwat.gz
name.warc.wet.gz  name.warc.wet.cdxwet.gz
  • "wat" is given twice
  • ".warc" is removed for CDX files derived from WARC files
    • should be the same for WAT/WET files
    • a CDX file does not follow the WARC format
    • (a WAT or WET file does)

Maybe the following looks better?

name.warc.gz      name.cdx.gz
name.warc.wat.gz  name.wat.cdx.gz
name.warc.wet.gz  name.wet.cdx.gz

cdxWetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwet.gz";
} else {
watOutputBasename = inputBasename + ".wat.gz";
wetOutputBasename = inputBasename + ".wet.gz";
cdxWatOutputBasename = inputBasename + ".cdxwat.gz";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

cdxWetOutputBasename = inputBasename + ".cdxwet.gz";
}

String watOutputFileString = basePath.toString() + "/wat/" + watOutputBasename;
String wetOutputFileString = basePath.toString() + "/wet/" + wetOutputBasename;
String cdxWetOutputFileString = basePath.toString() + "/cdxwet/" + cdxWetOutputBasename;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fixed output path. Do we want to have the CDX files for WAT/WET there?

The configuration for the CDX indexing uses different output paths, cf. https://github.com/commoncrawl/webarchive-indexing/blob/main/run_index_hadoop.sh

String cdxWatOutputFileString = basePath.toString() + "/cdxwat/" + cdxWatOutputBasename;

LOG.info("About to write out to " + watOutputFileString + " and " + wetOutputFileString);
if (this.jobConf.getBoolean("skipExisting", false)) {
Expand All @@ -123,6 +135,21 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
FSDataOutputStream wetfsdOut = FileSystem.get(new java.net.URI(wetOutputFileString), this.jobConf).create(new Path(wetOutputFileString), false);
ExtractorOutput wetOut = new WETExtractorOutput(wetfsdOut, wetOutputBasename);

FSDataOutputStream cdxWetfsOut = null;
ExtractorOutput cdxWetOut = null;
FSDataOutputStream cdxWatfsOut = null;
ExtractorOutput cdxWatOut = null;
if ( this.jobConf.getBoolean("outputCDX") ) {
cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false);
cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut));
cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false);
cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut));
}
FSDataOutputStream cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false);
ExtractorOutput cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut));
FSDataOutputStream cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false);
ExtractorOutput cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut));

int count = 0;
Resource lr = null;
while(count < Integer.MAX_VALUE) {
Expand All @@ -143,6 +170,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
LOG.info("Outputting new record " + count);
}
watOut.output(r);
if( cdxWatOut != null ) {
cdxWatOut.output(r);
}
if (lr != null && isMetaConcurrentTo(r, lr)) {
JSONArray payloadMetadata = JSONUtils.extractArray(r.getMetaData().getTopMetaData(),
"Envelope.Payload-Metadata.WARC-Metadata-Metadata.Metadata-Records");
Expand All @@ -152,6 +182,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
}
if (lr != null) {
wetOut.output(lr);
if( cdxWetOut != null ) {
cdxWetOut.output(lr);
}
}
lr = r;
}
Expand All @@ -160,6 +193,12 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
}
watfsdOut.close();
wetfsdOut.close();
if ( cdxWatfsdOut != null ) {
cdxWatfsdOut.close();
}
if ( cdxWetfsdOut != null ) {
cdxWetfsdOut.close();
}
} catch ( Exception e ) {
LOG.error( "Error processing file: " + path, e );
reporter.incrCounter("exporter", "errors", 1);
Expand Down Expand Up @@ -259,6 +298,7 @@ public int run( String[] args ) throws Exception {
// keep job running despite some failures in generating WATs
job.setBoolean("strictMode",false);
job.setBoolean("skipExisting", false);
job.setBoolean("outputCDX", false);

job.setOutputFormat(NullOutputFormat.class);
job.setOutputKeyClass(Text.class);
Expand All @@ -274,6 +314,9 @@ public int run( String[] args ) throws Exception {
} else if(args[arg].equals("-skipExisting")) {
job.setBoolean("skipExisting", true);
arg++;
} else if(args[arg].equals("-outputCDX")) {
job.setBoolean("outputCDX", true);
arg++;
} else {
break;
}
Expand Down