问题 在我们的业务中有一个生产者程序不停地向 message 表写入记录,简化的 message 表的结构如下所示
1 2 3 4 5 6 7 CREATE TABLE message( id SERIAL, content VARCHAR , create_time TIMESTAMPTZ, PRIMARY KEY (id) );
另外有一个消费者程序不停地读取 message 表的数据,这个消费者程序依赖一个称为“水位线”的时间戳,所有创建时间小于等于水位线的消息都已经处理,我们使用 consumer 表来记录当前的水位线
1 2 3 4 5 6 CREATE TABLE consumer( id SERIAL, watermark TIMESTAMPTZ, PRIMARY KEY (id) );
我们会有一个定时任务每隔一段时间调度一次消费者程序,消费者程序的主要逻辑是
检查当前水位线
1 2 3 4 5 test=# SELECT watermark FROM consumer; watermark ------------------------ 2024-04-30 15:16:17+08 (1 row)
使用当前水位线加载待处理消息
1 2 3 4 5 6 test= # SELECT content, create_time FROM message WHERE create_time > '2024-04-30 15:16:17+08' AND create_time <= NOW() ORDER BY create_time; content | create_time aaa | 2024 -04 -30 16 :17 :18.936795 + 08 bbb | 2024 -04 -30 17 :18 :19.956867 + 08 (2 rows )
逐条处理消息,并用当前消息的创建时间更新水位线,最后一条消息处理完成时
1 2 3 4 5 6 7 test= # UPDATE consumer SET watermark = '2024-04-30 17:18:19.956867+08' ; UPDATE 1 test= # SELECT watermark FROM consumer; watermark 2024 -04 -30 17 :18 :19.956867 + 08 (1 row )
就当前的 SQL 版本模拟的逻辑而言在定时任务第 2 轮调度时将没有待处理消息。但是当我们使用 Java 语言实现上面的逻辑时我们发现在定时任务第 2 轮调度时内容为 bbb 的消息会再一次被处理。在 Java 版本的实现里我们使用了 java.util.Date 类型来表示 create_time 和 watermark。