多节点部署保证HA,分布式锁代码

public class DistributedLock implements Watcher,Runnable{

    private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
private int threadId;
private ZKConnector zkClient;
private String selfPath;
private String waitPath;
private String LOG_PREFIX_OF_THREAD;
private AbstractApplicationContext ctx;
private static boolean hascreated = false; //确保连接zookeeper成功
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
//确保每个进程运行结束
private static final CountDownLatch threadSemaphore = new CountDownLatch(Constant.THREAD_NUM); public ZKConnector getZkClient() {
return zkClient;
}
public void setZkClient(ZKConnector zkClient) {
this.zkClient = zkClient;
} public DistributedLock(int id,AbstractApplicationContext context,ZKConnector zkClient){
this.threadId = id;
this.zkClient = zkClient;
LOG_PREFIX_OF_THREAD = Thread.currentThread().getName().concat("_").concat(String.valueOf(Thread.currentThread().getId())); try{
zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
//GROUP_PATH 不存在的话,由一个线程创建即可
synchronized (threadSemaphore) {
if(!zkClient.exist(Constant.GROUP_PATH)){
zkClient.createPersistNode(Constant.GROUP_PATH, "该节点由线程"+threadId+"创建");
}
}
ctx = context;
}catch(Exception e){
e.printStackTrace();
}
} @Override
public void run() {
getLock();
} @Override
public void process(WatchedEvent event) {
if(event ==null){
return;
}
if(KeeperState.SyncConnected ==event.getState()){
if(EventType.None == event.getType()){
connectedSemaphore.countDown();
}else if(event.getType()==EventType.NodeDeleted && event.getPath().equals(waitPath)){
if(checkMinPath()){
getLockSuccess();
}
} }
} /**
* 获取锁逻辑:
* 首先是上来先在zookeeper上注册一把属于自己的锁,然后修改状态为已创建
* 第二步,检查自己是否是最小id的锁,若是则获取锁,不是则继续等待
*/
private void getLock(){
if(!hascreated){
selfPath = this.getZkClient().createEsquentialNode(Constant.SUB_PATH, "");
hascreated = true;
}
if(checkMinPath()){
getLockSuccess();
}else{
Executor.run(this, 1, 1,TimeUnit.SECONDS);
} } /**
* 检查自己是不是最小路径
* @return
*/
public boolean checkMinPath(){
List<String> subNodes = this.getZkClient().getChildren(Constant.GROUP_PATH);
Collections.sort(subNodes);
//查找"/syncLocks"后面的路径
int index = subNodes.indexOf(selfPath.substring(Constant.GROUP_PATH.length()+1));
switch(index){
case -1:{
return false;
}
case 0:{
return true;
}
default:{
this.waitPath = Constant.GROUP_PATH+"/"+subNodes.get(index-1);
//Logger.info("waitPath: "+waitPath);
this.getZkClient().readData(waitPath);
if(!this.getZkClient().exist(waitPath)){
return checkMinPath();
}
}
}
return false;
} /**
* 获取锁成功
*/
public void getLockSuccess(){
if(!this.getZkClient().exist(selfPath)){
logger.error(LOG_PREFIX_OF_THREAD+"本节点已不存在.");
return;
}
logger.info(LOG_PREFIX_OF_THREAD + "获取锁成功,进行同步工作!"); try{
new Worker(ctx).doWork();
}catch(Exception ex){
logger.info(ex.getMessage());
Executor.run(this, 1, 1, TimeUnit.SECONDS);
return;
} logger.info(LOG_PREFIX_OF_THREAD+"删除本节点:"+selfPath);
this.getZkClient().deleteNode(selfPath);
this.getZkClient().releaseConnection();
threadSemaphore.countDown();
} }

执行同步工作代码

public class Worker {

