NettyTcpStrategy.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.netty;

import com.lucimber.dbus.connection.Connection;
import com.lucimber.dbus.connection.ConnectionConfig;
import com.lucimber.dbus.connection.ConnectionContext;
import com.lucimber.dbus.connection.ConnectionHandle;
import com.lucimber.dbus.connection.ConnectionStrategy;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based connection strategy for TCP transport.
 *
 * <p>This strategy handles connections over TCP/IP sockets using NIO transport with optimizations
 * for D-Bus communication.
 */
public final class NettyTcpStrategy implements ConnectionStrategy {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyTcpStrategy.class);

    @Override
    public boolean supports(SocketAddress address) {
        return address instanceof InetSocketAddress;
    }

    @Override
    public CompletionStage<ConnectionHandle> connect(
            SocketAddress address, ConnectionConfig config, ConnectionContext context) {
        EventLoopGroup workerGroup = createEventLoopGroup();
        Promise<Void> nettyConnectPromise = workerGroup.next().newPromise();
        CompletableFuture<ConnectionHandle> handleFuture = new CompletableFuture<>();

        RealityCheckpoint realityCheckpoint = createRealityCheckpoint(context, config);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(
                        ChannelOption.CONNECT_TIMEOUT_MILLIS,
                        (int) config.getConnectTimeout().toMillis())
                .handler(new DBusChannelInitializer(realityCheckpoint, nettyConnectPromise));

        LOGGER.info("Attempting TCP connection to {}", address);
        ChannelFuture channelFuture = bootstrap.connect(address);

        channelFuture.addListener(
                future -> {
                    if (future.isSuccess()) {
                        LOGGER.debug("TCP connection established to {}", address);
                        // Create handle but wait for D-Bus handshake completion before resolving
                        // the future
                        NettyConnectionHandle handle =
                                new NettyConnectionHandle(
                                        channelFuture.channel(),
                                        workerGroup,
                                        config,
                                        realityCheckpoint);

                        // The nettyConnectPromise will be completed by ConnectionCompletionHandler
                        // when MANDATORY_NAME_ACQUIRED event is received
                        nettyConnectPromise.addListener(
                                connectResult -> {
                                    if (connectResult.isSuccess()) {
                                        LOGGER.debug(
                                                "D-Bus handshake completed for TCP connection");
                                        // Notify context that connection is fully established
                                        context.onConnectionEstablished();
                                        handleFuture.complete(handle);
                                    } else {
                                        LOGGER.error(
                                                "D-Bus handshake failed for TCP connection",
                                                connectResult.cause());
                                        context.onError(connectResult.cause());
                                        handleFuture.completeExceptionally(connectResult.cause());
                                    }
                                });
                    } else {
                        LOGGER.error(
                                "Failed to establish TCP connection to {}",
                                address,
                                future.cause());
                        handleFuture.completeExceptionally(future.cause());
                    }
                });

        return handleFuture;
    }

    @Override
    public String getTransportName() {
        return "NIO (TCP/IP)";
    }

    @Override
    public boolean isAvailable() {
        return true; // NIO is always available
    }

    private EventLoopGroup createEventLoopGroup() {
        LOGGER.debug("Creating NIO EventLoopGroup for TCP connection");
        return new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
    }

    private RealityCheckpoint createRealityCheckpoint(
            ConnectionContext context, ConnectionConfig config) {
        // Extract the connection from the context - this requires the context to provide access
        if (!(context instanceof NettyConnectionContext nettyContext)) {
            throw new IllegalArgumentException("TCP strategy requires NettyConnectionContext");
        }

        // Create executor service for this connection
        java.util.concurrent.ExecutorService executor =
                java.util.concurrent.Executors.newFixedThreadPool(
                        Math.max(2, Runtime.getRuntime().availableProcessors() / 2),
                        runnable -> {
                            Thread t =
                                    new Thread(
                                            runnable,
                                            "dbus-app-worker-" + System.identityHashCode(runnable));
                            t.setDaemon(true);
                            return t;
                        });

        // Get the connection from context - this will need to be added to NettyConnectionContext
        Connection connection = nettyContext.getConnection();
        return new RealityCheckpoint(executor, connection, config.getMethodCallTimeoutMs());
    }
}