Piping Patterns: Multiplexing and Demultiplexing Object Streams

Learn how to implement client-side multiplexing and server-side demultiplexing with the help of a demo application.

Client side—multiplexing

Let’s start building our application from the client side by creating a module named client.js. This is the part of the application responsible for starting a child process (child_process) and multiplexing its streams.
Let’s begin by defining the module. First, we need some dependencies.

import { fork } from 'child_process'
import { connect } from 'net'

Next, let’s implement a function that performs the multiplexing of a list of sources.

function multiplexChannels (sources, destination) {
  let openChannels = sources.length
  for (let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function () { // (1)
        let chunk
        while ((chunk = this.read()) !== null) {
          const outBuff = Buffer.alloc(1 + 4 + chunk.length) // (2)
          outBuff.writeUInt8(i, 0)
          outBuff.writeUInt32BE(chunk.length, 1)
          chunk.copy(outBuff, 5)
          console.log(`Sending packet to channel: ${i}`)
          destination.write(outBuff) // (3)
        }
      })
      .on('end', () => { // (4)
        if (--openChannels === 0) {
          destination.end()
        }
      })
  }
}

The multiplexChannels() function takes the source streams to be multiplexed and the destination channel as input, and then it performs the following steps:

  1. For each source stream, it registers a listener for the readable event, in which the data is read from the stream using non-flowing mode.

  2. When a chunk is read, it is wrapped into a packet that contains, in order, 1 byte (UInt8) for the channel ID, 4 bytes (UInt32BE) for the packet size, and then the actual data.

  3. When the packet is ready, we write it into the destination stream.

  4. Finally, we register a listener for the end event so that we can terminate the destination stream when all the source streams have ...
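
To make the packet layout from step 2 concrete, here is a minimal sketch that builds a single packet and reads its header back. The helper names framePacket() and parsePacket() are hypothetical and are not part of client.js; they only isolate the framing logic so it can be tried on its own.

```javascript
// Hypothetical helper: builds one packet using the layout described above.
function framePacket (channelId, chunk) {
  const packet = Buffer.alloc(1 + 4 + chunk.length)
  packet.writeUInt8(channelId, 0) // byte 0: channel ID
  packet.writeUInt32BE(chunk.length, 1) // bytes 1-4: payload length (big-endian)
  chunk.copy(packet, 5) // bytes 5 onward: the payload itself
  return packet
}

// Hypothetical helper: reads the fields back out of one complete packet.
function parsePacket (packet) {
  const channelId = packet.readUInt8(0)
  const length = packet.readUInt32BE(1)
  const payload = packet.subarray(5, 5 + length)
  return { channelId, length, payload }
}

const packet = framePacket(1, Buffer.from('hello'))
const parsed = parsePacket(packet)
console.log(packet.length, parsed.channelId, parsed.payload.toString())
// prints: 10 1 hello
```

The 5-byte header is what lets a receiver tell which channel each chunk belongs to and where one packet ends and the next begins. Because the channel ID occupies a single byte, this layout can address at most 256 channels.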