[聚合文章] rocketmq定时清理commitlog文件源码分析

消息系统 2017-12-19 41 阅读

rocketmq的配置参数

// 何时触发删除文件, 默认凌晨4点删除文件@ImportantFieldprivate String deleteWhen = "04";

猜想rocketmq会起一个一天执行一次的定时任务。
但看了代码发现并不是这样。

在存储服务启动时,启动如下的定时任务:

private void addScheduleTask() {    // 定时删除过期文件    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            DefaultMessageStore.this.cleanFilesPeriodically();        }    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

getCleanResourceInterval 默认是10s。

没10s执行CleanCommitLogService,清理过期文件。

删除时,有三个判断条件:
if (timeup || spacefull || manualDelete)
timeup就是到了定时时间点的判断。
spacefull是磁盘满的判断。
manualDelete是手动触发删除的判断。

判断定时时间的逻辑简单。
判断磁盘的,需要注意。
得到磁盘利用率的方法:
拿到commitlog的文件路径

long totalSpace = file.getTotalSpace();long freeSpace = file.getFreeSpace();

这样就可以得到磁盘空间的信息。

需要注意的是,有三个磁盘利用率的配置。

  1. diskMaxUsedSpaceRatio。我们设置的磁盘最大利用率。默认是75
  2. DiskSpaceWarningLevelRatio。磁盘空间警戒水位,超过,则停止接收新消息(出于保护自身目的)默认是90
  3. DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85

逻辑是:
当磁盘水位高于DiskSpaceWarningLevelRatio。会触发一个getAndMakeDiskFull操作,标记mq运行状态为磁盘满。并触发一次System.gc()。
标记cleanImmediately为true。

当磁盘水位高于DiskSpaceCleanForciblyRatio。标记cleanImmediately为true。

当上面两个条件不达到,执行getAndMakeDiskOK,将运行状态标记为ok。因为之前可能执行过getAndMakeDiskFull操作,所以这里要再执行下标记OK的操作,这就可以再次写了。也就是说如果之前达到过full状态,至少10s钟不能写,要等下一次执行周期改回ok的状态。

最后判断下,如果大于diskMaxUsedSpaceRatio。返回true。
所以我们设置的值不要大于85。

下面还有一次对storePathRootDir路径下磁盘空间的判断,逻辑一样。

下面是源码:

        /**         * 是否可以删除文件,空间是否满足         */        private boolean isSpaceToDelete() {            double ratio =                    DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;            cleanImmediately = false;            // 检测物理文件磁盘空间            {                String storePathPhysic =                        DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();                double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);                if (physicRatio > DiskSpaceWarningLevelRatio) {                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();                    if (diskok) {                        DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio                                + ", so mark disk full");                        System.gc();                    }                    cleanImmediately = true;                }                else if (physicRatio > DiskSpaceCleanForciblyRatio) {                    cleanImmediately = true;                }                else {                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();                    if (!diskok) {                        DefaultMessageStore.log.info("physic disk space OK " + physicRatio                                + ", so mark disk ok");                    }                }                if (physicRatio < 0 || physicRatio > ratio) {                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "                            + physicRatio);                    return true;                }            }            // 检测逻辑文件磁盘空间            {                String storePathLogics =                        StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this                            .getMessageStoreConfig().getStorePathRootDir());                double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);                if (logicsRatio > DiskSpaceWarningLevelRatio) {                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();                    if (diskok) {                        DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio                                + ", so mark disk full");                        System.gc();                    }                    cleanImmediately = true;                }                else if (logicsRatio > DiskSpaceCleanForciblyRatio) {                    cleanImmediately = true;                }                else {                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();                    if (!diskok) {                        DefaultMessageStore.log.info("logics disk space OK " + logicsRatio                                + ", so mark disk ok");                    }                }                if (logicsRatio < 0 || logicsRatio > ratio) {                    DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, "                            + logicsRatio);                    return true;                }
                

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。