    private static final Logger logger = LoggerFactory.getLogger(Worker.class);
private static JdbcTemplate jdbcTemplate;
private final ObjectMapper mapper = new ObjectMapper();
private ZKConnector zkClient =null;
private TransportClient client =null;
private Timestamp currentTimestamp = null;
private Timestamp previousTimestamp = null;
private static final String oggSql = "select * from t_order t0 left join t_order_attachedinfo t1 on t0.order_id = t1.order_id where "; private String sql; public String getSql() {
return sql;
} public void setSql(String sql) {
this.sql = sql;
} private TransportClient getClient() {
Settings settings = Settings.settingsBuilder().put("cluster.name", Constant.CLUSTER).build();
TransportClient client = TransportClient.builder().settings(settings).build();
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(Constant.ESHOST), Constant.ESPORT));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
} public Worker(AbstractApplicationContext ctx){
//初始化Oracle连接
jdbcTemplate = (JdbcTemplate) ctx.getBean("jdbcTemplate");
client = getClient();
zkClient = new ZKConnector();
zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT); //初始化zookeeper锁,由于zookeeper不能联级创建
if(!zkClient.exist(Constant.ZK_PATH)){
zkClient.createPersistNode(Constant.ZK_PATH,"");
} /**
* 获取zookeeper的最后同步时间
*/
if(currentTimestamp == null){
String zkTimestamp = zkClient.readData(Constant.NODE_PATH);
if(zkTimestamp != null && !zkTimestamp.equals(""))
{
try
{
currentTimestamp = Timestamp.valueOf(zkTimestamp);
logger.info("获取zookeeper最后同步时间: "+currentTimestamp);
}catch(Exception e){
zkClient.deleteNode(Constant.NODE_PATH);
}
}
}
} /**
* 同步work的逻辑:
* 将Oracle里面的规则表同步到缓存当中
* 首先是访问Oracle里面数据,通过访问最小锁里面的同步时间戳,查询出大于同步时间戳的数据
* 如果在zookeeper中获取的时间戳为空,则查询条件增加时间戳,写入存储框架
* 写入成功之后,将最后一条记录的同步时间戳写到zookeeper集群中
* 若写入失败,和zookeeper握手失败,会话锁消失
* 然后导入ElasticSearch中
*/
public void doWork(){
logger.info("start ...");
//一直进行同步工作
while(true){
String sqlwhere = "";
//根据时间戳获取Mycat中规则表数据
String sql = "";
//若最后一次同步时间为空,则按最后更新时间排序,取最小的时间作为当前时间戳
if(currentTimestamp != null){
sql = "select order_id,timestamp from t_order_changes where rownum <= 10 and timestamp > to_timestamp('" + currentTimestamp.toString() + "','yyyy-mm-dd hh24:mi:ss.ff6')";
}else{
sql = "select order_id,timestamp from t_order_changes where rownum <= 10 order by timestamp";
} //查詢该时间段的订单id
List<String> ids = new ArrayList<String>(); //升序会将最后一次的时间也就是最大的时间作为当前的currentTimeStamp
ids = jdbcTemplate.query(sql, new Object[] {}, new RowMapper<String>()
{
public String mapRow(ResultSet result, int rowNum) throws SQLException {
currentTimestamp = result.getTimestamp("timestamp");
return result.getString("order_id");
}
}); if(ids.size() ==0){
continue;
} int i =0;
List<String> checkIds = new ArrayList<String>();
for (String id : ids) {
//若存在更新的id则跳过
if (checkIds.contains(id)) {
continue;
}
if (i == 0) {
sqlwhere = sqlwhere.concat(" t0.order_id = '" + id + "'");
} else {
sqlwhere = sqlwhere.concat(" or t0.order_id = '" + id + "'");
}
checkIds.add(id);
i++;
} System.out.println(oggSql.concat(sqlwhere));
//objs 即是Oracle里面查询出来需要同步的数据
List<JSONObject> objs = jdbcTemplate.query(oggSql.concat(sqlwhere), new Object[] {}, new RowMapper<JSONObject>()
{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public JSONObject mapRow(ResultSet result, int rowNum) throws SQLException {
int c = result.getMetaData().getColumnCount();
JSONObject obj = new JSONObject(); for(int t =1 ;t <= c;t++)
{
if(result.getObject(t) == null)
{
continue;
}
if(result.getMetaData().getColumnType(t) == Types.DATE)
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getDate(t));
}else if(result.getMetaData().getColumnType(t) == Types.TIMESTAMP)
{
Date date = new Date(result.getTimestamp(t).getTime());
String f = sdf.format(date);
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),sdf.format(date));
}else
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getObject(t));
}
}
return obj;
}
}); /*for (JSONObject obj : objs) {
System.out.println(obj.toJSONString());
}*/ /**
* 将查询出来的数据写入到elasticsearch中
*/
BulkRequestBuilder bulkRequest =null;
try {
bulkRequest = client.prepareBulk(); for (JSONObject obj : objs) {
byte[] json; try {
json = mapper.writeValueAsBytes(obj);
bulkRequest.add(new IndexRequest(Constant.INDEX, Constant.INDEX, obj.getString("order_id"))
.source(json)); } catch (JsonProcessingException e) {
e.printStackTrace();
}
} BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) {
logger.info("====================批量创建索引过程中出现错误 下面是错误信息==========================");
long count = 0L;
for (BulkItemResponse bulkItemResponse : bulkResponse) {
System.out.println("发生错误的 索引id为 : "+bulkItemResponse.getId()+" ,错误信息为:"+ bulkItemResponse.getFailureMessage());
count++;
}
logger.info("====================批量创建索引过程中出现错误 上面是错误信息 共有: "+count+" 条记录==========================");
currentTimestamp = previousTimestamp;
} else {
logger.info("The lastest currenttimestamp : ".concat(currentTimestamp.toString()));
previousTimestamp = currentTimestamp;
//将写入成功后的时间写到zookeeper中
zkClient.writeData(Constant.NODE_PATH, String.valueOf(currentTimestamp));
} } catch (NoNodeAvailableException e) {
currentTimestamp = previousTimestamp;
e.printStackTrace();
}
} } }

调度工具代码

public class Executor {
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
public static void run(Runnable r,long init,long delay,TimeUnit u){
service.scheduleWithFixedDelay(r, init, delay, u);
} }

启动类

public class StartRcpSync {

    private static final Logger Logger = LoggerFactory.getLogger(StartRcpSync.class);
private static AbstractApplicationContext appContext = null;
private static String confPath = null; static{
//后续来读取命令中的conf 例如 java -Dconf=conf/*.xml -classpath .:lib/*
if(System.getProperty("conf") !=null){
System.out.println(System.getProperty("user.dir"));
confPath = System.getProperty("conf");
System.out.println("读取配置路径conf目录:"+confPath);
appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
}else{
confPath = "E:/aa/bb/src/main/resources/conf";
appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
}
} public static void main(String[] args) {
Logger.info("Sync will starting ...");
//加载配置文件
appContext.registerShutdownHook();
appContext.start();
Logger.info("Sync has been started successfully.");
//获取zookeeper的连接
ZKConnector zkClient = new ZKConnector();
DistributedLock dl = new DistributedLock(new Random().nextInt(),appContext,zkClient);
dl.run();
} //just for Test
public static void DoTask(){
Worker w =new Worker(appContext);
w.doWork();
} }

最新文章

  1. 分析nuget源码,用nuget + nuget.server实现winform程序的自动更新
  2. C语言学习笔记(二)_system系统调用及posix说明
  3. easyui-validatebox 验证两次密码是否输入一致
  4. APP store 官方统计工具的常见的Q&amp;A
  5. Android 完美退出 App (Exit)
  6. 宏汇编软件MASM51的使用
  7. HDU 1003(A - 最大子段和)
  8. centos php nginx 添加到service
  9. Git-最简单的本地项目变成版本仓库,然后把内容推送到GitHub仓库
  10. 计算机四级网络工程师--《操作系统(Operating System)》重点内容学习
  11. TomCat系统架构
  12. python中的类机制
  13. Python-list,字典,Tuple
  14. (转)Redis &amp; EhCache
  15. maven工程聚合和继承的意义
  16. php 生成二维码(qrcode)
  17. preg_match_all使用实例
  18. R语言进行词云统计分析
  19. [LeetCode] Add Two Numbers(stored in List)
  20. 默认权限umask

热门文章

  1. maven学习(十)——maven生命周期以及插件
  2. HLG1125 循环小数2
  3. table中填写数据并批量增加
  4. 0-Android使用Ashmem机制进行跨进程共享内存
  5. 如何通过IP地址分辨公网、私网、内网、外网
  6. python请求带cookie
  7. 为MYSQL加注释--mysql注释符
  8. wsl折腾记
  9. 汽车加油行驶(cogs 737)
  10. pat 团体天梯赛 L3-015. 球队“食物链”