在 PostgreSQL 如何查询连续出现的记录
场景描述
我们有一个系统在不停接收其他系统发来的消息,我们的任务任务是查询连续出现两次及以上的消息。下面是一些例子
- 假设依次收到的消息是 A 则结果为空;
- 假设依次收到的消息是 AB 则结果为空;
- 假设依次收到的消息是 AA 则结果为 AA;
- 假设依次收到的消息是 AABB 则结果为 AABB;
- 假设依次收到的消息是 ABB 则结果为 BB;
- 假设依次收到的消息是 AAB 则结果为 AA;
- 假设依次收到的消息是 AABCC 则结果为 AACC;
- 假设依次收到的消息是 AABCDD 则结果为 AADD;
- 假设依次收到的消息是 AABAACC 则结果为 AAAACC;
- 假设依次收到的消息是 AABAABCC 则结果为 AAAACC;
- 假设依次收到的消息是 AABAACDD 则结果为 AAAADD;
- 假设依次收到的消息是 AABCCBDD 则结果为 AACCDD;
数据准备
下面我们创建一张消息表 message
来模拟这种情况
1 | CREATE TABLE message ( |
在这份样例数据里,没有连续出现两次及以上的记录是第 14 行 Bob 发送的消息 C,第 21 行 Bob 发送的消息 E 和第 24 行 Alice 发送的消息 B。
方法一
对于任意一条记录来说,它的内容连续出现两次及以上的条件是它的内容要么和前一条记录的内容一样,要么和后一条记录的内容一样。在 PostgreSQL 中可以通过 LAG
函数获取前一条记录的内容,通过 LEAD
函数获取后一条记录的内容
1 | SELECT *, |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+------------+------------+ |
我们只需要找到 content
要么等于 prev_content
要么等于 next_content
的记录,这些记录就是连续出现两次及以上的记录
1 | SELECT id, sender, content, create_at |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+ |
连续出现三次及以上的记录
对于任意一条记录来说,它的内容连续出现三次及以上的条件是它的内容要么和前两条记录的内容一样,要么和后两条记录的内容一样,要么和前一条和后一条的内容都一样。LAG
函数的完整定义为 lag(value anyelement [, offset integer [, default anyelement ]])
,它在分区内当前行的之前 offset
个位置的行上计算,offset
的默认值为 1。可以将 offset
的值设为 2 从而得到在当前记录前面第二条的记录。LEAD
函数是类似的。
1 | SELECT id, sender, content, create_at |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+ |
方法二
方法一如果要查询连续出现更多次的记录则需要更多的 LAG
和 LEAD
函数,如果把连续出现的次数作为一个参数则需要动态的拼接查询语句。下面来实现一种不需要动态拼接查询语句即可实现连续出现任意次数的需求。
观察前面的样例数据,一种直观想法是,为了查询连续出现的记录,只需要根据消息内容分组,统计每个分组的数量,筛选出数量大于等于 2 的那些分组即是我们需要的结果。但是这种实现方式无法处理 AABCCBDD 这种情形,按照这种实现方式 B 将被保留,结果为 AABCCBDD,而我们期望的结果为 AACCDD。因此需要寻找一种根据其他虚拟(或计算)字段进行分组的方法。
为了比较前后两条消息是否有变化,需要使用 LAG
函数获取当前消息的前一条消息
1 | SELECT *, LAG(content) OVER(PARTITION BY sender ORDER BY create_at) AS prev_content |
接下来是判断当前消息相对于前一条消息是否有变化,需要用到 CASE ... WHEN ... THEN ... ELSE ... END
表达式,0 表示没有变化,1 表示有变化
1 | SELECT *, CASE WHEN content = prev_content THEN 0 ELSE 1 END AS is_change |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+------------+---------+ |
现在使用 SUM
窗口函数对 is_change
列求和,求和的结果作为分组 ID
1 | SELECT *, SUM(is_change) OVER(PARTITION BY sender ORDER BY create_at) AS group_id |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+------------+---------+--------+ |
接下来使用新的分组 ID 统计每个分组的数量
1 | SELECT *, COUNT(*) OVER(PARTITION BY sender, group_id) AS group_count |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+------------+---------+--------+-----------+ |
在上面结果的基础上排除分组数量为 1 的记录
1 | SELECT id, sender, content, create_at |
这个查询语句可以把 CASE
表达式写在 SUM
函数里面从而简化一下语句
1 | SELECT id, sender, content, create_at |
这种方法可以通过调节每个分组中的记录数即可实现消息必须连续出现任意次数的需求,比如改为 group_count > 2
,就可以实现消息必须连续出现三次及以上的需求。
使用全局行号和局部行号的差作为分组的 ID
1 | WITH rowed AS ( |
这条语句执行的结果如下表所示
1 | +--+------+-------+-------------------+--------------+-------------+--------+ |
这种方式的一个问题是在同一大分区内 global_row_num - local_row_num
的结果后面的是否会和前面的重复,比如第 6~8 行的结果为 2,在若干行后,比如在第 14~15 行会不会也出现结果为 2。答案是不会的!下面是 ChatGPT 提供的证明结果
在上面查询语句的基础上使用 COUNT
窗口函数并筛选数量大于 1 的记录即可
1 | WITH rowed AS ( |
除此之外还可以不使用 COUNT
窗口函数,只使用普通的 COUNT
函数并使用 JOIN
的方式也可以实现相同的结果
1 | WITH grouped AS ( |