/*!
* peer.js - peer object for hsd
* Copyright (c) 2017-2018, Christopher Jeffrey (MIT License).
* https://github.com/handshake-org/hsd
*/
'use strict';
const assert = require('bsert');
const EventEmitter = require('events');
const {Lock} = require('bmutex');
const {format} = require('util');
const tcp = require('btcp');
const dns = require('bdns');
const Logger = require('blgr');
const {RollingFilter} = require('@handshake-org/bfilter');
const {BufferMap} = require('buffer-map');
const Parser = require('./parser');
const Framer = require('./framer');
const packets = require('./packets');
const consensus = require('../protocol/consensus');
const common = require('./common');
const InvItem = require('../primitives/invitem');
const BIP152 = require('./bip152');
const Block = require('../primitives/block');
const TX = require('../primitives/tx');
const Claim = require('../primitives/claim');
const NetAddress = require('./netaddress');
const Network = require('../protocol/network');
const {BrontideStream} = require('./brontide');
const AirdropProof = require('../primitives/airdropproof');
const SlidingWindow = require('./slidingwindow');
const services = common.services;
const invTypes = InvItem.types;
const packetTypes = packets.types;
/** @typedef {import('net').Socket} NetSocket */
/** @typedef {import('../types').Hash} Hash */
/** @typedef {import('../types').Rate} Rate */
/** @typedef {import('../protocol/errors').VerifyError} VerifyError */
/**
* Represents a network peer.
* @alias module:net.Peer
* @extends EventEmitter
*/
class Peer extends EventEmitter {
/**
* Create a peer.
* @alias module:net.Peer
* @constructor
* @param {PeerOptions} options
*/
constructor(options) {
super();
// Normalized options plus the handles derived from them.
this.options = new PeerOptions(options);
this.network = this.options.network;
this.logger = this.options.logger.context('peer');
// Lock taken by readPacket() around packet handling.
this.locker = new Lock();
// Wire-format parser (incoming bytes) and framer (outgoing packets).
this.parser = new Parser(this.network);
this.framer = new Framer(this.network);
// Connection state. `stream` and `socket` are assigned in _bind();
// `stream` is the brontide stream when the connection is encrypted
// and the raw socket otherwise.
this.id = -1;
this.stream = null;
this.socket = null;
this.brontide = new BrontideStream();
this.encrypted = false;
this.identityKey = this.options.identityKey;
this.opened = false;
this.outbound = false;
this.loader = false;
this.address = new NetAddress();
this.local = new NetAddress();
this.name = null;
this.connected = false;
this.destroyed = false;
// Handshake progress: `ack` is set on verack, `handshake` once
// initVersion() has completed.
this.ack = false;
this.handshake = false;
// Timestamps (ms) of connection and of the last send/receive;
// maybeTimeout() treats 0 as "never".
this.time = 0;
this.lastSend = 0;
this.lastRecv = 0;
// Write backpressure bookkeeping (see needsDrain() / handleDrain()).
this.drainSize = 0;
this.drainQueue = [];
this.banScore = 0;
// Inventory items waiting for the next flushInv().
this.invQueue = [];
this.onPacket = null;
this.next = null;
this.prev = null;
// Values reported by the remote `version` packet; -1 means "not received yet".
this.version = -1;
this.services = 0;
this.height = -1;
this.agent = null;
this.noRelay = false;
// Preferences the peer negotiates after the handshake.
this.preferHeaders = false;
this.hashContinue = consensus.ZERO_HASH;
this.spvFilter = null;
this.feeRate = -1;
this.compactMode = -1;
// State of an in-flight merkle block; `merkleTime` is checked by maybeTimeout().
this.merkleBlock = null;
this.merkleTime = -1;
this.merkleMatches = 0;
this.merkleMap = null;
this.syncing = false;
this.sentAddr = false;
this.gettingAddr = false;
this.sentGetAddr = false;
// Outstanding ping nonce and ping/pong timing (see sendPing() / handlePong()).
this.challenge = null;
this.lastPong = -1;
this.lastPing = -1;
this.minPing = -1;
this.blockTime = -1;
this.bestHash = consensus.ZERO_HASH;
this.bestHeight = -1;
// Last getblocks request sent; sendGetBlocks() skips exact repeats.
this.lastTip = consensus.ZERO_HASH;
this.lastStop = consensus.ZERO_HASH;
// Timer handles; all of them are cleared in destroy().
this.connectTimeout = null;
this.pingTimer = null;
this.invTimer = null;
this.stallTimer = null;
// Rolling filters of addresses and inventory hashes already exchanged.
this.addrFilter = new RollingFilter(5000, 0.001);
this.invFilter = new RollingFilter(50000, 0.000001);
// Outstanding requests. maybeTimeout() reads the map values as
// request timestamps and `responseMap` entries as deadlines.
this.blockMap = new BufferMap();
this.txMap = new BufferMap();
this.claimMap = new BufferMap();
this.airdropMap = new BufferMap();
this.responseMap = new Map();
this.compactBlocks = new BufferMap();
this.nameMap = new BufferMap();
this.totalProofs = 0;
// Created in finalize() once the handshake is done.
this.proofWindow = null;
// Attach the parser listeners.
this.init();
}
/**
* Create inbound peer from socket.
* @param {PeerOptions} options
* @param {NetSocket} socket
* @param {Boolean} encrypted
* @returns {Peer}
*/
static fromInbound(options, socket, encrypted) {
const peer = new this(options);
peer.accept(socket, encrypted);
return peer;
}
/**
* Create outbound peer from net address.
* @param {PeerOptions} options
* @param {NetAddress} addr
* @returns {Peer}
*/
static fromOutbound(options, addr) {
const peer = new this(options);
peer.connect(addr);
return peer;
}
/**
* Begin peer initialization.
* @private
*/
init() {
this.parser.on('packet', async (packet) => {
try {
await this.readPacket(packet);
} catch (e) {
this.error(e);
this.destroy();
}
});
this.parser.on('error', (err) => {
if (this.destroyed)
return;
this.error(err);
try {
this.sendReject('malformed', 'error parsing message');
this.increaseBan(10);
} catch (e) {
this.error(e);
}
});
}
/**
* Getter to retrieve hostname.
* @returns {String}
*/
hostname() {
return this.address.hostname;
}
/**
* Frame a payload with a header.
* @param {String} cmd - Packet type.
* @param {Buffer} payload
* @returns {Buffer} Payload with header prepended.
*/
framePacket(cmd, payload) {
return this.framer.packet(cmd, payload);
}
/**
* Feed data to the parser.
* @param {Buffer} data
*/
feedParser(data) {
return this.parser.feed(data);
}
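/**
* Bind the peer to a socket and attach its event listeners.
* @param {net.Socket} socket
* @param {Boolean} encrypted - Read and write through the brontide
* stream instead of the raw socket.
*/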
_bind(socket, encrypted) {
assert(!this.socket);
this.socket = socket;
this.encrypted = encrypted;
this.stream = encrypted ? this.brontide : this.socket;
this.brontide.on('error', (err) => {
this.error(err);
this.destroy();
});
this.socket.on('error', (err) => {
if (!this.connected)
return;
this.error(err);
this.destroy();
});
this.socket.once('close', () => {
this.error('Socket hangup.');
this.destroy();
});
this.socket.on('drain', () => {
this.handleDrain();
});
this.stream.on('data', (chunk) => {
try {
this.lastRecv = Date.now();
this.feedParser(chunk);
} catch (e) {
this.error(e);
this.destroy();
}
});
this.socket.setNoDelay(true);
}
/**
* Accept an inbound socket.
* @param {net.Socket} socket
* @param {Boolean} encrypted - Whether the connection goes through brontide.
* @returns {net.Socket}
*/
accept(socket, encrypted) {
assert(!this.socket);
this.address = NetAddress.fromSocket(socket, this.network);
this.address.services = 0;
this.outbound = false;
this._bind(socket, encrypted);
if (encrypted) {
this.connected = false;
this.brontide.accept(socket, this.identityKey);
} else {
this.time = Date.now();
this.connected = true;
}
return socket;
}
/**
* Create the socket and begin connecting. This method
* will use `options.createSocket` if provided.
* @param {NetAddress} addr
* @returns {net.Socket}
*/
connect(addr) {
assert(!this.socket);
const socket = this.options.createSocket(addr.port, addr.host);
this.address = addr;
this.outbound = true;
this.connected = false;
this._bind(socket, addr.hasKey());
if (addr.hasKey())
this.brontide.connect(socket, this.identityKey, addr.key);
return socket;
}
/**
* Do a reverse dns lookup on peer's addr.
* @returns {Promise}
*/
async getName() {
try {
if (!this.name) {
const {host, port} = this.address;
const {hostname} = await dns.lookupService(host, port);
this.name = hostname;
}
} catch (e) {
;
}
return this.name;
}
/**
* Open and perform initial handshake (without rejection).
* @method
* @returns {Promise}
*/
async tryOpen() {
try {
await this.open();
} catch (e) {
;
}
}
/**
* Open and perform initial handshake.
* @method
* @returns {Promise}
*/
async open() {
try {
await this._open();
} catch (e) {
this.error(e);
this.destroy();
throw e;
}
}
/**
* Open and perform initial handshake.
* @method
* @returns {Promise}
*/
async _open() {
this.opened = true;
// Connect to peer.
await this.initConnect();
await this.initStall();
await this.initVersion();
await this.finalize();
assert(!this.destroyed);
// Finally we can let the pool know
// that this peer is ready to go.
this.emit('open');
}
/**
* Wait for connection.
* @private
* @returns {Promise}
*/
async initConnect() {
if (this.connected) {
assert(!this.outbound);
return Promise.resolve();
}
assert(this.stream);
assert(this.socket);
return new Promise((resolve, reject) => {
const cleanup = () => {
if (this.connectTimeout != null) {
clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
if (this.socket) {
// eslint-disable-next-line no-use-before-define
this.socket.removeListener('error', onError);
}
// eslint-disable-next-line no-use-before-define
this.brontide.removeListener('error', onError);
};
const onError = (err) => {
cleanup();
reject(err);
};
this.stream.once('connect', () => {
this.time = Date.now();
this.connected = true;
this.emit('connect');
cleanup();
resolve();
});
this.socket.once('error', onError);
this.brontide.once('error', onError);
this.connectTimeout = setTimeout(() => {
this.connectTimeout = null;
cleanup();
reject(new Error('Connection timed out.'));
}, Peer.CONNECT_TIMEOUT);
});
}
/**
* Setup stall timer.
* @private
* @returns {Promise}
*/
initStall() {
assert(!this.stallTimer);
assert(!this.destroyed);
this.stallTimer = setInterval(() => {
this.maybeTimeout();
}, Peer.STALL_INTERVAL);
return Promise.resolve();
}
/**
* Perform the version/verack handshake.
* @method
* @private
* @returns {Promise}
*/
async initVersion() {
assert(!this.destroyed);
if (this.outbound) {
if (this.version !== -1)
throw new Error('Peer prematurely introduced themselves (outbound).');
if (this.ack)
throw new Error('Peer prematurely acknowledged us (outbound).');
// Say hello.
this.sendVersion();
await this.wait(packetTypes.VERACK, Peer.HANDSHAKE_TIMEOUT);
assert(this.ack);
if (this.version === -1)
await this.wait(packetTypes.VERSION, Peer.HANDSHAKE_TIMEOUT);
assert(this.version !== -1);
} else {
// We're shy. Wait for an introduction.
if (this.version === -1)
await this.wait(packetTypes.VERSION, Peer.HANDSHAKE_TIMEOUT);
assert(this.version !== -1);
if (this.ack)
throw new Error('Peer prematurely acknowledged us (inbound).');
this.sendVersion();
await this.wait(packetTypes.VERACK, Peer.HANDSHAKE_TIMEOUT);
}
if (this.destroyed)
throw new Error('Peer was destroyed during handshake.');
this.handshake = true;
this.logger.debug('Version handshake complete (%s).', this.hostname());
}
/**
* Finalize peer after handshake.
* @method
* @private
* @returns {Promise}
*/
async finalize() {
assert(!this.destroyed);
// Setup the ping interval.
this.pingTimer = setInterval(() => {
this.sendPing();
}, Peer.PING_INTERVAL);
// Setup the inv flusher.
this.invTimer = setInterval(() => {
this.flushInv();
}, Peer.INV_INTERVAL);
this.proofWindow = new SlidingWindow({
limit: this.options.maxProofRPS
});
this.proofWindow.start();
}
/**
* Broadcast blocks to peer.
* @param {Block[]} blocks
*/
announceBlock(blocks) {
if (!this.handshake)
return;
if (this.destroyed)
return;
if (!Array.isArray(blocks))
blocks = [blocks];
const inv = [];
for (const block of blocks) {
assert(block instanceof Block);
// Don't send if they already have it.
if (this.invFilter.test(block.hash()))
continue;
// Send them the block immediately if
// they're using compact block mode 1.
if (this.compactMode === 1) {
this.invFilter.add(block.hash());
this.sendCompactBlock(block);
continue;
}
// Convert item to block headers
// for peers that request it.
if (this.preferHeaders) {
inv.push(block.toHeaders());
continue;
}
inv.push(block.toInv());
}
if (this.preferHeaders) {
this.sendHeaders(inv);
return;
}
this.queueInv(inv);
}
/**
* Broadcast transactions to peer.
* @param {TX[]} txs
*/
announceTX(txs) {
if (!this.handshake)
return;
if (this.destroyed)
return;
// Do not send txs to spv clients
// that have relay unset.
if (this.noRelay)
return;
if (!Array.isArray(txs))
txs = [txs];
const inv = [];
for (const tx of txs) {
assert(tx instanceof TX);
// Don't send if they already have it.
if (this.invFilter.test(tx.hash()))
continue;
// Check the peer's bloom
// filter if they're using spv.
if (this.spvFilter) {
if (!tx.testAndMaybeUpdate(this.spvFilter))
continue;
}
// Check the fee filter.
if (this.feeRate !== -1) {
const rate = this.options.getRate(tx.hash());
if (rate !== -1 && rate < this.feeRate)
continue;
}
inv.push(tx.toInv());
}
this.queueInv(inv);
}
/**
* Broadcast claims to peer.
* @param {Claim[]} claims
*/
announceClaim(claims) {
if (!this.handshake)
return;
if (this.destroyed)
return;
// Do not send claims to spv clients
// that have relay unset.
if (this.noRelay)
return;
if (!Array.isArray(claims))
claims = [claims];
const inv = [];
for (const claim of claims) {
assert(claim instanceof Claim);
// Don't send if they already have it.
if (this.invFilter.test(claim.hash()))
continue;
inv.push(claim.toInv());
}
this.queueInv(inv);
}
/**
* Broadcast airdrop proofs to peer.
* @param {AirdropProof[]} proofs
*/
announceAirdrop(proofs) {
if (!this.handshake)
return;
if (this.destroyed)
return;
// Do not send proofs to spv clients
// that have relay unset.
if (this.noRelay)
return;
if (!Array.isArray(proofs))
proofs = [proofs];
const inv = [];
for (const proof of proofs) {
assert(proof instanceof AirdropProof);
// Don't send if they already have it.
if (this.invFilter.test(proof.hash()))
continue;
inv.push(proof.toInv(InvItem));
}
this.queueInv(inv);
}
/**
* Send inv to a peer.
* @param {InvItem[]} items
*/
queueInv(items) {
if (!this.handshake)
return;
if (this.destroyed)
return;
if (!Array.isArray(items))
items = [items];
let hasBlock = false;
for (const item of items) {
if (item.type === invTypes.BLOCK)
hasBlock = true;
this.invQueue.push(item);
}
if (this.invQueue.length >= 500 || hasBlock)
this.flushInv();
}
/**
* Flush inv queue.
* @private
*/
flushInv() {
if (this.destroyed)
return;
const queue = this.invQueue;
if (queue.length === 0)
return;
this.invQueue = [];
this.logger.spam('Serving %d inv items to %s.',
queue.length, this.hostname());
const items = [];
for (const item of queue) {
if (!this.invFilter.added(item.hash))
continue;
items.push(item);
}
for (let i = 0; i < items.length; i += 1000) {
const chunk = items.slice(i, i + 1000);
this.send(new packets.InvPacket(chunk));
}
}
/**
* Force send an inv (no filter check).
* @param {InvItem[]} items
*/
sendInv(items) {
if (!this.handshake)
return;
if (this.destroyed)
return;
if (!Array.isArray(items))
items = [items];
for (const item of items)
this.invFilter.add(item.hash);
if (items.length === 0)
return;
this.logger.spam('Serving %d inv items to %s.',
items.length, this.hostname());
for (let i = 0; i < items.length; i += 1000) {
const chunk = items.slice(i, i + 1000);
this.send(new packets.InvPacket(chunk));
}
}
/**
* Send headers to a peer.
* @param {Headers[]} items
*/
sendHeaders(items) {
if (!this.handshake)
return;
if (this.destroyed)
return;
if (!Array.isArray(items))
items = [items];
for (const item of items)
this.invFilter.add(item.hash());
if (items.length === 0)
return;
this.logger.spam('Serving %d headers to %s.',
items.length, this.hostname());
for (let i = 0; i < items.length; i += 2000) {
const chunk = items.slice(i, i + 2000);
this.send(new packets.HeadersPacket(chunk));
}
}
/**
* Send a compact block.
* @private
* @param {Block} block
*/
sendCompactBlock(block) {
const compact = BIP152.CompactBlock.fromBlock(block);
this.send(new packets.CmpctBlockPacket(compact));
}
/**
* Send a `version` packet.
*/
sendVersion() {
const packet = new packets.VersionPacket();
packet.version = this.options.version;
packet.services = this.options.services;
packet.time = this.network.now();
packet.remote = this.address;
packet.nonce = this.options.createNonce(this.hostname());
packet.agent = this.options.agent;
packet.height = this.options.getHeight();
packet.noRelay = this.options.noRelay;
this.send(packet);
}
/**
* Send a `getaddr` packet.
*/
sendGetAddr() {
if (this.sentGetAddr)
return;
this.sentGetAddr = true;
this.send(new packets.GetAddrPacket());
}
/**
* Send a `ping` packet.
*/
sendPing() {
if (!this.handshake)
return;
if (this.challenge) {
this.logger.debug(
'Peer has not responded to ping (%s).',
this.hostname());
return;
}
this.lastPing = Date.now();
this.challenge = common.nonce();
this.send(new packets.PingPacket(this.challenge));
}
/**
* Send `filterload` to the peer.
* @param filter - Filter to put in the `filterload` packet.
*/
sendFilterLoad(filter) {
if (!this.handshake)
return;
if (!this.options.spv)
return;
if (!(this.services & services.BLOOM))
return;
this.send(new packets.FilterLoadPacket(filter));
}
/**
* Set a fee rate filter for the peer.
* @param {Rate} rate
*/
sendFeeRate(rate) {
if (!this.handshake)
return;
this.send(new packets.FeeFilterPacket(rate));
}
/**
* Disconnect from and destroy the peer.
*/
destroy() {
const connected = this.connected;
if (this.destroyed)
return;
this.destroyed = true;
this.connected = false;
this.socket.destroy();
this.socket = null;
if (this.pingTimer != null) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
if (this.invTimer != null) {
clearInterval(this.invTimer);
this.invTimer = null;
}
if (this.proofWindow != null) {
this.proofWindow.stop();
}
if (this.stallTimer != null) {
clearInterval(this.stallTimer);
this.stallTimer = null;
}
if (this.connectTimeout != null) {
clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
const jobs = this.drainQueue;
this.drainSize = 0;
this.drainQueue = [];
for (const job of jobs)
job.reject(new Error('Peer was destroyed.'));
for (const [cmd, entry] of this.responseMap) {
this.responseMap.delete(cmd);
entry.reject(new Error('Peer was destroyed.'));
}
this.locker.destroy();
this.emit('close', connected);
}
/**
* Write data to the peer's socket.
* @param {Buffer} data
*/
write(data) {
if (this.destroyed)
throw new Error('Peer is destroyed (write).');
this.lastSend = Date.now();
if (this.stream.write(data) === false)
this.needsDrain(data.length);
}
/**
* Send a packet.
* @param {Packet} packet
*/
send(packet) {
if (this.destroyed)
throw new Error('Peer is destroyed (send).');
this.sendRaw(packet.rawType, packet.encode());
this.addTimeout(packet);
}
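/**
* Frame an encoded packet body and write it to the peer.
* @param type - Raw packet type, passed on to the framer.
* @param {Buffer} body - Encoded packet body.
*/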
sendRaw(type, body) {
const payload = this.framePacket(type, body);
this.write(payload);
}
/**
* Wait for a drain event.
* @returns {Promise}
*/
drain() {
if (this.destroyed)
return Promise.reject(new Error('Peer is destroyed.'));
if (this.drainSize === 0)
return Promise.resolve();
return new Promise((resolve, reject) => {
this.drainQueue.push({ resolve, reject });
});
}
/**
* Handle drain event.
* @private
*/
handleDrain() {
const jobs = this.drainQueue;
this.drainSize = 0;
if (jobs.length === 0)
return;
this.drainQueue = [];
for (const job of jobs)
job.resolve();
}
/**
* Add to drain counter.
* @private
* @param {Number} size
*/
needsDrain(size) {
this.drainSize += size;
if (this.drainSize >= Peer.DRAIN_MAX) {
this.logger.warning(
'Peer is not reading: %dmb buffered (%s).',
this.drainSize / (1 << 20),
this.hostname());
this.error('Peer stalled (drain).');
this.destroy();
}
}
/**
* Potentially add response timeout.
* @private
* @param {Packet} packet
*/
addTimeout(packet) {
const timeout = Peer.RESPONSE_TIMEOUT;
if (!this.outbound)
return;
switch (packet.type) {
case packetTypes.GETBLOCKS:
if (!this.options.isFull())
this.request(packetTypes.INV, timeout);
break;
case packetTypes.GETHEADERS:
this.request(packetTypes.HEADERS, timeout * 2);
break;
case packetTypes.GETDATA:
this.request(packetTypes.DATA, timeout * 2);
break;
case packetTypes.GETBLOCKTXN:
this.request(packetTypes.BLOCKTXN, timeout);
break;
case packetTypes.GETPROOF:
this.request(packetTypes.PROOF, timeout);
break;
}
}
/**
* Potentially finish response timeout.
* @private
* @param {Packet} packet
*/
fulfill(packet) {
switch (packet.type) {
case packetTypes.BLOCK:
case packetTypes.CMPCTBLOCK:
case packetTypes.MERKLEBLOCK:
case packetTypes.TX:
case packetTypes.CLAIM:
case packetTypes.AIRDROP:
case packetTypes.NOTFOUND: {
const entry = this.response(packetTypes.DATA, packet);
assert(!entry || entry.jobs.length === 0);
break;
}
}
return this.response(packet.type, packet);
}
/**
* Potentially timeout peer if it hasn't responded.
* @private
*/
maybeTimeout() {
const now = Date.now();
for (const [key, entry] of this.responseMap) {
if (now > entry.timeout) {
const name = packets.typesByVal[key];
this.error('Peer is stalling (%s).', name.toLowerCase());
this.destroy();
return;
}
}
if (this.merkleBlock) {
assert(this.merkleTime !== -1);
if (now > this.merkleTime + Peer.BLOCK_TIMEOUT) {
this.error('Peer is stalling (merkleblock).');
this.destroy();
return;
}
}
if (this.syncing && this.loader && !this.options.isFull()) {
if (now > this.blockTime + Peer.BLOCK_TIMEOUT) {
this.error('Peer is stalling (block).');
this.destroy();
return;
}
}
if (this.options.isFull() || !this.syncing) {
for (const time of this.blockMap.values()) {
if (now > time + Peer.BLOCK_TIMEOUT) {
this.error('Peer is stalling (block).');
this.destroy();
return;
}
}
for (const time of this.txMap.values()) {
if (now > time + Peer.TX_TIMEOUT) {
this.error('Peer is stalling (tx).');
this.destroy();
return;
}
}
for (const time of this.claimMap.values()) {
if (now > time + Peer.TX_TIMEOUT) {
this.error('Peer is stalling (claim).');
this.destroy();
return;
}
}
for (const time of this.airdropMap.values()) {
if (now > time + Peer.TX_TIMEOUT) {
this.error('Peer is stalling (airdrop).');
this.destroy();
return;
}
}
for (const block of this.compactBlocks.values()) {
if (now > block.now + Peer.RESPONSE_TIMEOUT) {
this.error('Peer is stalling (blocktxn).');
this.destroy();
return;
}
}
}
for (const time of this.nameMap.values()) {
if (now > time + Peer.NAME_TIMEOUT) {
this.error('Peer is stalling (name).');
this.destroy();
return;
}
}
if (now > this.time + 60000) {
assert(this.time !== 0);
if (this.lastRecv === 0 || this.lastSend === 0) {
this.error('Peer is stalling (no message).');
this.destroy();
return;
}
if (now > this.lastSend + Peer.TIMEOUT_INTERVAL) {
this.error('Peer is stalling (send).');
this.destroy();
return;
}
if (now > this.lastRecv + Peer.TIMEOUT_INTERVAL) {
this.error('Peer is stalling (recv).');
this.destroy();
return;
}
if (this.challenge && now > this.lastPing + Peer.TIMEOUT_INTERVAL) {
this.error('Peer is stalling (ping).');
this.destroy();
return;
}
}
}
/**
* Wait for a packet to be received from peer.
* @private
* @param {Number} type - Packet type.
* @param {Number} timeout
* @returns {RequestEntry}
*/
request(type, timeout) {
if (this.destroyed)
return null;
let entry = this.responseMap.get(type);
if (!entry) {
entry = new RequestEntry();
this.responseMap.set(type, entry);
if (this.responseMap.size >= common.MAX_REQUEST) {
this.destroy();
return null;
}
}
entry.setTimeout(timeout);
return entry;
}
/**
* Fulfill awaiting requests created with {@link Peer#request}.
* @private
* @param {Number} type - Packet type.
* @param {Object} payload
*/
response(type, payload) {
const entry = this.responseMap.get(type);
if (!entry)
return null;
this.responseMap.delete(type);
return entry;
}
/**
* Wait for a packet to be received from peer.
* @private
* @param {Number} type - Packet type.
* @param {Number} timeout
* @returns {Promise} - Returns Object(payload).
* Executed on timeout or once packet is received.
*/
wait(type, timeout) {
return new Promise((resolve, reject) => {
const entry = this.request(type);
if (!entry) {
reject(new Error('Peer is destroyed (request).'));
return;
}
entry.setTimeout(timeout);
entry.addJob(resolve, reject);
});
}
/**
* Emit an `error` event (no-op once the peer is destroyed).
* Does not destroy the peer; callers do that themselves.
* @private
* @param {...String|Error} err
*/
error(err) {
  if (this.destroyed)
    return;

  let failure = err;

  // A string is treated as a format string
  // followed by its arguments.
  if (typeof failure === 'string')
    failure = new Error(format(...arguments));

  // Errors carrying an `E...` code are replaced by a fresh
  // error that exposes only that code.
  const code = failure.code;

  if (typeof code === 'string' && code[0] === 'E') {
    failure = new Error(code);
    failure.code = code;
    failure.message = `Socket Error: ${code}`;
  }

  failure.message += ` (${this.hostname()})`;

  this.emit('error', failure);
}
/**
* Calculate peer block inv type (filtered,
* compact, or regular block).
* @returns {Number}
*/
blockType() {
if (this.options.spv)
return invTypes.FILTERED_BLOCK;
if (this.options.compact && this.hasCompact())
return invTypes.CMPCT_BLOCK;
return invTypes.BLOCK;
}
/**
* Calculate peer tx inv type (always the plain tx type).
* @returns {Number}
*/
txType() {
return invTypes.TX;
}
/**
* Send `getdata` to peer.
* @param {InvItem[]} items
*/
getData(items) {
this.send(new packets.GetDataPacket(items));
}
/**
* Send batched `getdata` to peer.
* @param {InvType} type
* @param {Hash[]} hashes
*/
getItems(type, hashes) {
const items = [];
for (const hash of hashes)
items.push(new InvItem(type, hash));
if (items.length === 0)
return;
this.getData(items);
}
/**
* Send batched `getdata` to peer (blocks).
* @param {Hash[]} hashes
*/
getBlock(hashes) {
this.getItems(this.blockType(), hashes);
}
/**
* Send batched `getdata` to peer (txs).
* @param {Hash[]} hashes
*/
getTX(hashes) {
this.getItems(this.txType(), hashes);
}
/**
* Send batched `getdata` to peer (claims).
* @param {Hash[]} hashes
*/
getClaim(hashes) {
this.getItems(invTypes.CLAIM, hashes);
}
/**
* Send batched `getdata` to peer (airdrops).
* @param {Hash[]} hashes
*/
getAirdrop(hashes) {
this.getItems(invTypes.AIRDROP, hashes);
}
/**
* Send `getdata` to peer for a single block.
* @param {Hash} hash
*/
getFullBlock(hash) {
assert(!this.options.spv);
this.getItems(invTypes.BLOCK, [hash]);
}
/**
* Handle a packet payload.
* @method
* @private
* @param {Packet} packet
*/
async readPacket(packet) {
if (this.destroyed)
return;
// The "pre-handshake" packets get
// to bypass the lock, since they
// are meant to change the way input
// is handled at a low level. They
// must be handled immediately.
switch (packet.type) {
case packetTypes.PONG: {
try {
this.socket.pause();
await this.handlePacket(packet);
} finally {
if (!this.destroyed && this.socket) {
try {
this.socket.resume();
} catch (e) {
;
}
}
}
break;
}
default: {
const unlock = await this.locker.lock();
try {
this.socket.pause();
await this.handlePacket(packet);
} finally {
if (!this.destroyed && this.socket) {
try {
this.socket.resume();
} catch (e) {
;
}
}
unlock();
}
break;
}
}
}
/**
* Handle a packet payload without a lock.
* @method
* @private
* @param {Packet} packet
*/
async handlePacket(packet) {
if (this.destroyed)
throw new Error('Destroyed peer sent a packet.');
const entry = this.fulfill(packet);
switch (packet.type) {
case packetTypes.VERSION:
await this.handleVersion(packet);
break;
case packetTypes.VERACK:
await this.handleVerack(packet);
break;
case packetTypes.PING:
await this.handlePing(packet);
break;
case packetTypes.PONG:
await this.handlePong(packet);
break;
case packetTypes.SENDHEADERS:
await this.handleSendHeaders(packet);
break;
case packetTypes.FILTERLOAD:
await this.handleFilterLoad(packet);
break;
case packetTypes.FILTERADD:
await this.handleFilterAdd(packet);
break;
case packetTypes.FILTERCLEAR:
await this.handleFilterClear(packet);
break;
case packetTypes.FEEFILTER:
await this.handleFeeFilter(packet);
break;
case packetTypes.SENDCMPCT:
await this.handleSendCmpct(packet);
break;
}
if (this.onPacket)
await this.onPacket(packet);
this.emit('packet', packet);
if (entry)
entry.resolve(packet);
}
/**
* Handle `version` packet.
* @method
* @private
* @param {VersionPacket} packet
*/
async handleVersion(packet) {
if (this.version !== -1)
throw new Error('Peer sent a duplicate version.');
this.version = packet.version;
this.services = packet.services;
this.height = packet.height;
this.agent = packet.agent;
this.noRelay = packet.noRelay;
this.local = packet.remote;
if (!this.network.selfConnect) {
if (this.options.hasNonce(packet.nonce))
throw new Error('We connected to ourself. Oops.');
}
if (this.version < common.MIN_VERSION)
throw new Error('Peer does not support required protocol version.');
if (this.outbound) {
if (!(this.services & services.NETWORK))
throw new Error('Peer does not support network services.');
if (this.options.spv) {
if (!(this.services & services.BLOOM))
throw new Error('Peer does not support BIP37.');
}
}
this.send(new packets.VerackPacket());
}
/**
* Handle `verack` packet.
* @method
* @private
* @param {VerackPacket} packet
*/
async handleVerack(packet) {
if (this.ack) {
this.logger.debug('Peer sent duplicate ack (%s).', this.hostname());
return;
}
this.ack = true;
this.logger.debug('Received verack (%s).', this.hostname());
}
/**
* Handle `ping` packet.
* @method
* @private
* @param {PingPacket} packet
*/
async handlePing(packet) {
if (!packet.nonce)
return;
this.send(new packets.PongPacket(packet.nonce));
}
/**
* Handle `pong` packet.
* @method
* @private
* @param {PongPacket} packet
*/
async handlePong(packet) {
const nonce = packet.nonce;
const now = Date.now();
if (!this.challenge) {
this.logger.debug('Peer sent an unsolicited pong (%s).', this.hostname());
return;
}
if (!nonce.equals(this.challenge)) {
if (nonce.equals(common.ZERO_NONCE)) {
this.logger.debug('Peer sent a zero nonce (%s).', this.hostname());
this.challenge = null;
return;
}
this.logger.debug('Peer sent the wrong nonce (%s).', this.hostname());
return;
}
if (now >= this.lastPing) {
this.lastPong = now;
if (this.minPing === -1)
this.minPing = now - this.lastPing;
this.minPing = Math.min(this.minPing, now - this.lastPing);
} else {
this.logger.debug('Timing mismatch (what?) (%s).', this.hostname());
}
this.challenge = null;
}
/**
* Handle `sendheaders` packet.
* @method
* @private
* @param {SendHeadersPacket} packet
*/
async handleSendHeaders(packet) {
if (this.preferHeaders) {
this.logger.debug(
'Peer sent a duplicate sendheaders (%s).',
this.hostname());
return;
}
this.preferHeaders = true;
}
/**
* Handle `filterload` packet.
* @method
* @private
* @param {FilterLoadPacket} packet
*/
async handleFilterLoad(packet) {
if (!packet.isWithinConstraints()) {
this.increaseBan(100);
return;
}
this.spvFilter = packet.filter;
this.noRelay = false;
}
/**
* Handle `filteradd` packet.
* @method
* @private
* @param {FilterAddPacket} packet
*/
async handleFilterAdd(packet) {
const data = packet.data;
if (data.length > consensus.MAX_SCRIPT_PUSH) {
this.increaseBan(100);
return;
}
if (this.spvFilter)
this.spvFilter.add(data);
this.noRelay = false;
}
/**
* Handle `filterclear` packet.
* @method
* @private
* @param {FilterClearPacket} packet
*/
async handleFilterClear(packet) {
if (this.spvFilter)
this.spvFilter.reset();
this.noRelay = false;
}
/**
* Handle `feefilter` packet.
* @method
* @private
* @param {FeeFilterPacket} packet
*/
async handleFeeFilter(packet) {
const rate = packet.rate;
if (rate < 0 || rate > consensus.MAX_MONEY) {
this.increaseBan(100);
return;
}
this.feeRate = rate;
}
/**
* Handle `sendcmpct` packet.
* @method
* @private
* @param {SendCmpctPacket} packet
*/
async handleSendCmpct(packet) {
if (this.compactMode !== -1) {
this.logger.debug(
'Peer sent a duplicate sendcmpct (%s).',
this.hostname());
return;
}
if (packet.version > 1) {
// Ignore
this.logger.info(
'Peer request compact blocks version %d (%s).',
packet.version, this.hostname());
return;
}
if (packet.mode > 1) {
this.logger.info(
'Peer request compact blocks mode %d (%s).',
packet.mode, this.hostname());
return;
}
this.logger.info(
'Peer initialized compact blocks (mode=%d, version=%d) (%s).',
packet.mode, packet.version, this.hostname());
this.compactMode = packet.mode;
}
/**
* Send `getheaders` to peer. Note that unlike
* `getblocks`, `getheaders` can have a null locator.
* @param {Hash[]} locator - Chain locator.
* @param {Hash} stop - Hash to stop at.
*/
sendGetHeaders(locator, stop) {
const packet = new packets.GetHeadersPacket(locator, stop);
let hash = consensus.ZERO_HASH;
if (packet.locator.length > 0)
hash = packet.locator[0];
this.logger.debug(
'Requesting headers packet from peer with getheaders (%s).',
this.hostname());
this.logger.debug(
'Sending getheaders (hash=%x, stop=%x).',
hash, stop);
this.send(packet);
}
/**
* Send `getblocks` to peer.
* @param {Hash[]} locator - Chain locator.
* @param {Hash} stop - Hash to stop at.
*/
sendGetBlocks(locator, stop) {
const packet = new packets.GetBlocksPacket(locator, stop);
let hash = consensus.ZERO_HASH;
if (packet.locator.length > 0)
hash = packet.locator[0];
if (hash.equals(this.lastTip) && stop.equals(this.lastStop))
return;
this.lastTip = hash;
this.lastStop = stop;
this.logger.debug(
'Requesting inv packet from peer with getblocks (%s).',
this.hostname());
this.logger.debug(
'Sending getblocks (hash=%x, stop=%x).',
hash, stop);
this.send(packet);
}
/**
* Send `mempool` to peer.
*/
sendMempool() {
if (!this.handshake)
return;
if (!(this.services & services.BLOOM)) {
this.logger.debug(
'Cannot request mempool for non-bloom peer (%s).',
this.hostname());
return;
}
this.logger.debug(
'Requesting inv packet from peer with mempool (%s).',
this.hostname());
this.send(new packets.MempoolPacket());
}
/**
* Send `reject` to peer.
* @param {Number} code
* @param {String} reason
* @param {Number} msg
* @param {Hash} hash
*/
sendReject(code, reason, msg, hash) {
const reject = packets.RejectPacket.fromReason(code, reason, msg, hash);
if (msg != null) {
this.logger.debug('Rejecting %s %x (%s): code=%s reason=%s.',
packets.typesByVal[msg] || 'UNKNOWN',
hash, this.hostname(), code, reason);
} else {
this.logger.debug('Rejecting packet from %s: code=%s reason=%s.',
this.hostname(), code, reason);
}
this.logger.debug(
'Sending reject packet to peer (%s).',
this.hostname());
this.send(reject);
}
/**
* Send a `sendcmpct` packet.
* @param {Number} mode
*/
sendCompact(mode) {
this.logger.info(
'Initializing normal compact blocks (%s).',
this.hostname());
this.send(new packets.SendCmpctPacket(mode, 1));
}
/**
* Send a `getproof` packet.
* @param {Buffer} root
* @param {Buffer} key
*/
sendGetProof(root, key) {
this.logger.info(
'Sending proof request for %x (%s).',
key,
this.hostname());
this.send(new packets.GetProofPacket(root, key));
}
/**
* Send a `proof` packet.
* @param {Buffer} root
* @param {Buffer} key
* @param {Proof} proof
*/
sendProof(root, key, proof) {
this.logger.info(
'Sending proof (%s).',
this.hostname());
this.proofWindow.increase(1);
if (!this.proofWindow.allow()) {
this.logger.debug('proof: rate limit exceeded (%s).', this.hostname);
this.ban();
return;
}
this.send(new packets.ProofPacket(root, key, proof));
}
/**
* Increase banscore on peer.
* @param {Number} score
* @returns {Boolean}
*/
increaseBan(score) {
this.banScore += score;
if (this.banScore >= this.options.banScore) {
this.logger.debug('Ban threshold exceeded (%s).', this.hostname());
this.ban();
return true;
}
return false;
}
/**
* Ban peer.
*/
ban() {
this.emit('ban');
}
/**
* Send a `reject` packet to peer.
* @param {Number} msg
* @param {VerifyError} err
* @returns {Boolean}
*/
reject(msg, err) {
this.sendReject(err.code, err.reason, msg, err.hash);
return this.increaseBan(err.score);
}
/**
* Returns human readable list of services
* that are available.
* @returns {String[]}
*/
getServiceNames() {
const enabled = [];
for (const [service, bit] of Object.entries(services)) {
if (this.hasServices(bit))
enabled.push(service);
}
return enabled;
}
/**
* Test whether required services are available.
* @param {Number} services
* @returns {Boolean}
*/
hasServices(services) {
return (this.services & services) === services;
}
/**
* Test whether the peer sent us a
* compatible compact block handshake.
* @returns {Boolean}
*/
hasCompact() {
if (this.compactMode === -1)
return false;
return true;
}
/**
* Inspect the peer.
* @returns {String}
*/
inspect() {
return '<Peer:'
+ ` handshake=${this.handshake}`
+ ` host=${this.hostname()}`
+ ` outbound=${this.outbound}`
+ ` ping=${this.minPing}`
+ '>';
}
}
/**
 * Maximum number of output bytes that may be
 * buffered before stall behavior is invoked
 * for the peer (10 MiB).
 * @const {Number}
 * @default
 */

Peer.DRAIN_MAX = 10 * 1024 * 1024;

/**
 * How often (ms) to check for drainage and
 * for required responses from the peer.
 * @const {Number}
 * @default
 */

Peer.STALL_INTERVAL = 5 * 1000;

/**
 * How often (ms) peers are pinged.
 * @const {Number}
 * @default
 */

Peer.PING_INTERVAL = 30 * 1000;

/**
 * How often (ms) invs are flushed.
 * A higher value lets more invs
 * (usually txs) accumulate before
 * each flush.
 * @const {Number}
 * @default
 */

Peer.INV_INTERVAL = 5 * 1000;

/**
 * Time (ms) peers are given to
 * respond to messages (i.e.
 * getblocks/getdata).
 * @const {Number}
 * @default
 */

Peer.RESPONSE_TIMEOUT = 30 * 1000;

/**
 * Time (ms) the loader is given to
 * respond with a block/merkleblock.
 * @const {Number}
 * @default
 */

Peer.BLOCK_TIMEOUT = 2 * 60 * 1000;

/**
 * Time (ms) the loader is given to
 * respond with a tx.
 * @const {Number}
 * @default
 */

Peer.TX_TIMEOUT = 2 * 60 * 1000;

/**
 * Time (ms) a peer is given to
 * respond with a name.
 * @const {Number}
 * @default
 */

Peer.NAME_TIMEOUT = 5 * 1000;

/**
 * Generic timeout interval (ms).
 * @const {Number}
 * @default
 */

Peer.TIMEOUT_INTERVAL = 20 * 60 * 1000;

/**
 * Connection timeout (ms).
 * @const {Number}
 * @default
 */

Peer.CONNECT_TIMEOUT = 5000;

/**
 * Handshake timeout (ms).
 * @const {Number}
 * @default
 */

Peer.HANDSHAKE_TIMEOUT = 5000;
/**
* Peer Options
* @alias module:net.PeerOptions
*/
class PeerOptions {
  /**
   * Create peer options, starting from the
   * defaults and applying any overrides.
   * @constructor
   * @param {Object} [options]
   */

  constructor(options) {
    // Network, logging and socket creation.
    this.network = Network.primary;
    this.logger = Logger.global;
    this.createSocket = tcp.createSocket;

    // Defaults taken from the `common` module.
    this.version = common.PROTOCOL_VERSION;
    this.services = common.LOCAL_SERVICES;
    this.agent = common.USER_AGENT;
    this.identityKey = common.ZERO_KEY;

    // Feature switches, all off by default.
    this.noRelay = false;
    this.spv = false;
    this.compact = false;
    this.headers = false;

    // Limits.
    this.banScore = common.BAN_SCORE;
    this.maxProofRPS = 100;

    // Callbacks; the static stubs below are used
    // unless replacements are supplied.
    this.getHeight = PeerOptions.getHeight;
    this.isFull = PeerOptions.isFull;
    this.createNonce = PeerOptions.createNonce;
    this.hasNonce = PeerOptions.hasNonce;
    this.getRate = PeerOptions.getRate;

    if (options)
      this.fromOptions(options);
  }

  /**
   * Inject properties from object. Options that are
   * null or undefined are skipped; any other value
   * is asserted to have the expected type before it
   * is copied onto this instance.
   * @private
   * @param {Object} options
   * @returns {PeerOptions}
   */

  fromOptions(options) {
    assert(options, 'Options are required.');

    // Copy options[key] when it is set, asserting its `typeof` first.
    const take = (key, type) => {
      const value = options[key];

      if (value == null)
        return;

      assert(typeof value === type);
      this[key] = value;
    };

    // The network goes through a Network.get() lookup
    // rather than being stored as given.
    if (options.network != null)
      this.network = Network.get(options.network);

    take('logger', 'object');
    take('createSocket', 'function');
    take('version', 'number');
    take('services', 'number');
    take('agent', 'string');

    // The identity key must be a buffer of exactly 32 bytes.
    const {identityKey} = options;

    if (identityKey != null) {
      assert(Buffer.isBuffer(identityKey));
      assert(identityKey.length === 32);
      this.identityKey = identityKey;
    }

    for (const flag of ['noRelay', 'spv', 'compact', 'headers'])
      take(flag, 'boolean');

    for (const limit of ['banScore', 'maxProofRPS'])
      take(limit, 'number');

    const hooks = [
      'getHeight',
      'isFull',
      'createNonce',
      'hasNonce',
      'getRate'
    ];

    for (const hook of hooks)
      take(hook, 'function');

    return this;
  }

  /**
   * Instantiate options from object.
   * @param {Object} options
   * @returns {PeerOptions}
   */

  static fromOptions(options) {
    const instance = new this();
    return instance.fromOptions(options);
  }

  /**
   * Default chain height callback: always zero.
   * @private
   * @returns {Number}
   */

  static getHeight() {
    return 0;
  }

  /**
   * Default sync-state callback: always false.
   * @private
   * @returns {Boolean}
   */

  static isFull() {
    return false;
  }

  /**
   * Default version packet nonce callback.
   * The hostname argument is not used here.
   * @private
   * @param {String} hostname
   * @returns {Buffer}
   */

  static createNonce(hostname) {
    return common.nonce();
  }

  /**
   * Default "is this version nonce ours" callback:
   * always false.
   * @private
   * @param {Buffer} nonce
   * @returns {Boolean}
   */

  static hasNonce(nonce) {
    return false;
  }

  /**
   * Default fee rate lookup for a txid: always -1.
   * @private
   * @param {Hash} hash
   * @returns {Rate}
   */

  static getRate(hash) {
    return -1;
  }
}
/**
* Request Entry
* @ignore
*/
class RequestEntry {
  /**
   * Create a request entry with no
   * deadline and no pending jobs.
   * @constructor
   */

  constructor() {
    this.timeout = 0;
    this.jobs = [];
  }

  /**
   * Queue a resolve/reject callback pair.
   * @param {Function} resolve
   * @param {Function} reject
   */

  addJob(resolve, reject) {
    const job = { resolve, reject };
    this.jobs.push(job);
  }

  /**
   * Set the deadline to `timeout`
   * milliseconds from now.
   * @param {Number} timeout
   */

  setTimeout(timeout) {
    const now = Date.now();
    this.timeout = now + timeout;
  }

  /**
   * Reject every queued job with
   * `err`, then empty the queue.
   * @param {Error} err
   */

  reject(err) {
    const {jobs} = this;

    for (let i = 0; i < jobs.length; i++)
      jobs[i].reject(err);

    jobs.length = 0;
  }

  /**
   * Resolve every queued job with
   * `result`, then empty the queue.
   * @param {*} result
   */

  resolve(result) {
    const {jobs} = this;

    for (let i = 0; i < jobs.length; i++)
      jobs[i].resolve(result);

    jobs.length = 0;
  }
}
/*
* Expose
*/
module.exports = Peer;