Advertisement
cooperlees

Untitled

Jul 10th, 2019
438
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.10 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. """ ExaBGP Pipes handling only valid JSON responses """
  3.  
  4. import asyncio
  5. import logging
  6. from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
  7. from json import loads
  8. from os import R_OK, W_OK, access
  9. from pathlib import Path
  10. from typing import (
  11.     Awaitable,
  12.     Callable,
  13.     Dict,
  14.     List,
  15.     NamedTuple,
  16.     Optional,
  17.     Sequence,
  18.     Union,
  19. )
  20.  
  21.  
  22. LOG = logging.getLogger(__name__)
  23.  
  24.  
  25. class PipePaths(NamedTuple):
  26.     in_pipe: Path
  27.     out_pipe: Path
  28.  
  29.  
  30. class ExaBGPPipes:
  31.     """ Class to control reading and writing to ExaBGP FIFO Named Pipes
  32.        - Caller to maintain synconization with self.alock """
  33.  
  34.     def __init__(
  35.         self,
  36.         in_pipe: Path,
  37.         out_pipe: Path,
  38.         executor: Optional[ProcessPoolExecutor, ThreadPoolExecutor] = None,
  39.     ) -> None:
  40.         self.alock = asyncio.Lock()
  41.         self.executor = executor
  42.         self.loop = asyncio.get_event_loop()
  43.         self.pipe_paths = PipePaths(in_pipe, out_pipe)
  44.  
  45.     async def check_pipes(self) -> bool:
  46.         """ Check that we can stat each pipe """
  47.         access_results = await asyncio.gather(
  48.             self.loop.run_in_executor(
  49.                 self.executor, access, self.pipe_paths.in_pipe, W_OK
  50.             ),
  51.             self.loop.run_in_executor(
  52.                 self.executor, access, self.pipe_paths.out_pipe, R_OK
  53.             ),
  54.         )
  55.         for idx, access_success in enumerate(access_results):
  56.             if not access_success:
  57.                 LOG.error(f"{self.pipe_paths[idx]} does not have required access")
  58.  
  59.         return True
  60.  
  61.     def _read(self, json_deserializer: Callable) -> Dict:
  62.         with self.pipe_paths.out_pipe.open("rb") as opfp:
  63.             pipe_bytes = opfp.read()
  64.  
  65.         return json_deserializer(pipe_bytes.decode("utf-8"))
  66.  
  67.     async def read(
  68.         self, *, json_deserializer: Callable = loads, timeout: float = 5.0
  69.     ) -> Dict:
  70.         """ Read API response and deserialize it
  71.            - Wrap blocking read in an executor so it's non blocking
  72.              and has a customizable timeout
  73.            - Can also import a faster JSON serializer
  74.  
  75.            Throws:
  76.                - IOError
  77.                - asyncio.TimeoutError,
  78.                - json.JSONDecodeError (if using json module loads) """
  79.  
  80.         return asyncio.wait_for(
  81.             self.loop.run_in_executor(self.executor, self._read, json_deserializer),
  82.             timeout=timeout,
  83.         )
  84.  
  85.     def _write(self, msg: bytes) -> int:
  86.         with self.pipe_paths.in_pipe.open("wb") as ipfp:
  87.             return ipfp.write(msg)
  88.  
  89.     async def write(self, msg: Union[bytes, str], *, timeout: float = 5.0) -> int:
  90.         """ Write str to API FIFO
  91.            - Wrap blocking write in an executor so it's non blocking
  92.              and has a customizable timeout
  93.  
  94.            Throws: IOError, asyncio.TimeoutError """
  95.  
  96.         if isinstance(msg, bytes):
  97.             msg = msg.encode("utf-8")
  98.  
  99.         return asyncio.wait_for(
  100.             self.loop.run_in_executor(self.executor, self._write, msg), timeout=timeout
  101.         )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement