今天在用 Laravel 批量处理一批数据时,用 chunkById 处理200多万数据时,发现处理速度太慢了,毕竟 LaravelchunkById 处理大数据集时,单线程处理往往效率低下,尤其是超过百万条记录进行操作的情况下。

本文将介绍如何利用Java多线程技术,在Spring Boot项目中高效处理一个包含200多万条记录的产品表,并分享一些关键的注意事项。

背景

我们有一个products表,其中存储了大量产品信息。为了执行一些复杂的业务逻辑,如更新产品状态、计算统计信息等,我们需要遍历并处理这些数据。

Laravel 实现

我们先看 Laravel 是怎么实现的:

    public function handle(): void
    {
        $i = 0;
        // 分500一个批次处理
        Product::chunkById(500, function ($products) use (&$i) {
            /** @var Product $product */
            foreach ($products as $product) {
                $i ++;
                $this->info('i=' . $i . ', product id: ' . $product->id);
                try {
                    // your code here
                }catch (\Throwable $e){
                    Log::error('product id: ' . $product->id . ' error: ' . $e->getMessage());
                }
            }
        });
    }

Java 多线程实现

我这边用的是 Spring boot + mybatis plus 的框架,就只上关键代码了:

    public void batchProcessProducts() {
        Long lastId = null;

        while (true) {
            List<ProductPO> productList = fetchNextBatch(lastId);
            if (productList.isEmpty()) {
                break;
            }

            List<? extends Future<?>> futures = productList.stream()
                    .map(product -> executorService.submit(() -> processProduct(product)))
                    .toList();

            // 等待所有线程完成
            futures.forEach(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    log.debug(e.getMessage());
                }
            });

            // 更新lastId为当前批次中最后一个产品的ID
            lastId = Long.valueOf(productList.get(productList.size() - 1).getId());
        }
    }

    private List<ProductPO> fetchNextBatch(Long lastId) {
        // 如果有lastId,则查询从该ID之后的数据,否则查询前BATCH_SIZE条数据
        if (lastId != null) {
            return productMapper.selectByLastId(lastId, BATCH_SIZE);
        } else {
            return productMapper.selectFirstBatch(BATCH_SIZE);
        }
    }

这里用了框架的默认线程池去完成,即:ExecutorService能够管理多个线程的生命周期。我们将每一批次的数据分成多个任务,并提交给线程池执行。通过等待每个Future的结果future.get(),我们可以确保所有任务都已完成。

注意,并没有用MyBatis Plus的分页来做分组,原因参考:

使用默认的ExecutorServiceThreadPoolTaskExecutor通常已经能满足大部分的需求,但自定义线程池可以提供更精细的控制,例如:

  • 线程数量:你可以根据系统的负载和资源限制自定义线程池的大小。
  • 线程优先级:在某些场景下,你可能希望某些任务具有更高的优先级。
  • 线程命名:自定义线程命名可以帮助在日志中更容易地追踪线程活动。
  • 拒绝策略:当线程池和队列都满了时,自定义拒绝策略可以决定如何处理新的任务请求。
  • 生命周期管理:自定义线程池允许你更好地控制线程的生命周期,比如在应用程序停止时优雅地关闭线程池。

结果

使用Java多线程,处理100多万数据(仅更新一个字段),只用了 3分25秒;
同样的工作对比Laravel,用时1个小时,快了不知道多少倍。

Snipaste_2024-07-10_12-19-19

打赏不准超过你工资的一半!!!