// Copyright (c) 2021. Huawei Technologies Co., Ltd. All rights reserved.

// Package common define common utils
package common

import (
"bufio"
"context"
"errors"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"

"huawei.com/mindxdl/base/common/constants"
"huawei.com/npu-exporter/hwlog"
)

var logChanMap map[string]chan bool

// LogQueryResp log query response
type LogQueryResp struct {
LogContent string `json:"logContent"`
RowNumber uint64 `json:"rowNumber"`
}

const (
checkServicePeriod = 5
byteToMB = 20
maxLogSize = 50
// FileMode file mode
FileMode = 0640
// FolderMode folder mode
FolderMode = 0750
runningStatus = "Running"
succeededStatus = "Succeeded"

// LogFolder service log folder
LogFolder = "ServiceLogs"
)

// GetLogFilePath get log file path
func GetLogFilePath(podName string, elem ...string) string {
return path.Join(path.Join(elem...), podName+".log")
}

func getStoragePath(b BaseCtx, srcModule string) string {
hwlog.RunLog.Infof("current component is %v", srcModule)
return GetUserPath(b)
}

func getLogParentPath(b BaseCtx, srcModule, namespace string) string {
return path.Join(getStoragePath(b, srcModule), LogFolder, namespace)
}

func getPodLogFilePath(b BaseCtx, srcModule, namespace, podName string) string {
return GetLogFilePath(podName, getLogParentPath(b, srcModule, namespace))
}

func logToFile(podLog io.ReadCloser, logFilePath string, wg *sync.WaitGroup) {
defer wg.Done()
_, err := os.Stat(logFilePath)
if err == nil || os.IsExist(err) {
return
}

logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC|os.O_APPEND, FileMode)
if err != nil {
hwlog.RunLog.Errorf("create %s log file failed", path.Base(logFilePath))
return
}
defer logFile.Close()
logWriter := bufio.NewWriter(logFile)
if _, err := io.Copy(logWriter, podLog); err != nil {
hwlog.RunLog.Errorf("write log file failed, path: %v, err: %v", path.Base(logFilePath), err)
}
if err := logWriter.Flush(); err != nil {
hwlog.RunLog.Errorf("flush %s log file failed", path.Base(logFilePath))
}
}

func closePodLogStream(podLogList map[string]io.ReadCloser) {
for podName, podLog := range podLogList {
if err := podLog.Close(); err != nil {
hwlog.RunLog.Errorf("close %s log reader failed", podName)
}
}
}

func deleteOldLogFile(logDir string, oldPodNameList []string) {
for _, podName := range oldPodNameList {
if err := os.Remove(GetLogFilePath(podName, logDir)); err != nil {
hwlog.RunLog.Errorf("remove log file failed, name: %v err: %v, please delete it manually",
podName, err)
}
}
}

func findFirst(element string, list []string) int {
for i, v := range list {
if element == v {
return i
}
}
return -1
}

// getNewPodNameList return newly added pod name list
// Only will deleted pod names remain in oldPodNameList after this function
func getNewPodNameList(clientSet *kubernetes.Clientset, svcNamespace, svcName string,
oldPodNameList []string) ([]string, []string) {
labelSelector := labels.Set(map[string]string{"app": svcName}).AsSelector().String()
podList, err := clientSet.CoreV1().Pods(svcNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return nil, nil
}
var podNameList []string
for _, pod := range podList.Items {
if pod.Status.Phase == runningStatus || pod.Status.Phase == succeededStatus {
if index := findFirst(pod.Name, oldPodNameList); index == -1 {
podNameList = append(podNameList, pod.Name)
} else {
oldPodNameList = append(oldPodNameList[:index], oldPodNameList[index+1:]...)
}
}
}

return podNameList, oldPodNameList
}

func updatePodLogList(clientSet *kubernetes.Clientset, svcNamespace string, podNameList, oldPodNameList []string,
podLogList map[string]io.ReadCloser) {
if podLogList == nil {
podLogList = make(map[string]io.ReadCloser, len(podNameList))
}
opt := &corev1.PodLogOptions{
Follow: true,
}
for _, podName := range podNameList {
logRequest := clientSet.CoreV1().Pods(svcNamespace).GetLogs(podName, opt)
if podLog, err := logRequest.Stream(context.TODO()); err == nil {
podLogList[podName] = podLog
}
}
for _, podName := range oldPodNameList {
if err := podLogList[podName].Close(); err != nil {
hwlog.RunLog.Errorf("close %s log reader failed", podName)
}
delete(podLogList, podName)
}
}

func logRecordGoroutine(svcNameSpace, svcName, logDir string, stopCh <-chan bool) {
var clientSet *kubernetes.Clientset
var cliErr error
podLogList := make(map[string]io.ReadCloser)
var podNameList, oldPodNameList []string
var wg sync.WaitGroup
for {
select {
case <-stopCh:
wg.Wait()
closePodLogStream(podLogList)
deleteOldLogFile(logDir, oldPodNameList)
return
default:
}
if clientSet == nil {
if clientSet, cliErr = K8sClient(""); cliErr != nil {
continue
}
}
podNameList, oldPodNameList = getNewPodNameList(clientSet, svcNameSpace, svcName, oldPodNameList)
updatePodLogList(clientSet, svcNameSpace, podNameList, oldPodNameList, podLogList)
deleteOldLogFile(logDir, oldPodNameList)
oldPodNameList = nil
for podName := range podLogList {
oldPodNameList = append(oldPodNameList, podName)
}

for _, podName := range podNameList {
wg.Add(1)
if _, ok := podLogList[podName]; ok {
go logToFile(podLogList[podName], GetLogFilePath(podName, logDir), &wg)
}
}
time.Sleep(checkServicePeriod * time.Second)
}
}

// StartLogRecord start log record goroutine
func StartLogRecord(svcNamespace, svcName, srcModule string, b BaseCtx) {
logParentDir := getLogParentPath(b, srcModule, svcNamespace)
if err := os.MkdirAll(logParentDir, FolderMode); err != nil {
hwlog.OpLog.WarnfWithCtx(b.Ctx, "log record failed: create the log folder failed, %s", err.Error())
return
}

if logChanMap == nil {
logChanMap = make(map[string]chan bool)
}
mapKey := srcModule + "-" + svcName
logChanMap[mapKey] = make(chan bool, 1)
hwlog.RunLog.InfofWithCtx(b.Ctx, "start to record log of service %v", svcName)
go logRecordGoroutine(svcNamespace, svcName, logParentDir, logChanMap[mapKey])
}

// StopLogRecord stop log record goroutine
func StopLogRecord(svcName, srcModule string, b BaseCtx) {
mapKey := srcModule + "-" + svcName
if _, ok := logChanMap[mapKey]; ok && logChanMap[mapKey] != nil {
logChanMap[mapKey] <- true
close(logChanMap[mapKey])
delete(logChanMap, mapKey)
}
hwlog.RunLog.InfofWithCtx(b.Ctx, "stop to record log of service %v", svcName)
}

func logQuery(logPath string, offset, limit uint64, b BaseCtx) (*LogQueryResp, error) {
logFile, err := os.OpenFile(logPath, os.O_RDONLY, FileMode)
if err != nil {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "Fail to get log file")
return nil, err
}
defer logFile.Close()
logReader := bufio.NewReader(logFile)
logStrs := make([]string, 0, maxLogSize)
var logLineByte []byte
for line := uint64(0); line < offset+limit; line++ {
lineByte, isPrefix, err := logReader.ReadLine()
if err != nil {
if err != io.EOF {
hwlog.RunLog.ErrorfWithCtx(b.Ctx, "An error occurred while reading log file: %v", err)
return nil, err
}
break
}
logLineByte = append(logLineByte, lineByte...)
if isPrefix {
continue
}
if line >= offset {
logStrs = append(logStrs, string(logLineByte))
}
logLineByte = make([]byte, 0)
}
logQueryResp := LogQueryResp{
LogContent: strings.Join(logStrs, "\n"),
RowNumber: uint64(len(logStrs)),
}
return &logQueryResp, nil
}

func getOffsetAndLimit(logOffset, logLimit string, b BaseCtx) (uint64, uint64, string) {
offset, err := strconv.ParseUint(logOffset, BaseHex, BitSize64)
if err != nil {
hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log offset convert to integer failed, err: %v", err)
return 0, 0, ParamConvert2IntegerFailed
}
limit, err := strconv.ParseUint(logLimit, BaseHex, BitSize64)
if err != nil {
hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log limit convert to integer failed, err: %v", err)
return 0, 0, ParamConvert2IntegerFailed
}
if err := validLogLimitOffset(offset, limit); err != nil {
hwlog.OpLog.ErrorfWithCtx(b.Ctx, "get service log param invalid: %v", err)
return 0, 0, ParamInvalid
}
return offset, limit, Success
}

func validPodNameNamespace(podName, podNamespace string,
search func(name, svcType string, uid uint64) error, b BaseCtx) string {
if ok := ValidName("podName", podName, b); !ok {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "podName invalid")
return ParamInvalid
}
if ok := ValidName("namespace", podNamespace, b); !ok {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "pod namespace invalid")
return ParamInvalid
}
clientSet, cliErr := K8sClient("")
if cliErr != nil {
hwlog.RunLog.ErrorfWithCtx(b.Ctx, "Failed to get customClient err: %s", cliErr.Error())
return GetK8sClientFailed
}
pod, err := clientSet.CoreV1().Pods(podNamespace).Get(context.Background(),
podName, metav1.GetOptions{})
if err != nil {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "the pod does not exist")
return ParamInvalid
}
if svcType, ok := pod.Labels[constants.TrainManageName]; ok {
name, ok := pod.Labels["dbjob-name"]
if !ok {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "failed to get service name of the pod")
return ParamInvalid
}
if err := search(name, svcType, b.HdInfo.UserID); err != nil {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "the pod does not belong to train manager")
return ParamInvalid
}
} else {
name, ok := pod.Labels["app"]
if !ok {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "failed to get service name of the pod")
return ParamInvalid
}
if err := search(name, "", b.HdInfo.UserID); err != nil {
hwlog.RunLog.ErrorfWithCtx(b.Ctx, "the pod does not belong to task manager, %v", err)
return ParamInvalid
}
}

