上边有介绍过使用命令行模式的wal2json扩展使用,以下是一个jdbc 集成的试用(pg jdbc 驱动天然支持复制)

环境准备

  • pg(包含wal2json扩展)docker-compose 文件
version: "3"
services: 
   mypg:
     image: dalongrong/wal2json:1.0
     ports: 
     - "5432:5432"
     environment: 
     - "POSTGRES_PASSWORD:dalong"
  • 项目结构
├── README.md
├── docker-compose.yaml
├── example1.sql
├── mypgcdc
│ ├── mypgcdc.iml
│ ├── pom.xml
│ └── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── dalong
│ │ │ └── Application.java
│ │ └── resources
│ └── test
│ └── java
└── postgresql.conf
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dalong</groupId>
    <artifactId>mypgcdc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <encoding>UTF-8</encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.9</version>
        </dependency>
    </dependencies>
    <build>
        <!-- Maven Shade Plugin -->
        <finalName>postgres-wal2json-app</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- add Main-Class to manifest file -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.dalong.Application</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • 代码
    Application.java
package com.dalong;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class Application {
    public static void main(String[] args) throws SQLException, InterruptedException {
        String url = "jdbc:postgresql://localhost:5432/postgres";
        Properties props = new Properties();
        PGProperty.USER.set(props, "postgres");
        PGProperty.PASSWORD.set(props, "dalong");
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.5");
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
        Connection con = DriverManager.getConnection(url, props);
        PGConnection replConnection = con.unwrap(PGConnection.class);
        replConnection.getReplicationAPI()
                .createReplicationSlot()
                .logical()
                .withSlotName("test_slot")
                .withOutputPlugin("wal2json")
                .make();
        PGReplicationStream stream =
                replConnection.getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName("test_slot")
                        .start();
        while (true) {
            //non blocking receive message
            ByteBuffer msg = stream.readPending();
            if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }
            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            System.out.println(new String(source, offset, length));
        }
    }
}

运行&&测试

  • 启动服务
docker-compose up -d
  • 构建maven 项目
cd mypgcdc
mvn clean pacakge
  • 运行maven 项目
java -jar mypgcdc/target/postgres-wal2json-app.jar
psql -At -f example1.sql -h 127.0.0.1 -U postgres -d postgres
  • 效果

  • 说明
    因为slot 是代码创建的,如果已经存在会报错,解决方法(实际使用需要做判断处理)
 
SELECT 'drop' FROM pg_drop_replication_slot('test_slot');

说明

项目使用的docker 镜像是基于wal2json 1.0 版本构建的,master 分之的一些功能可能会没有,需要的话,可以参考dockerfile 自行修改

参考资料

https://github.com/eulerto/wal2json
https://jdbc.postgresql.org/documentation/head/replication.html
https://github.com/rongfengliang/wal2json-docker-demo
https://github.com/rongfengliang/wal2json-docker

最新文章

  1. linux命令每日一练习 创建新文件 列出文件的时候带着行号
  2. php 经典分页(推荐和laypage配合)
  3. 21世纪C语言(影印版)
  4. C++学习之DLL注入
  5. Android样式的编写格式
  6. vs2010 中检测到有潜在危险的 Request.Form 值
  7. --@angularJS--一个简单的UI-Router路由demo
  8. 《Head First 设计模式》【PDF】下载
  9. MySQL安装与使用过程中的相关问题
  10. HTML 练习js代码位置在head标签
  11. 分支&amp;循环
  12. redis使用问题一:Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisException: Could not get a resource from the pool] with root cause
  13. yolo
  14. ubantu16.04搭配新开发环境说明
  15. Jni 线程JNIEnv,JavaVM,JNI_OnLoad(GetEnv返回NULL?FindClass返回NULL?)
  16. debian的python蓝牙库
  17. AutoCompleteTextView 自定义提示样式
  18. ios之CABasicAnimation
  19. 命令行连接mysql服务器时 报Can&#39;t connect to local MySQL server through socket &#39;xxx.sock&#39;错误
  20. c#: using Microsoft.Office.Interop.Excel 异常

热门文章

  1. 重构与模式 (Joshua Kerievsky 著)
  2. vscode10个必装的插件
  3. 优雅的解决springboot Aop @Cacheable this不生效
  4. 【Linux命令】at、crontab定时任务
  5. mysql 8.0 group by 不对的问题
  6. MVC三层架构搭建
  7. 关于前端ajax请求url为何添加一个随机数
  8. 场sharepoint2016数据库恢复站点
  9. php中函数 isset(), empty(), is_null() 的区别,boolean类型和string类型的false判断
  10. web自动化测试