/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.BufferManager;
import org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteInputChannel
extends InputChannel {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteInputChannel.class);
    private static final int NONE = -1;
    private final InputChannelID id = new InputChannelID();
    private final ConnectionID connectionId;
    private final ConnectionManager connectionManager;
    private final PrioritizedDeque<SequenceBuffer> receivedBuffers = new PrioritizedDeque();
    private final AtomicBoolean isReleased = new AtomicBoolean();
    private volatile PartitionRequestClient partitionRequestClient;
    private int expectedSequenceNumber = 0;
    private final int initialCredit;
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    private final BufferManager bufferManager;
    @GuardedBy(value="receivedBuffers")
    private int lastBarrierSequenceNumber = -1;
    @GuardedBy(value="receivedBuffers")
    private long lastBarrierId = -1L;
    private final ChannelStatePersister channelStatePersister;
    private long totalQueueSizeInBytes;

    /**
     * Creates a remote input channel.
     *
     * <p>No connection is opened and no buffers are requested here; see {@link #setup()} and
     * {@link #requestSubpartition()}.
     *
     * @param inputGate gate owning this channel; also supplies the memory segment provider for
     *     the buffer manager
     * @param channelIndex forwarded to the {@link InputChannel} constructor
     * @param partitionId forwarded to the {@link InputChannel} constructor
     * @param consumedSubpartitionIndex forwarded to the {@link InputChannel} constructor
     * @param connectionId connection used to create the partition request client; must not be null
     * @param connectionManager manager used to create/close connections; must not be null
     * @param initialBackOff forwarded to the {@link InputChannel} constructor
     * @param maxBackoff forwarded to the {@link InputChannel} constructor
     * @param networkBuffersPerChannel initial credit of this channel; must be non-negative
     * @param numBytesIn forwarded to the {@link InputChannel} constructor
     * @param numBuffersIn forwarded to the {@link InputChannel} constructor
     * @param stateWriter writer handed to the {@link ChannelStatePersister}
     */
    public RemoteInputChannel(
            SingleInputGate inputGate,
            int channelIndex,
            ResultPartitionID partitionId,
            int consumedSubpartitionIndex,
            ConnectionID connectionId,
            ConnectionManager connectionManager,
            int initialBackOff,
            int maxBackoff,
            int networkBuffersPerChannel,
            Counter numBytesIn,
            Counter numBuffersIn,
            ChannelStateWriter stateWriter) {
        super(
                inputGate,
                channelIndex,
                partitionId,
                consumedSubpartitionIndex,
                initialBackOff,
                maxBackoff,
                numBytesIn,
                numBuffersIn);
        Preconditions.checkArgument(networkBuffersPerChannel >= 0, "Must be non-negative.");

        this.initialCredit = networkBuffersPerChannel;
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = Preconditions.checkNotNull(connectionManager);
        this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
        this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo());
    }

    /**
     * Overrides the sequence number that the next call to {@code onBuffer}/{@code onEmptyBuffer}
     * is expected to carry. Test hook only.
     *
     * @param expectedSequenceNumber the new expected sequence number
     */
    @VisibleForTesting
    void setExpectedSequenceNumber(int expectedSequenceNumber) {
        this.expectedSequenceNumber = expectedSequenceNumber;
    }

    /**
     * Requests {@code initialCredit} exclusive buffers from the buffer manager.
     *
     * @throws IllegalStateException if the buffer manager already reports available exclusive
     *     buffers
     * @throws IOException declared by the buffer request
     */
    @Override
    void setup() throws IOException {
        final boolean noExclusiveBuffersYet =
                bufferManager.unsynchronizedGetAvailableExclusiveBuffers() == 0;
        Preconditions.checkState(
                noExclusiveBuffersYet,
                "Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");

        bufferManager.requestExclusiveBuffers(initialCredit);
    }

    /**
     * Creates the partition request client (if there is none yet) and requests the consumed
     * subpartition through it. Does nothing when a client already exists.
     *
     * @throws PartitionConnectionException if creating the client fails with an {@link
     *     IOException}
     */
    @Override
    @VisibleForTesting
    public void requestSubpartition() throws IOException, InterruptedException {
        if (partitionRequestClient != null) {
            // A client already exists, nothing to do.
            return;
        }

        LOG.debug(
                "{}: Requesting REMOTE subpartition {} of partition {}. {}",
                this,
                consumedSubpartitionIndex,
                partitionId,
                channelStatePersister);

        try {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // Surface connection-level failures with the partition they belong to.
            throw new PartitionConnectionException(partitionId, (Throwable) e);
        }

        partitionRequestClient.requestSubpartition(partitionId, consumedSubpartitionIndex, this, 0);
    }

    /**
     * Re-sends the subpartition request using the increased backoff, or fails the partition
     * request when {@code increaseBackoff()} reports that the backoff cannot be increased.
     */
    void retriggerSubpartitionRequest() throws IOException {
        checkPartitionRequestQueueInitialized();

        if (!increaseBackoff()) {
            failPartitionRequest();
            return;
        }

        partitionRequestClient.requestSubpartition(
                partitionId, consumedSubpartitionIndex, this, getCurrentBackoff());
    }

    /**
     * Polls the next received buffer, if any.
     *
     * @return the polled buffer together with the data type of the element that is now at the
     *     head of the queue ({@code Buffer.DataType.NONE} if the queue is empty), or an empty
     *     optional when nothing is queued and the channel has not been released
     * @throws CancelTaskException if nothing is queued and the channel has been released
     */
    @Override
    Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        checkPartitionRequestQueueInitialized();

        final SequenceBuffer polled;
        final Buffer.DataType followingDataType;
        synchronized (receivedBuffers) {
            polled = receivedBuffers.poll();
            if (polled != null) {
                totalQueueSizeInBytes -= polled.buffer.getSize();
            }

            final SequenceBuffer upcoming = receivedBuffers.peek();
            followingDataType =
                    upcoming == null ? Buffer.DataType.NONE : upcoming.buffer.getDataType();
        }

        if (polled == null) {
            if (isReleased.get()) {
                throw new CancelTaskException(
                        "Queried for a buffer after channel has been released.");
            }
            return Optional.empty();
        }

        NetworkActionsLogger.traceInput(
                "RemoteInputChannel#getNextBuffer",
                polled.buffer,
                inputGate.getOwningTaskName(),
                channelInfo,
                channelStatePersister,
                polled.sequenceNumber);
        numBytesIn.inc((long) polled.buffer.getSize());
        numBuffersIn.inc();

        return Optional.of(
                new InputChannel.BufferAndAvailability(
                        polled.buffer, followingDataType, 0, polled.sequenceNumber));
    }

    /**
     * Sends a task event for this channel's partition through the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        Preconditions.checkState(
                !isReleased.get(),
                "Tried to send task event to producer after channel has been released.");
        checkPartitionRequestQueueInitialized();

        partitionRequestClient.sendTaskEvent(partitionId, event, this);
    }

    /** Returns whether {@link #releaseAllResources()} has already flipped the released flag. */
    @Override
    public boolean isReleased() {
        return isReleased.get();
    }

    /**
     * Releases all queued buffers and closes the connection of this channel.
     *
     * <p>The released flag is flipped with a compare-and-set, so only the first call performs
     * any work. The queued buffers are drained and the queued-bytes counter is reset while
     * holding the {@code receivedBuffers} lock; the buffers themselves are handed to the buffer
     * manager outside of the lock.
     */
    @Override
    void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        final ArrayDeque<Buffer> releasedBuffers;
        synchronized (receivedBuffers) {
            releasedBuffers =
                    receivedBuffers.stream()
                            .map(sb -> sb.buffer)
                            .collect(Collectors.toCollection(ArrayDeque::new));
            receivedBuffers.clear();
            // The queue is empty now; keep the byte counter in sync so that
            // unsynchronizedGetSizeOfQueuedBuffers() does not report stale data after release.
            totalQueueSizeInBytes = 0L;
        }
        bufferManager.releaseAllBuffers(releasedBuffers);

        // The released flag is already set at this point, i.e. before the connection is closed.
        if (partitionRequestClient != null) {
            partitionRequestClient.close(this);
        } else {
            connectionManager.closeOpenChannelConnections(connectionId);
        }
    }

    /**
     * Returns the number of queued buffers plus the number of required buffers that exceed the
     * initial credit (never counting a negative excess).
     */
    @Override
    int getBuffersInUseCount() {
        final int queued = getNumberOfQueuedBuffers();
        final int requiredBeyondInitialCredit =
                Math.max(0, bufferManager.getNumberOfRequiredBuffers() - initialCredit);
        return queued + requiredBeyondInitialCredit;
    }

    /**
     * Forwards the new buffer size to {@code notifyNewBufferSize}; anything thrown there is
     * passed to {@link ExceptionUtils#rethrow(Throwable)}.
     */
    @Override
    void announceBufferSize(int newBufferSize) {
        try {
            notifyNewBufferSize(newBufferSize);
        } catch (Throwable failure) {
            ExceptionUtils.rethrow(failure);
        }
    }

    /** Records a {@link PartitionNotFoundException} for this channel's partition as its error. */
    private void failPartitionRequest() {
        final PartitionNotFoundException notFound = new PartitionNotFoundException(partitionId);
        setError(notFound);
    }

    /** Returns a description containing the partition id and the connection id. */
    @Override
    public String toString() {
        return "RemoteInputChannel [" + partitionId + " at " + connectionId + "]";
    }

    /** Tells the partition request client that this channel has credit to announce. */
    private void notifyCreditAvailable() throws IOException {
        checkPartitionRequestQueueInitialized();

        partitionRequestClient.notifyCreditAvailable(this);
    }

    /**
     * Passes the new buffer size to the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    private void notifyNewBufferSize(int newBufferSize) throws IOException {
        Preconditions.checkState(!isReleased.get(), "Channel released.");
        checkPartitionRequestQueueInitialized();

        partitionRequestClient.notifyNewBufferSize(this, newBufferSize);
    }

    /** Returns the number of available buffers as reported by the buffer manager. */
    @VisibleForTesting
    public int getNumberOfAvailableBuffers() {
        return bufferManager.getNumberOfAvailableBuffers();
    }

    /**
     * Returns the number of required buffers, read from the buffer manager through its
     * {@code unsynchronized} accessor.
     */
    @VisibleForTesting
    public int getNumberOfRequiredBuffers() {
        return bufferManager.unsynchronizedGetNumberOfRequiredBuffers();
    }

    /** Returns the number of required buffers minus the initial credit. */
    @VisibleForTesting
    public int getSenderBacklog() {
        final int required = getNumberOfRequiredBuffers();
        return required - initialCredit;
    }

    /**
     * Returns whether the buffer manager reports (via its {@code unsynchronized} accessor) that
     * it is waiting for floating buffers.
     */
    @VisibleForTesting
    boolean isWaitingForFloatingBuffers() {
        return bufferManager.unsynchronizedIsWaitingForFloatingBuffers();
    }

    /**
     * Polls the head of the received-buffer queue without taking the queue lock and without
     * touching the queued-bytes counter. Test hook only.
     *
     * @return the polled buffer, or {@code null} if the queue is empty
     */
    @VisibleForTesting
    public Buffer getNextReceivedBuffer() {
        final SequenceBuffer head = receivedBuffers.poll();
        if (head == null) {
            return null;
        }
        return head.buffer;
    }

    /** Exposes the buffer manager of this channel. Test hook only. */
    @VisibleForTesting
    BufferManager getBufferManager() {
        return bufferManager;
    }

    /**
     * Exposes the partition request client; {@code null} until {@link #requestSubpartition()}
     * has created one. Test hook only.
     */
    @VisibleForTesting
    PartitionRequestClient getPartitionRequestClient() {
        return partitionRequestClient;
    }

    /**
     * Adds the given number of buffers to the unannounced credit. The partition request client
     * is only notified when the unannounced credit was zero before the addition.
     *
     * @param numAvailableBuffers number of newly available buffers; values {@code <= 0} are
     *     ignored
     */
    @Override
    public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
        if (numAvailableBuffers <= 0) {
            return;
        }

        final int previouslyUnannounced = unannouncedCredit.getAndAdd(numAvailableBuffers);
        if (previouslyUnannounced == 0) {
            notifyCreditAvailable();
        }
    }

    /**
     * Asks the partition request client to resume consumption for this channel. When the
     * channel has no initial credit, the unannounced credit is reset to zero first.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    public void resumeConsumption() throws IOException {
        Preconditions.checkState(!isReleased.get(), "Channel released.");
        checkPartitionRequestQueueInitialized();

        final boolean hasNoInitialCredit = initialCredit == 0;
        if (hasNoInitialCredit) {
            unannouncedCredit.set(0);
        }

        partitionRequestClient.resumeConsumption(this);
    }

    /**
     * Forwards the "all records processed" acknowledgement to the partition request client.
     *
     * @throws IllegalStateException if the channel has already been released
     */
    @Override
    public void acknowledgeAllRecordsProcessed() throws IOException {
        Preconditions.checkState(!isReleased.get(), "Channel released.");
        checkPartitionRequestQueueInitialized();

        partitionRequestClient.acknowledgeAllRecordsProcessed(this);
    }

    /** Releases the floating buffers, but only for channels without initial credit. */
    private void onBlockingUpstream() {
        if (initialCredit != 0) {
            return;
        }
        bufferManager.releaseFloatingBuffers();
    }

    /** Returns the current unannounced credit without modifying it. */
    public int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    /** Atomically returns the current unannounced credit and resets it to zero. */
    public int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    /** Returns the size of the received-buffer queue, read while holding the queue lock. */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /**
     * Returns the size of the received-buffer queue without taking the queue lock, clamped to be
     * non-negative.
     */
    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        final int observedSize = receivedBuffers.size();
        return Math.max(0, observedSize);
    }

    /**
     * Returns the tracked number of queued bytes without taking the queue lock, clamped to be
     * non-negative.
     */
    @Override
    public long unsynchronizedGetSizeOfQueuedBuffers() {
        final long observedBytes = totalQueueSizeInBytes;
        return Math.max(0L, observedBytes);
    }

    /**
     * Returns the initial credit minus the currently available exclusive buffers (read via the
     * buffer manager's {@code unsynchronized} accessor), clamped to be non-negative.
     */
    public int unsynchronizedGetExclusiveBuffersUsed() {
        final int availableExclusive = bufferManager.unsynchronizedGetAvailableExclusiveBuffers();
        return Math.max(0, initialCredit - availableExclusive);
    }

    /**
     * Returns the number of available floating buffers (read via the buffer manager's
     * {@code unsynchronized} accessor), clamped to be non-negative.
     */
    public int unsynchronizedGetFloatingBuffersAvailable() {
        final int availableFloating = bufferManager.unsynchronizedGetFloatingBuffersAvailable();
        return Math.max(0, availableFloating);
    }

    /** Returns the id that was generated for this channel when it was constructed. */
    public InputChannelID getInputChannelId() {
        return id;
    }

    /** Returns the initial credit this channel was constructed with. */
    public int getInitialCredit() {
        return initialCredit;
    }

    /**
     * Returns the buffer provider of the owning input gate.
     *
     * @return the gate's buffer provider, or {@code null} if this channel has been released
     */
    public BufferProvider getBufferProvider() throws IOException {
        return isReleased.get() ? null : inputGate.getBufferProvider();
    }

    /**
     * Requests a buffer from the buffer manager.
     *
     * @return the buffer handed out by the buffer manager; may be {@code null}
     */
    @Nullable
    public Buffer requestBuffer() {
        return bufferManager.requestBuffer();
    }

    /**
     * Requests floating buffers for {@code backlog + initialCredit} and reports the value
     * returned by that request via {@link #notifyBufferAvailable(int)}.
     *
     * @param backlog backlog reported by the sender
     */
    public void onSenderBacklog(int backlog) throws IOException {
        final int numRequestedBuffers =
                bufferManager.requestFloatingBuffers(backlog + initialCredit);
        notifyBufferAvailable(numRequestedBuffers);
    }

    /**
     * Handles a buffer received for this channel.
     *
     * <p>A buffer whose sequence number differs from the expected one is reported via {@link
     * #onError(Throwable)} and recycled. Otherwise the buffer is enqueued (as a priority element
     * if its data type has priority; with an additional priority announcement if its data type
     * requires one), barrier bookkeeping is updated, and the expected sequence number is
     * incremented, all while holding the {@code receivedBuffers} lock. Notifications and backlog
     * handling happen after the lock has been left.
     *
     * @param buffer the received buffer; recycled here unless it was handed over to the queue
     * @param sequenceNumber sequence number carried by the buffer
     * @param backlog sender backlog; negative values skip the backlog handling, and it must be 0
     *     for data types that block the upstream
     */
    public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        // Stays true until the buffer has been handed over to the queue; checked in finally.
        boolean recycleBuffer = true;

        try {
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            if (buffer.getDataType().isBlockingUpstream()) {
                onBlockingUpstream();
                Preconditions.checkArgument(
                        backlog == 0, "Illegal number of backlog: %s, should be 0.", backlog);
            }

            final boolean queueWasEmpty;
            boolean isFirstPriorityEvent = false;
            synchronized (receivedBuffers) {
                NetworkActionsLogger.traceInput(
                        "RemoteInputChannel#onBuffer",
                        buffer,
                        inputGate.getOwningTaskName(),
                        channelInfo,
                        channelStatePersister,
                        sequenceNumber);

                // Never enqueue into a released channel; the finally block recycles the buffer.
                if (isReleased.get()) {
                    return;
                }

                queueWasEmpty = receivedBuffers.isEmpty();

                final SequenceBuffer received = new SequenceBuffer(buffer, sequenceNumber);
                final Buffer.DataType dataType = buffer.getDataType();
                if (dataType.hasPriority()) {
                    isFirstPriorityEvent = addPriorityBuffer(received);
                    recycleBuffer = false;
                } else {
                    receivedBuffers.add(received);
                    recycleBuffer = false;
                    if (dataType.requiresAnnouncement()) {
                        isFirstPriorityEvent = addPriorityBuffer(announce(received));
                    }
                }
                totalQueueSizeInBytes += buffer.getSize();

                final OptionalLong barrierId = channelStatePersister.checkForBarrier(received.buffer);
                if (barrierId.isPresent() && barrierId.getAsLong() > lastBarrierId) {
                    // Remember the newest barrier seen so far and where it sits in the sequence.
                    lastBarrierId = barrierId.getAsLong();
                    lastBarrierSequenceNumber = received.sequenceNumber;
                }
                channelStatePersister.maybePersist(buffer);
                ++expectedSequenceNumber;
            }

            if (isFirstPriorityEvent) {
                notifyPriorityEvent(sequenceNumber);
            }
            if (queueWasEmpty) {
                notifyChannelNonEmpty();
            }
            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

    /**
     * Adds the given element as a priority element.
     *
     * @return {@code true} if it is the only priority element in the queue after the addition
     */
    private boolean addPriorityBuffer(SequenceBuffer sequenceBuffer) {
        receivedBuffers.addPriorityElement(sequenceBuffer);

        final int numPriorityElements = receivedBuffers.getNumPriorityElements();
        return numPriorityElements == 1;
    }

    /**
     * Builds the announcement for an already enqueued checkpoint barrier.
     *
     * @param sequenceBuffer the enqueued element; must hold a serialized {@link
     *     CheckpointBarrier} event
     * @return a new element with the same sequence number that wraps an {@link
     *     EventAnnouncement} for the barrier
     * @throws IllegalStateException if the element is a data buffer, is not enqueued exactly
     *     once, or does not deserialize to a {@link CheckpointBarrier}
     */
    private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOException {
        Preconditions.checkState(
                !sequenceBuffer.buffer.isBuffer(),
                "Only a CheckpointBarrier can be announced but found %s",
                sequenceBuffer.buffer);
        checkAnnouncedOnlyOnce(sequenceBuffer);

        final AbstractEvent event =
                EventSerializer.fromBuffer(sequenceBuffer.buffer, getClass().getClassLoader());
        Preconditions.checkState(
                event instanceof CheckpointBarrier,
                "Only a CheckpointBarrier can be announced but found %s",
                sequenceBuffer.buffer);

        final CheckpointBarrier barrier = (CheckpointBarrier) event;
        final EventAnnouncement announcement =
                new EventAnnouncement(barrier, sequenceBuffer.sequenceNumber);
        return new SequenceBuffer(
                EventSerializer.toBuffer(announcement, true), sequenceBuffer.sequenceNumber);
    }

    /**
     * Verifies that the received-buffer queue contains exactly one element with the sequence
     * number of the given element.
     *
     * @param sequenceBuffer element whose sequence number is looked up in the queue
     * @throws IllegalStateException if the number of matching elements is not exactly one
     */
    private void checkAnnouncedOnlyOnce(SequenceBuffer sequenceBuffer) {
        final Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
        int count = 0;
        while (iterator.hasNext()) {
            if (iterator.next().sequenceNumber == sequenceBuffer.sequenceNumber) {
                ++count;
            }
        }

        // Flink's Preconditions only substitutes %s placeholders; with %d the count would not
        // be interpolated into the message.
        Preconditions.checkState(
                count == 1,
                "Before enqueuing the announcement there should be exactly single occurrence of the buffer, but found [%s]",
                count);
    }

    /**
     * Starts persisting channel state for the checkpoint of the given barrier.
     *
     * <p>If the barrier is newer than the last barrier recorded by this channel, the recorded
     * barrier is reset before the in-flight buffers are collected.
     *
     * @param barrier barrier of the checkpoint that is starting
     * @throws CheckpointException with reason {@code CHECKPOINT_SUBSUMED} if the barrier is
     *     older than the last barrier recorded by this channel
     */
    @Override
    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
        synchronized (receivedBuffers) {
            final long checkpointId = barrier.getId();

            if (checkpointId < lastBarrierId) {
                throw new CheckpointException(
                        String.format(
                                "Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)",
                                checkpointId, lastBarrierId),
                        CheckpointFailureReason.CHECKPOINT_SUBSUMED);
            } else if (checkpointId > lastBarrierId) {
                resetLastBarrier();
            }

            channelStatePersister.startPersisting(
                    checkpointId, getInflightBuffersUnsafe(checkpointId));
        }
    }

    /**
     * Stops persisting channel state for the given checkpoint and forgets the recorded barrier
     * if it belongs to that checkpoint.
     *
     * @param checkpointId id of the checkpoint that stopped
     */
    @Override
    public void checkpointStopped(long checkpointId) {
        synchronized (receivedBuffers) {
            channelStatePersister.stopPersisting(checkpointId);

            final boolean isRecordedBarrier = lastBarrierId == checkpointId;
            if (isRecordedBarrier) {
                resetLastBarrier();
            }
        }
    }

    /**
     * Locked variant of {@code getInflightBuffersUnsafe}: collects the in-flight buffers for the
     * given checkpoint while holding the {@code receivedBuffers} lock. Test hook only.
     */
    @VisibleForTesting
    List<Buffer> getInflightBuffers(long checkpointId) {
        synchronized (receivedBuffers) {
            return getInflightBuffersUnsafe(checkpointId);
        }
    }

    /**
     * Moves the already enqueued event with the given sequence number into the priority section
     * of the queue.
     *
     * <p>The event is removed from the queue, deserialized, re-serialized through {@code
     * EventSerializer.toBuffer(event, true)} and re-added as a priority element. If it is the
     * only priority element afterwards, the input gate is notified forcibly.
     *
     * @param sequenceNumber sequence number of the event; must equal the sequence number of the
     *     last recorded barrier
     * @throws IllegalStateException if no barrier has been received, the sequence number does
     *     not match the last recorded barrier, the element is a data buffer, or the removed
     *     element was already a priority element
     */
    @Override
    public void convertToPriorityEvent(int sequenceNumber) throws IOException {
        final boolean isFirstPriorityEvent;
        synchronized (receivedBuffers) {
            Preconditions.checkState(channelStatePersister.hasBarrierReceived());

            final int numPriorityElementsBeforeRemoval = receivedBuffers.getNumPriorityElements();
            final SequenceBuffer toPrioritize =
                    receivedBuffers.getAndRemove(
                            candidate -> candidate.sequenceNumber == sequenceNumber);

            Preconditions.checkState(lastBarrierSequenceNumber == sequenceNumber);
            Preconditions.checkState(!toPrioritize.buffer.isBuffer());
            // The priority count must be unchanged, i.e. the removed element was not a priority
            // element already.
            Preconditions.checkState(
                    numPriorityElementsBeforeRemoval == receivedBuffers.getNumPriorityElements(),
                    "Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
                    toPrioritize,
                    numPriorityElementsBeforeRemoval);

            final AbstractEvent event =
                    EventSerializer.fromBuffer(toPrioritize.buffer, getClass().getClassLoader());
            toPrioritize.buffer.setReaderIndex(0);

            final SequenceBuffer prioritized =
                    new SequenceBuffer(
                            EventSerializer.toBuffer(event, true), toPrioritize.sequenceNumber);
            isFirstPriorityEvent = addPriorityBuffer(prioritized);
        }

        if (isFirstPriorityEvent) {
            notifyPriorityEventForce();
        }
    }

    /** Delegates to the input gate's {@code notifyPriorityEventForce} for this channel. */
    private void notifyPriorityEventForce() {
        inputGate.notifyPriorityEventForce(this);
    }

    /**
     * Collects the queued data buffers that belong to the given checkpoint. The caller must hold
     * the {@code receivedBuffers} lock.
     *
     * <p>Priority elements at the head of the queue are skipped, and so are non-data elements
     * (events). Collection stops at the first data buffer for which {@code shouldBeSpilled}
     * returns {@code false}.
     *
     * @param checkpointId must equal the last recorded barrier id unless none is recorded
     * @return retained references to the collected buffers, in queue order
     */
    private List<Buffer> getInflightBuffersUnsafe(long checkpointId) {
        assert (Thread.holdsLock(receivedBuffers));
        Preconditions.checkState(checkpointId == lastBarrierId || lastBarrierId == -1L);

        final List<Buffer> inflightBuffers = new ArrayList<>();
        final Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
        // Skip the priority elements at the head of the queue.
        Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());

        while (iterator.hasNext()) {
            final SequenceBuffer candidate = iterator.next();
            if (candidate.buffer.isBuffer()) {
                if (shouldBeSpilled(candidate.sequenceNumber)) {
                    inflightBuffers.add(candidate.buffer.retainBuffer());
                } else {
                    break;
                }
            }
        }
        return inflightBuffers;
    }

    /** Forgets the recorded barrier by setting both its sequence number and its id to -1. */
    private void resetLastBarrier() {
        lastBarrierSequenceNumber = -1;
        lastBarrierId = -1L;
    }

    /**
     * Decides whether a buffer with the given sequence number counts as preceding the last
     * recorded barrier.
     *
     * <p>Without a recorded barrier (sequence number -1) every buffer qualifies. Otherwise the
     * comparison is {@code sequenceNumber < lastBarrierSequenceNumber}, adjusted when the
     * barrier's sequence number lies close to either end of the {@code int} range so that the
     * comparison tolerates sequence number overflow.
     *
     * @throws IllegalStateException if the queue holds {@code Integer.MAX_VALUE / 2} or more
     *     elements
     */
    private boolean shouldBeSpilled(int sequenceNumber) {
        if (lastBarrierSequenceNumber == -1) {
            return true;
        }
        Preconditions.checkState(
                receivedBuffers.size() < Integer.MAX_VALUE / 2,
                "Too many buffers for sequenceNumber overflow detection code to work correctly");

        final boolean possibleOverflowAfterOvertaking =
                Integer.MAX_VALUE / 2 < lastBarrierSequenceNumber;
        final boolean possibleOverflowBeforeOvertaking =
                lastBarrierSequenceNumber < -Integer.MAX_VALUE / 2;

        if (possibleOverflowAfterOvertaking) {
            return sequenceNumber < lastBarrierSequenceNumber && sequenceNumber > 0;
        } else if (possibleOverflowBeforeOvertaking) {
            return sequenceNumber < lastBarrierSequenceNumber || sequenceNumber > 0;
        } else {
            return sequenceNumber < lastBarrierSequenceNumber;
        }
    }

    /**
     * Handles the notification of an empty buffer: only the expected sequence number is
     * advanced, nothing is enqueued.
     *
     * <p>Nothing happens if the channel has been released. A sequence number that differs from
     * the expected one is reported via {@link #onError(Throwable)}.
     *
     * @param sequenceNumber sequence number carried by the empty buffer
     * @param backlog sender backlog; only handled if the sequence number was accepted and the
     *     value is non-negative
     */
    public void onEmptyBuffer(int sequenceNumber, int backlog) throws IOException {
        final boolean accepted;
        synchronized (receivedBuffers) {
            if (isReleased.get()) {
                accepted = false;
            } else if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                accepted = false;
            } else {
                ++expectedSequenceNumber;
                accepted = true;
            }
        }

        if (accepted && backlog >= 0) {
            onSenderBacklog(backlog);
        }
    }

    /** Asks the input gate to trigger a partition state check for the consumed subpartition. */
    public void onFailedPartitionRequest() {
        inputGate.triggerPartitionStateCheck(partitionId, consumedSubpartitionIndex);
    }

    /**
     * Records the given failure as this channel's error via {@code setError}.
     *
     * @param cause the failure to record
     */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /**
     * Runs {@code checkError()} and then verifies that the partition request client exists.
     *
     * @throws IllegalStateException if no partition request client has been created yet
     * @throws IOException declared by {@code checkError()}
     */
    private void checkPartitionRequestQueueInitialized() throws IOException {
        checkError();

        final boolean clientInitialized = partitionRequestClient != null;
        Preconditions.checkState(
                clientInitialized,
                "Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
    }

    /** Immutable pair of a buffer and the sequence number it was received with. */
    private static final class SequenceBuffer {
        /** The wrapped buffer. */
        final Buffer buffer;

        /** Sequence number associated with {@link #buffer}. */
        final int sequenceNumber;

        private SequenceBuffer(Buffer buffer, int sequenceNumber) {
            this.buffer = buffer;
            this.sequenceNumber = sequenceNumber;
        }

        /** Describes the element by event flag, data type and sequence number. */
        @Override
        public String toString() {
            final boolean isEvent = !buffer.isBuffer();
            return String.format(
                    "SequenceBuffer(isEvent = %s, dataType = %s, sequenceNumber = %s)",
                    isEvent, buffer.getDataType(), sequenceNumber);
        }
    }

    /**
     * Signals that a buffer arrived with a sequence number other than the expected one. The
     * message is built lazily in {@link #getMessage()} from the two sequence numbers.
     */
    private static class BufferReorderingException
    extends IOException {
        private static final long serialVersionUID = -888282210356266816L;

        /** Sequence number the channel expected next. */
        private final int expectedSequenceNumber;

        /** Sequence number that was actually received. */
        private final int actualSequenceNumber;

        BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        @Override
        public String getMessage() {
            final String template =
                    "Buffer re-ordering: expected buffer with sequence number %d, but received %d.";
            return String.format(template, expectedSequenceNumber, actualSequenceNumber);
        }
    }
}

