Advertisement
sergAccount

Untitled

Aug 8th, 2020
1,045
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.59 KB | None | 0 0
  1. /*
  2.  * To change this license header, choose License Headers in Project Properties.
  3.  * To change this template file, choose Tools | Templates
  4.  * and open the template in the editor.
  5.  */
  6. package com.spec.service;
  7.  
  8. import java.util.concurrent.TimeUnit;
  9. import org.apache.kafka.common.serialization.Serdes;
  10. import org.apache.kafka.streams.KafkaStreams;
  11. import org.apache.kafka.streams.StreamsBuilder;
  12. import org.apache.kafka.streams.kstream.Consumed;
  13. import org.apache.kafka.streams.kstream.KStream;
  14. import org.apache.kafka.streams.kstream.Produced;
  15.  
  16. /**
  17.  *
  18.  * @author Administrator
  19.  */
  20. public class StreamService3 {
  21.     // метод запускает процесс для потоковой обработки данных из топика getInputTopic()
  22.     // результат обработки данных запишем в топик getOutputTopic()
  23.     public void startProcess() {
  24.         //        
  25.         System.out.println("StreamService3.startProcess>>");
  26.         // создаем объект типа StreamsBuilder
  27.         StreamsBuilder builder = new StreamsBuilder();
  28.         // описываем источник для потока данных
  29.         KStream<String, String> firstStream = builder.stream(SreamsKafkaConfig.getInputTopic(),
  30.                 Consumed.with(Serdes.String(), Serdes.String()));
  31.         //
  32.         //SreamsKafkaConfig.getOutputTopic()
  33.         // с помощью mapValues выполняем преобразование для каждого значения из потока
  34.         KStream<String, String> trimmedStream = firstStream.mapValues(v -> v.trim());
  35.         // с помощью метода filter фильтруем значения по условию: v.length()>3
  36.         KStream<String, String> byLenStream   =  trimmedStream.filter((String k, String v) -> v.length()>3);
  37.         //trimmedStream.filterNot((String k, String v) -> v.length()>3);
  38.        
  39.         // записываем данные в результироущий поток
  40.         byLenStream.to(SreamsKafkaConfig.getOutputTopic(), Produced.with(Serdes.String(), Serdes.String()));
  41.         // создаем объект типа KafkaStreams
  42.         try ( KafkaStreams streams = new KafkaStreams(builder.build(), SreamsKafkaConfig.getConfig());) {
  43.             // запускаем процесс для потоковой обработки записей
  44.             streams.start();
  45.             TimeUnit.SECONDS.sleep(40);
  46.         } catch (Exception exc) {
  47.             exc.printStackTrace();
  48.         }
  49.         System.out.println("<<");
  50.     }
  51. }
  52.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement