ConnectionThreadPoolManager.java

/*
 * SPDX-FileCopyrightText: 2025 Lucimber UG
 * SPDX-License-Identifier: Apache-2.0
 */
package com.lucimber.dbus.connection;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the thread pool of a single D-Bus connection. This class encapsulates thread pool
 * creation, configuration, and lifecycle management.
 *
 * <p>The pool is a fixed-size executor whose worker threads are daemon threads named {@code
 * dbus-<connectionId>-worker-<n>}. Callers obtain the executor via {@link
 * #getApplicationTaskExecutor()} and must call {@link #shutdown()} once the connection is no
 * longer needed.
 */
public class ConnectionThreadPoolManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionThreadPoolManager.class);

    /** Lower bound applied to every requested or computed pool size. */
    private static final int MIN_THREADS = 2;

    /** Time granted to already submitted tasks to finish after a graceful shutdown request. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    /** Additional time granted to the pool to terminate after a forced shutdown. */
    private static final long FORCED_SHUTDOWN_TIMEOUT_SECONDS = 10;

    private final String connectionId;
    private final ExecutorService applicationTaskExecutor;
    private final int threadPoolSize;

    /**
     * Creates a new thread pool manager with default configuration. The default pool size is half
     * the number of available processors, but never less than {@value #MIN_THREADS}.
     *
     * @param connectionId unique identifier for the connection
     * @throws NullPointerException if {@code connectionId} is {@code null}
     */
    public ConnectionThreadPoolManager(String connectionId) {
        this(connectionId, calculateDefaultPoolSize());
    }

    /**
     * Creates a new thread pool manager with specified thread pool size.
     *
     * @param connectionId unique identifier for the connection
     * @param threadPoolSize requested size of the thread pool; values below {@value #MIN_THREADS}
     *     are raised to {@value #MIN_THREADS}
     * @throws NullPointerException if {@code connectionId} is {@code null}
     */
    public ConnectionThreadPoolManager(String connectionId, int threadPoolSize) {
        this.connectionId = Objects.requireNonNull(connectionId, "connectionId must not be null");
        // Clamp instead of rejecting so that callers passing 0 or 1 still get a usable pool.
        this.threadPoolSize = Math.max(MIN_THREADS, threadPoolSize);

        LOGGER.debug(
                "Creating thread pool for connection {} with {} threads",
                connectionId,
                this.threadPoolSize);

        this.applicationTaskExecutor =
                Executors.newFixedThreadPool(
                        this.threadPoolSize, new DBusThreadFactory(connectionId));
    }

    /**
     * Calculates the default thread pool size based on available processors.
     *
     * @return half the number of available processors, but at least {@value #MIN_THREADS}
     */
    private static int calculateDefaultPoolSize() {
        return Math.max(MIN_THREADS, Runtime.getRuntime().availableProcessors() / 2);
    }

    /**
     * Gets the application task executor.
     *
     * @return executor service for application tasks
     */
    public ExecutorService getApplicationTaskExecutor() {
        return applicationTaskExecutor;
    }

    /**
     * Gets the configured thread pool size.
     *
     * @return effective thread pool size, i.e. the requested size after the lower bound of
     *     {@value #MIN_THREADS} has been applied
     */
    public int getThreadPoolSize() {
        return threadPoolSize;
    }

    /**
     * Shuts down the thread pool gracefully. This method stops accepting new tasks and waits up
     * to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds for submitted tasks to complete. If the pool
     * has not terminated by then, a forced shutdown is issued and the method waits up to another
     * {@value #FORCED_SHUTDOWN_TIMEOUT_SECONDS} seconds.
     *
     * <p>This call blocks the calling thread. If the calling thread is interrupted while waiting,
     * a forced shutdown is issued, the interrupt status is restored, and the method returns
     * without waiting any further.
     */
    public void shutdown() {
        LOGGER.info("Shutting down thread pool for connection {}", connectionId);

        applicationTaskExecutor.shutdown();

        // Tracks the real outcome so that success is only reported when it actually happened.
        boolean terminated = false;
        try {
            terminated =
                    applicationTaskExecutor.awaitTermination(
                            SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!terminated) {
                LOGGER.warn(
                        "Thread pool for connection {} did not terminate in {} seconds, forcing shutdown",
                        connectionId,
                        SHUTDOWN_TIMEOUT_SECONDS);
                applicationTaskExecutor.shutdownNow();

                terminated =
                        applicationTaskExecutor.awaitTermination(
                                FORCED_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (!terminated) {
                    LOGGER.error(
                            "Thread pool for connection {} did not terminate after forced shutdown",
                            connectionId);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.warn(
                    "Interrupted while waiting for thread pool shutdown for connection {}",
                    connectionId,
                    e);
            applicationTaskExecutor.shutdownNow();
            // Preserve the interrupt for the caller; we only observed it, we did not handle it.
            Thread.currentThread().interrupt();
        }

        if (terminated) {
            LOGGER.debug("Thread pool for connection {} shut down successfully", connectionId);
        }
    }

    /**
     * Checks if the thread pool has been shut down.
     *
     * @return true if shut down, false otherwise
     */
    public boolean isShutdown() {
        return applicationTaskExecutor.isShutdown();
    }

    /**
     * Checks if the thread pool is terminated.
     *
     * @return true if terminated, false otherwise
     */
    public boolean isTerminated() {
        return applicationTaskExecutor.isTerminated();
    }

    /**
     * Custom thread factory for D-Bus worker threads. Threads are daemon threads with normal
     * priority and carry the connection identifier plus a running number in their name.
     */
    private static class DBusThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DBusThreadFactory(String connectionId) {
            this.namePrefix = "dbus-" + connectionId + "-worker-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            // Daemon threads do not keep the JVM alive if the pool is never shut down.
            thread.setDaemon(true);
            // A new thread inherits the priority of its creator; normalize it.
            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }
            return thread;
        }
    }
}