Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- static class Oi {
- private final Operation op;
- private final Long i;
- private final int[] inr = new int[1024];
- public Oi(Operation op, Long i) {
- this.op = op;
- this.i = i;
- }
- @Override
- public String toString() {
- //noinspection StringBufferReplaceableByString
- final StringBuilder sb = new StringBuilder("Oi(");
- sb.append("i=").append(i);
- sb.append(",op=").append(op);
- sb.append(')');
- return sb.toString();
- }
- }
- @Test
- public void testSubjects() throws Exception {
- int n = 400_000;
- PublishSubject<Object> delSubj = PublishSubject.create();
- PublishSubject<Object> updSubj = PublishSubject.create();
- PublishSubject<Object> mrgSubj = PublishSubject.create();
- Operation[] ops = Operation.values();
- ThreadPoolExecutor tpeSub = new ThreadPoolExecutor(
- 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
- r -> {
- final Thread t = new Thread(r, "singleton thread pool");
- t.setDaemon(true);
- return t;
- }
- );
- ThreadPoolExecutor tpeObs = new ThreadPoolExecutor(
- 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(),
- r -> {
- final Thread t = new Thread(r, "singleton thread pool");
- t.setDaemon(true);
- return t;
- }
- );
- Scheduler schedulerSub = Schedulers.from(tpeSub);
- Scheduler schedulerObs = Schedulers.from(tpeObs);
- int[] row = new int[n];
- Long[] src = new Long[n];
- for (int i = 0; i < src.length; i++) {
- src[i] = (long) (i + 1);
- }
- AtomicInteger ai = new AtomicInteger(0);
- AtomicInteger bi = new AtomicInteger(0);
- CountDownLatch cdl = new CountDownLatch(1);
- Observable.interval(1, TimeUnit.MICROSECONDS)
- .map(i -> new Oi(ops[(int) (i % 3)], i))
- .flatMap(msg ->
- dispatch0(msg, delSubj, updSubj, mrgSubj, schedulerSub))
- //.observeOn(schedulerObs)
- .subscribe();
- //Observable.<Long>create(s -> {
- // for (int i = 0; i < n; i++) {
- // s.onNext((long) i);
- // }
- // })
- // .map(i -> new Oi(ops[(int) (i % 3)], i))
- // .flatMap(msg ->
- // dispatch0(msg, delSubj, updSubj, mrgSubj, schedulerSub))
- // //.observeOn(schedulerObs)
- // .subscribe();
- final Random random = new Random(123);
- delSubj.map(msg -> (Oi) msg)
- .subscribe(oi -> {
- if (oi.i >= n) cdl.countDown();
- else {
- row[oi.i.intValue()] = bi.incrementAndGet();
- randomDelay(random);
- logProgress(delSubj, "delSubj", oi.i);
- }
- });
- updSubj.map(msg -> (Oi) msg)
- .subscribe(oi -> {
- if (oi.i >= n) cdl.countDown();
- else {
- row[oi.i.intValue()] = bi.incrementAndGet();
- randomDelay(random);
- logProgress(updSubj, "updSubj", oi.i);
- }
- });
- mrgSubj.map(msg -> (Oi) msg)
- .subscribe(oi -> {
- if (oi.i >= n) cdl.countDown();
- else {
- row[oi.i.intValue()] = bi.incrementAndGet();
- randomDelay(random);
- logProgress(mrgSubj, "mrgSubj", oi.i);
- }
- });
- cdl.await();
- // verify
- System.out.println("row = " + row.length);
- for (int i = 0; i < row.length; i++) {
- Assert.assertEquals(row[i], i + 1);
- }
- tpeSub.shutdown();
- }
- private void logProgress(PublishSubject<Object> subj, String subjName, Long i) {
- //rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker 438296 14025472 1993321008
- //rx.schedulers.ExecutorScheduler$ExecutorAction 438295 10519080 1875223656
- //rx.internal.operators.OperatorSubscribeOn$1$1 438295 10519080 1864806376
- //rx.Observable 438784 7020544 1826583488
- //com.vertigo.service.test.su.QueueTest$$Lambda$6.7056937 438296 14025472 1819698968
- //int[] 458490 1806895632 1806895632
- //com.vertigo.service.test.su.QueueTest$Oi 438296 10519104 1805782968
- if (i % 1000 < 3) {
- System.out.println(i + ", " + subjName);
- }
- }
- int[] delays = new int[]{0, 2, 3};
- void randomDelay(Random random) {
- try {
- Thread.sleep(delays[random.nextInt(delays.length)]);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- private <T> Observable<Void> dispatch0(Oi msg,
- PublishSubject<Object> delSubj,
- PublishSubject<Object> updSubj,
- PublishSubject<Object> mrgSubj,
- Scheduler scheduler) {
- return Observable
- .<Void>create(voidSubscriber -> {
- try {
- switch (msg.op) {
- case Delete:
- delSubj.onNext(msg);
- break;
- case Merge:
- mrgSubj.onNext(msg);
- break;
- case Update:
- updSubj.onNext(msg);
- break;
- }
- } catch (Throwable e) {
- voidSubscriber.onError(e);
- }
- voidSubscriber.onCompleted();
- })
- .subscribeOn(scheduler);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement