/*
 * Description
 * You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 *
 * 668 lines
 * 16 KiB
 *
 * 4 years ago
 */
const transports = require("./transports/index");
const Emitter = require("component-emitter");
const debug = require("debug")("engine.io-client:socket");
const parser = require("engine.io-parser");
const parseuri = require("parseuri");
const parseqs = require("parseqs");
/**
 * Client socket that owns one transport at a time and can upgrade to another.
 *
 * The socket opens the first usable entry of `transports`, waits for the
 * server's "open" (handshake) packet, then probes every upgrade the server
 * advertised that this client also allows. Outgoing packets are queued in
 * `writeBuffer` and handed to the current transport by `flush()`.
 *
 * Events emitted by this class: "open", "close", "error", "handshake",
 * "packet", "heartbeat", "pong", "data", "message", "packetCreate", "flush",
 * "drain", "upgrading", "upgrade" and "upgradeError".
 */
class Socket extends Emitter {
  /**
   * Socket constructor.
   *
   * Note: the options object that is passed in is written to (hostname,
   * secure, port, query are filled in from the URI).
   *
   * @param {String|Object} uri - URI string, or the options object
   * @param {Object} opts - options (ignored when `uri` is an object)
   * @api public
   */
  constructor(uri, opts = {}) {
    super();

    // Allow `new Socket(opts)` without a URI.
    if (uri && "object" === typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      uri = parseuri(uri);
      opts.hostname = uri.host;
      opts.secure = uri.protocol === "https" || uri.protocol === "wss";
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    } else if (opts.host) {
      opts.hostname = parseuri(opts.host).host;
    }

    // An explicit `secure` option wins; otherwise follow the page protocol
    // when a global `location` exists.
    this.secure =
      null != opts.secure
        ? opts.secure
        : typeof location !== "undefined" && "https:" === location.protocol;

    if (opts.hostname && !opts.port) {
      // if no port is specified manually, use the protocol default
      opts.port = this.secure ? "443" : "80";
    }

    this.hostname =
      opts.hostname ||
      (typeof location !== "undefined" ? location.hostname : "localhost");
    this.port =
      opts.port ||
      (typeof location !== "undefined" && location.port
        ? location.port
        : this.secure
        ? 443
        : 80);

    this.transports = opts.transports || ["polling", "websocket"];
    this.readyState = "";
    this.writeBuffer = [];
    // Number of packets handed to the transport by the last flush().
    this.prevBufferLen = 0;

    // Defaults first, caller-supplied options override them.
    this.opts = Object.assign(
      {
        path: "/engine.io",
        agent: false,
        withCredentials: false,
        upgrade: true,
        jsonp: true,
        timestampParam: "t",
        policyPort: 843,
        rememberUpgrade: false,
        rejectUnauthorized: true,
        perMessageDeflate: {
          threshold: 1024
        },
        transportOptions: {}
      },
      opts
    );

    // Normalise the path so it ends with exactly one trailing slash.
    this.opts.path = this.opts.path.replace(/\/$/, "") + "/";

    if (typeof this.opts.query === "string") {
      this.opts.query = parseqs.decode(this.opts.query);
    }

    // set on handshake
    this.id = null;
    this.upgrades = null;
    this.pingInterval = null;
    this.pingTimeout = null;

    // set on heartbeat
    this.pingTimeoutTimer = null;

    this.open();
  }

  /**
   * Creates transport of the given type.
   *
   * The transport receives a copy of the socket query extended with `EIO`
   * (protocol version), `transport` (its name) and, when a session already
   * exists, `sid`.
   *
   * @param {String} name - transport name
   * @return {Transport}
   * @api private
   */
  createTransport(name) {
    debug('creating transport "%s"', name);
    const query = clone(this.opts.query);

    // append engine.io protocol identifier
    query.EIO = parser.protocol;

    // transport name
    query.transport = name;

    // session id if we already have one
    if (this.id) query.sid = this.id;

    // Precedence, lowest to highest: per-transport options, socket options,
    // then the values computed by this socket.
    const opts = Object.assign(
      {},
      this.opts.transportOptions[name],
      this.opts,
      {
        query,
        socket: this,
        hostname: this.hostname,
        secure: this.secure,
        port: this.port
      }
    );

    debug("options: %j", opts);

    return new transports[name](opts);
  }

  /**
   * Initializes transport to use and starts probe.
   *
   * When no transport is left, an "error" event is emitted asynchronously
   * and nothing is opened.
   *
   * @api private
   */
  open() {
    let name;
    if (
      this.opts.rememberUpgrade &&
      Socket.priorWebsocketSuccess &&
      this.transports.indexOf("websocket") !== -1
    ) {
      name = "websocket";
    } else if (0 === this.transports.length) {
      // Emit error on next tick so it can be listened to
      setTimeout(() => {
        this.emit("error", "No transports available");
      }, 0);
      return;
    } else {
      name = this.transports[0];
    }
    this.readyState = "opening";

    // Retry with the next transport if the transport is disabled (jsonp: false)
    let transport;
    try {
      transport = this.createTransport(name);
    } catch (e) {
      debug("error while creating transport: %s", e);
      // Discard the transport that actually failed. `name` is not always the
      // first entry: with `rememberUpgrade` it can be "websocket" while
      // "polling" sits at index 0, and dropping index 0 would throw away a
      // transport that was never tried.
      this.transports.splice(this.transports.indexOf(name), 1);
      this.open();
      return;
    }

    transport.open();
    this.setTransport(transport);
  }

  /**
   * Sets the current transport. Disables the existing one (if any).
   *
   * @param {Transport} transport - transport that becomes the active one
   * @api private
   */
  setTransport(transport) {
    debug("setting transport %s", transport.name);

    if (this.transport) {
      debug("clearing existing transport %s", this.transport.name);
      this.transport.removeAllListeners();
    }

    // set up transport
    this.transport = transport;

    // set up transport listeners
    transport
      .on("drain", () => {
        this.onDrain();
      })
      .on("packet", packet => {
        this.onPacket(packet);
      })
      .on("error", e => {
        this.onError(e);
      })
      .on("close", () => {
        this.onClose("transport close");
      });
  }

  /**
   * Probes a transport.
   *
   * Opens a second transport next to the active one, sends a "ping"/"probe"
   * packet over it and, if the matching "pong"/"probe" comes back, pauses the
   * active transport and switches to the probed one. Any failure emits
   * "upgradeError" and leaves the active transport untouched.
   *
   * @param {String} name - transport name
   * @api private
   */
  probe(name) {
    debug('probing transport "%s"', name);
    // createTransport() only accepts the transport name.
    let transport = this.createTransport(name);
    // Once true, every pending callback of this probe becomes a no-op.
    let failed = false;
    const self = this;

    Socket.priorWebsocketSuccess = false;

    function onTransportOpen() {
      if (self.onlyBinaryUpgrades) {
        // NOTE(review): `this` is expected to be the probed transport here,
        // i.e. the emitter is assumed to invoke listeners with itself as
        // `this` - confirm against the Emitter implementation.
        const upgradeLosesBinary =
          !this.supportsBinary && self.transport.supportsBinary;
        failed = failed || upgradeLosesBinary;
      }
      if (failed) return;

      debug('probe transport "%s" opened', name);
      transport.send([{ type: "ping", data: "probe" }]);
      transport.once("packet", function(msg) {
        if (failed) return;
        if ("pong" === msg.type && "probe" === msg.data) {
          debug('probe transport "%s" pong', name);
          self.upgrading = true;
          self.emit("upgrading", transport);
          // An "upgrading" listener may have aborted this probe.
          if (!transport) return;
          Socket.priorWebsocketSuccess = "websocket" === transport.name;

          debug('pausing current transport "%s"', self.transport.name);
          self.transport.pause(function() {
            if (failed) return;
            if ("closed" === self.readyState) return;
            debug("changing transport and sending upgrade packet");

            cleanup();

            self.setTransport(transport);
            transport.send([{ type: "upgrade" }]);
            self.emit("upgrade", transport);
            transport = null;
            self.upgrading = false;
            self.flush();
          });
        } else {
          debug('probe transport "%s" failed', name);
          const err = new Error("probe error");
          err.transport = transport.name;
          self.emit("upgradeError", err);
        }
      });
    }

    function freezeTransport() {
      if (failed) return;

      // Any callback called by transport should be ignored since now
      failed = true;

      cleanup();

      transport.close();
      transport = null;
    }

    // Handle any error that happens while probing
    function onerror(err) {
      const error = new Error("probe error: " + err);
      // Read the name before freezeTransport() nulls `transport`.
      error.transport = transport.name;

      freezeTransport();

      debug('probe transport "%s" failed because of error: %s', name, err);

      self.emit("upgradeError", error);
    }

    function onTransportClose() {
      onerror("transport closed");
    }

    // When the socket is closed while we're probing
    function onclose() {
      onerror("socket closed");
    }

    // When the socket is upgraded while we're probing
    function onupgrade(to) {
      if (transport && to.name !== transport.name) {
        debug('"%s" works - aborting "%s"', to.name, transport.name);
        freezeTransport();
      }
    }

    // Remove all listeners on the transport and on self
    function cleanup() {
      transport.removeListener("open", onTransportOpen);
      transport.removeListener("error", onerror);
      transport.removeListener("close", onTransportClose);
      self.removeListener("close", onclose);
      self.removeListener("upgrading", onupgrade);
    }

    transport.once("open", onTransportOpen);
    transport.once("error", onerror);
    transport.once("close", onTransportClose);

    this.once("close", onclose);
    this.once("upgrading", onupgrade);

    transport.open();
  }

  /**
   * Called when connection is deemed open.
   *
   * Marks the socket open, emits "open", flushes queued packets and starts a
   * probe for every entry of `this.upgrades`.
   *
   * @api public
   */
  onOpen() {
    debug("socket open");
    this.readyState = "open";
    Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
    this.emit("open");
    this.flush();

    // we check for `readyState` in case an `open`
    // listener already closed the socket
    if (
      "open" === this.readyState &&
      this.opts.upgrade &&
      this.transport.pause
    ) {
      debug("starting upgrade probes");
      for (const upgrade of this.upgrades) {
        this.probe(upgrade);
      }
    }
  }

  /**
   * Handles a packet.
   *
   * Packets are ignored (only logged) unless the socket is opening, open or
   * closing.
   *
   * @param {Object} packet - decoded packet with `type` and `data`
   * @api private
   */
  onPacket(packet) {
    const state = this.readyState;
    if ("opening" !== state && "open" !== state && "closing" !== state) {
      debug('packet received with socket readyState "%s"', this.readyState);
      return;
    }

    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit("packet", packet);

    // Socket is live - any packet counts
    this.emit("heartbeat");

    switch (packet.type) {
      case "open":
        this.onHandshake(JSON.parse(packet.data));
        break;

      case "ping":
        this.resetPingTimeout();
        this.sendPacket("pong");
        this.emit("pong");
        break;

      case "error": {
        // Braces keep `err` scoped to this case only.
        const err = new Error("server error");
        err.code = packet.data;
        this.onError(err);
        break;
      }

      case "message":
        this.emit("data", packet.data);
        this.emit("message", packet.data);
        break;
    }
  }

  /**
   * Called upon handshake completion.
   *
   * @param {Object} data - handshake object; `sid`, `upgrades`,
   *   `pingInterval` and `pingTimeout` are read from it
   * @api private
   */
  onHandshake(data) {
    this.emit("handshake", data);
    this.id = data.sid;
    this.transport.query.sid = data.sid;
    this.upgrades = this.filterUpgrades(data.upgrades);
    this.pingInterval = data.pingInterval;
    this.pingTimeout = data.pingTimeout;
    this.onOpen();
    // In case open handler closes socket
    if ("closed" === this.readyState) return;
    this.resetPingTimeout();
  }

  /**
   * Sets and resets ping timeout timer based on server pings.
   *
   * The socket is closed with reason "ping timeout" when the timer is not
   * reset within `pingInterval + pingTimeout` milliseconds.
   *
   * @api private
   */
  resetPingTimeout() {
    clearTimeout(this.pingTimeoutTimer);
    this.pingTimeoutTimer = setTimeout(() => {
      this.onClose("ping timeout");
    }, this.pingInterval + this.pingTimeout);
  }

  /**
   * Called on `drain` event
   *
   * Drops the packets that were handed to the transport by the last flush,
   * then either emits "drain" (buffer empty) or flushes what was queued in
   * the meantime.
   *
   * @api private
   */
  onDrain() {
    this.writeBuffer.splice(0, this.prevBufferLen);

    // setting prevBufferLen = 0 is very important
    // for example, when upgrading, upgrade packet is sent over,
    // and a nonzero prevBufferLen could cause problems on `drain`
    this.prevBufferLen = 0;

    if (0 === this.writeBuffer.length) {
      this.emit("drain");
    } else {
      this.flush();
    }
  }

  /**
   * Flush write buffers.
   *
   * Does nothing while the socket is closed, has no transport, the transport
   * is not writable, an upgrade is in progress, or the buffer is empty.
   *
   * @api private
   */
  flush() {
    if (
      "closed" !== this.readyState &&
      // No transport exists when none could be created; keep the packets
      // queued instead of throwing on `undefined.writable`.
      this.transport &&
      this.transport.writable &&
      !this.upgrading &&
      this.writeBuffer.length
    ) {
      debug("flushing %d packets in socket", this.writeBuffer.length);
      this.transport.send(this.writeBuffer);
      // keep track of current length of writeBuffer
      // splice writeBuffer and callbackBuffer on `drain`
      this.prevBufferLen = this.writeBuffer.length;
      this.emit("flush");
    }
  }

  /**
   * Sends a message.
   *
   * @param {String} msg - message.
   * @param {Object} options - options.
   * @param {Function} fn - callback function.
   * @return {Socket} for chaining.
   * @api public
   */
  write(msg, options, fn) {
    this.sendPacket("message", msg, options, fn);
    return this;
  }

  /**
   * Sends a message. Same behavior as `write`.
   *
   * @param {String} msg - message.
   * @param {Object} options - options.
   * @param {Function} fn - callback function.
   * @return {Socket} for chaining.
   * @api public
   */
  send(msg, options, fn) {
    this.sendPacket("message", msg, options, fn);
    return this;
  }

  /**
   * Sends a packet.
   *
   * `data` and `options` may be omitted: a function passed in either position
   * is treated as the callback. Nothing is queued while the socket is closing
   * or closed.
   *
   * @param {String} type - packet type.
   * @param {String} data - data.
   * @param {Object} options - options.
   * @param {Function} fn - callback function, invoked on the next "flush".
   * @api private
   */
  sendPacket(type, data, options, fn) {
    if ("function" === typeof data) {
      fn = data;
      data = undefined;
    }

    if ("function" === typeof options) {
      fn = options;
      options = null;
    }

    if ("closing" === this.readyState || "closed" === this.readyState) {
      return;
    }

    // Work on a copy so the caller's options object is not modified.
    options = Object.assign({}, options);
    // Compression is on unless explicitly disabled with `compress: false`.
    options.compress = false !== options.compress;

    const packet = {
      type: type,
      data: data,
      options: options
    };
    this.emit("packetCreate", packet);
    this.writeBuffer.push(packet);
    if (fn) this.once("flush", fn);
    this.flush();
  }

