A Naïve Peer Discovery Implementation with Node.js and ZeroMQ
Recently, I’ve been thinking about building a simple blockchain implementation to better understand the pattern. Since blockchain systems are distributed, the first challenge that came to mind was how to allow peers in the network to discover one another. My goal in this exercise is to keep things as simple as possible, so my first thought was to use UDP multicast for peer discovery. In essence, every node would simply emit its state to a multicast group every n seconds. And while UDP multicast is the simplest approach, it unfortunately does not work over the internet, since most network providers do not forward multicast traffic.
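For context, a multicast version might have looked something like the following sketch, using Node’s built-in dgram module (the group address, port, and payload here are arbitrary choices for illustration):
const dgram = require("dgram");
// Arbitrary group address in the multicast range and an arbitrary port
const MULTICAST_GROUP = "239.1.2.3";
const PORT = 20000;
const socket = dgram.createSocket({ type: "udp4", reuseAddr: true });
// Print any state announcements we hear from the group
socket.on("message", (message, rinfo) => {
  console.info(`State received from ${rinfo.address}: ${message}`);
});
socket.bind(PORT, () => {
  socket.addMembership(MULTICAST_GROUP);
  // Announce our own state to the group every 2.5 seconds
  setInterval(() => {
    const state = Buffer.from(`PING! from host ${process.env["HOSTNAME"]}`);
    socket.send(state, 0, state.length, PORT, MULTICAST_GROUP);
  }, 2500);
});
This works nicely on a LAN, but as noted, the announcements never make it across the public internet.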
The next logical step in my attempt to enable peer discovery was to use a more robust protocol. For this, I opted for ZeroMQ, which is still ‘socket’ oriented but is more feature-rich and supports messaging patterns like publish/subscribe.
This implementation shares many similarities with the UDP multicast approach. Every node in the network periodically emits the locations of all of its known peers. Any new node that listens to these emissions can therefore discover all known nodes and begin emitting this state itself – in effect, joining the network.
Let’s take a look at a simple implementation of this pattern using Node.js and ZeroMQ.
const process = require("process");
const zmq = require("zeromq");
const DISCOVERY_ADDRESS = "tcp://*:20000";
const MESSAGE_ADDRESS = "tcp://*:30000";
const TOPIC_DISCOVERY = "0";
const TOPIC_MESSAGE = "1";
We have two known ports for communication. Port 20000 is used for transmitting peer locations and port 30000 is used for transmitting messages. We also have two known topics that we use to distinguish between message types.
const NODES = [];
const args = process.argv.slice(2);
NODES.push(process.env["HOSTNAME"], ...args);
Now, every node knows its own hostname. It listens for its own updates, but every node must also be given the hostname of at least one other valid node. By giving a node the location of another node, it can listen for updates from the network and join the network itself.
const discoveryPublisher = zmq.socket("pub");
const messagePublisher = zmq.socket("pub");
const subscriber = zmq.socket("sub");
We create three sockets. A single socket can bind to multiple ports, but if one publisher were bound to both ports, a subscriber connected to both would receive every message twice. To avoid these duplicates, we use two sockets for publishing on separate topics and one socket for listening to everything.
function sendMessageUpdate() {
const message = Buffer.from(`PING! from host ${process.env["HOSTNAME"]}`);
console.info(`Sending message: "${message}"`);
messagePublisher.send([TOPIC_MESSAGE, message]);
}
function sendDiscoveryUpdate() {
const message = Buffer.from(NODES.join(","));
console.info(`Sending discovery update: "${message}"`);
discoveryPublisher.send([TOPIC_DISCOVERY, message]);
}
We have a couple of functions to handle publishing data on our two sockets with our two topics. These functions are called on an interval later on.
function handleDiscoveryTopic(message) {
console.info(`Discovery update received - ${message}`);
const messageString = String(message);
const nodes = messageString.split(",");
nodes.forEach(node => {
if (NODES.includes(node)) return;
NODES.push(node);
subscriber.connect(`tcp://${node}:20000`);
subscriber.connect(`tcp://${node}:30000`);
});
}
function handleMessageTopic(message) {
console.info(`Message received - ${message}`);
}
We have two functions to handle incoming messages on our two topics. The message handler is quite simple – it just prints the received message to the console. The discovery topic handler takes a message containing the list of all known peers (from another peer) and updates this node’s own list of known peers. As a result, if any of this node’s current peers updates its own list of peers, this node will receive that update as well. In effect, changes to the list of peers propagate through the network.
subscriber.on("message", function(topic, message) {
const topicString = String(topic);
switch (topicString) {
case TOPIC_DISCOVERY:
handleDiscoveryTopic(message);
break;
case TOPIC_MESSAGE:
handleMessageTopic(message);
break;
default:
console.warn(`Unknown topic '${topic}'. Dropping message.`);
break;
}
});
Our subscriber socket listens for the message event and calls the relevant function with the data.
discoveryPublisher.bindSync(DISCOVERY_ADDRESS);
messagePublisher.bindSync(MESSAGE_ADDRESS);
NODES.forEach(node => {
subscriber.connect(`tcp://${node}:20000`);
subscriber.connect(`tcp://${node}:30000`);
});
subscriber.subscribe(TOPIC_MESSAGE);
subscriber.subscribe(TOPIC_DISCOVERY);
Here, we bind our publisher sockets to the relevant addresses, and then we connect and subscribe our subscriber socket to all initially known peers (which must include at least itself and one other!).
console.log(`TCP socket listening, hostname: ${process.env["HOSTNAME"]}`);
setInterval(() => {
console.log("===================");
sendMessageUpdate();
sendDiscoveryUpdate();
console.log("===================");
}, 2500);
Finally, we kick off the node to start sending updates every 2.5 seconds.
Here is the full code snippet in all of its glory:
const process = require("process");
const zmq = require("zeromq");
const DISCOVERY_ADDRESS = "tcp://*:20000";
const MESSAGE_ADDRESS = "tcp://*:30000";
// We keep track of all peers including our own hostname
const NODES = [];
const args = process.argv.slice(2);
NODES.push(process.env["HOSTNAME"], ...args);
const TOPIC_DISCOVERY = "0";
const TOPIC_MESSAGE = "1";
// We use different sockets for different message types.
// Otherwise, we'd receive duplicate messages.
const discoveryPublisher = zmq.socket("pub");
const messagePublisher = zmq.socket("pub");
const subscriber = zmq.socket("sub");
// Send a simple message
function sendMessageUpdate() {
const message = Buffer.from(`PING! from host ${process.env["HOSTNAME"]}`);
console.info(`Sending message: "${message}"`);
messagePublisher.send([TOPIC_MESSAGE, message]);
}
// Send our known peers
function sendDiscoveryUpdate() {
const message = Buffer.from(NODES.join(","));
console.info(`Sending discovery update: "${message}"`);
discoveryPublisher.send([TOPIC_DISCOVERY, message]);
}
// Update our list of known peers and connect to new ones
// when we receive a discovery topic message
function handleDiscoveryTopic(message) {
console.info(`Discovery update received - ${message}`);
const messageString = String(message);
const nodes = messageString.split(",");
nodes.forEach(node => {
if (NODES.includes(node)) return;
NODES.push(node);
subscriber.connect(`tcp://${node}:20000`);
subscriber.connect(`tcp://${node}:30000`);
});
}
// Handle simple message topic messages
function handleMessageTopic(message) {
console.info(`Message received - ${message}`);
}
// Listen for messages on socket and call function related to topic
subscriber.on("message", function(topic, message) {
const topicString = String(topic);
switch (topicString) {
case TOPIC_DISCOVERY:
handleDiscoveryTopic(message);
break;
case TOPIC_MESSAGE:
handleMessageTopic(message);
break;
default:
console.warn(`Unknown topic '${topic}'. Dropping message.`);
break;
}
});
discoveryPublisher.bindSync(DISCOVERY_ADDRESS);
messagePublisher.bindSync(MESSAGE_ADDRESS);
// Connect to our initially known nodes
NODES.forEach(node => {
subscriber.connect(`tcp://${node}:20000`);
subscriber.connect(`tcp://${node}:30000`);
});
subscriber.subscribe(TOPIC_MESSAGE);
subscriber.subscribe(TOPIC_DISCOVERY);
console.log(`TCP socket listening, hostname: ${process.env["HOSTNAME"]}`);
// Start sending messages every 2.5s
setInterval(() => {
console.log("===================");
sendMessageUpdate();
sendDiscoveryUpdate();
console.log("===================");
}, 2500);
Since I am reusing ports to keep the logic simple, I decided to use Docker and docker-compose to create a 4-node network. If you want to run the code and follow along, the full example is available here: https://github.com/nrempel/naive-peer-discovery.
Take note of the docker-compose file:
version: '3'
services:
  node_1:
    build: .
    command: node discovery.js node_2
    environment:
      HOSTNAME: node_1
  node_2:
    build: .
    command: node discovery.js node_3
    environment:
      HOSTNAME: node_2
  node_3:
    build: .
    command: node discovery.js node_4
    environment:
      HOSTNAME: node_3
  node_4:
    build: .
    command: node discovery.js node_1
    environment:
      HOSTNAME: node_4
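Each service is built from a Dockerfile in the project root (the build: . directive). The real file is in the repository linked above, but a minimal version might look something like this sketch (the base image and file layout are assumptions):
FROM node
WORKDIR /app
COPY package.json .
RUN npm install
COPY discovery.js .
No CMD is needed, since docker-compose supplies the command for each service.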
Here, node_1 is initially passed the location of node_2, and node_2 is given the location of node_3. The pattern continues so that, if you were to examine the nodes as a graph, they form a full circuit. Since every node publishes all of its known peers, running these 4 nodes should result in all 4 nodes discovering every other node in the network once the state has had time to propagate.
We can run the 4 nodes to verify that this is true:
$ docker-compose up | grep node_1_1
node_1_1 | TCP socket listening, hostname: node_1
node_1_1 | ===================
node_1_1 | Sending message: "PING! from host node_1"
node_1_1 | Sending discovery update: "node_1,node_2"
node_1_1 | ===================
node_1_1 | Message received - PING! from host node_1
node_1_1 | Discovery update received - node_1,node_2
node_1_1 | Message received - PING! from host node_2
node_1_1 | Discovery update received - node_2,node_3,node_4,node_1
node_1_1 | Message received - PING! from host node_3
node_1_1 | Discovery update received - node_3,node_4,node_1,node_2
node_1_1 | ===================
node_1_1 | Sending message: "PING! from host node_1"
node_1_1 | Sending discovery update: "node_1,node_2,node_3,node_4"
node_1_1 | ===================
node_1_1 | Message received - PING! from host node_1
node_1_1 | Discovery update received - node_1,node_2,node_3,node_4
node_1_1 | Message received - PING! from host node_4
node_1_1 | Discovery update received - node_4,node_1,node_2,node_3
node_1_1 | Message received - PING! from host node_2
node_1_1 | Discovery update received - node_2,node_3,node_4,node_1
node_1_1 | Message received - PING! from host node_3
node_1_1 | Discovery update received - node_3,node_4,node_1,node_2
As you can see, in the first discovery update node_1 sends, it only includes its own hostname and that of node_2, which it was provided. It then receives its own update, but quickly also receives a discovery update from another node containing the locations of all other nodes. By the second interval, node_1 knows the location of every other node in the network and is both listening to all of those nodes for messages and publishing the full list of hostnames for others to see. We only inspected the logs from node_1, but you can verify that the same is true for all other processes.
I would call this a success. We have a fairly simple pattern for peer discovery with minimal overhead.
There are still a few questions and caveats. A node can easily be added to the network this way, but it will never be removed. I could add heuristics to ignore failing nodes, but then if a node starts failing and all other nodes add it to a blacklist, the failing node may never be able to rejoin the network even after it recovers. Additionally, is it a problem that a node can add any number of peers to the network for free? This could have performance consequences – especially if a bad actor wanted to take down the network. Since I’m exploring a blockchain implementation, perhaps the list of peers could itself be written to the ledger.
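One possible direction is to record when we last heard from each peer and drop peers that have been silent for too long, rather than blacklisting them outright. Here is a rough sketch of that heuristic layered on top of the code above – the LAST_SEEN map, the threshold, and the helper names are all hypothetical, and recordPeer would need to be called with the sender’s hostname, which would have to be parsed out of (or added to) the message payloads:
// Hypothetical extension: remember when each peer was last heard from
const LAST_SEEN = {};
const STALE_THRESHOLD_MS = 10000; // i.e. four missed 2.5s update intervals
// Call this from the message handlers whenever we hear from a peer
function recordPeer(node) {
  LAST_SEEN[node] = Date.now();
}
// Periodically forget peers that have gone quiet and stop listening to them
function pruneStalePeers() {
  const now = Date.now();
  Object.keys(LAST_SEEN).forEach(node => {
    if (now - LAST_SEEN[node] < STALE_THRESHOLD_MS) return;
    const index = NODES.indexOf(node);
    if (index !== -1) NODES.splice(index, 1);
    subscriber.disconnect(`tcp://${node}:20000`);
    subscriber.disconnect(`tcp://${node}:30000`);
    delete LAST_SEEN[node];
  });
}
setInterval(pruneStalePeers, STALE_THRESHOLD_MS);
Because handleDiscoveryTopic re-adds any peer it does not already know, a pruned node that recovers would rejoin as soon as at least one peer still announces it – though if every node has pruned it, it stays orphaned, so this is not a complete answer to the rejoin problem.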
I’ll continue to consider these problems and explore a solution.