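While chatting with a friend, he told me the output timestamps were wrong. I hadn't paid attention to this in earlier tests, so I checked in processing-time mode and found that the times were indeed incorrect.
  
  So I debugged to find where the problem was and eventually worked out the cause. My notes follow:
  
    Below are the steps to reproduce the problem and an analysis of the cause.
  
  Let me show how to produce the wrong result.
  
  Background: processing time in a tumbling window, Flink 1.5.0.
  
  The call stack is as follows: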
  
  [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
  
  [2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
  
  [3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
  
  [4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
  
  [5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
  
  [6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
  
  [7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
  
  [8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
  
  [9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
  
  [10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
  
  [11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
  
  [12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
  
  [13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
  
  [14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
  
  [15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
  
  [16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
  
  [17] java.lang.Thread.run (Thread.java:748)
  
  Now we are at [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
  
  and the code is as follows:
  
  public static Timestamp internalToTimestamp(long v) { return new Timestamp(v - LOCAL_TZ.getOffset(v)); }
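To see what this one line does in isolation, here is a small standalone sketch that mirrors the body of `internalToTimestamp`. It is not Calcite's code: `LOCAL_TZ` is replaced by an explicit `TimeZone` parameter, and `Asia/Shanghai` (UTC+8) is assumed as the JVM zone to match a machine in China. The class name is illustrative.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampSketch {

    // Same arithmetic as the Calcite line above: shift the epoch millis back
    // by the zone offset, so that Timestamp.toString(), which formats in the
    // JVM default zone, prints the UTC wall-clock time.
    static Timestamp internalToTimestamp(long v, TimeZone localTz) {
        return new Timestamp(v - localTz.getOffset(v));
    }

    public static void main(String[] args) {
        // Assumption: the JVM runs in China Standard Time, like the author's.
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        TimeZone.setDefault(shanghai);

        long windowStart = 1544074830000L;
        Timestamp ts = internalToTimestamp(windowStart, shanghai);

        System.out.println(ts.getTime()); // 1544046030000
        System.out.println(ts);           // 2018-12-06 05:40:30.0
    }
}
```

The stored millis have already moved by 8 hours at this point; only the printed string looks like a plausible time.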
  
  Let us print the value of v for windowStart:
  
  print v
  
  v = 1544074830000
  
  Let us print the value of v for windowEnd:
  
  print v
  
  v = 1544074833000
  
  After this, we return to
  
  [1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51)
  
  Then we execute:
  
```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}

if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
```
  
  Before execution, the output row is:
  
  output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
  
  After execution, it is:
  
  output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"
  
  So, do you think these translations are right?
  
  long value 1544074830000 translated to 2018-12-06 05:40:30.0
  
  long value 1544074833000 translated to 2018-12-06 05:40:33.0
  
  I am in China (UTC+8), so I expect the timestamps to be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0. The values produced above are the UTC wall-clock times of those instants, not China time.
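The expected values can be checked independently of Flink and Calcite. This sketch uses `java.time` (not part of the original code) to render the two raw window bounds as wall-clock time in UTC and in `Asia/Shanghai`; the class and method names are illustrative.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class WindowBoundsInZones {

    // One fractional digit, to match the "...:30.0" style of the output row.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");

    // Renders epoch millis as wall-clock time in the given zone.
    static String render(long epochMillis, String zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zone)).format(FMT);
    }

    public static void main(String[] args) {
        long[] bounds = {1544074830000L, 1544074833000L}; // windowStart, windowEnd
        for (long b : bounds) {
            System.out.println(b + " UTC:           " + render(b, "UTC"));
            System.out.println(b + " Asia/Shanghai: " + render(b, "Asia/Shanghai"));
        }
    }
}
```

The UTC rendering matches what Flink wrote into the row (05:40:30.0 and 05:40:33.0), and the Asia/Shanghai rendering matches the expected 13:40:30.0 and 13:40:33.0.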
  
  Okay, let us continue.
  
  Now the data will be written to Kafka; before it is written, it is serialized.
  
  Let us see what happens!
  
  The call stack is as follows:
  
  [1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
  [2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
  [3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
  [4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
  [5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
  [6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
  [7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
  [8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
  [9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102)
  [10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51)
  [11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46)
  [12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355)
  [13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56)
  [14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41)
  [20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
  [27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
  [28] DataStreamCalcRule$88.processElement (null)
  [29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
  [30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
  [31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
  [32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
  [33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
  [34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
  [35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
  [36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
  [37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
  [38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65)
  [39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
  [40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
  [41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
  [42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
  [43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
  [44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
  [45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
  [46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
  [47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
  [48] java.util.concurrent.FutureTask.run (FutureTask.java:266)
  [49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
  [50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
  [51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
  [52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
  [53] java.lang.Thread.run (Thread.java:748)
  
  and the code is as follows:
  
  protected long _timestamp(Date value) { return value == null ? 0L : value.getTime(); }
  
  Here, taking windowEnd as an example, the value is:
  
  value = "2018-12-06 05:40:33.0"
  
  value.getTime() = 1544046033000
  
  See: the initial value is 1544074833000, but the final value is 1544046033000.
  
  The difference is 28800000 ms, i.e. 8 hours, which is exactly the UTC+8 offset of my time zone in China.
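The 8-hour drift is just the two steps composed: the Calcite-style `v - offset` on the way in, and Jackson's `value.getTime()` on the way out, which never adds the offset back. The sketch below composes those two steps for a few zones. It reimplements both steps by hand with no Calcite or Jackson dependency, so it illustrates the arithmetic rather than the libraries themselves; the class and method names are illustrative.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class SerializationDrift {

    // Step 1: same arithmetic as SqlFunctions.internalToTimestamp.
    static Timestamp toTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    // Step 2: same arithmetic as DateSerializer._timestamp.
    static long serialize(Timestamp value) {
        return value == null ? 0L : value.getTime();
    }

    // How far the serialized value ends up from the original window bound.
    static long drift(long v, String zoneId) {
        TimeZone tz = TimeZone.getTimeZone(zoneId);
        return serialize(toTimestamp(v, tz)) - v;
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L;
        for (String zone : new String[] {"UTC", "Asia/Shanghai", "America/New_York"}) {
            System.out.println(zone + ": " + drift(windowEnd, zone) + " ms");
        }
    }
}
```

In UTC the drift is 0 ms, which is why the bug is easy to miss on UTC machines; in Asia/Shanghai it is -28800000 ms, the same 8 hours measured in the debugger.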
  
  Why? The key is SqlFunctions.internalToTimestamp:
  
  public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - LOCAL_TZ.getOffset(v));
  }
  
  The code subtracts the offset of LOCAL_TZ (the JVM's default time zone). I think this subtraction is redundant!
  
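  Looking at it again, the root cause is that the time is converted back and forth using methods from two different classes (Calcite's SqlFunctions on the way in, Jackson's DateSerializer on the way out) instead of one consistent pair of conversions.
  
  So the result ends up wrong. If it is to be fixed, the place to change is the SqlFunctions class.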
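The fix is to make the two conversions inverses of each other: either stop subtracting the offset, or read the value back with a conversion that adds the offset back instead of calling `getTime()` directly. The sketch below contrasts the two read-back paths. `timestampToInternal` is a hypothetical helper written for this example, not a Flink or Calcite API, and the round trip is only guaranteed away from DST transitions, where the offset at `v` and at `v - offset` can differ. Asia/Shanghai has no DST, so this example is exact.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class MatchedConversions {

    // Calcite-style: internal millis -> Timestamp (subtracts the zone offset).
    static Timestamp internalToTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    // Hypothetical inverse: Timestamp -> internal millis (adds the offset back).
    static long timestampToInternal(Timestamp ts, TimeZone tz) {
        long time = ts.getTime();
        return time + tz.getOffset(time);
    }

    public static void main(String[] args) {
        TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai");
        long windowEnd = 1544074833000L;
        Timestamp ts = internalToTimestamp(windowEnd, tz);

        // Mismatched pair: Calcite-style in, plain getTime() out (what Jackson does).
        System.out.println("getTime():           " + ts.getTime());
        // Matched pair: the offset is added back, so the original value returns.
        System.out.println("timestampToInternal: " + timestampToInternal(ts, tz));
    }
}
```

The first line prints the shifted 1544046033000 seen in the Kafka output; the second prints the original 1544074833000.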
