id | title |
---|---|
gcs |
Google Cloud Storage |
Google Cloud Storage is a durable and highly available object storage service, almost infinitely scalable and guarantees consistency: when a write succeeds, the latest copy of the object will be returned to any get request, globally.
Add the following dependency to get started:
libraryDependencies += "io.monix" %% "monix-gcs" % "0.5.1"
The Monix Google Cloud Storage connector is built on top of the Google Cloud Storage Client for Java and is divided into three main abstractions: Storage, Bucket and Blob, which will be described and explained in detail on the next sections:
The ´Storage´ acts as an interface for Google Cloud Storage, it provides very basic functionality limited to authentication for creating a connection with the service, and to creating and accessing the Buckets and Blobs.
The connector uses the Application Default Credentials method for
authenticating to GCS. This requires the user to have the
GOOGLE_APPLICATION_CREDENTIALS
environment variable pointing to a
Service Account with the required permissions in order to use the
connector.
import monix.connect.gcp.storage.GcsStorage
val storage = GcsStorage.create()
Alternatively you will be able to point a credentials
file on disk in the event you don't have the
GOOGLE_APPLICATION_CREDENTIALS
environment variable set.
import java.nio.file.Paths
import monix.connect.gcp.storage.GcsStorage
val projectId = "monix-connect-gcs"
val credentials = Paths.get("/path/to/credentials.json")
val storage = GcsStorage.create(projectId, credentials)
Once you have a GcsStorage object created you can begin to work with GCS, the first thing is to create a new GcsBucket from the same instance:
import java.io.File
import monix.connect.gcp.storage.{GcsStorage, GcsBucket}
import monix.connect.gcp.storage.configuration.GcsBucketInfo
import monix.connect.gcp.storage.configuration.GcsBucketInfo.Locations
val storage = GcsStorage.create()
val metadata = GcsBucketInfo.Metadata(
labels = Map(
"project" -> "my-first-gcs-bucket"
),
storageClass = Some(StorageClass.REGIONAL)
)
val bucket: Task[GcsBucket] = storage.createBucket("mybucket", Locations.`EUROPE-WEST1`, Some(metadata)).memoizeOnSuccess
On the other hand, you can create Blobs in the same way as with Bucket.
import monix.connect.gcp.storage.{GcsStorage, GcsBlob}
val storage: GcsStorage = GcsStorage.create()
val blob: Task[GcsBlob] = storage.createBlob("mybucket", "myBlob").memoizeOnSuccess
It also exposes a get operation for buckets and blobs that gets executed asyncronously
and it is type-safe
, returning an Option
with the resource we asked for, being None if it did not existed:
import monix.connect.gcp.storage.{GcsStorage, GcsBlob}
val storage: GcsStorage = GcsStorage.create()
val t: Task[Unit] = {
for {
maybeBucket <- storage.getBucket("myBucket"): Task[Option[GcsBucket]]
_ <- maybeBucket match {
case Some(bucket) => Task.now(println("My bucket exists!"))
case None => Task.unit // alternatively a failure could be raised
}
} yield ()
}
The same would apply for Blob.
import monix.connect.gcp.storage.{GcsStorage, GcsBlob}
val storage: GcsStorage = GcsStorage.create()
val t: Task[Unit] = {
for {
maybeBlob <- storage.getBlob("myBucket", "myBlob"): Task[Option[GcsBlob]]
_ <- maybeBlob match {
case Some(blob) => Task.now(println("My blob exists!"))
case None => Task.unit // alternatively a failure could be raised
}
} yield ()
}
You could also find a list of buckets or blobs by using respectively the signatures getBuckets
and getBlobs
, and also list all of them with listBuckets
and listBlobs
.
A Bucket is basically a container that holds your data in GCS. You can use buckets to organize your data and control its access but unlike directories and folders, you cannot nest them.
The Monix GCS connector relies in the underlying com.google.cloud.storage.Bucket
, but with some additions and and integrations with Monix
data types that makes it possible expose an idiomatic and type-safe
non blocking api.
This implementation is named GcsBucket
, and you can start using it different ways listed in the following example:
import java.io.File
import monix.connect.gcp.storage.{GcsStorage, GcsBucket}
val storage: GcsStorage = GcsStorage.create()
/** 1- When creating a bucket you will make sure that the bucket you want to use exists,
* since it returns the new bucket on completion. */
val bucket1: Task[GcsBucket] = storage.createBucket("mybucket1", Locations.`EUROPE-WEST1`).memoizeOnSuccess
/** 2- You can also get / find the bucket by its name, in this case if it does not exist
* it will return an empty Option. */
val bucket2: Task[Option[GcsBucket]] = storage.getBucket("myBucket2")
/** 3- Finally, if you do already have an instance of [[com.google.cloud.storage.Bucket]],
* you can convert it to a GcsBucket by using its compainon object*/
val underlying: com.google.cloud.storage.Bucket = ???
val bucket3: GcsBucket = GcsBucket(underlying)
Once we have an instance of GcsBucket
, we will be able to use its very simple methods that it exposes to manage our Bucket, such like get blob/s stored in it, update, reload its metadata, various ones to manage its Access Control List (ACL), etc.
There are no code examples on the documentation to show these operations since they are very basic and easy to use. On the other hand, there are also available methods for uploading and downloading from Blobs of this same Bucket, they are very interesting and unique from this connector, see how can they be used in below code examples.
In order to download a blob using the GcsBucket
you would just need to specify the Blob name that should be allocated in the same Bucket:
val bucket: Task[Option[GcsBucket]] = storage.getBucket("myBucket")
val ob: Observable[Array[Byte]] = {
Observable.fromTask(bucket)
.flatMap {
case Some(bucket) => bucket.download("myBlob")
case None => Observable.empty // alternatively a failure could be raised
}
}
There is also a handy operation for downloading directly into a file, beware that GCS is designed to allocate files of any size, therefore it should only be used with relative small files that we know for a fact that our local system have enough capacity.
import java.io.File
import monix.connect.gcp.storage.{GcsStorage, GcsBucket}
import monix.eval.Task
val storage = GcsStorage.create()
val targetFile = new File("example/target/file.txt")
val t: Task[Unit] = {
for {
maybeBucket <- storage.getBucket("myBucket"): Task[Option[GcsBucket]]
_ <- maybeBucket match {
case Some(bucket) => bucket.downloadToFile("myBlob", targetFile.toPath)
case None => Task.unit // alternatively a failure could be raised
}
} yield ()
}
On the other hand you can upload data into a Blob by using the pre-built Consumer
implementation that expects and pushes byte arrays
into the specified Blob
and materializes to Unit
when it completes.
import monix.connect.gcp.storage.{GcsStorage, GcsBucket}
import monix.eval.Task
val storage = GcsStorage.create()
val memoizedBucket = storage.createBucket("mybucket", Locations.`EUROPE-WEST1`, Some(metadata)).memoizeOnSuccess
val ob: Observable[Array[Byte]] = Observable.now("dummy content".getBytes)
val t: Task[Unit] = for {
bucket <- memoizedBucket: Task[GcsBucket]
_ <- ob.consumeWith(bucket.upload("myBlob"))
} yield ()
Alternatively, you can upload data from a local file into the specified Blob.
import java.io.File
import monix.connect.gcp.storage.{GcsStorage, GcsBucket}
import monix.eval.Task
val storage: GcsStorage = GcsStorage.create()
val memoizedBucket = storage.createBucket("mybucket", Locations.`EUROPE-WEST1`, Some(metadata)).memoizeOnSuccess
val sourceFile = new File("example/source/file.txt")
val t: Task[Unit] = for {
bucket <- memoizedBucket
unit <- bucket.uploadFromFile("myBlob", sourceFile.toPath)
} yield ()
A Blob is nothing else than an Object, pieces of data that you have uploaded to Cloud Storage that have to reside in a Bucket.
The representation of an object in this connector is called GcsBlob
, it provides also various simple methods for managing
things like update metadata, manage its acl and delete permanently.
GcsBlob
also exposes the methods download, downloadToFile, upload and uploadFromFile that allow to manage the Blob's data in a reactive way.
The only difference from using GcsBucket
is that but in this case there is no need of specifying the Bucket,
since it will use the one which the Blob is stored in.
An overloaded method that allows you to copy a Blob into the specified Bucket and Blob. The target Bucket can be the same or a different as the source.
import monix.connect.gcp.storage.{GcsStorage, GcsBlob}
import monix.eval.Task
val storage = GcsStorage.create()
val sourceBlob: Task[GcsBlob] = storage.createBlob("myBucket", "sourceBlob").memoizeOnSuccess
val targetBlob: Task[GcsBlob] = sourceBlob.flatMap(_.copyTo("targetBucket", "targetBlob"))
Testing Google Cloud Storage locally and offline is challenging since there is yet 'not too good support' on that front.
There is a google library called java-storage-nio that emulates this service, however, it has some limitations since it does not provide support for some the operations (mostly for the Bucket api) and it is not thread-safe. That's why it is highly recommended to run the functional tests directly using the Google Cloud Storage service.
However, in case you can not access to the real google cloud service, this library will be suitable for you:
Add it to the sbt library dependencies:
libraryDependencies ++= "com.google.cloud" % "google-cloud-nio" % "0.121.2" % Test
Then you should be able to create a fake Storage
instance and use it to build GcsStorage
from the companion object
apply method.
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper
import com.google.cloud.storage.{Blob, BlobId, BlobInfo, Storage, Option => _}
val storage: Storage = LocalStorageHelper.getOptions.getService
val blobInfo: BlobInfo = BlobInfo.newBuilder(BlobId.of("myBucket", "myBlob")).build
val blob: Blob = storage.create(blobInfo)
val gcsBlob: GcsBlob = new GcsBlob(blob)
Some advantages against using the real service would be that it does not require to deal with any type of google access credentials, which may be good in some cases and it can save crucial time spent on setting the right credentials.