ConnectionHealthHandler.java
/*
* SPDX-FileCopyrightText: 2023-2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.connection;
import com.lucimber.dbus.message.InboundMessage;
import com.lucimber.dbus.message.InboundMethodReturn;
import com.lucimber.dbus.message.OutboundMethodCall;
import com.lucimber.dbus.type.DBusObjectPath;
import com.lucimber.dbus.type.DBusString;
import com.lucimber.dbus.type.DBusUInt32;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A connection handler that monitors the health of a D-Bus connection by performing periodic ping
* operations using the standard D-Bus Peer.Ping method.
*
* <p>This handler integrates with the connection pipeline and manages connection state transitions,
* health checks, and event firing to registered listeners.
*
* <p>The health monitoring is performed using the standard D-Bus Peer.Ping method, which is
* supported by all D-Bus implementations. The handler automatically starts monitoring when the
* connection becomes active and stops when it becomes inactive.
*
* <p>Connection state transitions are tracked and notified to registered listeners through the
* {@link ConnectionEventListener} interface.
*
* @see ConnectionState
* @see ConnectionEvent
* @see ConnectionEventListener
* @see ConnectionConfig
* @since 1.0.0
*/
public final class ConnectionHealthHandler extends AbstractDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionHealthHandler.class);
// Resource limits to prevent exhaustion
private static final int MAX_PENDING_HEALTH_CHECKS = 100;
private static final int MAX_CONSECUTIVE_FAILURES = 10;
private final ConnectionConfig config;
private final ScheduledExecutorService scheduler;
private final ExecutorService eventExecutor;
private final ConcurrentLinkedQueue<ConnectionEventListener> listeners =
new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<DBusUInt32, CompletableFuture<InboundMessage>>
pendingHealthChecks = new ConcurrentHashMap<>();
private final AtomicBoolean active = new AtomicBoolean(false);
private final AtomicReference<ScheduledFuture<?>> healthCheckFuture = new AtomicReference<>();
private final AtomicReference<Instant> lastSuccessfulCheck = new AtomicReference<>();
private final AtomicReference<ConnectionState> currentState =
new AtomicReference<>(ConnectionState.DISCONNECTED);
private final AtomicReference<Context> contextRef = new AtomicReference<>();
private final AtomicInteger consecutiveFailures = new AtomicInteger(0);
/**
* Creates a new connection health handler with the specified configuration.
*
* @param config the connection configuration, must not be null
* @throws NullPointerException if config is null
* @since 1.0.0
*/
public ConnectionHealthHandler(ConnectionConfig config) {
this.config = Objects.requireNonNull(config, "Config cannot be null");
this.scheduler =
Executors.newScheduledThreadPool(
1,
r -> {
Thread t =
new Thread(
r,
"dbus-health-scheduler-"
+ System.identityHashCode(this));
t.setDaemon(true);
return t;
});
this.eventExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t =
new Thread(
r,
"dbus-health-events-" + System.identityHashCode(this));
t.setDaemon(true);
return t;
});
}
/**
* Adds a connection event listener.
*
* @param listener the listener to add, ignored if null
* @since 1.0.0
*/
public void addConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
listeners.add(listener);
}
}
/**
* Removes a connection event listener.
*
* @param listener the listener to remove
* @since 1.0.0
*/
public void removeConnectionEventListener(ConnectionEventListener listener) {
listeners.remove(listener);
}
/**
* Gets the current connection state.
*
* @return the current connection state
* @since 1.0.0
*/
public ConnectionState getCurrentState() {
return currentState.get();
}
/**
* Gets the timestamp of the last successful health check.
*
* @return the timestamp of the last successful health check, or null if none
* @since 1.0.0
*/
public Instant getLastSuccessfulCheck() {
return lastSuccessfulCheck.get();
}
@Override
public void onHandlerAdded(Context ctx) {
contextRef.set(ctx);
super.onHandlerAdded(ctx);
}
@Override
public void onHandlerRemoved(Context ctx) {
contextRef.set(null);
super.onHandlerRemoved(ctx);
}
@Override
public void onConnectionActive(Context ctx) {
LOGGER.debug("Connection became active, starting health monitoring");
updateState(ConnectionState.CONNECTED);
startHealthMonitoring(ctx);
ctx.propagateConnectionActive();
}
@Override
public void onConnectionInactive(Context ctx) {
LOGGER.debug("Connection became inactive, stopping health monitoring");
updateState(ConnectionState.DISCONNECTED);
stopHealthMonitoring();
ctx.propagateConnectionInactive();
}
@Override
public void handleInboundMessage(Context ctx, InboundMessage msg) {
// Check if this is a reply to one of our health check pings
if (msg instanceof InboundMethodReturn methodReturn) {
CompletableFuture<InboundMessage> future =
pendingHealthChecks.remove(methodReturn.getReplySerial());
if (future != null) {
// This is a health check response
LOGGER.debug(
"Received health check response for serial: {}",
methodReturn.getReplySerial());
future.complete(msg);
return; // Don't propagate health check responses
}
}
// Not a health check response, propagate normally
ctx.propagateInboundMessage(msg);
}
@Override
public void handleInboundFailure(Context ctx, Throwable cause) {
LOGGER.warn("Inbound failure detected, updating connection state", cause);
// Connection failure detected, update state and stop health monitoring
updateState(ConnectionState.FAILED);
stopHealthMonitoring();
// Fail any pending health checks
pendingHealthChecks.values().forEach(future -> future.completeExceptionally(cause));
pendingHealthChecks.clear();
ctx.propagateInboundFailure(cause);
}
private void startHealthMonitoring(Context ctx) {
if (!config.isHealthCheckEnabled()) {
LOGGER.debug("Health monitoring is disabled in configuration");
return;
}
if (active.compareAndSet(false, true)) {
LOGGER.info(
"Starting connection health monitoring with interval: {}",
config.getHealthCheckInterval());
lastSuccessfulCheck.set(Instant.now());
ScheduledFuture<?> future =
scheduler.scheduleAtFixedRate(
() -> performHealthCheck(ctx),
config.getHealthCheckInterval().toMillis(),
config.getHealthCheckInterval().toMillis(),
TimeUnit.MILLISECONDS);
healthCheckFuture.set(future);
}
}
private void stopHealthMonitoring() {
if (active.compareAndSet(true, false)) {
LOGGER.info("Stopping connection health monitoring");
ScheduledFuture<?> future = healthCheckFuture.getAndSet(null);
if (future != null) {
future.cancel(false);
}
// Fail any pending health checks
pendingHealthChecks.values().forEach(f -> f.cancel(true));
pendingHealthChecks.clear();
}
}
private void performHealthCheck(Context ctx) {
if (!active.get()) {
return;
}
// Check for too many consecutive failures
if (consecutiveFailures.get() >= MAX_CONSECUTIVE_FAILURES) {
LOGGER.error(
"Too many consecutive health check failures ({}), stopping health monitoring",
consecutiveFailures.get());
updateState(ConnectionState.FAILED);
stopHealthMonitoring();
return;
}
// Check for too many pending health checks
if (pendingHealthChecks.size() >= MAX_PENDING_HEALTH_CHECKS) {
LOGGER.warn(
"Too many pending health checks ({}), skipping this check",
pendingHealthChecks.size());
return;
}
try {
LOGGER.debug("Performing health check...");
// Create a ping method call using the standard D-Bus Peer interface
DBusUInt32 serial = ctx.getConnection().getNextSerial();
OutboundMethodCall pingCall =
OutboundMethodCall.Builder.create()
.withSerial(serial)
.withPath(DBusObjectPath.valueOf("/org/freedesktop/DBus"))
.withMember(DBusString.valueOf("Ping"))
.withDestination(DBusString.valueOf("org.freedesktop.DBus"))
.withInterface(DBusString.valueOf("org.freedesktop.DBus.Peer"))
.withReplyExpected(true)
.withTimeout(config.getHealthCheckTimeout())
.build();
// Set up a future to track the ping response
CompletableFuture<InboundMessage> pingFuture = new CompletableFuture<>();
pendingHealthChecks.put(serial, pingFuture);
// Send the ping message
CompletableFuture<Void> sendFuture = new CompletableFuture<>();
sendFuture.exceptionally(
throwable -> {
pingFuture.completeExceptionally(throwable);
return null;
});
ctx.propagateOutboundMessage(pingCall, sendFuture);
// Handle the ping response asynchronously
pingFuture
.orTimeout(config.getHealthCheckTimeout().toMillis(), TimeUnit.MILLISECONDS)
.whenComplete(
(response, throwable) -> {
pendingHealthChecks.remove(serial);
if (throwable != null) {
// Health check failed
int failures = consecutiveFailures.incrementAndGet();
LOGGER.warn(
"Health check failed (consecutive failures: {}): {}",
failures,
throwable.getMessage());
fireConnectionEvent(
ConnectionEvent.healthCheckFailure(throwable));
// Update state to unhealthy if we were connected
if (currentState.get() == ConnectionState.CONNECTED) {
updateState(ConnectionState.UNHEALTHY);
}
} else {
// Health check succeeded
consecutiveFailures.set(0); // Reset failure counter
lastSuccessfulCheck.set(Instant.now());
fireConnectionEvent(ConnectionEvent.healthCheckSuccess());
// Update state to healthy if we were unhealthy
if (currentState.get() == ConnectionState.UNHEALTHY) {
updateState(ConnectionState.CONNECTED);
}
LOGGER.debug("Health check succeeded");
}
});
} catch (Exception e) {
int failures = consecutiveFailures.incrementAndGet();
LOGGER.error("Error performing health check (consecutive failures: {})", failures, e);
fireConnectionEvent(ConnectionEvent.healthCheckFailure(e));
if (currentState.get() == ConnectionState.CONNECTED) {
updateState(ConnectionState.UNHEALTHY);
}
}
}
private void updateState(ConnectionState newState) {
ConnectionState oldState = currentState.getAndSet(newState);
if (oldState != newState) {
LOGGER.debug("Connection state changed from {} to {}", oldState, newState);
fireConnectionEvent(ConnectionEvent.stateChanged(oldState, newState));
}
}
private void fireConnectionEvent(ConnectionEvent event) {
if (listeners.isEmpty()) {
return;
}
// Fire events asynchronously to avoid blocking the health check thread
eventExecutor.execute(
() -> {
Context context = contextRef.get();
Connection connection = context != null ? context.getConnection() : null;
for (ConnectionEventListener listener : listeners) {
try {
listener.onConnectionEvent(connection, event);
} catch (Exception e) {
LOGGER.warn("Error firing connection event to listener", e);
}
}
});
}
/**
* Manually triggers a health check.
*
* <p>If the handler is not active or no context is available, this method returns a completed
* future immediately.
*
* @return a CompletableFuture that indicates when the health check is triggered
* @since 1.0.0
*/
public CompletableFuture<Void> triggerHealthCheck() {
Context ctx = contextRef.get();
if (ctx == null || !active.get()) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.runAsync(() -> performHealthCheck(ctx), scheduler);
}
/**
* Shuts down the health monitor and its executors.
*
* <p>This method stops all health monitoring, cancels pending health checks, and shuts down the
* internal thread pools. It should be called when the handler is no longer needed to prevent
* resource leaks.
*
* @since 1.0.0
*/
public void shutdown() {
stopHealthMonitoring();
scheduler.shutdown();
eventExecutor.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
if (!eventExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
eventExecutor.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
eventExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
@Override
protected Logger getLogger() {
return LOGGER;
}
}