webmagic源碼淺析
webmagic簡介
webmagic可以說是中國傳播度最廣的Java爬蟲框架,//github.com/code4craft/webmagic,閱讀相關源碼,獲益良多。閱讀作者部落格【程式碼工匠】,能夠領略到一個IT工作者的工匠精神,希望以後成為他這樣的開源貢獻者。Webmagic的文檔也是寫得非常漂亮,這裡就不具體講它的使用方法了,見官方文檔
webmagic核心架構
webmagic幫我們做了幾個核心的事情:
1.執行緒池封裝,不用手動控制採集執行緒
2.url調度,實現了生產者消費者模型
3.封裝下載器組件(downloader),解析組件,持久化。 見官方文檔
4.支援註解
簡單案例
借用一段官方案例,快速入門,便於後面的理解。開啟一個爬蟲,只需要簡單幾步,編寫頁面解析器,寫具體的解析方法。新建Spider實例,添加至少一個種子URL,設置其他可選屬性,最後調用run()方法,或者start(),start()方法內部會為spider單獨開啟一個執行緒,使得爬蟲與主執行緒非同步。
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.processor.PageProcessor;
public class GithubRepoPageProcessor implements PageProcessor {
private Site site = Site.me().setRetryTimes(3).setSleepTime(100);
@Override
public void process(Page page) {
//將提取的url加入page對象暫存,最終會加入到
page.addTargetRequests(page.getHtml().links().regex("(//github\\.com/\\w+/\\w+)").all());
page.putField("author", page.getUrl().regex("//github\\.com/(\\w+)/.*").toString());
page.putField("name", page.getHtml().xpath("//h1[@class='entry-title public']/strong/a/text()").toString());
if (page.getResultItems().get("name")==null){
//skip this page
page.setSkip(true);
}
page.putField("readme", page.getHtml().xpath("//div[@id='readme']/tidyText()"));
}
@Override
public Site getSite() {
return site;
}
public static void main(String[] args) {
//官方鏈式調用,拆解到下面方便理解
//Spider.create(new GithubRepoPageProcessor()).addUrl("//github.com/code4craft").thread(5).run();
//創建執行緒
Spider spider = Spider.create(new GithubRepoPageProcessor());
//添加採集種子URL
spider.addUrl("//github.com/code4craft");
//設置執行緒數
spider.thread(5);
//啟動爬蟲//run()方法既可以看作多執行緒中的Runnable介面方法,也可以直接運行,是爬蟲的核心方法
spider.run();
}
}
Spider類屬性
爬蟲的核心是us.codecraft.webmagic.Spider類,看看Spider類中都有哪些重要屬性
屬性列表:
public class Spider implements Runnable, Task {
//下載器對象
protected Downloader downloader;
//持久化統一處理器,可以有多個
protected List<Pipeline> pipelines = new ArrayList<Pipeline>();
//頁面解析器
protected PageProcessor pageProcessor;
//種子請求(這個地方看著種子請求也不是很對,因為spider對象在沒開始運行時,仍然可以使用addRequest,addUrl添加url )
protected List<Request> startRequests;
//瀏覽器資訊對象
protected Site site;
//爬蟲任務標識
protected String uuid;
//任務調度器,默認是JDK中的LinkedBlockingQueue的實現
protected Scheduler scheduler = new QueueScheduler();
protected Logger logger = LoggerFactory.getLogger(getClass());
//執行緒池(自己封裝的一個模型,內部的execute方法實際是executorService的execute實現添加執行緒的作用)
protected CountableThreadPool threadPool;
//執行管理器對象(和執行緒池配合使用)
protected ExecutorService executorService;
//執行緒數,控制採集並發
protected int threadNum = 1;
//爬蟲任務運行狀態
protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
//是否採集完成退出
protected boolean exitWhenComplete = true;
protected final static int STAT_INIT = 0;
protected final static int STAT_RUNNING = 1;
protected final static int STAT_STOPPED = 2;
//是否迴流url,spawn產卵的意思。個人覺得這個參數很多餘,不想採集繼續下去,可以別把url加入隊列
protected boolean spawnUrl = true;
//退出時是否回收處理
protected boolean destroyWhenExit = true;
//控制新生成url鎖
private ReentrantLock newUrlLock = new ReentrantLock();
//控制新生成url鎖,配合newUrlLock 使用
private Condition newUrlCondition = newUrlLock.newCondition();
//監聽器集合,請求爬去成功或者失敗時,可以通過注入監聽器分別實現onSuccess和onError方法
private List<SpiderListener> spiderListeners;
//採集頁面數統計(只代表請求的次數,不代表成功抓取數)
private final AtomicLong pageCount = new AtomicLong(0);
//爬取開始時間
private Date startTime;
//調度器隊列中的URL已經被消費光,且採集執行緒未執行完成,仍然可能生產URL到調度器隊列中時,執行緒最多wait 30秒
private int emptySleepTime = 30000;
threadNum 這裡Spider本身實現了Runnable介面,可以作為一個獨立的執行緒開啟,當然它的執行緒控制不僅於此,這裡有一個屬性threadNum才是控制採集執行緒數的,後面再細說。
scheduler 對象做為調度器,內部採用隊列維護了一個實現生產者消費者模型,爬取的過程中,可以將採集的url提取到scheduler的隊列中,執行緒會持續不斷的消費scheduler 的隊列中消費。
pageProcessor 用於用戶自定義頁面解析規則,定義具體的解析邏輯,新建Spider實例的方式僅兩種,public static Spider create(PageProcessor pageProcessor)
和構造方法public Spider(PageProcessor pageProcessor)
create方法內部只是調用了一下構造方法。構造一個spider對象都需要一個自定義的解析器,不同頁面,解析邏輯不相同,PageProcessor介面中。spider會調用PageProcessor的process方法,這是一個策略設計模式。
uuid 這個名字可能讓人誤會,和平時uuid不是一個含義,這個屬性是一個爬蟲進程的唯一標識
其他屬性 比較重要的屬性還包括threadPool,executorService,控制多執行緒並發,瀏覽器對象site,對於有些反爬策略的網站,該對象可以用於模擬瀏覽器,達到反反爬蟲
的作用。
Spider核心方法run()
@Override
public void run() {
checkRunningStat();//檢查爬蟲運行狀態,防止run方法被調用多次
initComponent();//初始化
logger.info("Spider {} started!",getUUID());
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
//循環消費Request,url在放入scheduler時,已經封裝為Request對象了
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
//threadPool.getThreadAlive()執行緒池中仍然還有存活執行緒,那麼存活執行緒可能會生產出新的url來
//exitWhenComplete默認為true,
//exitWhenComplete如果為false,執行緒等待新URL,
//如果隊列(自定義隊列)能實現動態添加url,那就可以實現動態添加採集任務的功能
break;
}
// wait until new url added
//等待存活的執行緒生產新的url
waitNewUrl();
} else {
//將request封裝為執行緒,加入執行緒隊列,執行緒池會根據設置的並行參數threadNum,並行執行
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(request);//執行請求
onSuccess(request);//調用執行成功的方法
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}
核心方法的流程還是比較簡答的,checkRunningStat()
會先檢查一下爬蟲是否已經啟動,這有點兒像多執行緒中的開啟執行緒的start()方法,兩次開啟是不允許的。
然後初始化方法initComponent()
各種組件,在initComponent()
方法中,加入startRequests中的Request,實際上在Spider啟動之前可以調用addUrl(String... urls)
和addRequest(Request... requests)
方法直接將請求加入到隊列中,startRequests和後面那種添加url的方法缺少了一定的一致性。
後面一個循環消費的過程,正如我注釋里寫的那樣,如果隊列中url被消費完畢,且沒有正在被消費的存活的執行緒了,且完成採集退出屬性exitWhenComplete為true(exitWhenComplete默認為true,設置為false則進程將會一直掛起),就會跳出死循環,採集結束,反之,如果依然有執行緒存活,或者exitWhenComplete為false,那麼執行緒waitNewUrl()等待,在exitWhenComplete為false的情況,進程就不會自動停止了,除非強殺了,這種設計在分散式的模式下才顯得有意義,可以動態添加url到隊列中去。
private void waitNewUrl() {
newUrlLock.lock();
try {
// double check
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
return;
}
//默認是30秒後自動蘇醒,可以通過設置emptySleepTime屬性,控制自動蘇醒的時間
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e);
} finally {
newUrlLock.unlock();
}
}
後面使用threadPool執行一個新的子執行緒。new Runnable構造的匿名內部類會通過threadPool開啟一個新的子執行緒,執行請求processRequest(request)
,執行成功就調用onSuccess(request)
,失敗就調用onError(request)
,接著finally程式碼塊中的內容是非常重要的,統計請求的頁面次數(無論失敗或者成功),signalNewUrl()喚醒等待的執行緒,這裡要和前面waitNewUrl()結合起來看,兩者使用同一個鎖,waitNewUrl()作為父執行緒,默認會自動蘇醒,但調用signalNewUrl()的用意在於,可能這個子執行緒已經又生成新的URL放到隊列中了,就不用再等30秒了。
private void signalNewUrl() {
try {
newUrlLock.lock();
newUrlCondition.signalAll();
} finally {
newUrlLock.unlock();
}
}
後面的程式碼則是爬蟲結束的操作,這種情況只有前文提到的跳出死循環,採集結束,結束前設置了一下狀態,做了一下close()操作
調度器Scheduler
調度器在webmagic中扮演的角色是非常重要的,說來功能也不算太複雜,實現生產者-消費者模式,順便去重。scheduler默認為QueueScheduler ,在scheduler聲明的時候就直接新建了這個實例
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
//LinkedBlockingQueue隊列存url
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
@Override 覆蓋父類DuplicateRemovedScheduler 的方法
public void pushWhenNoDuplicate(Request request, Task task) {
queue.add(request);
}
@Override 實現DuplicateRemovedScheduler 不完全實現Scheduler的poll方法
public Request poll(Task task) {
return queue.poll();
}
@Override //實現MonitorableScheduler 的方法
public int getLeftRequestsCount(Task task) {
return queue.size();
}
@Override //實現MonitorableScheduler 的方法
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
}
以上程式碼,可以看到QueueScheduler的構成,QueueScheduler繼承了抽象類DuplicateRemovedScheduler 實現了介面MonitorableScheduler 介面,DuplicateRemovedScheduler 又實現了Scheduler,DuplicateRemovedScheduler 為抽象類,僅僅實現了push邏輯(生產者),而poll是QueueScheduler自己實現的(消費者)。push()方法使用去重器,判斷該請求有沒有被採集過。這裡要注意,默認Post請求是不去重的,能直接打開的請求都是get的😁,官方文檔也有特別說明
//DuplicateRemovedScheduler 源碼
private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover();
@Override
public void push(Request request, Task task) {
logger.trace("get a candidate url {}", request.getUrl());
//duplicatedRemover.isDuplicate(request, task) 檢查是否採集過
if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) {
logger.debug("push to queue {}", request.getUrl());
pushWhenNoDuplicate(request, task);
}
}
//是否需要去重,POST請求則不需要去重
protected boolean noNeedToRemoveDuplicate(Request request) {
return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod());
}
public class HashSetDuplicateRemover implements DuplicateRemover {
private Set<String> urls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@Override
public boolean isDuplicate(Request request, Task task) {
//add成功,說明沒有添加過這條請求,返回true
return !urls.add(getUrl(request));
}
protected String getUrl(Request request) {
return request.getUrl();
}
@Override
public void resetDuplicateCheck(Task task) {
urls.clear();
}
@Override
public int getTotalRequestsCount(Task task) {
return urls.size();
}
}