Spring Boot实现异步任务

Spring Boot中的另外一个任务:异步任务。所谓异步任务,其实就是异步执行程序,有些时候遇到一些耗时的的任务,如果一直卡等待,肯定会影响其他程序的执行,所以就让这些程序需要以异步的方式去执行。那么下面就来介绍Spring Boot 如何实现异步任务。

一、使用注解@EnableAsync 开启异步调用方法

在application启动类中,加上@EnableAsync注解,Spring Boot 会自动扫描异步任务。

package com.weiz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import tk.mybatis.spring.annotation.MapperScan;

@SpringBootApplication
//扫描 mybatis mapper 包路径
@MapperScan(basePackages = "com.weiz.mapper")
//扫描 所有需要的包, 包含一些自用的工具类包 所在的路径
@ComponentScan(basePackages = {"com.weiz","org.n3r.idworker"})
//开启定时任务
@EnableScheduling
//开启异步调用方法
@EnableAsync
public class SpringBootStarterApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootStarterApplication.class, args);
    }

}

二、创建异步执行类,定义@Component及@Async组件

创建com.weiz.tasks包,在tasks包里增加AsyncTask 异步任务类,加上@Component 注解,然后在需要异步执行的方法前面加上@Async注解,这样Spring Boot容器扫描到相关异步方法之后,调用时就会将这些方法异步执行。

package com.weiz.tasks;

import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component
public class AsyncTask {
    
    @Async
    public Future<Boolean> doTask11() throws Exception {
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        System.out.println("任务1耗时:" + (end - start) + "毫秒");
        return new AsyncResult<>(true);
    }
    
    @Async
    public Future<Boolean> doTask22() throws Exception {
        long start = System.currentTimeMillis();
        Thread.sleep(700);
        long end = System.currentTimeMillis();
        System.out.println("任务2耗时:" + (end - start) + "毫秒");
        return new AsyncResult<>(true);
    }
    
    @Async
    public Future<Boolean> doTask33() throws Exception {
        long start = System.currentTimeMillis();
        Thread.sleep(600);
        long end = System.currentTimeMillis();
        System.out.println("任务3耗时:" + (end - start) + "毫秒");
        return new AsyncResult<>(true); 
    }
}

说明:@Async 加上这个注解,就表示该方法是异步执行方法。

三、调用

创建一个DoTask调用类,我们看看这几个方法,是怎么执行的:

package com.weiz.tasks;

import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("tasks")
public class DoTask {

    @Autowired
    private AsyncTask asyncTask;

    @RequestMapping("test1")
    public String test1() throws Exception {

        long start = System.currentTimeMillis();

        Future<Boolean> a = asyncTask.doTask11();
        Future<Boolean> b = asyncTask.doTask22();
        Future<Boolean> c = asyncTask.doTask33();

        while (!a.isDone() || !b.isDone() || !c.isDone()) {
            if (a.isDone() && b.isDone() && c.isDone()) {
                break;
            }
        }

        long end = System.currentTimeMillis();

        String times = "任务全部完成,总耗时:" + (end - start) + "毫秒";
        System.out.println(times);

        return times;
    }
}

继续阅读Spring Boot实现异步任务

SpringBoot如何实现一个实时更新的进度条的示例代码

前言

导入excel表格批量修改状态,期间如果发生错误则所有数据不成功,为了防止重复提交,做一个类似进度条的东东。

那么下面我会结合实际业务对这个功能进行分析和记录。

正文

思路

前端使用bootstrap,后端使用SpringBoot分布式到注册中心,原先的想法是导入表格后异步调用修改数据状态的方法,然后每次计算修改的进度然后存放在session中,前台jquery写定时任务访问获取session中的进度,更新进度条进度和百分比。但是这存在session在服务间不共享,跨域问题。那么换一个方式存放,存放在redis中,前台定时任务直接操作获取redis的数据。

实施
进度条

先来看一下bootstrap的进度条

<div class="progress progress-striped active">
  <div class="progress-bar progress-bar-success" role="progressbar"
     aria-valuenow="60" aria-valuemin="0" aria-valuemax="100"
     style="width: 40%;">
    40%
  </div>
</div>

 进度条更新主要更新style="width: 40%;"的值即可,div里面的40%可以省略,无非时看着明确。

可以考虑将进度条放入弹出层。

定时任务
//点击确认导入执行此方法
function bulkImportChanges() {
  //获取批量操作状态文件
  var files = $("#importChanges").prop("files");
  var changesFile = files[0];
  var formData = new FormData();
  formData.append("importFile",changesFile);
  $.ajax({
    type : 'post',
    url : "/risk/bulk***es",
    data : formData,
    processData : false,   //文件ajax上传要加这两个的,要不然上传不了
    contentType : false,   //
    success : function(obj) {
      //导入成功
      if (obj.rspCode == "00") {
        //定时任务获取redis导入修改进度
        var progress = "";
        var timingTask = setInterval(function(){
          $.ajax({
            type: 'post',
            url: "/risk/t***k",
            dataType : 'json',
            success: function(result) {
              progress = result.value;
              if (progress != "error"){
                var date = progress.substring(0,6);
                //这里更新进度条的进度和数据
                $(".progress-bar").width(parseFloat(date)+"%");
                $(".progress-bar").text(parseFloat(date)+"%");
              }
            }
          });
          //导入修改完成或异常(停止定时任务)
          if (parseInt(progress)==100 || progress == "error") {
            //清除定时执行
            clearInterval(timingTask);
            $.ajax({
              type: 'post',
              url: "/risk/de***ess",
              dataType : 'json',
              success: function(result) {
                $("#bulkImportChangesProcessor").hide();
                if (parseInt(progress) == 100) {
                  alert("批量导入修改状态成功");
                }
                if (progress == "error") {
                  alert("批量导入修改状态失败");
                }
                //获取最新数据
                window.location.href="/risk/re***ByParam" rel="external nofollow" rel="external nofollow" ;
              }
            });
          }
        }, 1000)
      }else {
        $("#bulkImportChangesProcessor").hide();
        alert(obj.rspMsg);
        window.location.href="/risk/re***ByParam" rel="external nofollow" rel="external nofollow" ;
      }
    }
  });
}

