Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
- package com.spec.service;
- import java.util.concurrent.TimeUnit;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.kstream.Consumed;
- import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.Produced;
- public class StreamService {
- // метод запускает процесс для потоковой обработки данных из топика getInputTopic()
- // результат обработки данных запишем в топик getOutputTopic()
- public void startProcess(){
- //
- System.out.println("StreamService.startProcess>>");
- // создаем объект типа StreamsBuilder
- StreamsBuilder builder = new StreamsBuilder();
- // описываем источник для потока данных
- KStream<String, String> firstStream = builder.stream(SreamsKafkaConfig.getInputTopic(),
- Consumed.with(Serdes.String(), Serdes.String()));
- //
- //SreamsKafkaConfig.getOutputTopic()
- // с помощью mapValues выполняем преобразование для каждого значения из потока
- KStream<String, String> trimmedStream = firstStream.mapValues(v -> v.trim());
- // записываем данные в результироущий поток
- trimmedStream.to(SreamsKafkaConfig.getOutputTopic(), Produced.with(Serdes.String(), Serdes.String()));
- // создаем объект типа KafkaStreams
- try(KafkaStreams streams = new KafkaStreams(builder.build(), SreamsKafkaConfig.getConfig());){
- // запускаем процесс для потоковой обработки записей
- streams.start();
- TimeUnit.SECONDS.sleep(20);
- }catch(Exception exc){
- exc.printStackTrace();
- }
- System.out.println("<<");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement