NettyConnectionContext.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.netty;

import com.lucimber.dbus.connection.ConnectionContext;
import com.lucimber.dbus.connection.ConnectionState;
import com.lucimber.dbus.connection.Pipeline;
import com.lucimber.dbus.message.InboundMessage;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-specific implementation of {@link ConnectionContext} that bridges the strategy pattern
 * with the existing {@link NettyConnection} infrastructure.
 *
 * <p>Every callback is logged and then forwarded to the {@link Pipeline}, to the owning {@link
 * NettyConnection}, or to both. Inbound messages are handed to the application task executor for
 * pipeline processing.
 */
final class NettyConnectionContext implements ConnectionContext {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyConnectionContext.class);

    private final Pipeline pipeline;
    private final ExecutorService applicationTaskExecutor;
    private final NettyConnection connection;

    /**
     * Creates a context bound to the given pipeline, executor and connection.
     *
     * @param pipeline the pipeline that receives inbound messages and connection activity events
     * @param applicationTaskExecutor the executor that runs inbound message processing
     * @param connection the connection notified about state changes, errors and lifecycle events
     * @throws NullPointerException if any argument is {@code null}
     */
    NettyConnectionContext(
            Pipeline pipeline,
            ExecutorService applicationTaskExecutor,
            NettyConnection connection) {
        this.pipeline = Objects.requireNonNull(pipeline, "pipeline");
        this.applicationTaskExecutor =
                Objects.requireNonNull(applicationTaskExecutor, "applicationTaskExecutor");
        this.connection = Objects.requireNonNull(connection, "connection");
    }

    /**
     * Gets the underlying NettyConnection instance. This method is used by strategy implementations
     * to access the connection.
     *
     * @return the NettyConnection instance
     */
    NettyConnection getConnection() {
        return connection;
    }

    /**
     * Logs the message type and schedules the message for pipeline processing on the application
     * task executor.
     *
     * <p>An {@link Exception} raised while the pipeline processes the message is logged and
     * reported through {@link #onError(Throwable)}.
     *
     * @param message the inbound message to route through the pipeline
     * @throws NullPointerException if {@code message} is {@code null}
     * @throws java.util.concurrent.RejectedExecutionException if the executor does not accept the
     *     task, as specified by the executor's {@code execute} contract
     */
    @Override
    public void onMessageReceived(InboundMessage message) {
        Objects.requireNonNull(message, "message");
        LOGGER.debug(
                "Message received via strategy pattern: {}", message.getClass().getSimpleName());

        // Use execute() rather than submit(): submit() would capture anything that escapes the
        // task (for example a failure inside onError itself) in a Future that nobody inspects,
        // hiding the problem completely. With execute() such a failure reaches the worker
        // thread's uncaught-exception handler instead.
        applicationTaskExecutor.execute(() -> dispatchToPipeline(message));
    }

    /** Routes one message through the pipeline and reports processing failures via onError. */
    private void dispatchToPipeline(InboundMessage message) {
        try {
            pipeline.propagateInboundMessage(message);
        } catch (Exception e) {
            LOGGER.error("Error processing inbound message", e);
            onError(e);
        }
    }

    /**
     * Logs the new state and forwards it to the connection.
     *
     * @param newState the state the connection has moved to
     */
    @Override
    public void onStateChanged(ConnectionState newState) {
        LOGGER.debug("Connection state changed to: {}", newState);

        // Notify the connection about state changes
        // This would integrate with the health handler if present
        connection.notifyStateChanged(newState);
    }

    /**
     * Logs the error and forwards it to the connection.
     *
     * @param error the failure to report
     */
    @Override
    public void onError(Throwable error) {
        LOGGER.error("Connection error occurred", error);

        // Notify the connection about errors
        connection.notifyError(error);
    }

    /**
     * Signals that the connection is up: first the pipeline is told the connection is active, then
     * the connection itself is notified.
     */
    @Override
    public void onConnectionEstablished() {
        LOGGER.info("Connection successfully established via strategy pattern");

        // Notify pipeline that connection is active - this will update health handler state
        pipeline.propagateConnectionActive();

        // Notify connection handlers
        connection.notifyConnectionEstablished();
    }

    /**
     * Signals that the connection is gone: the pipeline is told the connection is inactive and the
     * connection itself is notified.
     *
     * <p>The connection is notified even when the pipeline propagation throws; the exception from
     * the pipeline still propagates to the caller afterwards.
     */
    @Override
    public void onConnectionLost() {
        LOGGER.warn("Connection lost");

        try {
            // Notify pipeline that connection is inactive
            pipeline.propagateConnectionInactive();
        } finally {
            // Must run even if the pipeline propagation fails, otherwise the connection would
            // never learn about the loss (this is what triggers reconnection if enabled)
            connection.notifyConnectionLost();
        }
    }

    /**
     * Returns the pipeline this context forwards messages and activity events to.
     *
     * @return the pipeline supplied at construction time
     */
    @Override
    public Pipeline getPipeline() {
        return pipeline;
    }
}