Advertisement
alyoshinkkaa

Untitled

Apr 25th, 2024 (edited)
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.13 KB | None | 0 0
  1. package info.kgeorgiy.ja.shinkareva.crawler;
  2.  
  3. import info.kgeorgiy.java.advanced.crawler.*;
  4.  
  5. import java.io.IOException;
  6. import java.net.MalformedURLException;
  7. import java.util.*;
  8. import java.util.concurrent.*;
  9. import java.util.function.BiFunction;
  10.  
  11. public class WebCrawler implements Crawler {
  12. private final Downloader downloader;
  13. private final int perHost;
  14. private final ExecutorService extractors;
  15. private final ExecutorService downloaders;
  16. private final ConcurrentMap<String, Semaphore> hostsMap = new ConcurrentHashMap<>();
  17.  
  18. private final Map<String, IOException> errors = new ConcurrentHashMap<>();
  19. private final Set<String> downloaded = ConcurrentHashMap.newKeySet();
  20. private final Set<String> cached = ConcurrentHashMap.newKeySet();
  21. private final Phaser phaser = new Phaser(1);
  22.  
  23. public WebCrawler(
  24. final Downloader downloader,
  25. final int downloaders,
  26. final int extractors,
  27. final int perHost
  28. ) {
  29. this.downloader = downloader;
  30. this.downloaders = Executors.newFixedThreadPool(downloaders);
  31. this.extractors = Executors.newFixedThreadPool(extractors);
  32. this.perHost = perHost;
  33. }
  34.  
  35. @Override
  36. public Result download(final String url, final int depth) {
  37. return downloadAndGetResult(url, depth);
  38. }
  39.  
  40. @Override
  41. public void close() {
  42. extractors.shutdown();
  43. downloaders.shutdown();
  44. if (!(extractors.isShutdown() && downloaders.isShutdown())) {
  45. System.err.println("Could not shutdown pools");
  46. }
  47. }
  48.  
  49.  
  50. public static void main(String[] args) {
  51. if (args == null || args.length == 0 || Arrays.stream(args).anyMatch(Objects::isNull)) {
  52. System.err.println("Expected not null arguments");
  53. } else {
  54. final BiFunction<String[], Integer, Integer> argValue =
  55. (arr, idx) -> idx >= arr.length ? 1 : Integer.parseInt(arr[idx]);
  56. try {
  57. try (Crawler crawler = new WebCrawler(
  58. new CachingDownloader(1),
  59. argValue.apply(args, 2),
  60. argValue.apply(args, 3),
  61. argValue.apply(args, 4)
  62. )) {
  63. crawler.download(args[0], argValue.apply(args, 1));
  64. }
  65. } catch (final NumberFormatException e) {
  66. System.err.printf("Expected numbers in arguments but found: %s \n", e.getMessage());
  67. } catch (final IOException e) {
  68. System.err.printf("Exception while initializing: %s ", e.getMessage());
  69. }
  70. }
  71. }
  72.  
  73. private void download(final String url, final int depth, final Queue<String> queue) {
  74. try {
  75. final String hostName = URLUtils.getHost(url);
  76. final var semaphore = hostsMap.computeIfAbsent(hostName, name -> new Semaphore(perHost));
  77. semaphore.acquire();
  78. phaser.register();
  79. extractors.submit(() -> {
  80. try {
  81. final var doc = downloader.download(url);
  82. downloaded.add(url);
  83. if (depth > 1) {
  84. phaser.register();
  85. extractors.submit(() -> {
  86. try {
  87. doc.extractLinks().forEach(link -> {
  88. if (!cached.contains(link)) {
  89. cached.add(link);
  90. queue.add(link);
  91. }
  92. });
  93. } catch (final IOException e) {
  94. System.err.printf("Exception while extracting links from doc by url: %s \n", url);
  95. } finally {
  96. phaser.arriveAndDeregister();
  97. }
  98. });
  99. }
  100. } catch (final IOException e) {
  101. errors.put(url, e);
  102. } finally {
  103. semaphore.release();
  104. phaser.arriveAndDeregister();
  105. }
  106. });
  107. } catch (final MalformedURLException e) {
  108. errors.put(url, e);
  109. } catch (InterruptedException e) {
  110. Thread.currentThread().interrupt();
  111. }
  112. }
  113. public Result downloadAndGetResult(final String url, final int depth) {
  114. cached.add(url);
  115. final Queue<String> prevUrls = new ConcurrentLinkedDeque<>();
  116. final Queue<String> curUrls = new ConcurrentLinkedDeque<>();
  117. prevUrls.add(url);
  118. for (int i = 0; i < depth; i++) {
  119. for (final String prevUrl : prevUrls) {
  120. download(prevUrl, depth - i, curUrls);
  121. }
  122. phaser.arriveAndAwaitAdvance();
  123. prevUrls.clear();
  124. prevUrls.addAll(curUrls);
  125. curUrls.clear();
  126. }
  127. return new Result(new ArrayList<>(downloaded), errors);
  128. }
  129.  
  130.  
  131. private static final class HostWorker {
  132. private final Queue<Runnable> nonWorkingTasks = new ConcurrentLinkedDeque<>();
  133. private int working;
  134. private final int perHost;
  135. private final ExecutorService downloaders;
  136.  
  137. private HostWorker(final int perHost, final ExecutorService downloaders) {
  138. this.perHost = perHost;
  139. this.downloaders = downloaders;
  140. }
  141.  
  142. public static HostWorker of(final int perHost, final ExecutorService downloaderPool) {
  143. return new HostWorker(perHost, downloaderPool);
  144. }
  145.  
  146. private void addTask(final Runnable task) {
  147. if (working >= perHost) {
  148. nonWorkingTasks.add(task);
  149. } else {
  150. downloaders.submit(task);
  151. working++;
  152. }
  153. }
  154.  
  155. private void runNewTask() {
  156. var task = nonWorkingTasks.poll();
  157. if (Objects.nonNull(task)) downloaders.submit(task);
  158. else working--;
  159. }
  160. }
  161. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement