Advertisement
sergAccount

Untitled

Aug 8th, 2020
829
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.28 KB | None | 0 0
  1. /*
  2.  * To change this license header, choose License Headers in Project Properties.
  3.  * To change this template file, choose Tools | Templates
  4.  * and open the template in the editor.
  5.  */
  6. package com.spec.service;
  7.  
  8. import java.util.concurrent.TimeUnit;
  9. import org.apache.kafka.common.serialization.Serdes;
  10. import org.apache.kafka.streams.KafkaStreams;
  11. import org.apache.kafka.streams.StreamsBuilder;
  12. import org.apache.kafka.streams.kstream.Consumed;
  13. import org.apache.kafka.streams.kstream.KStream;
  14. import org.apache.kafka.streams.kstream.Produced;
  15.  
  16. public class StreamService {    
  17.     // метод запускает процесс для потоковой обработки данных из топика getInputTopic()
  18.     // результат обработки данных запишем в топик getOutputTopic()
  19.     public void startProcess(){
  20.         //
  21.         System.out.println("StreamService.startProcess>>");
  22.         // создаем объект типа StreamsBuilder
  23.         StreamsBuilder builder = new StreamsBuilder();
  24.         // описываем источник для потока данных
  25.         KStream<String, String> firstStream = builder.stream(SreamsKafkaConfig.getInputTopic(),
  26.                                                              Consumed.with(Serdes.String(), Serdes.String()));
  27.         //
  28.         //SreamsKafkaConfig.getOutputTopic()
  29.         // с помощью mapValues выполняем преобразование для каждого значения из потока
  30.         KStream<String, String> trimmedStream = firstStream.mapValues(v -> v.trim());
  31.         // записываем данные в результироущий поток
  32.         trimmedStream.to(SreamsKafkaConfig.getOutputTopic(), Produced.with(Serdes.String(), Serdes.String()));
  33.         // создаем объект типа KafkaStreams
  34.         try(KafkaStreams streams = new KafkaStreams(builder.build(), SreamsKafkaConfig.getConfig());){
  35.             // запускаем процесс для потоковой обработки записей
  36.             streams.start();
  37.             TimeUnit.SECONDS.sleep(20);
  38.         }catch(Exception exc){
  39.             exc.printStackTrace();
  40.         }
  41.         System.out.println("<<");        
  42.     }
  43. }
  44.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement