package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.ReplicaNotConfiguredException;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.kv.ObserveViaSeqnoRequest;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.retry.reactor.Repeat;
import java.time.Duration;
import java.util.ArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/service/kv/Observe.class */
public class Observe {

    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/service/kv/Observe$ObservePersistTo.class */
    public enum ObservePersistTo {
        ACTIVE(-1),
        NONE(0),
        ONE(1),
        TWO(2),
        THREE(3),
        FOUR(4);

        private final short value;

        ObservePersistTo(short s) {
            this.value = s;
        }

        public short value() {
            return this.value;
        }

        public boolean touchesReplica() {
            return this.value > 0;
        }
    }

    /* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/service/kv/Observe$ObserveReplicateTo.class */
    public enum ObserveReplicateTo {
        NONE(0),
        ONE(1),
        TWO(2),
        THREE(3);

        private final short value;

        ObserveReplicateTo(short s) {
            this.value = s;
        }

        public short value() {
            return this.value;
        }

        public boolean touchesReplica() {
            return this.value > 0;
        }
    }

    public static Mono<Void> poll(ObserveContext observeContext) {
        if (observeContext.persistTo() == ObservePersistTo.NONE && observeContext.replicateTo() == ObserveReplicateTo.NONE) {
            return Mono.empty();
        }
        if (!observeContext.environment().ioConfig().mutationTokensEnabled() || !observeContext.mutationToken().isPresent()) {
            return Mono.error(new FeatureNotAvailableException("To use PersistTo and/or ReplicateTo, mutation tokens must be enabled on the IO configuration"));
        }
        RequestSpan requestSpan = observeContext.environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_OBSERVE, observeContext.parentSpan());
        return maybeRetry(Flux.defer(() -> {
            return Flux.just(Integer.valueOf(validateReplicas(observeContext.core().clusterConfig().bucketConfig(observeContext.collectionIdentifier().bucket()), observeContext.persistTo(), observeContext.replicateTo())));
        }).flatMap(num -> {
            return viaMutationToken(num.intValue(), observeContext, requestSpan);
        }), observeContext).timeout(observeContext.timeout(), observeContext.environment().scheduler()).doFinally(signalType -> {
            requestSpan.end();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Flux<ObserveItem> viaMutationToken(int i, ObserveContext observeContext, RequestSpan requestSpan) {
        if (!observeContext.mutationToken().isPresent()) {
            throw new IllegalStateException("MutationToken is not present, this is a bug!");
        }
        Duration timeout = observeContext.timeout();
        RetryStrategy retryStrategy = observeContext.retryStrategy();
        MutationToken mutationToken = observeContext.mutationToken().get();
        String key = observeContext.key();
        ArrayList arrayList = new ArrayList();
        if (observeContext.persistTo() != ObservePersistTo.NONE) {
            arrayList.add(new ObserveViaSeqnoRequest(timeout, observeContext, observeContext.collectionIdentifier(), retryStrategy, 0, true, mutationToken.partitionUUID(), key, observeContext.environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_OBSERVE, requestSpan)));
        }
        if (observeContext.persistTo().touchesReplica() || observeContext.replicateTo().touchesReplica()) {
            short s = 1;
            while (true) {
                short s2 = s;
                if (s2 > i) {
                    break;
                }
                arrayList.add(new ObserveViaSeqnoRequest(timeout, observeContext, observeContext.collectionIdentifier(), retryStrategy, s2, false, mutationToken.partitionUUID(), key, observeContext.environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_OBSERVE, requestSpan)));
                s = (short) (s2 + 1);
            }
        }
        return Flux.fromIterable(arrayList).flatMap(observeViaSeqnoRequest -> {
            observeContext.core().send(observeViaSeqnoRequest);
            return Reactor.wrap(observeViaSeqnoRequest, observeViaSeqnoRequest.response(), true).doOnNext(observeViaSeqnoResponse -> {
                observeViaSeqnoRequest.context().logicallyComplete();
            }).doOnError(th -> {
                observeViaSeqnoRequest.context().logicallyComplete(th);
            }).doOnCancel(() -> {
                observeViaSeqnoRequest.context().logicallyComplete();
            }).onErrorResume(th2 -> {
                return Mono.empty();
            });
        }).map(observeViaSeqnoResponse -> {
            return ObserveItem.fromMutationToken(mutationToken, observeViaSeqnoResponse);
        });
    }

    private static Mono<Void> maybeRetry(Flux<ObserveItem> flux, ObserveContext observeContext) {
        return flux.scan(ObserveItem.empty(), (v0, v1) -> {
            return v0.add(v1);
        }).repeatWhen(Repeat.times(Long.MAX_VALUE).exponentialBackoff(Duration.ofNanos(10000L), Duration.ofMillis(100L))).skipWhile(observeItem -> {
            return !observeItem.check(observeContext.persistTo(), observeContext.replicateTo());
        }).take(1L).then();
    }

    private static int validateReplicas(BucketConfig bucketConfig, ObservePersistTo observePersistTo, ObserveReplicateTo observeReplicateTo) {
        if (!(bucketConfig instanceof CouchbaseBucketConfig)) {
            throw new FeatureNotAvailableException("Only couchbase buckets support PersistTo and/or ReplicateTo");
        }
        CouchbaseBucketConfig couchbaseBucketConfig = (CouchbaseBucketConfig) bucketConfig;
        int numberOfReplicas = couchbaseBucketConfig.numberOfReplicas();
        if (couchbaseBucketConfig.ephemeral() && observePersistTo.value() != 0) {
            throw new FeatureNotAvailableException("Ephemeral Buckets do not support PersistTo");
        }
        if (observeReplicateTo.touchesReplica() && observeReplicateTo.value() > numberOfReplicas) {
            throw new ReplicaNotConfiguredException("Not enough replicas configured on the bucket");
        }
        if (!observePersistTo.touchesReplica() || observePersistTo.value() - 1 <= numberOfReplicas) {
            return numberOfReplicas;
        }
        throw new ReplicaNotConfiguredException("Not enough replicas configured on the bucket");
    }
}
