RBBTClient
is a JavaScript library designed for seamless interaction with RabbitMQ over WebSockets. It offers a simple and intuitive API for connecting to RabbitMQ brokers, managing exchanges and subscribing to queues.
To install the rbbt-client
package, use npm:
npm install rbbt-client
Here's a basic example of how to use RBBTClient
:
import { RBBTClient } from "rbbt-client";
// Initialize the RBBTClient: Stomp URL, vhost, username, password
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// The default stomp port is 15674, please don't make the same mistakes I made that lead to this package
// Connect to the RabbitMQ broker
const conn = rbbt.connect();
// Create a new channel
const ex = conn.exchange("amq.direct");
// Create a new exclusive queue
const q = ex.queue("", { exclusive: true });
// Bind the queue with a routing key
q.bind("test");
// Subscribe to the queue
q.subscribe({ noAck: true }, (msg) => {
console.log(msg);
});
url
: The WebSocket URL of the RabbitMQ broker (e.g.,"ws://localhost:15674/ws"
).vhost
: The virtual host to connect to (default:"/"
).username
: The username for authentication (default:"guest"
).password
: The password for authentication (default:"guest"
).name?
: Optional name for the client instance.
-
connect()
- Establishes a connection to the RabbitMQ broker using the provided credentials.
- Returns the
RBBTClient
instance for chaining further calls. - Example:
const conn = rbbt.connect();
-
close()
- Closes the connection to the RabbitMQ broker.
- Example:
conn.close();
-
exchange(name, options)
- Creates or retrieves an exchange by its
name
. - Parameters:
name
: The name of the exchange. If omitted, an automatically generated name is used.options
: The exchange options (default:{}
).
- Returns: An
RBBTExchange
object. - Example:
const ex = conn.exchange("amq.direct");
- Creates or retrieves an exchange by its
-
debug(msg)
- A function that can be overridden to log debug messages.
- Example:
conn.debug = (msg) => console.log(msg);
message
: A string describing the error.connection
: TheRBBTClient
instance that encountered the error.
RBBTError
is a custom error class used throughout the RBBTClient
library to handle client-specific errors.
try {
throw new RBBTError("Invalid connection", rbbt);
} catch (err) {
console.error(err.message); // "Invalid connection"
}
connection
: TheRBBTClient
instance used for the connection.name
: The name of the exchange. If not provided, a unique name will be generated (default:""
).options?
: Optional parameters to configure the exchange:passive
: Whether the exchange is passive (default:false
).durable
: Whether the exchange is durable (default:false
).autoDelete
: Whether the exchange should auto-delete (default:false
).internal
: Whether the exchange is internal (default:false
).
connection
: The associatedRBBTClient
instance.name
: The name of the exchange.watch
: The watch subscription for the exchange.helper
: An instance ofRBBTHelpers
.queues
: The list of queues associated with the exchange.closed
: A boolean indicating whether the exchange is closed (default:false
).options
: The options used to configure the exchange.
-
open()
- Opens the exchange by subscribing to it and watching for messages.
- Throws an error if the client is not connected.
- Example:
const exchange = new RBBTExchange(connection, "myExchange");
-
close()
- Closes the exchange by unsubscribing from the watch and marking it as closed.
- Example:
exchange.close();
-
send(body, routingKey, properties?)
- Sends a message to the exchange with the specified
body
,routingKey
, and optionalproperties
. - Parameters:
body
: The message body (can be astring
orUint8Array
).routingKey
: The routing key for the message.properties
: Additional message properties (default:{}
).
- Example:
exchange.send("Hello, World!", "myRoutingKey");
- Sends a message to the exchange with the specified
-
subscribe(callback, { noAck, exclusive })
- Subscribes to the exchange and receives messages, triggering the provided
callback
function. - Parameters:
callback
: The function to handle the received messages (message: RBBTMessage
).noAck
: Whether to automatically acknowledge messages (default:false
).exclusive
: Whether the subscription is exclusive (default:false
).
- Example:
exchange.subscribe((msg) => { console.log("Received message:", msg.body); });
- Subscribes to the exchange and receives messages, triggering the provided
-
unsubscribe()
- Unsubscribes from the exchange and stops receiving messages.
- Example:
exchange.unsubscribe();
-
queue(queueName, options?)
- Returns a queue associated with the exchange, creating it if necessary.
- Parameters:
queueName
: The name of the queue (default:""
).options
: Queue options such aspassive
,durable
,autoDelete
, andexclusive
.
- Example:
const queue = exchange.queue("myQueue", { durable: true });
exchange
: TheRBBTExchange
object associated with the message.
The RBBTMessage
class represents a message sent or received from an exchange or queue. It contains properties and the message body.
exchange
: The exchange where the message was published.routingKey
: The routing key used for message delivery (default:""
).properties
: A collection of message properties (headers, delivery mode, etc.).bodySize
: The size of the message body (default:0
).body
: The message content, which can be aUint8Array
,string
, ornull
.redelivered
: A flag indicating if the message was redelivered (default:false
).
const message = new RBBTMessage(exchange);
message.body = "Hello, world!";
console.log(message.body); // Output: "Hello, world!"
exchange
: TheRBBTExchange
instance associated with the queue.name
: The name of the queue. If not provided, a unique name will be generated (default:""
).options?
: Optional parameters to configure the queue:passive
: Whether the queue should be passive (default:false
).durable
: Whether the queue should be durable (default:true
for non-empty names,false
for empty names).autoDelete
: Whether the queue should auto-delete when no consumers are connected (default:true
for empty names,false
for non-empty names).exclusive
: Whether the queue should be exclusive (default:true
for empty names,false
for non-empty names).
-
create()
- Creates the queue if the connection client is active and the exchange is not closed.
- Example:
const q = new RBBTQueue(ex, "myQueue");
-
bind(routingKey?)
- Binds the queue to an exchange with the given
routingKey
. If the queue is already bound, the binding is updated. - Parameters:
routingKey
: The routing key for binding (default:""
).
- Example:
q.bind("routing.key");
- Binds the queue to an exchange with the given
-
unbind(routingKey?)
- Unbinds the queue from an exchange with the given
routingKey
. - Parameters:
routingKey
: The routing key for unbinding (default:""
).
- Example:
q.unbind("routing.key");
- Unbinds the queue from an exchange with the given
-
subscribe(options, callback)
- Subscribes to the queue for receiving messages. A callback is triggered on each new message.
- Parameters:
options
: Subscription options:noAck
: Whether to acknowledge messages automatically (default:true
).exclusive
: Whether the subscription is exclusive (default:false
).tag
: A custom tag for the subscription (default:""
).args
: Additional arguments for the subscription (default:{}
).
callback
: The function to handle the received messages (msg: RBBTMessage
).
- Example:
q.subscribe({ noAck: false }, (msg) => { console.log(msg.body); });
-
unsubscribe()
- Unsubscribes from the queue and stops receiving messages.
- Example:
q.unsubscribe();
-
send(body, properties?)
- Sends a message to the queue with the specified
body
andproperties
. - Parameters:
body
: The message body (can be astring
orUint8Array
).properties
: Additional message properties (default:{}
).
- Example:
q.send("Hello, World!", { priority: 1 });
- Sends a message to the queue with the specified
Defines the parameters for queue configuration.
passive
: (optional) Iftrue
, the queue must already exist (default:false
).durable
: (optional) Iftrue
, the queue will survive server restarts (default:true
ifname
is provided, otherwisefalse
).autoDelete
: (optional) Iftrue
, the queue will automatically delete itself when no longer in use (default:true
ifname
is not provided).exclusive
: (optional) Iftrue
, the queue is exclusive to the connection (default:true
ifname
is not provided).
type RBBTQueueParams = {
passive?: boolean;
durable?: boolean;
autoDelete?: boolean;
exclusive?: boolean;
};
Defines the parameters for consuming messages from a queue.
tag
: (optional) A consumer tag to identify the consumer.noAck
: (optional) Iftrue
, messages are automatically acknowledged (default:true
).exclusive
: (optional) Iftrue
, the consumer is exclusive (default:false
).args
: (optional) Additional arguments for the consumer.
export type RBBTConsumeParams = {
tag?: string;
noAck?: boolean;
exclusive?: boolean;
args?: Record<string, any>;
};
Defines the message properties that can be set on a published message.
headers
: (optional) Custom headers for the message, as key-value pairs.messageId
: (optional) A unique identifier for the message.
export type RBBTProperties = {
headers?: Record<string, any>;
messageId?: string;
};
Defines the parameters for exchange configuration.
passive
: (optional) Iftrue
, the exchange must already exist (default:false
).durable
: (optional) Iftrue
, the exchange will survive server restarts (default:false
).autoDelete
: (optional) Iftrue
, the exchange will be deleted when no longer in use (default:false
).internal
: (optional) Iftrue
, the exchange is used only for internal message routing (default:false
).args
: (optional) Additional arguments for the exchange, passed as key-value pairs.
export type RBBTExchangeParams = {
passive?: boolean;
durable?: boolean;
autoDelete?: boolean;
internal?: boolean;
args?: Record<string, any>;
};
Below is a full example demonstrating how to set up a connection, create a channel, declare a queue, bind it to an exchange, and start receiving messages:
import { RBBTClient } from "rbbt-client";
const rbbt = new RBBTClient("ws://localhost:15674/ws", "/", "guest", "guest");
// Step 2: Connect to RabbitMQ
const connection = rbbt.connect();
// Step 3: Create an exchange (this is where messages will be sent)
const exchange = connection.exchange("my.direct.exchange", {
durable: true, // The exchange will survive server restarts
});
// Step 4: Create a queue (this is where messages will be received)
const queue = exchange.queue("my.queue", {
durable: true, // The queue will survive server restarts
});
// Step 5: Bind the queue to the exchange using a routing key
queue.bind("my.routing.key");
// Step 6: Publish a message to the exchange
exchange.send("Hello RabbitMQ!", "my.routing.key");
// Step 7: Subscribe to the queue to receive messages
queue.subscribe(
{
noAck: false,
},
(message) => {
console.log("Received message:", message.body); // Display the message body in the console
},
);