/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.cluster.metadata;

import java.util.Collections;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.admin.indices.streamingingestion.state.TransportUpdateIngestionStateAction;
import org.opensearch.action.admin.indices.streamingingestion.state.UpdateIngestionStateRequest;
import org.opensearch.action.admin.indices.streamingingestion.state.UpdateIngestionStateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IngestionStatus;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;

public class MetadataStreamingIngestionStateService {
    private static final Logger logger = LogManager.getLogger(MetadataStreamingIngestionStateService.class);
    private final ClusterService clusterService;
    private final TransportUpdateIngestionStateAction transportUpdateIngestionStateAction;

    @Inject
    public MetadataStreamingIngestionStateService(ClusterService clusterService, TransportUpdateIngestionStateAction transportUpdateIngestionStateAction) {
        this.clusterService = clusterService;
        this.transportUpdateIngestionStateAction = transportUpdateIngestionStateAction;
    }

    public void updateIngestionPollerState(String source, final Index[] concreteIndices, final UpdateIngestionStateRequest request, final ActionListener<UpdateIngestionStateResponse> listener) {
        if (concreteIndices == null || concreteIndices.length == 0) {
            throw new IllegalArgumentException("Index is missing");
        }
        if (request.getIngestionPaused() == null) {
            throw new IllegalArgumentException("Ingestion poller target state is missing");
        }
        if (request.getResetSettings() != null && request.getResetSettings().length > 0 && request.getIngestionPaused().booleanValue()) {
            throw new IllegalArgumentException("Poller position can only be reset during a resume operation.");
        }
        this.clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(Priority.URGENT){

            @Override
            public ClusterState execute(ClusterState currentState) {
                return MetadataStreamingIngestionStateService.this.getUpdatedIngestionPausedClusterState(concreteIndices, currentState, request.getIngestionPaused());
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                if (oldState == newState) {
                    logger.debug("Cluster state did not change when trying to set ingestionPaused={}", (Object)request.getIngestionPaused());
                    listener.onResponse((Object)new UpdateIngestionStateResponse(false, 0, 0, 0, Collections.emptyList()));
                } else {
                    MetadataStreamingIngestionStateService.this.processUpdateIngestionRequestOnShards(request, new ActionListener<UpdateIngestionStateResponse>(){

                        public void onResponse(UpdateIngestionStateResponse updateIngestionStateResponse) {
                            listener.onResponse((Object)updateIngestionStateResponse);
                        }

                        public void onFailure(Exception e) {
                            UpdateIngestionStateResponse response = new UpdateIngestionStateResponse(true, 0, 0, 0, Collections.emptyList());
                            response.setErrorMessage("Error encountered while verifying ingestion poller state: " + e.getMessage());
                            listener.onResponse((Object)response);
                        }
                    });
                }
            }

            @Override
            public void onFailure(String source, Exception e) {
                listener.onFailure((Exception)((Object)new OpenSearchException("Ingestion cluster state update failed to set ingestionPaused={}", new Object[]{request.getIngestionPaused(), e})));
            }

            @Override
            public TimeValue timeout() {
                return request.timeout();
            }
        });
    }

    public void processUpdateIngestionRequestOnShards(UpdateIngestionStateRequest updateIngestionStateRequest, ActionListener<UpdateIngestionStateResponse> listener) {
        this.transportUpdateIngestionStateAction.execute(updateIngestionStateRequest, listener);
    }

    public void resetShardPointerAndResumeIngestion(final String source, final Index[] concreteIndices, UpdateIngestionStateRequest shardPointerUpdateRequest, final UpdateIngestionStateRequest resumeIngestionRequest, final ActionListener<UpdateIngestionStateResponse> listener) {
        if (concreteIndices == null || concreteIndices.length == 0) {
            throw new IllegalArgumentException("Index is missing");
        }
        this.transportUpdateIngestionStateAction.execute(shardPointerUpdateRequest, new ActionListener<UpdateIngestionStateResponse>(){

            public void onResponse(UpdateIngestionStateResponse updateIngestionStateResponse) {
                boolean isSuccessfulUpdate;
                boolean bl = isSuccessfulUpdate = updateIngestionStateResponse.isAcknowledged() && updateIngestionStateResponse.getFailedShards() == 0;
                if (isSuccessfulUpdate) {
                    MetadataStreamingIngestionStateService.this.updateIngestionPollerState(source, concreteIndices, resumeIngestionRequest, (ActionListener<UpdateIngestionStateResponse>)listener);
                } else {
                    logger.debug("Error resetting consumer pointers");
                    listener.onResponse((Object)updateIngestionStateResponse);
                }
            }

            public void onFailure(Exception e) {
                logger.debug("Error resetting consumer pointers", (Throwable)e);
                listener.onFailure(e);
            }
        });
    }

    private ClusterState getUpdatedIngestionPausedClusterState(Index[] indices, ClusterState currentState, boolean ingestionPaused) {
        Metadata.Builder metadata = Metadata.builder(currentState.metadata());
        for (Index index : indices) {
            IndexMetadata indexMetadata = metadata.getSafe(index);
            if (!indexMetadata.useIngestionSource()) {
                logger.debug("Pause/resume request will be ignored for index {} as streaming ingestion is not enabled", (Object)index);
            }
            if (indexMetadata.getIngestionStatus().isPaused() != ingestionPaused) {
                IngestionStatus updatedIngestionStatus = new IngestionStatus(ingestionPaused);
                IndexMetadata.Builder updatedMetadata = IndexMetadata.builder(indexMetadata).ingestionStatus(updatedIngestionStatus);
                metadata.put(updatedMetadata);
                continue;
            }
            logger.debug("Received request for ingestionPaused:{} for index {}. The state is already ingestionPaused:{}", (Object)ingestionPaused, (Object)index, (Object)ingestionPaused);
        }
        return ClusterState.builder(currentState).metadata(metadata).build();
    }
}

