/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class MembershipManagerImpl
implements MembershipManager,
ClusterResourceListener {
    /** Comparator used for every sorted set of {@link TopicPartition} built by this class. */
    static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();
    /** Comparator used for every sorted set of {@link TopicIdPartition} built by this class. */
    static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator();
    /** Group id supplied at construction; returned by {@link #groupId()}. */
    private final String groupId;
    /** Optional group instance id; when present, the leave epoch used is -2 instead of -1. */
    private final Optional<String> groupInstanceId;
    /** Timeout (ms) used to compute the expiration of the auto-commit issued before reconciling. */
    private final int rebalanceTimeoutMs;
    /** Member id; starts as the empty string and is overwritten from heartbeat responses. */
    private String memberId = "";
    /** Member epoch; starts at 0 and is updated from heartbeat responses and on state transitions. */
    private int memberEpoch = 0;
    /** Current member state; the constructor sets it to UNSUBSCRIBED and transitionTo() changes it. */
    private MemberState state;
    /** Optional server-side assignor supplied at construction; returned by {@link #serverAssignor()}. */
    private final Optional<String> serverAssignor;
    /** Partitions currently assigned, keyed by topic id; rebuilt by updateCurrentAssignment(). */
    private Map<Uuid, SortedSet<Integer>> currentAssignment;
    /** Subscription state that holds the assigned partitions and the optional rebalance listener. */
    private final SubscriptionState subscriptions;
    /** Metadata used to resolve topic ids to names and to request metadata updates. */
    private final ConsumerMetadata metadata;
    private final Logger log;
    /** Used to auto-commit before a reconciliation and to reset the auto-commit timer afterwards. */
    private final CommitRequestManager commitRequestManager;
    /** Local topic id -> topic name cache, filled whenever a name is found in the metadata. */
    private final Map<Uuid, String> assignedTopicNamesCache;
    /** Target assignment entries whose topic names have not been resolved yet, keyed by topic id. */
    private final Map<Uuid, SortedSet<Integer>> assignmentUnresolved;
    /** Target assignment entries with resolved topic names, waiting to be reconciled. */
    private final SortedSet<TopicIdPartition> assignmentReadyToReconcile;
    /** True between markReconciliationInProgress() and markReconciliationCompleted(). */
    private boolean reconciliationInProgress;
    /** Member epoch captured when the ongoing reconciliation started; used to detect a rejoin. */
    private int memberEpochOnReconciliationStart;
    /** Future handed out by leaveGroup() while a leave is ongoing; empty otherwise. */
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
    /** Guards against registering this instance as a cluster update listener more than once. */
    private boolean isRegisteredForMetadataUpdates;
    /** Listeners notified on member epoch changes (see notifyEpochChange()). */
    private final List<MemberStateListener> stateUpdatesListeners;
    /** Optional telemetry reporter whose metric labels are updated when the member id changes. */
    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
    /** Handler that receives the events requesting rebalance listener callback execution. */
    private final BackgroundEventHandler backgroundEventHandler;
    /** Clock used to compute expiration times. */
    private final Time time;
    public MembershipManagerImpl(String groupId, Optional<String> groupInstanceId, int rebalanceTimeoutMs, Optional<String> serverAssignor, SubscriptionState subscriptions, CommitRequestManager commitRequestManager, ConsumerMetadata metadata, LogContext logContext, Optional<ClientTelemetryReporter> clientTelemetryReporter, BackgroundEventHandler backgroundEventHandler, Time time) {
        this.groupId = groupId;
        this.state = MemberState.UNSUBSCRIBED;
        this.serverAssignor = serverAssignor;
        this.groupInstanceId = groupInstanceId;
        this.subscriptions = subscriptions;
        this.commitRequestManager = commitRequestManager;
        this.metadata = metadata;
        this.assignedTopicNamesCache = new HashMap<Uuid, String>();
        this.assignmentUnresolved = new HashMap<Uuid, SortedSet<Integer>>();
        this.assignmentReadyToReconcile = new TreeSet<TopicIdPartition>(TOPIC_ID_PARTITION_COMPARATOR);
        this.currentAssignment = new HashMap<Uuid, SortedSet<Integer>>();
        this.log = logContext.logger(MembershipManagerImpl.class);
        this.stateUpdatesListeners = new ArrayList<MemberStateListener>();
        this.clientTelemetryReporter = clientTelemetryReporter;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.backgroundEventHandler = backgroundEventHandler;
        this.time = time;
    }

    /**
     * Moves the member to {@code nextState}. Staying in the same state is always accepted;
     * otherwise the current state must be one of the valid previous states of {@code nextState}.
     *
     * @param nextState state to move to
     * @throws IllegalStateException if the transition is not allowed
     */
    private void transitionTo(MemberState nextState) {
        boolean sameState = this.state.equals(nextState);
        if (!sameState) {
            boolean allowed = nextState.getPreviousValidStates().contains(this.state);
            if (!allowed) {
                throw new IllegalStateException(String.format("Invalid state transition from %s to %s", this.state, nextState));
            }
        }
        this.log.trace("Member {} with epoch {} transitioned from {} to {}.", this.memberId, this.memberEpoch, this.state, nextState);
        this.state = nextState;
    }

    /**
     * @return the group id this manager was created with
     */
    @Override
    public String groupId() {
        return groupId;
    }

    /**
     * @return the optional group instance id this manager was created with
     */
    @Override
    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    /**
     * @return the member id currently held by this manager (empty string until one is received)
     */
    @Override
    public String memberId() {
        return memberId;
    }

    /**
     * @return the member epoch currently held by this manager
     */
    @Override
    public int memberEpoch() {
        return memberEpoch;
    }

    /**
     * @return true if the member is currently in the {@link MemberState#STALE} state
     */
    @Override
    public boolean isStaled() {
        return MemberState.STALE == this.state;
    }

    /**
     * Applies a successful heartbeat response: updates the member id and epoch, and processes the
     * assignment included in the response, if any.
     * <p>
     * The member id in the response is treated as optional: when it is null the previously known
     * member id is kept, rather than being overwritten with null.
     *
     * @param response heartbeat response data; must carry no error
     * @throws IllegalArgumentException if the response contains an error code
     */
    @Override
    public void onHeartbeatResponseReceived(ConsumerGroupHeartbeatResponseData response) {
        if (response.errorCode() != Errors.NONE.code()) {
            String errorMessage = String.format("Unexpected error in Heartbeat response. Expected no error, but received: %s", Errors.forCode(response.errorCode()));
            throw new IllegalArgumentException(errorMessage);
        }
        String responseMemberId = response.memberId();
        if (responseMemberId != null) {
            if (!responseMemberId.equals(this.memberId)) {
                // Only touch the telemetry labels when the member id actually changed.
                this.clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(Collections.singletonMap("group_member_id", responseMemberId)));
            }
            // Never replace a known member id with null.
            this.memberId = responseMemberId;
        }
        this.updateMemberEpoch(response.memberEpoch());
        ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
        if (assignment != null) {
            if (!this.state.canHandleNewAssignment()) {
                this.log.debug("Ignoring new assignment {} received from server because member is in {} state.", assignment, this.state);
                return;
            }
            this.processAssignmentReceived(assignment);
        } else if (this.allPendingAssignmentsReconciled()) {
            // No new assignment and nothing pending: the member is up to date.
            this.transitionTo(MemberState.STABLE);
        }
    }

    /**
     * Records the given target assignment and, if it differs from the current assignment, moves
     * to RECONCILING and starts a reconciliation. If it matches the current assignment, a member
     * that is RECONCILING or JOINING moves to STABLE.
     *
     * @param assignment target assignment received in a heartbeat response
     */
    private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        this.replaceUnresolvedAssignmentWithNewAssignment(assignment);
        boolean matchesCurrent = this.assignmentUnresolved.equals(this.currentAssignment);
        if (matchesCurrent) {
            this.log.debug("Target assignment {} received from the broker is equals to the member current assignment {}. Nothing to reconcile.", this.assignmentUnresolved, this.currentAssignment);
            boolean waitingForAssignment = this.state == MemberState.RECONCILING || this.state == MemberState.JOINING;
            if (waitingForAssignment) {
                this.transitionTo(MemberState.STABLE);
            }
            return;
        }
        this.transitionTo(MemberState.RECONCILING);
        this.assignmentReadyToReconcile.clear();
        this.resolveMetadataForUnresolvedAssignment();
        this.reconcile();
    }

    /**
     * Discards any previously unresolved target assignment and replaces it with the given one,
     * storing the partitions of each topic id as a sorted set.
     *
     * @param assignment target assignment received in a heartbeat response
     */
    private void replaceUnresolvedAssignmentWithNewAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        this.assignmentUnresolved.clear();
        assignment.topicPartitions().forEach(topicPartitions ->
            this.assignmentUnresolved.put(topicPartitions.topicId(), new TreeSet<Integer>(topicPartitions.partitions())));
    }

    /**
     * Handles the member being fenced. A member that is preparing to leave moves to UNSUBSCRIBED;
     * a member that is leaving or already unsubscribed is left untouched. In any other state the
     * member moves to FENCED, resets its epoch, triggers the onPartitionsLost callback for the
     * assigned partitions and, once the callback completes (successfully or not), clears its
     * subscription and transitions to joining.
     */
    @Override
    public void transitionToFenced() {
        switch (this.state) {
            case PREPARE_LEAVING:
                this.log.debug("Member {} with epoch {} got fenced but it is already preparing to leave the group, so it will stop sending heartbeat and won't attempt to rejoin.", this.memberId, this.memberEpoch);
                this.transitionTo(MemberState.UNSUBSCRIBED);
                return;
            case LEAVING:
                this.log.debug("Member {} with epoch {} got fenced but it is already leaving the group with state {}, so it won't attempt to rejoin.", this.memberId, this.memberEpoch, this.state);
                return;
            case UNSUBSCRIBED:
                this.log.debug("Member {} with epoch {} got fenced but it already left the group, so it won't attempt to rejoin.", this.memberId, this.memberEpoch);
                return;
            default:
                break;
        }
        this.transitionTo(MemberState.FENCED);
        this.resetEpoch();
        this.log.debug("Member {} with epoch {} transitioned to {} state. It will release its assignment and rejoin the group.", this.memberId, this.memberEpoch, MemberState.FENCED);
        this.invokeOnPartitionsLostCallback(this.subscriptions.assignedPartitions()).whenComplete((ignored, callbackError) -> {
            if (callbackError != null) {
                this.log.error("onPartitionsLost callback invocation failed while releasing assignment after member got fenced. Member will rejoin the group anyways.", callbackError);
            }
            // Release the assignment and rejoin regardless of the callback outcome.
            this.updateSubscription(new TreeSet<TopicIdPartition>(TOPIC_ID_PARTITION_COMPARATOR), true);
            this.transitionToJoining();
        });
    }

    /**
     * Moves the member to the FATAL state and notifies the state listeners that there is no valid
     * epoch or member id anymore. Unless the member had already left the group (UNSUBSCRIBED),
     * the onPartitionsLost callback is triggered for the assigned partitions and the subscription
     * is cleared once the callback completes, whether it succeeded or failed.
     */
    @Override
    public void transitionToFatal() {
        MemberState previousState = this.state;
        this.transitionTo(MemberState.FATAL);
        this.log.error("Member {} with epoch {} transitioned to {} state", this.memberId, this.memberEpoch, MemberState.FATAL);
        this.notifyEpochChange(Optional.empty(), Optional.empty());
        if (previousState == MemberState.UNSUBSCRIBED) {
            this.log.debug("Member {} with epoch {} got fatal error from the broker but it already left the group, so onPartitionsLost callback won't be triggered.", this.memberId, this.memberEpoch);
            return;
        }
        CompletableFuture<Void> callbackResult = this.invokeOnPartitionsLostCallback(this.subscriptions.assignedPartitions());
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onPartitionsLost callback invocation failed while releasing assignment after member failed with fatal error.", error);
            }
            // The assignment is released no matter how the callback ended.
            this.updateSubscription(new TreeSet<TopicIdPartition>(TOPIC_ID_PARTITION_COMPARATOR), true);
        });
    }

    /**
     * Reacts to a subscription change: a member that is UNSUBSCRIBED transitions to joining;
     * in any other state nothing happens.
     */
    @Override
    public void onSubscriptionUpdated() {
        if (this.state != MemberState.UNSUBSCRIBED) {
            return;
        }
        this.transitionToJoining();
    }

    /**
     * Applies the given partitions to the subscription state and to the current assignment.
     *
     * @param assignedPartitions partitions that are now assigned
     * @param clearAssignments   when true, pending assignments and the local topic names cache
     *                           are cleared as well
     */
    private void updateSubscription(SortedSet<TopicIdPartition> assignedPartitions, boolean clearAssignments) {
        this.subscriptions.assignFromSubscribed(this.toTopicPartitionSet(assignedPartitions));
        this.updateCurrentAssignment(assignedPartitions);
        if (!clearAssignments) {
            return;
        }
        this.clearPendingAssignmentsAndLocalNamesCache();
    }

    /**
     * Starts (re)joining the group: resets the epoch, moves to JOINING, clears pending
     * assignments and the local names cache, and makes sure this instance is registered for
     * metadata updates. Nothing is done, other than logging a warning, if the member is FATAL.
     */
    @Override
    public void transitionToJoining() {
        if (this.state != MemberState.FATAL) {
            this.resetEpoch();
            this.transitionTo(MemberState.JOINING);
            this.clearPendingAssignmentsAndLocalNamesCache();
            this.registerForMetadataUpdates();
        } else {
            this.log.warn("No action taken to join the group with the updated subscription because the member is in FATAL state");
        }
    }

    /**
     * Registers this instance as a cluster update listener on the metadata, at most once.
     */
    private void registerForMetadataUpdates() {
        if (this.isRegisteredForMetadataUpdates) {
            return;
        }
        this.metadata.addClusterUpdateListener(this);
        this.isRegisteredForMetadataUpdates = true;
    }

    /**
     * Starts leaving the group. The member moves to PREPARE_LEAVING, the callback to release the
     * assignment is triggered and, once it completes (regardless of its outcome), the
     * subscription is cleared and the member transitions to sending the leave group heartbeat.
     *
     * @return an already completed future if the member is UNSUBSCRIBED or FATAL; the future of
     *         the ongoing leave if one is already in progress; otherwise a new future that this
     *         class completes when the member transitions to unsubscribed
     */
    @Override
    public CompletableFuture<Void> leaveGroup() {
        MemberState current = this.state;
        boolean notInGroup = current == MemberState.UNSUBSCRIBED || current == MemberState.FATAL;
        if (notInGroup) {
            return CompletableFuture.completedFuture(null);
        }
        boolean alreadyLeaving = current == MemberState.PREPARE_LEAVING || current == MemberState.LEAVING;
        if (alreadyLeaving) {
            return this.leaveGroupInProgress.get();
        }
        this.transitionTo(MemberState.PREPARE_LEAVING);
        CompletableFuture<Void> pendingLeave = new CompletableFuture<>();
        this.leaveGroupInProgress = Optional.of(pendingLeave);
        this.invokeOnPartitionsRevokedOrLostToReleaseAssignment().whenComplete((ignoredResult, ignoredError) -> {
            // The callback outcome is deliberately not inspected here.
            this.updateSubscription(new TreeSet<TopicIdPartition>(TOPIC_ID_PARTITION_COMPARATOR), true);
            this.transitionToSendingLeaveGroup();
        });
        return pendingLeave;
    }

    /**
     * Triggers the callback that releases the whole current assignment: a revocation when the
     * member epoch is positive, or onPartitionsLost otherwise.
     *
     * @return an already completed future when no partitions are assigned; otherwise the future
     *         of the revocation or of the onPartitionsLost callback
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        droppedPartitions.addAll(this.subscriptions.assignedPartitions());
        if (droppedPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.memberEpoch > 0) {
            return this.revokePartitions(droppedPartitions);
        }
        return this.invokeOnPartitionsLostCallback(droppedPartitions);
    }

    /**
     * Prepares the member to send the leave group heartbeat: sets the epoch to -2 when a group
     * instance id is present (or -1 otherwise), resets the current assignment and moves to
     * LEAVING. Only logs a warning when the member is FATAL or UNSUBSCRIBED.
     */
    void transitionToSendingLeaveGroup() {
        MemberState current = this.state;
        if (current == MemberState.FATAL) {
            this.log.warn("Member {} with epoch {} won't send leave group request because it is in FATAL state", this.memberId, this.memberEpoch);
            return;
        }
        if (current == MemberState.UNSUBSCRIBED) {
            this.log.warn("Member {} won't send leave group request because it is already out of the group.", this.memberId);
            return;
        }
        final int leaveEpoch;
        if (this.groupInstanceId.isPresent()) {
            leaveEpoch = -2;
        } else {
            leaveEpoch = -1;
        }
        this.updateMemberEpoch(leaveEpoch);
        this.currentAssignment = new HashMap<>();
        this.transitionTo(MemberState.LEAVING);
    }

    /**
     * Passes the given epoch and member id to every registered state listener.
     *
     * @param epoch    new member epoch, or empty
     * @param memberId member id, or empty
     */
    private void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
        for (MemberStateListener listener : this.stateUpdatesListeners) {
            listener.onMemberEpochUpdated(epoch, memberId);
        }
    }

    /**
     * @return true if the member is in the ACKNOWLEDGING, LEAVING or JOINING state
     */
    @Override
    public boolean shouldHeartbeatNow() {
        switch (this.state()) {
            case ACKNOWLEDGING:
            case LEAVING:
            case JOINING:
                return true;
            default:
                return false;
        }
    }

    /**
     * Updates the member state after a heartbeat request has been sent:
     * <ul>
     *   <li>a staled member transitions to joining;</li>
     *   <li>an ACKNOWLEDGING member moves to STABLE when nothing is pending, or back to
     *       RECONCILING when there are assignments left to reconcile;</li>
     *   <li>a LEAVING member transitions to unsubscribed.</li>
     * </ul>
     */
    @Override
    public void onHeartbeatRequestSent() {
        MemberState state = this.state();
        if (this.isStaled()) {
            // The placeholder names the member, so log the member id (not the epoch).
            this.log.debug("Member {} is staled and is therefore leaving the group.  It will rejoin upon the next poll.", this.memberId);
            this.transitionToJoining();
            return;
        }
        if (state == MemberState.ACKNOWLEDGING) {
            if (this.allPendingAssignmentsReconciled()) {
                this.transitionTo(MemberState.STABLE);
            } else {
                this.log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent to ack a previous reconciliation. New assignments are ready to be reconciled.", this.memberId, this.memberEpoch, MemberState.RECONCILING);
                this.transitionTo(MemberState.RECONCILING);
            }
        } else if (state == MemberState.LEAVING) {
            this.transitionToUnsubscribed();
        }
    }

    /**
     * Handles a heartbeat that was skipped: a LEAVING member transitions to unsubscribed since
     * its leave heartbeat could not be sent; any other state is left untouched.
     */
    @Override
    public void onHeartbeatRequestSkipped() {
        if (this.state != MemberState.LEAVING) {
            return;
        }
        this.log.debug("Heartbeat for leaving group could not be sent. Member {} with epoch {} will transition to {}.", this.memberId, this.memberEpoch, MemberState.UNSUBSCRIBED);
        this.transitionToUnsubscribed();
    }

    /**
     * Moves the member to UNSUBSCRIBED and completes the ongoing leave group future, if any.
     * <p>
     * The future is only present when the leave was started via {@code leaveGroup()}; the LEAVING
     * state can also be reached through the package-private
     * {@code transitionToSendingLeaveGroup()}, in which case there is nothing to complete.
     */
    private void transitionToUnsubscribed() {
        this.transitionTo(MemberState.UNSUBSCRIBED);
        this.leaveGroupInProgress.ifPresent(pendingLeave -> pendingLeave.complete(null));
        this.leaveGroupInProgress = Optional.empty();
    }

    /**
     * @return true when there is neither an unresolved assignment nor an assignment ready to be
     *         reconciled
     */
    private boolean allPendingAssignmentsReconciled() {
        boolean nothingUnresolved = this.assignmentUnresolved.isEmpty();
        return nothingUnresolved && this.assignmentReadyToReconcile.isEmpty();
    }

    /**
     * @return true if the member is in the UNSUBSCRIBED or FATAL state
     */
    @Override
    public boolean shouldSkipHeartbeat() {
        MemberState current = this.state();
        if (current == MemberState.UNSUBSCRIBED) {
            return true;
        }
        return current == MemberState.FATAL;
    }

    /**
     * Moves the member to the STALE state: sets the epoch to -1, clears the subscription along
     * with any pending assignment, and transitions to STALE.
     * <p>
     * The epoch is changed through {@code updateMemberEpoch}, like every other epoch change in
     * this class, so that registered state listeners are told the previous epoch is no longer
     * valid instead of silently keeping it.
     */
    @Override
    public void transitionToStale() {
        this.updateMemberEpoch(-1);
        this.updateSubscription(new TreeSet<TopicIdPartition>(TOPIC_ID_PARTITION_COMPARATOR), true);
        this.transitionTo(MemberState.STALE);
    }

    /**
     * Starts reconciling the assignment that is ready to reconcile against the partitions
     * currently owned. An auto-commit is requested first; when it completes (successfully or
     * not), revoked partitions are revoked and the new assignment is applied.
     *
     * @return false if another reconciliation is already in progress or if the target assignment
     *         equals the owned partitions; true if a reconciliation was started
     */
    boolean reconcile() {
        if (this.reconciliationInProgress) {
            // Parameterized so the assignment is only rendered when debug logging is enabled.
            this.log.debug("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment {} will be handled in the next reconciliation loop.", this.assignmentReadyToReconcile);
            return false;
        }

        // Snapshot the target and the owned partitions; the fields may change while callbacks run.
        SortedSet<TopicIdPartition> assignedTopicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
        assignedTopicIdPartitions.addAll(this.assignmentReadyToReconcile);
        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        ownedPartitions.addAll(this.subscriptions.assignedPartitions());
        SortedSet<TopicPartition> assignedTopicPartitions = this.toTopicPartitionSet(assignedTopicIdPartitions);

        if (assignedTopicPartitions.equals(ownedPartitions)) {
            this.log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} is equal to the member current assignment {}.", assignedTopicPartitions, ownedPartitions);
            return false;
        }
        this.markReconciliationInProgress();

        // added = assigned - owned
        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        addedPartitions.addAll(assignedTopicPartitions);
        addedPartitions.removeAll(ownedPartitions);
        // revoked = owned - assigned
        SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        revokedPartitions.addAll(ownedPartitions);
        revokedPartitions.removeAll(assignedTopicPartitions);
        this.log.info("Updating assignment with\n\tAssigned partitions:                       {}\n\tCurrent owned partitions:                  {}\n\tAdded partitions (assigned - owned):       {}\n\tRevoked partitions (owned - assigned):     {}\n", assignedTopicIdPartitions, ownedPartitions, addedPartitions, revokedPartitions);

        CompletableFuture<Void> commitResult = this.commitRequestManager.maybeAutoCommitAllConsumedNow(Optional.of(this.getExpirationTimeForTimeout(this.rebalanceTimeoutMs)), true);
        commitResult.whenComplete((commitReqResult, commitReqError) -> {
            if (commitReqError != null) {
                this.log.error("Auto-commit request before reconciling new assignment failed. Will proceed with the reconciliation anyway.", commitReqError);
            } else {
                this.log.debug("Auto-commit before reconciling new assignment completed successfully.");
            }
            this.revokeAndAssign(assignedTopicIdPartitions, revokedPartitions, addedPartitions);
        });
        return true;
    }

    /**
     * Computes the absolute expiration time for a timeout that starts now.
     *
     * @param timeoutMs timeout in milliseconds
     * @return current time plus {@code timeoutMs}, or {@link Long#MAX_VALUE} if that sum is
     *         negative
     */
    long getExpirationTimeForTimeout(long timeoutMs) {
        long now = this.time.milliseconds();
        long expiration = now + timeoutMs;
        return expiration < 0L ? Long.MAX_VALUE : expiration;
    }

    /**
     * Runs the two phases of a reconciliation: revokes the partitions that are no longer assigned
     * (if any) and then assigns the target partitions. Each phase only proceeds while the member
     * is still RECONCILING with the epoch it had when the reconciliation started. On success the
     * member moves to ACKNOWLEDGING and the reconciled partitions are removed from the pending
     * set. On failure the error is logged and the member state is left as is.
     *
     * @param assignedTopicIdPartitions full target assignment
     * @param revokedPartitions         owned partitions that are not in the target assignment
     * @param addedPartitions           target partitions that are not currently owned
     */
    private void revokeAndAssign(SortedSet<TopicIdPartition> assignedTopicIdPartitions, SortedSet<TopicPartition> revokedPartitions, SortedSet<TopicPartition> addedPartitions) {
        CompletableFuture<Void> revocationResult = revokedPartitions.isEmpty()
            ? CompletableFuture.completedFuture(null)
            : this.revokePartitions(revokedPartitions);

        CompletableFuture<Void> reconciliationResult = revocationResult.thenCompose(ignored -> {
            boolean memberHasRejoined = this.memberEpochOnReconciliationStart != this.memberEpoch;
            if (this.state == MemberState.RECONCILING && !memberHasRejoined) {
                this.commitRequestManager.resetAutoCommitTimer();
                return this.assignPartitions(assignedTopicIdPartitions, addedPartitions);
            }
            this.log.debug("Revocation callback completed but the member already transitioned out of the reconciling state for epoch {} into {} state with epoch {}. Interrupting reconciliation as it's not relevant anymore,", this.memberEpochOnReconciliationStart, this.state, this.memberEpoch);
            String reason = this.interruptedReconciliationErrorMessage();
            CompletableFuture<Void> interrupted = new CompletableFuture<>();
            interrupted.completeExceptionally(new KafkaException("Interrupting reconciliation after revocation. " + reason));
            return interrupted;
        });

        reconciliationResult.whenComplete((result, error) -> {
            this.markReconciliationCompleted();
            if (error != null) {
                // The member state is intentionally not changed here.
                this.log.error("Reconciliation failed.", error);
                return;
            }
            boolean memberHasRejoined = this.memberEpochOnReconciliationStart != this.memberEpoch;
            if (this.state == MemberState.RECONCILING && !memberHasRejoined) {
                this.transitionTo(MemberState.ACKNOWLEDGING);
                this.assignmentReadyToReconcile.removeAll(assignedTopicIdPartitions);
            } else {
                String reason = this.interruptedReconciliationErrorMessage();
                this.log.error("Interrupting reconciliation after partitions assigned callback completed. " + reason);
            }
        });
    }

    /**
     * Rebuilds the current assignment from the given partitions, grouping partition numbers by
     * topic id.
     *
     * @param assignedTopicIdPartitions partitions that make up the new current assignment
     */
    void updateCurrentAssignment(Set<TopicIdPartition> assignedTopicIdPartitions) {
        this.currentAssignment.clear();
        for (TopicIdPartition assigned : assignedTopicIdPartitions) {
            SortedSet<Integer> partitionsOfTopic = this.currentAssignment.computeIfAbsent(assigned.topicId(), id -> new TreeSet<Integer>());
            partitionsOfTopic.add(assigned.partition());
        }
    }

    /**
     * Converts the given topic-id partitions into a new sorted set of topic partitions.
     *
     * @param topicIdPartitions partitions to convert
     * @return a new set, ordered with {@code TOPIC_PARTITION_COMPARATOR}
     */
    private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> topicIdPartitions) {
        SortedSet<TopicPartition> converted = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
            converted.add(topicIdPartition.topicPartition());
        }
        return converted;
    }

    /**
     * @return a message describing why a reconciliation was interrupted: either the member left
     *         the RECONCILING state, or it is still RECONCILING and is reported as re-joined
     */
    private String interruptedReconciliationErrorMessage() {
        if (this.state == MemberState.RECONCILING) {
            return "The member has re-joined the group.";
        }
        return "The member already transitioned out of the reconciling state into " + this.state;
    }

    /**
     * Flags a reconciliation as started and remembers the member epoch at this point.
     */
    void markReconciliationInProgress() {
        memberEpochOnReconciliationStart = memberEpoch;
        reconciliationInProgress = true;
    }

    /**
     * Flags the ongoing reconciliation as finished.
     */
    void markReconciliationCompleted() {
        reconciliationInProgress = false;
    }

    /**
     * Rebuilds the assignment ready to reconcile from the unresolved assignment: every topic id
     * whose name can be found is moved over (and removed from the unresolved map). If topic ids
     * remain unresolved, a metadata update is requested.
     */
    private void resolveMetadataForUnresolvedAssignment() {
        this.assignmentReadyToReconcile.clear();
        Iterator<Map.Entry<Uuid, SortedSet<Integer>>> pending = this.assignmentUnresolved.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<Uuid, SortedSet<Integer>> entry = pending.next();
            Uuid topicId = entry.getKey();
            SortedSet<Integer> partitions = entry.getValue();
            Optional<String> resolvedName = this.findTopicNameInGlobalOrLocalCache(topicId);
            if (resolvedName.isPresent()) {
                this.addToAssignmentReadyToReconcile(topicId, resolvedName.get(), partitions);
                pending.remove();
            }
        }
        if (this.assignmentUnresolved.isEmpty()) {
            return;
        }
        this.log.debug("Topic Ids {} received in target assignment were not found in metadata and are not currently assigned. Requesting a metadata update now to resolve topic names.", this.assignmentUnresolved.keySet());
        this.metadata.requestUpdate(true);
    }

    /**
     * Looks up the name of a topic id, first in the metadata and then in the local names cache.
     * A name found in the metadata is also stored in the local cache.
     *
     * @param topicId topic id to resolve
     * @return the topic name, or empty if it is in neither place
     */
    private Optional<String> findTopicNameInGlobalOrLocalCache(Uuid topicId) {
        String fromMetadata = this.metadata.topicNames().get(topicId);
        if (fromMetadata == null) {
            return Optional.ofNullable(this.assignedTopicNamesCache.get(topicId));
        }
        this.assignedTopicNamesCache.put(topicId, fromMetadata);
        return Optional.of(fromMetadata);
    }

    /**
     * Adds every partition of the given topic to the assignment ready to reconcile.
     *
     * @param topicId         id of the topic
     * @param topicName       resolved name of the topic
     * @param topicPartitions partition numbers of the topic
     */
    private void addToAssignmentReadyToReconcile(Uuid topicId, String topicName, SortedSet<Integer> topicPartitions) {
        for (Integer partition : topicPartitions) {
            TopicPartition topicPartition = new TopicPartition(topicName, partition);
            this.assignmentReadyToReconcile.add(new TopicIdPartition(topicId, topicPartition));
        }
    }

    /**
     * Revokes the given partitions: logs them, marks them as pending revocation and triggers the
     * onPartitionsRevoked callback. If the member is FATAL the callback is not triggered.
     *
     * @param revokedPartitions partitions to revoke
     * @return a future that completes when the callback succeeds, or completes exceptionally with
     *         the callback error, or with a {@link KafkaException} when the member is FATAL
     */
    CompletableFuture<Void> revokePartitions(Set<TopicPartition> revokedPartitions) {
        this.log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", "));
        this.logPausedPartitionsBeingRevoked(revokedPartitions);
        this.markPendingRevocationToPauseFetching(revokedPartitions);

        CompletableFuture<Void> outcome = new CompletableFuture<>();
        if (this.state == MemberState.FATAL) {
            String abortMessage = String.format("Member %s with epoch %s received a fatal error while waiting for a revocation commit to complete. Will abort revocation without triggering user callback.", this.memberId, this.memberEpoch);
            this.log.debug(abortMessage);
            outcome.completeExceptionally(new KafkaException(abortMessage));
            return outcome;
        }

        this.invokeOnPartitionsRevokedCallback(revokedPartitions).whenComplete((ignored, failure) -> {
            if (failure == null) {
                outcome.complete(null);
                return;
            }
            this.log.error("onPartitionsRevoked callback invocation failed for partitions {}", revokedPartitions, failure);
            outcome.completeExceptionally(failure);
        });
        return outcome;
    }

    /**
     * Applies the given assignment to the subscription, triggers the onPartitionsAssigned
     * callback for the added partitions, and drops from the local names cache every topic that
     * is not part of the assignment.
     *
     * @param assignedPartitions full new assignment
     * @param addedPartitions    partitions that were not owned before
     * @return the future of the onPartitionsAssigned callback
     */
    private CompletableFuture<Void> assignPartitions(SortedSet<TopicIdPartition> assignedPartitions, SortedSet<TopicPartition> addedPartitions) {
        this.updateSubscription(assignedPartitions, false);
        CompletableFuture<Void> result = this.invokeOnPartitionsAssignedCallback(addedPartitions);
        Set<String> assignedTopics = assignedPartitions.stream().map(TopicIdPartition::topic).collect(Collectors.toSet());
        // values() is a view of the cache, so this removes the entries of unassigned topics.
        this.assignedTopicNamesCache.values().retainAll(assignedTopics);
        return result;
    }

    /**
     * Marks the given partitions as pending revocation in the subscription state.
     *
     * @param partitionsToRevoke partitions about to be revoked
     */
    private void markPendingRevocationToPauseFetching(Set<TopicPartition> partitionsToRevoke) {
        log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
        subscriptions.markPendingRevocation(partitionsToRevoke);
    }

    /**
     * Enqueues the onPartitionsRevoked callback when there are partitions to revoke and a
     * rebalance listener is set.
     *
     * @param partitionsRevoked partitions being revoked
     * @return the future of the enqueued callback, or an already completed future when no
     *         callback is needed
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicPartition> partitionsRevoked) {
        Optional<ConsumerRebalanceListener> listener = this.subscriptions.rebalanceListener();
        boolean callbackNeeded = !partitionsRevoked.isEmpty() && listener.isPresent();
        if (!callbackNeeded) {
            return CompletableFuture.completedFuture(null);
        }
        return this.enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, partitionsRevoked);
    }

    /**
     * Enqueues the onPartitionsAssigned callback when a rebalance listener is set, even if the
     * given set of partitions is empty.
     *
     * @param partitionsAssigned partitions newly assigned
     * @return the future of the enqueued callback, or an already completed future when there is
     *         no listener
     */
    private CompletableFuture<Void> invokeOnPartitionsAssignedCallback(Set<TopicPartition> partitionsAssigned) {
        Optional<ConsumerRebalanceListener> listener = this.subscriptions.rebalanceListener();
        if (!listener.isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return this.enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, partitionsAssigned);
    }

    /**
     * Enqueues the onPartitionsLost callback when there are lost partitions and a rebalance
     * listener is set.
     *
     * @param partitionsLost partitions that were lost
     * @return the future of the enqueued callback, or an already completed future when no
     *         callback is needed
     */
    CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
        Optional<ConsumerRebalanceListener> listener = this.subscriptions.rebalanceListener();
        boolean callbackNeeded = !partitionsLost.isEmpty() && listener.isPresent();
        if (!callbackNeeded) {
            return CompletableFuture.completedFuture(null);
        }
        return this.enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST, partitionsLost);
    }

    /**
     * Hands an event requesting the execution of a rebalance listener method to the background
     * event handler.
     *
     * @param methodName listener method to be executed
     * @param partitions partitions to pass to the listener method; copied into a sorted set
     * @return the future carried by the enqueued event
     */
    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName methodName, Set<TopicPartition> partitions) {
        TreeSet<TopicPartition> ordered = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        ordered.addAll(partitions);
        ConsumerRebalanceListenerCallbackNeededEvent callbackNeeded = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, ordered);
        this.backgroundEventHandler.add(callbackNeeded);
        this.log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
        return callbackNeeded.future();
    }

    /**
     * Completes the future carried by the given event according to the outcome of the rebalance
     * listener callback: exceptionally with the reported error if there is one, normally
     * otherwise.
     *
     * @param event event describing the completed callback
     */
    @Override
    public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event) {
        ConsumerRebalanceListenerMethodName methodName = event.methodName();
        Optional<KafkaException> error = event.error();
        CompletableFuture<Void> future = event.future();
        if (!error.isPresent()) {
            this.log.debug("The {} method completed successfully; signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName());
            future.complete(null);
            return;
        }
        Exception callbackFailure = error.get();
        this.log.warn("The {} method completed with an error ({}); signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName(), callbackFailure.getMessage());
        future.completeExceptionally(callbackFailure);
    }

    /**
     * Logs the paused partitions that are among the partitions being revoked, if any.
     *
     * @param partitionsToRevoke partitions about to be revoked
     */
    private void logPausedPartitionsBeingRevoked(Set<TopicPartition> partitionsToRevoke) {
        // NOTE(review): retainAll mutates the set returned by pausedPartitions() — confirm it is a copy.
        Set<TopicPartition> pausedBeingRevoked = this.subscriptions.pausedPartitions();
        pausedBeingRevoked.retainAll(partitionsToRevoke);
        if (pausedBeingRevoked.isEmpty()) {
            return;
        }
        this.log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.join(pausedBeingRevoked, ", "));
    }

    /**
     * Empties the local topic names cache and both pending assignment collections (unresolved
     * and ready to reconcile).
     */
    private void clearPendingAssignmentsAndLocalNamesCache() {
        assignedTopicNamesCache.clear();
        assignmentReadyToReconcile.clear();
        assignmentUnresolved.clear();
    }

    /**
     * Sets the member epoch back to 0, notifying listeners if that is a change.
     */
    private void resetEpoch() {
        updateMemberEpoch(0);
    }

    /**
     * Stores the new member epoch and, only if it differs from the previous one, notifies the
     * state listeners: with the epoch and member id when the new epoch is positive, or with
     * empty values otherwise.
     *
     * @param newEpoch epoch to store
     */
    private void updateMemberEpoch(int newEpoch) {
        int previousEpoch = this.memberEpoch;
        this.memberEpoch = newEpoch;
        if (previousEpoch == newEpoch) {
            return;
        }
        if (newEpoch > 0) {
            this.notifyEpochChange(Optional.of(newEpoch), Optional.ofNullable(this.memberId));
        } else {
            this.notifyEpochChange(Optional.empty(), Optional.empty());
        }
    }

    /**
     * @return the current state of the member
     */
    @Override
    public MemberState state() {
        return state;
    }

    /**
     * @return the optional server-side assignor this manager was created with
     */
    @Override
    public Optional<String> serverAssignor() {
        return serverAssignor;
    }

    /**
     * @return the current assignment, keyed by topic id; this is the internal map itself, not a
     *         copy
     */
    @Override
    public Map<Uuid, SortedSet<Integer>> currentAssignment() {
        return currentAssignment;
    }

    /**
     * @return a read-only view of the topic ids of the target assignment whose names have not
     *         been resolved yet
     */
    Set<Uuid> topicsWaitingForMetadata() {
        Set<Uuid> unresolvedTopicIds = this.assignmentUnresolved.keySet();
        return Collections.unmodifiableSet(unresolvedTopicIds);
    }

    /**
     * @return a read-only view of the partitions that are ready to be reconciled
     */
    Set<TopicIdPartition> assignmentReadyToReconcile() {
        Set<TopicIdPartition> readOnlyView = Collections.unmodifiableSet(this.assignmentReadyToReconcile);
        return readOnlyView;
    }

    /**
     * @return true while a reconciliation is marked as in progress
     */
    boolean reconciliationInProgress() {
        return reconciliationInProgress;
    }

    /**
     * Cluster update callback: retries resolving the topic names of the unresolved assignment
     * and, if anything is ready afterwards, attempts a reconciliation.
     *
     * @param clusterResource cluster resource of the update (not used)
     */
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        this.resolveMetadataForUnresolvedAssignment();
        if (this.assignmentReadyToReconcile.isEmpty()) {
            return;
        }
        this.reconcile();
    }

    /**
     * Registers a listener to be notified of member epoch changes.
     *
     * @param listener listener to add; must not be null
     * @throws IllegalArgumentException if {@code listener} is null
     */
    @Override
    public void registerStateListener(MemberStateListener listener) {
        if (listener != null) {
            this.stateUpdatesListeners.add(listener);
            return;
        }
        throw new IllegalArgumentException("State updates listener cannot be null");
    }
}

