package com.couchbase.client.core.transaction.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
import java.util.Objects;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

@Stability.Internal
/* loaded from: input_file:lib/core-io-2.7.1.jar:com/couchbase/client/core/transaction/util/MonoBridge.class */
public class MonoBridge<T> {
    private boolean done;
    private final Sinks.One<T> actual = Sinks.one();
    private final Mono<T> external = this.actual.asMono();

    public MonoBridge(Mono<T> mono, String str, Object obj, @Nullable CoreTransactionLogger coreTransactionLogger) {
        Objects.requireNonNull(mono);
        Objects.requireNonNull(str);
        Objects.requireNonNull(obj);
        mono.onErrorResume(th -> {
            Mono empty;
            synchronized (obj) {
                if (!this.done) {
                    if (coreTransactionLogger != null) {
                        coreTransactionLogger.info("", "MB: [%s] propagating err %s", str, th.toString());
                    }
                    this.actual.tryEmitError(th).orThrow();
                } else if (coreTransactionLogger != null) {
                    coreTransactionLogger.info("", "MB: [%s] skipping err propagating as done", str);
                }
                empty = Mono.empty();
            }
            return empty;
        }).subscribe(obj2 -> {
            if (this.done) {
                if (coreTransactionLogger != null) {
                    coreTransactionLogger.info("", "MB: [%s] skipping next propagating as done", str);
                }
            } else {
                if (coreTransactionLogger != null) {
                    coreTransactionLogger.info("", "MB: [%s] propagating next", str);
                }
                this.actual.tryEmitValue(obj2).orThrow();
            }
        }, th2 -> {
            throw new IllegalStateException("Should not reach MonoBridge error producer");
        }, () -> {
            if (this.done) {
                if (coreTransactionLogger != null) {
                    coreTransactionLogger.info("", "MB: [%s] skipping complete propagating as done", str);
                }
            } else {
                if (coreTransactionLogger != null) {
                    coreTransactionLogger.info("", "MB: [%s] propagating complete", str);
                }
                this.actual.tryEmitEmpty();
            }
        });
        this.external.doOnCancel(() -> {
            if (coreTransactionLogger != null) {
                coreTransactionLogger.info("", "MB: [%s] is cancelled", str);
            }
            this.done = true;
        }).doOnTerminate(() -> {
            if (coreTransactionLogger != null) {
                coreTransactionLogger.info("", "MB: [%s] is errored or complete", str);
            }
            this.done = true;
        });
    }

    public Mono<T> external() {
        return this.external;
    }
}
