summaryrefslogtreecommitdiff
path: root/lib/fs.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/fs.js')
-rw-r--r--lib/fs.js710
1 files changed, 365 insertions, 345 deletions
diff --git a/lib/fs.js b/lib/fs.js
index 5bbcf057c..8188f7170 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -34,8 +34,10 @@ var fs = exports;
var Stream = require('stream').Stream;
var EventEmitter = require('events').EventEmitter;
+var Readable = Stream.Readable;
+var Writable = Stream.Writable;
+
var kMinPoolSpace = 128;
-var kPoolSize = 40 * 1024;
var O_APPEND = constants.O_APPEND || 0;
var O_CREAT = constants.O_CREAT || 0;
@@ -52,6 +54,65 @@ var O_WRONLY = constants.O_WRONLY || 0;
var isWindows = process.platform === 'win32';
+var DEBUG = process.env.NODE_DEBUG && /fs/.test(process.env.NODE_DEBUG);
+
+function rethrow() {
+ // Only enable in debug mode. A backtrace uses ~1000 bytes of heap space and
+ // is fairly slow to generate.
+ if (DEBUG) {
+ var backtrace = new Error;
+ return function(err) {
+ if (err) {
+ backtrace.message = err.message;
+ err = backtrace;
+ throw err;
+ }
+ };
+ }
+
+ return function(err) {
+ if (err) {
+ throw err; // Forgot a callback but don't know where? Use NODE_DEBUG=fs
+ }
+ };
+}
+
+function maybeCallback(cb) {
+ return typeof cb === 'function' ? cb : rethrow();
+}
+
+// Ensure that callbacks run in the global context. Only use this function
+// for callbacks that are passed to the binding layer, callbacks that are
+// invoked from JS already run in the proper scope.
+function makeCallback(cb) {
+ if (typeof cb !== 'function') {
+ return rethrow();
+ }
+
+ return function() {
+ return cb.apply(null, arguments);
+ };
+}
+
+function assertEncoding(encoding) {
+ if (encoding && !Buffer.isEncoding(encoding)) {
+ throw new Error('Unknown encoding: ' + encoding);
+ }
+}
+
+function nullCheck(path, callback) {
+ if (('' + path).indexOf('\u0000') !== -1) {
+ var er = new Error('Path must be a string without null bytes.');
+ if (!callback)
+ throw er;
+ process.nextTick(function() {
+ callback(er);
+ });
+ return false;
+ }
+ return true;
+}
+
fs.Stats = binding.Stats;
fs.Stats.prototype._checkModeProperty = function(property) {
@@ -87,13 +148,16 @@ fs.Stats.prototype.isSocket = function() {
};
fs.exists = function(path, callback) {
- binding.stat(pathModule._makeLong(path), function(err, stats) {
+ if (!nullCheck(path, cb)) return;
+ binding.stat(pathModule._makeLong(path), cb);
+ function cb(err, stats) {
if (callback) callback(err ? false : true);
- });
+ }
};
fs.existsSync = function(path) {
try {
+ nullCheck(path);
binding.stat(pathModule._makeLong(path));
return true;
} catch (e) {
@@ -103,8 +167,9 @@ fs.existsSync = function(path) {
fs.readFile = function(path, encoding_) {
var encoding = typeof(encoding_) === 'string' ? encoding_ : null;
- var callback = arguments[arguments.length - 1];
- if (typeof(callback) !== 'function') callback = function() {};
+ var callback = maybeCallback(arguments[arguments.length - 1]);
+
+ assertEncoding(encoding);
// first, stat the file, so we know the size.
var size;
@@ -179,6 +244,8 @@ fs.readFile = function(path, encoding_) {
};
fs.readFileSync = function(path, encoding) {
+ assertEncoding(encoding);
+
var fd = fs.openSync(path, constants.O_RDONLY, 438 /*=0666*/);
var size;
@@ -284,21 +351,6 @@ Object.defineProperty(exports, '_stringToFlags', {
});
-// Ensure that callbacks run in the global context. Only use this function
-// for callbacks that are passed to the binding layer, callbacks that are
-// invoked from JS already run in the proper scope.
-function makeCallback(cb) {
- if (typeof cb !== 'function') {
- // faster than returning a ref to a global no-op function
- return function() {};
- }
-
- return function() {
- return cb.apply(null, arguments);
- };
-}
-
-
// Yes, the follow could be easily DRYed up but I provide the explicit
// list to make the arguments clear.
@@ -327,6 +379,7 @@ fs.open = function(path, flags, mode, callback) {
callback = makeCallback(arguments[arguments.length - 1]);
mode = modeNum(mode, 438 /*=0666*/);
+ if (!nullCheck(path, callback)) return;
binding.open(pathModule._makeLong(path),
stringToFlags(flags),
mode,
@@ -335,6 +388,7 @@ fs.open = function(path, flags, mode, callback) {
fs.openSync = function(path, flags, mode) {
mode = modeNum(mode, 438 /*=0666*/);
+ nullCheck(path);
return binding.open(pathModule._makeLong(path), stringToFlags(flags), mode);
};
@@ -343,6 +397,9 @@ fs.read = function(fd, buffer, offset, length, position, callback) {
// legacy string interface (fd, length, position, encoding, callback)
var cb = arguments[4],
encoding = arguments[3];
+
+ assertEncoding(encoding);
+
position = arguments[2];
length = arguments[1];
buffer = new Buffer(length);
@@ -371,6 +428,9 @@ fs.readSync = function(fd, buffer, offset, length, position) {
// legacy string interface (fd, length, position, encoding, callback)
legacy = true;
var encoding = arguments[3];
+
+ assertEncoding(encoding);
+
position = arguments[2];
length = arguments[1];
buffer = new Buffer(length);
@@ -392,6 +452,7 @@ fs.write = function(fd, buffer, offset, length, position, callback) {
// legacy string interface (fd, data, position, encoding, callback)
callback = arguments[4];
position = arguments[2];
+ assertEncoding(arguments[3]);
buffer = new Buffer('' + arguments[1], arguments[3]);
offset = 0;
@@ -407,9 +468,11 @@ fs.write = function(fd, buffer, offset, length, position, callback) {
return;
}
+ callback = maybeCallback(callback);
+
function wrapper(err, written) {
// Retain a reference to buffer so that it can't be GC'ed too soon.
- callback && callback(err, written || 0, buffer);
+ callback(err, written || 0, buffer);
}
binding.write(fd, buffer, offset, length, position, wrapper);
@@ -419,6 +482,7 @@ fs.writeSync = function(fd, buffer, offset, length, position) {
if (!Buffer.isBuffer(buffer)) {
// legacy string interface (fd, data, position, encoding)
position = arguments[2];
+ assertEncoding(arguments[3]);
buffer = new Buffer('' + arguments[1], arguments[3]);
offset = 0;
@@ -430,29 +494,86 @@ fs.writeSync = function(fd, buffer, offset, length, position) {
};
fs.rename = function(oldPath, newPath, callback) {
+ callback = makeCallback(callback);
+ if (!nullCheck(oldPath, callback)) return;
+ if (!nullCheck(newPath, callback)) return;
binding.rename(pathModule._makeLong(oldPath),
pathModule._makeLong(newPath),
- makeCallback(callback));
+ callback);
};
fs.renameSync = function(oldPath, newPath) {
+ nullCheck(oldPath);
+ nullCheck(newPath);
return binding.rename(pathModule._makeLong(oldPath),
pathModule._makeLong(newPath));
};
-fs.truncate = function(fd, len, callback) {
- binding.truncate(fd, len, makeCallback(callback));
+fs.truncate = function(path, len, callback) {
+ if (typeof path === 'number') {
+ // legacy
+ return fs.ftruncate(path, len, callback);
+ }
+ if (typeof len === 'function') {
+ callback = len;
+ len = 0;
+ } else if (typeof len === 'undefined') {
+ len = 0;
+ }
+ callback = maybeCallback(callback);
+ fs.open(path, 'w', function(er, fd) {
+ if (er) return callback(er);
+ binding.ftruncate(fd, len, function(er) {
+ fs.close(fd, function(er2) {
+ callback(er || er2);
+ });
+ });
+ });
+};
+
+fs.truncateSync = function(path, len) {
+ if (typeof path === 'number') {
+ // legacy
+ return fs.ftruncateSync(path, len);
+ }
+ if (typeof len === 'undefined') {
+ len = 0;
+ }
+ // allow error to be thrown, but still close fd.
+ var fd = fs.openSync(path, 'w');
+ try {
+ var ret = fs.ftruncateSync(fd, len);
+ } finally {
+ fs.closeSync(fd);
+ }
+ return ret;
+};
+
+fs.ftruncate = function(fd, len, callback) {
+ if (typeof len === 'function') {
+ callback = len;
+ len = 0;
+ } else if (typeof len === 'undefined') {
+ len = 0;
+ }
+ binding.ftruncate(fd, len, makeCallback(callback));
};
-fs.truncateSync = function(fd, len) {
- return binding.truncate(fd, len);
+fs.ftruncateSync = function(fd, len) {
+ if (typeof len === 'undefined') {
+ len = 0;
+ }
+ return binding.ftruncate(fd, len);
};
fs.rmdir = function(path, callback) {
- binding.rmdir(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.rmdir(pathModule._makeLong(path), callback);
};
fs.rmdirSync = function(path) {
+ nullCheck(path);
return binding.rmdir(pathModule._makeLong(path));
};
@@ -474,29 +595,27 @@ fs.fsyncSync = function(fd) {
fs.mkdir = function(path, mode, callback) {
if (typeof mode === 'function') callback = mode;
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
binding.mkdir(pathModule._makeLong(path),
modeNum(mode, 511 /*=0777*/),
- makeCallback(callback));
+ callback);
};
fs.mkdirSync = function(path, mode) {
+ nullCheck(path);
return binding.mkdir(pathModule._makeLong(path),
modeNum(mode, 511 /*=0777*/));
};
-fs.sendfile = function(outFd, inFd, inOffset, length, callback) {
- binding.sendfile(outFd, inFd, inOffset, length, makeCallback(callback));
-};
-
-fs.sendfileSync = function(outFd, inFd, inOffset, length) {
- return binding.sendfile(outFd, inFd, inOffset, length);
-};
-
fs.readdir = function(path, callback) {
- binding.readdir(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.readdir(pathModule._makeLong(path), callback);
};
fs.readdirSync = function(path) {
+ nullCheck(path);
return binding.readdir(pathModule._makeLong(path));
};
@@ -505,11 +624,15 @@ fs.fstat = function(fd, callback) {
};
fs.lstat = function(path, callback) {
- binding.lstat(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.lstat(pathModule._makeLong(path), callback);
};
fs.stat = function(path, callback) {
- binding.stat(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.stat(pathModule._makeLong(path), callback);
};
fs.fstatSync = function(fd) {
@@ -517,18 +640,23 @@ fs.fstatSync = function(fd) {
};
fs.lstatSync = function(path) {
+ nullCheck(path);
return binding.lstat(pathModule._makeLong(path));
};
fs.statSync = function(path) {
+ nullCheck(path);
return binding.stat(pathModule._makeLong(path));
};
fs.readlink = function(path, callback) {
- binding.readlink(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.readlink(pathModule._makeLong(path), callback);
};
fs.readlinkSync = function(path) {
+ nullCheck(path);
return binding.readlink(pathModule._makeLong(path));
};
@@ -549,6 +677,9 @@ fs.symlink = function(destination, path, type_, callback) {
var type = (typeof type_ === 'string' ? type_ : null);
var callback = makeCallback(arguments[arguments.length - 1]);
+ if (!nullCheck(destination, callback)) return;
+ if (!nullCheck(path, callback)) return;
+
binding.symlink(preprocessSymlinkDestination(destination, type),
pathModule._makeLong(path),
type,
@@ -558,27 +689,39 @@ fs.symlink = function(destination, path, type_, callback) {
fs.symlinkSync = function(destination, path, type) {
type = (typeof type === 'string' ? type : null);
+ nullCheck(destination);
+ nullCheck(path);
+
return binding.symlink(preprocessSymlinkDestination(destination, type),
pathModule._makeLong(path),
type);
};
fs.link = function(srcpath, dstpath, callback) {
+ callback = makeCallback(callback);
+ if (!nullCheck(srcpath, callback)) return;
+ if (!nullCheck(dstpath, callback)) return;
+
binding.link(pathModule._makeLong(srcpath),
pathModule._makeLong(dstpath),
- makeCallback(callback));
+ callback);
};
fs.linkSync = function(srcpath, dstpath) {
+ nullCheck(srcpath);
+ nullCheck(dstpath);
return binding.link(pathModule._makeLong(srcpath),
pathModule._makeLong(dstpath));
};
fs.unlink = function(path, callback) {
- binding.unlink(pathModule._makeLong(path), makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.unlink(pathModule._makeLong(path), callback);
};
fs.unlinkSync = function(path) {
+ nullCheck(path);
return binding.unlink(pathModule._makeLong(path));
};
@@ -592,7 +735,7 @@ fs.fchmodSync = function(fd, mode) {
if (constants.hasOwnProperty('O_SYMLINK')) {
fs.lchmod = function(path, mode, callback) {
- callback = callback || (function() {});
+ callback = maybeCallback(callback);
fs.open(path, constants.O_WRONLY | constants.O_SYMLINK, function(err, fd) {
if (err) {
callback(err);
@@ -631,18 +774,21 @@ if (constants.hasOwnProperty('O_SYMLINK')) {
fs.chmod = function(path, mode, callback) {
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
binding.chmod(pathModule._makeLong(path),
modeNum(mode),
- makeCallback(callback));
+ callback);
};
fs.chmodSync = function(path, mode) {
+ nullCheck(path);
return binding.chmod(pathModule._makeLong(path), modeNum(mode));
};
if (constants.hasOwnProperty('O_SYMLINK')) {
fs.lchown = function(path, uid, gid, callback) {
- callback = callback || (function() {});
+ callback = maybeCallback(callback);
fs.open(path, constants.O_WRONLY | constants.O_SYMLINK, function(err, fd) {
if (err) {
callback(err);
@@ -667,10 +813,13 @@ fs.fchownSync = function(fd, uid, gid) {
};
fs.chown = function(path, uid, gid, callback) {
- binding.chown(pathModule._makeLong(path), uid, gid, makeCallback(callback));
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
+ binding.chown(pathModule._makeLong(path), uid, gid, callback);
};
fs.chownSync = function(path, uid, gid) {
+ nullCheck(path);
return binding.chown(pathModule._makeLong(path), uid, gid);
};
@@ -690,13 +839,16 @@ function toUnixTimestamp(time) {
fs._toUnixTimestamp = toUnixTimestamp;
fs.utimes = function(path, atime, mtime, callback) {
+ callback = makeCallback(callback);
+ if (!nullCheck(path, callback)) return;
binding.utimes(pathModule._makeLong(path),
toUnixTimestamp(atime),
toUnixTimestamp(mtime),
- makeCallback(callback));
+ callback);
};
fs.utimesSync = function(path, atime, mtime) {
+ nullCheck(path);
atime = toUnixTimestamp(atime);
mtime = toUnixTimestamp(mtime);
binding.utimes(pathModule._makeLong(path), atime, mtime);
@@ -715,8 +867,7 @@ fs.futimesSync = function(fd, atime, mtime) {
};
function writeAll(fd, buffer, offset, length, position, callback) {
- var callback_ = arguments[arguments.length - 1];
- callback = (typeof(callback_) == 'function' ? callback_ : null);
+ callback = maybeCallback(arguments[arguments.length - 1]);
// write(fd, buffer, offset, length, position, callback)
fs.write(fd, buffer, offset, length, position, function(writeErr, written) {
@@ -739,8 +890,9 @@ function writeAll(fd, buffer, offset, length, position, callback) {
fs.writeFile = function(path, data, encoding_, callback) {
var encoding = (typeof(encoding_) == 'string' ? encoding_ : 'utf8');
- var callback_ = arguments[arguments.length - 1];
- callback = (typeof(callback_) == 'function' ? callback_ : null);
+ assertEncoding(encoding);
+
+ callback = maybeCallback(arguments[arguments.length - 1]);
fs.open(path, 'w', 438 /*=0666*/, function(openErr, fd) {
if (openErr) {
if (callback) callback(openErr);
@@ -753,6 +905,8 @@ fs.writeFile = function(path, data, encoding_, callback) {
};
fs.writeFileSync = function(path, data, encoding) {
+ assertEncoding(encoding);
+
var fd = fs.openSync(path, 'w');
if (!Buffer.isBuffer(data)) {
data = new Buffer('' + data, encoding || 'utf8');
@@ -770,8 +924,9 @@ fs.writeFileSync = function(path, data, encoding) {
fs.appendFile = function(path, data, encoding_, callback) {
var encoding = (typeof(encoding_) == 'string' ? encoding_ : 'utf8');
- var callback_ = arguments[arguments.length - 1];
- callback = (typeof(callback_) == 'function' ? callback_ : null);
+ assertEncoding(encoding);
+
+ callback = maybeCallback(arguments[arguments.length - 1]);
fs.open(path, 'a', 438 /*=0666*/, function(err, fd) {
if (err) return callback(err);
@@ -781,6 +936,8 @@ fs.appendFile = function(path, data, encoding_, callback) {
};
fs.appendFileSync = function(path, data, encoding) {
+ assertEncoding(encoding);
+
var fd = fs.openSync(path, 'a');
if (!Buffer.isBuffer(data)) {
data = new Buffer('' + data, encoding || 'utf8');
@@ -821,7 +978,7 @@ function FSWatcher() {
this._handle.onchange = function(status, event, filename) {
if (status) {
self._handle.close();
- self.emit('error', errnoException(errno, 'watch'));
+ self.emit('error', errnoException(process._errno, 'watch'));
} else {
self.emit('change', event, filename);
}
@@ -830,11 +987,12 @@ function FSWatcher() {
util.inherits(FSWatcher, EventEmitter);
FSWatcher.prototype.start = function(filename, persistent) {
+ nullCheck(filename);
var r = this._handle.start(pathModule._makeLong(filename), persistent);
if (r) {
this._handle.close();
- throw errnoException(errno, 'watch');
+ throw errnoException(process._errno, 'watch');
}
};
@@ -843,6 +1001,7 @@ FSWatcher.prototype.close = function() {
};
fs.watch = function(filename) {
+ nullCheck(filename);
var watcher;
var options;
var listener;
@@ -897,6 +1056,7 @@ util.inherits(StatWatcher, EventEmitter);
StatWatcher.prototype.start = function(filename, persistent, interval) {
+ nullCheck(filename);
this._handle.start(pathModule._makeLong(filename), persistent, interval);
};
@@ -914,6 +1074,7 @@ function inStatWatchers(filename) {
fs.watchFile = function(filename) {
+ nullCheck(filename);
var stat;
var listener;
@@ -947,6 +1108,7 @@ fs.watchFile = function(filename) {
};
fs.unwatchFile = function(filename, listener) {
+ nullCheck(filename);
if (!inStatWatchers(filename)) return;
var stat = statWatchers[filename];
@@ -1083,7 +1245,7 @@ fs.realpathSync = function realpathSync(p, cache) {
fs.realpath = function realpath(p, cache, cb) {
if (typeof cb !== 'function') {
- cb = cache;
+ cb = maybeCallback(cache);
cache = null;
}
@@ -1207,8 +1369,8 @@ fs.realpath = function realpath(p, cache, cb) {
var pool;
-function allocNewPool() {
- pool = new Buffer(kPoolSize);
+function allocNewPool(poolSize) {
+ pool = new Buffer(poolSize);
pool.used = 0;
}
@@ -1218,32 +1380,31 @@ fs.createReadStream = function(path, options) {
return new ReadStream(path, options);
};
-var ReadStream = fs.ReadStream = function(path, options) {
- if (!(this instanceof ReadStream)) return new ReadStream(path, options);
+util.inherits(ReadStream, Readable);
+fs.ReadStream = ReadStream;
- Stream.call(this);
+function ReadStream(path, options) {
+ if (!(this instanceof ReadStream))
+ return new ReadStream(path, options);
- var self = this;
-
- this.path = path;
- this.fd = null;
- this.readable = true;
- this.paused = false;
+ // a little bit bigger buffer and water marks by default
+ options = util._extend({
+ bufferSize: 64 * 1024,
+ highWaterMark: 64 * 1024
+ }, options || {});
- this.flags = 'r';
- this.mode = 438; /*=0666*/
- this.bufferSize = 64 * 1024;
+ Readable.call(this, options);
- options = options || {};
-
- // Mixin options into this
- var keys = Object.keys(options);
- for (var index = 0, length = keys.length; index < length; index++) {
- var key = keys[index];
- this[key] = options[key];
- }
+ this.path = path;
+ this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+ this.flags = options.hasOwnProperty('flags') ? options.flags : 'r';
+ this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
- if (this.encoding) this.setEncoding(this.encoding);
+ this.start = options.hasOwnProperty('start') ? options.start : undefined;
+ this.end = options.hasOwnProperty('end') ? options.end : undefined;
+ this.autoClose = options.hasOwnProperty('autoClose') ?
+ options.autoClose : true;
+ this.pos = undefined;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
@@ -1262,192 +1423,155 @@ var ReadStream = fs.ReadStream = function(path, options) {
this.pos = this.start;
}
- if (this.fd !== null) {
- process.nextTick(function() {
- self._read();
- });
- return;
- }
+ if (typeof this.fd !== 'number')
+ this.open();
- fs.open(this.path, this.flags, this.mode, function(err, fd) {
- if (err) {
- self.emit('error', err);
- self.readable = false;
+ this.on('end', function() {
+ if (this.autoClose) {
+ this.destroy();
+ }
+ });
+}
+
+fs.FileReadStream = fs.ReadStream; // support the legacy name
+
+ReadStream.prototype.open = function() {
+ var self = this;
+ fs.open(this.path, this.flags, this.mode, function(er, fd) {
+ if (er) {
+ if (this.autoClose) {
+ self.destroy();
+ }
+ self.emit('error', er);
return;
}
self.fd = fd;
self.emit('open', fd);
- self._read();
+ // start the flow of data.
+ self.read();
});
};
-util.inherits(ReadStream, Stream);
-fs.FileReadStream = fs.ReadStream; // support the legacy name
-
-ReadStream.prototype.setEncoding = function(encoding) {
- var StringDecoder = require('string_decoder').StringDecoder; // lazy load
- this._decoder = new StringDecoder(encoding);
-};
-
-
-ReadStream.prototype._read = function() {
- var self = this;
- if (!this.readable || this.paused || this.reading) return;
+ReadStream.prototype._read = function(n, cb) {
+ if (typeof this.fd !== 'number')
+ return this.once('open', function() {
+ this._read(n, cb);
+ });
- this.reading = true;
+ if (this.destroyed)
+ return;
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
// users might have refernces to slices on it.
pool = null;
- allocNewPool();
+ allocNewPool(this._readableState.bufferSize);
}
- // Grab another reference to the pool in the case that while we're in the
- // thread pool another read() finishes up the pool, and allocates a new
- // one.
+ // Grab another reference to the pool in the case that while we're
+ // in the thread pool another read() finishes up the pool, and
+ // allocates a new one.
var thisPool = pool;
- var toRead = Math.min(pool.length - pool.used, ~~this.bufferSize);
+ var toRead = Math.min(pool.length - pool.used, n);
var start = pool.used;
- if (this.pos !== undefined) {
+ if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
- }
- function afterRead(err, bytesRead) {
- self.reading = false;
- if (err) {
- fs.close(self.fd, function() {
- self.fd = null;
- self.emit('error', err);
- self.readable = false;
- });
- return;
- }
+ // already read everything we were supposed to read!
+ // treat as EOF.
+ if (toRead <= 0)
+ return cb();
- if (bytesRead === 0) {
- self.emit('end');
- self.destroy();
- return;
- }
-
- var b = thisPool.slice(start, start + bytesRead);
+ // the actual read.
+ var self = this;
+ fs.read(this.fd, pool, pool.used, toRead, this.pos, onread);
- // Possible optimizition here?
- // Reclaim some bytes if bytesRead < toRead?
- // Would need to ensure that pool === thisPool.
+ // move the pool positions, and internal position for reading.
+ if (this.pos !== undefined)
+ this.pos += toRead;
+ pool.used += toRead;
- // do not emit events if the stream is paused
- if (self.paused) {
- self.buffer = b;
- return;
+ function onread(er, bytesRead) {
+ if (er) {
+ if (self.autoClose) {
+ self.destroy();
+ }
+ return cb(er);
}
- // do not emit events anymore after we declared the stream unreadable
- if (!self.readable) return;
-
- self._emitData(b);
- self._read();
- }
-
- fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead);
+ var b = null;
+ if (bytesRead > 0)
+ b = thisPool.slice(start, start + bytesRead);
- if (this.pos !== undefined) {
- this.pos += toRead;
+ cb(null, b);
}
- pool.used += toRead;
};
-ReadStream.prototype._emitData = function(d) {
- if (this._decoder) {
- var string = this._decoder.write(d);
- if (string.length) this.emit('data', string);
- } else {
- this.emit('data', d);
- }
+ReadStream.prototype.destroy = function() {
+ if (this.destroyed)
+ return;
+ this.destroyed = true;
+
+ if ('number' === typeof this.fd)
+ this.close();
};
-ReadStream.prototype.destroy = function(cb) {
+ReadStream.prototype.close = function(cb) {
var self = this;
-
- if (!this.readable) {
- if (cb) process.nextTick(function() { cb(null); });
- return;
+ if (cb)
+ this.once('close', cb);
+ if (this.closed || 'number' !== typeof this.fd) {
+ if ('number' !== typeof this.fd) {
+ this.once('open', close);
+ return;
+ }
+ return process.nextTick(this.emit.bind(this, 'close'));
}
- this.readable = false;
-
- function close() {
- fs.close(self.fd, function(err) {
- if (err) {
- if (cb) cb(err);
- self.emit('error', err);
- return;
- }
+ this.closed = true;
+ close();
- if (cb) cb(null);
- self.emit('close');
+ function close(fd) {
+ fs.close(fd || self.fd, function(er) {
+ if (er)
+ self.emit('error', er);
+ else
+ self.emit('close');
});
- }
-
- if (this.fd === null) {
- this.addListener('open', close);
- } else {
- close();
+ self.fd = null;
}
};
-ReadStream.prototype.pause = function() {
- this.paused = true;
-};
-
-
-ReadStream.prototype.resume = function() {
- this.paused = false;
-
- if (this.buffer) {
- var buffer = this.buffer;
- this.buffer = null;
- this._emitData(buffer);
- }
-
- // hasn't opened yet.
- if (null == this.fd) return;
-
- this._read();
-};
-
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options);
};
-var WriteStream = fs.WriteStream = function(path, options) {
- if (!(this instanceof WriteStream)) return new WriteStream(path, options);
+util.inherits(WriteStream, Writable);
+fs.WriteStream = WriteStream;
+function WriteStream(path, options) {
+ if (!(this instanceof WriteStream))
+ return new WriteStream(path, options);
- Stream.call(this);
+ options = options || {};
+
+ Writable.call(this, options);
this.path = path;
this.fd = null;
- this.writable = true;
- this.flags = 'w';
- this.encoding = 'binary';
- this.mode = 438; /*=0666*/
- this.bytesWritten = 0;
-
- options = options || {};
+ this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+ this.flags = options.hasOwnProperty('flags') ? options.flags : 'w';
+ this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
- // Mixin options into this
- var keys = Object.keys(options);
- for (var index = 0, length = keys.length; index < length; index++) {
- var key = keys[index];
- this[key] = options[key];
- }
+ this.start = options.hasOwnProperty('start') ? options.start : undefined;
+ this.pos = undefined;
+ this.bytesWritten = 0;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
@@ -1460,159 +1584,54 @@ var WriteStream = fs.WriteStream = function(path, options) {
this.pos = this.start;
}
- this.busy = false;
- this._queue = [];
+ if ('number' !== typeof this.fd)
+ this.open();
- if (this.fd === null) {
- this._open = fs.open;
- this._queue.push([this._open, this.path, this.flags, this.mode, undefined]);
- this.flush();
- }
-};
-util.inherits(WriteStream, Stream);
+ // dispose on finish.
+ this.once('finish', this.close);
+}
fs.FileWriteStream = fs.WriteStream; // support the legacy name
-WriteStream.prototype.flush = function() {
- if (this.busy) return;
- var self = this;
-
- var args = this._queue.shift();
- if (!args) {
- if (this.drainable) { this.emit('drain'); }
- return;
- }
-
- this.busy = true;
-
- var method = args.shift(),
- cb = args.pop();
-
- args.push(function(err) {
- self.busy = false;
-
- if (err) {
- self.writable = false;
-
- function emit() {
- self.fd = null;
- if (cb) cb(err);
- self.emit('error', err);
- }
-
- if (self.fd === null) {
- emit();
- } else {
- fs.close(self.fd, emit);
- }
-
- return;
- }
-
- if (method == fs.write) {
- self.bytesWritten += arguments[1];
- if (cb) {
- // write callback
- cb(null, arguments[1]);
- }
-
- } else if (method === self._open) {
- // save reference for file pointer
- self.fd = arguments[1];
- self.emit('open', self.fd);
- } else if (method === fs.close) {
- // stop flushing after close
- if (cb) {
- cb(null);
- }
- self.emit('close');
+WriteStream.prototype.open = function() {
+ fs.open(this.path, this.flags, this.mode, function(er, fd) {
+ if (er) {
+ this.destroy();
+ this.emit('error', er);
return;
}
- self.flush();
- });
-
- // Inject the file pointer
- if (method !== self._open) {
- args.unshift(this.fd);
- }
-
- method.apply(this, args);
+ this.fd = fd;
+ this.emit('open', fd);
+ }.bind(this));
};
-WriteStream.prototype.write = function(data) {
- if (!this.writable) {
- this.emit('error', new Error('stream not writable'));
- return false;
- }
- this.drainable = true;
+WriteStream.prototype._write = function(data, cb) {
+ if (!Buffer.isBuffer(data))
+ return this.emit('error', new Error('Invalid data'));
- var cb;
- if (typeof(arguments[arguments.length - 1]) == 'function') {
- cb = arguments[arguments.length - 1];
- }
+ if (typeof this.fd !== 'number')
+ return this.once('open', this._write.bind(this, data, cb));
- if (!Buffer.isBuffer(data)) {
- var encoding = 'utf8';
- if (typeof(arguments[1]) == 'string') encoding = arguments[1];
- data = new Buffer('' + data, encoding);
- }
-
- this._queue.push([fs.write, data, 0, data.length, this.pos, cb]);
+ var self = this;
+ fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
+ if (er) {
+ self.destroy();
+ return cb(er);
+ }
+ self.bytesWritten += bytes;
+ cb();
+ });
- if (this.pos !== undefined) {
+ if (this.pos !== undefined)
this.pos += data.length;
- }
-
- this.flush();
-
- return false;
};
-WriteStream.prototype.end = function(data, encoding, cb) {
- if (typeof(data) === 'function') {
- cb = data;
- } else if (typeof(encoding) === 'function') {
- cb = encoding;
- this.write(data);
- } else if (arguments.length > 0) {
- this.write(data, encoding);
- }
- this.writable = false;
- this._queue.push([fs.close, cb]);
- this.flush();
-};
-
-WriteStream.prototype.destroy = function(cb) {
- var self = this;
- if (!this.writable) {
- if (cb) process.nextTick(function() { cb(null); });
- return;
- }
- this.writable = false;
-
- function close() {
- fs.close(self.fd, function(err) {
- if (err) {
- if (cb) { cb(err); }
- self.emit('error', err);
- return;
- }
-
- if (cb) { cb(null); }
- self.emit('close');
- });
- }
-
- if (this.fd === null) {
- this.addListener('open', close);
- } else {
- close();
- }
-};
+WriteStream.prototype.destroy = ReadStream.prototype.destroy;
+WriteStream.prototype.close = ReadStream.prototype.close;
// There is no shutdown() for files.
WriteStream.prototype.destroySoon = WriteStream.prototype.end;
@@ -1649,6 +1668,7 @@ SyncWriteStream.prototype.write = function(data, arg1, arg2) {
throw new Error('bad arg');
}
}
+ assertEncoding(encoding);
// Change strings to buffers. SLOW
if (typeof data == 'string') {