Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """ ExaBGP Pipes handling only valid JSON responses """
- import asyncio
- import logging
- from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
- from json import loads
- from os import R_OK, W_OK, access
- from pathlib import Path
- from typing import (
- Awaitable,
- Callable,
- Dict,
- List,
- NamedTuple,
- Optional,
- Sequence,
- Union,
- )
- LOG = logging.getLogger(__name__)
- class PipePaths(NamedTuple):
- in_pipe: Path
- out_pipe: Path
- class ExaBGPPipes:
- """ Class to control reading and writing to ExaBGP FIFO Named Pipes
- - Caller to maintain synconization with self.alock """
- def __init__(
- self,
- in_pipe: Path,
- out_pipe: Path,
- executor: Optional[ProcessPoolExecutor, ThreadPoolExecutor] = None,
- ) -> None:
- self.alock = asyncio.Lock()
- self.executor = executor
- self.loop = asyncio.get_event_loop()
- self.pipe_paths = PipePaths(in_pipe, out_pipe)
- async def check_pipes(self) -> bool:
- """ Check that we can stat each pipe """
- access_results = await asyncio.gather(
- self.loop.run_in_executor(
- self.executor, access, self.pipe_paths.in_pipe, W_OK
- ),
- self.loop.run_in_executor(
- self.executor, access, self.pipe_paths.out_pipe, R_OK
- ),
- )
- for idx, access_success in enumerate(access_results):
- if not access_success:
- LOG.error(f"{self.pipe_paths[idx]} does not have required access")
- return True
- def _read(self, json_deserializer: Callable) -> Dict:
- with self.pipe_paths.out_pipe.open("rb") as opfp:
- pipe_bytes = opfp.read()
- return json_deserializer(pipe_bytes.decode("utf-8"))
- async def read(
- self, *, json_deserializer: Callable = loads, timeout: float = 5.0
- ) -> Dict:
- """ Read API response and deserialize it
- - Wrap blocking read in an executor so it's non blocking
- and has a customizable timeout
- - Can also import a faster JSON serializer
- Throws:
- - IOError
- - asyncio.TimeoutError,
- - json.JSONDecodeError (if using json module loads) """
- return asyncio.wait_for(
- self.loop.run_in_executor(self.executor, self._read, json_deserializer),
- timeout=timeout,
- )
- def _write(self, msg: bytes) -> int:
- with self.pipe_paths.in_pipe.open("wb") as ipfp:
- return ipfp.write(msg)
- async def write(self, msg: Union[bytes, str], *, timeout: float = 5.0) -> int:
- """ Write str to API FIFO
- - Wrap blocking write in an executor so it's non blocking
- and has a customizable timeout
- Throws: IOError, asyncio.TimeoutError """
- if isinstance(msg, bytes):
- msg = msg.encode("utf-8")
- return asyncio.wait_for(
- self.loop.run_in_executor(self.executor, self._write, msg), timeout=timeout
- )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement