Golang 实现UDPServer并发送消息到ActiveMQ
2024-08-27 14:55:45
示例代码
package main import (
"net"
"os"
"github.com/gpmgo/gopm/modules/goconfig"
"github.com/go-stomp/stomp"
"time"
"strconv"
"log"
"strings"
) // 限制goroutine数量
var limitChan = make(chan bool, ) // Todo 从配置文件中读取 // 限制同时处理消息数量
var msgChan = make(chan string, ) // Todo 从配置文件中读取
var activeMqLimitedChan = make(chan bool, )
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var connectTimes =
var udpAddress = "0.0.0.0" // Todo 从配置文件中读取
var udpPort = "" // Todo 从配置文件中读取
var logFilePath = "/var/log/syslog_server/"
var configFilePath = "./config.ini"
// UDP goroutine 实现并发读取UDP数据
func udpProcess(conn *net.UDPConn) {
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("udpProcess error:", e)
}
// 释放出一个协程
<- limitChan
}() // 最大读取数据大小
data := make([]byte, )
n, _, err := conn.ReadFromUDP(data)
if err != nil {
panic(err)
} // 获取对端的IP地址
// remoteAddr := conn.RemoteAddr()
// msgChan <- remoteAddr.String() + " " + string(data[:n]) msgChan <- string(data[:n]) } func udpServer(address, port string) {
// @todo 如何防止udpServer 一直Panic导致无限循环重启
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("udpServer error:", e) // udpServer启动失败后,间隔10秒后重试
time.Sleep( * time.Second)
udpServer(udpAddress, udpPort)
}
}() udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
conn, err := net.ListenUDP("udp", udpAddr)
defer conn.Close()
if err != nil {
panic(err)
} for {
limitChan <- true
go udpProcess(conn)
}
} // 读取ActiveMQ配置信息
func getConfiguration(){
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
os.Exit()
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("Get Configuration error:", e)
}
}() configFile, err := goconfig.LoadConfigFile(configFilePath)
if err != nil {
panic(err)
} host, err = configFile.GetValue("active_mq", "host")
if err != nil {
// 如果没有配置主机,则使用本地主机
host = "127.0.0.1"
}
port, err = configFile.GetValue("active_mq", "port")
if err != nil {
// 如果没配置端口,则使用默认端口
port = ""
} activeQueue, err = configFile.GetValue("active_mq", "queue")
if err != nil {
// 如果没配置端口,则使用默认队列名
activeQueue = "syslog.queue"
}
} // 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
func connActiveMq(){
// @todo 如何防止无限循环
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("connActiveMq error:", e) // ActiveMQ服务器连接失败后,间隔3秒后重试
time.Sleep( * time.Second)
activeMq = nil
connActiveMq()
}
}() // @todo 实现断开重连
if activeMq == nil {
var err error
activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
if err != nil {
connectTimes ++
if connectTimes >= {
time.Sleep( * time.Second)
}else if connectTimes >= {
time.Sleep( * time.Second)
}else {
time.Sleep( * time.Second)
}
panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes)) }else {
connectTimes =
}
}
} func activeMqProducer(c chan string){
// @todo 如何防止activeMqProducer 退出
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("activeMqProducer error:", e) // 重试
go activeMqProducer(msgChan)
}
}()
for{
activeMqLimitedChan <- true // 限制开启协程数量
contentMsg := <-c
go func() {
defer func() {
if e := recover(); e != nil {
err := os.MkdirAll(logFilePath, )
log.Fatalln("create log dirctory error: ", err.Error())
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, ) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 记录错误日志
logger.Println("activeMqProducer error:", e)
}
// 释放出一个协程
<- activeMqLimitedChan
}() err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
if err != nil {
if err.Error() == "connection already closed"{
activeMq = nil
connActiveMq()
activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
}
panic(err)
}
}() } } func init(){
// 初始化 ActiveMQ 配置
getConfiguration() // 连接到 ActiveMQ 服务器
connActiveMq() // 启动一个协程将Syslog消息放入ActiveMQ队列中
go activeMqProducer(msgChan) } func main() {
defer activeMq.Disconnect()
udpServer(udpAddress, udpPort)
}
最新文章
- 李洪强iOS经典面试题141-报错警告调试
- [IOS初学]ios 第一篇 storyboard 与viewcontroller的关系
- CSS 魔法系列:纯 CSS 绘制图形(各种形状的钻石)
- 类似区间计数的种类并查集两题--HDU 3038 &; POJ 1733
- python随文档
- RHEL6.4找回root密码的方法
- asp.net正则表达式过滤标签和数据提取
- 简单3d RPG游戏 之 005 选择敌人
- HDU 5071 Chat(2014鞍山B,模拟)
- jquery easyui treegrid使用小结
- vlc_input buffer管理 &; 时钟同步(转)
- Django Middleware简介
- ORACLE设置自启动记录
- 大量的rcuob进程
- Linux系统VIM编辑器管理(2)
- vue父组件调用子组件资源
- bzoj 4177 Mike的农场
- CodeChef SADPAIRS:Chef and Sad Pairs
- CodeForces 154A Hometask dp
- 学生选课数据库SQL语句练习题