Bidirectional Concurrent Streaming in HTTP/2 #71
-
Is it possible to achieve bidirectional streaming in an HTTP/2 POST request using HTTPx? Here is an example scenario I am working on: # Create a new HTTPX session
http = HTTPX
.plugin(:stream)
.with(fallback_protocol: "h2")
# Create an Enumerator that yields JSON objects
data = Enumerator.new do |yielder|
5.times do |i|
chunk = "#{{ message: "Hello #{i}", timestamp: Time.now.to_i }.to_json}\n"
puts "Sending chunk #{i}: #{chunk}"
yielder << chunk
sleep 1 # Simulate some delay between chunks
end
end
# Send the request with streaming body
response = http.post(
"http://localhost:3000/ndjson",
body: data,
headers: {
"Content-Type" => "application/x-ndjson"
},
stream: true
)
response.each_line do |line|
parsed_response = JSON.parse(line)
puts "Received response: #{parsed_response}"
end This code sends chunks of NDJSON to an echo server that returns the processed chunk, formatted as However, the responses are only printed after the entire request body is sent. The output looks like this:
What I want to achieve is immediate processing of each chunk, with output like this:
Is there a way to achieve this behavior using HTTPx or any of its plugins? If not, are there alternative approaches within HTTPx or recommendations for other libraries to enable such bidirectional streaming in HTTP/2? FYI, the following nodejs code worked as expected, so I think the problem is not at the server const http2 = require('http2');
// Form entries to send
const formEntries = [
{ name: 'John Doe' },
{ email: 'john.doe@example.com' },
{ message: 'This is a test message' },
];
// Create an HTTP/2 client
const client = http2.connect('http://localhost:3000');
// Create and send request
const req = client.request({
':path': '/ndjson',
':method': 'POST',
'content-type': 'application/x-ndjson',
});
let responseData = '';
req.on('data', (chunk) => {
console.log('chunk', chunk.toString())
responseData += chunk.toString();
});
req.on('end', () => {
client.close();
});
req.on('error', (err) => {
console.error('Request error:', err);
client.close();
});
// Send each form entry with a delay to simulate streaming
let index = 0;
const sendNextEntry = () => {
if (index < formEntries.length) {
const entry = JSON.stringify(formEntries[index]) + '\n'; // NDJSON format
req.write(entry);
console.log(`Sent form entry: ${entry.trim()}`);
index++;
setTimeout(sendNextEntry, 1000); // Simulate delay between entries
} else {
req.end(); // End the request after all entries are sent
}
};
sendNextEntry(); Its output is
|
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 23 replies
-
Hi @AbanoubGhadban , I believe it's possible, at the very least with the stream plugin. The this can probably work without plugins, as long as you provide the corresponding custom functionality in a plugin. FWIW this is an example how you could send the body in chunks (it may require adjustments): req_body = form_entries.lazy.map do |entry|
entry = JSON.dump(entry) + '\n'
sleep(1)
puts "Sent form entry: #{entry.strip}"
entry
end
stream_session = HTTPX.plugin(:stream)
response = .post("https://localhost:3000/ndjson", headers: { "content-type" => "application/nd+json" }, body: req_body, stream: true)
response.each do |chunk|
puts "chunk: #{chunk}"
end |
Beta Was this translation helpful? Give feedback.
Thx for that! I've started a new plugin in the
gh-disc-71
branch. You can follow the specs here to see how you can use it. I'd kindly ask you to test this branch in order to find loose ends.