NettyConnection.java
/*
* SPDX-FileCopyrightText: 2023-2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.netty;
import com.lucimber.dbus.connection.Connection;
import com.lucimber.dbus.connection.ConnectionConfig;
import com.lucimber.dbus.connection.ConnectionEventListener;
import com.lucimber.dbus.connection.ConnectionHandle;
import com.lucimber.dbus.connection.ConnectionHealthHandler;
import com.lucimber.dbus.connection.ConnectionLifecycleManager;
import com.lucimber.dbus.connection.ConnectionReconnectHandler;
import com.lucimber.dbus.connection.ConnectionState;
import com.lucimber.dbus.connection.ConnectionStrategy;
import com.lucimber.dbus.connection.ConnectionStrategyRegistry;
import com.lucimber.dbus.connection.ConnectionThreadPoolManager;
import com.lucimber.dbus.connection.DefaultPipeline;
import com.lucimber.dbus.connection.Pipeline;
import com.lucimber.dbus.message.InboundMessage;
import com.lucimber.dbus.message.OutboundMessage;
import com.lucimber.dbus.type.DBusUInt32;
import com.lucimber.dbus.util.ErrorRecoveryManager;
import com.lucimber.dbus.util.ErrorRecoveryManager.CircuitBreaker;
import com.lucimber.dbus.util.ErrorRecoveryManager.RetryConfig;
import io.netty.channel.unix.DomainSocketAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class NettyConnection implements Connection {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnection.class);
private final SocketAddress serverAddress;
private final Pipeline pipeline;
private final ConnectionConfig config;
private final ConnectionStrategy strategy;
private final AtomicReference<ConnectionHandle> connectionHandle = new AtomicReference<>();
private final ConnectionLifecycleManager lifecycleManager;
private final ConnectionThreadPoolManager threadPoolManager;
private final String connectionId;
private volatile ConnectionHealthHandler healthHandler;
private volatile ConnectionReconnectHandler reconnectHandler;
private final Object handlerInitLock = new Object();
public NettyConnection(SocketAddress serverAddress) {
this(serverAddress, ConnectionConfig.defaultConfig());
}
public NettyConnection(SocketAddress serverAddress, ConnectionConfig config) {
this.serverAddress =
Objects.requireNonNull(serverAddress, "serverAddress must not be null");
this.config = Objects.requireNonNull(config, "config must not be null");
this.connectionId = generateConnectionId(serverAddress);
ConnectionStrategyRegistry strategyRegistry = createDefaultStrategyRegistry();
this.strategy =
strategyRegistry
.findStrategy(serverAddress)
.orElseThrow(
() ->
new IllegalArgumentException(
"No strategy available for: " + serverAddress));
LOGGER.info("Using transport strategy: {}", strategy.getTransportName());
this.pipeline = new DefaultPipeline(this);
// Initialize lifecycle and thread pool managers
this.lifecycleManager = new ConnectionLifecycleManager(config, connectionId);
this.threadPoolManager = new ConnectionThreadPoolManager(connectionId);
}
/**
* Generates a unique connection ID based on the server address.
*
* @param serverAddress the server address
* @return unique connection ID
*/
private static String generateConnectionId(SocketAddress serverAddress) {
return serverAddress.toString().replaceAll("[^a-zA-Z0-9]", "-")
+ "-"
+ System.identityHashCode(serverAddress);
}
/**
* Creates the default strategy registry with all available Netty strategies.
*
* @return configured strategy registry
*/
private static ConnectionStrategyRegistry createDefaultStrategyRegistry() {
ConnectionStrategyRegistry registry = new ConnectionStrategyRegistry();
registry.registerStrategy(new NettyUnixSocketStrategy());
registry.registerStrategy(new NettyTcpStrategy());
return registry;
}
/**
* Creates a connection for the standard system bus path. (Typically
* /var/run/dbus/system_bus_socket)
*
* @return A new instance.
* @throws UnsupportedOperationException if native transport for UDS is not available.
*/
public static NettyConnection newSystemBusConnection() {
return newSystemBusConnection(ConnectionConfig.defaultConfig());
}
/**
* Creates a connection for the standard system bus path with custom configuration. (Typically
* /var/run/dbus/system_bus_socket)
*
* @param config The connection configuration to use
* @return A new instance.
* @throws UnsupportedOperationException if native transport for UDS is not available.
*/
public static NettyConnection newSystemBusConnection(ConnectionConfig config) {
// Standard system bus path, can be overridden by DBUS_SYSTEM_BUS_ADDRESS env var
String path = System.getenv("DBUS_SYSTEM_BUS_ADDRESS");
if (path == null || path.isEmpty()) {
path = "/var/run/dbus/system_bus_socket"; // Default
} else if (path.startsWith("unix:path=")) {
path = path.substring("unix:path=".length());
} else {
// Handle other address formats if necessary (e.g., abstract sockets, tcp)
LOGGER.warn(
"DBUS_SYSTEM_BUS_ADDRESS format not fully parsed, using raw value: {}", path);
}
return new NettyConnection(new DomainSocketAddress(path), config);
}
/**
* Creates a connection for the standard session bus path. (Path is usually obtained from
* DBUS_SESSION_BUS_ADDRESS env var)
*
* @return A new instance.
* @throws UnsupportedOperationException if native transport for UDS is not available or address
* not found.
*/
public static NettyConnection newSessionBusConnection() {
return newSessionBusConnection(ConnectionConfig.defaultConfig());
}
/**
* Creates a connection for the standard session bus path with custom configuration. (Path is
* usually obtained from DBUS_SESSION_BUS_ADDRESS env var)
*
* @param config The connection configuration to use
* @return A new instance.
* @throws UnsupportedOperationException if native transport for UDS is not available or address
* not found.
*/
public static NettyConnection newSessionBusConnection(ConnectionConfig config) {
String address = System.getenv("DBUS_SESSION_BUS_ADDRESS");
if (address == null || address.isEmpty()) {
throw new IllegalStateException(
"DBUS_SESSION_BUS_ADDRESS environment variable not set.");
}
// DBUS_SESSION_BUS_ADDRESS can be complex, e.g., "unix:path=/tmp/dbus-...,guid=..."
// For now, we'll parse simple "unix:path="
if (address.startsWith("unix:path=")) {
String path = address.substring("unix:path=".length());
// It might have a comma and other params, e.g., unix:path=/tmp/dbus-...,guid=...
int commaIndex = path.indexOf(',');
if (commaIndex != -1) {
path = path.substring(0, commaIndex);
}
return new NettyConnection(new DomainSocketAddress(path), config);
} else if (address.startsWith("tcp:host=")) {
// Example: tcp:host=localhost,port=12345 or tcp:host=127.0.0.1,port=12345,family=ipv4
try {
String host = null;
int port = -1;
String[] params = address.substring("tcp:".length()).split(",");
for (String param : params) {
String[] kv = param.split("=");
if (kv.length == 2) {
if ("host".equals(kv[0])) {
host = kv[1];
} else if ("port".equals(kv[0])) {
port = Integer.parseInt(kv[1]);
}
}
}
if (host != null && port != -1) {
return new NettyConnection(new InetSocketAddress(host, port), config);
}
} catch (Exception e) {
throw new IllegalArgumentException(
"Could not parse TCP DBUS_SESSION_BUS_ADDRESS: " + address, e);
}
}
throw new IllegalArgumentException(
"Unsupported DBUS_SESSION_BUS_ADDRESS format: "
+ address
+ ". Only simple 'unix:path=' or 'tcp:host=...,port=...' currently supported.");
}
@Override
public CompletionStage<Void> connect() {
// Check if already connected
ConnectionHandle currentHandle = this.connectionHandle.get();
if (currentHandle != null && currentHandle.isActive()) {
LOGGER.warn("Already connected.");
return CompletableFuture.completedFuture(null);
}
LOGGER.info(
"Attempting to connect to DBus server at {} using strategy: {}",
serverAddress,
strategy.getTransportName());
// Use lifecycle manager to handle connection
return lifecycleManager
.connect(
() -> {
// Initialize handlers inside the connect supplier to ensure thread
// safety
synchronized (handlerInitLock) {
if (this.healthHandler == null) {
this.healthHandler = new ConnectionHealthHandler(config);
// Add health handler to pipeline if health monitoring is
// enabled
if (config.isHealthCheckEnabled()) {
try {
this.pipeline.addLast("health-monitor", healthHandler);
} catch (IllegalArgumentException e) {
// Handler already exists, ignore
LOGGER.debug(
"Health monitor handler already in pipeline");
}
}
}
if (this.reconnectHandler == null) {
this.reconnectHandler = new ConnectionReconnectHandler(config);
// Add reconnect handler to pipeline if auto-reconnect is
// enabled
if (config.isAutoReconnectEnabled()) {
try {
this.pipeline.addLast(
"reconnect-handler", reconnectHandler);
} catch (IllegalArgumentException e) {
// Handler already exists, ignore
LOGGER.debug("Reconnect handler already in pipeline");
}
}
}
}
// Create connection context for strategy
NettyConnectionContext context =
new NettyConnectionContext(
pipeline,
threadPoolManager.getApplicationTaskExecutor(),
this);
// Create retry configuration for connection attempts
RetryConfig retryConfig =
RetryConfig.builder()
.maxRetries(3)
.initialDelay(Duration.ofMillis(500))
.maxDelay(config.getConnectTimeout())
.backoffMultiplier(2.0)
.jitterFactor(0.1)
.build();
// Use error recovery from lifecycle manager
ErrorRecoveryManager errorRecovery =
lifecycleManager.getErrorRecoveryManager();
return errorRecovery.executeWithRetry(
() -> {
LOGGER.debug("Attempting connection to {}", serverAddress);
return strategy.connect(serverAddress, config, context)
.toCompletableFuture();
},
retryConfig);
})
.handle(
(handle, error) -> {
if (error != null) {
if (error.getMessage() != null
&& error.getMessage().contains("already in progress")) {
// Connection attempt already in progress - this is OK for
// concurrent calls
LOGGER.debug(
"Concurrent connection attempt detected, ignoring");
return null;
}
throw new RuntimeException(error);
}
this.connectionHandle.set(handle);
LOGGER.info("Connection established successfully");
return null;
});
}
@Override
public boolean isConnected() {
// Don't report as connected if we're in the process of closing
if (lifecycleManager.isClosing()) {
return false;
}
ConnectionHandle handle = connectionHandle.get();
return handle != null && handle.isActive() && lifecycleManager.isConnected();
}
@Override
public void close() {
LOGGER.info("Closing DBus connection to {}...", serverAddress);
lifecycleManager
.disconnect(
() -> {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
Exception shutdownException = null;
try {
// Shutdown reconnect handler first
if (reconnectHandler != null) {
try {
reconnectHandler.shutdown();
LOGGER.debug("Reconnect handler shut down successfully");
} catch (Exception e) {
LOGGER.error("Error shutting down reconnect handler", e);
shutdownException =
addSuppressedException(shutdownException, e);
}
}
// Shutdown health handler
if (healthHandler != null) {
try {
healthHandler.shutdown();
LOGGER.debug("Health handler shut down successfully");
} catch (Exception e) {
LOGGER.error("Error shutting down health handler", e);
shutdownException =
addSuppressedException(shutdownException, e);
}
}
// Close connection handle
ConnectionHandle handle = connectionHandle.getAndSet(null);
if (handle != null) {
try {
long timeoutMs = config.getCloseTimeout().toMillis();
handle.close()
.toCompletableFuture()
.get(timeoutMs, TimeUnit.MILLISECONDS);
LOGGER.debug("Connection handle closed successfully");
} catch (Exception e) {
LOGGER.error("Error closing connection handle", e);
shutdownException =
addSuppressedException(shutdownException, e);
}
}
// Shutdown thread pool manager
threadPoolManager.shutdown();
// Shutdown error recovery manager
ErrorRecoveryManager errorRecovery =
lifecycleManager.getErrorRecoveryManager();
if (errorRecovery != null) {
try {
errorRecovery.shutdown();
LOGGER.debug(
"Error recovery manager shut down successfully");
} catch (Exception e) {
LOGGER.error(
"Error shutting down error recovery manager", e);
shutdownException =
addSuppressedException(shutdownException, e);
}
}
// Complete the future
if (shutdownException != null) {
LOGGER.warn(
"DBus connection to {} closed with errors",
serverAddress);
closeFuture.completeExceptionally(shutdownException);
} else {
LOGGER.info(
"DBus connection to {} closed successfully",
serverAddress);
closeFuture.complete(null);
}
} catch (Exception e) {
closeFuture.completeExceptionally(e);
}
return closeFuture;
})
.toCompletableFuture()
.join();
}
private Exception addSuppressedException(Exception primary, Exception suppressed) {
if (primary == null) {
return suppressed;
} else {
primary.addSuppressed(suppressed);
return primary;
}
}
@Override
public DBusUInt32 getNextSerial() {
ConnectionHandle handle = connectionHandle.get();
if (handle == null || !handle.isActive()) {
throw new IllegalStateException("Cannot get next serial, connection is not active.");
}
return handle.getNextSerial();
}
@Override
public Pipeline getPipeline() {
return pipeline;
}
@Override
public CompletionStage<InboundMessage> sendRequest(OutboundMessage msg) {
ConnectionHandle handle = connectionHandle.get();
if (handle == null || !handle.isActive()) {
CompletableFuture<InboundMessage> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(
new IllegalStateException("Not connected to D-Bus."));
return failedFuture;
}
// Apply circuit breaker protection to critical message sending operations
CircuitBreaker circuitBreaker = lifecycleManager.getConnectionCircuitBreaker();
if (circuitBreaker != null && circuitBreaker.getState() != CircuitBreaker.State.CLOSED) {
LOGGER.debug(
"Circuit breaker is {} - allowing message through without protection",
circuitBreaker.getState());
}
return handle.sendRequest(msg);
}
@Override
public void sendAndRouteResponse(OutboundMessage msg, CompletionStage<Void> future) {
ConnectionHandle handle = connectionHandle.get();
if (handle == null || !handle.isActive()) {
var re = new IllegalStateException("Not connected to D-Bus.");
future.toCompletableFuture().completeExceptionally(re);
} else {
handle.send(msg)
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
future.toCompletableFuture().completeExceptionally(throwable);
} else {
future.toCompletableFuture().complete(null);
}
});
}
}
@Override
public ConnectionConfig getConfig() {
return config;
}
@Override
public ConnectionState getState() {
// First check lifecycle manager state
ConnectionState lifecycleState = lifecycleManager.getState();
// If health handler is available, use its more detailed state
if (healthHandler != null && lifecycleState == ConnectionState.CONNECTED) {
return healthHandler.getCurrentState();
}
return lifecycleState;
}
@Override
public void addConnectionEventListener(ConnectionEventListener listener) {
if (healthHandler != null) {
healthHandler.addConnectionEventListener(listener);
}
}
@Override
public void removeConnectionEventListener(ConnectionEventListener listener) {
if (healthHandler != null) {
healthHandler.removeConnectionEventListener(listener);
}
}
@Override
public CompletionStage<Void> triggerHealthCheck() {
if (healthHandler != null) {
return healthHandler.triggerHealthCheck();
}
return CompletableFuture.completedFuture(null);
}
@Override
public int getReconnectAttemptCount() {
if (reconnectHandler != null) {
return reconnectHandler.getAttemptCount();
}
return 0;
}
@Override
public void cancelReconnection() {
if (reconnectHandler != null) {
reconnectHandler.cancelReconnection();
}
}
@Override
public void resetReconnectionState() {
if (reconnectHandler != null) {
reconnectHandler.reset();
}
}
// Package-private methods for NettyConnectionContext callbacks
/** Called by NettyConnectionContext when connection state changes. */
void notifyStateChanged(ConnectionState newState) {
LOGGER.debug("State changed to: {}", newState);
if (healthHandler != null) {
// Update the health handler with the new state
// Note: This is a simplified approach. The health handler might need
// a different mechanism to update its internal state.
LOGGER.debug("Notifying health handler of state change to: {}", newState);
}
}
/** Called by NettyConnectionContext when an error occurs. */
void notifyError(Throwable error) {
LOGGER.error("Connection error reported by strategy", error);
// Could trigger reconnection logic here
}
/** Called by NettyConnectionContext when connection is established. */
void notifyConnectionEstablished() {
LOGGER.debug("Connection establishment confirmed by strategy");
}
/** Called by NettyConnectionContext when connection is lost. */
void notifyConnectionLost() {
LOGGER.warn("Connection loss reported by strategy");
// Could trigger reconnection logic here if enabled
if (reconnectHandler != null && config.isAutoReconnectEnabled()) {
// Trigger reconnection
LOGGER.info("Triggering auto-reconnection");
}
}
}