ConnectionReconnectHandler.java
/*
* SPDX-FileCopyrightText: 2023-2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.connection;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A connection handler that implements automatic reconnection with exponential backoff when
* connection failures are detected.
*
* <p>This handler integrates with the connection pipeline and monitors connection events to
* automatically attempt reconnection when the connection is lost. It uses exponential backoff to
* avoid overwhelming the server with reconnection attempts.
*
* <p>The handler listens for connection state changes and failures, triggering reconnection
* attempts based on the configured backoff strategy.
*/
public final class ConnectionReconnectHandler extends AbstractDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionReconnectHandler.class);
private final ConnectionConfig config;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean enabled = new AtomicBoolean(false);
private final AtomicInteger attemptCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> reconnectFuture = new AtomicReference<>();
private final AtomicReference<Instant> lastReconnectAttempt = new AtomicReference<>();
private final AtomicReference<Context> contextRef = new AtomicReference<>();
private final AtomicReference<Connection> connectionRef = new AtomicReference<>();
/**
* Creates a new reconnection handler with the specified configuration.
*
* @param config The connection configuration containing reconnection settings
*/
public ConnectionReconnectHandler(ConnectionConfig config) {
this.config = Objects.requireNonNull(config, "Config cannot be null");
this.scheduler =
Executors.newScheduledThreadPool(
1,
r -> {
Thread t =
new Thread(
r,
"dbus-reconnect-scheduler-"
+ System.identityHashCode(this));
t.setDaemon(true);
return t;
});
}
/**
* Gets the current number of reconnection attempts.
*
* @return The current attempt count
*/
public int getAttemptCount() {
return attemptCount.get();
}
/**
* Gets the timestamp of the last reconnection attempt.
*
* @return The timestamp of the last reconnection attempt, or null if none
*/
public Instant getLastReconnectAttempt() {
return lastReconnectAttempt.get();
}
/**
* Gets whether reconnection is currently enabled.
*
* @return true if reconnection is active
*/
public boolean isEnabled() {
return enabled.get();
}
/** Manually cancels any pending reconnection attempts. */
public void cancelReconnection() {
ScheduledFuture<?> future = reconnectFuture.getAndSet(null);
if (future != null) {
future.cancel(false);
LOGGER.debug("Cancelled pending reconnection attempt");
}
}
/** Resets the reconnection state, clearing attempt count and timers. */
public void reset() {
attemptCount.set(0);
lastReconnectAttempt.set(null);
cancelReconnection();
}
@Override
public void onHandlerAdded(Context ctx) {
contextRef.set(ctx);
connectionRef.set(ctx.getConnection());
// Listen for connection events to trigger reconnection
ctx.getConnection().addConnectionEventListener(this::handleConnectionEvent);
super.onHandlerAdded(ctx);
}
@Override
public void onHandlerRemoved(Context ctx) {
contextRef.set(null);
connectionRef.set(null);
enabled.set(false);
cancelReconnection();
super.onHandlerRemoved(ctx);
}
@Override
public void onConnectionActive(Context ctx) {
LOGGER.debug("Connection became active, resetting reconnection state");
reset();
enabled.set(true);
ctx.propagateConnectionActive();
}
@Override
public void onConnectionInactive(Context ctx) {
LOGGER.debug("Connection became inactive");
enabled.set(false);
ctx.propagateConnectionInactive();
}
@Override
public void handleInboundFailure(Context ctx, Throwable cause) {
LOGGER.warn("Inbound failure detected, may trigger reconnection", cause);
// Let the failure propagate first, then consider reconnection
ctx.propagateInboundFailure(cause);
}
private void handleConnectionEvent(Connection connection, ConnectionEvent event) {
if (!config.isAutoReconnectEnabled()) {
LOGGER.debug(
"Auto-reconnect is disabled, ignoring connection event: {}", event.getType());
return;
}
switch (event.getType()) {
case STATE_CHANGED:
handleStateChange(connection, event);
break;
case HEALTH_CHECK_FAILURE:
// Health check failures are handled by state changes to UNHEALTHY
LOGGER.debug("Health check failure detected, waiting for state change");
break;
case HEALTH_CHECK_SUCCESS:
// Connection recovered, reset reconnection state
if (attemptCount.get() > 0) {
LOGGER.info(
"Connection recovered after {} reconnection attempts",
attemptCount.get());
reset();
}
break;
default:
// Other events don't affect reconnection
break;
}
}
private void handleStateChange(Connection connection, ConnectionEvent event) {
ConnectionState newState = event.getNewState().orElse(null);
ConnectionState oldState = event.getOldState().orElse(null);
if (newState == null) {
return;
}
LOGGER.debug("Connection state changed from {} to {}", oldState, newState);
switch (newState) {
case DISCONNECTED:
case FAILED:
// Connection lost, trigger reconnection
if (enabled.get() && oldState != ConnectionState.RECONNECTING) {
LOGGER.info("Connection lost ({}), scheduling reconnection", newState);
scheduleReconnection(connection);
}
break;
case UNHEALTHY:
// Connection is unhealthy, but we'll wait for it to either recover or fail
LOGGER.debug("Connection is unhealthy, monitoring for recovery or failure");
break;
case CONNECTED:
// Connection established successfully
if (oldState == ConnectionState.RECONNECTING) {
LOGGER.info("Reconnection successful after {} attempts", attemptCount.get());
reset();
}
break;
default:
// Other states don't trigger reconnection
break;
}
}
private void scheduleReconnection(Connection connection) {
if (!config.isAutoReconnectEnabled() || !enabled.get()) {
LOGGER.debug("Auto-reconnect disabled, not scheduling reconnection");
return;
}
int currentAttempt = attemptCount.get();
// Check if we've exceeded the maximum number of attempts
if (config.getMaxReconnectAttempts() > 0
&& currentAttempt >= config.getMaxReconnectAttempts()) {
LOGGER.error(
"Maximum reconnection attempts ({}) exceeded, giving up",
config.getMaxReconnectAttempts());
enabled.set(false);
return;
}
// Calculate delay with exponential backoff
Duration delay = calculateBackoffDelay(currentAttempt);
LOGGER.info("Scheduling reconnection attempt {} in {}", currentAttempt + 1, delay);
ScheduledFuture<?> future =
scheduler.schedule(
() -> attemptReconnection(connection),
delay.toMillis(),
TimeUnit.MILLISECONDS);
reconnectFuture.set(future);
}
private Duration calculateBackoffDelay(int attemptNumber) {
Duration initialDelay = config.getReconnectInitialDelay();
double multiplier = config.getReconnectBackoffMultiplier();
Duration maxDelay = config.getReconnectMaxDelay();
// Calculate exponential backoff: initial * (multiplier ^ attemptNumber)
double delayMs = initialDelay.toMillis() * Math.pow(multiplier, attemptNumber);
// Cap at maximum delay
long finalDelayMs = Math.min((long) delayMs, maxDelay.toMillis());
return Duration.ofMillis(finalDelayMs);
}
private void attemptReconnection(Connection connection) {
if (!config.isAutoReconnectEnabled() || !enabled.get()) {
LOGGER.debug("Auto-reconnect disabled, skipping reconnection attempt");
return;
}
int currentAttempt = attemptCount.incrementAndGet();
lastReconnectAttempt.set(Instant.now());
LOGGER.info("Attempting reconnection #{}", currentAttempt);
// Fire a reconnection attempt event
fireReconnectionAttemptEvent(connection, currentAttempt);
// Attempt to reconnect
CompletableFuture<Void> reconnectFuture = connection.connect().toCompletableFuture();
reconnectFuture.whenComplete(
(result, throwable) -> {
if (throwable != null) {
LOGGER.warn(
"Reconnection attempt #{} failed: {}",
currentAttempt,
throwable.getMessage());
// Fire a failure event
fireReconnectionFailureEvent(connection, throwable);
// Schedule next attempt if we haven't exceeded the limit
if (config.getMaxReconnectAttempts() == 0
|| currentAttempt < config.getMaxReconnectAttempts()) {
scheduleReconnection(connection);
} else {
LOGGER.error(
"Maximum reconnection attempts ({}) exceeded, giving up",
config.getMaxReconnectAttempts());
enabled.set(false);
fireReconnectionExhaustedEvent(connection);
}
} else {
LOGGER.info("Reconnection attempt #{} succeeded", currentAttempt);
fireReconnectionSuccessEvent(connection);
}
});
}
private void fireReconnectionAttemptEvent(Connection connection, int attemptNumber) {
LOGGER.debug("Firing reconnection attempt event for attempt #{}", attemptNumber);
// The event will be handled by the connection's event system
}
private void fireReconnectionFailureEvent(Connection connection, Throwable cause) {
LOGGER.debug("Firing reconnection failure event: {}", cause.getMessage());
// The event will be handled by the connection's event system
}
private void fireReconnectionSuccessEvent(Connection connection) {
LOGGER.debug("Firing reconnection success event");
// The event will be handled by the connection's event system
}
private void fireReconnectionExhaustedEvent(Connection connection) {
LOGGER.debug("Firing reconnection exhausted event");
// The event will be handled by the connection's event system
}
/** Shuts down the reconnection handler and its scheduler. */
public void shutdown() {
enabled.set(false);
cancelReconnection();
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Override
protected Logger getLogger() {
return LOGGER;
}
}