// MessageBatcher.java
/*
* SPDX-FileCopyrightText: 2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.util;
import com.lucimber.dbus.message.OutboundMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Message batcher for improved throughput by grouping multiple small messages.
 *
 * <p>This batcher collects outbound messages and sends them in batches to reduce system call
 * overhead and improve network utilization. It implements adaptive batching based on message rate
 * and size.
 *
 * <p>Features:
 *
 * <ul>
 *   <li>Time-based batching with configurable delay
 *   <li>Size-based batching with configurable limits
 *   <li>Adaptive batch sizing based on throughput
 *   <li>Performance metrics tracking
 * </ul>
 *
 * <p>Batches that are flushed internally (because a limit was reached, because the flush timer
 * fired, or because an oversized message must not overtake queued ones) are written to the
 * {@link ChannelHandlerContext} that was passed to {@link #addMessage}. Batches obtained through
 * {@link #flushBatch} are only returned; sending them is the caller's responsibility.
 *
 * <p>All public methods synchronize on the batcher instance.
 */
public class MessageBatcher {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBatcher.class);

    // Configuration constants
    private static final int DEFAULT_MAX_BATCH_SIZE = 16;
    private static final int DEFAULT_MAX_BATCH_BYTES = 16 * 1024; // 16KB
    private static final long DEFAULT_BATCH_DELAY_MS = 1; // 1ms
    private static final int MIN_BATCH_SIZE = 1;
    private static final int MAX_BATCH_SIZE = 64;
    private static final long ADAPTIVE_CHECK_INTERVAL_MS = 1000; // 1 second

    private final int maxBatchSize;
    private final int maxBatchBytes;
    private final long batchDelayMs;

    // Current batch state (guarded by the instance monitor)
    private final List<OutboundMessage> currentBatch;
    private final AtomicInteger currentBatchBytes;
    private final AtomicBoolean flushScheduled;
    private ScheduledFuture<?> flushFuture;

    // Performance metrics
    private final AtomicLong messagesProcessed;
    private final AtomicLong batchesSent;
    private final AtomicLong bytesProcessed;
    private final AtomicInteger currentBatchSizeTarget;

    // Adaptive batching state (guarded by the instance monitor)
    private long lastAdaptiveCheckNanos;
    private long lastMessageCount;

    /** Creates a new message batcher with default configuration. */
    public MessageBatcher() {
        this(
                DEFAULT_MAX_BATCH_SIZE,
                DEFAULT_MAX_BATCH_BYTES,
                Duration.ofMillis(DEFAULT_BATCH_DELAY_MS));
    }

    /**
     * Creates a new message batcher with custom configuration.
     *
     * @param maxBatchSize maximum messages per batch; clamped to the range 1..64
     * @param maxBatchBytes maximum bytes per batch; messages larger than half of this value are
     *     never batched
     * @param batchDelay maximum delay before flushing a batch
     * @throws NullPointerException if {@code batchDelay} is {@code null}
     */
    public MessageBatcher(int maxBatchSize, int maxBatchBytes, Duration batchDelay) {
        this.maxBatchSize = Math.min(Math.max(maxBatchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
        this.maxBatchBytes = maxBatchBytes;
        this.batchDelayMs = batchDelay.toMillis();
        this.currentBatch = new ArrayList<>(this.maxBatchSize);
        this.currentBatchBytes = new AtomicInteger();
        this.flushScheduled = new AtomicBoolean();
        this.messagesProcessed = new AtomicLong();
        this.batchesSent = new AtomicLong();
        this.bytesProcessed = new AtomicLong();
        // Start conservative, but never below the minimum: a target of 0 (maxBatchSize == 1)
        // could never grow again because doubling 0 stays 0.
        this.currentBatchSizeTarget =
                new AtomicInteger(Math.max(MIN_BATCH_SIZE, this.maxBatchSize / 2));
        // nanoTime is monotonic, so wall-clock adjustments cannot distort the rate window.
        this.lastAdaptiveCheckNanos = System.nanoTime();
        this.lastMessageCount = 0;
    }

    /**
     * Adds a message to the current batch.
     *
     * <p>If the current batch is full (message count target or byte limit), it is written to
     * {@code ctx} before the new message is queued. If the message is too large to be batched,
     * any queued messages are written to {@code ctx} first so that the large message, which the
     * caller sends itself, does not overtake them.
     *
     * @param ctx channel handler context used to schedule the delayed flush and to write flushed
     *     batches
     * @param message the message to batch
     * @param estimatedSize estimated size of the message in bytes
     * @return true if the message was batched, false if it should be sent immediately
     */
    public synchronized boolean addMessage(
            ChannelHandlerContext ctx, OutboundMessage message, int estimatedSize) {
        // Large messages bypass batching
        if (estimatedSize > maxBatchBytes / 2) {
            LOGGER.trace(
                    "Message too large for batching ({}B), sending immediately", estimatedSize);
            // Preserve ordering: earlier messages must reach the channel before this one.
            writeBatch(ctx, drainBatch());
            return false;
        }

        // Check if adding this message would exceed limits
        boolean wouldExceedSize = currentBatch.size() >= currentBatchSizeTarget.get();
        // long arithmetic so that a very large byte limit cannot overflow the sum
        boolean wouldExceedBytes =
                (long) currentBatchBytes.get() + estimatedSize > maxBatchBytes;
        if (wouldExceedSize || wouldExceedBytes) {
            // Send the full batch and start a new one; dropping the drained list here would
            // silently lose the messages.
            writeBatch(ctx, drainBatch());
        }

        // Add message to batch
        currentBatch.add(message);
        currentBatchBytes.addAndGet(estimatedSize);
        messagesProcessed.incrementAndGet();
        bytesProcessed.addAndGet(estimatedSize);

        // Schedule flush if not already scheduled. The flag is raised only after scheduling
        // succeeded so that a rejected task cannot disable time-based flushing permanently.
        if (!flushScheduled.get()) {
            flushFuture =
                    ctx.executor()
                            .schedule(
                                    () -> flushAndWrite(ctx),
                                    batchDelayMs,
                                    TimeUnit.MILLISECONDS);
            flushScheduled.set(true);
        }

        // Periodically adapt batch size based on throughput
        adaptBatchSize();
        return true;
    }

    /**
     * Flushes the current batch immediately.
     *
     * <p>The batch is handed to the caller and is <em>not</em> written to {@code ctx} by this
     * method. Any pending scheduled flush is cancelled when the batch is non-empty.
     *
     * @param ctx channel handler context (not used by this method)
     * @return list of messages to send, or empty list if batch is empty
     */
    public synchronized List<OutboundMessage> flushBatch(ChannelHandlerContext ctx) {
        return drainBatch();
    }

    /**
     * Gets performance metrics for the batcher.
     *
     * @return formatted string with metrics
     */
    public synchronized String getMetrics() {
        long messages = messagesProcessed.get();
        long batches = batchesSent.get();
        long bytes = bytesProcessed.get();
        double avgBatchSize = batches > 0 ? (double) messages / batches : 0;
        double avgBatchBytes = batches > 0 ? (double) bytes / batches : 0;
        return String.format(
                "MessageBatcher Metrics:\n"
                        + " Messages processed: %d\n"
                        + " Batches sent: %d\n"
                        + " Bytes processed: %d KB\n"
                        + " Average batch size: %.1f messages\n"
                        + " Average batch bytes: %.1f KB\n"
                        + " Current batch target: %d\n"
                        + " Current batch: %d messages, %d bytes",
                messages,
                batches,
                bytes / 1024,
                avgBatchSize,
                avgBatchBytes / 1024,
                currentBatchSizeTarget.get(),
                currentBatch.size(),
                currentBatchBytes.get());
    }

    /** Resets all metrics and restarts the adaptive rate window. */
    public synchronized void resetMetrics() {
        messagesProcessed.set(0);
        batchesSent.set(0);
        bytesProcessed.set(0);
        lastMessageCount = 0;
        lastAdaptiveCheckNanos = System.nanoTime();
    }

    /**
     * Shuts down the batcher, canceling any pending flushes.
     *
     * <p>Messages that are still queued are discarded without being sent.
     */
    public synchronized void shutdown() {
        if (flushFuture != null) {
            flushFuture.cancel(false);
            flushFuture = null;
        }
        currentBatch.clear();
        currentBatchBytes.set(0);
        flushScheduled.set(false);
    }

    /** Timer callback: drains the current batch and writes it to the channel. */
    private synchronized void flushAndWrite(ChannelHandlerContext ctx) {
        writeBatch(ctx, drainBatch());
    }

    /**
     * Removes and returns all queued messages, cancels the pending timer flush and updates the
     * batch counter. Must be called while holding the instance monitor.
     *
     * @return a new list holding the drained messages; empty if nothing was queued
     */
    private List<OutboundMessage> drainBatch() {
        if (currentBatch.isEmpty()) {
            return new ArrayList<>();
        }

        // Cancel scheduled flush
        if (flushFuture != null) {
            flushFuture.cancel(false);
            flushFuture = null;
        }
        flushScheduled.set(false);

        // Copy first, then clear, so the returned list is independent of internal state
        final List<OutboundMessage> batch = new ArrayList<>(currentBatch);
        currentBatch.clear();
        int batchBytes = currentBatchBytes.getAndSet(0);

        // Update metrics
        batchesSent.incrementAndGet();
        LOGGER.trace("Flushing batch: {} messages, {} bytes", batch.size(), batchBytes);
        return batch;
    }

    /**
     * Writes every message of the batch to the context in order and flushes once at the end.
     *
     * <p>NOTE(review): assumes the outbound handlers reached via {@code ctx.write} accept
     * {@link OutboundMessage} instances - confirm against the pipeline setup.
     */
    private void writeBatch(ChannelHandlerContext ctx, List<OutboundMessage> batch) {
        if (batch.isEmpty()) {
            return;
        }
        for (OutboundMessage queued : batch) {
            ctx.write(queued);
        }
        ctx.flush();
    }

    /**
     * Adapts the batch size target based on current throughput. Must be called while holding the
     * instance monitor.
     */
    private void adaptBatchSize() {
        long nowNanos = System.nanoTime();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - lastAdaptiveCheckNanos);
        if (elapsedMs < ADAPTIVE_CHECK_INTERVAL_MS) {
            return;
        }

        long currentMessageCount = messagesProcessed.get();
        // Divide by the time that actually passed: this method only runs when a message is
        // added, so the window can be much longer than the nominal check interval.
        long messageRate = (currentMessageCount - lastMessageCount) * 1000 / elapsedMs;

        int currentTarget = currentBatchSizeTarget.get();
        int newTarget = currentTarget;
        if (messageRate > 1000 && currentTarget < maxBatchSize) {
            // High message rate: increase batch size
            newTarget = Math.min(currentTarget * 2, maxBatchSize);
        } else if (messageRate < 100 && currentTarget > MIN_BATCH_SIZE) {
            // Low message rate: decrease batch size for lower latency
            newTarget = Math.max(currentTarget / 2, MIN_BATCH_SIZE);
        }

        if (newTarget != currentTarget) {
            currentBatchSizeTarget.set(newTarget);
            LOGGER.debug(
                    "Adapted batch size target: {} -> {} (message rate: {}/s)",
                    currentTarget,
                    newTarget,
                    messageRate);
        }

        lastAdaptiveCheckNanos = nowNanos;
        lastMessageCount = currentMessageCount;
    }
}