  /**
   * Closes the connection.
   *
   * Only acts while the socket is opening or open. Pending packets are
   * drained first, and an in-progress upgrade is allowed to finish (or fail)
   * before the socket is actually closed.
   *
   * @return {Socket} for chaining.
   * @api private
   */
  close() {
    const close = () => {
      this.onClose("forced close");
      debug("socket closing - telling transport to close");
      // There is no transport when none could be created.
      if (this.transport) this.transport.close();
    };

    const cleanupAndClose = () => {
      this.removeListener("upgrade", cleanupAndClose);
      this.removeListener("upgradeError", cleanupAndClose);
      close();
    };

    const waitForUpgrade = () => {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      this.once("upgrade", cleanupAndClose);
      this.once("upgradeError", cleanupAndClose);
    };

    if ("opening" === this.readyState || "open" === this.readyState) {
      this.readyState = "closing";

      if (this.writeBuffer.length) {
        this.once("drain", () => {
          if (this.upgrading) {
            waitForUpgrade();
          } else {
            close();
          }
        });
      } else if (this.upgrading) {
        waitForUpgrade();
      } else {
        close();
      }
    }

    return this;
  }

  /**
   * Called upon transport error
   *
   * Emits "error" and then closes the socket with reason "transport error".
   *
   * @param {Error} err - the error reported by the transport or the server
   * @api private
   */
  onError(err) {
    debug("socket error %j", err);
    Socket.priorWebsocketSuccess = false;
    this.emit("error", err);
    this.onClose("transport error", err);
  }

  /**
   * Called upon transport close.
   *
   * Has no effect unless the socket is opening, open or closing, so a second
   * call after the socket is closed is a no-op.
   *
   * @param {String} reason - why the socket is closing
   * @param {*} desc - optional detail, forwarded to "close" listeners
   * @api private
   */
  onClose(reason, desc) {
    if (
      "opening" === this.readyState ||
      "open" === this.readyState ||
      "closing" === this.readyState
    ) {
      debug('socket close with reason: "%s"', reason);

      // clear timers
      // (nothing in this class assigns pingIntervalTimer; clearing an
      // undefined handle is a no-op)
      clearTimeout(this.pingIntervalTimer);
      clearTimeout(this.pingTimeoutTimer);

      // The socket can be "opening" without a transport when every
      // createTransport() call failed.
      if (this.transport) {
        // stop event from firing again for transport
        this.transport.removeAllListeners("close");

        // ensure transport won't stay open
        this.transport.close();

        // ignore further transport communication
        this.transport.removeAllListeners();
      }

      // set ready state
      this.readyState = "closed";

      // clear session id
      this.id = null;

      // emit close event
      this.emit("close", reason, desc);

      // clean buffers after, so users can still
      // grab the buffers on `close` event
      this.writeBuffer = [];
      this.prevBufferLen = 0;
    }
  }

  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * @param {Array} upgrades - server upgrades; a missing value is treated as
   *   an empty list
   * @return {Array} upgrades that are also listed in `this.transports`
   * @api private
   */
  filterUpgrades(upgrades) {
    return (upgrades || []).filter(
      upgrade => this.transports.indexOf(upgrade) !== -1
    );
  }
}
// Shared by all Socket instances. Set to true when a socket opens on, or
// successfully probes, the "websocket" transport; reset to false when a probe
// starts and on any socket error. open() reads it when the `rememberUpgrade`
// option is enabled to decide whether to start directly with websocket.
Socket.priorWebsocketSuccess = false;
/**
 * Protocol version.
 *
 * Taken from the parser module; the same value is sent to the server as the
 * `EIO` query parameter by createTransport().
 *
 * @api public
 */
Socket.protocol = parser.protocol; // this is an int
/**
 * Returns a shallow copy of an object's own enumerable string-keyed
 * properties as a new plain object.
 *
 * Inherited properties are skipped. `null` and `undefined` yield an empty
 * object.
 *
 * @param {Object} obj - object to copy
 * @return {Object} a new plain object
 */
function clone(obj) {
  const o = {};
  for (let i in obj) {
    // Call hasOwnProperty through Object.prototype: `obj` may have no
    // prototype (Object.create(null)) or may shadow `hasOwnProperty` with an
    // own property, and calling `obj.hasOwnProperty` directly would throw.
    if (Object.prototype.hasOwnProperty.call(obj, i)) {
      o[i] = obj[i];
    }
  }
  return o;
}
// The Socket class is this module's only export.
module.exports = Socket;