因为是多线程访问,所以这里用了synchronized (mutex) 保证线程安全。如果能从DB中查询出key是指定的URL的话,则返回相应的ID value,否则返回-1说明没有找到。
public int getNewDocID(String url) {
synchronized (mutex) {
try {
// Make sure that we have not already assigned a docid for this URL
int docid = getDocId(url);
if (docid > 0) {
return docid;
}
lastDocID++;
docIDsDB.put(null, new DatabaseEntry(url.getBytes()), new DatabaseEntry(Util.int2ByteArray(lastDocID)));
return lastDocID;
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
}
用getNewDocID生成新的ID并将它和URL存入DB。
addUrlAndDocId()是当你不想自动生成ID而想自己指定一个ID时使用,一般不建议用,除非是第二次使用并想用和之前一样的ID,但如果这样的话得先查出前一次的ID,效率不高,且真的没多大必要!
DocIDServer主要就这两个方法了,逻辑很简单,功能也很单一。
Frontier
回到addSeed方法,最后一句frontier.schedule(webUrl);将指定URL加入队列,只有加入队列之后爬虫线程才能对该URL进行解析。
Frontier有两个重要的新属性,一个是计数器Counters,另一个是URL队列WorkQueues:
protected WorkQueues workQueues = new WorkQueues(env, "PendingURLsDB", config.isResumableCrawling()); protected Counters counters = new Counters(env, config);
计数器Counters实现比较简单,用一个HashMap存储,目前只存储了两个值:已加入队列的URL数和已爬取完成的URL数。
URL队列WorkQueues保存当前已发现的但是又还没有分配给爬虫线程的WebURL,用BDB JE存储,创建了一个名为PendingURLsDB的数据库:
public WorkQueues(Environment env, String dbName, boolean resumable) throws DatabaseException {
this.env = env;
this.resumable = resumable;
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(resumable);
dbConfig.setDeferredWrite(!resumable);
urlsDB = env.openDatabase(null, dbName, dbConfig);
webURLBinding = new WebURLTupleBinding();
}
自定义了一个WebURLTupleBinding,可以在JE中保存WebURL的各个属性。如果你需要给WebURL添加一些属性,比如锚的标签名是a,img还是iframe,除了要在WebURL里面添加外,也需要修改WebURLTupleBinding,否则不会被存入DB,线程取出的时候该属性就会为空!
WorkQueues使用put, delete, get方法来实现增删查,以6位byte作为key,第一位是WebURL的priority属性,第二位是WebURL的深度属性,剩下4位是用WebURL的ID转换成byte;用WebURLTupleBinding中定义的内容作为value。因为数据库是以key为索引存储的,所以优先级高的即数字小的会排在前面,接着深度小的也会排在前面。
关于优先级,crawler4j有个小BUG,就是WebURL的priority属性默认就是最小0,这使得如果你想优先爬取某URL就不可能了,解决方法是在WebURL构造函数或setURL里为priority赋上默认值,至于赋什么值好,就看着办吧嘿嘿!
Frontier提供两个方法添加URL到队列:
public void scheduleAll(Listurls) { int maxPagesToFetch = config.getMaxPagesToFetch(); synchronized (mutex) { int newScheduledPage = 0; for (WebURL url : urls) { if (maxPagesToFetch > 0 && (scheduledPages + newScheduledPage) >= maxPagesToFetch) { break; } try { workQueues.put(url); newScheduledPage++; } catch (DatabaseException e) { logger.error("Error while puting the url in the work queue."); } } if (newScheduledPage > 0) { scheduledPages += newScheduledPage; counters.increment(Counters.ReservedCounterNames.SCHEDULED_PAGES, newScheduledPage); } synchronized (waitingList) { waitingList.notifyAll(); } } } public void schedule(WebURL url) { int maxPagesToFetch = config.getMaxPagesToFetch();