package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.AbstractEvent;
import com.couchbase.client.core.cnc.Context;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.core.WaitUntilReadyCompletedEvent;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.core.endpoint.http.CoreCommonOptions;
import com.couchbase.client.core.endpoint.http.CoreHttpPath;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.manager.GenericManagerRequest;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.Deadline;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

@Stability.Internal
/* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper.class */
public class WaitUntilReadyHelper {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper$NotReadyYetException.class */
    public static class NotReadyYetException extends RuntimeException {
        public NotReadyYetException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper$WaitUntilReadyDiagnostic.class */
    private static class WaitUntilReadyDiagnostic extends AbstractEvent {
        private final String message;

        protected WaitUntilReadyDiagnostic(String str) {
            super(Event.Severity.DEBUG, Event.Category.CORE.path() + ".WaitUntilReady", Duration.ZERO, (Context) null);
            this.message = (String) Objects.requireNonNull(str);
        }

        @Override // com.couchbase.client.core.cnc.Event
        public String description() {
            return this.message;
        }
    }

    @Stability.Internal
    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper$WaitUntilReadyLogger.class */
    public interface WaitUntilReadyLogger {
        public static final WaitUntilReadyLogger dummy = new WaitUntilReadyLogger() { // from class: com.couchbase.client.core.diagnostics.WaitUntilReadyHelper.WaitUntilReadyLogger.1
        };

        static WaitUntilReadyLogger create(final EventBus eventBus) {
            final String uuid = UUID.randomUUID().toString();
            return new WaitUntilReadyLogger() { // from class: com.couchbase.client.core.diagnostics.WaitUntilReadyHelper.WaitUntilReadyLogger.2
                @Override // com.couchbase.client.core.diagnostics.WaitUntilReadyHelper.WaitUntilReadyLogger
                public void message(String str) {
                    String str2 = uuid + " " + str;
                    if (EventBus.PublishResult.SUCCESS != eventBus.publish(new WaitUntilReadyDiagnostic(str2))) {
                        System.err.println("[WaitUntilReadyDiagnostic] " + str2);
                    }
                }
            };
        }

        default void message(String str) {
        }

        default void waitingBecause(String str) {
            message("Waiting because " + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper$WaitUntilReadyStage.class */
    public enum WaitUntilReadyStage {
        INITIAL,
        WAIT_FOR_CONFIG,
        WAIT_FOR_HEALTHY_NODES,
        WAIT_FOR_SUCCESSFUL_PING,
        COMPLETE
    }

    @Stability.Internal
    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/diagnostics/WaitUntilReadyHelper$WaitUntilReadyState.class */
    public static class WaitUntilReadyState {
        private final Map<WaitUntilReadyStage, Long> timings = new ConcurrentHashMap();
        private final AtomicLong totalDuration = new AtomicLong();
        private volatile WaitUntilReadyStage currentStage = WaitUntilReadyStage.INITIAL;
        private volatile NanoTimestamp currentStart = NanoTimestamp.now();
        private final WaitUntilReadyLogger log;

        public WaitUntilReadyState(WaitUntilReadyLogger waitUntilReadyLogger) {
            this.log = (WaitUntilReadyLogger) Objects.requireNonNull(waitUntilReadyLogger);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void transition(WaitUntilReadyStage waitUntilReadyStage) {
            long millis = this.currentStart.elapsed().toMillis();
            if (this.currentStage != WaitUntilReadyStage.INITIAL) {
                this.timings.put(this.currentStage, Long.valueOf(millis));
                this.log.message("Stage '" + this.currentStage + "' took " + this.currentStart.elapsed());
            }
            this.totalDuration.addAndGet(millis);
            this.log.message("Transitioning from stage " + this.currentStage + " to stage " + waitUntilReadyStage + ". Total elapsed time since waiting started: " + Duration.ofMillis(this.totalDuration.get()));
            this.currentStage = waitUntilReadyStage;
            this.currentStart = NanoTimestamp.now();
        }

        public Map<String, Object> export() {
            TreeMap treeMap = new TreeMap();
            treeMap.put("current_stage", this.currentStage);
            if (this.currentStage != WaitUntilReadyStage.COMPLETE) {
                long millis = this.currentStart.elapsed().toMillis();
                treeMap.put("current_stage_since_ms", Long.valueOf(millis));
                treeMap.put("total_ms", Long.valueOf(this.totalDuration.get() + millis));
            } else {
                treeMap.put("total_ms", Long.valueOf(this.totalDuration.get()));
            }
            treeMap.put("timings_ms", new TreeMap(this.timings));
            return treeMap;
        }
    }

    private static RetryBackoffSpec retryWithMaxBackoff(Duration duration) {
        return Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(10L)).maxBackoff(duration).jitter(0.5d);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Mono<T> retryUntilReady(RetryBackoffSpec retryBackoffSpec, String str, WaitUntilReadyLogger waitUntilReadyLogger, Mono<T> mono) {
        return mono.retryWhen(retryBackoffSpec.filter(th -> {
            if (th instanceof NotReadyYetException) {
                waitUntilReadyLogger.waitingBecause(th.getMessage());
                return true;
            }
            waitUntilReadyLogger.message("Unexpected exception while waiting for " + str + ": " + CbThrowables.getStackTraceAsString(th));
            return true;
        }));
    }

    private static Mono<Void> waitForConfig(Core core, @Nullable String str, WaitUntilReadyLogger waitUntilReadyLogger) {
        return retryUntilReady(retryWithMaxBackoff(Duration.ofMillis(100L)), "config load", waitUntilReadyLogger, Mono.fromRunnable(() -> {
            if (core.configurationProvider().globalConfigLoadInProgress()) {
                throw new NotReadyYetException("global config load is in progress");
            }
            if (core.configurationProvider().bucketConfigLoadInProgress()) {
                throw new NotReadyYetException("bucket config load is in progress");
            }
            if (str != null && core.configurationProvider().collectionRefreshInProgress()) {
                throw new NotReadyYetException("collection refresh is in progress for bucket " + str);
            }
            if (str != null && core.clusterConfig().bucketConfig(str) == null) {
                throw new NotReadyYetException("cluster config does not yet have config for bucket " + str);
            }
        }));
    }

    private static Mono<Void> waitForNodeHealth(Core core, @Nullable String str, WaitUntilReadyLogger waitUntilReadyLogger) {
        if (str == null) {
            return Mono.fromRunnable(() -> {
                waitUntilReadyLogger.message("Skipping node health check because no bucket name was specified.");
            });
        }
        return retryUntilReady(retryWithMaxBackoff(Duration.ofSeconds(2L)), "checking bucket health", waitUntilReadyLogger, Mono.defer(() -> {
            String formatPath = CoreHttpPath.formatPath("/pools/default/buckets/{}", str);
            GenericManagerRequest genericManagerRequest = new GenericManagerRequest(core.context(), () -> {
                return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, formatPath);
            }, true, null);
            waitUntilReadyLogger.message("Sending manager request to check bucket health; httpPath=" + formatPath);
            core.send(genericManagerRequest);
            return Reactor.wrap(genericManagerRequest, genericManagerRequest.response(), true).flatMap(genericManagerResponse -> {
                if (genericManagerResponse.status() != ResponseStatus.SUCCESS) {
                    return Mono.error(new NotReadyYetException("Manager request to check bucket health failed with response status " + genericManagerResponse.status() + "; httpStatusCode=" + genericManagerResponse.httpStatus() + ", responseBody=" + new String(genericManagerResponse.content(), StandardCharsets.UTF_8) + ", requestContext=" + genericManagerRequest.context()));
                }
                ArrayNode arrayNode = (ArrayNode) ((ObjectNode) Mapper.decodeIntoTree(genericManagerResponse.content())).get("nodes");
                long count = StreamSupport.stream(arrayNode.spliterator(), false).filter(jsonNode -> {
                    return jsonNode.get("status").asText().equals("healthy");
                }).count();
                if (arrayNode.size() != count) {
                    return Mono.error(new NotReadyYetException(count + " of " + arrayNode.size() + " nodes are healthy"));
                }
                waitUntilReadyLogger.message("All " + count + " nodes are healthy");
                return Mono.empty();
            });
        }));
    }

    public static CompletableFuture<Void> waitUntilReady(Core core, @Nullable Set<ServiceType> set, Duration duration, ClusterState clusterState, Optional<String> optional) {
        WaitUntilReadyLogger create = WaitUntilReadyLogger.create(core.environment().eventBus());
        create.message("Starting WaitUntilReady. serviceTypes=" + set + ", timeout=" + duration + ", desiredState=" + clusterState + ", bucketName=" + optional);
        Deadline of = Deadline.of(duration, 0.9d);
        WaitUntilReadyState waitUntilReadyState = new WaitUntilReadyState(create);
        AtomicReference atomicReference = new AtomicReference(set == null ? Collections.emptySet() : CbCollections.setCopyOf(set));
        ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
        return Mono.empty().then(Mono.fromRunnable(() -> {
            waitUntilReadyState.transition(WaitUntilReadyStage.WAIT_FOR_CONFIG);
        })).then(waitForConfig(core, optional.orElse(null), create)).then(Mono.fromRunnable(() -> {
            waitUntilReadyState.transition(WaitUntilReadyStage.WAIT_FOR_HEALTHY_NODES);
        })).then(waitForNodeHealth(core, optional.orElse(null), create)).then(Mono.fromRunnable(() -> {
            waitUntilReadyState.transition(WaitUntilReadyStage.WAIT_FOR_SUCCESSFUL_PING);
        })).then(waitForSuccessfulPing(core, optional.orElse(null), clusterState, atomicReference, of, newKeySet, create)).timeout(duration, Mono.defer(() -> {
            create.message("WaitUntilReady timed out :-(");
            return Mono.error(new UnambiguousTimeoutException("WaitUntilReady timed out in stage " + waitUntilReadyState.currentStage + " (spent " + waitUntilReadyState.currentStart.elapsed() + " in that stage)", new CancellationErrorContext(new WaitUntilReadyContext((Set) atomicReference.get(), duration, clusterState, optional, (Map) core.diagnostics().collect(Collectors.groupingBy((v0) -> {
                return v0.type();
            })), waitUntilReadyState, CbCollections.setCopyOf(newKeySet)))));
        }), core.context().environment().scheduler()).doOnSuccess(reason -> {
            waitUntilReadyState.transition(WaitUntilReadyStage.COMPLETE);
            core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(new WaitUntilReadyContext((Set) atomicReference.get(), duration, clusterState, optional, (Map) core.diagnostics().collect(Collectors.groupingBy((v0) -> {
                return v0.type();
            })), waitUntilReadyState, Collections.emptySet()), reason));
        }).then().toFuture();
    }

    private static Mono<WaitUntilReadyCompletedEvent.Reason> waitForSuccessfulPing(Core core, @Nullable String str, ClusterState clusterState, AtomicReference<Set<ServiceType>> atomicReference, Deadline deadline, Set<RequestTarget> set, WaitUntilReadyLogger waitUntilReadyLogger) {
        return Mono.defer(() -> {
            if (str == null && !core.clusterConfig().hasClusterOrBucketConfig()) {
                waitUntilReadyLogger.message("cluster.waitUntilReady() completed without action, because it was run against a Couchbase Server version which does not support it (only supported with 6.5 and later). Please open at least one bucket, and call bucket.waitUntilReady() instead.");
                return Mono.just(WaitUntilReadyCompletedEvent.Reason.CLUSTER_LEVEL_NOT_SUPPORTED);
            }
            Optional ofNullable = Optional.ofNullable(str);
            Set copyOf = CbCollections.setCopyOf(HealthPinger.extractPingTargets(core.clusterConfig(), (Set) atomicReference.get(), ofNullable, waitUntilReadyLogger));
            set.addAll(copyOf);
            atomicReference.set((Set) copyOf.stream().map((v0) -> {
                return v0.serviceType();
            }).collect(Collectors.toSet()));
            ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
            newKeySet.addAll((Collection) atomicReference.get());
            return Flux.fromIterable(copyOf).flatMap(requestTarget -> {
                return retryUntilReady(retryWithMaxBackoff(Duration.ofSeconds(1L)), "ping " + requestTarget, waitUntilReadyLogger, HealthPinger.pingTarget(core, requestTarget, CoreCommonOptions.of(deadline.remaining().orElse(Duration.ofSeconds(10L)), FailFastRetryStrategy.INSTANCE, null), waitUntilReadyLogger).flatMap(endpointPingReport -> {
                    if (endpointPingReport.state() == PingState.OK) {
                        set.remove(requestTarget);
                        return Mono.just(endpointPingReport);
                    }
                    if (HealthPinger.extractPingTargets(core.clusterConfig(), (Set) atomicReference.get(), ofNullable, WaitUntilReadyLogger.dummy).contains(requestTarget)) {
                        return Mono.error(new NotReadyYetException("ping for target " + requestTarget + " failed with status: " + endpointPingReport.state()));
                    }
                    waitUntilReadyLogger.message("Ignoring ping target " + requestTarget + " because it's no longer part of the cluster.");
                    set.remove(requestTarget);
                    return Mono.empty();
                }));
            }).takeUntil(endpointPingReport -> {
                if (clusterState == ClusterState.ONLINE) {
                    return false;
                }
                if (newKeySet.remove(endpointPingReport.type())) {
                    waitUntilReadyLogger.message("At least one " + endpointPingReport.type() + " ping was successful.");
                }
                if (!newKeySet.isEmpty()) {
                    return false;
                }
                waitUntilReadyLogger.message("At least one ping was successful for each awaited service; desired cluster state 'DEGRADED' is now satisfied.");
                return true;
            }).then(Mono.just(WaitUntilReadyCompletedEvent.Reason.SUCCESS));
        });
    }
}
