Advertisement
theultraman20

server.rs

Dec 7th, 2024
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 4.20 KB | None | 0 0
  1.  
  2. use futures::SinkExt;
  3. use std::hash::Hash;
  4. use std::io::Read;
  5. use std::collections::HashSet;
  6. //use tokio_serde::{Serializer, Deserializer, Framed};
  7. use std::sync::{Arc, Mutex};
  8. use serde::*;
  9. use tokio_util::codec::{Decoder, Encoder, Framed};
  10. use futures::StreamExt;
  11. use bytes::{BytesMut, Buf, BufMut};
  12. use std::io;
  13. use coin::block::*;
  14. use coin::tx::*;
  15. use ClientFrame::*;
  16. use ServerFrame::*;
  17.  
  18. use std::fs;
  19. use serde::*;
  20. use tokio::{
  21.     net::{TcpListener, TcpStream}
  22. };
  23.  
  24. //enum ConnectionType {
  25. //    Spender,
  26. //    Miner,
  27. //}
  28.  
/// A frame sent from a client to the server.
///
/// Frames are decoded from the socket by `CoinCodec` using bincode.
/// NOTE(review): the variant order is presumably part of the wire format
/// (bincode identifies enum variants by position) — confirm before reordering.
#[derive(Serialize, Deserialize)]
enum ClientFrame {
    //ConnectionType,
    /// A transaction submitted by the client; the server adds it to its set of
    /// pending transactions.
    TxFrame(Tx),
    /// A block the client claims to have mined; the server tries to append it
    /// to its chain state.
    Mined(Block),
    /// Request for the blockchain. No handler is implemented for this yet.
    GetBlockchain,
    /// Request for the last block. No handler is implemented for this yet.
    GetLastBlock,
    /// Request for the pending transactions; answered with `ServerFrame::NewTxPool`.
    GetNewTxpool,
    /// Request for the server version; answered with `ServerFrame::Version`.
    GetVersion,
}
  40.  
  41. #[derive(Serialize, Deserialize)]
  42. //sent from server
  43. enum ServerFrame {
  44.     //idk if we'll need these two
  45.     NewBlockMined,
  46.     //Read this from cargotoml
  47.     Version(&str),
  48.     //Client gets to decide which txs to mine
  49.     NewTxPool(Vec<Tx>),
  50. }
  51.  
/// Stateless codec used with `Framed`: decodes `ClientFrame`s from and encodes
/// `ServerFrame`s to the socket, using bincode for both directions.
struct CoinCodec;
  53.  
  54. #[tokio::main]
  55. async fn main() {
  56.     println!("Starting server");
  57.     let serialized = fs::read("state.bin").expect("Error reading file");
  58.     let state: State = bincode::deserialize(&serialized).expect("Error deserializing");
  59.     assert!(state.verify_all_blocks().is_ok());
  60.  
  61.     //wish there was an arcmutex macro or something
  62.  
  63.     let state = Arc::new(Mutex::new(state));
  64.     //need hashmap since we're
  65.     let new_txs = Arc::new(Mutex::new(HashSet::<Tx>::new()));
  66.  
  67.     let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
  68.  
  69.     loop {
  70.         let (socket, addr) = listener.accept().await.unwrap();
  71.         println!("New connection from: {}", addr);
  72.  
  73.         let mut framed_stream = Framed::new(socket, CoinCodec);
  74.         let new_txs = new_txs.clone();
  75.         let state = state.clone();
  76.         tokio::spawn(async move {
  77.  
  78.             while let Some(Ok(frame)) = framed_stream.next().await {
  79.                 match frame {
  80.                     TxFrame(tx) => {
  81.                         let mut new_txs = new_txs.lock().unwrap();
  82.                         new_txs.insert(tx);
  83.                     },
  84.                     Mined(block) => {
  85.                         let mut state = state.lock().unwrap();
  86.                         if state.add_block_if_valid(block).is_ok() {
  87.                                 println!("New block accepted");
  88.                                 let mut new_txs = new_txs.lock().unwrap();
  89.                                 new_txs.retain(|item| !block.txs.contains(item));
  90.                         } else {
  91.                             println!("New block rejected");
  92.                         }
  93.                     },
  94.                     GetNewTxpool => {
  95.                         let new_txs = {
  96.                             let new_txs = new_txs.lock().unwrap();
  97.                             new_txs.iter().cloned().collect::<Vec<_>>()
  98.                         };
  99.                         framed_stream.send(ServerFrame::NewTxPool(new_txs)).await;
  100.                     },
  101.                     GetVersion => {
  102.                         framed_stream.send(ServerFrame::Version(env!("CARGO_PKG_VERSION"))).await;
  103.                     },
  104.  
  105.                     //do later
  106.                     _ => {
  107.                         unimplemented!();
  108.                     }
  109.                 }
  110.             }
  111.  
  112.         });
  113.         //match
  114.  
  115.     }
  116. }
  117.  
  118.  
  119. impl Decoder for CoinCodec {
  120.     type Item = ClientFrame;
  121.     type Error = io::Error;
  122.  
  123.     fn decode(&mut self, src: &mut BytesMut) ->
  124.         Result<Option<Self::Item>, Self::Error> {
  125.  
  126.         if src.is_empty() {
  127.             return Ok(None)
  128.         };
  129.  
  130.         match bincode::deserialize(&src[..]) {
  131.             Ok(frame) => Ok(Some(frame)),
  132.             Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
  133.         }
  134.     }
  135. }
  136.  
  137. impl Encoder<ServerFrame> for CoinCodec {
  138.     type Error = io::Error;
  139.  
  140.     fn encode(&mut self, item: ServerFrame, dst: &mut BytesMut) ->
  141.         Result<(), Self::Error> {
  142.  
  143.             let bytes = bincode::serialize(&item)
  144.                 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
  145.  
  146.             dst.extend_from_slice(&bytes);
  147.             Ok(())
  148.         }
  149. }
  150.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement