package com.sxsoft.admin.Component;

import com.alibaba.fastjson.JSON;
import io.netty.handler.codec.http.HttpHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet; /**
* @program: sxsoft_bidlater_guarantee
* @ClassName WebSocket
* @description: 进行群推送以及单点推送 部署时docker要配置端口(docker-compose.yml),并且服务器要允许端口开放外网。websocket端口与主程序端口要区分开来
* @create: 2023-01-05 09:13
* @Version 1.0
**/
//调用如(http是ws开头,https是wss开头): ws://localhost:8816/ws/bhbl/123
@Slf4j
@Component
@ServerEndpoint(path = "/ws/bhbl/{userId}", port = "${sxsoft.data.app-config.netty-websocket.port}")
public class NettyWebSocket { //无参构造
// public NettyWebSocket() {
// System.out.println("通过这里可以看到 每次请求过来都会创建");
// } //region 内部私有方法
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session; //连接用户ID
private String userId; //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
//NettyWebSocket是当前类名
private static CopyOnWriteArraySet<NettyWebSocket> webSockets =new CopyOnWriteArraySet<>();
// concurrent包的线程安全Set,用来存在线连接用户信息
private static ConcurrentHashMap<String,Session> sessionPool = new ConcurrentHashMap<String,Session>(); //加载静态文件
// static {
// String roomStr = FileToJsonUtils.readData("/config/room.json");
// JSONArray rooms = JSON.parseArray(roomStr);
// for (Object object : rooms) {
// JSONObject room = (JSONObject) object;
// roomMap.put(room.getString("code"), room);
// }
// } @BeforeHandshake
public void handshake(Session session, HttpHeaders headers,@PathVariable String arg, @PathVariable String group) {
session.setSubprotocols("stomp");
} /**
* 建立连接
* @param session
* @param headers
*/
@OnOpen
public void onOpen(Session session, HttpHeaders headers, @PathVariable String userId) {
//判断是否有此群组,没有则关闭连接
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:{},userid:{}",webSockets.size(),userId);
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 关闭连接
* @param session
*/
@OnClose
public void onClose(Session session){
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:"+webSockets.size());
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 错误回调
* @param session
* @param throwable
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.error("用户错误,原因:"+throwable.getMessage());
throwable.printStackTrace();
} /**
* 接收消息
* @param message
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:"+message);
} /**
* 绑定数据流
* @param session
* @param bytes
*/
@OnBinary
public void onBinary(Session session, byte[] bytes) {
session.sendBinary(bytes);
} /**
* 绑定事件
* @param session
* @param evt
*/
@OnEvent
public void onEvent(Session session, Object evt) {
// if (evt instanceof IdleStateEvent) {
// IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// switch (idleStateEvent.state()) {
// case READER_IDLE:
// System.out.println("read idle");
// break;
// case WRITER_IDLE:
// System.out.println("write idle");
// break;
// case ALL_IDLE:
// System.out.println("all idle");
// break;
// default:
// break;
// }
// }
}
//endregion //region 消息发送 /**
* 广播消息
* @param message
*/
public static void sendAllMessage(String message) {
log.info("【websocket消息】广播消息:"+message);
try {
for(NettyWebSocket webSocket : webSockets) {
if(webSocket.session.isOpen()) {
webSocket.session.sendText(message);
}
}
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 单点消息
* @param userId
* @param message
*/
public static void sendOneMessage(String userId, String message) {
try {
log.info("【websocket消息】 单点消息:"+message);
Session session = sessionPool.get(userId);
log.info("【websocket消息】 单点消息,当前在线投标人session:{}", JSON.toJSONString(session));
if (session != null && session.isOpen()) {
session.sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 单点消息(多人)
* @param userIds
* @param message
*/
public static void sendMoreMessage(String[] userIds, String message) {
try { log.info("【websocket消息】 单点消息(多人),userids={},message={}", JSON.toJSONString(userIds),message);
for(String userId:userIds) {
Session session = sessionPool.get(userId);
log.info("【websocket消息】 单点消息(多人),当前在线审核人session:{}", JSON.toJSONString(session));
if (session != null && session.isOpen()) {
session.sendText(message);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//endregion }
        <!--netty-websocket-spring-boot-->
<dependency>
<groupId>org.yeauty</groupId>
<artifactId>netty-websocket-spring-boot-starter</artifactId>
<version>0.9.5</version>
</dependency>
<!-- netty工具类 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

最新文章

  1. Core Java 总结(异常类问题)
  2. HTTP请求 GET POST 网络编程实现(转)
  3. WindowsPhone-GameBoy模拟器开发四--Gameboy显示系统分析
  4. spark streaming中使用flume数据源
  5. python matplotlib plot 数据中的中文无法正常显示的解决办法
  6. [Bhatia.Matrix Analysis.Solutions to Exercises and Problems]ExI.5.1
  7. MYSQL 索引页 结构图
  8. RFM模型+SOM聚类︱离群值筛选问题
  9. python3 动态import
  10. Spring MVC 使用介绍(六)—— 注解式控制器(二):请求映射与参数绑定
  11. AX2009 批处理作业中使用多线程---批量捆绑
  12. SpringCloud无废话入门02:Ribbon负载均衡
  13. Linux : Vim 使用与配置 (附 GitHub 自动化配置脚本)
  14. ldap集成x-pack
  15. ps记录
  16. ecplice中去掉提示信息的步骤
  17. flutter实现(OutlineButton)线框按钮
  18. Linux中网络通信中 使用的结构体
  19. SpringData修改和删除操作
  20. Spring事务不起作用原因

热门文章

  1. Study for Go ! Chapter two - Expression
  2. 同一ip地址不同的主机冲突解决
  3. windows-sam文件
  4. 面试笔记1-redis
  5. lg8365题解
  6. Docker中安装Gitlab详细全教程
  7. java开发细节
  8. Oracle 详细-创建用户并导入sql文件
  9. 【Frida】调试js代码
  10. CH582m模拟JoyStick使用USB与电脑通信