// ErrorRecoveryManager.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.util;

import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Comprehensive error recovery manager for D-Bus operations.
 *
 * <p>Provides sophisticated error handling mechanisms including:
 *
 * <ul>
 *   <li>Exponential backoff with jitter for retry operations
 *   <li>Circuit breaker pattern for failing operations
 *   <li>Error classification for appropriate recovery strategies
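 *   <li>Adaptive timeout based on historical performance (note: no adaptive-timeout logic is
 *       implemented in this class)
 *   <li>Cleanup of the internally created retry scheduler via {@link #shutdown()}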
 * </ul>
 *
 * <p>This manager is designed to be thread-safe and can be shared across multiple connection
 * instances and operation types.
 */
public class ErrorRecoveryManager {

    /** Class-wide SLF4J logger; the nested {@link CircuitBreaker} logs through it as well. */
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorRecoveryManager.class);

    // Default configuration values
    // (used as the initial values of RetryConfig.Builder)
    private static final int DEFAULT_MAX_RETRIES = 3;
    private static final Duration DEFAULT_INITIAL_DELAY = Duration.ofMillis(100);
    private static final Duration DEFAULT_MAX_DELAY = Duration.ofSeconds(30);
    private static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0;
    private static final double DEFAULT_JITTER_FACTOR = 0.1;

    // Circuit breaker configuration
    // (used as the initial values of CircuitBreakerConfig.Builder)
    private static final int DEFAULT_FAILURE_THRESHOLD = 5;
    private static final Duration DEFAULT_RECOVERY_TIMEOUT = Duration.ofSeconds(60);
    private static final int DEFAULT_SUCCESS_THRESHOLD = 3;

    /** Executor on which delayed retries are scheduled; also handed to created circuit breakers. */
    private final ScheduledExecutorService scheduler;

    /**
     * {@code true} when the scheduler was created by the no-arg constructor; only then does
     * {@link #shutdown()} stop it.
     */
    private final boolean ownScheduler;

    /** Creates a new ErrorRecoveryManager with its own scheduler. */
    public ErrorRecoveryManager() {
        this.scheduler =
                Executors.newScheduledThreadPool(
                        2,
                        r -> {
                            Thread thread =
                                    new Thread(r, "ErrorRecovery-" + System.identityHashCode(this));
                            thread.setDaemon(true);
                            return thread;
                        });
        this.ownScheduler = true;
    }

    /**
     * Creates a new ErrorRecoveryManager using the provided scheduler.
     *
     * <p>The scheduler stays under the caller's control: {@link #shutdown()} does not stop a
     * scheduler supplied through this constructor.
     *
     * @param scheduler the scheduler to use for retry operations; must not be {@code null}
     * @throws NullPointerException if {@code scheduler} is {@code null}
     */
    public ErrorRecoveryManager(ScheduledExecutorService scheduler) {
        // Fail fast: a null scheduler would otherwise only surface inside an asynchronous
        // completion callback, where the exception is swallowed and the retry never finishes.
        this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
        this.ownScheduler = false;
    }

    /**
     * Runs an asynchronous operation, re-invoking it with exponential backoff when it fails
     * with an error that is considered retryable.
     *
     * @param operation supplier that starts the operation; it is invoked once per attempt
     * @param config retry configuration (retry budget and backoff parameters)
     * @param <T> the result type
     * @return a future completed with the first successful result, or with the failure of the
     *     last attempt when no further retry is made
     */
    public <T> CompletableFuture<T> executeWithRetry(
            Supplier<CompletableFuture<T>> operation, RetryConfig config) {

        final CompletableFuture<T> outcome = new CompletableFuture<>();
        // Attempt numbering is zero-based; the first call is attempt 0.
        final int firstAttempt = 0;
        executeWithRetryInternal(operation, config, firstAttempt, outcome);
        return outcome;
    }

    /**
     * Builds a circuit breaker that shares this manager's scheduler.
     *
     * @param name name of the circuit breaker, used in its log and exception messages
     * @param config circuit breaker configuration
     * @return a new CircuitBreaker instance
     */
    public CircuitBreaker createCircuitBreaker(String name, CircuitBreakerConfig config) {
        final CircuitBreaker breaker = new CircuitBreaker(name, config, scheduler);
        return breaker;
    }

    /**
     * Classifies an exception to determine the appropriate recovery strategy.
     *
     * <p>Classification is keyword based: the lower-cased simple class name and message of the
     * failure are searched for known fragments. The categories are tested in a fixed order
     * (transient, authentication, configuration, resource exhaustion, protocol) and the first
     * match wins. Failures that match nothing are treated as {@link
     * ErrorClassification#TRANSIENT}.
     *
     * <p>{@link CompletionException} and {@link ExecutionException} wrappers are unwrapped first
     * so that the underlying failure is classified rather than the wrapper.
     *
     * @param throwable the exception to classify; may be {@code null}
     * @return the error classification; {@link ErrorClassification#UNKNOWN} for {@code null}
     */
    public static ErrorClassification classifyError(Throwable throwable) {
        if (throwable == null) {
            return ErrorClassification.UNKNOWN;
        }

        Throwable failure = unwrapAsyncWrappers(throwable);

        // Locale.ROOT keeps keyword matching independent of the JVM default locale; with a
        // Turkish default locale "Illegal..." would otherwise lower-case to a dotless i.
        String className = failure.getClass().getSimpleName().toLowerCase(Locale.ROOT);
        String rawMessage = failure.getMessage();
        String message = rawMessage != null ? rawMessage.toLowerCase(Locale.ROOT) : "";

        // Network/connection errors (potentially recoverable)
        if (className.contains("connect")
                || className.contains("timeout")
                || className.contains("socket")
                || message.contains("connection refused")
                || message.contains("timeout")
                || message.contains("network")) {
            return ErrorClassification.TRANSIENT;
        }

        // Authentication/authorization errors (not recoverable without intervention)
        if (className.contains("auth")
                || className.contains("security")
                || message.contains("authentication")
                || message.contains("authorization")
                || message.contains("permission")
                || message.contains("access denied")) {
            return ErrorClassification.AUTHENTICATION;
        }

        // Configuration/validation errors (not recoverable).
        // "invalid frame" is a protocol keyword (see below); exclude it here so the generic
        // "invalid" match does not shadow it and make the protocol check unreachable.
        boolean genericInvalidMessage =
                message.contains("invalid") && !message.contains("invalid frame");
        if (className.contains("illegal")
                || className.contains("invalid")
                || className.contains("validation")
                || genericInvalidMessage
                || message.contains("malformed")
                || message.contains("configuration")) {
            return ErrorClassification.CONFIGURATION;
        }

        // Resource exhaustion (potentially recoverable after delay)
        if (className.contains("memory")
                || className.contains("resource")
                || message.contains("out of memory")
                || message.contains("resource")
                || message.contains("quota")
                || message.contains("limit exceeded")) {
            return ErrorClassification.RESOURCE_EXHAUSTION;
        }

        // Corruption/protocol errors (potentially recoverable)
        if (className.contains("corrupt")
                || className.contains("protocol")
                || message.contains("corrupt")
                || message.contains("protocol")
                || message.contains("invalid frame")
                || message.contains("parse")) {
            return ErrorClassification.PROTOCOL;
        }

        // Default to transient for unknown errors
        return ErrorClassification.TRANSIENT;
    }

    /** Strips CompletionException/ExecutionException wrappers to reach the underlying failure. */
    private static Throwable unwrapAsyncWrappers(Throwable throwable) {
        Throwable current = throwable;
        // Bounded so that a malformed, cyclic cause chain cannot loop forever.
        for (int depth = 0; depth < 16; depth++) {
            boolean isWrapper =
                    current instanceof CompletionException
                            || current instanceof ExecutionException;
            Throwable cause = current.getCause();
            if (!isWrapper || cause == null) {
                return current;
            }
            current = cause;
        }
        return current;
    }

    /**
     * Computes how long to wait before the next retry.
     *
     * <p>The delay grows as {@code initialDelay * backoffMultiplier^attempt}, is capped at
     * {@code maxDelay}, and is then shifted by a random jitter of up to {@code jitterFactor}
     * times the capped delay in either direction. The result is never negative.
     *
     * @param attempt the current attempt number (0-based)
     * @param config retry configuration
     * @return the delay duration, in whole milliseconds
     */
    public static Duration calculateBackoffDelay(int attempt, RetryConfig config) {
        final double initialMillis = config.initialDelay.toMillis();
        final double growthFactor = Math.pow(config.backoffMultiplier, attempt);
        final double uncappedMillis = initialMillis * growthFactor;

        // Never wait longer than the configured ceiling (before jitter is applied).
        final double ceilingMillis = config.maxDelay.toMillis();
        final double cappedMillis = Math.min(uncappedMillis, ceilingMillis);

        // Spread retries out so that many clients do not retry in lock-step.
        final double centeredRandom = ThreadLocalRandom.current().nextDouble() - 0.5;
        final double jitterMillis = cappedMillis * config.jitterFactor * centeredRandom * 2;

        final long roundedMillis = Math.round(cappedMillis + jitterMillis);
        return Duration.ofMillis(Math.max(0L, roundedMillis));
    }

    /**
     * Shuts down the error recovery manager and releases resources.
     *
     * <p>Only a scheduler created by this manager is stopped; an externally supplied scheduler
     * is left alone. The scheduler gets up to five seconds to terminate before it is stopped
     * forcibly. If the calling thread is interrupted while waiting, the scheduler is stopped
     * forcibly and the interrupt flag is restored.
     */
    public void shutdown() {
        if (!ownScheduler) {
            return;
        }

        scheduler.shutdown();

        final boolean terminatedInTime;
        try {
            terminatedInTime = scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }

        if (!terminatedInTime) {
            scheduler.shutdownNow();
        }
    }

    /**
     * Runs one attempt of the operation and, on failure, either schedules the next attempt or
     * fails {@code result}.
     *
     * <p>{@code result} is always completed on every path: exceptions raised while deciding on
     * or scheduling a retry (for example a scheduler that rejects tasks after shutdown) fail
     * the result instead of being lost inside the completion callback.
     *
     * @param operation supplier that starts the operation; invoked once per attempt
     * @param config retry configuration
     * @param attempt zero-based number of the attempt about to run
     * @param result the future handed to the caller, completed when retrying ends
     * @param <T> the result type
     */
    private <T> void executeWithRetryInternal(
            Supplier<CompletableFuture<T>> operation,
            RetryConfig config,
            int attempt,
            CompletableFuture<T> result) {

        // The caller may have cancelled or completed the result while a retry was pending;
        // running the operation again would then be wasted (and possibly side-effecting) work.
        if (result.isDone()) {
            return;
        }

        try {
            CompletableFuture<T> operationFuture = operation.get();

            operationFuture.whenComplete(
                    (value, throwable) -> {
                        if (throwable == null) {
                            result.complete(value);
                            return;
                        }
                        try {
                            retryOrFail(operation, config, attempt, result, throwable);
                        } catch (RuntimeException callbackFailure) {
                            // Anything thrown here would otherwise be swallowed by
                            // whenComplete and leave the caller's future pending forever.
                            LOGGER.warn(
                                    "Unable to schedule retry after attempt {}; failing operation",
                                    attempt + 1,
                                    callbackFailure);
                            if (callbackFailure != throwable) {
                                throwable.addSuppressed(callbackFailure);
                            }
                            result.completeExceptionally(throwable);
                        }
                    });

        } catch (Exception e) {
            // The supplier threw synchronously or returned null; this is not retried.
            result.completeExceptionally(e);
        }
    }

    /** Schedules the next attempt if the failure is retryable, otherwise fails the result. */
    private <T> void retryOrFail(
            Supplier<CompletableFuture<T>> operation,
            RetryConfig config,
            int attempt,
            CompletableFuture<T> result,
            Throwable throwable) {

        ErrorClassification classification = classifyError(throwable);

        // Check if we should retry
        if (shouldRetry(classification, attempt, config)) {
            Duration delay = calculateBackoffDelay(attempt, config);

            LOGGER.debug(
                    "Retrying operation after {} ms (attempt {}/{}): {}",
                    delay.toMillis(),
                    attempt + 1,
                    config.maxRetries,
                    throwable.getMessage());

            scheduler.schedule(
                    () -> executeWithRetryInternal(operation, config, attempt + 1, result),
                    delay.toMillis(),
                    TimeUnit.MILLISECONDS);
        } else {
            LOGGER.debug(
                    "Not retrying operation (attempt {}/{}), classification: {}, error: {}",
                    attempt + 1,
                    config.maxRetries,
                    classification,
                    throwable.getMessage());
            result.completeExceptionally(throwable);
        }
    }

    /**
     * Decides whether a failed attempt should be followed by another one.
     *
     * <p>No retry is made once {@code attempt} has reached {@code config.maxRetries}. Within
     * the budget, transient, resource-exhaustion and protocol errors are retried,
     * authentication and configuration errors are not, and unknown errors are retried only
     * during the first half of the budget.
     *
     * @param classification classification of the failure
     * @param attempt zero-based number of the attempt that just failed
     * @param config retry configuration
     * @return {@code true} if another attempt should be scheduled
     */
    private static boolean shouldRetry(
            ErrorClassification classification, int attempt, RetryConfig config) {
        final int retryBudget = config.maxRetries;

        if (attempt < retryBudget) {
            return switch (classification) {
                case AUTHENTICATION, CONFIGURATION -> false;
                case TRANSIENT, RESOURCE_EXHAUSTION, PROTOCOL -> true;
                case UNKNOWN -> {
                    // Conservative retry for unknown errors
                    final int reducedBudget = retryBudget / 2;
                    yield attempt < reducedBudget;
                }
            };
        }

        return false;
    }

    /**
     * Immutable configuration for retry operations.
     *
     * <p>Instances are created through {@link #builder()} or {@link #defaultConfig()}.
     */
    public static class RetryConfig {
        /** Number of retries allowed after the initial attempt. */
        public final int maxRetries;

        /** Delay before the first retry, prior to jitter; never {@code null}. */
        public final Duration initialDelay;

        /** Upper bound applied to the exponentially grown delay; never {@code null}. */
        public final Duration maxDelay;

        /** Factor by which the delay grows with each further attempt. */
        public final double backoffMultiplier;

        /** Fraction of the capped delay by which the actual delay may randomly deviate. */
        public final double jitterFactor;

        private RetryConfig(Builder builder) {
            this.maxRetries = builder.maxRetries;
            // Reject null durations here: otherwise they only fail later, inside an
            // asynchronous completion callback, where the exception is easily lost.
            this.initialDelay = Objects.requireNonNull(builder.initialDelay, "initialDelay");
            this.maxDelay = Objects.requireNonNull(builder.maxDelay, "maxDelay");
            this.backoffMultiplier = builder.backoffMultiplier;
            this.jitterFactor = builder.jitterFactor;
        }

        /**
         * Creates a builder pre-populated with the default values.
         *
         * @return a new builder
         */
        public static Builder builder() {
            return new Builder();
        }

        /**
         * Creates a configuration that uses the default value for every setting.
         *
         * @return the default retry configuration
         */
        public static RetryConfig defaultConfig() {
            return builder().build();
        }

        /** Fluent builder for {@link RetryConfig}; every setter returns the builder itself. */
        public static class Builder {
            private int maxRetries = DEFAULT_MAX_RETRIES;
            private Duration initialDelay = DEFAULT_INITIAL_DELAY;
            private Duration maxDelay = DEFAULT_MAX_DELAY;
            private double backoffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
            private double jitterFactor = DEFAULT_JITTER_FACTOR;

            /**
             * Sets the number of retries allowed after the initial attempt.
             *
             * @param maxRetries the retry budget
             * @return this builder
             */
            public Builder maxRetries(int maxRetries) {
                this.maxRetries = maxRetries;
                return this;
            }

            /**
             * Sets the delay before the first retry.
             *
             * @param initialDelay the initial delay; must not be {@code null} at build time
             * @return this builder
             */
            public Builder initialDelay(Duration initialDelay) {
                this.initialDelay = initialDelay;
                return this;
            }

            /**
             * Sets the upper bound for the exponentially grown delay.
             *
             * @param maxDelay the maximum delay; must not be {@code null} at build time
             * @return this builder
             */
            public Builder maxDelay(Duration maxDelay) {
                this.maxDelay = maxDelay;
                return this;
            }

            /**
             * Sets the factor by which the delay grows with each further attempt.
             *
             * @param backoffMultiplier the growth factor
             * @return this builder
             */
            public Builder backoffMultiplier(double backoffMultiplier) {
                this.backoffMultiplier = backoffMultiplier;
                return this;
            }

            /**
             * Sets the fraction of the delay used as random jitter.
             *
             * @param jitterFactor the jitter fraction
             * @return this builder
             */
            public Builder jitterFactor(double jitterFactor) {
                this.jitterFactor = jitterFactor;
                return this;
            }

            /**
             * Builds the immutable configuration.
             *
             * @return the new configuration
             * @throws NullPointerException if the initial or maximum delay is {@code null}
             */
            public RetryConfig build() {
                return new RetryConfig(this);
            }
        }
    }

    /**
     * Immutable configuration for circuit breaker operations.
     *
     * <p>Instances are created through {@link #builder()} or {@link #defaultConfig()}.
     */
    public static class CircuitBreakerConfig {
        /** Failure count at which a CLOSED breaker switches to OPEN. */
        public final int failureThreshold;

        /** Time an OPEN breaker rejects calls before allowing trial calls; never {@code null}. */
        public final Duration recoveryTimeout;

        /** Number of successes in HALF_OPEN state required to switch back to CLOSED. */
        public final int successThreshold;

        private CircuitBreakerConfig(Builder builder) {
            this.failureThreshold = builder.failureThreshold;
            // Reject a null timeout here: otherwise it only fails later, inside the breaker's
            // asynchronous failure handling, before the caller's future has been completed.
            this.recoveryTimeout =
                    Objects.requireNonNull(builder.recoveryTimeout, "recoveryTimeout");
            this.successThreshold = builder.successThreshold;
        }

        /**
         * Creates a builder pre-populated with the default values.
         *
         * @return a new builder
         */
        public static Builder builder() {
            return new Builder();
        }

        /**
         * Creates a configuration that uses the default value for every setting.
         *
         * @return the default circuit breaker configuration
         */
        public static CircuitBreakerConfig defaultConfig() {
            return builder().build();
        }

        /**
         * Fluent builder for {@link CircuitBreakerConfig}; every setter returns the builder
         * itself.
         */
        public static class Builder {
            private int failureThreshold = DEFAULT_FAILURE_THRESHOLD;
            private Duration recoveryTimeout = DEFAULT_RECOVERY_TIMEOUT;
            private int successThreshold = DEFAULT_SUCCESS_THRESHOLD;

            /**
             * Sets the failure count at which a CLOSED breaker opens.
             *
             * @param failureThreshold the failure threshold
             * @return this builder
             */
            public Builder failureThreshold(int failureThreshold) {
                this.failureThreshold = failureThreshold;
                return this;
            }

            /**
             * Sets how long an OPEN breaker rejects calls before recovery is tested.
             *
             * @param recoveryTimeout the recovery timeout; must not be {@code null} at build
             *     time
             * @return this builder
             */
            public Builder recoveryTimeout(Duration recoveryTimeout) {
                this.recoveryTimeout = recoveryTimeout;
                return this;
            }

            /**
             * Sets the number of HALF_OPEN successes needed to close the breaker again.
             *
             * @param successThreshold the success threshold
             * @return this builder
             */
            public Builder successThreshold(int successThreshold) {
                this.successThreshold = successThreshold;
                return this;
            }

            /**
             * Builds the immutable configuration.
             *
             * @return the new configuration
             * @throws NullPointerException if the recovery timeout is {@code null}
             */
            public CircuitBreakerConfig build() {
                return new CircuitBreakerConfig(this);
            }
        }
    }

    /**
     * Circuit breaker implementation for preventing cascading failures.
     *
     * <p>The breaker starts CLOSED. Once the failure count reaches the configured failure
     * threshold it switches to OPEN and rejects calls with {@link CircuitBreakerOpenException}
     * until the recovery timeout has elapsed since the last failure. It then becomes HALF_OPEN:
     * calls are let through again, a failure re-opens the breaker, and reaching the configured
     * number of successes closes it.
     *
     * <p>State changes use compare-and-set so that concurrent callers cannot perform the same
     * transition twice.
     */
    public static class CircuitBreaker {
        /** Operating states of the circuit breaker. */
        public enum State {
            /** Calls are passed through; failures are counted. */
            CLOSED,
            /** Calls are rejected until the recovery timeout has elapsed. */
            OPEN,
            /** Calls are passed through to test whether the operation has recovered. */
            HALF_OPEN
        }

        private final String name;
        private final CircuitBreakerConfig config;
        private final ScheduledExecutorService scheduler;

        private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
        private final AtomicInteger failureCount = new AtomicInteger(0);
        private final AtomicInteger successCount = new AtomicInteger(0);

        // System.nanoTime() reading taken at the most recent failure. A monotonic clock is
        // used so that wall-clock adjustments cannot shorten or stretch the recovery timeout.
        private final AtomicLong lastFailureTime = new AtomicLong(0);

        private CircuitBreaker(
                String name, CircuitBreakerConfig config, ScheduledExecutorService scheduler) {
            this.name = name;
            this.config = config;
            this.scheduler = scheduler;
        }

        /**
         * Executes an operation through the circuit breaker.
         *
         * <p>While the breaker is OPEN and the recovery timeout has not elapsed, the operation
         * is not invoked and a future failed with {@link CircuitBreakerOpenException} is
         * returned. A supplier that throws or returns {@code null} counts as a failure.
         *
         * @param operation the operation to execute
         * @param <T> the result type
         * @return CompletableFuture that completes with the operation result
         */
        public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> operation) {
            if (state.get() == State.OPEN) {
                if (!recoveryTimeoutElapsed()) {
                    return CompletableFuture.failedFuture(
                            new CircuitBreakerOpenException(
                                    "Circuit breaker '" + name + "' is OPEN"));
                }
                // Lazy transition; covers the case where no timer is pending or it was skipped.
                tryEnterHalfOpen();
            }

            CompletableFuture<T> result = new CompletableFuture<>();

            try {
                CompletableFuture<T> operationFuture = operation.get();

                operationFuture.whenComplete(
                        (value, throwable) -> {
                            try {
                                if (throwable == null) {
                                    onSuccess();
                                } else {
                                    onFailure();
                                }
                            } finally {
                                // Complete the caller's future even if bookkeeping failed;
                                // whenComplete would otherwise swallow the exception and
                                // leave the future pending forever.
                                if (throwable == null) {
                                    result.complete(value);
                                } else {
                                    result.completeExceptionally(throwable);
                                }
                            }
                        });

            } catch (Exception e) {
                try {
                    onFailure();
                } finally {
                    result.completeExceptionally(e);
                }
            }

            return result;
        }

        /** Gets the current state of the circuit breaker. */
        public State getState() {
            return state.get();
        }

        /** Gets the current failure count. */
        public int getFailureCount() {
            return failureCount.get();
        }

        /** Resets the circuit breaker to CLOSED state. */
        public void reset() {
            state.set(State.CLOSED);
            failureCount.set(0);
            successCount.set(0);
        }

        /** Records a successful call and closes the breaker once recovery is confirmed. */
        private void onSuccess() {
            State current = state.get();
            if (current == State.HALF_OPEN) {
                int successes = successCount.incrementAndGet();
                if (successes >= config.successThreshold
                        && state.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
                    failureCount.set(0);
                    LOGGER.info(
                            "Circuit breaker '{}' transitioned to CLOSED after {} successes",
                            name,
                            successes);
                }
            } else if (current == State.CLOSED) {
                failureCount.set(0); // Reset failure count on success
            }
        }

        /** Records a failed call and opens the breaker when the threshold is reached. */
        private void onFailure() {
            int failures = failureCount.incrementAndGet();
            lastFailureTime.set(System.nanoTime());

            if (failures >= config.failureThreshold
                    && state.compareAndSet(State.CLOSED, State.OPEN)) {
                LOGGER.warn(
                        "Circuit breaker '{}' transitioned to OPEN after {} failures",
                        name,
                        failures);
                scheduleHalfOpenTransition();

            } else if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) {
                LOGGER.warn(
                        "Circuit breaker '{}' returned to OPEN after failure during recovery",
                        name);
            }
        }

        /** Schedules the automatic OPEN to HALF_OPEN transition after the recovery timeout. */
        private void scheduleHalfOpenTransition() {
            try {
                scheduler.schedule(
                        () -> {
                            // A timer left over from an earlier OPEN period must not cut a
                            // newer one short, hence the elapsed-time check.
                            if (recoveryTimeoutElapsed() && tryEnterHalfOpen()) {
                                LOGGER.info(
                                        "Circuit breaker '{}' transitioned to HALF_OPEN for recovery testing",
                                        name);
                            }
                        },
                        config.recoveryTimeout.toMillis(),
                        TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // The scheduler no longer accepts tasks (e.g. it was shut down). Recovery is
                // still possible through the lazy transition in execute().
                LOGGER.debug(
                        "Circuit breaker '{}' could not schedule its HALF_OPEN transition",
                        name,
                        e);
            }
        }

        /** Returns whether the recovery timeout has passed since the most recent failure. */
        private boolean recoveryTimeoutElapsed() {
            long elapsedNanos = System.nanoTime() - lastFailureTime.get();
            // Compared in milliseconds, the same unit used for the scheduled timer.
            return TimeUnit.NANOSECONDS.toMillis(elapsedNanos)
                    >= config.recoveryTimeout.toMillis();
        }

        /** Moves from OPEN to HALF_OPEN; returns {@code true} if this call made the change. */
        private boolean tryEnterHalfOpen() {
            if (state.compareAndSet(State.OPEN, State.HALF_OPEN)) {
                successCount.set(0);
                return true;
            }
            return false;
        }
    }

    /**
     * Categories into which failures are sorted; the category drives the retry decision.
     */
    public enum ErrorClassification {
        /** A temporary condition; repeating the operation may succeed. */
        TRANSIENT,

        /** An authentication or authorization failure; needs intervention, not a retry. */
        AUTHENTICATION,

        /** A configuration problem; needs a code or configuration change, not a retry. */
        CONFIGURATION,

        /** Resources are exhausted; the operation may succeed again after a delay. */
        RESOURCE_EXHAUSTION,

        /** A protocol or data-corruption problem; repeating the operation may succeed. */
        PROTOCOL,

        /** The failure could not be categorized. */
        UNKNOWN
    }

    /**
     * Signals that a call was rejected because the circuit breaker is currently OPEN.
     *
     * <p>This is an unchecked exception; it is delivered through the failed future returned
     * by the circuit breaker rather than thrown to the caller directly.
     */
    public static class CircuitBreakerOpenException extends RuntimeException {
        /**
         * Creates the exception with a descriptive message.
         *
         * @param message the detail message
         */
        public CircuitBreakerOpenException(String message) {
            super(message);
        }
    }
}