解释:点击确认导入文件后成功后开启定时任务每一秒(一千毫秒)访问一次后台获取redis存放的进度,返回更新进度条,如果更新完成或者更新失败(根据后台返回的数据决定)则停止定时任务显示相应的信息并刷新页面。获取最新数据。

后台控制层
/**
 * 退单管理批量修改状态导入文件
 * @param importFile
 * @return
 */
 @ResponseBody
 @RequestMapping("/bulk***es")
 public Map<String,Object> bulk***es(MultipartFile importFile){
   log.info("退单管理批量修改状态导入文件,传入参数:"+importFile);
 Map<String,Object> map = new HashMap<>();
 List<Bulk***esEntity> fromExcel = null;
 try{
      //使用工具类导入转成list
  String[] header = {"sy***um","t***mt","ha***ult","re***nd","sy***nd","r**k"};
  fromExcel = importExcelUtil.importDataFromExcel(importFile, header, BulkImportChangesEntity.class);
  if (fromExcel.size()==0){
  map.put("rspCode","99");
  map.put("rspMsg","导入数据不能为空");
  return map;
  }
 }catch (Exception e){
  map.put("rspCode","99");
  map.put("rspMsg","导入操作表失败,请注意数据列格式");
  return map;
 }
 try {
  //这里会对list集合中的数据进行处理

  log.info("调用服务开始,参数:"+JSON.toJSONString(fromExcel));
  //String url = p4_zuul_url+"/***/ri***eat/bu***nges";
  String url = p4_zuul_url+"/***-surpass/ri***eat/bu***nges";
  String result = HttpClientUtil.doPost(url,JSON.toJSONString(fromExcel));
  log.info("调用服务结束,返回数据:"+result);
  if (result != null){
  map = JSONObject.parseObject(result, Map.class);
  log.info("批量修改状态导入:"+JSON.toJSONString(map));
  }
 }catch (Exception e){
  map.put("rspCode","99");
  map.put("rspMsg","导入操作表失败");
  log.info("bu***es exception",e);
  return map;
 }
 return map;
 }

 /**
 * 获取退单管理批量修改状态导入文件进度条进度
 * @return
 */
 @ResponseBody
 @RequestMapping("/t***sk")
 public Map<String,Object> t***sk(){
 Map<String,Object> map = new HashMap<>();
    //获取redis值
 String progress = HttpClientUtil.doGet(
  p4_zuul_url + "/" + p4_redis + "/redis***ler/get?key=progressSchedule");
 if (progress != null){
  map = JSONObject.parseObject(progress, Map.class);
  log.info("进度条进度:"+JSON.toJSONString(map));
  map.put("progressSchedule",progress);
 }else {
  HttpClientUtil.doGet(
   p4_zuul_url + "/" + p4_redis + "/redis***ler/del?key=progressSchedule");
 }
 return map;
 }

 /**
 * 清除redis进度条进度
 * @return
 */
 @ResponseBody
 @RequestMapping("/de***ess")
 public Map<String,Object> de***ess(){
 Map<String,Object> map = new HashMap<>();
 String progress = HttpClientUtil.doGet(
  p4_zuul_url + "/" + p4_redis + "/redis***ler/del?key=progressSchedule");
 if (progress != null){
  map = JSONObject.parseObject(progress, Map.class);
  log.info("返回数据:"+JSON.toJSONString(map));
 }
 return map;
 }

导入时调用第一个bulk***es方法,定时任务调用t***sk方法,导入完成或发生错误调用de***ess方法删除redis数据,避免占用资源。

服务层
@Async//开启异步
 @Transactional(rollbackFor = Exception.class)//事务回滚级别
 @Override
 public void bulkImportChanges(List<BulkImportChangesParam> list) {
 //初始化进度
 Double progressBarSchedule = 0.0;
 redisClient.set("progressSchedule", progressBarSchedule + "");//存redis
 try {
  for (int i = 1; i <= list.size(); i++) {
  RiskRetreatEntity entity = riskRetreatMapper.selectRetreatListBySysRefNum(list.get(i-1).getSysRefNum());
  if (entity == null){
   //查询结果为空直接进行下次循环不抛出
   continue;
  }
  //实体封装
        ···
        //更新
  riskRetreatMapper.updateRetreatByImport(entity);
  //计算修改进度并存放redis保存(1.0 / list.size())为一条数据进度
  progressBarSchedule = (1.0 / list.size()) * i*100;
  redisClient.set("progressSchedule", progressBarSchedule+"");
  if (i==list.size()){
   redisClient.set("progressSchedule", "100");
  }
  }
 }catch (Exception e){
  //当发生错误则清除缓存直接抛出回滚
  redisClient.set("progressSchedule","error");
  log.info("导入更新错误,回滚");
  log.info("bulkImportChanges exception:",e);
  throw e;
 }
 }

每更新一条数据存放进度,当发生错误则进行回滚。如果开启异步则需要在启动类添加注解@EnableAsync。

@EnableAsync
···//其他注解
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

继续阅读SpringBoot如何实现一个实时更新的进度条的示例代码