Spring Boot Project: A Lightweight Message Queue with RedisTemplate (Code Included)

  • January 14, 2020
  • Notes

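Background: a project at my company needed the front end to upload an Excel file and the back end to read the data, process it, and return the rows that failed. The simplest approach is synchronous processing, where the client blocks after the upload until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use a message queue middleware such as ActiveMQ, and Redis's LPUSH and RPOP commands work well as a lightweight message queue, so I used them to build this feature.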
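To see why LPUSH plus RPOP behaves like a queue, here is a minimal in-memory sketch. It does not talk to Redis: `ListQueueSketch` is a hypothetical stand-in built on `ArrayDeque`, assuming only that LPUSH inserts at the head of a list and RPOP removes from the tail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a Redis list, used only to illustrate LPUSH/RPOP ordering. */
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    /** Like LPUSH: insert at the head (left end) of the list. */
    void leftPush(String msg) {
        list.addFirst(msg);
    }

    /** Like RPOP: remove from the tail (right end); returns null when the list is empty. */
    String rightPop() {
        return list.pollLast();
    }

    /** Pushes the messages in order, then pops until empty and returns the pop order. */
    static List<String> roundTrip(String... msgs) {
        ListQueueSketch q = new ListQueueSketch();
        for (String m : msgs) {
            q.leftPush(m);
        }
        List<String> out = new ArrayList<>();
        String m;
        while ((m = q.rightPop()) != null) {
            out.add(m);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [file-1, file-2, file-3]: first in, first out
        System.out.println(roundTrip("file-1", "file-2", "file-3"));
    }
}
```

Because producers add at one end and consumers take from the other, messages come out in the order they went in.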

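I. Topics covered

1. Reading and writing Excel files: Alibaba's easyexcel SDK
2. File upload and download: Tencent Cloud Object Storage (COS)
3. Remote service calls: RestTemplate
4. Producer and consumer: RedisTemplate leftPush and rightPop operations
5. Asynchronous data processing: an Executors thread pool
6. Reading a file stream over the network: HttpClient
7. User authentication through a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired

All of it implemented in Java, of course.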

That is a lot of ground to cover, and each topic deserves an article of its own. This post presents the complete implementation; I plan to split it into separate write-ups later.

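II. Project directory structure

[Image: project structure]

Note: the database DAO layer lives in a separate module and is not the focus of this article.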

III. Main Maven dependencies

1. easyexcel

    <properties>
        <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
    </properties>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>${easyexcel-latestVersion}</version>
    </dependency>

2. JWT

    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.7.0</version>
    </dependency>

3. Redis

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-redis</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>

4. Tencent COS

    <dependency>
        <groupId>com.qcloud</groupId>
        <artifactId>cos_api</artifactId>
        <version>5.4.5</version>
    </dependency>

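IV. Workflow

1. The user uploads a file.
2. The file is stored in Tencent COS.
3. The uploaded file's ID and an upload record are saved to the database.
4. A Redis import message is produced, that is, the file ID is pushed to Redis.
5. The request ends and returns a "processing" status.
6. The Redis consumer picks up the message.
7. The file is read from COS and the data is processed asynchronously.
8. The failed rows are uploaded to COS as an Excel file for the user to download, and the processing status is updated to "completed".
9. The client polls the processing status and can download the error file.
10. Done.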
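The core of this flow (accept the upload, answer immediately, finish the work later, let the client poll) can be sketched without any infrastructure. The sketch below is an illustration under assumptions, not the project's code: a `LinkedBlockingQueue` stands in for the Redis list, a `ConcurrentHashMap` stands in for the database record, and the names `ImportFlowSketch`, `submit`, `consumeOne`, and `status` are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ImportFlowSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the Redis list that carries file IDs
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the import record table
    private final Map<String, Status> records = new ConcurrentHashMap<>();

    /** Steps 3 to 5: save the record, enqueue the file ID, return right away. */
    Status submit(String fileId) {
        records.put(fileId, Status.PROCESSING);
        queue.add(fileId);
        return Status.PROCESSING;
    }

    /** Steps 6 to 8: wait up to the timeout for one message and handle it. */
    boolean consumeOne(long timeout, TimeUnit unit) {
        try {
            String fileId = queue.poll(timeout, unit);
            if (fileId == null) {
                return false; // nothing arrived in time
            }
            // Reading the file, validating rows, and uploading the error report would go here
            records.put(fileId, Status.DONE);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Step 9: what the polling client sees. */
    Status status(String fileId) {
        return records.get(fileId);
    }

    public static void main(String[] args) throws InterruptedException {
        ImportFlowSketch flow = new ImportFlowSketch();
        System.out.println(flow.submit("file-1")); // PROCESSING
        Thread worker = new Thread(() -> flow.consumeOne(1, TimeUnit.SECONDS));
        worker.start();
        worker.join();
        System.out.println(flow.status("file-1")); // DONE
    }
}
```

The request thread only touches `submit`, so its latency does not depend on how long the import takes.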

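V. Results

1. Uploading a file

[Screenshot: uploading a file]

2. Import records in the database

[Screenshot: import records in the database]

3. The imported data

[Screenshot: the imported data]

4. Downloading the error file

[Screenshot: downloading the error file]

5. Error data hints

[Screenshot: error data hints]

6. Querying import records

[Screenshot: querying import records]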

VI. Code implementation

1. Excel import controller

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        // Resolve the logged-in user for this request
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

2. Service layer

    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("The file must not be empty");
        }

        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("Only Excel files in xlsx format are currently supported");
        }

        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("File upload failed, please try again later");
        }

        // Save the record to the database
        saveRecordToDB(userId, fileId, filename);

        // Produce an order import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);

        return JsonResponse.ok("Import succeeded, processing...");
    }

    /**
     * Checks the file format.
     * @param fileName original file name
     * @return true if the name ends with .xlsx (case-insensitive)
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName)) {
            return false;
        }
        // Require at least one character before the extension
        int pointIndex = fileName.lastIndexOf(".");
        return pointIndex > 0 && ".xlsx".equals(fileName.substring(pointIndex).toLowerCase());
    }

    /**
     * Stores the file in Tencent COS through the upload service.
     * @param file uploaded file
     * @return the file ID, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        File f = null;
        try {
            InputStream ins = file.getInputStream();
            f = new File(file.getOriginalFilename());
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            return (String) responseResult.getData();
        } catch (Exception e) {
            // Any failure, including an unreadable upload stream, counts as a failed upload
            e.printStackTrace();
            return null;
        } finally {
            if (f != null && f.exists()) {
                f.delete(); // remove the temporary file
            }
        }
    }
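One thing worth noting about `saveToOss`: `new File(originalFilename)` creates the temporary copy in the process's working directory under a client-supplied name, so two uploads with the same name can collide. A possible alternative, sketched here with JDK classes only, is to let `Files.createTempFile` pick a unique path. `TempFileSketch` and `copyToTempFile` are hypothetical names, and the prefix `order-import-` is an arbitrary choice.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class TempFileSketch {
    /**
     * Copies the stream into a uniquely named temp file and closes the stream.
     * The caller is responsible for deleting the returned file.
     */
    static Path copyToTempFile(InputStream in, String suffix) {
        Path tmp = null;
        try (InputStream src = in) {
            tmp = Files.createTempFile("order-import-", suffix);
            // createTempFile already created an empty file, so overwrite it
            Files.copy(src, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp;
        } catch (IOException e) {
            if (tmp != null) {
                tmp.toFile().delete(); // do not leave a partial file behind
            }
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path p = copyToTempFile(new ByteArrayInputStream(new byte[] {1, 2, 3}), ".xlsx");
        System.out.println(p.toFile().length()); // 3
        p.toFile().delete();
    }
}
```

The resulting `Path` can be wrapped in a `FileSystemResource` the same way the `File` is in the service method.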

3. Redis producer

    @Service
    public class RedisProducerImpl implements RedisProducer {

        @Autowired
        private RedisTemplate redisTemplate;

        @Override
        public JsonResponse produce(String key, String msg) {
            // Wrap the file ID in a map and push it onto the head of the list
            Map<String, String> map = Maps.newHashMap();
            map.put("fileId", msg);
            redisTemplate.opsForList().leftPush(key, map);
            return JsonResponse.ok();
        }
    }

4. Redis consumer

    @Service
    public class RedisConsumer {

        @Autowired
        public RedisTemplate redisTemplate;

        @Value("${txOssFileUrl}")
        private String txOssFileUrl;

        @Value("${txOssUploadUrl}")
        private String txOssUploadUrl;

        @PostConstruct
        public void init() {
            processOrderImport();
        }

        /**
         * Processes order imports.
         */
        private void processOrderImport() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                while (true) {
                    try {
                        // Waits up to 1 second for a message at the tail of the list
                        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                        if (null == object) {
                            continue;
                        }
                        String msg = JSON.toJSONString(object);
                        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                    } catch (Exception e) {
                        // Keep polling: otherwise a single failed pop would end the consumer thread
                        e.printStackTrace();
                    }
                }
            });
        }
    }
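The consumer uses `Executors.newCachedThreadPool()`, which creates a new thread whenever no idle one is available, so a burst of import messages can start an unbounded number of worker threads. One possible alternative, shown here as a sketch rather than as the project's configuration, is a `ThreadPoolExecutor` with a fixed number of workers, a bounded backlog, and `CallerRunsPolicy`, so that the polling thread slows down when the workers are saturated. The sizes (2 workers, backlog of 4) and the names `BoundedPoolSketch`, `newBoundedPool`, and `runAll` are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolSketch {
    /** Fixed worker count and bounded backlog; when both are full, the submitting thread runs the task. */
    static ThreadPoolExecutor newBoundedPool(int threads, int backlog) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(backlog),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits taskCount trivial tasks, waits for them, and returns how many completed. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor pool = newBoundedPool(2, 4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(50)); // 50: no task is dropped, excess ones run on the caller
    }
}
```

With this policy no message is rejected; the trade-off is that the poller temporarily stops popping from Redis while it runs a task itself.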

5. Task class that processes an import

    public class OrderImportTask implements Runnable {

        private final String msg;
        private final String txOssFileUrl;
        private final String txOssUploadUrl;

        // Looked up in run(): tasks are created with new, so Spring does not inject into them
        private RestTemplate restTemplate;
        private TransactionTemplate transactionTemplate;
        private OrderImportService orderImportService;

        public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
            this.msg = msg;
            this.txOssFileUrl = txOssFileUrl;
            this.txOssUploadUrl = txOssUploadUrl;
        }

        /**
         * Looks up the required beans from the application context.
         */
        private void autowireBean() {
            this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
            this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
            this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
        }

        @Override
        public void run() {
            autowireBean();

            JSONObject jsonObject = JSON.parseObject(msg);
            String fileId = jsonObject.getString("fileId");

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("id", fileId);

            // Ask the file service for the URL of the uploaded file
            ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
            String fileUrl = (String) responseResult.getData();
            if (StringUtils.isBlank(fileUrl)) {
                return;
            }

            InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
            List<Object> list = ExcelUtil.read(inputStream);
            process(list, fileId); // business logic, omitted
        }

        /**
         * Uploads a file to COS.
         * @param file file to upload
         * @return the file ID, or null if the upload failed
         */
        private String saveToOss(File file) {
            String fileId;
            try {
                FileSystemResource resource = new FileSystemResource(file);
                MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
                param.add("file", resource);

                ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
                fileId = (String) responseResult.getData();
            } catch (Exception e) {
                fileId = null;
            }
            return fileId;
        }
    }

Note: the business logic that processes the data (the process method) is not included here.

6. Uploading files to COS

    // Controller of the file service
    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("The file must not be empty");
        }

        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename); // fixes garbled Chinese file names
        String contentType = getContentType(originalFilename);
        String key;

        InputStream ins = null;
        File f = null;

        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (null != f && f.exists()) { // delete the temporary file
                f.delete();
            }
        }

        return ResponseResult.ok(key);
    }

    public static void inputStreamToFile(InputStream ins, File file) {
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Presumably the implementation behind iFileStorageClient
    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    // OSSUtil
    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try {
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }

7. Downloading files

    /**
     * Tencent Cloud file download.
     * @param response HTTP response the file is written to
     * @param id       file ID
     * @return always null; the body is written directly to the response
     */
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    // FileUtil
    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + "; charset=utf-8");
            if (!contentType.equals(PlConstans.FileContentType.image)) {
                try {
                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }

            os = response.getOutputStream();

            byte[] b = new byte[1024 * 1024];
            int len;
            // read() returns -1 at end of stream
            while ((len = fileStream.read(b)) != -1) {
                os.write(b, 0, len);
                os.flush();
            }
        } catch (IOException e) {
            logger.error("writing file to response failed", e);
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
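The download code passes a non-ASCII file name by re-decoding its UTF-8 bytes as ISO-8859-1, a common workaround whose result depends on how the browser guesses the encoding. The standardized alternative is the `filename*` parameter from RFC 6266 with RFC 5987 percent-encoding. The sketch below builds such a header value with JDK classes only; `ContentDispositionSketch` and `attachmentHeader` are hypothetical names. `URLEncoder` performs form encoding, so the sketch patches the two places where its output differs from what RFC 5987 expects (`+` for space, and an unescaped `*`).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ContentDispositionSketch {
    /** Builds a Content-Disposition value with an RFC 5987 encoded UTF-8 file name. */
    static String attachmentHeader(String fileName) {
        String encoded;
        try {
            encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")   // form encoding writes spaces as '+'
                    .replace("*", "%2A");  // '*' is left as-is by URLEncoder but is not allowed here
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
        return "attachment; filename*=UTF-8''" + encoded;
    }

    public static void main(String[] args) {
        // prints attachment; filename*=UTF-8''error%20data.xlsx
        System.out.println(attachmentHeader("error data.xlsx"));
    }
}
```

The returned string would be passed to `response.setHeader("Content-Disposition", ...)` in place of the re-encoded name.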

8. Reading a file stream over the network

    /**
     * Reads a file stream over the network.
     * @param url file URL
     * @return the response body stream, or null if the URL is blank or the request fails
     */
    public static InputStream readFileFromURL(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }

        // DefaultHttpClient is deprecated since HttpClient 4.3; HttpClients.createDefault() replaces it
        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
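`readFileFromURL` hands back a live stream, so the HTTP connection stays open until the caller finishes reading and closes it. For files that fit in memory, a variant that downloads the whole body and releases the connection before returning is easier to reason about. The following is a sketch with JDK classes only (`HttpURLConnection` instead of Apache HttpClient); `UrlReadSketch`, `readBytesFromUrl`, and `demoRoundTrip` are hypothetical names, the timeouts are arbitrary, and `demoRoundTrip` exists only to exercise the method against a throwaway local server.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class UrlReadSketch {
    /** Downloads the whole body into memory; returns null on a non-200 status. */
    static byte[] readBytesFromUrl(String url) {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(5_000);
            conn.setReadTimeout(30_000);
            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                return null;
            }
            try (InputStream in = conn.getInputStream();
                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (conn != null) {
                conn.disconnect(); // release the connection in every case
            }
        }
    }

    /** Demo helper: serves the body from a local server on a free port and reads it back. */
    static String demoRoundTrip(String body) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            byte[] payload = body.getBytes(StandardCharsets.UTF_8);
            server.createContext("/file", exchange -> {
                exchange.sendResponseHeaders(200, payload.length);
                exchange.getResponseBody().write(payload);
                exchange.close();
            });
            server.start();
            try {
                int port = server.getAddress().getPort();
                byte[] got = readBytesFromUrl("http://127.0.0.1:" + port + "/file");
                return new String(got, StandardCharsets.UTF_8);
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoRoundTrip("hello")); // hello
    }
}
```

The returned bytes can be wrapped in a `ByteArrayInputStream` wherever an `InputStream` is needed. For very large files, streaming with an explicit close is the better fit.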

9. ExcelUtil

    /**
     * Reads an Excel file.
     * @param inputStream file input stream
     * @return list of rows
     */
    public static List<Object> read(InputStream inputStream) {
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * Writes an Excel file.
     * @param data list of rows
     * @param clazz row model class
     * @param saveFilePath path the file is saved to
     * @throws IOException if the file cannot be written
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        // try-with-resources closes the stream even if writing fails
        try (OutputStream out = new FileOutputStream(tempFile)) {
            ExcelWriter writer = EasyExcelFactory.getWriter(out);
            Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
            writer.write(data, sheet);
            writer.finish();
        }
    }

Note: that completes the whole flow. The code for the remaining topics is included below for reference.

VII. Miscellaneous

1. The @LoginRequired annotation

    /**
     * Put this annotation on controller methods that require a logged-in user.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LoginRequired {
    }
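The annotation carries no logic of its own. `RetentionPolicy.RUNTIME` keeps it visible to reflection, which is how an interceptor can decide per method whether a login is needed. A minimal sketch of that check, with a hypothetical `DemoController` and helper `requiresLogin` (the annotation is redeclared as a nested type only to keep the example self-contained):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class LoginRequiredSketch {
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface LoginRequired {
    }

    /** Hypothetical controller with one protected and one open method. */
    static class DemoController {
        @LoginRequired
        public void doImport() {
        }

        public void health() {
        }
    }

    /** Returns true if the named no-argument public method carries @LoginRequired. */
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method m = type.getMethod(methodName);
            return m.isAnnotationPresent(LoginRequired.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requiresLogin(DemoController.class, "doImport")); // true
        System.out.println(requiresLogin(DemoController.class, "health"));   // false
    }
}
```

With `RetentionPolicy.CLASS` or `SOURCE` the same check would return false for both methods, because the annotation would not be available at runtime.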

2. MyControllerAdvice

    @ControllerAdvice
    public class MyControllerAdvice {

        @ResponseBody
        @ExceptionHandler(TokenValidationException.class)
        public JsonResponse tokenValidationExceptionHandler() {
            return JsonResponse.loginInvalid();
        }

        @ResponseBody
        @ExceptionHandler(ServiceException.class)
        public JsonResponse serviceExceptionHandler(ServiceException se) {
            return JsonResponse.fail(se.getMsg());
        }

        @ResponseBody
        @ExceptionHandler(Exception.class)
        public JsonResponse exceptionHandler(Exception e) {
            e.printStackTrace();
            return JsonResponse.fail(e.getMessage());
        }
    }

3. AuthenticationInterceptor

    public class AuthenticationInterceptor implements HandlerInterceptor {

        private static final String CURRENT_USER = "user";

        @Autowired
        private UserService userService;

        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            // Let the request through if it is not mapped to a handler method
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();

            // A method annotated with @LoginRequired needs a logged-in user
            LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
            if (methodAnnotation != null) {
                // Verify the token
                Integer userId = JwtUtil.verifyToken(request);
                PLUser plUser = userService.selectByPrimaryKey(userId);
                if (null == plUser) {
                    throw new RuntimeException("User does not exist, please log in again");
                }
                request.setAttribute(CURRENT_USER, plUser);
            }
            return true;
        }

        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
        }

        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
        }
    }

4. JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days, in milliseconds
    public static final String SECRET = "pl_token_secret"; // hard-coded here for demonstration
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";

    /**
     * Generates a token for the given userId.
     * @param userId user ID stored as a claim
     * @return the signed JWT
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * Verifies the token.
     * @param request request carrying the token header
     * @return the userId if verification succeeds, or null if the token has no userId claim
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();

                // Look the claim up directly instead of iterating over all entries
                Object userId = body.get(USER_ID);
                return userId == null ? null : Integer.valueOf(userId.toString());
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
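To make it clearer what signing with HS512 amounts to, here is a JDK-only sketch of the JWS compact form: the base64url-encoded header and payload are joined with a dot, and an HMAC-SHA512 over that string is appended as the third part. This is an illustration, not a replacement for jjwt: `Hs512Sketch`, `sign`, and `verify` are hypothetical names, and the sketch checks only the signature. It does not parse claims or check expiration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class Hs512Sketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    /** Produces header.payload.signature, each part base64url-encoded without padding. */
    static String sign(String headerJson, String payloadJson, byte[] secret) {
        String signingInput = B64.encodeToString(headerJson.getBytes(StandardCharsets.UTF_8))
                + "." + B64.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        return signingInput + "." + B64.encodeToString(hmac(signingInput, secret));
    }

    /** Recomputes the HMAC over header.payload and compares it with the signature part. */
    static boolean verify(String token, byte[] secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot <= 0) {
            return false;
        }
        byte[] expected = hmac(token.substring(0, lastDot), secret);
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(lastDot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        // Constant-time comparison
        return MessageDigest.isEqual(expected, actual);
    }

    private static byte[] hmac(String data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret, "HmacSHA512"));
            return mac.doFinal(data.getBytes(StandardCharsets.US_ASCII));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] secret = "demo-secret".getBytes(StandardCharsets.UTF_8);
        String token = sign("{\"alg\":\"HS512\"}", "{\"userId\":\"42\"}", secret);
        System.out.println(verify(token, secret));                                  // true
        System.out.println(verify(token, "other".getBytes(StandardCharsets.UTF_8))); // false
    }
}
```

Anyone who knows the secret can mint valid tokens, which is why the secret is usually kept in configuration rather than in source code.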

Closing: OK, done. Off to bed, I'm sleepy.

Author: wangzaiplus

www.jianshu.com/p/0c684076367e