Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- pip install websockets requests --break-system-packages
- ------------------------------------------------------------------------------------------
- ----------------------Vaughnsoft API------------------------------------------------------
- ------------------------------------------------------------------------------------------
import asyncio
import base64
import json
import time

import requests
import websockets
- # Base URL for the VaughnSoft API
# Root of the VaughnSoft REST API. Endpoint paths are appended directly to
# this string, so the trailing slash must stay.
API_BASE_URL = "https://api.vaughnsoft.net/v1/"
- # Function to fetch stream data from REST API
def fetch_stream_data(stream_name, timeout=10):
    """Fetch REST data for a single stream.

    Issues a GET to ``{API_BASE_URL}stream/vl/{stream_name}``.

    Args:
        stream_name: Name appended to the ``stream/vl/`` endpoint path.
        timeout: Seconds to wait for the server. ``requests`` applies no
            timeout by default, so without one a stalled server would block
            the caller indefinitely.

    Returns:
        The decoded JSON body when the server answers HTTP 200; otherwise
        the dict ``{"error": "Failed to fetch stream data"}``.
    """
    failure = {"error": "Failed to fetch stream data"}
    endpoint = f"stream/vl/{stream_name}"
    try:
        response = requests.get(f"{API_BASE_URL}{endpoint}", timeout=timeout)
    except requests.RequestException:
        # Network-level failures are reported in the same shape as a
        # non-200 reply so callers only have one error path to handle.
        return failure
    if response.status_code != 200:
        return failure
    try:
        return response.json()
    except ValueError:
        # A 200 whose body is not valid JSON is still a failed fetch.
        return failure
- # Function to fetch server data (Ingest servers, sApi servers)
def fetch_server_data(timeout=10):
    """Fetch the ingest-server and sApi-server listings from the REST API.

    Each endpoint is requested independently, so a failure on one does not
    discard the result of the other.

    Args:
        timeout: Seconds to wait for each request. ``requests`` applies no
            timeout by default.

    Returns:
        A dict with the keys ``"ingest_servers"`` and ``"sapi_servers"``.
        Each value is the decoded JSON body on HTTP 200, or
        ``{"error": "Failed to fetch <key> data"}`` otherwise.
    """
    endpoints = {
        "ingest_servers": "ingest_servers",
        "sapi_servers": "sapi_servers",
    }
    server_data = {}
    for key, endpoint in endpoints.items():
        failure = {"error": f"Failed to fetch {key} data"}
        try:
            response = requests.get(
                f"{API_BASE_URL}{endpoint}", timeout=timeout
            )
            if response.status_code == 200:
                server_data[key] = response.json()
            else:
                server_data[key] = failure
        except (requests.RequestException, ValueError):
            # Covers connection errors, timeouts and non-JSON bodies.
            server_data[key] = failure
    return server_data
- # WebSocket connection to stream and handle server API responses
async def handle_stream_data(channel="#vl-mark", poll_interval=5):
    """Poll the sApi WebSocket for stream data, reconnecting on close.

    After connecting, sends a ``MVN LOAD3`` request for ``channel`` and then
    repeats: receive one message, pass it to
    ``process_server_api_response``, wait ``poll_interval`` seconds, and
    send a ``MVN STREAM3`` request.

    Reconnection uses an outer loop rather than a recursive call, so a long
    run of disconnects does not pile up nested coroutine frames or keep
    dead connections' context managers open.

    Args:
        channel: Channel identifier placed in the LOAD3/STREAM3 commands.
        poll_interval: Seconds to wait between STREAM3 requests.

    This coroutine never returns normally; cancel the task to stop it.
    """
    url = "wss://sapi-ws-1x01.vaughnsoft.net/mvn"
    while True:
        try:
            async with websockets.connect(url) as websocket:
                print("Connected to the WebSocket server")
                # Send an initial connection request.
                await websocket.send(f"MVN LOAD3 {channel} guest guest\n\0")
                while True:
                    response = await websocket.recv()
                    process_server_api_response(response)
                    # Poll for fresh stream data at the configured interval.
                    await asyncio.sleep(poll_interval)
                    await websocket.send(f"MVN STREAM3 {channel}\n\0")
        except websockets.exceptions.ConnectionClosed:
            print("Lost connection to WebSocket. Reconnecting...")
            await asyncio.sleep(2)
- # Process the response from the server API
def process_server_api_response(response):
    """Print a summary of one message received from the sApi WebSocket.

    Newline and NUL characters are stripped first. Messages starting with
    ``"ACK3 "`` print an acknowledgment line. Messages starting with
    ``"STREAM3 "`` are parsed as a semicolon-separated record taken from the
    token after the first space; this script reads field 0 as the stream
    name, field 2 as the viewer count, field 6 (``"1"``) as the live flag
    and field 10 (``"2"``) as the banned flag. Anything else is ignored.

    Args:
        response: The received frame, as ``str`` or ``bytes``.

    Returns:
        None. All output goes to stdout.
    """
    if isinstance(response, (bytes, bytearray)):
        # Binary frames arrive as bytes; decode rather than crash on replace.
        response = response.decode("utf-8", errors="replace")
    response = response.replace("\n", "").replace("\0", "")

    if response.startswith("ACK3 "):
        print("Acknowledgment received from server")
        return
    if not response.startswith("STREAM3 "):
        return

    data = response.split(" ")[1].split(";")
    if len(data) < 11:
        # Highest index read below is 10; a short record would otherwise
        # raise IndexError and take down the calling handler.
        print("Malformed STREAM3 response ignored")
        return

    stream_name = data[0]
    live_viewers = data[2]
    is_live = "Yes" if data[6] == "1" else "No"
    stream_status = "Yes" if data[10] == "2" else "No"

    print("---")
    print(f"Stream Name: {stream_name}")
    print(f"Live Viewers: {live_viewers}")
    print(f"Is {stream_name} Live?: {is_live}")
    print(f"Is {stream_name} Banned?: {stream_status}")
- # WebSocket connection for follower notifications
async def handle_follower_notifications():
    """Listen on the chat WebSocket for follower events, reconnecting on close.

    After connecting, sends a ``MVN AUTH guest guest`` request and then
    passes every received message to ``process_follower_notification``.
    When that function returns a reply (it returns ``"PONG"`` for a
    ``PING``), the reply is sent back on the socket; previously the return
    value was dropped, so the announced pong was never transmitted.

    Reconnection uses an outer loop rather than a recursive call, so
    repeated disconnects do not accumulate nested coroutine frames.

    This coroutine never returns normally; cancel the task to stop it.
    """
    url = "wss://chat-ws-1x01.vaughnsoft.net/mvn"
    while True:
        try:
            async with websockets.connect(url) as websocket:
                print("Connected to Follower WebSocket")
                # Send authentication request.
                await websocket.send("MVN AUTH guest guest")
                while True:
                    response = await websocket.recv()
                    reply = process_follower_notification(response)
                    if reply is not None:
                        # NOTE(review): reply is sent verbatim; confirm
                        # whether the server expects a "\n\0" terminator.
                        await websocket.send(reply)
        except websockets.exceptions.ConnectionClosed:
            print("Lost connection to Follower WebSocket. Reconnecting...")
            await asyncio.sleep(2)
- # Process follower notification message
def process_follower_notification(response):
    """Handle one message received from the chat WebSocket.

    Newline and NUL characters are stripped first. A message starting with
    ``"MVN FOLLOW "`` prints ``"<username> Followed!"``, where the username
    is the third space-separated token. A message equal to ``"PING"``
    prints a notice and yields the reply to send.

    Args:
        response: The received frame, as ``str`` or ``bytes``.

    Returns:
        ``"PONG"`` when the message is ``"PING"``; otherwise ``None``.
    """
    if isinstance(response, (bytes, bytearray)):
        # Binary frames arrive as bytes; decode rather than crash on replace.
        response = response.decode("utf-8", errors="replace")
    response = response.replace("\n", "").replace("\0", "")

    if response.startswith("MVN FOLLOW "):
        # The prefix check guarantees at least three tokens, but the third
        # may be empty when nothing follows the trailing space.
        follower_username = response.split(" ")[2]
        if follower_username:
            print(f"{follower_username} Followed!")
        return None

    if response == "PING":
        print("Ping received, sending Pong...")
        return "PONG"

    return None
- # Main function to orchestrate WebSocket connections
async def main():
    """Print initial REST data, then run both WebSocket handlers concurrently.

    Fetches and prints the server listings and the data for one stream, then
    awaits ``handle_stream_data`` and ``handle_follower_notifications``
    together via ``asyncio.gather``.
    """
    # Fetch initial server data (ingest servers, sApi servers).
    server_data = fetch_server_data()
    print("Fetched Server Data: ", json.dumps(server_data, indent=4))

    # Example of fetching data for a specific stream. The label is built
    # from stream_name so it cannot drift from the stream actually queried.
    stream_name = "mark"
    stream_data = fetch_stream_data(stream_name)
    print(
        f"Stream Data for '{stream_name}': ",
        json.dumps(stream_data, indent=4),
    )

    # Run WebSocket handlers concurrently.
    await asyncio.gather(
        handle_stream_data(),  # Stream data (sApi)
        handle_follower_notifications(),  # Follower notifications
    )
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The WebSocket handlers run until interrupted, so Ctrl-C is the
        # expected way to stop; exit without a traceback.
        print("Interrupted, shutting down.")
- ------------------------------------------------------------------------------------------
- ------------------------------------------------------------------------------------------
- ------------------------------------------------------------------------------------------
- Explanation
- REST API Calls
- fetch_stream_data retrieves stream data (like viewers, status, etc.) for a given stream.
- fetch_server_data fetches server details like ingest and sApi servers from the VaughnSoft API.
- WebSocket Connections
- handle_stream_data establishes a WebSocket connection to the sApi server, sends commands, and processes responses. It retrieves and prints live stream data (viewers, status).
- handle_follower_notifications connects to the chat WebSocket to listen for new followers and outputs their username when a follow event occurs.
- Reconnection Logic
- If the WebSocket connection is closed, the script will attempt to reconnect after a brief delay.
- Polling and Real-time Updates
- The script continuously polls the server for fresh data (via WebSocket) at intervals, ensuring the data is always up-to-date.
- Base64 Encoded Messages
- Base64-encoded strings like status_msg and about_msg can be decoded to provide more human-readable information. You can add logic to decode these messages if needed.
- Concurrency with Asyncio
- asyncio is used to handle WebSocket connections concurrently, so the stream data and follower notifications can be processed in real-time.
- How It Works
- Stream Data: The WebSocket connection listens for stream data and displays the stream name, viewer count, live status, and ban status.
- Follower Alerts: The script listens to the WebSocket for follower notifications, printing out the usernames of new followers.
- Polling: Every 5 seconds, the script requests fresh stream data and processes responses to keep information up to date.
- Future Enhancements
- Error Handling: Add more robust error handling for edge cases.
- Decoding Base64: Decode any Base64-encoded messages (like status and about messages) for better readability.
- Support for Other Streams: Extend the functionality to handle additional stream types.
- ------------------------------------------------------------------------------------------
- ------------------------------------------------------------------------------------------
- ------------------------------------------------------------------------------------------
- For that guy that made jtv big mad that one time
Add Comment
Please, Sign In to add comment