Skip to content

Commit

Permalink
Enable PGMQ to run without installing an extension (#379)
Browse files Browse the repository at this point in the history
* Enable PGMQ to run without installing an extension

* Additional guard to avoid "An extension may only use CREATE ... IF NOT EXISTS to skip object creation if the conflicting object is one that it already owns."

* Adjusted line number
  • Loading branch information
axelfontaine authored Feb 17, 2025
1 parent 58fdf5e commit 87530ed
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 80 deletions.
182 changes: 103 additions & 79 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
------------------------------------------------------------
-- Schema, tables, records, privileges, indexes, etc
------------------------------------------------------------
-- We don't need to create the `pgmq` schema because it is automatically
-- created by postgres due to being declared in extension control file
-- When installed as an extension, we don't need to create the `pgmq` schema
-- because it is automatically created by postgres due to being declared in
-- the extension control file
DO
$$
BEGIN
IF (SELECT NOT EXISTS( SELECT 1 FROM pg_extension WHERE extname = 'pgmq')) THEN
CREATE SCHEMA IF NOT EXISTS pgmq;
END IF;
END
$$;

-- Table where queues and metadata about them is stored
CREATE TABLE pgmq.meta (
Expand Down Expand Up @@ -268,6 +277,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send: actual implementation
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- send: 2 args, no delay or headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
Expand Down Expand Up @@ -313,26 +345,26 @@ CREATE FUNCTION pgmq.send(
SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;
-- send: actual implementation
CREATE FUNCTION pgmq.send(
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msg JSONB,
headers JSONB,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -381,29 +413,6 @@ CREATE FUNCTION pgmq.send_batch(
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- returned by pgmq.metrics() and pgmq.metrics_all
CREATE TYPE pgmq.metrics_result AS (
queue_name text,
Expand Down Expand Up @@ -504,7 +513,9 @@ RETURNS VOID AS $$
DECLARE
atable TEXT := pgmq.format_table_name(queue_name, 'a');
BEGIN
EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable);
IF pgmq._extension_exists('pgmq') THEN
EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable);
END IF;
END
$$ LANGUAGE plpgsql;
Expand Down Expand Up @@ -601,26 +612,28 @@ BEGIN
queue_name
) INTO partitioned;
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);
IF pgmq._extension_exists('pgmq') THEN
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
qtable
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I
$QUERY$,
qtable_seq
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I
$QUERY$,
qtable_seq
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);
EXECUTE FORMAT(
$QUERY$
ALTER EXTENSION pgmq DROP TABLE pgmq.%I
$QUERY$,
atable
);
END IF;
EXECUTE FORMAT(
$QUERY$
Expand Down Expand Up @@ -729,13 +742,15 @@ BEGIN
atable
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
EXECUTE FORMAT(
Expand Down Expand Up @@ -801,13 +816,15 @@ BEGIN
atable
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
EXECUTE FORMAT(
Expand Down Expand Up @@ -851,18 +868,21 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION pgmq._ensure_pg_partman_installed()
RETURNS void AS $$
DECLARE
extension_exists BOOLEAN;
BEGIN
SELECT EXISTS (
CREATE FUNCTION pgmq._extension_exists(extension_name TEXT)
RETURNS BOOLEAN
LANGUAGE SQL
AS $$
SELECT EXISTS (
SELECT 1
FROM pg_extension
WHERE extname = 'pg_partman'
) INTO extension_exists;
WHERE extname = extension_name
)
$$;
IF NOT extension_exists THEN
CREATE FUNCTION pgmq._ensure_pg_partman_installed()
RETURNS void AS $$
BEGIN
IF NOT pgmq._extension_exists('pg_partman') THEN
RAISE EXCEPTION 'pg_partman is required for partitioned queues';
END IF;
END;
Expand Down Expand Up @@ -910,9 +930,11 @@ BEGIN
qtable, partition_col
);
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(qtable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable);
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq);
END IF;
END IF;
-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
Expand Down Expand Up @@ -988,8 +1010,10 @@ BEGIN
atable, a_partition_col
);
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
IF pgmq._extension_exists('pgmq') THEN
IF NOT pgmq._belongs_to_pgmq(atable) THEN
EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable);
END IF;
END IF;
-- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md
Expand Down
2 changes: 1 addition & 1 deletion pgmq-extension/test/expected/base.out
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ SELECT * FROM pgmq.create_partitioned(
:'retention_interval'
);
ERROR: pg_partman is required for partitioned queues
CONTEXT: PL/pgSQL function pgmq._ensure_pg_partman_installed() line 12 at RAISE
CONTEXT: PL/pgSQL function pgmq._ensure_pg_partman_installed() line 4 at RAISE
SQL statement "SELECT pgmq._ensure_pg_partman_installed()"
PL/pgSQL function pgmq.create_partitioned(text,text,text) line 12 at PERFORM
-- With the extension existing, the queue is created successfully
Expand Down

0 comments on commit 87530ed

Please sign in to comment.