DefaultPipeline.java
/*
* SPDX-FileCopyrightText: 2023-2025 Lucimber UG
* SPDX-License-Identifier: Apache-2.0
*/
package com.lucimber.dbus.connection;
import com.lucimber.dbus.message.InboundMessage;
import com.lucimber.dbus.message.OutboundMessage;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A thread-safe pipeline of {@link OutboundHandler} instances associated with a {@link Connection}.
* This class maintains the order of handlers and ensures propagation of read, write, exception, and
* user-defined events.
*/
public final class DefaultPipeline implements Pipeline {
private final Connection connection;
private final InternalContext head;
private final InternalContext tail;
private final ConcurrentMap<String, InternalContext> nameCtxMap = new ConcurrentHashMap<>();
/** Constructs a new pipeline for the given connection. */
public DefaultPipeline(Connection connection) {
this.connection = Objects.requireNonNull(connection, "connection must not be null");
head = new InternalContext(connection, this, "HEAD", new InternalHeadHandler());
tail = new InternalContext(connection, this, "TAIL", new InternalTailHandler());
head.setNext(tail);
tail.setPrev(head);
}
@Override
public synchronized Pipeline addLast(String name, Handler handler) {
if (nameCtxMap.containsKey(name)) {
throw new IllegalArgumentException("Handler name already exists: " + name);
}
final InternalContext newCtx = new InternalContext(connection, this, name, handler);
final InternalContext prev = tail.getPrev();
prev.setNext(newCtx);
newCtx.setPrev(prev);
newCtx.setNext(tail);
tail.setPrev(newCtx);
nameCtxMap.put(name, newCtx);
handler.onHandlerAdded(newCtx);
return this;
}
@Override
public Connection getConnection() {
return connection;
}
@Override
public synchronized Pipeline remove(String name) {
if (name == null || name.isBlank()) {
throw new IllegalArgumentException("Name must not be null or blank.");
}
if (name.equalsIgnoreCase("HEAD") || name.equalsIgnoreCase("TAIL")) {
throw new IllegalArgumentException("Removal of head or tail not allowed.");
}
final InternalContext ctx = nameCtxMap.remove(name);
if (ctx == null) {
throw new IllegalArgumentException("No such handler: " + name);
}
ctx.getHandler().onHandlerRemoved(ctx);
final InternalContext prev = ctx.getPrev();
final InternalContext next = ctx.getNext();
prev.setNext(next);
next.setPrev(prev);
ctx.setPrev(null);
ctx.setNext(null);
return this;
}
@Override
public void propagateConnectionActive() {
head.onConnectionActive();
}
@Override
public void propagateConnectionInactive() {
head.onConnectionInactive();
}
@Override
public void propagateInboundMessage(InboundMessage msg) {
Objects.requireNonNull(msg, "msg must not be null");
head.handleInboundMessage(msg);
}
@Override
public void propagateInboundFailure(Throwable cause) {
Objects.requireNonNull(cause, "cause must not be null");
head.handleInboundFailure(cause);
}
@Override
public void propagateOutboundMessage(OutboundMessage msg, CompletableFuture<Void> future) {
Objects.requireNonNull(msg, "msg must not be null");
Objects.requireNonNull(future, "future must not be null");
tail.handleOutboundMessage(msg, future);
}
}