Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
- */
- package com.spec.service;
- import java.util.concurrent.TimeUnit;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.kstream.Consumed;
- import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.Produced;
- /**
- *
- * @author Administrator
- */
- public class StreamService3 {
- // метод запускает процесс для потоковой обработки данных из топика getInputTopic()
- // результат обработки данных запишем в топик getOutputTopic()
- public void startProcess() {
- //
- System.out.println("StreamService3.startProcess>>");
- // создаем объект типа StreamsBuilder
- StreamsBuilder builder = new StreamsBuilder();
- // описываем источник для потока данных
- KStream<String, String> firstStream = builder.stream(SreamsKafkaConfig.getInputTopic(),
- Consumed.with(Serdes.String(), Serdes.String()));
- //
- //SreamsKafkaConfig.getOutputTopic()
- // с помощью mapValues выполняем преобразование для каждого значения из потока
- KStream<String, String> trimmedStream = firstStream.mapValues(v -> v.trim());
- // с помощью метода filter фильтруем значения по условию: v.length()>3
- KStream<String, String> byLenStream = trimmedStream.filter((String k, String v) -> v.length()>3);
- //trimmedStream.filterNot((String k, String v) -> v.length()>3);
- // записываем данные в результироущий поток
- byLenStream.to(SreamsKafkaConfig.getOutputTopic(), Produced.with(Serdes.String(), Serdes.String()));
- // создаем объект типа KafkaStreams
- try ( KafkaStreams streams = new KafkaStreams(builder.build(), SreamsKafkaConfig.getConfig());) {
- // запускаем процесс для потоковой обработки записей
- streams.start();
- TimeUnit.SECONDS.sleep(40);
- } catch (Exception exc) {
- exc.printStackTrace();
- }
- System.out.println("<<");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement