聊聊並發編程的12種業務場景

前言

並發編程是一項非常重要的技術,無論在面試,還是工作中出現的頻率非常高。

之前我發表的一篇《聊聊並發編程的10個坑》,在全網廣受好評。說明了這類文章還是比較有價值的,接下來,打算繼續聊聊並發編程這個話題。

並發編程說白了就是多執行緒編程,但多執行緒一定比單執行緒效率更高?

答:不一定,要看具體業務場景。

畢竟如果使用了多執行緒,那麼執行緒之間的競爭和搶佔cpu資源,執行緒的上下文切換,也是相對來說比較耗時的操作。

下面這幾個問題在面試中,你必定遇到過:

  1. 你在哪來業務場景中使用過多執行緒?
  2. 怎麼用的?
  3. 踩過哪些坑?

今天聊聊我之前在項目中用並發編程的12種業務場景,給有需要的朋友一個參考。

1. 簡單定時任務

各位親愛的朋友,你沒看錯,Thread類真的能做定時任務。如果你看過一些定時任務框架的源碼,你最後會發現,它們的底層也會使用Thread類。

實現這種定時任務的具體程式碼如下:

public static void init() {
    new Thread(() -> {
        while (true) {
            try {
                System.out.println("下載文件");
                Thread.sleep(1000 * 60 * 5);
            } catch (Exception e) {
                log.error(e);
            }
        }
    }).start();
}

使用Thread類可以做最簡單的定時任務,在run方法中有個while的死循環(當然還有其他方式),執行我們自己的任務。有個需要特別注意的地方是,需要用try...catch捕獲異常,否則如果出現異常,就直接退出循環,下次將無法繼續執行了。

但這種方式做的定時任務,只能周期性執行,不能支援定時在某個時間點執行。

特別提醒一下,該執行緒建議定義成守護執行緒,可以通過setDaemon方法設置,讓它在後台默默執行就好。

使用場景:比如項目中有時需要每隔5分鐘去下載某個文件,或者每隔10分鐘去讀取模板文件生成靜態html頁面等等,一些簡單的周期性任務場景。

使用Thread類做定時任務的優缺點:

  • 優點:這種定時任務非常簡單,學習成本低,容易入手,對於那些簡單的周期性任務,是個不錯的選擇。

  • 缺點:不支援指定某個時間點執行任務,不支援延遲執行等操作,功能過於單一,無法應對一些較為複雜的場景。

2.監聽器

有時候,我們需要寫個監聽器,去監聽某些數據的變化。

比如:我們在使用canal的時候,需要監聽binlog的變化,能夠及時把資料庫中的數據,同步到另外一個業務資料庫中。


如果直接寫一個監聽器去監聽數據就太沒意思了,我們想實現這樣一個功能:在配置中心有個開關,配置監聽器是否開啟,如果開啟了使用單執行緒非同步執行。

主要程式碼如下:

@Service
public CanalService {
    private volatile boolean running = false;
    private Thread thread;

    @Autowired
    private CanalConnector canalConnector;
    
    public void handle() {
        //連接canal
        while(running) {
           //業務處理
        }
    }
    
    public void start() {
       thread = new Thread(this::handle, "name");
       running = true;
       thread.start();
    }
    
    public void stop() {
       if(!running) {
          return;
       }
       running = false;
    }
}

在start方法中開啟了一個執行緒,在該執行緒中非同步執行handle方法的具體任務。然後通過調用stop方法,可以停止該執行緒。

其中,使用volatile關鍵字控制的running變數作為開關,它可以控制執行緒中的狀態。

接下來,有個比較關鍵的點是:如何通過配置中心的配置,控制這個開關呢?

apollo配置為例,我們在配置中心的後台,修改配置之後,自動獲取最新配置的核心程式碼如下:

public class CanalConfig {
    @Autowired
    private CanalService canalService;

    @ApolloConfigChangeListener
    public void change(ConfigChangeEvent event) {
        String value = event.getChange("test.canal.enable").getNewValue();
        if(BooleanUtils.toBoolean(value)) {
            canalService.start();
        } else {
            canalService.stop();
        }
    }
}

通過apolloApolloConfigChangeListener註解,可以監聽配置參數的變化。

如果test.canal.enable開關配置的true,則調用canalService類的start方法開啟canal數據同步功能。如果開關配置的false,則調用canalService類的stop方法,自動停止canal數據同步功能。

3.收集日誌

在某些高並發的場景中,我們需要收集部分用戶的日誌(比如:用戶登錄的日誌),寫到資料庫中,以便於做分析。

但由於項目中,還沒有引入消息中間件,比如:kafkarocketmq等。

如果直接將日誌同步寫入資料庫,可能會影響介面性能。

所以,大家很自然想到了非同步處理。

實現這個需求最簡單的做法是,開啟一個執行緒,非同步寫入數據到資料庫即可。

這樣做,可以是可以。

但如果用戶登錄操作的耗時,比非同步寫入資料庫的時間要少得多。這樣導致的結果是:生產日誌的速度,比消費日誌的速度要快得多,最終的性能瓶頸在消費端。

其實,還有更優雅的處理方式,雖說沒有使用消息中間件,但借用了它的思想。

這套記錄登錄日誌的功能,分為:日誌生產端、日誌存儲端和日誌消費端。

如下圖所示:

先定義了一個阻塞隊列。

@Component
public class LoginLogQueue {
    private static final int QUEUE_MAX_SIZE    = 1000;

    private BlockingQueueblockingQueue queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);

    //生成消息
    public boolean push(LoginLog loginLog) {
        return this.queue.add(loginLog);
    } 

    //消費消息
    public LoginLog poll() {
        LoginLog loginLog = null;
        try {
            loginLog = this.queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

然後定義了一個日誌的生產者。

@Service
public class LoginSerivce {
    
    @Autowired
    private LoginLogQueue loginLogQueue;

    public int login(UserInfo userInfo) {
        //業務處理
        LoginLog loginLog = convert(userInfo);
        loginLogQueue.push(loginLog);
    }  
}

接下來,定義了日誌的消費者。

@Service
public class LoginInfoConsumer {
    @Autowired
    private LoginLogQueue queue;

    @PostConstruct
    public voit init {
       new Thread(() -> {
          while (true) {
              LoginLog loginLog = queue.take();
              //寫入資料庫
          }
        }).start();
    }
}

當然,這個例子中使用單執行緒接收登錄日誌,為了提升性能,也可以使用執行緒池來處理業務邏輯(比如:寫入資料庫)等。

4.excel導入

我們可能會經常收到運營同學提過來的excel數據導入需求,比如:將某一大類下的所有子類一次性導入系統,或者導入一批新的供應商數據等等。

我們以導入供應商數據為例,它所涉及的業務流程很長,比如:

  1. 調用天眼查介面校驗企業名稱和統一社會信用程式碼。
  2. 寫入供應商基本表
  3. 寫入組織表
  4. 給供應商自動創建一個用戶
  5. 給該用戶分配許可權
  6. 自定義域名
  7. 發站內通知

等等。

如果在程式中,解析完excel,讀取了所有數據之後。用單執行緒一條條處理業務邏輯,可能耗時會非常長。

為了提升excel數據導入效率,非常有必要使用多執行緒來處理。

當然在java中實現多執行緒的手段有很多種,下面重點聊聊java8中最簡單的實現方式:parallelStream

偽程式碼如下:

supplierList.parallelStream().forEach(x -> importSupplier(x));

parallelStream是一個並行執行的流,它默認通過ForkJoinPool實現的,能提高你的多執行緒任務的速度。

ForkJoinPool處理的過程會分而治之,它的核心思想是:將一個大任務切分成多個小任務。每個小任務都能單獨執行,最後它會把所用任務的執行結果進行匯總。

下面用一張圖簡單介紹一下ForkJoinPool的原理:

當然除了excel導入之外,還有類似的讀取文本文件,也可以用類似的方法處理。

溫馨的提醒一下,如果一次性導入的數據非常多,用多執行緒處理,可能會使系統的cpu使用率飆升,需要特別關注。

5.查詢介面

很多時候,我們需要在某個查詢介面中,調用其他服務的介面,組合數據之後,一起返回。

比如有這樣的業務場景:

在用戶資訊查詢介面中需要返回:用戶名稱、性別、等級、頭像、積分、成長值等資訊。

而用戶名稱、性別、等級、頭像在用戶服務中,積分在積分服務中,成長值在成長值服務中。為了匯總這些數據統一返回,需要另外提供一個對外介面服務。

於是,用戶資訊查詢介面需要調用用戶查詢介面、積分查詢介面 和 成長值查詢介面,然後匯總數據統一返回。

調用過程如下圖所示:

調用遠程介面總耗時 530ms = 200ms + 150ms + 180ms

顯然這種串列調用遠程介面性能是非常不好的,調用遠程介面總的耗時為所有的遠程介面耗時之和。

那麼如何優化遠程介面性能呢?

既然串列調用多個遠程介面性能很差,為什麼不改成並行呢?

如下圖所示:

調用遠程介面總耗時 200ms = 200ms(即耗時最長的那次遠程介面調用)

在java8之前可以通過實現Callable介面,獲取執行緒返回結果。

java8以後通過CompleteFuture類實現該功能。我們這裡以CompleteFuture為例:

public UserInfo getUserInfo(Long id) throws InterruptedException, ExecutionException {
    final UserInfo userInfo = new UserInfo();
    CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteUserAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture bonusFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteBonusAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);

    CompletableFuture growthFuture = CompletableFuture.supplyAsync(() -> {
        getRemoteGrowthAndFill(id, userInfo);
        return Boolean.TRUE;
    }, executor);
    CompletableFuture.allOf(userFuture, bonusFuture, growthFuture).join();

    userFuture.get();
    bonusFuture.get();
    growthFuture.get();
    return userInfo;
}

溫馨提醒一下,這兩種方式別忘了使用執行緒池。示例中我用到了executor,表示自定義的執行緒池,為了防止高並發場景下,出現執行緒過多的問題。

6.獲取用戶上下文

不知道你在項目開發時,有沒有遇到過這樣的需求:用戶登錄之後,在所有的請求介面中,通過某個公共方法,就能獲取到當前登錄用戶的資訊?

獲取的用戶上下文,我們以CurrentUser為例。

CurrentUser內部包含了一個ThreadLocal對象,它負責保存當前執行緒的用戶上下文資訊。當然為了保證在執行緒池中,也能從用戶上下文中獲取到正確的用戶資訊,這裡用了阿里的TransmittableThreadLocal。偽程式碼如下:

@Data
public class CurrentUser {
    private static final TransmittableThreadLocal<CurrentUser> THREA_LOCAL = new TransmittableThreadLocal<>();
    
    private String id;
    private String userName;
    private String password;
    private String phone;
    ...
    
    public statis void set(CurrentUser user) {
      THREA_LOCAL.set(user);
    }
    
    public static void getCurrent() {
      return THREA_LOCAL.get();
    }
}

這裡為什麼用了阿里的TransmittableThreadLocal,而不是普通的ThreadLocal呢?在執行緒池中,由於執行緒會被多次復用,導致從普通的ThreadLocal中無法獲取正確的用戶資訊。父執行緒中的參數,沒法傳遞給子執行緒,而TransmittableThreadLocal很好解決了這個問題。

然後在項目中定義一個全局的spring mvc攔截器,專門設置用戶上下文到ThreadLocal中。偽程式碼如下:

public class UserInterceptor extends HandlerInterceptorAdapter {
   
   @Override  
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
      CurrentUser user = getUser(request);
      if(Objects.nonNull(user)) {
         CurrentUser.set(user);
      }
   } 
}

用戶在請求我們介面時,會先觸發該攔截器,它會根據用戶cookie中的token,調用調用介面獲取redis中的用戶資訊。如果能獲取到,說明用戶已經登錄,則把用戶資訊設置到CurrentUser類的ThreadLocal中。

接下來,在api服務的下層,即business層的方法中,就能輕鬆通過CurrentUser.getCurrent();方法獲取到想要的用戶上下文資訊了。

這套用戶體系的想法是很good的,但深入使用後,發現了一個小插曲:

api服務和mq消費者服務都引用了business層,business層中的方法兩個服務都能直接調用。

我們都知道在api服務中用戶是需要登錄的,而mq消費者服務則不需要登錄。

如果business中的某個方法剛開始是給api開發的,在方法深處使用了CurrentUser.getCurrent();獲取用戶上下文。但後來,某位新來的帥哥在mq消費者中也調用了那個方法,並未發覺這個小機關,就會中招,出現找不到用戶上下文的問題。

所以我當時的第一個想法是:程式碼沒做兼容處理,因為之前這類問題偶爾會發生一次。

想要解決這個問題,其實也很簡單。只需先判斷一下能否從CurrentUser中獲取用戶資訊,如果不能,則取配置的系統用戶資訊。偽程式碼如下:

@Autowired
private BusinessConfig businessConfig;

CurrentUser user = CurrentUser.getCurrent();
if(Objects.nonNull(user)) {
   entity.setUserId(user.getUserId());
   entity.setUserName(user.getUserName());
} else {
   entity.setUserId(businessConfig.getDefaultUserId());
   entity.setUserName(businessConfig.getDefaultUserName());
}

這種簡單無公害的程式碼,如果只是在一兩個地方加還OK。

此外,眾所周知,SimpleDateFormat在java8以前,是用來處理時間的工具類,它是非執行緒安全的。也就是說,用該方法解析日期會有執行緒安全問題。

為了避免執行緒安全問題的出現,我們可以把SimpleDateFormat對象定義成局部變數。但如果你一定要把它定義成靜態變數,可以使用ThreadLocal保存日期,也能解決執行緒安全問題。

8. 傳遞參數

之前見過有些同事寫程式碼時,一個非常有趣的用法,即:使用MDC傳遞參數。

MDC是什麼?

MDCorg.slf4j包下的一個類,它的全稱是Mapped Diagnostic Context,我們可以認為它是一個執行緒安全的存放診斷日誌的容器。

MDC的底層是用了ThreadLocal來保存數據的。

例如現在有這樣一種場景:我們使用RestTemplate調用遠程介面時,有時需要在header中傳遞資訊,比如:traceId,source等,便於在查詢日誌時能夠串聯一次完整的請求鏈路,快速定位問題。

這種業務場景就能通過ClientHttpRequestInterceptor介面實現,具體做法如下:

第一步,定義一個LogFilter攔截所有介面請求,在MDC中設置traceId:

public class LogFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        MdcUtil.add(UUID.randomUUID().toString());
        System.out.println("記錄請求日誌");
        chain.doFilter(request, response);
        System.out.println("記錄響應日誌");
    }

    @Override
    public void destroy() {
    }
}

第二步,實現ClientHttpRequestInterceptor介面,MDC中獲取當前請求的traceId,然後設置到header中:

public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        request.getHeaders().set("traceId", MdcUtil.get());
        return execution.execute(request, body);
    }
}

第三步,定義配置類,配置上面定義的RestTemplateInterceptor類:

@Configuration
public class RestTemplateConfiguration {

    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
        return restTemplate;
    }

    @Bean
    public RestTemplateInterceptor restTemplateInterceptor() {
        return new RestTemplateInterceptor();
    }
}

其中MdcUtil其實是利用MDC工具在ThreadLocal中存儲和獲取traceId

public class MdcUtil {

    private static final String TRACE_ID = "TRACE_ID";

    public static String get() {
        return MDC.get(TRACE_ID);
    }

    public static void add(String value) {
        MDC.put(TRACE_ID, value);
    }
}

當然,這個例子中沒有演示MdcUtil類的add方法具體調的地方,我們可以在filter中執行介面方法之前,生成traceId,調用MdcUtil類的add方法添加到MDC中,然後在同一個請求的其他地方就能通過MdcUtil類的get方法獲取到該traceId。

能使用MDC保存traceId等參數的根本原因是,用戶請求到應用伺服器,Tomcat會從執行緒池中分配一個執行緒去處理該請求。

那麼該請求的整個過程中,保存到MDCThreadLocal中的參數,也是該執行緒獨享的,所以不會有執行緒安全問題。

9. 模擬高並發

有時候我們寫的介面,在低並發的場景下,一點問題都沒有。

但如果一旦出現高並發調用,該介面可能會出現一些意想不到的問題。

為了防止類似的事情發生,一般在項目上線前,我們非常有必要對介面做一下壓力測試

當然,現在已經有比較成熟的壓力測試工具,比如:JmeterLoadRunner等。

如果你覺得下載壓測工具比較麻煩,也可以手寫一個簡單的模擬並發操作的工具,用CountDownLatch就能實現,例如:

public static void concurrenceTest() {
    /**
     * 模擬高並發情況程式碼
     */
    final AtomicInteger atomicInteger = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1000); // 相當於計數器,當所有都準備好了,再一起執行,模仿多並發,保證並發量
    final CountDownLatch countDownLatch2 = new CountDownLatch(1000); // 保證所有執行緒執行完了再列印atomicInteger的值
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.await(); //一直阻塞當前執行緒,直到計時器的值為0,保證同時並發
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(),e);
                    }
                    //每個執行緒增加1000次,每次加1
                    for (int j = 0; j < 1000; j++) {
                        atomicInteger.incrementAndGet();
                    }
                    countDownLatch2.countDown();
                }
            });
            countDownLatch.countDown();
        }

        countDownLatch2.await();// 保證所有執行緒執行完
        executorService.shutdown();
    } catch (Exception e){
        log.error(e.getMessage(),e);
    }
}

10. 處理mq消息

在高並發的場景中,消息積壓問題,可以說如影隨形,真的沒辦法從根本上解決。表面上看,已經解決了,但後面不知道什麼時候,就會冒出一次,比如這次:

有天下午,產品過來說:有幾個商戶投訴過來了,他們說菜品有延遲,快查一下原因。

這次問題出現得有點奇怪。

為什麼這麼說?

首先這個時間點就有點奇怪,平常出問題,不都是中午或者晚上用餐高峰期嗎?怎麼這次問題出現在下午?

根據以往積累的經驗,我直接看了kafkatopic的數據,果然上面消息有積壓,但這次每個partition都積壓了十幾萬的消息沒有消費,比以往加壓的消息數量增加了幾百倍。這次消息積壓得極不尋常。

我趕緊查服務監控看看消費者掛了沒,還好沒掛。又查服務日誌沒有發現異常。這時我有點迷茫,碰運氣問了問訂單組下午發生了什麼事情沒?他們說下午有個促銷活動,跑了一個JOB批量更新過有些商戶的訂單資訊。

這時,我一下子如夢初醒,是他們在JOB中批量發消息導致的問題。怎麼沒有通知我們呢?實在太坑了。

雖說知道問題的原因了,倒是眼前積壓的這十幾萬的消息該如何處理呢?

此時,如果直接調大partition數量是不行的,歷史消息已經存儲到4個固定的partition,只有新增的消息才會到新的partition。我們重點需要處理的是已有的partition。

直接加服務節點也不行,因為kafka允許同組的多個partition被一個consumer消費,但不允許一個partition被同組的多個consumer消費,可能會造成資源浪費。

看來只有用多執行緒處理了。

為了緊急解決問題,我改成了用執行緒池處理消息,核心執行緒和最大執行緒數都配置成了50

大致用法如下:

  1. 先定義一個執行緒池:
@Configuration
public class ThreadPoolConfig {

    @Value("${thread.pool.corePoolSize:5}")
    private int corePoolSize;

    @Value("${thread.pool.maxPoolSize:10}")
    private int maxPoolSize;

    @Value("${thread.pool.queueCapacity:200}")
    private int queueCapacity;

    @Value("${thread.pool.keepAliveSeconds:30}")
    private int keepAliveSeconds;

    @Value("${thread.pool.threadNamePrefix:ASYNC_}")
    private String threadNamePrefix;

    @Bean("messageExecutor")
    public Executor messageExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
  1. 再定義一個消息的consumer:
@Service
public class MyConsumerService {
    @Autowired
    private Executor messageExecutor;
    
    @KafkaListener(id="test",topics={"topic-test"})
    public void listen(String message){
        System.out.println("收到消息:" + message);
        messageExecutor.submit(new MyWork(message);
    }
}
  1. 在定義的Runable實現類中處理業務邏輯:
public class MyWork implements Runnable {
    private String message;
    
    public MyWork(String message) {
       this.message = message;
    }

    @Override
    public void run() {
        System.out.println(message);
    }
}

果然,調整之後消息積壓數量確實下降的非常快,大約半小時後,積壓的消息就非常順利的處理完了。

但此時有個更嚴重的問題出現:我收到了報警郵件,有兩個訂單系統的節點down機了。。。

更詳細內容,請看看我的另一篇文章《我用kafka兩年踩過的一些非比尋常的坑

11. 統計數量

在多執行緒的場景中,有時候需要統計數量,比如:用多執行緒導入供應商數據時,統計導入成功的供應商數有多少。

如果這時候用count++統計次數,最終的結果可能會不準。因為count++並非原子操作,如果多個執行緒同時執行該操作,則統計的次數,可能會出現異常。

為了解決這個問題,就需要使用concurentatomic包下面的類,比如:AtomicIntegerAtomicLong等。

@Servcie
public class ImportSupplierService {
  private static AtomicInteger count = new AtomicInteger(0);

  public int importSupplier(List<SupplierInfo> supplierList) {
       if(CollectionUtils.isEmpty(supplierList)) {
           return 0;
       }

       supplierList.parallelStream().forEach(x -> {
           try {
             importSupplier(x);
             count.addAndGet(1);
           } catch(Exception e) {
              log.error(e.getMessage(),e);
           }
       );

      return count.get();
  }    
}

AtomicInteger的底層說白了使用自旋鎖+CAS

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

自旋鎖說白了就是一個死循環

CAS比較交換的意思。

它的實現邏輯是:將記憶體位置處的舊值預期值進行比較,若相等,則將記憶體位置處的值替換為新值。若不相等,則不做任何操作。

12. 延遲定時任務

我們經常有延遲處理數據的需求,比如:如果用戶下單後,超過30分鐘還未完成支付,則系統自動將該訂單取消。

這裡需求就可以使用延遲定時任務實現。

ScheduledExecutorServiceJDK1.5+版本引進的定時任務,該類位於java.util.concurrent並發包下。

ScheduledExecutorService是基於多執行緒的,設計的初衷是為了解決Timer單執行緒執行,多個任務之間會互相影響的問題。

它主要包含4個方法:

  • schedule(Runnable command,long delay,TimeUnit unit),帶延遲時間的調度,只執行一次,調度之後可通過Future.get()阻塞直至任務執行完畢。
  • schedule(Callable callable,long delay,TimeUnit unit),帶延遲時間的調度,只執行一次,調度之後可通過Future.get()阻塞直至任務執行完畢,並且可以獲取執行結果。
  • scheduleAtFixedRate,表示以固定頻率執行的任務,如果當前任務耗時較多,超過定時周期period,則當前任務結束後會立即執行。
  • scheduleWithFixedDelay,表示以固定延時執行任務,延時是相對當前任務結束為起點計算開始時間。

實現這種定時任務的具體程式碼如下:

public class ScheduleExecutorTest {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("doSomething");
        },1000,1000, TimeUnit.MILLISECONDS);
    }
}

調用ScheduledExecutorService類的scheduleAtFixedRate方法實現周期性任務,每隔1秒鐘執行一次,每次延遲1秒再執行。

這種定時任務是阿里巴巴開發者規範中用來替代Timer類的方案,對於多執行緒執行周期性任務,是個不錯的選擇。

使用ScheduledExecutorService類做延遲定時任務的優缺點:

  • 優點:基於多執行緒的定時任務,多個任務之間不會相關影響,支援周期性的執行任務,並且帶延遲功能。

  • 缺點:不支援一些較複雜的定時規則。

當然,你也可以使用分散式定時任務,比如:xxl-job或者elastic-job等等。

其實,在實際工作中我使用多執行緒的場景遠遠不只這12種,在這裡只是拋磚引玉,介紹了一些我認為比較常見的業務場景。

此外,如果你對並發編程中的一些坑,比較感興趣的話,可以看看我的另一個文章《聊聊並發編程的10個坑》,裡面寫的非常詳細。

最後說一句(求關注,別白嫖我)

如果這篇文章對您有所幫助,或者有所啟發的話,幫忙掃描下發二維碼關注一下,您的支援是我堅持寫作最大的動力。

求一鍵三連:點贊、轉發、在看。

關注公眾號:【蘇三說技術】,在公眾號中回復:面試、程式碼神器、開發手冊、時間管理有超贊的粉絲福利,另外回復:加群,可以跟很多BAT大廠的前輩交流和學習。