Add hypercore chunk tests for triggers
Add tests for statement and row triggers on hypercore chunks. We test `INSERT`, `UPDATE`, `DELETE`, and `TRUNCATE` triggers, where applicable. We check that the `OLD` and `NEW` values are correct, and that statement and row triggers are executed the expected number of times.
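
For reference, the tests all follow roughly the same shape; here is a minimal sketch of the pattern (the names `some_chunk`, `trigger_log`, `log_row`, and `log_insert` are illustrative, not identifiers from this commit):

    -- Record every row image an insert row trigger sees, so the test can
    -- compare the recorded rows against the rows the statement inserted.
    create table trigger_log (like some_chunk, new_row bool, kind text);
    create function log_row() returns trigger as $$
    begin
        insert into trigger_log select new.*, true, tg_op;
        return new;
    end;
    $$ language plpgsql;
    create trigger log_insert before insert on some_chunk
        for each row execute function log_row();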
Showing 3 changed files with 454 additions and 1 deletion.
@@ -0,0 +1,299 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\ir include/setup_hypercore.sql
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set hypertable readings
\ir hypercore_helpers.sql
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Function to run an EXPLAIN ANALYZE and do replacements on the
-- emitted plan. This is intended to be used when the structure of the
-- plan is important, but not the specific chunks scanned nor the
-- number of heap fetches, rows, loops, etc.
create function explain_analyze_anonymize(text) returns setof text
language plpgsql as
$$
declare
    ln text;
begin
    for ln in
        execute format('explain (analyze, costs off, summary off, timing off, decompress_cache_stats) %s', $1)
    loop
        if trim(both from ln) like 'Group Key:%' then
            continue;
        end if;
        ln := regexp_replace(ln, 'Array Cache Hits: \d+', 'Array Cache Hits: N');
        ln := regexp_replace(ln, 'Array Cache Misses: \d+', 'Array Cache Misses: N');
        ln := regexp_replace(ln, 'Array Cache Evictions: \d+', 'Array Cache Evictions: N');
        ln := regexp_replace(ln, 'Heap Fetches: \d+', 'Heap Fetches: N');
        ln := regexp_replace(ln, 'Workers Launched: \d+', 'Workers Launched: N');
        ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=N loops=N');
        ln := regexp_replace(ln, '_hyper_\d+_\d+_chunk', '_hyper_I_N_chunk', 1, 0);
        return next ln;
    end loop;
end;
$$;
create function explain_anonymize(text) returns setof text
language plpgsql as
$$
declare
    ln text;
begin
    for ln in
        execute format('explain (costs off, summary off, timing off) %s', $1)
    loop
        ln := regexp_replace(ln, 'Array Cache Hits: \d+', 'Array Cache Hits: N');
        ln := regexp_replace(ln, 'Array Cache Misses: \d+', 'Array Cache Misses: N');
        ln := regexp_replace(ln, 'Array Cache Evictions: \d+', 'Array Cache Evictions: N');
        ln := regexp_replace(ln, 'Heap Fetches: \d+', 'Heap Fetches: N');
        ln := regexp_replace(ln, 'Workers Launched: \d+', 'Workers Launched: N');
        ln := regexp_replace(ln, 'actual rows=\d+ loops=\d+', 'actual rows=N loops=N');
        ln := regexp_replace(ln, '_hyper_\d+_\d+_chunk', '_hyper_I_N_chunk', 1, 0);
        return next ln;
    end loop;
end;
$$;
create table :hypertable(
    metric_id serial,
    created_at timestamptz not null unique,
    location_id smallint, --segmentby attribute with index
    owner_id bigint, --segmentby attribute without index
    device_id bigint, --non-segmentby attribute
    temp float8,
    humidity float4
);
create index hypertable_location_id_idx on :hypertable (location_id);
create index hypertable_device_id_idx on :hypertable (device_id);
select create_hypertable(:'hypertable', by_range('created_at'));
 create_hypertable 
-------------------
 (1,t)
(1 row)

-- Disable incremental sort to make tests stable
set enable_incremental_sort = false;
select setseed(1);
 setseed 
---------
 
(1 row)

-- Insert rows into the tables.
--
-- The original rows are inserted with timestamps spaced five minutes
-- apart (the '5m' step below). Any other timestamps are inserted as
-- part of the test.
insert into :hypertable (created_at, location_id, device_id, owner_id, temp, humidity)
select t, ceil(random()*10), ceil(random()*30), ceil(random() * 5), random()*40, random()*100
from generate_series('2022-06-01'::timestamptz, '2022-07-01', '5m') t;
alter table :hypertable set (
    timescaledb.compress,
    timescaledb.compress_orderby = 'created_at',
    timescaledb.compress_segmentby = 'location_id, owner_id'
);
-- Get some test chunks as global variables (first and second chunk here)
select format('%I.%I', chunk_schema, chunk_name)::regclass as chunk1
  from timescaledb_information.chunks
 where format('%I.%I', hypertable_schema, hypertable_name)::regclass = :'hypertable'::regclass
 order by chunk1 asc
 limit 1 \gset
select format('%I.%I', chunk_schema, chunk_name)::regclass as chunk2
  from timescaledb_information.chunks
 where format('%I.%I', hypertable_schema, hypertable_name)::regclass = :'hypertable'::regclass
 order by chunk2 asc
 limit 1 offset 1 \gset
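-- (psql's \gset stores each output column of the preceding query in a
-- variable of the same name, so :chunk1 and :chunk2 now refer to those
-- chunks.)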
create table saved_rows (like :chunk1, new_row bool not null, kind text);
create table count_stmt (inserts int, updates int, deletes int);
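-- Trigger functions used by the tests below: save_row records the row
-- images seen by row triggers, save_transition_table records transition
-- table contents, count_ops tallies statements per operation, and
-- notify_action raises a notice on truncate.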
create function save_row() returns trigger as $$
begin
    if new is not null then
        insert into saved_rows select new.*, true, tg_op;
    end if;
    if old is not null then
        insert into saved_rows select old.*, false, tg_op;
    end if;
    -- For a before delete trigger, new is null, and returning null
    -- would silently skip the delete, so return old instead.
    if tg_op = 'DELETE' then
        return old;
    end if;
    return new;
end;
$$ language plpgsql;
create function save_transition_table() returns trigger as $$
begin
    case tg_op
        when 'INSERT' then
            insert into saved_rows select n.*, true, tg_op from new_table n;
        when 'DELETE' then
            insert into saved_rows select o.*, false, tg_op from old_table o;
        when 'UPDATE' then
            insert into saved_rows select n.*, true, tg_op from new_table n;
            insert into saved_rows select o.*, false, tg_op from old_table o;
    end case;
    return null; -- the return value of an after statement trigger is ignored
end;
$$ language plpgsql;
create function count_ops() returns trigger as $$
begin
    insert into count_stmt values (
        (tg_op = 'INSERT')::int,
        (tg_op = 'UPDATE')::int,
        (tg_op = 'DELETE')::int
    );
    return null;
end;
$$ language plpgsql;
create function notify_action() returns trigger as $$
begin
    raise notice 'table % was truncated', tg_table_name;
    return null;
end;
$$ language plpgsql;
-- Compress all the chunks and make sure that they are compressed
select compress_chunk(show_chunks(:'hypertable'), hypercore_use_access_method => true);
             compress_chunk             
----------------------------------------
 _timescaledb_internal._hyper_1_1_chunk
 _timescaledb_internal._hyper_1_2_chunk
 _timescaledb_internal._hyper_1_3_chunk
 _timescaledb_internal._hyper_1_4_chunk
 _timescaledb_internal._hyper_1_5_chunk
 _timescaledb_internal._hyper_1_6_chunk
(6 rows)

select chunk_name, compression_status from chunk_compression_stats(:'hypertable');
    chunk_name    | compression_status 
------------------+--------------------
 _hyper_1_1_chunk | Compressed
 _hyper_1_2_chunk | Compressed
 _hyper_1_3_chunk | Compressed
 _hyper_1_4_chunk | Compressed
 _hyper_1_5_chunk | Compressed
 _hyper_1_6_chunk | Compressed
(6 rows)

with the_info as (
    select min(created_at) min_created_at,
           max(created_at) max_created_at
      from :hypertable
)
select min_created_at,
       max_created_at,
       '1m'::interval + min_created_at + (max_created_at - min_created_at) / 2 as mid_created_at,
       '1m'::interval + max_created_at + (max_created_at - min_created_at) / 2 as post_created_at
  from the_info \gset
-- Insert a bunch of rows to make sure that we have a mix of
-- uncompressed, partially compressed, and fully compressed
-- chunks. Note that there is an overlap between the compressed and
-- uncompressed start and end timestamps.
insert into :hypertable (created_at, location_id, device_id, owner_id, temp, humidity)
select t, ceil(random()*10), ceil(random()*30), ceil(random() * 5), random()*40, random()*100
from generate_series(:'mid_created_at'::timestamptz, :'post_created_at', '15m') t;
-- Create a table with some samples that we can re-use to generate conflicts.
create table sample (like :chunk1 including generated including defaults including constraints);
insert into sample(created_at, location_id, device_id, owner_id, temp, humidity)
values
    ('2022-06-01 00:01:23', 999, 666, 111, 3.14, 3.14),
    ('2022-06-01 00:02:23', 999, 666, 111, 3.14, 3.14);
-- Start by testing insert triggers for both statements and rows. In
-- this case, the row trigger just saves the rows to a separate table,
-- and we check that we get the same number of rows with the same
-- values.
create trigger save_insert_row_trg before insert on :chunk1 for each row execute function save_row();
create trigger count_inserts_trg before insert on :chunk1 for each statement execute function count_ops();
insert into :chunk1(created_at, location_id, device_id, owner_id, temp, humidity)
select created_at, location_id, device_id, owner_id, temp, humidity from sample;
select * from saved_rows where kind = 'INSERT';
 metric_id |          created_at          | location_id | owner_id | device_id | temp | humidity | new_row |  kind  
-----------+------------------------------+-------------+----------+-----------+------+----------+---------+--------
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 3.14 |     3.14 | t       | INSERT
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 3.14 |     3.14 | t       | INSERT
(2 rows)

select sum(inserts), sum(updates), sum(deletes) from count_stmt;
 sum | sum | sum 
-----+-----+-----
   1 |   0 |   0
(1 row)

-- Run update and upsert tests
create trigger save_update_row_trg before update on :chunk1 for each row execute function save_row();
create trigger count_update_trg before update on :chunk1 for each statement execute function count_ops();
update :chunk1 set temp = 9.99 where device_id = 666;
select * from saved_rows where kind = 'UPDATE';
 metric_id |          created_at          | location_id | owner_id | device_id | temp | humidity | new_row |  kind  
-----------+------------------------------+-------------+----------+-----------+------+----------+---------+--------
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 9.99 |     3.14 | t       | UPDATE
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 3.14 |     3.14 | f       | UPDATE
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 9.99 |     3.14 | t       | UPDATE
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 3.14 |     3.14 | f       | UPDATE
(4 rows)

select sum(inserts), sum(updates), sum(deletes) from count_stmt;
 sum | sum | sum 
-----+-----+-----
   1 |   1 |   0
(1 row)

truncate saved_rows;
-- Upsert with conflicts on previously inserted rows
insert into :chunk1(created_at, location_id, device_id, owner_id, temp, humidity)
select created_at, location_id, device_id, owner_id, temp, humidity from sample
on conflict (created_at) do update set temp = 6.66, device_id = 666;
select * from saved_rows where kind = 'UPDATE';
 metric_id |          created_at          | location_id | owner_id | device_id | temp | humidity | new_row |  kind  
-----------+------------------------------+-------------+----------+-----------+------+----------+---------+--------
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 6.66 |     3.14 | t       | UPDATE
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 9.99 |     3.14 | f       | UPDATE
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 6.66 |     3.14 | t       | UPDATE
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 9.99 |     3.14 | f       | UPDATE
(4 rows)

select sum(inserts), sum(updates), sum(deletes) from count_stmt;
 sum | sum | sum 
-----+-----+-----
   2 |   2 |   0
(1 row)

truncate saved_rows, count_stmt;
-- Run delete tests
create trigger save_delete_row_trg before delete on :chunk1 for each row execute function save_row();
create trigger count_delete_trg before delete on :chunk1 for each statement execute function count_ops();
delete from :chunk1 where device_id = 666;
select * from saved_rows where kind = 'DELETE';
 metric_id |          created_at          | location_id | owner_id | device_id | temp | humidity | new_row |  kind  
-----------+------------------------------+-------------+----------+-----------+------+----------+---------+--------
     11525 | Wed Jun 01 00:01:23 2022 PDT |         999 |      111 |       666 | 6.66 |     3.14 | f       | DELETE
     11526 | Wed Jun 01 00:02:23 2022 PDT |         999 |      111 |       666 | 6.66 |     3.14 | f       | DELETE
(2 rows)

select sum(inserts), sum(updates), sum(deletes) from count_stmt;
 sum | sum | sum 
-----+-----+-----
   0 |   0 |   1
(1 row)

truncate saved_rows;
-- TODO(#1084): Transition tables currently do not work for chunks at
-- all. Once they are implemented, we should get values saved in the
-- saved_rows table, so the function is kept around for the time
-- being; for now we just test that we get a proper error.
\set ON_ERROR_STOP 0
create trigger save_insert_transition_table_trg
    after insert on :chunk1
    referencing new table as new_table
    for each statement execute function save_transition_table();
ERROR:  trigger with transition tables not supported on hypertable chunks
create trigger save_update_transition_table_trg
    after update on :chunk1
    referencing new table as new_table old table as old_table
    for each statement execute function save_transition_table();
ERROR:  trigger with transition tables not supported on hypertable chunks
create trigger save_delete_transition_table_trg
    after delete on :chunk1
    referencing old table as old_table
    for each statement execute function save_transition_table();
ERROR:  trigger with transition tables not supported on hypertable chunks
\set ON_ERROR_STOP 1
-- Check truncate trigger
create trigger notify_truncate after truncate on :chunk1 for each statement execute function notify_action();
truncate :chunk1;
NOTICE:  table _hyper_1_1_chunk was truncated