Skip to content

Commit

Permalink
Both Async and Blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
ygree committed Jul 16, 2018
1 parent 672ed3e commit 9006653
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import akka.actor.Extension;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import rx.Observable;

public interface Couchbase extends Extension {
CouchbaseEnvironment getEnvironment();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.lightbend.couchbase;

import com.google.inject.AbstractModule;
import com.lightbend.couchbase.blocking.CouchbaseBlocking;
import com.lightbend.couchbase.blocking.CouchbaseBlockingProvider;

public class CouchbaseModule extends AbstractModule {
@Override
protected void configure() {
bind(Couchbase.class).toProvider(CouchbaseProvider.class);
bind(CouchbaseBlocking.class).toProvider(CouchbaseBlockingProvider.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.lightbend.couchbase.blocking;

import akka.actor.Extension;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;

public interface CouchbaseBlocking extends Extension {
CouchbaseCluster getCluster();

Bucket getBucket();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.lightbend.couchbase.blocking;

import akka.actor.ActorSystem;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.auth.PasswordAuthenticator;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.typesafe.config.Config;

import java.util.List;
import java.util.Optional;

public class CouchbaseBlockingConfig {
private final Config config;
private final Optional<String> username;
private final Optional<String> bucketPassword;
private final String bucketName;
private final List<String> nodes;

public CouchbaseBlockingConfig(ActorSystem system) {
this(system.settings().config().getConfig("couchbase"));
}

protected CouchbaseBlockingConfig(Config config) {
this.config = config;
this.nodes = config.getStringList("nodes");
this.username = Optional.ofNullable(config.getString("username"));
this.bucketPassword = Optional.ofNullable(config.getString("password"));
this.bucketName = config.getString("bucket");
}

protected Bucket openBlockingBucket(Cluster cluster) {
if (username.isPresent()) {
cluster.authenticate(new PasswordAuthenticator(username.get(), bucketPassword.orElse(null)));
return cluster.openBucket(bucketName);
}
return cluster.openBucket(bucketName, bucketPassword.orElse(null));
}

protected CouchbaseCluster createBlockingCluster(CouchbaseEnvironment environment) {
return CouchbaseCluster.create(environment, nodes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.lightbend.couchbase.blocking;

import akka.actor.*;

public class CouchbaseBlockingExtension extends AbstractExtensionId<CouchbaseBlocking> implements ExtensionIdProvider {

public static final CouchbaseBlockingExtension CouchbaseExtensionProvider = new CouchbaseBlockingExtension();

private CouchbaseBlockingExtension() {}

@Override
public ExtensionId<? extends Extension> lookup() {
return CouchbaseExtensionProvider;
}

@Override
public CouchbaseBlocking createExtension(ExtendedActorSystem system) {
DefaultCouchbaseBlocking couchbase = new DefaultCouchbaseBlocking(system);
system.registerOnTermination(couchbase::shutdown);
return couchbase;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.lightbend.couchbase.blocking;

import akka.actor.ActorSystem;
import com.google.inject.Provider;

import javax.inject.Inject;

public class CouchbaseBlockingProvider implements Provider<CouchbaseBlocking> {

private final ActorSystem system;

@Inject
public CouchbaseBlockingProvider(ActorSystem system) {
this.system = system;
}

@Override
public CouchbaseBlocking get() {
return (CouchbaseBlocking) CouchbaseBlockingExtension.CouchbaseExtensionProvider.get(system);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.lightbend.couchbase.blocking;

import akka.actor.ExtendedActorSystem;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import com.couchbase.client.core.utils.Blocking;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

import java.util.concurrent.TimeUnit;

public class DefaultCouchbaseBlocking implements CouchbaseBlocking {
private final LoggingAdapter log;

private final DefaultCouchbaseEnvironment environment;
private final CouchbaseBlockingConfig couchbaseConfig;

private final CouchbaseCluster clusterBlocking;
private final Bucket bucketBlocking;

public DefaultCouchbaseBlocking(ExtendedActorSystem system) {
this.log = Logging.getLogger(system, getClass().getName());
this.environment = DefaultCouchbaseEnvironment.create();
this.couchbaseConfig = new CouchbaseBlockingConfig(system);

this.clusterBlocking = couchbaseConfig.createBlockingCluster(environment);
this.bucketBlocking = couchbaseConfig.openBlockingBucket(clusterBlocking);
}

@Override
public CouchbaseCluster getCluster() {
return clusterBlocking;
}

@Override
public Bucket getBucket() {
return bucketBlocking;
}

void shutdown() {
//TODO handle errors properly and add logging
bucketBlocking.close();
clusterBlocking.disconnect();

Blocking.blockForSingle(environment.shutdownAsync().single(), 30, TimeUnit.SECONDS);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.lightbend.readside.impl;

import akka.Done;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.ParameterizedN1qlQuery;
import com.lightbend.couchbase.Couchbase;
import rx.Observable;
import utils.RxJava8Utils;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Singleton
public class CrudAsyncRepository implements CrudRepository {

private final Couchbase couchbase;

@Inject
public CrudAsyncRepository(Couchbase couchbase) {
this.couchbase = couchbase;
}

@Override
public CompletionStage<Done> updateMessage(String name, String message) {

AsyncBucket bucket = couchbase.getBucket();

JsonObject obj = JsonObject.create()
.put("messages", JsonArray.from(message))
.put("message", message);

String docId = userMessageDocId(name);
JsonDocument doc = JsonDocument.create(docId, obj);

String queryText = "UPDATE test USE KEYS $1 SET messages = ARRAY_PREPEND($2, IFNULL(messages, [])), message = $2;";
ParameterizedN1qlQuery query = N1qlQuery.parameterized(queryText, JsonArray.from(docId, message));

Observable<Done> result = bucket
.insert(doc).map(x -> Done.getInstance())
.onErrorResumeNext(e -> bucket.query(query).map(x -> Done.getInstance()));

return RxJava8Utils.fromSingleObservable(result);
}

private String userMessageDocId(String name) {
return "crud_user_messages:" + name;
}

@Override
public CompletionStage<Optional<String>> getMessage(String name) {

String docId = userMessageDocId(name);

AsyncBucket bucket = couchbase.getBucket();

Observable<Optional<String>> result = bucket
.get(docId)
.map(v -> Optional.ofNullable(v.content().getString("message")));

return RxJava8Utils.fromSingleOptOptObservable(result);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.lightbend.readside.impl;

import akka.Done;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.ParameterizedN1qlQuery;
import com.lightbend.couchbase.blocking.CouchbaseBlocking;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@Singleton
public class CrudBlockingRepository implements CrudRepository {

private final CouchbaseBlocking couchbase;

@Inject
public CrudBlockingRepository(CouchbaseBlocking couchbase) {
this.couchbase = couchbase;
}

private String userMessageDocId(String name) {
return "crud_user_messages:" + name;
}

public CompletionStage<Done> updateMessage(String name, String message) {

Bucket bucket = couchbase.getBucket();

JsonObject obj = JsonObject.create()
.put("messages", JsonArray.from(message))
.put("message", message);

String docId = userMessageDocId(name);
JsonDocument doc = JsonDocument.create(docId, obj);

String queryText = "UPDATE test USE KEYS $1 SET messages = ARRAY_PREPEND($2, IFNULL(messages, [])), message = $2;";
ParameterizedN1qlQuery query = N1qlQuery.parameterized(queryText, JsonArray.from(docId, message));

return CompletableFuture.supplyAsync(() -> {
try {
bucket.insert(doc);
} catch (DocumentAlreadyExistsException e) {
bucket.query(query);
}
return Done.getInstance();
});
}

public CompletionStage<Optional<String>> getMessage(String name) {

String docId = userMessageDocId(name);

Bucket bucket = couchbase.getBucket();


JsonDocument jsonDocument = bucket.get(docId);

Optional<String> result = Optional.ofNullable(jsonDocument.content().getString("message"));

return CompletableFuture.completedFuture(result);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
public class CrudModule extends AbstractModule implements ServiceGuiceSupport {
@Override
protected void configure() {

bind(CrudRepository.class).to(CrudAsyncRepository.class);
// bind(CrudRepository.class).to(CrudBlockingRepository.class);

// Bind the ReadSideService service
bindService(ReadSideService.class, CrudServiceImpl.class);

Expand Down
Loading

0 comments on commit 9006653

Please sign in to comment.