Advertisement
niedzwiedzw

http stream

Jun 27th, 2023
1,727
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 4.19 KB | None | 0 0
  1. use eyre::{eyre, Result, WrapErr};
  2. use gstreamer as gst;
  3. use std::net::IpAddr;
  4. use tracing::{error, info, instrument, warn};
  5.  
  6. use gst::{prelude::*, State};
  7.  
  8. #[derive(Clone, Debug)]
  9. pub struct StreamConfig {
  10.     pub host: IpAddr,
  11.     pub port: u16,
  12. }
  13.  
  14. pub struct GStreamerInstance {
  15.     pub(crate) pipeline: gst::Pipeline,
  16. }
  17.  
  18. impl std::fmt::Debug for GStreamerInstance {
  19.     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  20.        f.debug_struct(std::any::type_name::<Self>())
  21.            .field("pipeline", &self.pipeline.name().to_string())
  22.            .finish_non_exhaustive()
  23.    }
  24. }
  25.  
  26. impl GStreamerInstance {
  27.    #[instrument(skip(init), err)]
  28.    pub fn new<F: Fn(&mut gst::Pipeline) -> Result<()>>(init: F) -> Result<Self> {
  29.        info!("creating a new GStreamer instance");
  30.        gst::init()
  31.            .wrap_err("initializing gstreamer")
  32.            .and_then(|_| {
  33.                let mut pipeline = gst::Pipeline::new(Some(std::any::type_name::<Self>()));
  34.                init(&mut pipeline)
  35.                    .wrap_err("pipeline init script")
  36.                    .map(|_| pipeline)
  37.            })
  38.            .map(|pipeline| Self { pipeline })
  39.    }
  40. }
  41.  
  42. impl Drop for GStreamerInstance {
  43.    fn drop(&mut self) {
  44.        warn!("cleaning up the GStreamer instance");
  45.        if let Err(message) = self.pipeline.set_state(State::Null) {
  46.            error!(?message, "cleanup of GStreamer state has failed...");
  47.        }
  48.  
  49.        unsafe { gst::deinit() }
  50.    }
  51. }
  52.  
  53. #[derive(Debug)]
  54. pub struct GStreamerStream {
  55.    pub instance: GStreamerInstance,
  56.    pub config: StreamConfig,
  57. }
  58.  
  59. impl GStreamerStream {
  60.    #[instrument(err)]
  61.    pub async fn start_streaming(config: StreamConfig) -> Result<Self> {
  62.        info!("starting a new stream");
  63.        let StreamConfig { host, port } = config.clone();
  64.        tokio::task::spawn_blocking(move || {
  65.            GStreamerInstance::new(|pipeline| -> Result<()> {
  66.                let src = gst::ElementFactory::make("videotestsrc")
  67.                    .build()
  68.                    .wrap_err("initializing src")?;
  69.                let encoder = gst::ElementFactory::make("x264enc")
  70.                    .build()
  71.                    .wrap_err("initializing encoder")?;
  72.                let muxer = gst::ElementFactory::make("mpegtsmux")
  73.                    .build()
  74.                    .wrap_err("initializing muxer")?;
  75.                let sink = gst::ElementFactory::make("tcpserversink")
  76.                    .build()
  77.                    .wrap_err("initializing sink")?;
  78.  
  79.                // Set properties for the TCP server sink
  80.                std::panic::catch_unwind(|| {
  81.                    sink.set_property("host", &host.to_string());
  82.                    sink.set_property("port", &(port as i32));
  83.                })
  84.                .map_err(|e| eyre!("{e:?}"))
  85.                .wrap_err("setting network properties")?;
  86.  
  87.                pipeline
  88.                    .add_many(&[&src, &encoder, &muxer, &sink])
  89.                    .wrap_err("adding elements to pipeline")?;
  90.  
  91.                gst::Element::link_many(&[&src, &encoder, &muxer, &sink])
  92.                    .wrap_err("linking elements")?;
  93.  
  94.                // Set the pipeline to the playing state
  95.                pipeline
  96.                    .set_state(State::Playing)
  97.                    .wrap_err("setting the state")?;
  98.  
  99.                // Start the main loop
  100.                let main_loop = glib::MainLoop::new(None, false);
  101.                info!("starting the stream");
  102.                main_loop.run();
  103.                Ok(())
  104.            })
  105.            .map(|instance| Self { instance, config })
  106.        })
  107.        .await
  108.        .wrap_err("thread_crashed")
  109.        .and_then(|r| r.wrap_err("initializing stream"))
  110.        .wrap_err("starting the stream")
  111.    }
  112. }
  113.  
  114. #[cfg(test)]
  115. mod tests {
  116.    use super::*;
  117.    use test_log::test;
  118.    #[test(tokio::test(flavor = "multi_thread"))]
  119.    async fn test_example_stream() -> Result<()> {
  120.        let _stream = GStreamerStream::start_streaming(StreamConfig {
  121.            host: [127, 0, 0, 1].into(),
  122.            port: 7744,
  123.        })
  124.        .await;
  125.        Ok(())
  126.    }
  127. }
  128.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement