Advertisement
NLinker

QueueConsumer implementation

Jun 15th, 2017
203
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.30 KB | None | 0 0
  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.function.Consumer;
  5. import lombok.AllArgsConstructor;
  6. import lombok.extern.slf4j.Slf4j;
  7. import lombok.val;
  8.  
  9. import com.vertigo.model.json.sync.CrawlerTask;
  10.  
  11. import static com.vertigo.crawler.job.SpotifyCrawlerJob.mapList;
  12.  
  13. /** A {@code QueueConsumer} is an object that... */
  14. @Slf4j
  15. @AllArgsConstructor
  16. public class QueueConsumer implements Runnable {
  17.     private final BlockingQueue<CrawlerTask>  loadedQueue;
  18.     private final Consumer<List<CrawlerTask>> consumer;
  19.  
  20.     @Override
  21.     public void run() {
  22.         try {
  23.             while (!Thread.currentThread().isInterrupted()) {
  24.                 // block if the queue empty
  25.                 CrawlerTask head = loadedQueue.take();
  26.                 int n = loadedQueue.size();
  27.                 val cts = new ArrayList<CrawlerTask>(n + 1);
  28.                 cts.add(head);
  29.                 for (int i = 0; i < n; i++) {
  30.                     cts.add(loadedQueue.take());
  31.                 }
  32.                 log.debug("consuming cts = {} {}", cts.size(), mapList(cts, SpotifyCrawlerJob::describe));
  33.                 consumer.accept(cts);
  34.             }
  35.         } catch (Exception e) {
  36.             log.error("Interrupted", e);
  37.         }
  38.     }
  39. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement