Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {
- for (int i = 0; i < 100; i++) {
- val theNow = Instant.now();
- val qu = new QueueUtil(ru, Provider.Spotify, () -> theNow);
- val tasks = qu.randomTasks(10);
- val gtasks = tasks
- .stream()
- .collect(Collectors.groupingBy(CrawlerTask::getEntityType));
- gtasks.computeIfAbsent(EntityType.Song, k -> Collections.emptyList());
- gtasks.computeIfAbsent(EntityType.Album, k -> Collections.emptyList());
- gtasks.computeIfAbsent(EntityType.Artist, k -> Collections.emptyList());
- val src1 = Observable
- .fromCallable(() -> callSpotify(EntityType.Song, gtasks.get(EntityType.Song)))
- .subscribeOn(Schedulers.io());
- val src2 = Observable
- .fromCallable(() -> callSpotify(EntityType.Album, gtasks.get(EntityType.Album)))
- .subscribeOn(Schedulers.io());
- val src3 = Observable
- .fromCallable(() -> callSpotify(EntityType.Artist, gtasks.get(EntityType.Artist)))
- .subscribeOn(Schedulers.io());
- val zipper = (Function3<List<Optional<String>>, List<Optional<String>>, List<Optional<String>>, Integer>) (rs1, rs2, rs3) -> {
- int sz1 = rs1.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
- int sz2 = rs2.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
- int sz3 = rs2.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
- return sz1 + sz2 + sz3;
- };
- val founds = Observable.zip(src1, src2, src3, zipper);
- founds.blockingForEach(System.out::println);
- }
- }
- @SneakyThrows
- private List<Optional<String>> callSpotify(EntityType et, List<CrawlerTask> cts) {
- Thread.sleep(ru.nextInt(1000, 2000));
- System.out.println("Crawling " + et + " in " + Thread.currentThread().getName());
- return mapList(cts, ct ->
- ru.nextInt(0, 10) == 0 ?
- Optional.<String>empty() :
- Optional.of(ct.getExternalId() + "-" + et.toString()));
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement