diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 1d788e1a..8a3e6fb2 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -1,8 +1,17 @@ ------------------------------------------------------------ -- Schema, tables, records, privileges, indexes, etc ------------------------------------------------------------ --- We don't need to create the `pgmq` schema because it is automatically --- created by postgres due to being declared in extension control file +-- When installed as an extension, we don't need to create the `pgmq` schema +-- because it is automatically created by postgres due to being declared in +-- the extension control file +DO +$$ +BEGIN + IF (SELECT NOT EXISTS( SELECT 1 FROM pg_extension WHERE extname = 'pgmq')) THEN + CREATE SCHEMA IF NOT EXISTS pgmq; + END IF; +END +$$; -- Table where queues and metadata about them is stored CREATE TABLE pgmq.meta ( @@ -268,6 +277,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- send: actual implementation +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message, headers) + VALUES ($2, $1, $3) + RETURNING msg_id; + $QUERY$, + qtable + ); + RETURN QUERY EXECUTE sql USING msg, delay, headers; +END; +$$ LANGUAGE plpgsql; + -- send: 2 args, no delay or headers CREATE FUNCTION pgmq.send( queue_name TEXT, @@ -313,11 +345,11 @@ CREATE FUNCTION pgmq.send( SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; --- send: actual implementation -CREATE FUNCTION pgmq.send( +-- send_batch: actual implementation +CREATE FUNCTION pgmq.send_batch( queue_name TEXT, - msg JSONB, - headers JSONB, + msgs JSONB[], + headers JSONB[], delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE @@ -325,14 +357,14 @@ DECLARE qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN sql := FORMAT( - $QUERY$ + $QUERY$ INSERT INTO pgmq.%I (vt, message, headers) - VALUES ($2, $1, $3) + SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) RETURNING msg_id; $QUERY$, - qtable - ); - RETURN QUERY EXECUTE sql USING msg, delay, headers; + qtable + ); + RETURN QUERY EXECUTE sql USING msgs, delay, headers; END; $$ LANGUAGE plpgsql; @@ -381,29 +413,6 @@ CREATE FUNCTION pgmq.send_batch( SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; --- send_batch: actual implementation -CREATE FUNCTION pgmq.send_batch( - queue_name TEXT, - msgs JSONB[], - headers JSONB[], - delay TIMESTAMP WITH TIME ZONE -) RETURNS SETOF BIGINT AS $$ -DECLARE - sql TEXT; - qtable TEXT := pgmq.format_table_name(queue_name, 'q'); -BEGIN - sql := FORMAT( - $QUERY$ - INSERT INTO pgmq.%I (vt, message, headers) - SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) - RETURNING msg_id; - $QUERY$, - qtable - ); - RETURN QUERY EXECUTE sql USING msgs, delay, headers; -END; -$$ LANGUAGE plpgsql; - -- returned by pgmq.metrics() and pgmq.metrics_all CREATE TYPE pgmq.metrics_result AS ( queue_name text, @@ -504,7 +513,9 @@ RETURNS VOID AS $$ DECLARE atable TEXT := pgmq.format_table_name(queue_name, 'a'); BEGIN - EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable); + IF pgmq._extension_exists('pgmq') THEN + EXECUTE format('ALTER EXTENSION pgmq DROP TABLE pgmq.%I', atable); + END IF; END $$ LANGUAGE plpgsql; @@ -601,26 +612,28 @@ BEGIN queue_name ) INTO partitioned; - EXECUTE FORMAT( - $QUERY$ - ALTER EXTENSION pgmq DROP TABLE pgmq.%I - $QUERY$, - qtable - ); + IF pgmq._extension_exists('pgmq') THEN + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP TABLE pgmq.%I + $QUERY$, + qtable + ); - EXECUTE FORMAT( - $QUERY$ - ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I - $QUERY$, - qtable_seq - ); + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP SEQUENCE pgmq.%I + $QUERY$, + qtable_seq + ); - EXECUTE FORMAT( - $QUERY$ - ALTER EXTENSION pgmq DROP TABLE pgmq.%I - $QUERY$, - atable - ); + EXECUTE FORMAT( + $QUERY$ + ALTER EXTENSION pgmq DROP TABLE pgmq.%I + $QUERY$, + atable + ); + END IF; EXECUTE FORMAT( $QUERY$ @@ -729,13 +742,15 @@ BEGIN atable ); - IF NOT pgmq._belongs_to_pgmq(qtable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); - END IF; + IF pgmq._extension_exists('pgmq') THEN + IF NOT pgmq._belongs_to_pgmq(qtable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); + END IF; - IF NOT pgmq._belongs_to_pgmq(atable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + IF NOT pgmq._belongs_to_pgmq(atable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + END IF; END IF; EXECUTE FORMAT( @@ -801,13 +816,15 @@ BEGIN atable ); - IF NOT pgmq._belongs_to_pgmq(qtable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); - END IF; + IF pgmq._extension_exists('pgmq') THEN + IF NOT pgmq._belongs_to_pgmq(qtable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); + END IF; - IF NOT pgmq._belongs_to_pgmq(atable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + IF NOT pgmq._belongs_to_pgmq(atable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + END IF; END IF; EXECUTE FORMAT( @@ -851,18 +868,21 @@ BEGIN END; $$ LANGUAGE plpgsql; -CREATE FUNCTION pgmq._ensure_pg_partman_installed() -RETURNS void AS $$ -DECLARE - extension_exists BOOLEAN; -BEGIN - SELECT EXISTS ( +CREATE FUNCTION pgmq._extension_exists(extension_name TEXT) + RETURNS BOOLEAN + LANGUAGE SQL +AS $$ +SELECT EXISTS ( SELECT 1 FROM pg_extension - WHERE extname = 'pg_partman' - ) INTO extension_exists; + WHERE extname = extension_name +) +$$; - IF NOT extension_exists THEN +CREATE FUNCTION pgmq._ensure_pg_partman_installed() +RETURNS void AS $$ +BEGIN + IF NOT pgmq._extension_exists('pg_partman') THEN RAISE EXCEPTION 'pg_partman is required for partitioned queues'; END IF; END; @@ -910,9 +930,11 @@ BEGIN qtable, partition_col ); - IF NOT pgmq._belongs_to_pgmq(qtable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); + IF pgmq._extension_exists('pgmq') THEN + IF NOT pgmq._belongs_to_pgmq(qtable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', qtable); + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD SEQUENCE pgmq.%I', qtable_seq); + END IF; END IF; -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md @@ -988,8 +1010,10 @@ BEGIN atable, a_partition_col ); - IF NOT pgmq._belongs_to_pgmq(atable) THEN - EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + IF pgmq._extension_exists('pgmq') THEN + IF NOT pgmq._belongs_to_pgmq(atable) THEN + EXECUTE FORMAT('ALTER EXTENSION pgmq ADD TABLE pgmq.%I', atable); + END IF; END IF; -- https://github.com/pgpartman/pg_partman/blob/master/doc/pg_partman.md diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index b55060a8..37b27d6b 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -595,7 +595,7 @@ SELECT * FROM pgmq.create_partitioned( :'retention_interval' ); ERROR: pg_partman is required for partitioned queues -CONTEXT: PL/pgSQL function pgmq._ensure_pg_partman_installed() line 12 at RAISE +CONTEXT: PL/pgSQL function pgmq._ensure_pg_partman_installed() line 4 at RAISE SQL statement "SELECT pgmq._ensure_pg_partman_installed()" PL/pgSQL function pgmq.create_partitioned(text,text,text) line 12 at PERFORM -- With the extension existing, the queue is created successfully