// NettyConnectionHandle.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.netty;

import com.lucimber.dbus.connection.ConnectionConfig;
import com.lucimber.dbus.connection.ConnectionHandle;
import com.lucimber.dbus.message.InboundMessage;
import com.lucimber.dbus.message.OutboundMessage;
import com.lucimber.dbus.type.DBusUInt32;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based implementation of {@link ConnectionHandle}.
 *
 * <p>This class wraps a Netty {@link Channel} and provides the transport-agnostic
 * {@code ConnectionHandle} interface for the D-Bus connection layer.
 *
 * <p>Closing is a one-shot operation: the first call to {@link #close()} starts the shutdown and
 * every call (first or later) observes the outcome of that single shutdown.
 */
public final class NettyConnectionHandle implements ConnectionHandle {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionHandle.class);

    private final Channel channel;
    private final EventLoopGroup eventLoopGroup;
    private final ConnectionConfig config;
    private final RealityCheckpoint realityCheckpoint;

    /** Flipped exactly once by the first {@link #close()} call. */
    private final AtomicBoolean closing = new AtomicBoolean(false);

    /** Completed when the single close sequence started by {@link #close()} has finished. */
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    /**
     * Creates a handle around the given Netty resources.
     *
     * @param channel the channel used for all I/O; a {@code null} channel yields a handle that is
     *     never active
     * @param eventLoopGroup the event loop group that is shut down after the channel has been
     *     closed; may be {@code null}
     * @param config the connection configuration; its close timeout bounds the event loop group
     *     shutdown
     * @param realityCheckpoint the handler used for request/reply correlation; may be {@code
     *     null}, in which case {@link #sendRequest(OutboundMessage)} fails
     */
    public NettyConnectionHandle(
            Channel channel,
            EventLoopGroup eventLoopGroup,
            ConnectionConfig config,
            RealityCheckpoint realityCheckpoint) {
        this.channel = channel;
        this.eventLoopGroup = eventLoopGroup;
        this.config = config;
        this.realityCheckpoint = realityCheckpoint;
    }

    /**
     * Reports whether this handle can currently be used for sending.
     *
     * @return {@code true} only if {@link #close()} has not been called, the channel is active
     *     and a bus name has been stored in the channel attributes
     */
    @Override
    public boolean isActive() {
        // Don't report as active if we're closing
        if (closing.get()) {
            return false;
        }
        return channel != null
                && channel.isActive()
                && channel.attr(DBusChannelAttribute.ASSIGNED_BUS_NAME).get() != null;
    }

    /**
     * Writes and flushes a message without waiting for a reply.
     *
     * @param message the message to write to the channel
     * @return a stage that completes once the write succeeded, or completes exceptionally with
     *     the write failure or with an {@link IllegalStateException} if the connection is not
     *     active
     */
    @Override
    public CompletionStage<Void> send(OutboundMessage message) {
        if (!isActive()) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Connection is not active"));
        }

        CompletableFuture<Void> future = new CompletableFuture<>();
        channel.writeAndFlush(message)
                .addListener(
                        channelFuture -> {
                            if (channelFuture.isSuccess()) {
                                future.complete(null);
                            } else {
                                future.completeExceptionally(channelFuture.cause());
                            }
                        });
        return future;
    }

    /**
     * Writes a message through the {@code RealityCheckpoint} and waits for the correlated reply.
     *
     * @param message the request to send
     * @return a stage that completes with the reply, or completes exceptionally if the connection
     *     is not active, no {@code RealityCheckpoint} is available, the write fails, or the reply
     *     future fails
     */
    @Override
    public CompletionStage<InboundMessage> sendRequest(OutboundMessage message) {
        if (!isActive()) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Connection is not active"));
        }

        if (realityCheckpoint == null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("RealityCheckpoint not available"));
        }

        // Use RealityCheckpoint for request-response correlation
        CompletableFuture<InboundMessage> resultFuture = new CompletableFuture<>();

        try {
            Future<Future<InboundMessage>> writeResult = realityCheckpoint.writeMessage(message);

            writeResult.addListener(
                    writeFuture -> {
                        if (!writeFuture.isSuccess()) {
                            // Failed to write the message
                            resultFuture.completeExceptionally(writeFuture.cause());
                            return;
                        }

                        // Message was successfully written, now wait for the reply
                        @SuppressWarnings("unchecked")
                        Future<InboundMessage> replyFuture =
                                (Future<InboundMessage>) writeFuture.getNow();
                        if (replyFuture == null) {
                            // Without this guard the listener would throw inside Netty and the
                            // caller's future would never complete.
                            resultFuture.completeExceptionally(
                                    new IllegalStateException(
                                            "No reply future available for written request"));
                            return;
                        }
                        forwardReply(replyFuture, resultFuture);
                    });
        } catch (Exception e) {
            resultFuture.completeExceptionally(e);
        }

        return resultFuture;
    }

    /** Completes {@code target} with the outcome of the Netty {@code replyFuture}. */
    private static void forwardReply(
            Future<InboundMessage> replyFuture, CompletableFuture<InboundMessage> target) {
        replyFuture.addListener(
                replyResult -> {
                    if (replyResult.isSuccess()) {
                        target.complete((InboundMessage) replyResult.getNow());
                    } else {
                        target.completeExceptionally(replyResult.cause());
                    }
                });
    }

    /**
     * Returns the next message serial taken from the counter stored on the channel.
     *
     * <p>The counter is a {@code long} that is truncated to its low 32 bits. A truncated value of
     * zero is skipped because the D-Bus specification forbids a message serial of zero.
     *
     * @return the next non-zero serial
     * @throws IllegalStateException if the connection is not active or the channel carries no
     *     serial counter
     */
    @Override
    public DBusUInt32 getNextSerial() {
        if (!isActive()) {
            throw new IllegalStateException("Cannot get next serial, connection is not active");
        }

        AtomicLong serialCounter = channel.attr(DBusChannelAttribute.SERIAL_COUNTER).get();
        if (serialCounter == null) {
            throw new IllegalStateException("Serial counter not initialized on channel");
        }

        int serial;
        do {
            serial = (int) serialCounter.getAndIncrement();
        } while (serial == 0);

        return DBusUInt32.valueOf(serial);
    }

    /**
     * Closes the channel and afterwards shuts down the event loop group.
     *
     * <p>Only the first invocation starts the close sequence. Every invocation returns a stage
     * that reflects the outcome of that single sequence, so a caller that arrives while the close
     * is still running is not told that closing has finished before it actually has.
     *
     * @return a stage that completes when the close sequence has finished, or completes
     *     exceptionally if initiating the channel close or shutting down the event loop group
     *     failed
     */
    @Override
    public CompletionStage<Void> close() {
        // Atomic check-and-set to prevent concurrent close operations
        if (!closing.compareAndSet(false, true)) {
            LOGGER.debug("Close operation already in progress, returning shared close result");
            return closeResultView();
        }

        LOGGER.info("Closing Netty connection handle");

        if (channel == null) {
            LOGGER.debug("Channel is null, nothing to close");
            closeFuture.complete(null);
            return closeResultView();
        }

        try {
            channel.close()
                    .addListener(
                            channelFuture -> {
                                if (channelFuture.isSuccess()) {
                                    LOGGER.debug("Channel closed successfully");
                                } else {
                                    LOGGER.warn("Error closing channel", channelFuture.cause());
                                }
                                // The group is shut down regardless of the channel close outcome.
                                shutdownEventLoopGroup();
                            });
        } catch (Exception e) {
            LOGGER.error("Error initiating channel close", e);
            closeFuture.completeExceptionally(e);
        }

        return closeResultView();
    }

    /**
     * Shuts down the event loop group (if any, and if not already shutting down) and completes
     * {@link #closeFuture} with the outcome.
     */
    private void shutdownEventLoopGroup() {
        if (eventLoopGroup == null) {
            closeFuture.complete(null);
            return;
        }
        if (eventLoopGroup.isShuttingDown()) {
            LOGGER.debug("EventLoopGroup already shutting down, skipping shutdown");
            closeFuture.complete(null);
            return;
        }

        try {
            long timeout = config.getCloseTimeout().toMillis();
            // Quiet period is a tenth of the close timeout, capped at one second.
            long quietPeriod = Math.min(1000L, timeout / 10);
            Future<?> shutdownFuture =
                    eventLoopGroup.shutdownGracefully(
                            quietPeriod, timeout, TimeUnit.MILLISECONDS);
            shutdownFuture.addListener(
                    groupFuture -> {
                        if (groupFuture.isSuccess()) {
                            LOGGER.debug("EventLoopGroup shut down successfully");
                            closeFuture.complete(null);
                        } else {
                            LOGGER.error(
                                    "Error shutting down EventLoopGroup", groupFuture.cause());
                            closeFuture.completeExceptionally(groupFuture.cause());
                        }
                    });
        } catch (Exception e) {
            LOGGER.error("Error initiating EventLoopGroup shutdown", e);
            closeFuture.completeExceptionally(e);
        }
    }

    /**
     * Creates an independent future mirroring {@link #closeFuture}.
     *
     * <p>Each caller gets its own instance so that cancelling it or applying a timeout to it
     * cannot alter the result seen by other callers. The original exception is passed through
     * unwrapped.
     */
    private CompletionStage<Void> closeResultView() {
        CompletableFuture<Void> view = new CompletableFuture<>();
        closeFuture.whenComplete(
                (ignored, error) -> {
                    if (error != null) {
                        view.completeExceptionally(error);
                    } else {
                        view.complete(null);
                    }
                });
        return view;
    }

    /**
     * Returns the bus name stored in the channel attributes.
     *
     * @return the assigned bus name, or {@code null} if there is no channel or no name has been
     *     stored yet
     */
    @Override
    public String getAssignedBusName() {
        if (channel == null) {
            return null;
        }
        var dbusString = channel.attr(DBusChannelAttribute.ASSIGNED_BUS_NAME).get();
        return dbusString != null ? dbusString.toString() : null;
    }
}