Skip to content

Commit

Permalink
Add parameter include-column-positions
Browse files Browse the repository at this point in the history
This parameter is useful to detect schema changes (since attnum are
numbered from 1 up). Both formats (1 and 2) are supported although
element names are different ("columnpositions" for format 1 and
"position" for format 2). Default is false.

Issue eulerto#160
  • Loading branch information
eulerto committed Jun 30, 2020
1 parent c54d896 commit caaf9ac
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ MODULES = wal2json
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids \
include_domain_data_type truncate actions
include_domain_data_type truncate actions position

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Parameters
* `include-typmod`: add modifier to types that have it (eg. varchar(20) instead of varchar). Default is _true_.
* `include-type-oids`: add type oids. Default is _false_.
* `include-domain-data-type`: replace domain name with the underlying data type. Default is _false_.
* `include-column-positions`: add column position (_pg_attribute.attnum_). Default is _false_.
* `include-not-null`: add _not null_ information as _columnoptionals_. Default is _false_.
* `pretty-print`: add spaces and indentation to JSON structures. Default is _false_.
* `write-in-chunks`: write after every change instead of every changeset. Default is _false_.
Expand Down
88 changes: 88 additions & 0 deletions expected/position.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
CREATE TABLE w2j_position (a integer, b integer, primary key(a));
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

INSERT INTO w2j_position (a, b) VALUES(1,2);
UPDATE w2j_position SET b = 3 WHERE a = 1;
ALTER TABLE w2j_position ADD COLUMN c integer;
ALTER TABLE w2j_position DROP COLUMN b;
INSERT INTO w2j_position (a, c) VALUES(5,6);
UPDATE w2j_position SET c = 7 WHERE a = 5;
-- without include-column-position parameter
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1');
data
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"w2j_position","columnnames":["a","b"],"columntypes":["integer","integer"],"columnvalues":[1,2]}]}
{"change":[{"kind":"update","schema":"public","table":"w2j_position","columnnames":["a","b"],"columntypes":["integer","integer"],"columnvalues":[1,3],"oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[1]}}]}
{"change":[]}
{"change":[]}
{"change":[{"kind":"insert","schema":"public","table":"w2j_position","columnnames":["a","c"],"columntypes":["integer","integer"],"columnvalues":[5,6]}]}
{"change":[{"kind":"update","schema":"public","table":"w2j_position","columnnames":["a","c"],"columntypes":["integer","integer"],"columnvalues":[5,7],"oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[5]}}]}
(6 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
data
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"integer","value":2}]}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"integer","value":3}],"identity":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":5},{"name":"c","type":"integer","value":6}]}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":5},{"name":"c","type":"integer","value":7}],"identity":[{"name":"a","type":"integer","value":5}]}
{"action":"C"}
(16 rows)

-- with include-column-position parameter
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-column-positions', '1');
data
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[{"kind":"insert","schema":"public","table":"w2j_position","columnnames":["a","b"],"columntypes":["integer","integer"],"columnpositions":[1,2],"columnvalues":[1,2]}]}
{"change":[{"kind":"update","schema":"public","table":"w2j_position","columnnames":["a","b"],"columntypes":["integer","integer"],"columnpositions":[1,2],"columnvalues":[1,3],"oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[1]}}]}
{"change":[]}
{"change":[]}
{"change":[{"kind":"insert","schema":"public","table":"w2j_position","columnnames":["a","c"],"columntypes":["integer","integer"],"columnpositions":[1,3],"columnvalues":[5,6]}]}
{"change":[{"kind":"update","schema":"public","table":"w2j_position","columnnames":["a","c"],"columntypes":["integer","integer"],"columnpositions":[1,3],"columnvalues":[5,7],"oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[5]}}]}
(6 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-column-positions', '1');
data
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":1,"position":1},{"name":"b","type":"integer","value":2,"position":2}]}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":1,"position":1},{"name":"b","type":"integer","value":3,"position":2}],"identity":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":5,"position":1},{"name":"c","type":"integer","value":6,"position":3}]}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"w2j_position","columns":[{"name":"a","type":"integer","value":5,"position":1},{"name":"c","type":"integer","value":7,"position":3}],"identity":[{"name":"a","type":"integer","value":5}]}
{"action":"C"}
(16 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

25 changes: 25 additions & 0 deletions sql/position.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

CREATE TABLE w2j_position (a integer, b integer, primary key(a));

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

INSERT INTO w2j_position (a, b) VALUES(1,2);
UPDATE w2j_position SET b = 3 WHERE a = 1;
ALTER TABLE w2j_position ADD COLUMN c integer;
ALTER TABLE w2j_position DROP COLUMN b;
INSERT INTO w2j_position (a, c) VALUES(5,6);
UPDATE w2j_position SET c = 7 WHERE a = 5;

-- without include-column-position parameter
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');

-- with include-column-position parameter
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'include-column-positions', '1');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-column-positions', '1');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
39 changes: 39 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef struct
bool include_type_oids; /* include data type oids */
bool include_typmod; /* include typmod in types */
bool include_domain_data_type; /* include underlying data type of the domain */
bool include_column_positions; /* include column numbers */
bool include_not_null; /* include not-null constraints */

bool pretty_print; /* pretty-print JSON? */
Expand Down Expand Up @@ -241,6 +242,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_type_oids = false;
data->include_typmod = true;
data->include_domain_data_type = false;
data->include_column_positions = false;
data->pretty_print = false;
data->write_in_chunks = false;
data->include_lsn = false;
Expand Down Expand Up @@ -407,6 +409,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-column-positions") == 0)
{
if (elem->arg == NULL)
{
elog(DEBUG1, "include-column-positions argument is null");
data->include_column_positions = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_column_positions))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-not-null") == 0)
{
if (elem->arg == NULL)
Expand Down Expand Up @@ -899,6 +914,7 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
StringInfoData colnames;
StringInfoData coltypes;
StringInfoData coltypeoids;
StringInfoData colpositions;
StringInfoData colnotnulls;
StringInfoData colvalues;
char comma[3] = "";
Expand All @@ -909,6 +925,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
initStringInfo(&coltypes);
if (data->include_type_oids)
initStringInfo(&coltypeoids);
if (data->include_column_positions)
initStringInfo(&colpositions);
if (data->include_not_null)
initStringInfo(&colnotnulls);
initStringInfo(&colvalues);
Expand All @@ -933,6 +951,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfo(&coltypes, "%s%s%s\"columntypes\":%s[", data->ht, data->ht, data->ht, data->sp);
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "%s%s%s\"columntypeoids\":%s[", data->ht, data->ht, data->ht, data->sp);
if (data->include_column_positions)
appendStringInfo(&colpositions, "%s%s%s\"columnpositions\":%s[", data->ht, data->ht, data->ht, data->sp);
if (data->include_not_null)
appendStringInfo(&colnotnulls, "%s%s%s\"columnoptionals\":%s[", data->ht, data->ht, data->ht, data->sp);
appendStringInfo(&colvalues, "%s%s%s\"columnvalues\":%s[", data->ht, data->ht, data->ht, data->sp);
Expand Down Expand Up @@ -1081,6 +1101,9 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu

ReleaseSysCache(type_tuple);

if (!replident && data->include_column_positions)
appendStringInfo(&colpositions, "%s%d", comma, attr->attnum);

if (isnull)
{
appendStringInfo(&colvalues, "%snull", comma);
Expand Down Expand Up @@ -1164,6 +1187,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfo(&coltypes, "],%s", data->nl);
if (data->include_type_oids)
appendStringInfo(&coltypeoids, "],%s", data->nl);
if (data->include_column_positions)
appendStringInfo(&colpositions, "],%s", data->nl);
if (data->include_not_null)
appendStringInfo(&colnotnulls, "],%s", data->nl);
if (hasreplident)
Expand All @@ -1178,6 +1203,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
appendStringInfoString(ctx->out, coltypes.data);
if (data->include_type_oids)
appendStringInfoString(ctx->out, coltypeoids.data);
if (data->include_column_positions)
appendStringInfoString(ctx->out, colpositions.data);
if (data->include_not_null)
appendStringInfoString(ctx->out, colnotnulls.data);
appendStringInfoString(ctx->out, colvalues.data);
Expand All @@ -1186,6 +1213,8 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu
pfree(coltypes.data);
if (data->include_type_oids)
pfree(coltypeoids.data);
if (data->include_column_positions)
pfree(colpositions.data);
if (data->include_not_null)
pfree(colnotnulls.data);
pfree(colvalues.data);
Expand Down Expand Up @@ -1738,6 +1767,16 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple
appendStringInfoString(ctx->out, ",\"optional\":true");
}

/*
* Print position for columns. Positions are only available for new
* tuple (INSERT, UPDATE).
*/
if (kind == PGOUTPUTJSON_CHANGE && data->include_column_positions)
{
appendStringInfoString(ctx->out, ",\"position\":");
appendStringInfo(ctx->out, "%d", attr->attnum);
}

appendStringInfoChar(ctx->out, '}');
}

Expand Down

0 comments on commit caaf9ac

Please sign in to comment.