Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- pub fn thread_fn_main(terminate: Arc<AtomicBool>,
- rx_main_to_ui: BroadcastFutReceiver<MsgMainToUi>,
- tx_ui_to_main: Sender<MsgUiToMain>
- ) {
- let ui = Arc::new(Mutex::new(UiState::default()));
- // this thread keeps track of the ui state by integrating the msgs
- let terminate_ui_state = terminate.clone();
- let rx_main_to_ui_ui_state = rx_main_to_ui.add_stream();
- let ui_ui_state = ui.clone();
- let thread_ui_state = thread::Builder::new().name("ui_state".to_string()).spawn(move || {
- for _ in FpsLoop::new(60) {
- if terminate_ui_state.load(Ordering::Relaxed) { break }
- while let Ok(msg) = rx_main_to_ui_ui_state.try_recv() {
- let mut ui = ui_ui_state.lock().unwrap();
- ui.apply_msg(msg);
- }
- }
- info!("terminating ui_state");
- }).unwrap();
- let port = ::std::env::var("WS_PORT").expect("WS_PORT must be set");
- let addr = format!("127.0.0.1:{}", port).to_socket_addrs().unwrap().next().unwrap();
- info!("listening on {}", addr);
- let mut core = Core::new().unwrap();
- let handle = core.handle();
- let server = TcpListener::bind(&addr, &handle).unwrap();
- let srv = server.incoming().for_each(|(stream, addr)| {
- let rx_main_to_ui = rx_main_to_ui.add_stream();
- let tx_ui_to_main = tx_ui_to_main.clone();
- let handle_inner = handle.clone();
- let ui_state_start = ui.lock().unwrap().serialize(); // current ui state for the new client
- accept_async(stream).and_then(move |ws_stream| {
- info!("new websocket connection: {}", addr);
- let (sink, stream) = ws_stream.split();
- let ws_reader = stream.for_each(move |msg| {
- match msg {
- Message::Text(msg) => match serde_json::from_str(&msg) {
- Ok(msg) => tx_ui_to_main.send(msg).unwrap(),
- Err(e) => error!("{}", e)
- },
- Message::Binary(_) => info!("got binary data")
- }
- Ok(())
- });
- use std::result::Result;
- let ws_writer = iter(ui_state_start.into_iter().map(|x| -> Result<MsgMainToUi, ()> { Ok(x) })).fold(sink, |mut sink, msg| {
- send_json(&mut sink, msg).map(|_| sink).map_err(|_| ())
- }).and_then(move |sink| {
- rx_main_to_ui.fold(sink, |mut sink, msg| {
- send_json(&mut sink, msg).unwrap();
- Ok(sink)
- })
- });
- let connection = ws_reader.map(|_| ()).map_err(|_| ())
- .select(ws_writer.map(|_| ()).map_err(|_| ()));
- handle_inner.spawn(connection.then(move |_| {
- info!("Connection {} closed.", addr);
- Ok(())
- }));
- Ok(())
- }).map_err(|e| {
- error!("Error during the websocket handshake occurred: {}", e);
- use std::io::{Error, ErrorKind};
- Error::new(ErrorKind::Other, e)
- })
- });
- core.run(srv).unwrap();
- thread_ui_state.join().unwrap();
- }
- fn send_json<S: Sink<SinkItem=Message, SinkError=Error>, T: Serialize>(sink: &mut S, payload: T) -> Result<AsyncSink<Message>> {
- let json = serde_json::to_string(&payload).unwrap();
- let msg = Message::Text(json);
- sink.start_send(msg)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement