NATS Controllers for Kubernetes (NACK)
The JetStream controllers allows you to manage NATS JetStream Streams and Consumers via K8S CRDs.
First, we'll need to NATS cluster that has enabled JetStream. You can install one as follows:
# Creates cluster of NATS Servers that are not JetStream enabled
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/simple-nats.yml
# Creates NATS Server with JetStream enabled as a leafnode connection
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/k8s/master/nats-server/nats-js-leaf.yml
Now install the JetStream CRDs and Controller:
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/crds.yml
customresourcedefinition.apiextensions.k8s.io/streams.jetstream.nats.io configured
customresourcedefinition.apiextensions.k8s.io/consumers.jetstream.nats.io configured
customresourcedefinition.apiextensions.k8s.io/streamtemplates.jetstream.nats.io configured
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/rbac.yml
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/deployment.yml
You can also use Helm to install:
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install nats nats/nats --set nats.image=synadia/nats-server:nightly --set=nats.jetstream.enabled=true
helm install nack nats/nack --set=jetstream.nats.url=nats://nats:4222
Let's create a a stream and a couple of consumers:
---
apiVersion: jetstream.nats.io/v1beta1
kind: Stream
metadata:
name: mystream
spec:
name: mystream
subjects: ["orders.*"]
storage: memory
maxAge: 1h
---
apiVersion: jetstream.nats.io/v1beta1
kind: Consumer
metadata:
name: my-push-consumer
spec:
streamName: mystream
durableName: my-push-consumer
deliverSubject: my-push-consumer.orders
deliverPolicy: last
ackPolicy: none
replayPolicy: instant
---
apiVersion: jetstream.nats.io/v1beta1
kind: Consumer
metadata:
name: my-pull-consumer
spec:
streamName: mystream
durableName: my-pull-consumer
deliverPolicy: all
filterSubject: orders.received
maxDeliver: 20
ackPolicy: explicit
# Create a stream.
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/examples/stream.yml
# Check if it was successfully created.
$ kubectl get streams
NAME STATE STREAM NAME SUBJECTS
mystream Created mystream [orders.*]
# Create a push-based consumer
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/examples/consumer_push.yml
# Create a pull based consumer
$ kubectl apply -f https://raw.githubusercontent.com/nats-io/nack/main/deploy/examples/consumer_pull.yml
# Check if they were successfully created.
$ kubectl get consumers
NAME STATE STREAM CONSUMER ACK POLICY
my-pull-consumer Created mystream my-pull-consumer explicit
my-push-consumer Created mystream my-push-consumer none
# If you end up in an Errored state, run kubectl describe for more info.
# kubectl describe streams mystream
# kubectl describe consumers my-pull-consumer
Now we're ready to use Streams and Consumers. Let's start off with writing some
data into mystream
.
# Run nats-box that includes the NATS management utilities, and exec into it.
$ kubectl apply -f https://nats-io.github.io/k8s/tools/nats-box.yml
$ kubectl exec -it nats-box -- /bin/sh -l
# Publish a couple of messages from nats-box
nats-box:~$ nats context save jetstream -s nats://nats:4222
nats-box:~$ nats context select jetstream
nats-box:~$ nats pub orders.received "order 1"
nats-box:~$ nats pub orders.received "order 2"
First, we'll read the data using a pull-based consumer.
From the above my-pull-consumer
Consumer CRD, we have set the filterSubject
of orders.received
. You can double check with the following command:
$ kubectl get consumer my-pull-consumer -o jsonpath={.spec.filterSubject}
orders.received
So that's the subject my-pull-consumer will pull messages from.
# Pull first message.
nats-box:~$ nats consumer next mystream my-pull-consumer
--- subject: orders.received / delivered: 1 / stream seq: 1 / consumer seq: 1
order 1
Acknowledged message
# Pull next message.
nats-box:~$ nats consumer next mystream my-pull-consumer
--- subject: orders.received / delivered: 1 / stream seq: 2 / consumer seq: 2
order 2
Acknowledged message
Next, let's read data using a push-based consumer.
From the above my-push-consumer
Consumer CRD, we have set the deliverSubject
of my-push-consumer.orders
, as you can confirm with the following command:
$ kubectl get consumer my-push-consumer -o jsonpath={.spec.deliverSubject}
my-push-consumer.orders
So pushed messages will arrive on that subject. This time all messages arrive automatically.
nats-box:~$ nats sub my-push-consumer.orders
17:57:24 Subscribing on my-push-consumer.orders
[#1] Received JetStream message: consumer: mystream > my-push-consumer / subject: orders.received /
delivered: 1 / consumer seq: 1 / stream seq: 1 / ack: false
order 1
[#2] Received JetStream message: consumer: mystream > my-push-consumer / subject: orders.received /
delivered: 1 / consumer seq: 2 / stream seq: 2 / ack: false
order 2
# First, build the jetstream controller.
make jetstream-controller
# Next, run the controller like this
./jetstream-controller -kubeconfig ~/.kube/config -s nats://localhost:4222
# Pro tip: jetstream-controller uses klog just like kubectl or kube-apiserver.
# This means you can change the verbosity of logs with the -v flag.
#
# For example, this prints raw HTTP requests and responses.
# ./jetstream-controller -v=10
# You'll probably want to start a local Jetstream-enabled NATS server, unless
# you use a public one.
nats-server -DV -js
Build Docker image
make jetstream-controller-docker jetstreamVersion=1.2.3
This is a sidecar that you can use to automatically reload your NATS Server configuration file.
For more information see the Chart repo.
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install my-nats nats/nats
reloader:
enabled: true
image: natsio/nats-server-config-reloader:0.6.0
pullPolicy: IfNotPresent
# First, build the config reloader.
make nats-server-config-reloader
# Next, run the reloader like this
./nats-server-config-reloader
Build Docker image
make nats-server-config-reloader-docker configReloaderVersion=1.2.3
For more information see the Chart repo.
helm repo add nats https://nats-io.github.io/k8s/helm/charts/
helm install my-nats nats/nats
bootconfig:
image: natsio/nats-boot-config:0.5.2
pullPolicy: IfNotPresent
# First, build the project.
make nats-boot-config
# Next, run the project like this
./nats-boot-config
Build Docker image
make nats-boot-config-docker bootConfigVersion=1.2.3