Kademlia (My Implementation)¶
2020-05-21
If you haven’t yet read it, please see my overview of Kademlia [ACa].
Kademlia is obviously an idea that I have a fairly deep interest in at this point, so I spent the last couple weeks writing up another implementation of it. This one managed to solve many of the problems I had in the past by using a slightly different concurrency model, by focusing narrowly on UDP, and by using equivalent alternatives to some structures defined in the Kademlia specification.
What I want to do in this post is walk through how this implementation works, what progress it makes on my larger goal of developing Retrion, and what problems it has that remain to be solved.
Code can be found here [ACb], in the files that use protocol version 4. Version 5 is my current work to try and extend it.
How It Works¶
Concurrency Model¶
The node’s world is divided into four threads (plus the main thread, but that doesn’t count). Threads 1 and 2 listen for incoming messages on UDP/IPv4 and UDP/IPv6 respectively. Thread 3, the reaction thread, is responsible for reacting and responding to messages. Thread 4, the heartbeat thread, is responsible for scheduled events in the future. It implements a sort of event loop using the sched module.
The listener threads exist primarily out of laziness, since select would have easily let me consolidate them. It should be noted, however, that having multiple threads like this will allow for future abstraction. If each listening socket gets its own thread, then you don’t need to deal as much with how those transport methods differ. A websocket, for example, would have a very different interface when compared with a UDP one or a Bluetooth one.
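As a rough sketch of that layout (the names here are illustrative, not taken from the actual implementation), the listener threads can feed a shared queue that the reaction thread drains:

```python
# Illustrative sketch of the listener/reaction split: two "listener" threads
# push packets into a shared queue, and one reaction thread drains it.
# In the real node the listeners would block on recvfrom() for UDP/IPv4
# and UDP/IPv6 sockets; here they are stubbed with canned packets.
import queue
import threading

inbox = queue.Queue()

def listener(label, packets):
    # Stand-in for a blocking recvfrom() loop on one UDP socket
    for pkt in packets:
        inbox.put((pkt, label))

def reaction_loop(n_expected, handled):
    # Real code would deserialize pkt and call its react() method
    for _ in range(n_expected):
        handled.append(inbox.get())

handled = []
threads = [
    threading.Thread(target=listener, args=("udp4", [b"ping-a"])),
    threading.Thread(target=listener, args=("udp6", [b"ping-b"])),
    threading.Thread(target=reaction_loop, args=(2, handled)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(handled) == 2
```

The nice property of this shape is that a listener thread only needs to know how to turn its transport into packets; everything transport-agnostic lives on the reaction side.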
Serialization¶
Serialization was done with u-msgpack-python, which is a particularly nice implementation of the MessagePack serialization scheme. It’s easiest to think of it as “JSON in binary, with some customizability”.
Of particular usefulness, MessagePack allows you to define how custom objects should be serialized and deserialized using their concept of “extension objects”. Basically they throw a header onto whatever bytestream you hand them and they can use that header to send the stream to the correct factory function. I’m going to borrow their notation for how to represent these bytestreams:
one byte:
+--------+
| |
+--------+
a variable number of bytes:
+========+
| |
+========+
variable number of objects stored in MessagePack format:
+~~~~~~~~~~~~~~~~~+
| |
+~~~~~~~~~~~~~~~~~+
For an example of how this looks, let’s show the PING message:
+========+--------+--------+--------+--------+--------+========+========+
| header | 0 | 4 |compress|10010011| 1 |sequence|senderID|
+========+--------+--------+--------+--------+--------+========+========+
|-------------compressed------------|
Let’s look at that cell-by-cell. The header is a few bytes that MessagePack tacks on to the beginning. It essentially says “I am an extension object __ bytes in length.”
The first value we actually give it is that 0, which is the extension typecode I defined for Message objects.
The next cell indicates the compression method used for the remainder of the bytestream. This is negotiated between nodes in a HELLO message. The ones currently defined are none, ZLIB, GZIP, LZMA, and BZ2, taking their values in that order.
The next cell is a header for an array. The first four bits indicate that it is an array, and the second four bits indicate the length of the array.
The next cell is a 1, which is the message type assigned to PING.
The next cell is the sequence number of the packet. This number should monotonically increase and is assigned by the sending node. It is what gets referred to if a response gets sent back.
The next cell is the sender ID. By default this is just 20 random bytes. In future implementations it will likely be a hash of some node information in a canonical order.
This entire array is unpacked and then handed to a factory function to regenerate that message object.
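To make the layout concrete, here is a hand-packed version of that PING payload using only the stdlib. The prefix bytes follow the MessagePack spec; `pack_ping` is a hypothetical helper, and the widths chosen for the sequence number and sender ID are my assumptions, since the real code delegates all of this to u-msgpack-python:

```python
# Hand-pack the body of a PING message per the diagram above (everything
# after the outer extension header). Prefix bytes follow the MessagePack
# spec: 0x90|n = fixarray of n items, 0xce = uint32, 0xc4 = bin8.
def pack_ping(compress: int, seq: int, sender: bytes) -> bytes:
    array_header = bytes([0b10010000 | 3])        # 0x93: fixarray, length 3
    msg_type = bytes([1])                         # positive fixint 1 = PING
    seq_bytes = b'\xce' + seq.to_bytes(4, 'big')  # sequence number as uint32
    sender_bytes = b'\xc4' + bytes([len(sender)]) + sender  # sender ID as bin8
    return bytes([compress]) + array_header + msg_type + seq_bytes + sender_bytes

payload = pack_ping(0, 42, b'\x00' * 20)
assert payload[1] == 0b10010011  # the array-header cell from the diagram
assert payload[2] == 1           # the PING message-type cell
```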
Reactions¶
When a node reconstructs a message, it finds that each message has a react() method. For the simplest example, let's look at the IDENTIFY message:
class Message(BaseMessage):
    ...

    def react(self, node, addr, sock):
        """Delay sending a PING to the sending node, since the connection is clearly active if you got a message."""
        try:
            event = node.timeouts.pop((self.sender, ))
            node.schedule.cancel(event)
        except (KeyError, ValueError):
            pass

        def ping():
            node._send(sock, addr, PingMessage())

        node.timeouts[(self.sender, )] = node.schedule.enter(60, 2, ping)
        # enter() takes in delay, priority, callable
@Message.register(int(MessageType.IDENTIFY))
class IdentifyMessage(PingMessage):
    def __init__(self, compress: int = 0, seq: Optional[int] = None, sender: bytes = b''):
        super().__init__(compress, seq, sender)
        self.message_type = MessageType.IDENTIFY

    def react(self, node, addr, sock):
        """Send a HELLO back in lieu of an ACK."""
        node.logger.debug("Got an IDENTIFY request from %r (%r)", addr, self)
        node._send_hello(sock, addr)
        Message.react(self, node, addr, sock)
A node that sends IDENTIFY is saying “hey, I have your address but I don’t know who you are. Would you mind introducing yourself?” So this event gets processed in two phases. The first is just the node sending a HELLO message. The second is modifying the node’s schedule. By default, nodes will send a PING after 60 seconds of dead air to make sure that their peer is still alive. If you get a message, this timer needs to be reset, which is what happens in Message.react().
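That timer logic can be sketched on its own with the stdlib sched module (simplified names; the real method also threads through the socket and address):

```python
# Sketch of the PING-timer reset: every inbound message cancels any pending
# PING for that peer and schedules a fresh one 60 seconds out.
import sched
import time

schedule = sched.scheduler(time.monotonic, time.sleep)
timeouts = {}
pings_sent = []

def ping(peer):
    pings_sent.append(peer)  # stand-in for node._send(sock, addr, PingMessage())

def on_message(peer):
    try:
        schedule.cancel(timeouts.pop(peer))  # cancel the pending PING, if any
    except (KeyError, ValueError):
        pass
    # enter() takes delay, priority, callable (plus optional arguments)
    timeouts[peer] = schedule.enter(60, 2, ping, argument=(peer,))

on_message("peer-a")
on_message("peer-a")  # a second message resets the timer instead of stacking

assert len(schedule.queue) == 1  # one pending PING, not two
assert pings_sent == []          # and nothing has fired yet
```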
Responses¶
Several messages also have a method called react_response(). The idea here is that if you received an ACK, it probably means something to some earlier message. The way this gets triggered looks like:
@Message.register(int(MessageType.ACK))
class AckMessage(Message):
    ...

    def react(self, node, addr, sock):
        """Clear the message from node.awaiting_ack and call the relevant react_response() method."""
        node.logger.debug("Got an %sACK from %r (%r)", 'N' if self.status else '', addr, self)
        super().react(node, addr, sock)
        try:
            node.routing_table.member_info[self.sender].local.hits += 1
        except KeyError:
            pass
        if self.resp_seq in node.awaiting_ack:
            node.awaiting_ack[self.resp_seq].react_response(self, node, addr, sock)
            del node.awaiting_ack[self.resp_seq]
Let’s break that down.
The first step is just like above, where it resets that PING timer for that peer.
After that, we’d like to record that a message successfully went through. So, if we have the sender in our routing table, we look up our local information on that node (as opposed to public data like their ID or listening addresses) and record that there was a hit. This information can be used when pruning peers (which I have not yet implemented).
The last step is to see if the node was expecting this acknowledgement
for something. If it was, we call the message in question’s
react_response() method. Currently there are only two messages that
use this feature: FIND_NODE and FIND_KEY. Let’s look at the former:
@Message.register(int(MessageType.FIND_NODE))
class FindNodeMessage(Message):
    ...

    def react_response(self, ack, node, addr, sock, message_constructor=PingMessage):
        """Attempt to connect to each of the nodes you were told about."""
        node.logger.debug("Got a response to a FIND_NODE from %r (%r, %r)", addr, self, ack)
        for info in ack.data:
            name = info.name
            if name != node.id and name not in node.routing_table:
                for address in info.addresses:
                    try:
                        if node.routing_table.add(name, address.args, address.addr_type):
                            node._send(address.addr_type, address.args, message_constructor())
                            node.routing_table.member_info[name].public = info
                            break
                    except Exception:
                        continue
A FIND_NODE message expects to get back an array of GlobalPeerInfo objects, the details of which don’t matter very much here. The only important thing for our purposes is that each contains a list of listening addresses and a list of supported compression methods in order of preference.
For each of these objects, we check that it isn’t ourselves or someone we already know about. If it isn’t, we try to send a message to each of its addresses in sequence, ignoring errors.
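The awaiting_ack bookkeeping from the ACK handler can be reduced to a small sketch (toy names; the real dict maps sequence numbers to the full outbound Message objects):

```python
# Toy version of the request/response pairing: outbound requests are parked
# by sequence number until an ACK echoing that number arrives.
awaiting_ack = {}

class FakeRequest:
    def __init__(self):
        self.responses = []

    def react_response(self, ack):
        self.responses.append(ack)  # the real method also gets node, addr, sock

req = FakeRequest()
awaiting_ack[7] = req       # a FIND_NODE went out with seq=7

resp_seq = 7                # an ACK arrives echoing that sequence
if resp_seq in awaiting_ack:
    awaiting_ack.pop(resp_seq).react_response("ack-payload")

assert req.responses == ["ack-payload"]
assert 7 not in awaiting_ack
```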
Design Problems¶
The math on \(b\)¶
The idea behind \(b\), Kademlia’s acceleration parameter, is that you can trade between routing efficiency and routing table size by looking at \(b\)-bit symbols rather than 1-bit ones. So if you are ID 101010…, \(b=1\) would have you making a routing table with entries like:
0…
01…
010…
0101…
01010…
010101…
etc.
If \(b=2\), however, you would end up with a routing table like:
00…
01…
11…
0100…
0101…
0111…
010100…
010101…
010111…
etc.
The trade-off you make here is that the number of queries you need to find a node is \(\mathcal{O}(\log_{2^b}(n))\), whereas routing table size is \(\mathcal{O}(2^b \cdot \log_{2^b}(n))\). A more precise guess for routing table size is that it scales as \(\left\lceil\tfrac{(2^b - 1)}{b}\right\rceil \cdot \log_{2^b}(n)\), but I haven’t actually sat down to check that math, so take it with a grain of salt.
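A quick numeric sanity check of that trade-off, using the looser \(\mathcal{O}(2^b \cdot \log_{2^b}(n))\) bound for table size:

```python
# Compare lookup hops vs. routing table growth for different values of b.
import math

def lookup_hops(n, b):
    return math.log(n, 2 ** b)             # O(log_{2^b}(n)) queries

def table_buckets(n, b):
    return (2 ** b) * math.log(n, 2 ** b)  # O(2^b * log_{2^b}(n)) entries

n = 1_000_000
assert lookup_hops(n, 4) < lookup_hops(n, 1)      # bigger b: fewer hops...
assert table_buckets(n, 4) > table_buckets(n, 1)  # ...but a bigger table
```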
The problem is, I’m doing the math wrong in my routing table for
\(b > 1\), and I haven’t been able to identify where. All I know is
that occasionally I get an IndexError and my routing table has to
fall back to searching over the set of all known peers instead of a
particular bucket.
The Crappy Broadcast Algorithm¶
The broadcast method I used in this implementation is naive at best: a simple flooding model with the following rules:
If you saw this broadcast already (identified by (sender, sequence) pairs), ignore it
For each peer in your routing table, echo this message to them, unless:
    They are the original sender, as identified by the sender field
    They are the node that sent it to you, as identified by the (addr, sock) pair in react()
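Those rules amount to the following sketch over an abstract peer graph (toy names, with recursion standing in for real network hops):

```python
# Naive flooding: drop duplicates by (sender, seq); echo to every peer
# except the original sender and whoever handed you the message.
def flood(node, msg, network, seen, from_peer=None):
    key = (msg["sender"], msg["seq"])
    if key in seen[node]:          # rule 1: already saw this broadcast
        return 0
    seen[node].add(key)
    sent = 0
    for peer in network[node]:
        if peer in (msg["sender"], from_peer):  # rule 2: skip origin/relayer
            continue
        sent += 1
        sent += flood(peer, msg, network, seen, from_peer=node)
    return sent

network = {"a": {"b", "c"}, "b": {"a", "c"}, "c": {"a", "b"}}
seen = {n: set() for n in network}
total = flood("a", {"sender": "a", "seq": 1}, network, seen)

assert all(("a", 1) in s for s in seen.values())  # everyone saw it
assert total == 3  # redundant deliveries are what drives the message count up
```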
This ends up with \(\mathcal{O}(n^2)\) messages sent, and, given the topology of a Kademlia network, \(\mathcal{O}(\log(n))\) hops before reaching the final node.
This is obviously sub-optimal, and is a topic I will explore further when I manage to get a write-up for Solution for the broadcasting in Kademlia peer-to-peer overlay by Czirkos and Hosszú.
What’s This \(\alpha\) Parameter?¶
Kademlia defines a concurrency parameter called \(\alpha\), which essentially says “thou shalt send \(\alpha\) messages per request.” Confusingly, some messages use \(k\) for their concurrency parameter instead. The main culprit there is STORE, but rather than figure out where to do what, I just assumed that \(k=\alpha\). It’s a relatively small change to fix that, so presumably I will before I do anything major with this.
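For reference, the \(\alpha\) rule in a lookup reduces to querying only the \(\alpha\) closest not-yet-queried candidates per round. A toy XOR-metric sketch (not the real lookup code):

```python
# Per lookup round, pick the alpha closest unqueried candidates by XOR
# distance to the target; the rest wait for later rounds.
ALPHA = 3

def next_round(target, candidates, queried):
    fresh = sorted(candidates - queried, key=lambda n: n ^ target)
    return fresh[:ALPHA]

batch = next_round(0b1010, {0b0001, 0b1000, 0b1011, 0b0111, 0b1110}, set())
assert len(batch) == ALPHA
assert batch[0] == 0b1011  # smallest XOR distance to 0b1010
```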
Retrion Progress¶
Recall the initial goals of Retrion:
An object model and serialization standard that can be used easily in most languages
A simple high-level API (along with a fine-grained one)
\(\log(n)\) distributed hash table get/sets
Support for disabling DHT support network-wide
Support for “edge nodes” which do not store DHT data except as a cache
\(\log(n)\) broadcast delivery time
\(\log(n)\) routed message delivery
Support for multicast routed messages
Support for in-protocol group messages
Transparent (to the developer) compression
Transparent (to the user) encryption
Support for multiple transport protocols (TCP, UDP, etc.)
Support for “virtual transports” (using other nodes as a bridge)
A well-defined standard configuration space
Support for extension subprotocols and implementation-specific configurations
I would say that this implementation likely satisfies 1, 3, 6, 10, 14, and 15.
It certainly does not satisfy 4, 5, 7, 8, 9, 11, 12, or 13.
That means we need some kind of roadmap for delivering these other properties and addressing other deficiencies.
4. Support for disabling DHT support network-wide¶
This is probably the easiest to do, as it just puts you into a mode similar to how Ethereum already works. It’s probably as simple as adding a flag to the NetworkConfiguration object and being done with it.
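Something like the following shape is what I have in mind (NetworkConfiguration exists in the codebase, but dht_enabled is a hypothetical field, not something implemented yet):

```python
# Hypothetical network-wide DHT switch on the configuration object.
from dataclasses import dataclass

@dataclass
class NetworkConfiguration:
    dht_enabled: bool = True  # when False, nodes refuse STORE/FIND_KEY traffic

config = NetworkConfiguration(dht_enabled=False)
assert not config.dht_enabled
```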
5. Support for “edge nodes” which do not store DHT data except as a cache¶
This one is harder, as it presents some design challenges. My initial thought would be to indicate edge-node status by having the most significant bit of your ID be 1, but this presents a few problems:
How do you guarantee that every edge node is in someone’s routing table? Without that, there is no broadcast support
Nodes would have to decide their status at ID-assignment time, unless we allow nodes to make new IDs
The use case for edge nodes is something that runs in a browser or on a phone. That means we need support for
    roaming addresses, and
    websocket or webrtc connections
For now, I think this is off the table. It definitely should be revisited in the future, but there are too many problems with the concept as it exists now to implement it.
7. \(log(n)\) routed message delivery¶
This one is trivial to implement; I just haven’t gotten to it yet. It’s almost a duplicate of the work I already do in FIND_NODE and STORE anyway; I just need to change the side effect.
There are probably optimizable algorithms that I’ll need to look for.
8. Support for multicast routed messages¶
A naive implementation of this should be fairly easy: I would just need to change the target field in a message like the above to hold an array of targets instead of a single target.
9. Support for in-protocol group messages¶
Leveraging the above, it should be possible to make a chatroom context or the like in-protocol. The real question is whether it should use a name registry at the network level which references some key in the DHT, or whether individual nodes should track their own rooms and be able to attach a room name to a multicast routed message. I kinda lean towards the latter, but we’ll see.
11. Transparent (to the user) encryption¶
I have no idea how to approach this yet beyond broad concepts. It would be fantastic if you could have a bit in your GlobalPeerInfo object that says “hey, use the OWS protocol with me now”, or something similar. The only question is going to be how many methods are easily implementable in many languages. Probably the languages I will choose to care about here are C/C++, JavaScript, Python, and possibly Java or Kotlin.
12. Support for multiple transport protocols (TCP, UDP, etc.)¶
I don’t yet know how to limit the downsides here. I’ll need a confident way to handle the local resource overhead of connection-based transports (the number of sockets the system lets you have, for example) before this is really approachable. My plan is two-pronged.
First, I implement it under the assumption that UDPv4 is the minimum requirement, in order to avoid network fragmentation.
In parallel I need to do some kind of formal analysis on network fragmentation risk if you allow some nodes to have incompatible lists of supported transports. It would be useful if your sandboxed browser code could interact with a large network of external nodes that communicate with more convenient transports than WebSockets, but I’m fairly worried about how many risks and inefficiencies that would expose.
13. Support for “virtual transports” (using other nodes as a bridge)¶
This is an idea that I might want to scrap. If routed messages are workable, then virtual transports are probably trivial to implement. The only question is gonna be how inefficient it is without explicitly setting up a bridge route.
Conclusions¶
I’ve got a long road ahead of me, but this is a pretty good start!
Citations
[ACa] Olivia Appleton-Crocker. Kademlia (the protocol). URL: https://blog.oliviaappleton.com/posts/0002-Kademlia-protocol.
[ACb] Olivia Appleton-Crocker. P2p-today/retrion-development: a development repository for a future release of the Retrion protocol. URL: https://github.com/p2p-today/retrion-development.
Cite As
@online{appleton_blog_0003,
  title    = {Kademlia (My Implementation)},
  author   = {Olivia Appleton-Crocker},
  language = {English},
  version  = {1.1},
  date     = {2020-05-21},
  url      = {https://blog.oliviaappleton.com/posts/0002-Kademlia-implementation}
}