solr源码解读(转)原文地址:http://blog.csdn.net/duck_genuine/article/details/6962624

配置

solr 对一个搜索请求的的流程

在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件

  1. <requestHandler name="standard" class="solr.SearchHandler" default="true">
  2. <arr name="first-components">
  3. <str>preParams</str>
  4. </arr>
  5. <lst name="defaults">
  6. <str name="echoParams">explicit</str>
  7. <int name="rows">10</int>
  8. <int name="start">0</int>
  9. <str name="q">*:*</str>
  10. </lst>
  11. <arr name="last-components">
  12. <str>filterResult</str>
  13. </arr>
  14. </requestHandler>

http请求控制器

当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。

  1. String qt = solrReq.getParams().get( CommonParams.QT );
  2. handler = core.getRequestHandler( qt );

---------------------------------------------------------------------------------------------------

  1. this.execute( req, handler, solrReq, solrRsp );
  2. HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);

-----------------------------------------------------------------------------------------------

从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。

  1. protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {
  2. sreq.getContext().put( "webapp", req.getContextPath() );
  3. sreq.getCore().execute( handler, sreq, rsp );
  4. }

看一下solrCore代码execute的方法 的主要代码

  1. public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
  2. 。。。。。
  3. handler.handleRequest(req,rsp);
  4. setResponseHeaderValues(handler,req,rsp);
  5. 。。。。。。。
  6. }

主要实现对请求的处理,并将请求结果的状态信息写到响应的头部

SolrRequestHandler 处理器

再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。

  1. public interface SolrRequestHandler extends SolrInfoMBean {
  2. public void init(NamedList args);
  3. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);
  4. }

先看一下实现该接口的类RequestHandlerBase

  1. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
  2. numRequests++;
  3. try {
  4. SolrPluginUtils.setDefaults(req,defaults,appends,invariants);
  5. rsp.setHttpCaching(httpCaching);
  6. handleRequestBody( req, rsp );
  7. // count timeouts
  8. NamedList header = rsp.getResponseHeader();
  9. if(header != null) {
  10. Object partialResults = header.get("partialResults");
  11. boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
  12. if( timedOut ) {
  13. numTimeouts++;
  14. rsp.setHttpCaching(false);
  15. }
  16. }
  17. } catch (Exception e) {
  18. SolrException.log(SolrCore.log,e);
  19. if (e instanceof ParseException) {
  20. e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
  21. }
  22. rsp.setException(e);
  23. numErrors++;
  24. }
  25. totalTime += rsp.getEndTime() - req.getStartTime();
  26. }

主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );

现在看一下SearchHandler对于搜索处理的实现方法

首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody

  1. @Override
  2. public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException
  3. {
  4. // int sleep = req.getParams().getInt("sleep",0);
  5. // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}
  6. ResponseBuilder rb = new ResponseBuilder();
  7. rb.req = req;
  8. rb.rsp = rsp;
  9. rb.components = components;
  10. rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));
  11. final RTimer timer = rb.isDebug() ? new RTimer() : null;
  12. if (timer == null) {
  13. // non-debugging prepare phase
  14. for( SearchComponent c : components ) {
  15. c.prepare(rb);
  16. }
  17. } else {
  18. // debugging prepare phase
  19. RTimer subt = timer.sub( "prepare" );
  20. for( SearchComponent c : components ) {
  21. rb.setTimer( subt.sub( c.getName() ) );
  22. c.prepare(rb);
  23. rb.getTimer().stop();
  24. }
  25. subt.stop()<span style="color:#FF0000;">;</span>
  26. }
  27. //单机版
  28. if (rb.shards == null) {
  29. // a normal non-distributed request
  30. // The semantics of debugging vs not debugging are different enough that
  31. // it makes sense to have two control loops
  32. if(!rb.isDebug()) {
  33. // Process
  34. for( SearchComponent c : components ) {
  35. c.process(rb);
  36. }
  37. }
  38. else {
  39. // Process
  40. RTimer subt = timer.sub( "process" );
  41. for( SearchComponent c : components ) {
  42. rb.setTimer( subt.sub( c.getName() ) );
  43. c.process(rb);
  44. rb.getTimer().stop();
  45. }
  46. subt.stop();
  47. timer.stop();
  48. // add the timing info
  49. if( rb.getDebugInfo() == null ) {
  50. rb.setDebugInfo( new SimpleOrderedMap<Object>() );
  51. }
  52. rb.getDebugInfo().add( "timing", timer.asNamedList() );
  53. }
  54. } else {//分布式请求
  55. // a distributed request
  56. HttpCommComponent comm = new HttpCommComponent();
  57. if (rb.outgoing == null) {
  58. rb.outgoing = new LinkedList<ShardRequest>();
  59. }
  60. rb.finished = new ArrayList<ShardRequest>();
  61. //起始状态为0,结束状态为整数的最大值
  62. int nextStage = 0;
  63. do {
  64. rb.stage = nextStage;
  65. nextStage = ResponseBuilder.STAGE_DONE;
  66. // call all components
  67. for( SearchComponent c : components ) {
  68. //得到所有组件运行后返回的下一个状态,并取最小值
  69. nextStage = Math.min(nextStage, c.distributedProcess(rb));
  70. }
  71. // 如果有需要向子机发送请求
  72. while (rb.outgoing.size() > 0) {
  73. // submit all current request tasks at once
  74. while (rb.outgoing.size() > 0) {
  75. ShardRequest sreq = rb.outgoing.remove(0);
  76. sreq.actualShards = sreq.shards;
  77. if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
  78. sreq.actualShards = rb.shards;
  79. }
  80. sreq.responses = new ArrayList<ShardResponse>();
  81. // 向各个子机发送请求
  82. for (String shard : sreq.actualShards) {
  83. ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
  84. params.remove(ShardParams.SHARDS);      // not a top-level request
  85. params.remove("indent");
  86. params.remove(CommonParams.HEADER_ECHO_PARAMS);
  87. params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
  88. String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
  89. if (shardHandler == null) {
  90. params.remove(CommonParams.QT);
  91. } else {
  92. params.set(CommonParams.QT, shardHandler);
  93. }
  94. //提交子请求
  95. comm.submit(sreq, shard, params);
  96. }
  97. }
  98. // now wait for replies, but if anyone puts more requests on
  99. // the outgoing queue, send them out immediately (by exiting
  100. // this loop)
  101. while (rb.outgoing.size() == 0) {
  102. ShardResponse srsp = comm.takeCompletedOrError();
  103. if (srsp == null) break;  // no more requests to wait for
  104. // Was there an exception?  If so, abort everything and
  105. // rethrow
  106. if (srsp.getException() != null) {
  107. comm.cancelAll();
  108. if (srsp.getException() instanceof SolrException) {
  109. throw (SolrException)srsp.getException();
  110. } else {
  111. throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
  112. }
  113. }
  114. rb.finished.add(srsp.getShardRequest());
  115. //每个组件都对于返回的数据处理
  116. for(SearchComponent c : components) {
  117. c.handleResponses(rb, srsp.getShardRequest());
  118. }
  119. }
  120. }//请求队列结束
  121. //再对该轮请求进行收尾工作
  122. for(SearchComponent c : components) {
  123. c.finishStage(rb);
  124. }
  125. //如果状态未到结束,则继续循环
  126. } while (nextStage != Integer.MAX_VALUE);
  127. }
  128. }

首先运行的是各个组件的方法prepare

  1. for( SearchComponent c : components ) {
  2. c.prepare(rb);
  3. }

再则如果不是分布式搜索,则比较简单的运行

  1. for( SearchComponent c : components ) {
  2. c.process(rb);
  3. }

就结束!

如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。

在类ResponseBuilder定义了几个状态。

  1. public static int STAGE_START           = 0;
  2. public static int STAGE_PARSE_QUERY     = 1000;
  3. public static int STAGE_EXECUTE_QUERY   = 2000;
  4. public static int STAGE_GET_FIELDS      = 3000;
  5. public static int STAGE_DONE            = Integer.MAX_VALUE;

从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE

从这些状态名称可以猜得出整个对应的过程。

每个组件先调用方法distributeProcess,并返回下一个状态

  1. for( SearchComponent c : components ) {
  2. // the next stage is the minimum of what all components report
  3. nextStage = Math.min(nextStage, c.distributedProcess(rb));
  4. }

而方法handleResponse主要处理返回来的数据

  1. for(SearchComponent c : components) {
  2. c.handleResponses(rb, srsp.getShardRequest());
  3. }

然后交由finishStage方法来对每一个状态的过程作结束动作。

------------------------------

  1. for(SearchComponent c : components) {
  2. c.finishStage(rb);
  3. }

-----------------------------

了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。

所以我想可以添加一个组件放在最后-------------》

1)如果是分布式搜索:

这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。

2)如果只是单机:

这个组件可以在重写process做处理

组件

现在看一下其中一个主要的组件QueryComponent

prepare

对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort

单机处理

process

分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,

主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。

  1. String ids = params.get(ShardParams.IDS);
  2. if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档
  3. SchemaField idField = req.getSchema().getUniqueKeyField();
  4. List<String> idArr = StrUtils.splitSmart(ids, ",", true);
  5. int[] luceneIds = new int[idArr.size()];
  6. int docs = 0;
  7. for (int i=0; i<idArr.size(); i++) {
  8. //solr主键id对应的文档lucene内部的id
  9. int id = req.getSearcher().getFirstMatch(
  10. new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
  11. if (id >= 0)
  12. luceneIds[docs++] = id;
  13. }
  14. DocListAndSet res = new DocListAndSet();
  15. //这里并没有传入scores[]
  16. res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
  17. //需要另一种doc集合处理。
  18. if (rb.isNeedDocSet()) {
  19. List<Query> queries = new ArrayList<Query>();
  20. queries.add(rb.getQuery());
  21. List<Query> filters = rb.getFilters();
  22. if (filters != null)
  23. queries.addAll(filters);
  24. res.docSet = searcher.getDocSet(queries);
  25. }
  26. rb.setResults(res);
  27. rsp.add("response",rb.getResults().docList);
  28. return;
  29. }
    1. <pre name="code" class="java">  //封装搜索值对象与封装结果值对象
    2. SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
    3. //设置超时最大值
    4. cmd.setTimeAllowed(timeAllowed);
    5. SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();
    6. //搜索
    7. searcher.search(result,cmd);
    8. //设置搜索结果
    9. rb.setResult( result );
    10. rsp.add("response",rb.getResults().docList);
    11. rsp.getToLog().add("hits", rb.getResults().docList.matches());
    12. //对含有字段排序处理
    13. doFieldSortValues(rb, searcher);
    14. //非分布查询过程,且搜索结果数小于50,进行缓存
    15. doPrefetch(rb);
    16. <pre name="code" class="java"><p>目前看到真实获取文档内容的是在</p><p>QueryResponseWriter</p><p>例如xml的输出格式类XMLWriter</p></pre><p></p>
    17. <pre></pre>
    18. <pre></pre>
    19. <br>
    20. <p></p>
    21. <h2><a name="t10"></a>分布式处理<br>
    22. </h2>
    23. <h3><a name="t11"></a>1)distributedProcess</h3>
    24. <p></p><pre name="code" class="java">  @Override
    25. public int distributedProcess(ResponseBuilder rb) throws IOException {
    26. if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)
    27. return ResponseBuilder.STAGE_PARSE_QUERY;
    28. if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
    29. createDistributedIdf(rb);
    30. return ResponseBuilder.STAGE_EXECUTE_QUERY;
    31. }
    32. if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;
    33. if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
    34. //分布式查询
    35. createMainQuery(rb);
    36. return ResponseBuilder.STAGE_GET_FIELDS;
    37. }
    38. if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;
    39. if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
    40. //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。
    41. createRetrieveDocs(rb);
    42. return ResponseBuilder.STAGE_DONE;
    43. }
    44. return ResponseBuilder.STAGE_DONE;
    45. }</pre><br>
    46. <br>
    47. <p></p>
    48. <p>   <br>
    49. </p>
    50. <p><br>
    51. </p>
    52. <h3><a name="t12"></a> 2) handleResponses<br>
    53. </h3>
    54. <pre name="code" class="java"> public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
    55. if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {
    56. //合并ids
    57. mergeIds(rb, sreq);
    58. //合并groupCount
    59. mergeGroupCounts(rb, sreq);
    60. }
    61. if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {
    62. //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里
    63. returnFields(rb, sreq);
    64. return;
    65. }
    66. }</pre><br>
    67. <br>
    68. <h3><a name="t13"></a>   3)  finishStage</h3>
    69. <p><br>
    70. </p>
    71. <p> </p><pre name="code" class="java"> @Override
    72. public void finishStage(ResponseBuilder rb) {
    73. //这里说是==获取文档内容的值,在
    74. if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
    75. //有些文档可能已不存在了,则忽略掉
    76. for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
    77. if (iter.next() == null) {
    78. iter.remove();
    79. rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
    80. }
    81. }
    82. rb.rsp.add("response", rb._responseDocs);
    83. }
    84. }
    85. </pre><br>
    86. <p></p>
    87. <p><span style="color:#FF0000"><br>
    88. </span></p>
    89. <p><span style="color:#FF0000">同样最后的结果是保存在<br>
    90. <br>
    91. ResponseBuilder <br>
    92. <br>
    93. ResponseBuilder <br>
    94. NamedList values = new SimpleOrderedMap();<br>
    95. <br>
    96. 这个字段里,以键为"response",单机存储的是lucene 的内部id列表<br>
    97. 如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,<br>
    98. 这个在QueryResponseWriter里有对应的处理</span><br>
    99. </p>
    100. <p></p>
    101. <p><br>
    102. </p>
    103. <p><br>
    104. </p>
    105. <p><br>
    106. </p>
    107. <p><br>
    108. </p>
    109. <p><br>
    110. </p>
    111. <p><br>
    112. </p>
    113. <p><br>
    114. </p>
    115. <p><br>
    116. </p>
    117. <p></p>
    118. </pre>

最新文章

  1. .NET Core与.NET Framework、Mono之间的关系
  2. 如何查看MapReduce执行的程序中的输出日志
  3. DSO的记录模式Record Mode字段测试
  4. CCapture directshow 视频捕获类
  5. hadoop集群之HDFS和YARN启动和停止命令
  6. webdynpro tree控件使用
  7. 上传文件复用代码【fileUpload】
  8. 信息化建设中的IT规划精要
  9. SSM搭建遇到的坑
  10. Thread.Abort 方法
  11. python网络编程(六)
  12. mysql数据表修复
  13. mysql 查询优化~ 分页优化讲解
  14. annoy ANN算法 调参
  15. AtCoder Beginner Contest 088 (ABC)
  16. js判断数组中是不是有某个元素
  17. Sql server 账号被锁住:&quot;the account is currently locked out. The system administrator can unlock it.&quot;的解决办法(转载)
  18. 测试用例excel模板
  19. 深入理解ajax系列第二篇
  20. python 类函数

热门文章

  1. thinkphp 数据写入
  2. Lock方法是用于数据库的锁机制,
  3. LUOGU P3355 骑士共存问题(二分图最大独立集)
  4. c语言学习笔记 - 指针和数组
  5. escape encodeURI和encodeURIComponent的区别
  6. VS2010文件包含
  7. CSS中关于多个class样式设置的不同写法
  8. gstore安装
  9. python 将值相同的key分组的方法
  10. python 之 heapq (堆)