package org.apache.qpid.server.federation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BridgeConfig;
import org.apache.qpid.server.configuration.BridgeConfigType;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageReject;
import org.apache.qpid.transport.MessageRejectCode;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;

/* loaded from: input_file:org/apache/qpid/server/federation/Bridge.class */
public class Bridge implements BridgeConfig {
    private final boolean _durable;
    private final boolean _dynamic;
    private final boolean _queueBridge;
    private final boolean _localSource;
    private final String _source;
    private final String _destination;
    private final String _key;
    private final String _tag;
    private final String _excludes;
    private final BrokerLink _link;
    private UUID _id;
    private Session _session;
    private BridgeImpl _delegate;
    private final int _bridgeNo;
    private long _createTime = System.currentTimeMillis();
    private AutoCommitTransaction _transaction = new AutoCommitTransaction(getVirtualHost().getMessageStore());

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$AbstractPullBridge.class */
    private abstract class AbstractPullBridge implements BridgeImpl, SessionListener {
        private AbstractPullBridge() {
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public final void setSession(Session session) {
            session.setSessionListener(this);
            onSession();
        }

        abstract void onSession();

        @Override // org.apache.qpid.transport.SessionListener
        public void message(Session session, MessageTransfer messageTransfer) {
            ArrayList<? extends BaseQueue> route;
            Exchange exchange = Bridge.this.getVirtualHost().getExchangeRegistry().getExchange(Bridge.this._destination);
            DeliveryProperties deliveryProperties = null;
            if (messageTransfer.getHeader() != null) {
                DeliveryProperties deliveryProperties2 = (DeliveryProperties) messageTransfer.getHeader().get(DeliveryProperties.class);
                deliveryProperties = deliveryProperties2;
                if (deliveryProperties2 != null && deliveryProperties.hasTtl() && !deliveryProperties.hasExpiration()) {
                    deliveryProperties.setExpiration(System.currentTimeMillis() + deliveryProperties.getTtl());
                }
            }
            StoredMessage addMessage = Bridge.this.getVirtualHost().getMessageStore().addMessage(new MessageMetaData_0_10(messageTransfer));
            addMessage.addContent(0, messageTransfer.getBody());
            addMessage.flushToStore();
            MessageTransferMessage messageTransferMessage = new MessageTransferMessage(addMessage, ((ServerSession) Bridge.this._session).getReference());
            ArrayList<? extends BaseQueue> route2 = exchange.route(messageTransferMessage);
            if (route2 != null && route2.size() != 0) {
                enqueue(messageTransferMessage, route2);
            } else if (deliveryProperties == null || !deliveryProperties.hasDiscardUnroutable() || !deliveryProperties.getDiscardUnroutable()) {
                if (messageTransfer.getAcceptMode() == MessageAcceptMode.EXPLICIT) {
                    RangeSet rangeSet = new RangeSet();
                    rangeSet.add(messageTransfer.getId());
                    session.invoke(new MessageReject(rangeSet, MessageRejectCode.UNROUTABLE, "Unroutable", new Option[0]));
                } else {
                    Exchange alternateExchange = exchange.getAlternateExchange();
                    if (alternateExchange != null && (route = alternateExchange.route(messageTransferMessage)) != null && route.size() != 0) {
                        enqueue(messageTransferMessage, route);
                    }
                }
            }
            session.processed(messageTransfer);
        }

        private void enqueue(final ServerMessage serverMessage, final ArrayList<? extends BaseQueue> arrayList) {
            Bridge.this._transaction.enqueue(arrayList, serverMessage, new ServerTransaction.Action() { // from class: org.apache.qpid.server.federation.Bridge.AbstractPullBridge.1
                BaseQueue[] _queues;

                {
                    this._queues = (BaseQueue[]) arrayList.toArray(new BaseQueue[arrayList.size()]);
                }

                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                public void postCommit() {
                    for (int i = 0; i < this._queues.length; i++) {
                        try {
                            this._queues[i].enqueue(serverMessage);
                        } catch (AMQException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override // org.apache.qpid.server.txn.ServerTransaction.Action
                public void onRollback() {
                }
            });
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void exception(Session session, SessionException sessionException) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void closed(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void opened(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void resumed(Session session) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$BridgeImpl.class */
    public interface BridgeImpl {
        void setSession(Session session);

        void close();
    }

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$DynamicExchangeBridge.class */
    private final class DynamicExchangeBridge extends AbstractPullBridge implements Exchange.BindingListener {
        private final String _tmpQueueName;
        private final ConcurrentMap<Binding, Binding> _bindings;

        private DynamicExchangeBridge() {
            super();
            this._tmpQueueName = "bridge_queue_" + Bridge.this._bridgeNo + "_" + Bridge.this._link.getFederationTag();
            this._bindings = new ConcurrentHashMap();
        }

        @Override // org.apache.qpid.server.federation.Bridge.AbstractPullBridge
        void onSession() {
            HashMap hashMap = new HashMap();
            hashMap.put("qpid.trace.exclude", Bridge.this._link.getFederationTag());
            hashMap.put("qpid.trace.id", Bridge.this._link.getRemoteFederationTag());
            Bridge.this._session.queueDeclare(this._tmpQueueName, null, hashMap, Option.AUTO_DELETE, Option.EXCLUSIVE);
            Bridge.this._session.sync();
            Map<String, Object> map = Collections.EMPTY_MAP;
            String valueOf = String.valueOf(Bridge.this._bridgeNo);
            Bridge.this._session.messageSubscribe(this._tmpQueueName, valueOf, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0L, map, new Option[0]);
            Bridge.this._session.sync();
            Bridge.this._session.messageSetFlowMode(valueOf, MessageFlowMode.WINDOW, new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.MESSAGE, Bridge.this.getMessageWindowSize(), new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.BYTE, -1L, new Option[0]);
            Bridge.this._session.sync();
            Exchange exchange = Bridge.this.getVirtualHost().getExchangeRegistry().getExchange(Bridge.this._destination);
            exchange.addBindingListener(this);
            Iterator<Binding> it = exchange.getBindings().iterator();
            while (it.hasNext()) {
                propogateBinding(it.next());
            }
        }

        private void propogateBinding(Binding binding) {
            String str;
            if (this._bindings.putIfAbsent(binding, binding) == null) {
                HashMap hashMap = new HashMap(binding.getArguments());
                if (hashMap.get("qpid.fed.origin") == null) {
                    hashMap.put("qpid.fed.op", "");
                    hashMap.put("qpid.fed.origin", Bridge.this._link.getFederationTag());
                    hashMap.put("qpid.fed.tags", Bridge.this._link.getFederationTag());
                } else {
                    String str2 = (String) hashMap.get("qpid.fed.tags");
                    if (str2 == null) {
                        str = Bridge.this._link.getFederationTag();
                    } else if (Arrays.asList(str2.split(",")).contains(Bridge.this._link.getFederationTag())) {
                        return;
                    } else {
                        str = str2 + "," + Bridge.this._link.getFederationTag();
                    }
                    hashMap.put("qpid.fed.tags", str);
                }
                Bridge.this._session.exchangeBind(this._tmpQueueName, Bridge.this._source, binding.getBindingKey(), hashMap, new Option[0]);
                Bridge.this._session.sync();
            }
        }

        private void propogateBindingRemoval(Binding binding) {
            if (this._bindings.remove(binding) != null) {
                Bridge.this._session.exchangeUnbind(this._tmpQueueName, Bridge.this._source, binding.getBindingKey(), new Option[0]);
            }
        }

        @Override // org.apache.qpid.server.exchange.Exchange.BindingListener
        public void bindingAdded(Exchange exchange, Binding binding) {
            propogateBinding(binding);
        }

        @Override // org.apache.qpid.server.exchange.Exchange.BindingListener
        public void bindingRemoved(Exchange exchange, Binding binding) {
            propogateBindingRemoval(binding);
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$StaticExchangePullBridge.class */
    private final class StaticExchangePullBridge extends AbstractPullBridge {
        private final String _tmpQueueName;

        private StaticExchangePullBridge() {
            super();
            this._tmpQueueName = "bridge_queue_" + Bridge.this._bridgeNo + "_" + Bridge.this._link.getFederationTag();
        }

        @Override // org.apache.qpid.server.federation.Bridge.AbstractPullBridge
        public void onSession() {
            HashMap hashMap = new HashMap();
            hashMap.put("qpid.trace.exclude", Bridge.this._link.getFederationTag());
            hashMap.put("qpid.trace.id", Bridge.this._link.getRemoteFederationTag());
            Bridge.this._session.queueDeclare(this._tmpQueueName, null, hashMap, Option.AUTO_DELETE, Option.EXCLUSIVE);
            Bridge.this._session.sync();
            Bridge.this._session.exchangeBind(this._tmpQueueName, Bridge.this._source, Bridge.this._key, new HashMap(), new Option[0]);
            Bridge.this._session.sync();
            Map<String, Object> map = Collections.EMPTY_MAP;
            String valueOf = String.valueOf(Bridge.this._bridgeNo);
            Bridge.this._session.messageSubscribe(this._tmpQueueName, valueOf, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0L, map, new Option[0]);
            Bridge.this._session.sync();
            Bridge.this._session.messageSetFlowMode(valueOf, MessageFlowMode.WINDOW, new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.MESSAGE, Bridge.this.getMessageWindowSize(), new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.BYTE, -1L, new Option[0]);
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$StaticExchangePushBridge.class */
    private class StaticExchangePushBridge implements BridgeImpl, SessionListener {
        private final String _tmpQueueName;
        private AMQQueue _queue;
        static final /* synthetic */ boolean $assertionsDisabled;

        private StaticExchangePushBridge() {
            this._tmpQueueName = "bridge_queue_" + Bridge.this._bridgeNo + "_" + Bridge.this._link.getFederationTag();
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void setSession(Session session) {
            if (!$assertionsDisabled && !(session instanceof ServerSession)) {
                throw new AssertionError();
            }
            session.setSessionListener(this);
            Exchange exchange = Bridge.this.getVirtualHost().getExchangeRegistry().getExchange(Bridge.this._source);
            HashMap hashMap = new HashMap();
            hashMap.put("qpid.trace.exclude", Bridge.this._link.getFederationTag());
            hashMap.put("qpid.trace.id", Bridge.this._link.getRemoteFederationTag());
            try {
                this._queue = AMQQueueFactory.createAMQQueueImpl(this._tmpQueueName, Bridge.this.isDurable(), Bridge.this._link.getFederationTag(), false, false, Bridge.this.getVirtualHost(), (Map<String, Object>) hashMap);
                Subscription_0_10 subscription_0_10 = new Subscription_0_10((ServerSession) session, Bridge.this._destination, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(-1L, Bridge.this.getMessageWindowSize()), null, null);
                ((ServerSession) session).register(Bridge.this._destination, subscription_0_10);
                try {
                    this._queue.registerSubscription(subscription_0_10, true);
                    Bridge.this.getVirtualHost().getBindingFactory().addBinding(Bridge.this._key, this._queue, exchange, Collections.emptyMap());
                } catch (AMQException e) {
                    throw new RuntimeException(e);
                }
            } catch (AMQException e2) {
                throw new RuntimeException(e2);
            }
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void close() {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void opened(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void resumed(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void message(Session session, MessageTransfer messageTransfer) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void exception(Session session, SessionException sessionException) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void closed(Session session) {
        }

        static {
            $assertionsDisabled = !Bridge.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$StaticQueuePullBridge.class */
    private final class StaticQueuePullBridge extends AbstractPullBridge {
        private StaticQueuePullBridge() {
            super();
        }

        @Override // org.apache.qpid.server.federation.Bridge.AbstractPullBridge
        public void onSession() {
            Map<String, Object> map = Collections.EMPTY_MAP;
            String valueOf = String.valueOf(Bridge.this._bridgeNo);
            Bridge.this._session.messageSubscribe(Bridge.this._source, valueOf, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0L, map, new Option[0]);
            Bridge.this._session.sync();
            Bridge.this._session.messageSetFlowMode(valueOf, MessageFlowMode.WINDOW, new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.MESSAGE, Bridge.this.getMessageWindowSize(), new Option[0]);
            Bridge.this._session.messageFlow(valueOf, MessageCreditUnit.BYTE, -1L, new Option[0]);
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/qpid/server/federation/Bridge$StaticQueuePushBridge.class */
    private class StaticQueuePushBridge implements BridgeImpl, SessionListener {
        private AMQQueue _queue;
        static final /* synthetic */ boolean $assertionsDisabled;

        private StaticQueuePushBridge() {
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void setSession(Session session) {
            if (!$assertionsDisabled && !(session instanceof ServerSession)) {
                throw new AssertionError();
            }
            session.setSessionListener(this);
            this._queue = Bridge.this.getVirtualHost().getQueueRegistry().getQueue(Bridge.this._source);
            Subscription_0_10 subscription_0_10 = new Subscription_0_10((ServerSession) session, Bridge.this._destination, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, MessageFlowMode.WINDOW, new WindowCreditManager(-1L, Bridge.this.getMessageWindowSize()), null, null);
            ((ServerSession) session).register(Bridge.this._destination, subscription_0_10);
            try {
                this._queue.registerSubscription(subscription_0_10, false);
            } catch (AMQException e) {
                throw new RuntimeException(e);
            }
        }

        @Override // org.apache.qpid.server.federation.Bridge.BridgeImpl
        public void close() {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void opened(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void resumed(Session session) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void message(Session session, MessageTransfer messageTransfer) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void exception(Session session, SessionException sessionException) {
        }

        @Override // org.apache.qpid.transport.SessionListener
        public void closed(Session session) {
        }

        static {
            $assertionsDisabled = !Bridge.class.desiredAssertionStatus();
        }
    }

    public Bridge(BrokerLink brokerLink, int i, boolean z, boolean z2, boolean z3, boolean z4, String str, String str2, String str3, String str4, String str5) {
        this._link = brokerLink;
        this._bridgeNo = i;
        this._durable = z;
        this._dynamic = z2;
        this._queueBridge = z3;
        this._localSource = z4;
        this._source = str;
        this._destination = str2;
        this._key = str3;
        this._tag = str4;
        this._excludes = str5;
        this._id = brokerLink.getConfigStore().createId();
        if (z2) {
            if (z4 || z3) {
                return;
            }
            this._delegate = new DynamicExchangeBridge();
            return;
        }
        if (z4) {
            if (z3) {
                this._delegate = new StaticQueuePushBridge();
                return;
            } else {
                this._delegate = new StaticExchangePushBridge();
                return;
            }
        }
        if (z3) {
            this._delegate = new StaticQueuePullBridge();
        } else {
            this._delegate = new StaticExchangePullBridge();
        }
    }

    @Override // org.apache.qpid.server.configuration.ConfiguredObject
    public UUID getId() {
        return this._id;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.qpid.server.configuration.ConfiguredObject
    public BridgeConfigType getConfigType() {
        return BridgeConfigType.getInstance();
    }

    @Override // org.apache.qpid.server.configuration.ConfiguredObject
    public ConfiguredObject<BridgeConfigType, BridgeConfig> getParent() {
        return getLink();
    }

    @Override // org.apache.qpid.server.configuration.ConfiguredObject
    public boolean isDurable() {
        return this._durable;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public boolean isDynamic() {
        return this._dynamic;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public boolean isQueueBridge() {
        return this._queueBridge;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public boolean isLocalSource() {
        return this._localSource;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public String getSource() {
        return this._source;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public String getDestination() {
        return this._destination;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public String getKey() {
        return this._key;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public String getTag() {
        return this._tag;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public String getExcludes() {
        return this._excludes;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public BrokerLink getLink() {
        return this._link;
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public Integer getChannelId() {
        return Integer.valueOf(this._session == null ? 0 : this._session.getChannel());
    }

    @Override // org.apache.qpid.server.configuration.BridgeConfig
    public int getAckBatching() {
        return 0;
    }

    @Override // org.apache.qpid.server.configuration.ConfiguredObject
    public long getCreateTime() {
        return this._createTime;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Bridge bridge = (Bridge) obj;
        if (this._durable != bridge._durable || this._dynamic != bridge._dynamic || this._localSource != bridge._localSource || this._queueBridge != bridge._queueBridge) {
            return false;
        }
        if (this._destination != null) {
            if (!this._destination.equals(bridge._destination)) {
                return false;
            }
        } else if (bridge._destination != null) {
            return false;
        }
        if (this._excludes != null) {
            if (!this._excludes.equals(bridge._excludes)) {
                return false;
            }
        } else if (bridge._excludes != null) {
            return false;
        }
        if (this._key != null) {
            if (!this._key.equals(bridge._key)) {
                return false;
            }
        } else if (bridge._key != null) {
            return false;
        }
        if (this._source != null) {
            if (!this._source.equals(bridge._source)) {
                return false;
            }
        } else if (bridge._source != null) {
            return false;
        }
        return this._tag != null ? this._tag.equals(bridge._tag) : bridge._tag == null;
    }

    public int hashCode() {
        return (31 * ((31 * ((31 * ((31 * ((31 * ((31 * ((31 * ((31 * (this._durable ? 1 : 0)) + (this._dynamic ? 1 : 0))) + (this._queueBridge ? 1 : 0))) + (this._localSource ? 1 : 0))) + (this._source != null ? this._source.hashCode() : 0))) + (this._destination != null ? this._destination.hashCode() : 0))) + (this._key != null ? this._key.hashCode() : 0))) + (this._tag != null ? this._tag.hashCode() : 0))) + (this._excludes != null ? this._excludes.hashCode() : 0);
    }

    public void setSession(Session session) {
        this._session = session;
        this._delegate.setSession(session);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getMessageWindowSize() {
        return 10L;
    }

    VirtualHost getVirtualHost() {
        return this._link.getVirtualHost();
    }

    public void close() {
        this._delegate.close();
        this._session = null;
    }
}
