Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.google.gson.Gson;
- import com.siemens.domain.remote.IotEntity;
- import com.siemens.repository.IotEntityRepository;
- import com.siemens.websocket.WebSocketService;
- import com.siemens.websocket.model.CameraResultResponse;
- import com.siemens.websocket.model.RecognitionRecord;
- import lombok.Getter;
- import lombok.RequiredArgsConstructor;
- import lombok.Setter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import org.springframework.web.socket.*;
- import org.springframework.web.socket.client.WebSocketClient;
- import org.springframework.web.socket.client.standard.StandardWebSocketClient;
- import org.springframework.web.socket.handler.TextWebSocketHandler;
- import javax.annotation.PreDestroy;
- import java.net.URI;
- import java.time.Instant;
- import java.util.List;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- @RequiredArgsConstructor
- public class WebSocketServiceImpl implements WebSocketService {
- private final IotEntityRepository entityRepository;
- private ExecutorService executors;
- private List<WebSocketTask> tasks;
- @Override
- public void connectToCameras() {
- List<IotEntity> allCameras = entityRepository.getAllCameras();
- log.info("ALL_CAMERAS: {} : {}", allCameras.size(), allCameras);
- tasks = allCameras.stream()
- .map(WebSocketTask::new)
- .collect(Collectors.toList());
- executors = Executors.newFixedThreadPool(allCameras.size());
- for (WebSocketTask task : tasks) {
- executors.execute(task);
- }
- }
- @Override
- @PreDestroy
- public void shutdownQuietly() {
- log.info("shutdown called");
- if (tasks != null && !tasks.isEmpty()) {
- tasks.forEach(t -> t.setRunning(false));
- }
- if (executors != null) {
- executors.shutdownNow();
- }
- }
- }
- @Slf4j
- class WebSocketTask implements Runnable {
- private final WebSocketClient webSocketClient = new StandardWebSocketClient();
- private final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
- private final CameraWebSocketHandler handler;
- @Value(value = "camera.call-frequency")
- private int callFrequency;
- @Setter
- private volatile boolean running = true;
- private final String url;
- @Getter
- private final IotEntity camera;
- @Getter
- @Setter
- private Instant maxTimeStamp;
- public WebSocketTask(IotEntity camera) {
- this.camera = camera;
- handler = new CameraWebSocketHandler(this);
- maxTimeStamp = Instant.now();
- // url = String.format("https://%s/vision/resultdatabase/websocket.cgi", camera.getIpAddress1());
- url = "ws://echo.websocket.org";
- Thread.currentThread().setName(String.valueOf(camera.getId()));
- }
- @Override
- public void run() {
- while (true) {
- try (WebSocketSession session = webSocketClient
- .doHandshake(handler, headers, URI.create(url)).get()) {
- // inner while loop -> check that you aren't stopped
- while (running) {
- String sqlQuery = String.format("select * from results where timestamp > %s;", maxTimeStamp);
- TextMessage message = new TextMessage(sqlQuery);
- session.sendMessage(message);
- log.info("SQL_QUERY: {} : {}", Thread.currentThread().getName(), message.getPayload());
- Thread.sleep(callFrequency);
- }
- if (!running) {
- log.debug("stop execution for: {}", Thread.currentThread().getName());
- return;
- }
- } catch (Exception e) { // end of external while loop -> reconnect again with "exponential back off"
- log.error("Exception while accessing websockets", e);
- }
- }
- }
- }
- @Slf4j
- @RequiredArgsConstructor
- class CameraWebSocketHandler extends TextWebSocketHandler {
- private final WebSocketTask task;
- private Gson gson = new Gson();
- @Override
- public void afterConnectionEstablished(WebSocketSession session) {
- log.info("connection established {}", session.getId());
- }
- @Override
- public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
- /*CameraResultResponse json = gson.fromJson((String) message.getPayload(), CameraResultResponse.class);
- log.info("response: {}", json);
- Optional<Instant> maxDT = json.getResult().stream()
- .filter(Objects::nonNull)
- .map(RecognitionRecord::getTimestamp)
- .max(Instant::compareTo);
- if (maxDT.isPresent() && maxDT.get().isAfter(task.getMaxTimeStamp())) {
- task.setMaxTimeStamp(maxDT.get());
- }*/
- log.info("response message: {}; thread: [{}]", message.getPayload(), Thread.currentThread().getName());
- }
- @Override
- public void handleTransportError(WebSocketSession session, Throwable exception) {
- log.info("transport error");
- }
- @Override
- public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
- log.info("connection closed");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement