Advertisement
NLinker

RxJava: n-split and rendezvous

Jun 29th, 2017
289
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.19 KB | None | 0 0
  1.     {
  2.         for (int i = 0; i < 100; i++) {
  3.             val theNow = Instant.now();
  4.             val qu = new QueueUtil(ru, Provider.Spotify, () -> theNow);
  5.             val tasks = qu.randomTasks(10);
  6.             val gtasks = tasks
  7.                 .stream()
  8.                 .collect(Collectors.groupingBy(CrawlerTask::getEntityType));
  9.             gtasks.computeIfAbsent(EntityType.Song, k -> Collections.emptyList());
  10.             gtasks.computeIfAbsent(EntityType.Album, k -> Collections.emptyList());
  11.             gtasks.computeIfAbsent(EntityType.Artist, k -> Collections.emptyList());
  12.  
  13.             val src1 = Observable
  14.                 .fromCallable(() -> callSpotify(EntityType.Song, gtasks.get(EntityType.Song)))
  15.                 .subscribeOn(Schedulers.io());
  16.             val src2 = Observable
  17.                 .fromCallable(() -> callSpotify(EntityType.Album, gtasks.get(EntityType.Album)))
  18.                 .subscribeOn(Schedulers.io());
  19.             val src3 = Observable
  20.                 .fromCallable(() -> callSpotify(EntityType.Artist, gtasks.get(EntityType.Artist)))
  21.                 .subscribeOn(Schedulers.io());
  22.             val zipper = (Function3<List<Optional<String>>, List<Optional<String>>, List<Optional<String>>, Integer>) (rs1, rs2, rs3) -> {
  23.                 int sz1 = rs1.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
  24.                 int sz2 = rs2.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
  25.                 int sz3 = rs2.stream().filter(Optional::isPresent).collect(Collectors.toList()).size();
  26.                 return sz1 + sz2 + sz3;
  27.             };
  28.             val founds = Observable.zip(src1, src2, src3, zipper);
  29.             founds.blockingForEach(System.out::println);
  30.         }
  31.     }
  32.  
  33.     @SneakyThrows
  34.     private List<Optional<String>> callSpotify(EntityType et, List<CrawlerTask> cts) {
  35.         Thread.sleep(ru.nextInt(1000, 2000));
  36.         System.out.println("Crawling " + et + " in " + Thread.currentThread().getName());
  37.         return mapList(cts, ct ->
  38.             ru.nextInt(0, 10) == 0 ?
  39.                 Optional.<String>empty() :
  40.                 Optional.of(ct.getExternalId() + "-" + et.toString()));
  41.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement