Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.BlockingQueue;
- import java.util.function.Consumer;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import lombok.val;
- import com.vertigo.model.json.sync.CrawlerTask;
- import static com.vertigo.crawler.job.SpotifyCrawlerJob.mapList;
- /** A {@code QueueConsumer} is an object that... */
- @Slf4j
- @AllArgsConstructor
- public class QueueConsumer implements Runnable {
- private final BlockingQueue<CrawlerTask> loadedQueue;
- private final Consumer<List<CrawlerTask>> consumer;
- @Override
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- // block if the queue empty
- CrawlerTask head = loadedQueue.take();
- int n = loadedQueue.size();
- val cts = new ArrayList<CrawlerTask>(n + 1);
- cts.add(head);
- for (int i = 0; i < n; i++) {
- cts.add(loadedQueue.take());
- }
- log.debug("consuming cts = {} {}", cts.size(), mapList(cts, SpotifyCrawlerJob::describe));
- consumer.accept(cts);
- }
- } catch (Exception e) {
- log.error("Interrupted", e);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement