// ConnectionLifecycleManager.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.connection;

import com.lucimber.dbus.util.ErrorRecoveryManager;
import com.lucimber.dbus.util.ErrorRecoveryManager.CircuitBreaker;
import com.lucimber.dbus.util.ErrorRecoveryManager.CircuitBreakerConfig;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the lifecycle of a connection including connect, disconnect, and state transitions.
 *
 * <p>The current {@link ConnectionState} is held in an atomic reference, and two atomic flags
 * ({@code connecting} and {@code closing}) act as guards so that at most one connection attempt
 * and at most one disconnection run at a time. Both flags are always released once the guarded
 * operation ends, whether it succeeds, fails asynchronously, or throws synchronously.
 *
 * <p>Connection attempts are routed through a {@link CircuitBreaker} created from the {@link
 * ErrorRecoveryManager} owned by this instance.
 */
public class ConnectionLifecycleManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionLifecycleManager.class);

    private final ConnectionConfig config;
    private final AtomicReference<ConnectionState> state;
    private final AtomicBoolean connecting;
    private final AtomicBoolean closing;
    private final ErrorRecoveryManager errorRecoveryManager;
    private final CircuitBreaker connectionCircuitBreaker;
    private final String connectionId;

    /**
     * Creates a new connection lifecycle manager in the {@code DISCONNECTED} state.
     *
     * <p>The circuit breaker guarding connection attempts is configured with a failure threshold
     * of 3, a success threshold of 2 and a recovery timeout of twice the configured connect
     * timeout.
     *
     * @param config the connection configuration
     * @param connectionId unique identifier for this connection
     * @throws NullPointerException if {@code config} or {@code connectionId} is {@code null}
     */
    public ConnectionLifecycleManager(ConnectionConfig config, String connectionId) {
        this.config = Objects.requireNonNull(config, "config must not be null");
        this.connectionId = Objects.requireNonNull(connectionId, "connectionId must not be null");
        this.state = new AtomicReference<>(ConnectionState.DISCONNECTED);
        this.connecting = new AtomicBoolean(false);
        this.closing = new AtomicBoolean(false);

        // Initialize error recovery manager
        this.errorRecoveryManager = new ErrorRecoveryManager();

        // Create circuit breaker for connection operations
        CircuitBreakerConfig cbConfig =
                CircuitBreakerConfig.builder()
                        .failureThreshold(3)
                        .recoveryTimeout(config.getConnectTimeout().multipliedBy(2))
                        .successThreshold(2)
                        .build();
        this.connectionCircuitBreaker =
                errorRecoveryManager.createCircuitBreaker("connection-" + connectionId, cbConfig);
    }

    /**
     * Initiates a connection attempt.
     *
     * <p>The returned stage fails with an {@link IllegalStateException} if another attempt is
     * already in progress, if the connection is already established, or if a disconnection is in
     * progress. Otherwise the state moves to {@code CONNECTING} and the supplier is executed
     * through the connection circuit breaker. When the attempt ends the state becomes {@code
     * CONNECTED} on success or {@code FAILED} on error.
     *
     * @param connectionSupplier supplier that performs the actual connection
     * @return completion stage that completes when connection is established
     */
    public CompletionStage<ConnectionHandle> connect(ConnectionSupplier connectionSupplier) {

        // Check if already connecting or connected
        if (!connecting.compareAndSet(false, true)) {
            return CompletableFuture.failedStage(
                    new IllegalStateException("Connection attempt already in progress"));
        }

        ConnectionState currentState = state.get();
        if (currentState == ConnectionState.CONNECTED) {
            connecting.set(false);
            return CompletableFuture.failedStage(new IllegalStateException("Already connected"));
        }

        if (closing.get()) {
            connecting.set(false);
            return CompletableFuture.failedStage(
                    new IllegalStateException("Connection is closing"));
        }

        LOGGER.info("Initiating connection for {}", connectionId);
        updateState(ConnectionState.CONNECTING);

        // Use circuit breaker for connection attempt. The bookkeeping is attached to the stage
        // returned by the breaker (not to the supplier's own future) so that the flag and state
        // are also settled when the breaker fails the attempt without ever invoking the supplier.
        final CompletionStage<ConnectionHandle> attempt;
        try {
            attempt =
                    connectionCircuitBreaker.execute(
                            () -> connectionSupplier.connect().toCompletableFuture());
        } catch (RuntimeException | Error e) {
            // Synchronous failure: settle the attempt before propagating, otherwise the
            // connecting flag would stay set forever.
            finishConnectAttempt(e);
            throw e;
        }

        return attempt.whenComplete((handle, error) -> finishConnectAttempt(error));
    }

    /**
     * Records the outcome of a connection attempt and releases the {@code connecting} guard.
     *
     * <p>The state is published before the guard is released so that a concurrent {@link
     * #connect(ConnectionSupplier)} call cannot slip in between and observe a stale {@code
     * CONNECTING} state.
     *
     * @param error the failure of the attempt, or {@code null} if it succeeded
     */
    private void finishConnectAttempt(Throwable error) {
        try {
            if (error != null) {
                LOGGER.error("Connection failed for {}", connectionId, error);
                updateState(ConnectionState.FAILED);
            } else {
                LOGGER.info("Connection established for {}", connectionId);
                updateState(ConnectionState.CONNECTED);
            }
        } finally {
            connecting.set(false);
        }
    }

    /**
     * Initiates disconnection.
     *
     * <p>If a disconnection is already in progress, or the state is already {@code
     * DISCONNECTED}, an already-completed stage is returned and the supplier is not invoked.
     * Otherwise the supplier is invoked; once its stage completes the state becomes {@code
     * DISCONNECTED}. An asynchronous error from the supplier is logged and not propagated: the
     * returned stage still completes normally. An exception thrown synchronously by the supplier
     * is rethrown to the caller with the state left unchanged.
     *
     * @param disconnectionSupplier supplier that performs the actual disconnection
     * @return completion stage that completes when disconnection is done
     */
    public CompletionStage<Void> disconnect(DisconnectionSupplier disconnectionSupplier) {
        if (!closing.compareAndSet(false, true)) {
            return CompletableFuture.completedStage(null);
        }

        ConnectionState currentState = state.get();
        if (currentState == ConnectionState.DISCONNECTED) {
            // Nothing to tear down; release the guard so later connect()/disconnect() calls
            // are not blocked by a stale closing flag.
            closing.set(false);
            return CompletableFuture.completedStage(null);
        }

        LOGGER.info("Initiating disconnection for {}", connectionId);
        // No DISCONNECTING state, so we keep current state

        final CompletionStage<Void> pending;
        try {
            pending = disconnectionSupplier.disconnect();
        } catch (RuntimeException | Error e) {
            // The supplier failed before producing a stage; release the guard and propagate.
            closing.set(false);
            throw e;
        }

        return pending.handle(
                (result, error) -> {
                    if (error != null) {
                        LOGGER.warn("Error during disconnection for {}", connectionId, error);
                    }
                    updateState(ConnectionState.DISCONNECTED);
                    closing.set(false);
                    return result;
                });
    }

    /**
     * Gets the current connection state.
     *
     * @return current connection state
     */
    public ConnectionState getState() {
        return state.get();
    }

    /**
     * Checks if the connection is currently connected.
     *
     * @return true if connected, false otherwise
     */
    public boolean isConnected() {
        return state.get() == ConnectionState.CONNECTED;
    }

    /**
     * Checks if a connection attempt is in progress.
     *
     * @return true if connecting, false otherwise
     */
    public boolean isConnecting() {
        return connecting.get();
    }

    /**
     * Checks if disconnection is in progress.
     *
     * @return true if closing, false otherwise
     */
    public boolean isClosing() {
        return closing.get();
    }

    /**
     * Atomically replaces the connection state and logs the transition at debug level when the
     * state actually changed.
     *
     * @param newState the new connection state
     */
    private void updateState(ConnectionState newState) {
        ConnectionState oldState = state.getAndSet(newState);
        if (oldState != newState) {
            LOGGER.debug(
                    "Connection {} state changed from {} to {}", connectionId, oldState, newState);
        }
    }

    /**
     * Gets the error recovery manager.
     *
     * @return error recovery manager
     */
    public ErrorRecoveryManager getErrorRecoveryManager() {
        return errorRecoveryManager;
    }

    /**
     * Gets the connection circuit breaker.
     *
     * @return circuit breaker for connection operations
     */
    public CircuitBreaker getConnectionCircuitBreaker() {
        return connectionCircuitBreaker;
    }

    /** Functional interface for connection operations. */
    @FunctionalInterface
    public interface ConnectionSupplier {
        /**
         * Performs the connection operation.
         *
         * @return completion stage with connection handle
         */
        CompletionStage<ConnectionHandle> connect();
    }

    /** Functional interface for disconnection operations. */
    @FunctionalInterface
    public interface DisconnectionSupplier {
        /**
         * Performs the disconnection operation.
         *
         * @return completion stage that completes when disconnected
         */
        CompletionStage<Void> disconnect();
    }
}