return ""
}

// QueryServiceLog query service log
func QueryServiceLog(srcModule string,
search func(name, svcType string, uid uint64) error, b BaseCtx, c *gin.Context) {
hwlog.OpLog.InfoWithCtx(b.Ctx, "start to query service log")
offset, limit, errCode := getOffsetAndLimit(c.Query("offset"), c.Query("limit"), b)
if errCode != Success {
ConstructResp(c, errCode, "", nil)
return
}
podName := c.Query("podName")
podNamespace := c.Query("namespace")
if err := validPodNameNamespace(podName, podNamespace, search, b); err != "" {
ConstructResp(c, ParamInvalid, "", nil)
return
}
logPath := getPodLogFilePath(b, srcModule, podNamespace, podName)
log, err := logQuery(logPath, offset, limit, b)
if err != nil {
hwlog.OpLog.ErrorfWithCtx(b.Ctx, "query service pod log failed, err: %v", err)
ConstructResp(c, QueryK8sPodLogFailed, "", nil)
return
}
hwlog.OpLog.InfoWithCtx(b.Ctx, "query pod log succeed")
ConstructResp(c, Success, "", log)
}

func downloadLogFile(writer io.Writer, logPath string, b BaseCtx) (int64, error) {
if fi, err := os.Stat(logPath); err == nil {
if fi.Size()>>byteToMB > maxLogSize {
return 0, errors.New("the log file is too large, please download it by other way")
}
}
logFile, err := os.OpenFile(logPath, os.O_RDONLY, FileMode)
if err != nil {
hwlog.RunLog.ErrorWithCtx(b.Ctx, "Fail to get log file")
return 0, err
}
defer logFile.Close()
return io.Copy(writer, logFile)
}

// DownloadServiceLog download log file
func DownloadServiceLog(srcModule string,
search func(name, svcType string, uid uint64) error, b BaseCtx, c *gin.Context) {
podName := c.Query("podName")
podNamespace := c.Query("namespace")
logFileSize := int64(0)
c.Header(ContentDisposition, "attachment; filename="+podName+".log")
c.Header(ContentType, "application/text/plain")
c.Header(AcceptLength, strconv.FormatInt(logFileSize, BaseHex))

if err := validPodNameNamespace(podName, podNamespace, search, b); err != "" {
ConstructResp(c, ParamInvalid, "", nil)
return
}

hwlog.OpLog.InfofWithCtx(b.Ctx, "get pod log podName(%v), namespace(%v)", podName, podNamespace)
logPath := getPodLogFilePath(b, srcModule, podNamespace, podName)
fileSize, err := downloadLogFile(c.Writer, logPath, b)
if err != nil {
hwlog.OpLog.ErrorfWithCtx(b.Ctx, "download service pod log failed, err: %v", err)
ConstructResp(c, DownloadPodLogFileFailed, "", nil)
return
}
c.Header(AcceptLength, strconv.FormatInt(fileSize, BaseHex))
hwlog.OpLog.InfoWithCtx(b.Ctx, "pod log downloaded")
ConstructResp(c, Success, "", nil)
}

最新文章

  1. 【学习笔记&amp;训练记录】数位DP
  2. C# 该行已经属于另一个表 的解决方法[转]
  3. android ExpandableListView实现不同的布局
  4. GitHub使用(一) - 新建个人网站
  5. JQuery自定义插件详解之Banner图滚动插件
  6. Sql 两个表left join 查左表最时间最大的一条记录显示
  7. Java中的换行符
  8. TRY
  9. DML数据操作语言之查询(一)
  10. 《Python 数据库 GUI CGI编程》
  11. 动态ip、静态ip、pppoe拨号的区别
  12. Oracle 12c pdb的数据泵导入导出
  13. 数组方法map(映射),reduce(规约),foreach(遍历),filter(过滤)
  14. KahaDB简介
  15. Ubuntu 14.04 LTS 火狐浏览器中,鼠标选择文字被删除的解决办法
  16. HDU 3226 背包
  17. Depth-first Search-690. Employee Importance
  18. webpack-dev-server的自动更新配置
  19. 在MyEclise中使用自己安装的tomcat
  20. [uwp]ImageSource和byte[]相互转换

热门文章

  1. KingbaseES R6 集群测试job管理测试
  2. SpringBoot_事务总结
  3. Linux云主机安全入侵排查步骤
  4. Odoo自建应用初步总结(一)
  5. ProxySQL(11):链式规则( flagIN 和 flagOUT )
  6. K8S Pod Pending 故障原因及解决方案
  7. 6.Ceph 基础篇 - CephFS 文件系统
  8. kubepi访问
  9. PPR的断管
  10. “kill -9”一时爽,秋后算账泪两行