Handling logical replication slots

Logical replication slots are essential to logical replication. Due to space limitations in this chapter, it is unfortunately not possible to cover all aspects of logical replication. However, I want to outline some of the basic concepts that are essential for logical decoding and, therefore, also for logical replication.

If we want to create a replication slot, here's how it works. The function that's needed here takes two parameters: the first one will define the name of the replication slot, while the second one will carry the plugin that will be used to decode the transaction log. It will determine the format PostgreSQL is going to use to return the data:

test=# SELECT * 
FROM pg_create_logical_replication_slot('logical_slot', 'test_decoding');
slot_name | lsn
---------------+---------------
logical_slot
| 0/EF8AD4B0

(1 row)

We can check for the existence of the slot using the same command that we used earlier. To check what a slot really does, a small test can be created:

test=# CREATE TABLE t_demo (id int, name text, payload text);
CREATE TABLE
test=#BEGIN;
BEGIN
test=# INSERT INTO t_demo
VALUES (1, 'hans', 'some data');
INSERT 0 1
test=# INSERT INTO t_demo VALUES (2, 'paul', 'some more data');
INSERT 0 1
test=# COMMIT;
COMMIT
test=# INSERT INTO t_demo VALUES (3, 'joe', 'less data');
INSERT 0 1

Note that two transactions were executed. The changes made to those transactions can now be extracted from the slot:

test=# SELECT pg_logical_slot_get_changes('logical_slot', NULL, NULL);
pg_logical_slot_get_changes
-----------------------------------------------------------------------
(0/EF8AF5B0,606546,"BEGIN 606546")
(0/EF8CCCA0,606546,"COMMIT 606546")
(0/EF8CCCD8,606547,"BEGIN 606547")
(0/EF8CCCD8,606547,"table public.t_demo: INSERT: id[integer]:1
name[text]:'hans' payload[text]:'some data'")
(0/EF8CCD60,606547,"table public.t_demo: INSERT: id[integer]:2
name[text]:'paul' payload[text]:'some more data'")
(0/EF8CCDE0,606547,"COMMIT 606547")
(0/EF8CCE18,606548,"BEGIN 606548")
(0/EF8CCE18,606548,"table public.t_demo: INSERT: id[integer]:3
name[text]:'joe' payload[text]:'less data'")
(0/EF8CCE98,606548,"COMMIT 606548")
(9 rows)

The format that's used here depends on the output plugin we chose previously. There are various output plugins for PostgreSQL, such as wal2json.

If default values are used, the logical stream will contain real values and not just functions. The logical stream has the data that ended up in the underlying tables.

Also, keep in mind that the slot does not return data anymore once it is consumed:

test=# SELECT pg_logical_slot_get_changes('logical_slot', NULL, NULL);
pg_logical_slot_get_changes
-----------------------------
(0 rows)

The result set on the second call is therefore empty. If we want to fetch data repeatedly, PostgreSQL offers the pg_logical_slot_peek_changes function. It works just like the pg_logical_slot_get_changes function but assures that data will still be available in the slot.

Using plain SQL is, of course, not the only way to consume a transaction log. There is also a command-line tool called pg_recvlogical. It can be compared to doing tail -f on an entire database instance and receives the flow of data in real time.

Let's start the pg_recvlogical command:

[hs@zenbook ~]$ pg_recvlogical -S logical_slot -P test_decoding 
-d test -U postgres --start -f -

In this case, the tool connects to the test database and consumes data from logical_slot. -f means that the stream will be sent to stdout. Let's kill some data:

test=# DELETE FROM t_demo WHERE id < random()*10;
DELETE 3

The changes will make it into the transaction log. However, by default, the database only cares about what the table will look like after the deletion. It knows which blocks have to be touched and so on, but it doesn't know what it was previously:

BEGIN 606549
table public.t_demo: DELETE: (no-tuple-data)
table public.t_demo: DELETE: (no-tuple-data)
table public.t_demo: DELETE: (no-tuple-data)
COMMIT 606549

Therefore, the output is pretty pointless. To fix that, the following line comes to the rescue:

test=# ALTER TABLE t_demo REPLICA IDENTITY FULL;
ALTER TABLE

If the table is repopulated with data and deleted again, the transaction log stream will look as follows:

BEGIN 606558 
table public.t_demo: DELETE: id[integer]:1 name[text]:'hans'
payload[text]:'some data'
table public.t_demo: DELETE: id[integer]:2 name[text]:'paul'
payload[text]:'some more data'
table public.t_demo: DELETE: id[integer]:3 name[text]:'joe'
payload[text]:'less data'
COMMIT 606558

Now, all of the changes are in.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset