Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support url to download zip from HTTP(s) server using Range header #91

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions node_stream_zip.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ declare namespace StreamZip {
*/
fd?: number;

/**
* http(s) url to stream from
* The server must support HEAD requests (reporting Content-Length) and byte-range requests via the Range header
* @default undefined
*/
url?: string;

/**
* You will be able to work with entries inside zip archive,
* otherwise the only way to access them is entry event
Expand Down
276 changes: 232 additions & 44 deletions node_stream_zip.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/

let fs = require('fs');
const https = require('https');
const http = require('http');
const util = require('util');
const path = require('path');
const events = require('events');
Expand Down Expand Up @@ -136,7 +138,7 @@ const consts = {
};

const StreamZip = function (config) {
let fd, fileSize, chunkSize, op, centralDirectory, closed;
let fd, fileSize, chunkSize, op, centralDirectory, closed, url;
const ready = false,
that = this,
entries = config.storeEntries !== false ? {} : null,
Expand All @@ -146,7 +148,13 @@ const StreamZip = function (config) {
open();

function open() {
if (config.fd) {
if(config.url) {
url = new URL(config.url);
if(!['http:', 'https:'].includes(url.protocol)) {
throw new Error('Url should be http or https')
}
readFile();
} else if (config.fd) {
fd = config.fd;
readFile();
} else {
Expand All @@ -161,18 +169,31 @@ const StreamZip = function (config) {
}

function readFile() {
    // Determine the archive's total size (via HTTP HEAD or fstat), derive a
    // scanning chunk size from it, then kick off the central-directory scan.
    const onSize = (size) => {
        fileSize = size;
        // Default chunk: ~0.1% of the file, clamped to the [1 KiB, 128 KiB]
        // range (and never larger than the file itself).
        chunkSize = config.chunkSize || Math.round(fileSize / 1000);
        chunkSize = Math.max(
            Math.min(chunkSize, Math.min(128 * 1024, fileSize)),
            Math.min(1024, fileSize)
        );
        readCentralDirectory();
    };
    if (url) {
        // HEAD request: we only need the Content-Length header.
        const req = selectUrlLib(url).request(url, { method: 'HEAD' }, (res) => {
            const len = parseInt(res.headers['content-length'], 10);
            // Drain any (unexpected) body so the socket is released.
            res.resume();
            if (!Number.isFinite(len)) {
                return that.emit('error', new Error('Missing or invalid content-length header'));
            }
            onSize(len);
        });
        // Without this handler a network failure raises an uncaught
        // 'error' event and crashes the process.
        req.on('error', (err) => that.emit('error', err));
        req.end();
    } else {
        fs.fstat(fd, (err, stat) => {
            if (err) {
                return that.emit('error', err);
            }
            onSize(stat.size);
        });
    }
}

function readUntilFoundCallback(err, bytesRead) {
Expand Down Expand Up @@ -209,7 +230,7 @@ const StreamZip = function (config) {
function readCentralDirectory() {
const totalReadLength = Math.min(consts.ENDHDR + consts.MAXFILECOMMENT, fileSize);
op = {
win: new FileWindowBuffer(fd),
win: url ? new UrlWindowBuffer(url) : new FileWindowBuffer(fd),
totalReadLength,
minPos: fileSize - totalReadLength,
lastPos: fileSize,
Expand Down Expand Up @@ -311,7 +332,7 @@ const StreamZip = function (config) {

function readEntries() {
op = {
win: new FileWindowBuffer(fd),
win: url ? new UrlWindowBuffer(url) : new FileWindowBuffer(fd),
pos: centralDirectory.offset,
chunkSize,
entriesLeft: centralDirectory.volumeEntries,
Expand Down Expand Up @@ -348,7 +369,7 @@ const StreamZip = function (config) {
if (!config.skipEntryNameValidation) {
entry.validateName();
}
if (entries) {
if (entries && !(entry.name in entries)) {
entries[entry.name] = entry;
}
that.emit('entry', entry);
Expand Down Expand Up @@ -393,20 +414,29 @@ const StreamZip = function (config) {
return callback(err);
}
const offset = dataOffset(entry);
let entryStream = new EntryDataReaderStream(fd, offset, entry.compressedSize);
if (entry.method === consts.STORED) {
// nothing to do
} else if (entry.method === consts.DEFLATED) {
entryStream = entryStream.pipe(zlib.createInflateRaw());
let entryStream
if(url){
const req = selectUrlLib(url).get(url, {headers: {
'Range': `bytes=${offset}-${offset + entry.compressedSize - 1}`
}}, (res) => {
if(res.statusCode !== 206){
callback(new Error(`${url} server doesn't support Range header`))
}
try {
callback(null, extraEntryStreamAction(entry, res))
} catch(err) {
callback(err)
}
})
req.end()
} else {
return callback(new Error('Unknown compression method: ' + entry.method));
}
if (canVerifyCrc(entry)) {
entryStream = entryStream.pipe(
new EntryVerifyStream(entryStream, entry.crc, entry.size)
);
entryStream = new EntryDataReaderStream(fd, offset, entry.compressedSize);
try {
callback(null, extraEntryStreamAction(entry, entryStream))
} catch(err) {
callback(err)
}
}
callback(null, entryStream);
},
false
);
Expand Down Expand Up @@ -460,25 +490,46 @@ const StreamZip = function (config) {
if (!entry.isFile) {
return callback(new Error('Entry is not file'));
}
if (!fd) {
if (!fd && !url) {
return callback(new Error('Archive closed'));
}
if (url && sync) {
return callback(new Error('Can\'t do sync on url'));
}
const buffer = Buffer.alloc(consts.LOCHDR);
new FsRead(fd, buffer, 0, buffer.length, entry.offset, (err) => {
if (err) {
return callback(err);
}
let readEx;
try {
entry.readDataHeader(buffer);
if (entry.encrypted) {
readEx = new Error('Entry encrypted');
if(url){
new UrlRead(url, buffer, 0, buffer.length, entry.offset, (err) => {
if (err) {
return callback(err);
}
} catch (ex) {
readEx = ex;
}
callback(readEx, entry);
}).read(sync);
let readEx;
try {
entry.readDataHeader(buffer);
if (entry.encrypted) {
readEx = new Error('Entry encrypted');
}
} catch (ex) {
readEx = ex;
}
callback(readEx, entry);
}).read();
} else {
new FsRead(fd, buffer, 0, buffer.length, entry.offset, (err) => {
if (err) {
return callback(err);
}
let readEx;
try {
entry.readDataHeader(buffer);
if (entry.encrypted) {
readEx = new Error('Entry encrypted');
}
} catch (ex) {
readEx = ex;
}
callback(readEx, entry);
}).read(sync);
}
};

function dataOffset(entry) {
Expand All @@ -490,6 +541,23 @@ const StreamZip = function (config) {
return (entry.flags & 0x8) !== 0x8;
}

// Wrap a raw entry-data stream with the transforms the entry requires:
// inflate when DEFLATED (STORED needs none), then a CRC verification
// stage when the entry's flags allow it. Throws on any other method.
function extraEntryStreamAction(entry, entryStream) {
    switch (entry.method) {
        case consts.STORED:
            // Data is stored verbatim — nothing to decompress.
            break;
        case consts.DEFLATED:
            entryStream = entryStream.pipe(zlib.createInflateRaw());
            break;
        default:
            throw new Error('Unknown compression method: ' + entry.method);
    }
    if (!canVerifyCrc(entry)) {
        return entryStream;
    }
    return entryStream.pipe(new EntryVerifyStream(entryStream, entry.crc, entry.size));
}


function extract(entry, outPath, callback) {
that.stream(entry, (err, stm) => {
if (err) {
Expand Down Expand Up @@ -945,6 +1013,56 @@ class ZipEntry {
}
}

// Reads `length` bytes starting at `position` from an HTTP(S) URL into
// `buffer` at `offset`, using Range requests. Mirrors FsRead's interface
// so window buffers can use either transparently. Retries (re-issues a
// Range request from the resume point) until all bytes arrive.
class UrlRead {
    constructor(url, buffer, offset, length, position, callback) {
        this.url = url;
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
        this.position = position;
        this.callback = callback;
        this.bytesRead = 0;
        this.waiting = false;
    }

    // Issue a Range request for the bytes still missing. Returns `this`
    // so callers can keep a handle for in-flight tracking.
    read() {
        StreamZip.debugLog('read', this.position, this.bytesRead, this.length, this.offset);
        this.waiting = true;
        const start = this.position + this.bytesRead;
        // Range end is inclusive: last wanted byte is position + length - 1.
        const end = this.position + this.length - 1;
        const req = selectUrlLib(this.url).get(
            this.url,
            { headers: { Range: `bytes=${start}-${end}` } },
            (res) => {
                if (res.statusCode !== 206) {
                    // Throwing here would be uncatchable; report via callback.
                    res.resume();
                    return this.readCallback(
                        new Error(`${this.url} server doesn't support Range header`),
                        null
                    );
                }
                const chunks = [];
                res.on('data', (chunk) => {
                    chunks.push(chunk);
                });
                res.on('end', () => {
                    const data = Buffer.concat(chunks);
                    // Clamp so an over-long response can't overrun the window.
                    const wanted = Math.min(data.length, this.length - this.bytesRead);
                    data.copy(this.buffer, this.offset + this.bytesRead, 0, wanted);
                    this.readCallback(null, wanted);
                });
                res.on('error', (err) => {
                    this.readCallback(err, null);
                });
            }
        );
        // Network failures must reach the callback, not crash the process.
        req.on('error', (err) => {
            this.readCallback(err, null);
        });
        req.end();
        return this;
    }

    // Accumulate progress; finish on error, EOF, or completion — otherwise
    // issue another request for the remaining bytes.
    readCallback(err, bytesRead) {
        if (typeof bytesRead === 'number') {
            this.bytesRead += bytesRead;
        }
        if (err || !bytesRead || this.bytesRead >= this.length) {
            this.waiting = false;
            return this.callback(err, this.bytesRead);
        } else {
            this.read();
        }
    }
}

class FsRead {
constructor(fd, buffer, offset, length, position, callback) {
this.fd = fd;
Expand Down Expand Up @@ -1000,6 +1118,72 @@ class FsRead {
}
}

// Sliding window over a remote (HTTP Range-served) file; URL-backed
// counterpart of FileWindowBuffer with the same method contract.
class UrlWindowBuffer {
    constructor(url) {
        this.position = 0;
        this.buffer = Buffer.alloc(0);
        this.url = url;
        this.urlOp = null; // in-flight UrlRead, for overlap detection
    }

    // Guard against starting a read while another is still in flight.
    checkOp() {
        if (this.urlOp && this.urlOp.waiting) {
            throw new Error('Operation in progress');
        }
    }

    // Store the UrlRead instance BEFORE starting it: `read()`'s return
    // value must not be relied on, or checkOp() can never see `waiting`.
    startRead(offset, length, position, callback) {
        this.urlOp = new UrlRead(this.url, this.buffer, offset, length, position, callback);
        this.urlOp.read();
    }

    // Fill the window with `length` bytes starting at absolute `pos`.
    read(pos, length, callback) {
        this.checkOp();
        if (this.buffer.length < length) {
            this.buffer = Buffer.alloc(length);
        }
        this.position = pos;
        this.startRead(0, length, this.position, callback);
    }

    // Grow the window leftward by `length` bytes (clamped at file start).
    expandLeft(length, callback) {
        this.checkOp();
        this.buffer = Buffer.concat([Buffer.alloc(length), this.buffer]);
        this.position -= length;
        if (this.position < 0) {
            this.position = 0;
        }
        this.startRead(0, length, this.position, callback);
    }

    // Grow the window rightward by `length` bytes.
    expandRight(length, callback) {
        this.checkOp();
        const offset = this.buffer.length;
        this.buffer = Buffer.concat([this.buffer, Buffer.alloc(length)]);
        this.startRead(offset, length, this.position + offset, callback);
    }

    // Slide the window right by `shift` bytes, reusing the overlap.
    moveRight(length, callback, shift) {
        this.checkOp();
        if (shift) {
            this.buffer.copy(this.buffer, 0, shift);
        } else {
            shift = 0;
        }
        this.position += shift;
        this.startRead(
            this.buffer.length - shift,
            shift,
            this.position + this.buffer.length - shift,
            callback
        );
    }
}

class FileWindowBuffer {
constructor(fd) {
this.position = 0;
Expand Down Expand Up @@ -1179,6 +1363,10 @@ class CrcVerify {
}
}

// Pick the transport module matching the URL's scheme: `https` for
// https:, the plain `http` module for anything else.
function selectUrlLib(url) {
    if (url.protocol === 'https:') {
        return https;
    }
    return http;
}

function parseZipTime(timebytes, datebytes) {
const timebits = toBits(timebytes, 16);
const datebits = toBits(datebytes, 16);
Expand Down
Loading