Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use eyre::{eyre, Result, WrapErr};
- use gstreamer as gst;
- use std::net::IpAddr;
- use tracing::{error, info, instrument, warn};
- use gst::{prelude::*, State};
- #[derive(Clone, Debug)]
- pub struct StreamConfig {
- pub host: IpAddr,
- pub port: u16,
- }
- pub struct GStreamerInstance {
- pub(crate) pipeline: gst::Pipeline,
- }
- impl std::fmt::Debug for GStreamerInstance {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct(std::any::type_name::<Self>())
- .field("pipeline", &self.pipeline.name().to_string())
- .finish_non_exhaustive()
- }
- }
- impl GStreamerInstance {
- #[instrument(skip(init), err)]
- pub fn new<F: Fn(&mut gst::Pipeline) -> Result<()>>(init: F) -> Result<Self> {
- info!("creating a new GStreamer instance");
- gst::init()
- .wrap_err("initializing gstreamer")
- .and_then(|_| {
- let mut pipeline = gst::Pipeline::new(Some(std::any::type_name::<Self>()));
- init(&mut pipeline)
- .wrap_err("pipeline init script")
- .map(|_| pipeline)
- })
- .map(|pipeline| Self { pipeline })
- }
- }
- impl Drop for GStreamerInstance {
- fn drop(&mut self) {
- warn!("cleaning up the GStreamer instance");
- if let Err(message) = self.pipeline.set_state(State::Null) {
- error!(?message, "cleanup of GStreamer state has failed...");
- }
- unsafe { gst::deinit() }
- }
- }
- #[derive(Debug)]
- pub struct GStreamerStream {
- pub instance: GStreamerInstance,
- pub config: StreamConfig,
- }
- impl GStreamerStream {
- #[instrument(err)]
- pub async fn start_streaming(config: StreamConfig) -> Result<Self> {
- info!("starting a new stream");
- let StreamConfig { host, port } = config.clone();
- tokio::task::spawn_blocking(move || {
- GStreamerInstance::new(|pipeline| -> Result<()> {
- let src = gst::ElementFactory::make("videotestsrc")
- .build()
- .wrap_err("initializing src")?;
- let encoder = gst::ElementFactory::make("x264enc")
- .build()
- .wrap_err("initializing encoder")?;
- let muxer = gst::ElementFactory::make("mpegtsmux")
- .build()
- .wrap_err("initializing muxer")?;
- let sink = gst::ElementFactory::make("tcpserversink")
- .build()
- .wrap_err("initializing sink")?;
- // Set properties for the TCP server sink
- std::panic::catch_unwind(|| {
- sink.set_property("host", &host.to_string());
- sink.set_property("port", &(port as i32));
- })
- .map_err(|e| eyre!("{e:?}"))
- .wrap_err("setting network properties")?;
- pipeline
- .add_many(&[&src, &encoder, &muxer, &sink])
- .wrap_err("adding elements to pipeline")?;
- gst::Element::link_many(&[&src, &encoder, &muxer, &sink])
- .wrap_err("linking elements")?;
- // Set the pipeline to the playing state
- pipeline
- .set_state(State::Playing)
- .wrap_err("setting the state")?;
- // Start the main loop
- let main_loop = glib::MainLoop::new(None, false);
- info!("starting the stream");
- main_loop.run();
- Ok(())
- })
- .map(|instance| Self { instance, config })
- })
- .await
- .wrap_err("thread_crashed")
- .and_then(|r| r.wrap_err("initializing stream"))
- .wrap_err("starting the stream")
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use test_log::test;
- #[test(tokio::test(flavor = "multi_thread"))]
- async fn test_example_stream() -> Result<()> {
- let _stream = GStreamerStream::start_streaming(StreamConfig {
- host: [127, 0, 0, 1].into(),
- port: 7744,
- })
- .await;
- Ok(())
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement