05 August 2008
A Multithreaded Web Crawler with ConcurrentHashMap
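Parallel streams + ConcurrentHashMap.newKeySet():
Crawl recursively from startUrl using parallelStream(). A concurrent set created with ConcurrentHashMap.newKeySet() tracks visited URLs; its add() returns false when the URL is already present, so the check-and-mark step is a single atomic operation and no URL is crawled twice.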
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * // This is the HtmlParser's API interface.
 * // You should not implement it, or speculate about its implementation
 * interface HtmlParser {
 *     public List<String> getUrls(String url) {}
 * }
 */
class Solution {
    public List<String> crawl(String startUrl, HtmlParser htmlParser) {
        String hostname = getHostname(startUrl);
        // Thread-safe set backed by ConcurrentHashMap; add() is atomic.
        Set<String> visited = ConcurrentHashMap.newKeySet();
        visited.add(startUrl);
        return crawl(startUrl, htmlParser, hostname, visited)
            .collect(Collectors.toList());
    }

    private Stream<String> crawl(String startUrl, HtmlParser htmlParser, String hostname, Set<String> visited) {
        Stream<String> stream = htmlParser.getUrls(startUrl)
            .parallelStream()
            .filter(url -> isSameHostname(url, hostname))
            // add() returns false for URLs already seen, so each URL is crawled at most once.
            .filter(url -> visited.add(url))
            .flatMap(url -> crawl(url, htmlParser, hostname, visited));
        return Stream.concat(Stream.of(startUrl), stream);
    }

    private String getHostname(String url) {
        // Skip the "http://" prefix (7 characters) and cut at the next '/'.
        int idx = url.indexOf('/', 7);
        return (idx != -1) ? url.substring(0, idx) : url;
    }

    private boolean isSameHostname(String url, String hostname) {
        if (!url.startsWith(hostname)) {
            return false;
        }
        // Guard against hostnames that merely share a prefix, e.g. "http://news.yahoo.com.au".
        return url.length() == hostname.length() || url.charAt(hostname.length()) == '/';
    }
}
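For local testing, the parser can be faked with an in-memory map. This is a sketch: the HtmlParser declaration and the link graph below are hypothetical stand-ins, since on LeetCode the interface and real pages are supplied by the judge.

import java.util.List;
import java.util.Map;

// Hypothetical local declaration matching the commented API above.
interface HtmlParser {
    List<String> getUrls(String url);
}

class CrawlDemo {
    public static void main(String[] args) {
        // Hypothetical link graph for the demo.
        Map<String, List<String>> links = Map.of(
            "http://news.yahoo.com", List.of("http://news.yahoo.com/news", "http://news.google.com"),
            "http://news.yahoo.com/news", List.of("http://news.yahoo.com/news/topics/"),
            "http://news.yahoo.com/news/topics/", List.of("http://news.yahoo.com")
        );
        HtmlParser parser = url -> links.getOrDefault(url, List.of());
        // Expect the three news.yahoo.com URLs (order may vary);
        // news.google.com is filtered out as a different host.
        System.out.println(new Solution().crawl("http://news.yahoo.com", parser));
    }
}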
BlockingQueue + ExecutorService:
The main thread takes URLs from the queue and submits tasks to a thread pool; the pool's threads perform the I/O and add newly discovered URLs back to the queue.
The program exits once the queue is empty and every task submitted to the pool has completed.
BlockingQueue is thread-safe; the remaining objects (the result list and the visited set) are modified only by the main thread, so they need no synchronization.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class Solution {
    public List<String> crawl(String startUrl, HtmlParser htmlParser) {
        String hostName = getHostName(startUrl);
        List<String> res = new ArrayList<>();
        Set<String> visited = new HashSet<>();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Deque<Future<?>> tasks = new ArrayDeque<>();
        queue.offer(startUrl);
        // Create a thread pool of 4 threads to perform the I/O operations.
        ExecutorService executor = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r);
            // LeetCode doesn't allow executor.shutdown().
            // Use daemon threads so the program can exit.
            t.setDaemon(true);
            return t;
        });
        while (true) {
            String url = queue.poll();
            if (url != null) {
                if (getHostName(url).equals(hostName) && !visited.contains(url)) {
                    res.add(url);
                    visited.add(url);
                    // Let a pool thread fetch new URLs and put them into the queue.
                    tasks.add(executor.submit(() -> {
                        List<String> newUrls = htmlParser.getUrls(url);
                        for (String newUrl : newUrls) {
                            queue.offer(newUrl);
                        }
                    }));
                }
            } else if (!tasks.isEmpty()) {
                // The queue is empty but tasks are still in flight; wait for the
                // oldest one to complete, since it may supply new URLs.
                Future<?> nextTask = tasks.poll();
                try {
                    nextTask.get();
                } catch (InterruptedException | ExecutionException e) {
                    // getUrls() throws no checked exceptions; nothing to recover here.
                }
            } else {
                // The queue is empty and all tasks have completed.
                break;
            }
        }
        return res;
    }

    private String getHostName(String url) {
        // Strip the "http://" prefix (7 characters); the host is the first path segment.
        url = url.substring(7);
        String[] parts = url.split("/");
        return parts[0];
    }
}
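The daemon-thread trick above exists only because the judge forbids executor.shutdown(). In a normal program, the standard cleanup would shut the pool down explicitly once the crawl loop finishes. A minimal sketch of that idiom (the 10-second timeout is an arbitrary choice):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        executor.submit(() -> { /* I/O work, e.g. htmlParser.getUrls(url) */ });
        // ... crawl loop as above ...
        executor.shutdown();                                  // stop accepting new tasks
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();                           // cancel anything still running
        }
    }
}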