/*!
* blockstore/file.js - file blockstore for hsd
* Copyright (c) 2019, Braydon Fuller (MIT License).
* https://github.com/handshake-org/hsd
*/
'use strict';
const {isAbsolute, resolve, join} = require('path');
const bdb = require('bdb');
const assert = require('bsert');
const fs = require('bfile');
const bio = require('bufio');
const Network = require('../protocol/network');
const consensus = require('../protocol/consensus');
const Headers = require('../primitives/headers');
const AbstractBlockStore = require('./abstract');
const {AbstractBatch} = AbstractBlockStore;
const {BlockRecord, FileRecord} = require('./records');
const layout = require('./layout');
const {types, prefixes} = require('./common');
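// Batch operation types (see WriteOp and PruneOp below).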
const WRITE = 0;
const PRUNE = 1;
/**
* File Block Store
*
* @alias module:blockstore.FileBlockStore
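* @example
* // Hypothetical usage sketch (paths, hashes and data are placeholders);
* // run inside an async function.
* const store = new FileBlockStore({location: '/absolute/path/to/blocks'});
* await store.ensure();
* await store.open();
* await store.writeBlock(hash, rawBlock);
* const data = await store.readBlock(hash);
* await store.close();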
*/
class FileBlockStore extends AbstractBlockStore {
/**
* Create a blockstore that stores blocks in files.
* @constructor
* @param {Object} [options]
*/
constructor(options) {
super(options);
assert(isAbsolute(options.location), 'Location not absolute.');
this.location = options.location;
this.indexLocation = resolve(this.location, './index');
this.db = bdb.create({
location: this.indexLocation,
cacheSize: options.cacheSize,
compression: false
});
this.name = 'fileblockstore';
this.version = 0;
this.maxFileLength = options.maxFileLength || 128 * 1024 * 1024;
assert(Number.isSafeInteger(this.maxFileLength),
'Invalid max file length.');
this.network = Network.primary;
if (options.network != null)
this.network = Network.get(options.network);
this.writing = Object.create(null);
}
/**
* Scans the directory for data files of the given block type and
* checks that each one has a corresponding record in the index.
* @param {Number} type - The type of block data
* @private
* @returns {Promise<Object>}
*/
async check(type) {
const prefix = prefixes[type];
const regexp = new RegExp(`^${prefix}(\\d{5})\\.dat$`);
const all = await fs.readdir(this.location);
const dats = all.filter(f => regexp.test(f));
const filenos = dats.map(f => parseInt(f.match(regexp)[1], 10));
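// A data file on disk without an index record means the index
// for this type must be rebuilt.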
let missing = false;
for (const fileno of filenos) {
const rec = await this.db.get(layout.f.encode(type, fileno));
if (!rec) {
missing = true;
break;
}
}
return {missing, filenos};
}
/**
* Creates indexes from files for a block type. The hash of the
* block data is read from the magic prefix, except for full and
* merkle blocks, whose hash is computed from the block header.
* @private
* @param {Number} type - The type of block data
* @returns {Promise}
*/
async _index(type) {
const {missing, filenos} = await this.check(type);
if (!missing)
return;
this.logger.info('Indexing block type %d...', type);
for (const fileno of filenos) {
const b = this.db.batch();
const filepath = this.filepath(type, fileno);
const data = await fs.readFile(filepath);
const reader = bio.read(data);
let magic = null;
let blocks = 0;
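// Scan for the network magic. Each stored entry is laid out as:
// magic (4 bytes), data length (4 bytes), an optional 32-byte hash
// (types other than full and merkle blocks), then the raw data.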
while (reader.left() >= 4) {
magic = reader.readU32();
// Move forward a byte from the last read
// if the magic doesn't match.
if (magic !== this.network.magic) {
reader.seek(-3);
continue;
}
let hash = null;
let position = 0;
let length = 0;
try {
length = reader.readU32();
if (type === types.BLOCK || type === types.MERKLE) {
position = reader.offset;
const headers = Headers.fromReader(reader);
hash = headers.hash();
reader.seek(length - consensus.HEADER_SIZE);
} else {
hash = reader.readHash();
position = reader.offset;
reader.seek(length);
}
} catch (err) {
this.logger.warning(
'Unknown block in file: %s, reason: %s',
filepath, err.message);
continue;
}
const blockrecord = new BlockRecord({
file: fileno,
position: position,
length: length
});
blocks += 1;
b.put(layout.b.encode(type, hash), blockrecord.encode());
}
const filerecord = new FileRecord({
blocks: blocks,
used: reader.offset,
length: this.maxFileLength
});
b.put(layout.f.encode(type, fileno), filerecord.encode());
await b.write();
this.logger.info('Indexed %d blocks (file=%s).', blocks, filepath);
}
}
/**
* Checks each block data type for data files on disk that are
* missing from the index. If any inconsistencies are found,
* all blocks of that type are reindexed.
* @private
* @returns {Promise}
*/
async index() {
await this._index(types.BLOCK);
await this._index(types.UNDO);
await this._index(types.MERKLE);
}
/**
* This method ensures that both the block storage directory
* and the index directory exist before opening.
* @returns {Promise}
*/
async ensure() {
return fs.mkdirp(this.indexLocation);
}
/**
* Opens the file block store. It will regenerate necessary block
* indexing if the index is missing or inconsistent.
* @returns {Promise}
*/
async open() {
this.logger.info('Opening FileBlockStore...');
await this.db.open();
await this.db.verify(layout.V.encode(), this.name, this.version);
await this.index();
}
/**
* This closes the file block store and underlying
* indexing databases.
*/
async close() {
this.logger.info('Closing FileBlockStore...');
await this.db.close();
}
/**
* This method will determine the file path based on the file number
* and the current block data location.
* @private
* @param {Number} type - The type of block data
* @param {Number} fileno - The number of the file.
* @returns {String}
*/
filepath(type, fileno) {
const pad = 5;
let num = fileno.toString(10);
if (num.length > pad)
throw new Error('File number too large.');
while (num.length < pad)
num = `0${num}`;
let filepath = null;
const prefix = prefixes[type];
if (!prefix)
throw new Error('Unknown file prefix.');
filepath = join(this.location, `${prefix}${num}.dat`);
return filepath;
}
/**
* This method will select and potentially allocate a file to
* write a block based on the size and type.
* @private
* @param {Number} type - The type of block data
* @param {Number} length - The number of bytes
* @returns {Promise<Object>}
*/
async allocate(type, length) {
if (length > this.maxFileLength)
throw new Error('Block length above max file length.');
let fileno = 0;
let filerecord = null;
let filepath = null;
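// The F key records the file number most recently used for this type.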
const last = await this.db.get(layout.F.encode(type));
if (last)
fileno = bio.readU32(last, 0);
filepath = this.filepath(type, fileno);
const rec = await this.db.get(layout.f.encode(type, fileno));
let touch = false;
if (rec) {
filerecord = FileRecord.decode(rec);
} else {
touch = true;
filerecord = new FileRecord({
blocks: 0,
used: 0,
length: this.maxFileLength
});
}
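// Roll over to a new file if appending this block would
// exceed the file's maximum length.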
if (filerecord.used + length > filerecord.length) {
fileno += 1;
filepath = this.filepath(type, fileno);
touch = true;
filerecord = new FileRecord({
blocks: 0,
used: 0,
length: this.maxFileLength
});
}
if (touch) {
const fd = await fs.open(filepath, 'w');
await fs.close(fd);
}
return {fileno, filerecord, filepath};
}
/**
* This method stores merkle block data in files.
* @param {Buffer} hash - The block hash
* @param {Buffer} data - The block data
* @returns {Promise<Boolean>}
*/
async writeMerkle(hash, data) {
return this._write(types.MERKLE, hash, data);
}
/**
* This method stores block undo coin data in files.
* @param {Buffer} hash - The block hash
* @param {Buffer} data - The block data
* @returns {Promise<Boolean>}
*/
async writeUndo(hash, data) {
return this._write(types.UNDO, hash, data);
}
/**
* This method stores block data in files.
* @param {Buffer} hash - The block hash
* @param {Buffer} data - The block data
* @returns {Promise<Boolean>}
*/
async writeBlock(hash, data) {
return this._write(types.BLOCK, hash, data);
}
/**
* This method stores block data in files by appending
* data to the last written file and updating indexes to point
* to the file and position.
* @private
* @param {Number} type - The type of block data
* @param {Buffer} hash - The block hash
* @param {Buffer} data - The block data
* @returns {Promise<Boolean>} - Whether the data was written.
*/
async _write(type, hash, data) {
if (this.writing[type])
throw new Error('Already writing.');
this.writing[type] = true;
if (await this.db.has(layout.b.encode(type, hash))) {
this.writing[type] = false;
return false;
}
let mlength = 8;
// The hash for full and merkle blocks is not stored with
// the magic prefix, as it can be recovered from the
// header at the start of the block data.
if (type !== types.BLOCK && type !== types.MERKLE)
mlength += 32;
const blength = data.length;
const length = data.length + mlength;
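// Build the magic prefix: network magic (4 bytes) and data length
// (4 bytes), plus the 32-byte hash for types whose data does not
// begin with a block header (e.g. undo data).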
const bwm = bio.write(mlength);
bwm.writeU32(this.network.magic);
bwm.writeU32(blength);
if (type !== types.BLOCK && type !== types.MERKLE)
bwm.writeHash(hash);
const magic = bwm.render();
const {
fileno,
filerecord,
filepath
} = await this.allocate(type, length);
const mposition = filerecord.used;
const bposition = filerecord.used + mlength;
const fd = await fs.open(filepath, 'r+');
let mwritten = 0;
let bwritten = 0;
try {
mwritten = await fs.write(fd, magic, 0, mlength, mposition);
bwritten = await fs.write(fd, data, 0, blength, bposition);
} finally {
await fs.close(fd);
}
if (mwritten !== mlength) {
this.writing[type] = false;
throw new Error('Could not write block magic.');
}
if (bwritten !== blength) {
this.writing[type] = false;
throw new Error('Could not write block.');
}
filerecord.blocks += 1;
filerecord.used += length;
const b = this.db.batch();
const blockrecord = new BlockRecord({
file: fileno,
position: bposition,
length: blength
});
b.put(layout.b.encode(type, hash), blockrecord.encode());
b.put(layout.f.encode(type, fileno), filerecord.encode());
const last = bio.write(4).writeU32(fileno).render();
b.put(layout.F.encode(type), last);
await b.write();
this.writing[type] = false;
return true;
}
/**
* This method will retrieve merkle block data.
* @param {Buffer} hash - The block hash
* @returns {Promise<Buffer>}
*/
async readMerkle(hash) {
return this._read(types.MERKLE, hash);
}
/**
* This method will retrieve block undo coin data.
* @param {Buffer} hash - The block hash
* @returns {Promise<Buffer>}
*/
async readUndo(hash) {
return this._read(types.UNDO, hash);
}
/**
* This method will retrieve block data. Smaller portions of the
* block (e.g. transactions) can be read by using the offset and
* length arguments.
* @param {Buffer} hash - The block hash
* @param {Number} [offset] - The offset within the block
* @param {Number} [length] - The number of bytes of the data
* @returns {Promise<Buffer>}
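* @example
* // Hypothetical partial read (hash and byte ranges are placeholders):
* // read 80 bytes of block data starting 100 bytes into the block.
* const part = await store.readBlock(hash, 100, 80);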
*/
async readBlock(hash, offset, length) {
return this._read(types.BLOCK, hash, offset, length);
}
/**
* This methods reads data from disk by retrieving the index of
* the data and reading from the corresponding file and location.
* @private
* @param {Number} type - The type of block data
* @param {Buffer} hash - The block hash
* @param {Number} [offset] - The offset within the block
* @param {Number} [length] - The number of bytes of the data
* @returns {Promise<Buffer>}
*/
async _read(type, hash, offset, length) {
const raw = await this.db.get(layout.b.encode(type, hash));
if (!raw)
return null;
const blockrecord = BlockRecord.decode(raw);
const filepath = this.filepath(type, blockrecord.file);
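// By default the entire block is read; an offset and/or length
// select a slice of the stored data.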
let position = blockrecord.position;
if (offset)
position += offset;
if (!length && offset > 0)
length = blockrecord.length - offset;
if (!length)
length = blockrecord.length;
if (offset + length > blockrecord.length)
throw new Error('Out-of-bounds read.');
const data = Buffer.alloc(length);
const fd = await fs.open(filepath, 'r');
let bytes = 0;
try {
bytes = await fs.read(fd, data, 0, length, position);
} finally {
await fs.close(fd);
}
if (bytes !== length)
throw new Error('Wrong number of bytes read.');
return data;
}
/**
* This will free resources for storing merkle block data.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async pruneMerkle(hash) {
return this._prune(types.MERKLE, hash);
}
/**
* This will free resources for storing the block undo coin data.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async pruneUndo(hash) {
return this._prune(types.UNDO, hash);
}
/**
* This will free resources for storing the block data.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async pruneBlock(hash) {
return this._prune(types.BLOCK, hash);
}
/**
* This will free resources for storing the block data. The block
* data may not be deleted from disk immediately, the index for the
* block is removed and will not be able to be read. The underlying
* file is unlinked when all blocks in a file have been pruned.
* @private
* @param {Number} type - The type of block data
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async _prune(type, hash) {
const braw = await this.db.get(layout.b.encode(type, hash));
if (!braw)
return false;
const blockrecord = BlockRecord.decode(braw);
const fraw = await this.db.get(layout.f.encode(type, blockrecord.file));
if (!fraw)
return false;
const filerecord = FileRecord.decode(fraw);
filerecord.blocks -= 1;
const b = this.db.batch();
if (filerecord.blocks === 0)
b.del(layout.f.encode(type, blockrecord.file));
else
b.put(layout.f.encode(type, blockrecord.file), filerecord.encode());
b.del(layout.b.encode(type, hash));
await b.write();
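// Unlink the data file only after its index entries have been
// removed and it no longer holds any blocks.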
if (filerecord.blocks === 0)
await fs.unlink(this.filepath(type, blockrecord.file));
return true;
}
/**
* This will check if merkle block data has been stored
* and is available.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async hasMerkle(hash) {
return await this.db.has(layout.b.encode(types.MERKLE, hash));
}
/**
* This will check if a block undo coin has been stored
* and is available.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async hasUndo(hash) {
return await this.db.has(layout.b.encode(types.UNDO, hash));
}
/**
* This will check if a block has been stored and is available.
* @param {Buffer} hash - The block hash
* @returns {Promise<Boolean>}
*/
async hasBlock(hash) {
return await this.db.has(layout.b.encode(types.BLOCK, hash));
}
/**
* Create batch.
* @returns {FileBatch}
*/
batch() {
return new FileBatch(this);
}
}
/**
* Batch operations for the file block store.
* Currently, this is not meant for atomicity or performance
* improvements, but to provide a better interface for the chaindb.
* A proper implementation could use a single leveldb batch for
* index updates after all of the file writes.
* @alias module:blockstore.FileBatch
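* @example
* // Hypothetical sketch (hashes and data are placeholders); writes
* // are committed before prunes.
* const b = store.batch();
* b.writeBlock(hash, rawBlock);
* b.pruneUndo(staleHash);
* await b.commit();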
*/
class FileBatch extends AbstractBatch {
/**
* Create a FileBatch.
* @constructor
* @param {FileBlockStore} blocks
*/
constructor(blocks) {
super();
this.blocks = blocks;
this.writes = [];
this.prunes = [];
this.committedWrites = false;
this.committedPrunes = false;
}
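/**
* Whether both writes and prunes have been committed.
* @returns {Boolean}
*/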
get written() {
return this.committedWrites && this.committedPrunes;
}
/**
* Write merkle block data to the batch.
* @param {Buffer} hash
* @param {Buffer} data
* @returns {this}
*/
writeMerkle(hash, data) {
this.writes.push(new WriteOp(types.MERKLE, hash, data));
return this;
}
/**
* Write undo coin data to the batch.
* @param {Buffer} hash
* @param {Buffer} data
* @returns {this}
*/
writeUndo(hash, data) {
this.writes.push(new WriteOp(types.UNDO, hash, data));
return this;
}
/**
* Write block data to the batch.
* @param {Buffer} hash
* @param {Buffer} data
* @returns {this}
*/
writeBlock(hash, data) {
this.writes.push(new WriteOp(types.BLOCK, hash, data));
return this;
}
/**
* Queue a merkle block data prune in the batch.
* @param {Buffer} hash
* @returns {this}
*/
pruneMerkle(hash) {
this.prunes.push(new PruneOp(types.MERKLE, hash));
return this;
}
/**
* Queue an undo coin data prune in the batch.
* @param {Buffer} hash
* @returns {this}
*/
pruneUndo(hash) {
this.prunes.push(new PruneOp(types.UNDO, hash));
return this;
}
/**
* Queue a block data prune in the batch.
* @param {Buffer} hash
* @returns {this}
*/
pruneBlock(hash) {
this.prunes.push(new PruneOp(types.BLOCK, hash));
return this;
}
/**
* Clear the batch.
* @returns {this}
*/
clear() {
assert(!this.written, 'Already written all.');
this.writes.length = 0;
this.prunes.length = 0;
return this;
}
/**
* Commit only writes.
* @returns {Promise}
*/
async commitWrites() {
assert(!this.committedWrites, 'Already written writes.');
for (const op of this.writes)
await this.blocks._write(op.writeType, op.hash, op.data);
this.committedWrites = true;
}
/**
* Commit only prunes.
* @returns {Promise}
*/
async commitPrunes() {
assert(!this.committedPrunes, 'Already written prunes.');
for (const op of this.prunes)
await this.blocks._prune(op.pruneType, op.hash);
this.committedPrunes = true;
}
/**
* Commit both.
* @returns {Promise}
*/
async commit() {
assert(!this.written, 'Already written all.');
await this.commitWrites();
await this.commitPrunes();
}
}
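/**
* Write operation queued in a file batch.
*/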
class WriteOp {
constructor(type, hash, data) {
this.type = WRITE;
this.writeType = type;
this.hash = hash;
this.data = data;
}
}
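/**
* Prune operation queued in a file batch.
*/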
class PruneOp {
constructor(type, hash) {
this.type = PRUNE;
this.pruneType = type;
this.hash = hash;
}
}
/*
* Expose
*/
module.exports = FileBlockStore;