Piping Patterns: Multiplexing and Demultiplexing Object Streams
Learn how to implement client-side multiplexing and server-side demultiplexing with the help of a demo application.
Client side—multiplexing
Let’s start building our application from the client side by creating a module named client.js. This represents the part of the application that’s responsible for starting a child process (child_process) and multiplexing its streams.
So, let’s start by defining the module. First, we need some dependencies.
import { fork } from 'child_process'
import { connect } from 'net'
Next, let’s implement a function that performs the multiplexing of a list of sources.
function multiplexChannels (sources, destination) {
  let openChannels = sources.length
  for (let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function () { // (1)
        let chunk
        while ((chunk = this.read()) !== null) {
          const outBuff = Buffer.alloc(1 + 4 + chunk.length) // (2)
          outBuff.writeUInt8(i, 0)
          outBuff.writeUInt32BE(chunk.length, 1)
          chunk.copy(outBuff, 5)
          console.log(`Sending packet to channel: ${i}`)
          destination.write(outBuff) // (3)
        }
      })
      .on('end', () => { // (4)
        if (--openChannels === 0) {
          destination.end()
        }
      })
  }
}
The multiplexChannels() function takes the source streams to be multiplexed and the destination channel as input, and then it performs the following steps:
1. For each source stream, it registers a listener for the readable event, where we read the data from the stream using the non-flowing mode.
2. When a chunk is read, we wrap it into a packet that contains, in order, 1 byte (UInt8) for the channel ID, 4 bytes (UInt32BE) for the packet size, and then the actual data.
3. When the packet is ready, we write it into the destination stream.
4. Finally, we register a listener for the end event so that we can terminate the destination stream when all the source streams have ...
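To make the packet layout from step 2 concrete, here is a minimal sketch that builds a single packet by hand and reads its fields back. The encodePacket() and decodePacket() helpers are hypothetical names introduced only for this illustration and are not part of client.js; the sketch also assumes that the buffer passed to decodePacket() holds exactly one complete packet, which a real network stream doesn't guarantee.

```javascript
// Hypothetical helper (not part of client.js): builds one packet using the
// same layout as multiplexChannels().
function encodePacket (channelId, chunk) {
  const packet = Buffer.alloc(1 + 4 + chunk.length)
  packet.writeUInt8(channelId, 0)       // byte 0: channel ID
  packet.writeUInt32BE(chunk.length, 1) // bytes 1-4: payload length, big-endian
  chunk.copy(packet, 5)                 // bytes 5 onward: the payload itself
  return packet
}

// Hypothetical helper: reads the header and payload back out.
// Assumption: `packet` contains exactly one complete packet.
function decodePacket (packet) {
  const channelId = packet.readUInt8(0)
  const length = packet.readUInt32BE(1)
  const data = packet.subarray(5, 5 + length)
  return { channelId, length, data }
}

const packet = encodePacket(1, Buffer.from('hello'))
console.log(packet.toString('hex')) // 010000000568656c6c6f

const decoded = decodePacket(packet)
console.log(decoded.channelId, decoded.length, decoded.data.toString()) // 1 5 hello
```

In the hex output, 01 is the channel ID, 00000005 is the payload length, and 68656c6c6f is the ASCII encoding of hello. Because the channel ID is written with writeUInt8(), this layout can address at most 256 channels.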