package com.couchbase.client.java.transactions;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.transaction.CoreTransactionContext;
import com.couchbase.client.core.transaction.CoreTransactionsReactive;
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarkerOwner;
import com.couchbase.client.java.codec.JsonSerializer;
import com.couchbase.client.java.transactions.config.TransactionOptions;
import com.couchbase.client.java.transactions.internal.ErrorUtil;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:lib/java-client-3.7.1.jar:com/couchbase/client/java/transactions/ReactiveTransactions.class */
public class ReactiveTransactions {
    private final CoreTransactionsReactive internal;
    private final JsonSerializer serializer;

    @Stability.Internal
    public ReactiveTransactions(Core core, JsonSerializer jsonSerializer) {
        Objects.requireNonNull(core);
        this.internal = new CoreTransactionsReactive(core, core.context().environment().transactionsConfig());
        this.serializer = (JsonSerializer) Objects.requireNonNull(jsonSerializer);
    }

    public Mono<TransactionResult> run(Function<ReactiveTransactionAttemptContext, Mono<?>> function, @Nullable TransactionOptions transactionOptions) {
        return this.internal.run(coreTransactionAttemptContext -> {
            return (Mono) function.apply(new ReactiveTransactionAttemptContext(coreTransactionAttemptContext, this.serializer));
        }, transactionOptions == null ? null : transactionOptions.build()).onErrorResume(ErrorUtil::convertTransactionFailedInternal).map(TransactionResult::new);
    }

    public Mono<TransactionResult> run(Function<ReactiveTransactionAttemptContext, Mono<?>> function) {
        return run(function, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TransactionResult runBlocking(Consumer<TransactionAttemptContext> consumer, @Nullable CoreTransactionOptions coreTransactionOptions) {
        return (TransactionResult) Mono.defer(() -> {
            CoreMergedTransactionConfig coreMergedTransactionConfig = new CoreMergedTransactionConfig(this.internal.config(), Optional.ofNullable(coreTransactionOptions));
            CoreTransactionContext coreTransactionContext = new CoreTransactionContext(this.internal.core().context(), UUID.randomUUID().toString(), coreMergedTransactionConfig, this.internal.core().transactionsCleanup());
            return this.internal.executeTransaction(Mono.defer(() -> {
                return Mono.just(this.internal.createAttemptContext(coreTransactionContext, coreMergedTransactionConfig, UUID.randomUUID().toString()));
            }), coreMergedTransactionConfig, coreTransactionContext, coreTransactionAttemptContext -> {
                return Mono.defer(() -> {
                    TransactionAttemptContext transactionAttemptContext = new TransactionAttemptContext(coreTransactionAttemptContext, this.serializer);
                    return Mono.fromRunnable(() -> {
                        TransactionMarkerOwner.set(new TransactionMarker(coreTransactionAttemptContext));
                        try {
                            consumer.accept(transactionAttemptContext);
                        } finally {
                            TransactionMarkerOwner.clear();
                        }
                    }).subscribeOn(this.internal.core().context().environment().transactionsSchedulers().schedulerBlocking()).then();
                });
            }, false).onErrorResume(ErrorUtil::convertTransactionFailedInternal);
        }).map(TransactionResult::new).publishOn(this.internal.core().context().environment().transactionsSchedulers().schedulerBlocking()).block();
    }
}
