package com.couchbase.client.core.service;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.LifecycleState;
import java.util.concurrent.atomic.AtomicReference;
import rx.Subscriber;
import rx.functions.Action1;

/* loaded from: input_file:lib/core-io-1.7.11.jar:com/couchbase/client/core/service/AbstractLazyService.class */
public abstract class AbstractLazyService extends AbstractDynamicService {
    private final AtomicReference<Endpoint> storedEndpoint;

    protected AbstractLazyService(String str, String str2, String str3, String str4, int i, CoreContext coreContext, Service.EndpointFactory endpointFactory) {
        super(str, str2, str3, str4, i, coreContext, 0, endpointFactory);
        this.storedEndpoint = new AtomicReference<>();
    }

    @Override // com.couchbase.client.core.service.AbstractDynamicService
    protected void dispatch(final CouchbaseRequest couchbaseRequest) {
        if (this.storedEndpoint.get() == null) {
            final Endpoint createEndpoint = createEndpoint();
            if (this.storedEndpoint.compareAndSet(null, createEndpoint)) {
                endpointStates().register(createEndpoint, createEndpoint);
                createEndpoint.connect().subscribe((Subscriber<? super LifecycleState>) new Subscriber<LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractLazyService.1
                    @Override // rx.Observer
                    public void onCompleted() {
                    }

                    @Override // rx.Observer
                    public void onError(Throwable th) {
                        couchbaseRequest.observable().onError(th);
                    }

                    @Override // rx.Observer
                    public void onNext(LifecycleState lifecycleState) {
                        if (lifecycleState == LifecycleState.DISCONNECTED) {
                            couchbaseRequest.observable().onError(new CouchbaseException("Could not connect storedEndpoint."));
                        }
                    }
                });
                whenState(createEndpoint, LifecycleState.DISCONNECTED, new Action1<LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractLazyService.2
                    @Override // rx.functions.Action1
                    public void call(LifecycleState lifecycleState) {
                        AbstractLazyService.this.endpointStates().deregister(createEndpoint);
                        AbstractLazyService.this.storedEndpoint.set(null);
                    }
                });
            } else {
                createEndpoint.disconnect();
            }
        }
        sendAndFlushWhenConnected(this.storedEndpoint.get(), couchbaseRequest);
    }

    private static void sendAndFlushWhenConnected(final Endpoint endpoint, final CouchbaseRequest couchbaseRequest) {
        whenState(endpoint, LifecycleState.CONNECTED, new Action1<LifecycleState>() { // from class: com.couchbase.client.core.service.AbstractLazyService.3
            @Override // rx.functions.Action1
            public void call(LifecycleState lifecycleState) {
                Endpoint.this.send(couchbaseRequest);
                Endpoint.this.send(SignalFlush.INSTANCE);
            }
        });
    }

    Endpoint endpoint() {
        return this.storedEndpoint.get();
    }
}
