ReconnectionHandlerManager.java
/*
* SPDX-FileCopyrightText: 2023-2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.netty;
import com.lucimber.dbus.util.LoggerUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Promise;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handler responsible for managing pipeline handlers during reconnection. This handler listens for
* reconnection events and re-adds handlers that were removed during the connection process, such as
* SASL handlers.
*/
public final class ReconnectionHandlerManager extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(ReconnectionHandlerManager.class);
private final Promise<Void> connectPromise;
public ReconnectionHandlerManager(Promise<Void> connectPromise) {
this.connectPromise = connectPromise;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == DBusChannelEvent.RECONNECTION_STARTING) {
handleReconnectionStarting(ctx);
} else if (evt == DBusChannelEvent.RECONNECTION_HANDLERS_READD_REQUIRED) {
handleReconnectionHandlersReaddRequired(ctx);
}
// Always propagate the event
super.userEventTriggered(ctx, evt);
}
private void handleReconnectionStarting(ChannelHandlerContext ctx) {
LOGGER.debug(LoggerUtils.CONNECTION, "Reconnection starting - preparing pipeline");
// Reset channel attributes for reconnection
ctx.channel().attr(DBusChannelAttribute.SERIAL_COUNTER).set(new AtomicLong(1));
ctx.channel().attr(DBusChannelAttribute.ASSIGNED_BUS_NAME).set(null);
LOGGER.debug(LoggerUtils.CONNECTION, "Channel attributes reset for reconnection");
}
private void handleReconnectionHandlersReaddRequired(ChannelHandlerContext ctx) {
LOGGER.debug(
LoggerUtils.CONNECTION, "Re-adding handlers that were removed during connection");
// Re-add SASL handlers in the correct order if they don't exist
addSaslHandlersIfMissing(ctx);
// Update ConnectionCompletionHandler with new promise
updateConnectionCompletionHandler(ctx);
LOGGER.info(LoggerUtils.CONNECTION, "Handlers re-added for reconnection");
}
private void addSaslHandlersIfMissing(ChannelHandlerContext ctx) {
// Use centralized configuration to ensure consistent ordering
Map<String, Supplier<ChannelHandler>> saslHandlers =
DBusHandlerConfiguration.getSaslHandlers();
// Find the first D-Bus handler to use as insertion point
String firstDbusHandler = DBusHandlerConfiguration.findFirstDbusHandler(ctx.pipeline());
String previousHandler = null;
for (Map.Entry<String, Supplier<ChannelHandler>> entry : saslHandlers.entrySet()) {
String handlerName = entry.getKey();
// Check if handler is missing from pipeline
if (ctx.pipeline().get(handlerName) == null) {
ChannelHandler handler = entry.getValue().get();
if (previousHandler != null) {
// Add after the previous SASL handler
ctx.pipeline().addAfter(previousHandler, handlerName, handler);
} else if (firstDbusHandler != null) {
// Add before the first D-Bus handler
ctx.pipeline().addBefore(firstDbusHandler, handlerName, handler);
} else {
// Fallback: add at the beginning
ctx.pipeline().addFirst(handlerName, handler);
}
LOGGER.debug(LoggerUtils.HANDLER_LIFECYCLE, "Re-added {}", handlerName);
}
previousHandler = handlerName;
}
}
private void updateConnectionCompletionHandler(ChannelHandlerContext ctx) {
ConnectionCompletionHandler completionHandler =
(ConnectionCompletionHandler)
ctx.pipeline().get(DBusHandlerNames.CONNECTION_COMPLETION_HANDLER);
if (completionHandler != null) {
completionHandler.reset(connectPromise);
LOGGER.debug(
LoggerUtils.HANDLER_LIFECYCLE,
"Updated ConnectionCompletionHandler with new promise");
}
}
}