SpringBoot项目:RedisTemplate实现轻量级消息队列(含代码)

  • 2020 年 1 月 14 日
  • 筆記

背景: 公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

1、excel文件读写–阿里easyexcel sdk 2、文件上传、下载–腾讯云对象存储 3、远程服务调用–restTemplate 4、生产者、消费者–redisTemplate leftPush和rightPop操作 5、异步处理数据–Executors线程池 6、读取网络文件流–HttpClient 7、自定义注解实现用户身份认证–JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口

当然, Java实现咯

涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习。整编:微信公众号,搜云库技术团队,ID:souyunku

二、项目目录结构

项目结构

说明: 数据库DAO层放到另一个模块了, 不是本文重点

三、主要maven依赖

1、easyexcel

<!-- Alibaba EasyExcel (com.alibaba:easyexcel); its version is taken from the easyexcel-latestVersion property (1.1.2-beta4) shown first. NOTE(review): in a real pom.xml the property element presumably sits inside <properties> - confirm. -->
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>    <dependency>      <groupId>com.alibaba</groupId>      <artifactId>easyexcel</artifactId>      <version>${easyexcel-latestVersion}</version>  </dependency>

2、JWT

<!-- JJWT (io.jsonwebtoken:jjwt) 0.7.0: the library behind the Jwts builder/parser calls used for token generation and verification. -->
<dependency>      <groupId>io.jsonwebtoken</groupId>      <artifactId>jjwt</artifactId>      <version>0.7.0</version>  </dependency>

3、redis

<!-- Spring Boot Redis starter 1.3.5.RELEASE (provides RedisTemplate). NOTE(review): later Spring Boot releases publish this starter as spring-boot-starter-data-redis - confirm before upgrading. -->
<dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-redis</artifactId>      <version>1.3.5.RELEASE</version>  </dependency>

4、腾讯cos

<!-- Tencent Cloud COS SDK (com.qcloud:cos_api) 5.4.5: object storage client used for file upload and download. -->
<dependency>      <groupId>com.qcloud</groupId>      <artifactId>cos_api</artifactId>      <version>5.4.5</version>  </dependency>

四、流程

1、用户上传文件 2、将文件存储到腾讯cos 3、将上传后的文件id及上传记录保存到数据库 4、redis生产一条导入消息, 即保存文件id到redis 5、请求结束, 返回"处理中"状态 6、redis消费消息 7、读取cos文件, 异步处理数据 8、将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成" 9、客户端轮询查询处理状态, 并可以下载错误文件 10、结束

五、实现效果

1、上传文件

上传文件

2、数据库导入记录

数据库导入记录

3、导入的数据

导入的数据

4、下载错误文件

下载错误文件

5、错误数据提示

错误数据提示

6、查询导入记录

查询导入记录

六、代码实现

1、导入excel控制层

/**
 * HTTP entry point for uploading an Excel file to be imported.
 * <p>
 * The endpoint is marked {@code @LoginRequired}; the current user is looked up from the
 * request and that user's id is passed, together with the file, to the import service.
 *
 * @param file    the uploaded file, bound from the request parameter {@code file}
 * @param request the current HTTP request, used to resolve the user
 * @return the response produced by {@code orderImportService.doImport}
 */
@LoginRequired
@RequestMapping(method = RequestMethod.POST, value = "doImport")
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    return orderImportService.doImport(file, getUser(request).getId());
}

2、service层

@Override      public JsonResponse doImport(MultipartFile file, Integer userId) {          if (null == file || file.isEmpty()) {              throw new ServiceException("文件不能为空");          }            String filename = file.getOriginalFilename();          if (!checkFileSuffix(filename)) {              throw new ServiceException("当前仅支持xlsx格式的excel");          }            // 存储文件          String fileId = saveToOss(file);          if (StringUtils.isBlank(fileId)) {              throw new ServiceException("文件上传失败, 请稍后重试");          }            // 保存记录到数据库          saveRecordToDB(userId, fileId, filename);            // 生产一条订单导入消息          redisProducer.produce(RedisKey.orderImportKey, fileId);            return JsonResponse.ok("导入成功, 处理中...");      }        /**       * 校验文件格式       * @param fileName       * @return       */      private static boolean checkFileSuffix(String fileName) {          if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {              return false;          }            int pointIndex = fileName.lastIndexOf(".");          String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();          if (".xlsx".equals(suffix)) {              return true;          }            return false;      }       /**       * 将文件存储到腾讯OSS       * @param file       * @return       */      private String saveToOss(MultipartFile file) {          InputStream ins = null;          try {              ins = file.getInputStream();          } catch (IOException e) {              e.printStackTrace();          }            String fileId;          try {              String originalFilename = file.getOriginalFilename();              File f = new File(originalFilename);              inputStreamToFile(ins, f);              FileSystemResource resource = new FileSystemResource(f);                MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();              param.add("file", resource);                ResponseResult 
responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);              fileId = (String) responseResult.getData();          } catch (Exception e) {              fileId = null;          }            return fileId;      }

3、redis生产者

/**
 * Redis-backed message producer: each message is pushed onto the head of a Redis list.
 */
@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Wraps {@code msg} in a one-entry map under the key {@code "fileId"} and left-pushes
     * that map onto the list stored at {@code key}.
     *
     * @param key the Redis list key
     * @param msg the value stored under {@code "fileId"}
     * @return {@code JsonResponse.ok()}
     */
    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> payload = Maps.newHashMap();
        payload.put("fileId", msg);

        redisTemplate.opsForList().leftPush(key, payload);

        return JsonResponse.ok();
    }

}

4、redis消费者

@Service  public class RedisConsumer {        @Autowired      public RedisTemplate redisTemplate;        @Value("${txOssFileUrl}")      private String txOssFileUrl;        @Value("${txOssUploadUrl}")      private String txOssUploadUrl;        @PostConstruct      public void init() {          processOrderImport();      }        /**       * 处理订单导入       */      private void processOrderImport() {          ExecutorService executorService = Executors.newCachedThreadPool();          executorService.execute(() -> {              while (true) {                  Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);                  if (null == object) {                      continue;                  }                  String msg = JSON.toJSONString(object);                  executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));              }          });      }    }

5、处理任务线程类

public class OrderImportTask implements Runnable {      public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {          this.msg = msg;          this.txOssFileUrl = txOssFileUrl;          this.txOssUploadUrl = txOssUploadUrl;      }  }    /**   * 注入bean   */  private void autowireBean() {      this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);      this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);      this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);  }    @Override  public void run() {      // 注入bean      autowireBean();        JSONObject jsonObject = JSON.parseObject(msg);      String fileId = jsonObject.getString("fileId");        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();      param.add("id", fileId);        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);      String fileUrl = (String) responseResult.getData();      if (StringUtils.isBlank(fileUrl)) {          return;      }        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);      List<Object> list = ExcelUtil.read(inputStream);      process(list, fileId);  }    /**   * 将文件上传至oss   * @param file   * @return   */  private String saveToOss(File file) {      String fileId;      try {          FileSystemResource resource = new FileSystemResource(file);          MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();          param.add("file", resource);            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);          fileId = (String) responseResult.getData();      } catch (Exception e) {          fileId = null;      }      return fileId;  }

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

@RequestMapping("/txOssUpload")  @ResponseBody  public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {      if (null == file || file.isEmpty()) {          return ResponseResult.fail("文件不能为空");      }        String originalFilename = file.getOriginalFilename();      originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题      String contentType = getContentType(originalFilename);      String key;        InputStream ins = null;      File f = null;        try {          ins = file.getInputStream();          f = new File(originalFilename);          inputStreamToFile(ins, f);          key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);      } catch (Exception e) {          return ResponseResult.fail(e.getMessage());      } finally {          if (null != ins) {              try {                  ins.close();              } catch (IOException e) {                  e.printStackTrace();              }          }          if (f.exists()) {// 删除临时文件              f.delete();          }      }        return ResponseResult.ok(key);  }    public static void inputStreamToFile(InputStream ins,File file) {      try {          OutputStream os = new FileOutputStream(file);          int bytesRead = 0;          byte[] buffer = new byte[8192];          while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {              os.write(buffer, 0, bytesRead);          }          os.close();          ins.close();      } catch (Exception e) {          e.printStackTrace();      }  }    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {      key = Uuid.getUuid() + "-" + key;      OSSUtil.txOssUpload(inputStream, key, contentType);      try {          if (null != inputStream) {              inputStream.close();          }      } catch (IOException e) {          e.printStackTrace();      }      return key;  }    public static void 
txOssUpload(FileInputStream inputStream, String key, String contentType) {      ObjectMetadata objectMetadata = new ObjectMetadata();      try{          int length = inputStream.available();          objectMetadata.setContentLength(length);      }catch (Exception e){          logger.info(e.getMessage());      }      objectMetadata.setContentType(contentType);      cosclient.putObject(txbucketName, key, inputStream, objectMetadata);  }

7、下载文件

/**
 * Download endpoint for a file stored in Tencent Cloud object storage.
 *
 * @param response the servlet response the file is written to
 * @param id       the object id; also used as the download file name and to derive the content type
 * @return always {@code null}; the body is written directly to {@code response}
 */
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
}

/**
 * Streams {@code fileStream} to the client (invoked above as {@code FileUtil.txOssDownload}).
 * <p>
 * The response is reset first. Every content type except
 * {@code PlConstans.FileContentType.image} is sent as an attachment. The response output
 * stream and {@code fileStream} are both closed before returning.
 *
 * @param response    the servlet response to write to
 * @param contentType MIME type of the file
 * @param fileStream  file content; closed by this method
 * @param fileName    name offered to the client in {@code Content-Disposition}
 */
public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    response.reset();
    OutputStream os = null;
    try {
        response.setContentType(contentType + "; charset=utf-8");
        if (!contentType.equals(PlConstans.FileContentType.image)) {
            try {
                // Re-encode the UTF-8 bytes as ISO-8859-1 so non-ASCII names survive the header.
                response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
            } catch (UnsupportedEncodingException e) {
                response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                logger.error("encoding file name failed", e);
            }
        }

        os = response.getOutputStream();

        byte[] b = new byte[1024 * 1024];
        int len;
        while ((len = fileStream.read(b)) != -1) {
            os.write(b, 0, len);
            os.flush();
        }
    } catch (IOException e) {
        // Typically the client disconnected or the storage read failed; record it rather than dropping it.
        logger.error("streaming file to client failed: " + fileName, e);
    } finally {
        IOUtils.closeQuietly(os);
        IOUtils.closeQuietly(fileStream);
    }
}

8、读取网络文件流

/**
 * Opens a remote file over HTTP GET and returns its content stream.
 * <p>
 * On success the caller owns the returned stream and must close it. On every failure path
 * (blank URL, non-200 status, missing body, exception) the request is aborted, the
 * client's connection manager is shut down so no connection stays open, and {@code null}
 * is returned.
 *
 * @param url the file URL
 * @return the response body stream, or {@code null} if the file could not be fetched
 */
public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
        return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
        HttpResponse response = httpClient.execute(methodGet);
        HttpEntity entity = response.getEntity();
        if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
            return entity.getContent();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    // Failure: nothing will read the response, so release the connection here.
    methodGet.abort();
    httpClient.getConnectionManager().shutdown();
    return null;
}

9、ExcelUtil

/**
 * Reads an Excel stream using {@code new Sheet(1, 1)}.
 * NOTE(review): presumably sheet 1 with one header row skipped - confirm against the
 * EasyExcel {@code Sheet} constructor.
 *
 * @param inputStream the Excel file content
 * @return the rows returned by {@code EasyExcelFactory.read}
 */
public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}

/**
 * Writes the rows to an Excel file, in a sheet named {@code Sheet1}.
 * <p>
 * The output file is closed even when writing fails.
 *
 * @param data         the rows to write
 * @param clazz        the row model class describing the columns
 * @param saveFilePath path of the file to create or overwrite
 * @throws IOException if the file cannot be opened or closed
 */
public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    try (OutputStream out = new FileOutputStream(tempFile)) {
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
    }
}

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@LoginRequired注解

/**
 * Marker annotation for controller methods that require a logged-in user.
 * It is retained at runtime so it can be read reflectively, and may only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LoginRequired {
}

2、MyControllerAdvice

/**
 * Global exception-to-response mapping for controllers.
 * Each handler writes a {@code JsonResponse} as the response body.
 */
@ControllerAdvice
public class MyControllerAdvice {

    /**
     * Any other exception: prints the stack trace and returns a failure carrying the
     * exception's message.
     */
    @ExceptionHandler(Exception.class)
    @ResponseBody
    public JsonResponse exceptionHandler(Exception ex) {
        ex.printStackTrace();
        String message = ex.getMessage();
        return JsonResponse.fail(message);
    }

    /** Business-rule failure: returns a failure carrying the service message. */
    @ExceptionHandler(ServiceException.class)
    @ResponseBody
    public JsonResponse serviceExceptionHandler(ServiceException ex) {
        String message = ex.getMsg();
        return JsonResponse.fail(message);
    }

    /** Token validation failure: returns the "login invalid" response. */
    @ExceptionHandler(TokenValidationException.class)
    @ResponseBody
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {        private static final String CURRENT_USER = "user";        @Autowired      private UserService userService;        @Override      public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {          // 如果不是映射到方法直接通过          if (!(handler instanceof HandlerMethod)) {              return true;          }          HandlerMethod handlerMethod = (HandlerMethod) handler;          Method method = handlerMethod.getMethod();            // 判断接口是否有@LoginRequired注解, 有则需要登录          LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);          if (methodAnnotation != null) {              // 验证token              Integer userId = JwtUtil.verifyToken(request);              PLUser plUser = userService.selectByPrimaryKey(userId);              if (null == plUser) {                  throw new RuntimeException("用户不存在,请重新登录");              }              request.setAttribute(CURRENT_USER, plUser);              return true;          }          return true;      }        @Override      public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {      }        @Override      public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {      }  }

4、JwtUtil

/** Token lifetime in milliseconds: 30 days. */
public static final long EXPIRATION_TIME = 30L * 24 * 60 * 60 * 1000;
// NOTE(review): the signing secret is hard-coded in source; consider loading it from configuration.
// NOTE(review): jjwt's String-key overloads appear to treat the value as Base64 - confirm against the jjwt 0.7.0 docs.
public static final String SECRET = "pl_token_secret";
/** Name of the request header that carries the token. */
public static final String HEADER = "token";
/** Name of the claim that carries the user id. */
public static final String USER_ID = "userId";

/**
 * Builds an HS512-signed token whose only custom claim is the user id and which expires
 * {@link #EXPIRATION_TIME} milliseconds from now.
 *
 * @param userId the user id to embed
 * @return the compact token string
 */
public static String generateToken(String userId) {
    HashMap<String, Object> claims = new HashMap<>();
    claims.put(USER_ID, userId);

    Date expiresAt = new Date(System.currentTimeMillis() + EXPIRATION_TIME);
    return Jwts.builder()
            .setClaims(claims)
            .setExpiration(expiresAt)
            .signWith(SignatureAlgorithm.HS512, SECRET)
            .compact();
}

/**
 * Validates the token found in the {@link #HEADER} request header.
 *
 * @param request the current HTTP request
 * @return the user id from the token, or {@code null} if the token has no user-id claim
 * @throws TokenValidationException if the header is missing, or if the token cannot be
 *         parsed or its user id cannot be read as an integer
 */
public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token == null) {
        throw new TokenValidationException("missing token");
    }

    try {
        Map<String, Object> body = Jwts.parser()
                .setSigningKey(SECRET)
                .parseClaimsJws(token)
                .getBody();

        if (!body.containsKey(USER_ID)) {
            return null;
        }
        return Integer.valueOf(body.get(USER_ID).toString());
    } catch (Exception e) {
        logger.error(e.getMessage());
        throw new TokenValidationException("unauthorized");
    }
}

结语: OK, 搞定,睡了, 好困

作者:wangzaiplus

www.jianshu.com/p/0c684076367e