compact处理流程分析

compact的处理与split同样。由client端与flush时检查发起。

针对compact另一个在rs生成时生成的CompactionChecker线程定期去检查是否须要做compact操作

线程运行的间隔时间通过hbase.server.thread.wakefrequency配置,默觉得10*1000ms

CompactionChecker线程主要作用:

生成通过hbase.server.thread.wakefrequency(10*1000ms)配置的定期检查region是否须要compact的检查线程,

假设须要进行compact,会在此处通过compact的线程触发compcat的请求

此实例中通过hbase.server.thread.wakefrequency(10*1000ms)配置majorcompact的优先级,

假设majorcompact的优先级大过此值,把compact的优先级设置为此值.

Store中通过hbase.server.compactchecker.interval.multiplier配置多少时间须要进行compact检查的间隔

默觉得1000ms,

compactionChecker的检查周期为wakefrequency*multiplierms,

次运行一次compact检查

a.compaction检查时发起compact的条件是

假设一个store中全部的file个数减去在做(或发起compact请求)的个数,大于或等于

hbase.hstore.compaction.min配置的值,

b.majorcompact的条件检查

通过hbase.hregion.majorcompaction配置major的检查周期,default=1000*60*60*24

通过hbase.hregion.majorcompaction.jitter配置major的浮动时间,默觉得0.2,

也就是major的时间上下浮动4.8小时

b2.检查(当前时间-major配置时间>store最小的文件生成时间)表示须要major,

b2.1>store下是否仅仅有一个文件,同一时候这个文件已经到了major的时间,

b2.1>检查ttl时间是否达到(intager.max表示没配置),达到ttl时间须要major,否则不做

b2.2>文件个数大于1,到达major的时间,须要major

Client端发起compactRegion的request

Client通过HBaseAdmin.compact发起regionserver的rpc连接,调用regionserver.compactRegion

假设传入的是tablename而不是regionname,会迭代出此table的全部region调用HRegionServer.compactRegion

由client发起。调用HRegionServer.compactRegion

publicCompactRegionResponse
compactRegion(finalRpcController
controller,

finalCompactRegionRequest
request)throwsServiceException {

try{

checkOpen();

requestCount.increment();

从onlineRegions中得到request的Hregion实例

HRegion
region=
getRegion(request.getRegion());

region.startRegionOperation(Operation.COMPACT_REGION);

LOG.info("Compacting"
+region.getRegionNameAsString());

booleanmajor
=false;

byte[]
family =null;

Storestore
=null;

假设client发起的request中传入有columnfamily的值,得到此cf的HStore

if(request.hasFamily()){

family=
request.getFamily().toByteArray();

store=
region.getStore(family);

if(store
==null){

thrownewServiceException(newIOException("columnfamily
" + Bytes.toString(family)+

"does not exist in region " +region.getRegionNameAsString()));

}

}

检查是否是major的compact请求

if(request.hasMajor()){

major=
request.getMajor();

}

假设是发起majorcompaction的操作,

if(major)
{

if(family
!=null){

store.triggerMajorCompaction();

}
else{

region.triggerMajorCompaction();

}

}

String
familyLogMsg= (family!=
null)?"for column family: " +Bytes.toString(family):"";

LOG.trace("User-triggeredcompaction
requested for region " +

region.getRegionNameAsString()+
familyLogMsg);

String
log=
"User-triggered "+ (major ?

"major ":
"")+
"compaction"+
familyLogMsg;

否则是一般compation的请求,通过compactsplitThread.requestCompaction发起compactrequest

if(family!=
null){

compactSplitThread.requestCompaction(region,store,
log,

Store.PRIORITY_USER,null);

}
else{

compactSplitThread.requestCompaction(region,log,

Store.PRIORITY_USER,null);

}

returnCompactRegionResponse.newBuilder().build();

}catch(IOException
ie){

thrownewServiceException(ie);

}

}

非major的compact处理流程

requestCompaction无论是直接传入sotre或者是region的传入,

假设传入的是region,那么会拿到region下的全部store,迭代调用每个store的compactionrequest操作。

全部的非majorcompaction request终于会通过例如以下方法发起compactionrequest

privatesynchronized CompactionRequestrequestCompactionInternal(finalHRegion
r,

finalStore
s,

finalString
why,intpriority,CompactionRequest
request,booleanselectNow)

针对store的compactionrequest处理流程

假设要对一个HBASE的表禁用掉compaction操作,能够通过createtable时配置COMPACTION_ENABLED属性

privatesynchronized CompactionRequestrequestCompactionInternal(finalHRegion
r, finalStore
s,

finalString
why,intpriority,CompactionRequest
request,booleanselectNow)

throwsIOException {

if(this.server.isStopped()

|| (r.getTableDesc()!=
null&& !r.getTableDesc().isCompactionEnabled())){

returnnull;

}

CompactionContextcompaction=
null;

此时的调用selectNow为true,(假设是系统调用,此时的selectNow为false,)

也就是在发起request到CompactSplitThread.CompactionRunner线程运行时。

假设是系统调用,传入的CompactionContext的实例为null,否则是用户发起的调用在这个地方得到compaction实例

if(selectNow){

通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile

并设置其request.priority为Store.PRIORITY_USER表示用户发起的request

假设是flush时发起的compact,

并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,

表示系统发起的request,

假设hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER

见生成CompactionRequest实例

compaction=
selectCompaction(r,s,priority,request);

if(compaction==
null)returnnull;//
message logged inside

}

//We assume that most
compactionsare small. So, put system
compactionsinto small

//pool; we will do selection there, and move to large pool ifnecessary.

longsize
=selectNow ?compaction.getRequest().getSize():
0;

此时好像一直就得不到largeCompactions的实例(在system时通过CompactionRunner线程检查)。

不可能大于hbase.regionserver.thread.compaction.throttle配置的值

此配置的默认值是hbase.hstore.compaction.max*2*memstoresize

ThreadPoolExecutor
pool= (!selectNow&&
s.throttleCompaction(size))

?

largeCompactions:
smallCompactions;

通过smallCompactions的线程池生成CompactionRunner线程并运行,见运行Compaction的处理线程

pool.execute(newCompactionRunner(s,r,compaction,pool));

if(LOG.isDebugEnabled()){

String
type= (pool ==smallCompactions)?

"Small ":
"Large ";

LOG.debug(type+
"Compaction requested: "+ (selectNow?
compaction.toString():
"system")

+ (why!=
null&& !why.isEmpty()?
"; Because: "+
why :
"")+ "; "+
this);

}

returnselectNow
?compaction.getRequest():
null;

}

生成CompactionRequest实例

Hstore.requestcompaction得到要进行compact的storefile,并生成一个CompactionContext

publicCompactionContextrequestCompaction(intpriority,
CompactionRequest baseRequest)

throwsIOException {

//don't even select for compaction if writes are disabled

if(!this.areWritesEnabled()){

returnnull;

}

生成一个DefaultStoreEngine.DefaultCompactionContext实例(假设storeEngine是默认的配置)

CompactionContextcompaction=
storeEngine.createCompaction();

this.lock.readLock().lock();

try{

synchronized(filesCompacting){

//First, see if
coprocessorwould want to override selection.

if(this.getCoprocessorHost()!=
null){

List<StoreFile>candidatesForCoproc=
compaction.preSelect(this.filesCompacting);

booleanoverride
=this.getCoprocessorHost().preCompactSelection(

this,candidatesForCoproc,baseRequest);

if(override){

//Coprocessoris
overriding normal file selection.

compaction.forceSelect(newCompactionRequest(candidatesForCoproc));

}

}

//Normal case -
coprocessoris not overriding file selection.

if(!compaction.hasSelection()){

假设是client端发起的compact,此时的值为true,假设是flush时发起的compact,此时的值为false

booleanisUserCompaction=
priority==
Store.PRIORITY_USER;

offPeakHours的值说明:

与2配置的小时时间内,那么配置有这两个值后。

个文件的总和的多少倍,

个待做compact的文件,第一个文件(i=0)的size是=i+max(10)-1=9。

个文件总size的大小的多少倍,假设超过了倍数,不做compact

与2配置为不等于-1,同一时候start小于end,当前做compact的时间刚好在此时间内。

多少倍这个值通过hbase.hstore.compaction.ratio.offpeak配置得到,默觉得5.0f

否则通过hbase.hstore.compaction.ratio配置得到,默觉得1.2f

booleanmayUseOffPeak=
offPeakHours.isOffPeakHour()&&

offPeakCompactionTracker.compareAndSet(false,true);

try{

调用DefaultStoreEngine.DefaultCompactionContext实例的select方法。返回true/false,

对compaction.select的详细分析说明可參见majorcompact处理流程

true表示有compactrequest,否则表示没有compactrequest

此方法终于调用RatioBasedCompactionPolicy.selectCompaction方法,

生成CompactRequest并放入到DefaultStoreEngine.DefaultCompactionContext的request属性中

得到要compact的storefile列表,放入到HStore.filesCompacting列表中

方法传入的forceMajor实例仅仅有在发起majorcompact时同一时候fileCompacting列表中没有值时,此值为true,

其他情况值都为false.就是最后一个參数的值为false

a.在compaction.select方法中得到此store中全部的storefile列表,

传入到compactionPolicy.selectCompaction方法中。

RatioBasedCompactionPolicy.selectCompaction方法处理流程:

1.检查全部的storefile的个数减去正在做compact的storefile文件个数

是否大于hbase.hstore.blockingStoreFiles配置的值。默觉得7,

比对方法:

a.假设filesCompacting(正在做compact的storefile列表)不为空

是否大于blockingStoreFiles配置的值

b.假设filesCompacting(正在做compact的storefile列表)为空

那么storefiles的个数减去正在做compact的storefile文件个数是否大于blockingStoreFiles配置的值

2.从全部的storefile列表中移出正在做compcat的storefile列表(fileCompacting列表中的数据)

得到还没做(可选的)compact的storefiles列表

3.假设columnfamily配置中的MIN_VERSIONS的值没有配置(=0)。

得到TTL配置的值(不配置=Integer.MAX_VALUE=-1)配置的值为秒为单位,否则得到Long.MAX_VALUE

4.检查假设hbase.store.delete.expired.storefile配置的值为true(default=true),同一时候ttl非默认值

中得到的storefile列表中得到ttl超时的全部storefile列表。

4.1假设有ttl过期的storefile,生成这些storefile的CompactionRequest请求并返回

4.2假设没有ttl过期的storefile,(控制大文件先不做小的compact)

把storefile列表中size超过hbase.hstore.compaction.max.size配置的storefile移出。默觉得Long.MAX_VALUE

5.检查storefile是否须要做majorcompact操作,

5.1得到通过hbase.hregion.majorcompaction配置的值默觉得1000*60*60*24*7

5.2得到通过hbase.hregion.majorcompaction.jitter配置的值。默觉得0.5f

天之间)

小时,那么运行时间的加减为4.8个小时

5.4假设还没有超过配置的时间,表示不须要发做majorcompact,

5.5假设在时间范围内或超过此配置的时间。表示须要做majorcompact,

a.同一时候假设仅仅有一个storefile此storefile的最小更新时间已经超过了ttl的配置时间,须要做majorcompact

b.假设有多个storefile文件。表示须要做majorcompat.

成立的条件下。

  1. 的检查条件都成立,或者此region(有个split操作。References文件),。表示升级为major的compact

  2. 假设没有升级成major的compact,把storefile列表中的blukload的file移出

  3. 计算出最大的几个storefile,也就是filesize的值是后面几个文件的size的多少倍,

    把超过倍数的storefile移出。不做compact

    能够看上面对offPeakHours的值说明:

10.假设如今还有须要做compcat的storefile列表,检查文件个数是否达到最小compact的配置的值。

通过hbase.hstore.compaction.min配置,默觉得3,老版本号通过hbase.hstore.compactionThreshold配置

假设没有达到最小的配置值。不做compact

12.生成并返回一个CompactionRequest的实例。假设非major,同一时候在offPeakHours的值说明的时间内,

把CompactionRequest的isOffPeak设置为true,否则设置为false(major)

compaction.select(this.filesCompacting,isUserCompaction,

mayUseOffPeak,forceMajor&&
filesCompacting.isEmpty());

}
catch(IOException
e){

if(mayUseOffPeak){

offPeakCompactionTracker.set(false);

}

throwe;

}

assertcompaction.hasSelection();

if(mayUseOffPeak&&
!compaction.getRequest().isOffPeak()){

//Compaction policy doesn't want to take advantage of off-peak.

offPeakCompactionTracker.set(false);

}

}

if(this.getCoprocessorHost()!=
null){

this.getCoprocessorHost().postCompactSelection(

this,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);

}

//Selected files; see if we have a compaction with some custom baserequest.

if(baseRequest!=
null){

//Update the request with what the system thinks the request should be;

//its up to the request if it wants to listen.

compaction.forceSelect(

baseRequest.combineWith(compaction.getRequest()));

}

//Finally, we have the resulting files list. Check if we have any filesat all.

finalCollection<StoreFile>selectedFiles=
compaction.getRequest().getFiles();

if(selectedFiles.isEmpty()){

returnnull;

}

//Update filesCompacting (check that we do not try to compact the sameStoreFile twice).

if(!Collections.disjoint(filesCompacting,selectedFiles)){

Preconditions.checkArgument(false,"%s
overlaps with %s",

selectedFiles,filesCompacting);

}

把当前要运行compact的storefile列表加入到HStore.filesCompacting中。

filesCompacting.addAll(selectedFiles);

通过storefile的seqid按从小到大排序

Collections.sort(filesCompacting,StoreFile.Comparators.SEQ_ID);

//If we're
enqueuinga major, clear the force flag.

假设当前要做compact的文件个数等待当前sotre中全部的storefile个数,把当前的compact提升为major

booleanisMajor
=selectedFiles.size()==
this.getStorefilesCount();

this.forceMajor=
this.forceMajor&& !isMajor;

//Set common request properties.

//Set priority, either override value supplied by caller or from store.

compaction.getRequest().setPriority(

(priority!=
Store.NO_PRIORITY)?
priority:
getCompactPriority());

compaction.getRequest().setIsMajor(isMajor);

compaction.getRequest().setDescription(

getRegionInfo().getRegionNameAsString(),getColumnFamilyName());

}

}finally{

this.lock.readLock().unlock();

}

LOG.debug(getRegionInfo().getEncodedName()+
" - "+
getColumnFamilyName()+
": Initiating "

+(compaction.getRequest().isMajor()?
"major":
"minor")+ " compaction");

this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());

returncompaction;

}

运行Compaction的处理流程

在compact运行时是通过指定的线程池生成并运行CompactSplitThread.CompactionRunner线程

下面是线程运行的详细说明:

publicvoid
run(){

Preconditions.checkNotNull(server);

if(server.isStopped()

|| (region.getTableDesc()!=
null&& !region.getTableDesc().isCompactionEnabled())){

return;

}

//Common case - system compaction without a file selection. Select now.

假设compaction==null表示是systemcompact非用户发起的compaction得到一个compactionContext

if(this.compaction==
null){

queuedPriority的值在此线程实例生成时默认是hbase.hstore.blockingStoreFiles配置的值减去storefile的个数

时返回2,否则返回相减的值

intoldPriority =
this.queuedPriority;

又一次拿到hbase.hstore.blockingStoreFiles配置的值减去storefile的个数的值。

this.queuedPriority=
this.store.getCompactPriority();

假设这次拿到的值比上次的值要大,表示有storefile被删除(基本上是有compact完毕)

if(this.queuedPriority>
oldPriority){

//Store priority decreased while we were in queue (due to some othercompaction?),

//requeuewith
new priority to avoid blocking potential higher priorities.

结束本次线程调用。发起一个新的线程调用,用最新的priority

this.parent.execute(this);

return;

}

try{

通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile

并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,

表示系统发起的request,

假设hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER

假设是client时发起的compact,此处会设置其request.priority为Store.PRIORITY_USER表示是用户发起的request

见生成CompactionRequest实例

this.compaction=
selectCompaction(this.region,this.store,queuedPriority,null);

}
catch(IOException
ex){

LOG.error("Compactionselection
failed " + this,ex);

server.checkFileSystem();

return;

}

if(this.compaction==
null)return;// nothing to do

//Now see if we are in correct pool for the size; if not, go to thecorrect one.

//We might end up waiting for a while, so cancel the selection.

assertthis.compaction.hasSelection();

此处检查上面提到无用的地方:

compaction.getRequest().getSize()的大小为全部当此要做compact的storefile的总大小

检查是否大于hbase.regionserver.thread.compaction.throttle配置的值

此配置的默认值是hbase.hstore.compaction.max*2*memstoresize

假设大于指定的值,使用largeCompactions,否则使用smallCompactions

ThreadPoolExecutor
pool=
store.throttleCompaction(

compaction.getRequest().getSize())?

largeCompactions:
smallCompactions;

假设发现当前又一次生成的运行线程池不是上次选择的线程池,结束compaction操作,

并又一次通过新的线程池运行当前线程,结束当前线程的调用运行

if(this.parent!=
pool) {

this.store.cancelRequestedCompaction(this.compaction);

this.compaction=
null;

this.parent=
pool;

this.parent.execute(this);

return;

}

}

//Finally we can compact something.

assertthis.compaction!=
null;

this.compaction.getRequest().beforeExecute();

try{

//Note: please don't put single-compaction logic here;

// put it into region/store/etc. This is CST logic.

longstart
=EnvironmentEdgeManager.currentTimeMillis();

调用HRegion.compact方法,此方法调用HStore.compact方法,把CompactionContext传入

此方法调用返回compact是否成功。假设成功返回true,否则返回false

booleancompleted
=region.compact(compaction,store);

longnow
=EnvironmentEdgeManager.currentTimeMillis();

LOG.info(((completed)?
"Completed":
"Aborted")+
" compaction: "+

this+
"; duration="+ StringUtils.formatTimeDiff(now,start));

if(completed){

的值,表示还须要进行compact操作,又一次再发起一次compact的request

//degenerate case: blocked regions require recursive enqueues

if(store.getCompactPriority()<=
0) {

requestSystemCompaction(region,store,"Recursive
enqueue");

}
else{

此时表示compact操作完毕后,storefile的个数在配置的范围内,不须要在做compact。

检查是否须要split,假设须要发起split操作。

Split的发起条件:

a.splitlimit,hbase.regionserver.regionSplitLimit配置的值大于当前rs中的allonlineregions

默觉得integer.maxvalue

b.a检查通过的同一时候hbase.hstore.blockingStoreFiles配置的值减去storefile的个数

大于等于Store.PRIORITY_USER(1)

c.非meta与namespace表。同一时候其他条件见split的分析部分

//see if the compaction has caused us to exceed max region size

requestSplit(region);

}

}

}
catch(IOException
ex){

IOException
remoteEx= RemoteExceptionHandler.checkIOException(ex);

LOG.error("Compactionfailed
" + this,remoteEx);

if(remoteEx!=
ex) {

LOG.info("Compactionfailed
at original callstack: " +formatStackTrace(ex));

}

server.checkFileSystem();

}
catch(Exception
ex){

LOG.error("Compactionfailed
" + this,ex);

server.checkFileSystem();

}
finally{

LOG.debug("CompactSplitThreadStatus:
" +CompactSplitThread.this);

}

this.compaction.getRequest().afterExecute();

}

Hstore.compact方法流程:

publicList<StoreFile>compact(CompactionContextcompaction)
throwsIOException {

assertcompaction!=
null&&
compaction.hasSelection();

CompactionRequest
cr=
compaction.getRequest();

得到要做compact的storefile列表

Collection<StoreFile>filesToCompact=
cr.getFiles();

assert!filesToCompact.isEmpty();

synchronized(filesCompacting){

//sanity check: we're compacting files that this store knows about

//TODO:change
this to LOG.error() after more debugging

Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));

}

//Ready to go. Have list of files to compact.

LOG.info("Startingcompaction
of " +filesToCompact.size()+
" file(s) in "

+
this+
" of "+
this.getRegionInfo().getRegionNameAsString()

+
"into tmpdir=" +
fs.getTempDir()+
", totalSize="

+StringUtils.humanReadableInt(cr.getSize()));

longcompactionStartTime=
EnvironmentEdgeManager.currentTimeMillis();

List<StoreFile>sfs
= null;

try{

运行compact操作,把全部的storefile全并成一个storefile。放入到store/.tmp文件夹下

通过DefaultCompactor.compact操作,把原有的全部storefile生成一个StoreFileScanner列表,

并生成一个StoreScanner把StoreFileScanner列表增加。

假设compact提升成了major,ScanType=COMPACT_DROP_DELETES,否则等于COMPACT_RETAIN_DELETES

针对compact的数据scan可參见后期分析的scan流程

//Commence the compaction.

List<Path>newFiles
=compaction.compact();

假设hbase.hstore.compaction.complete设置为false,检查storefile生成是否可用

//TODO:get
rid of this!

if(!this.conf.getBoolean("hbase.hstore.compaction.complete",true)){

LOG.warn("hbase.hstore.compaction.completeis
set to false");

sfs=
newArrayList<StoreFile>();

for(Path
newFile:
newFiles){

//Create
storefilearound what we wrote with a reader on it.

StoreFile
sf=
createStoreFileAndReader(newFile);

sf.closeReader(true);

sfs.add(sf);

}

returnsfs;

}

把生成的新的storefile加入到cf的文件夹下。并返回生成后的storefile,此storefile已经生成好reader

//Do the steps necessary to complete the compaction.

sfs=
moveCompatedFilesIntoPlace(cr,newFiles);

生成一个compaction的说明信息,写入到wal日志中

writeCompactionWalRecord(filesToCompact,sfs);

把原有的storefile列表中store中的storefiles列表中移出,

并把新的storefile加入到storefiles列表中。对storefiles列表又一次排序,通过storefile.seqid

storefiles列表是scan操作时对store中的查询用的storefile与reader

从HStore.filesCompacting列表中移出完毕compact的storefiles列表

replaceStoreFiles(filesToCompact,sfs);

从hdfs中此store下移出compact完毕的storefile文件列表。

//At this point the store will use new files for all new scanners.

completeCompaction(filesToCompact);//
Archive old files & update storesize.

}finally{

从HStore.filesCompacting列表中移出完毕compact的storefiles列表,假设compact完毕此时没有要移出的文件

假设compact失败,此时把没有compact的文件移出

finishCompactionRequest(cr);

}

logCompactionEndMessage(cr,sfs,compactionStartTime);

returnsfs;

}

major的compact处理流程

majorCompaction无论是直接传入sotre或者是region的传入。

假设传入的是region,那么会拿到region下的全部store,迭代调用每个store的triggerMajorCompaction操作。

Hstore.triggerMajorCompaction操作流程:设置store中的forcemajor的值为true

publicvoid
triggerMajorCompaction(){

this.forceMajor=
true;

}

设置完毕forceMajor的值后,终于还是直接触发requestCompaction方法

if(family!=
null) {

compactSplitThread.requestCompaction(region,store,
log,

Store.PRIORITY_USER,null);

}
else{

compactSplitThread.requestCompaction(region,log,

Store.PRIORITY_USER,null);

}

requestCompaction的处理流程大至与非major的coompact处理流程无差别:

CompactSplitThread.requestCompaction-->requestCompactionInternal-->selectCompaction

-->Hstore.requestCompaction(priority,request)得到compactionContext

代码细节例如以下所看到的:

是否是用户发起的compaction操作

booleanisUserCompaction=
priority==
Store.PRIORITY_USER;

下面代码返回为true的条件:

a.hbase.offpeak.start.hour的值不等于-1(0-23之间的值)

b.hbase.offpeak.end.hour的值不等-1(0-23之间的值),同一时候此值大于a配置的值

c.当前时间的小时部分在a与b配置的时间之间

否则返回的值为false

booleanmayUseOffPeak=
offPeakHours.isOffPeakHour()&&

offPeakCompactionTracker.compareAndSet(false,true);

try{

此时最后一个參数为true(在没有其他的compact操作的情况下,同一时候指定的compact模式为major),

compaction.select(this.filesCompacting,isUserCompaction,

mayUseOffPeak,forceMajor&&
filesCompacting.isEmpty());

}
catch(IOException
e){

if(mayUseOffPeak){

offPeakCompactionTracker.set(false);

}

throwe;

}

以上代码的中的compaction.select默认调用为DefaultStoreEngine.DefaultCompactionContext.select方法

publicbooleanselect(List<StoreFile>filesCompacting,booleanisUserCompaction,

booleanmayUseOffPeak,booleanforceMajor)throwsIOException
{

调用RatioBasedCompactionPolicy.selectCompaction得到一个CompactionRequest,

并把此request设置到当前compaction实例的request属性中

request=
compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),

filesCompacting,isUserCompaction,mayUseOffPeak,forceMajor);

returnrequest!=
null;

}

RatioBasedCompactionPolicy.selectCompaction处理流程说明:

publicCompactionRequest
selectCompaction(Collection<StoreFile>candidateFiles,

finalList<StoreFile>filesCompacting,finalbooleanisUserCompaction,

finalbooleanmayUseOffPeak,finalbooleanforceMajor)throwsIOException
{

//Preliminary compaction subject to filters

ArrayList<StoreFile>candidateSelection=
newArrayList<StoreFile>(candidateFiles);

//Stuck and not compacting enough (estimate). It is not guaranteed thatwe will be

//able to compact more if stuck and compacting, because ratio policyexcludes some

//non-compacting files from consideration during compaction (seegetCurrentEligibleFiles).

intfutureFiles=
filesCompacting.isEmpty()? 0 : 1;

此store下全部的storefile的个数减去当前已经在做compact的个数是否大于blockingfile的配置个数

booleanmayBeStuck=
(candidateFiles.size()-
filesCompacting.size()+
futureFiles)

>=storeConfigInfo.getBlockingFileCount();

得到可选择的storefile,也就是得到全部的storefile中不包括正在做compact的sotrefile的列表

candidateSelection=
getCurrentEligibleFiles(candidateSelection,filesCompacting);

LOG.debug("Selectingcompaction
from " +candidateFiles.size()+
" store files, "+

filesCompacting.size()+
" compacting, "+
candidateSelection.size()+

"eligible, " +storeConfigInfo.getBlockingFileCount()+
" blocking");

得到配置的ttl过期时间,通过在cf的表属性中配置TTL属性,

假设配置值为Integer.MAX_VALUE或者-1或者不配置,表示不控制ttl,

TTL属性生效的前提是MIN_VERSIONS属性不配置,TTL属性配置单位为秒

假设以上条件检查通过表示有配置ttl,返回ttl的配置时间,否则返回Long.maxvalue

longcfTtl
=this.storeConfigInfo.getStoreFileTtl();

假设不是发起的major操作。

同一时候配置有ttl过期时间。同一时候hbase.store.delete.expired.storefile配置的值为true,默觉得true,

同一时候ttl属性有配置,

得到当前未做compact操作的全部sotrefile中ttl过期的storefile,

假设有ttl过期的storefile文件。生成CompactionRequest实例,并结束此流程处理

if(!forceMajor){

//If there are expired files, only select them so that compactiondeletes them

if(comConf.shouldDeleteExpired()&&
(cfTtl!= Long.MAX_VALUE)){

ArrayList<StoreFile>expiredSelection=
selectExpiredStoreFiles(

candidateSelection,EnvironmentEdgeManager.currentTimeMillis()-
cfTtl);

if(expiredSelection!=
null){

returnnewCompactionRequest(expiredSelection);

}

}

假设非major把storefile中非reference(split后的文件为reference文件)的storefile文件。

同一时候storefile的大小超过了hbase.hstore.compaction.max.size配置的最大storefile文件限制大小

移出这些文件

candidateSelection=
skipLargeFiles(candidateSelection);

}

//Force a major compaction if this is a user-requested majorcompaction,

//or if we do not have too many files to compact and this was requested

//as a major compaction.

//Or, if there are any references among the candidates.

此处检查major的条件包括下面几个:

(forceMajor&&
isUserCompaction)

a.假设是用户发起的compaction,同一时候用户发起的compaction是major的compact,

同一时候store中没有其他正在做compact的storefile,此值为true

((forceMajor||
isMajorCompaction(candidateSelection))

&&(candidateSelection.size()<
comConf.getMaxFilesToCompact()))

个条件,第一个(b1)与第二个(b2)为一个通过即可,第三个(b3)必须通过

forceMajor

b1.假设是发起的compaction,同一时候store中没有其他正在做compact的storefile

isMajorCompaction(candidateSelection)

b2.或者下面几个条件检查通过:

b2.1.可选的storefile列表中改动时间最老的一个storefile的时间达到了间隔的majorcompact时间

b2.2.假设可选的storefile列表中仅仅有一个storefile,同一时候此storefile的最老的一条数据的时间已经达到ttl时间

同一时候此storefile的时间达到了间隔的major时间间隔

b2.3.假设可选的storefile列表中有多少storefile。同一时候更新时间最老的一个storefile达到了major的时间间隔

b2.4.也就是storefile列表中最老的更新时间的一个storefile的时间达到了间隔的major时间。

可是可选的storefile个数仅仅有一个。同一时候此storefile已经做过major(StoreFile.majorCompaction==true)

同一时候ttl时间没有配置或者ttl还没有过期那么此时这个storefile是不做majorcompact

通过hbase.hregion.majorcompaction配置major的间隔时间,

通过hbase.hregion.majorcompaction.jitter配置major的间隔的左右差

小时,同一时候间隔的左右差是0.2f,那么default=
20% = +/- 4.8 hrs

(candidateSelection.size()<
comConf.getMaxFilesToCompact())

b3.可选的storefile列表的个数小于compactmaxfiles的配置个数,

StoreUtils.hasReferences(candidateSelection)

c.假设storefile列表中包括有reference(split后的文件为reference文件)的storefile

booleanmajorCompaction=
(

(forceMajor&&
isUserCompaction)

|| ((forceMajor||
isMajorCompaction(candidateSelection))

&&(candidateSelection.size()<
comConf.getMaxFilesToCompact()))

||StoreUtils.hasReferences(candidateSelection)

);

假设是非major的compact

if(!majorCompaction){

//we're doing a minor compaction, let's see what files are applicable

从可选的storefile列表中移出是bulkload的storefile

candidateSelection=
filterBulk(candidateSelection);

假设可选的storefile列表中的个数大于或等于hbase.hstore.compaction.max配置的值,

移出可选的storefile列表中最大的几个storefile,

通过例如以下说明来计算什么文件算是较大的storefile:

a.storefile的文件大小是后面几个文件的总和的多少倍数,倍数的说明在例如以下几行中进行了说明,

与2配置的小时时间内,那么配置有这两个值后,

个文件的总和的多少倍,

个待做compact的文件,第一个文件(i=0)的size是=i+max(10)-1=9,

个文件总size的大小的多少倍,假设超过了倍数,不做compact

与2配置为不等于-1,同一时候start小于end,当前做compact的时间刚好在此时间内,

多少倍这个值通过hbase.hstore.compaction.ratio.offpeak配置得到,默觉得5.0f

否则通过hbase.hstore.compaction.ratio配置得到,默觉得1.2f

b.storefile的大小必须是大于hbase.hstore.compaction.min.size配置的值。默认是memstore的大小

c.假设如今全部的storefile的个数减去正在做compact的storefile个数大于

通过,移出最大的几个storefile,

仅仅保留通过hbase.hstore.compaction.min配置的个数,默觉得3(配置不能小于2)

老版本号通过hbase.hstore.compactionThreshold配置

candidateSelection=
applyCompactionPolicy(candidateSelection,mayUseOffPeak,mayBeStuck);

检查可选的能做compact的文件个数是否达到最少文件要求。假设没有达到,清空全部可选的storefile列表值

candidateSelection=
checkMinFilesCriteria(candidateSelection);

}

假设不是用户发起的major的compact。移出可选的storefile列表中超过hbase.hstore.compaction.max配置的个数

candidateSelection=
removeExcessFiles(candidateSelection,isUserCompaction,majorCompaction);

生成CompactionRequest实例

CompactionRequest
result=
newCompactionRequest(candidateSelection);

假设非major同一时候offpeak有配置,同一时候当前时间在配置的时间范围内,设置CompactionRequest的offpeak为true

表示当前时间是非高峰时间内

result.setOffPeak(!candidateSelection.isEmpty()&&
!majorCompaction&&
mayUseOffPeak);

returnresult;

}

运行compaction的详细处理。见非majorcompaction处理流程中的运行compaction处理流程

flush时的compaction

flush时的compaction通过MemStoreFlusher.FlusherHander.run运行

当flushRegion完毕后,会触发compact的运行

CompactSplitThread.requestSystemCompaction-->requestCompactionInternal(region)

publicsynchronized
voidrequestSystemCompaction(

finalHRegion
r,finalString
why)throwsIOException {

requestCompactionInternal(r,why,Store.NO_PRIORITY,null,false);

}

CompactSplitThread.requestCompactionInternal(Region)-->requestCompactionInternal(Store)

privateList<CompactionRequest>requestCompactionInternal(finalHRegion
r, finalString
why,

intp,List<Pair<CompactionRequest,Store>>requests,booleanselectNow)throwsIOException
{

//not a special compaction request, so make our own list

List<CompactionRequest>ret
= null;

if(requests==
null){

ret=
selectNow?
newArrayList<CompactionRequest>(r.getStores().size()):
null;

for(Stores
:r.getStores().values()){

迭代发起针对store的compaction操作,传入的priority=Store.NO_PRIORITY,可參见非majorcompact处理流程

CompactionRequest
cr=
requestCompactionInternal(r,s,
why,p,
null,selectNow);

if(selectNow)ret.add(cr);

}

}else{

Preconditions.checkArgument(selectNow);//
only system requests have selectNow== false

ret=
newArrayList<CompactionRequest>(requests.size());

for(Pair<CompactionRequest,Store>pair
:requests) {

ret.add(requestCompaction(r,pair.getSecond(),why,
p,pair.getFirst()));

}

}

returnret;

}

定时线程运行的compact流程

定期线程运行通过HRegionServer.CompactionChecker运行,

CompactionChecker线程主要作用:

生成通过hbase.server.thread.wakefrequency(10*1000ms)配置的定期检查region是否须要compact的检查线程,

假设须要进行compact,会在此处通过compact的线程触发compcat的请求

此实例中通过hbase.server.thread.wakefrequency(10*1000ms)配置majorcompact的优先级,

假设majorcompact的优先级大过此值,把compact的优先级设置为此值.

Store中通过hbase.server.compactchecker.interval.multiplier配置多少时间须要进行compact检查的间隔

默觉得1000ms,

compactionChecker的检查周期为wakefrequency*multiplierms,

次运行一次compact检查

a.compaction检查时发起compact的条件是

假设一个store中全部的file个数减去在做(或发起compact请求)的个数,大于或等于

hbase.hstore.compaction.min配置的值,

b.majorcompact的条件检查

通过hbase.hregion.majorcompaction配置major的检查周期,default=1000*60*60*24

通过hbase.hregion.majorcompaction.jitter配置major的浮动时间,默觉得0.2,

也就是major的时间上下浮动4.8小时

b2.检查(当前时间-major配置时间>store最小的文件生成时间)表示须要major,

b2.1>store下是否仅仅有一个文件,同一时候这个文件已经到了major的时间,

b2.1>检查ttl时间是否达到(intager.max表示没配置),达到ttl时间须要major,否则不做

b2.2>文件个数大于1,到达major的时间,须要major

protectedvoid
chore(){

for(HRegion
r: this.instance.onlineRegions.values()){

if(r
== null)

continue;

for(Stores
:r.getStores().values()){

try{

longmultiplier=
s.getCompactionCheckMultiplier();

assertmultiplier>
0;

if(iteration%
multiplier!= 0)
continue;

检查是否须要system的compact,当前全部的storefile个数减去正在做compact的storefile个数,

大于或等于hbase.hstore.compaction.min配置的值。表示须要compact

if(s.needsCompaction()){

//Queue a compaction. Will recognize if major is needed.

发起系统的compact操作。flush时的coompaction

this.instance.compactSplitThread.requestSystemCompaction(r,s,getName()

              • "requests compaction");

b2.或者下面几个条件检查通过:

b2.1.可选的storefile列表中改动时间最老的一个storefile的时间达到了间隔的majorcompact时间

b2.2.假设可选的storefile列表中仅仅有一个storefile,同一时候此storefile的最老的一条数据的时间已经达到ttl时间

同一时候此storefile的时间达到了间隔的major时间间隔

b2.3.假设可选的storefile列表中有多少storefile,同一时候更新时间最老的一个storefile达到了major的时间间隔

b2.4.也就是storefile列表中最老的更新时间的一个storefile的时间达到了间隔的major时间,

可是可选的storefile个数仅仅有一个,同一时候此storefile已经做过major(StoreFile.majorCompaction==true)

同一时候ttl时间没有配置或者ttl还没有过期那么此时这个storefile是不做majorcompact

通过hbase.hregion.majorcompaction配置major的间隔时间。

通过hbase.hregion.majorcompaction.jitter配置major的间隔的左右差

小时,同一时候间隔的左右差是0.2f,那么default=
20% = +/- 4.8 hrs

}
elseif(s.isMajorCompaction()){

if(majorCompactPriority==
DEFAULT_PRIORITY

||majorCompactPriority>
r.getCompactPriority()){

发起requestCompaction操作,见以下说明A

this.instance.compactSplitThread.requestCompaction(r,s,getName()

+
"requests major compaction; use default priority",null);

}
else{

发起requestCompaction操作,见以下说明B

this.instance.compactSplitThread.requestCompaction(r,s,getName()

+
"requests major compaction; use configured priority",

this.majorCompactPriority,null);

}

}

}
catch(IOException
e){

LOG.warn("Failedmajor
compaction check on " + r,e);

}

}

}

iteration= (iteration==
Long.MAX_VALUE)?

0 : (iteration+ 1);

}

}

说明A:

CompactSplitThread.requestCompaction-->

requestCompaction(r,s,
why,Store.NO_PRIORITY,request);

-->requestCompactionInternal(r,s,
why,priority,request,true);此时设置selectNow为true

说明B:

CompactSplitThread.requestCompaction-->

requestCompactionInternal(r,s,
why,priority,request,true);此时设置selectNow为true

-------------------------------------------------------------

requestCompactionInternal处理流程:

privatesynchronized CompactionRequestrequestCompactionInternal(finalHRegion
r,

finalStore
s,

finalString
why,intpriority,CompactionRequest
request,booleanselectNow)

针对store的compactionrequest处理流程

假设要对一个HBASE的表禁用掉compaction操作,能够通过createtable时配置COMPACTION_ENABLED属性

privatesynchronized CompactionRequestrequestCompactionInternal(finalHRegion
r, finalStore
s,

finalString
why,intpriority,CompactionRequest
request,booleanselectNow)

throwsIOException {

if(this.server.isStopped()

|| (r.getTableDesc()!=
null&& !r.getTableDesc().isCompactionEnabled())){

returnnull;

}

CompactionContextcompaction=
null;

此时的调用selectNow为true,(假设是系统调用,此时的selectNow为false,)

也就是在发起request到CompactSplitThread.CompactionRunner线程运行时,

假设是系统调用,传入的CompactionContext的实例为null,否则是用户发起的调用在这个地方得到compaction实例

if(selectNow){

通过HStore.requestCompaction得到一个compactionContext,计算要进行compact的storefile

并设置其request.priority为Store.PRIORITY_USER表示用户发起的request

假设是flush时发起的compact,

并设置其request.priority为hbase.hstore.blockingStoreFiles配置的值减去storefile的个数,

表示系统发起的request,

假设hbase.hstore.blockingStoreFiles配置的值减去storefile的个数==PRIORITY_USER

见生成CompactionRequest实例

compaction=
selectCompaction(r,s,priority,request);

if(compaction==
null)returnnull;//
message logged inside

}

//We assume that most
compactionsare small. So, put system
compactionsinto small

//pool; we will do selection there, and move to large pool ifnecessary.

longsize
=selectNow ?

compaction.getRequest().getSize():
0;

不可能大于hbase.regionserver.thread.compaction.throttle配置的值

此配置的默认值是hbase.hstore.compaction.max*2*memstoresize

ThreadPoolExecutor
pool= (!selectNow&&
s.throttleCompaction(size))

?
largeCompactions:
smallCompactions;

通过smallCompactions的线程池生成CompactionRunner线程并运行,见运行Compaction的处理线程

pool.execute(newCompactionRunner(s,r,compaction,pool));

if(LOG.isDebugEnabled()){

String
type= (pool ==smallCompactions)?
"Small ":
"Large ";

LOG.debug(type+
"Compaction requested: "+ (selectNow?
compaction.toString():
"system")

+ (why!=
null&& !why.isEmpty()?
"; Because: "+
why :
"")+ "; "+
this);

}

returnselectNow
?

compaction.getRequest():
null;

}

最新文章

  1. CentOS7安装Nginx并部署
  2. javaweb学习总结一(eclipse常用快捷键、debug调试以及junit测试框架)
  3. CSDN 高校俱乐部: 排列搜索
  4. JavaScript里的依赖注入
  5. 本地Linux服务器上配置Git
  6. AI 学习新的开始
  7. 什么是TensorFlow?
  8. 【c的文件操作】文本文件和二进制文件(内存映像)的不同 文件结尾判断feof , EOF
  9. Table组件设置文字超出宽度显示省略号,鼠标悬停以悬浮框显示
  10. 解决 linux 下面解压缩 中文文件名乱码问题的方法 unzip -O CP936
  11. vue插件 使用use注册Vue全局组件和全局指令
  12. 免花生壳 TCP测试 DTU测试 GPRS测试TCP服务器
  13. 2017-2018 ACM-ICPC Southeast Regional Contest (Div. 1)
  14. spring-cloud-config——Quick Start
  15. ThreadPoolExecutor代码解析
  16. CF710F String Set Queries
  17. 【BZOJ3626】[LNOI2014]LCA
  18. php+js 防止被抓包篡改数据,数据签名校验
  19. MMIO----Wav格式文件解析
  20. BootstrapClassloader ExtClassloader AppClassloader

热门文章

  1. 【转】mysql的数据类型
  2. scala的Class
  3. java中使用String的replace方法替换html模板保存文件
  4. [ BZOJ 3038 &amp; 3211 / SPOJ GSS4 ] 上帝造题七分钟2 / 花神游历各国
  5. Win32子窗口的创建
  6. DeltaFish 校园物资共享平台 第八次小组会议
  7. 关于 WebView 的一些笔记
  8. python中struct.pack()函数和struct.unpack()函数
  9. jquery 零碎笔记
  10. Springboot使用JdbcTemplate RowMapper查询,直接返回实体列表