Advertisement
vitareinforce

control.py

Nov 14th, 2024 (edited)
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.34 KB | None | 0 0
  1. import pika
  2. import keyboard
  3. import argparse
  4. import threading
  5. import time
  6. import sys
  7.  
  8. # Argument parser setup
  9. parser = argparse.ArgumentParser(description='Megabot Control')
  10. parser.add_argument('rmq_server', type=str, help='RabbitMQ Server Host IP Address / Domain')
  11. parser.add_argument('queue_name', type=str, help='Queue Name')
  12. parser.add_argument('queue_name_camera', type=str, help='Queue Name Camera')
  13. args = parser.parse_args()
  14.  
  15. camera_data = None
  16. exit_program = False  # Flag to indicate when to exit
  17.  
  18. # RabbitMQ connection parameters
  19. def create_connection():
  20.     return pika.BlockingConnection(
  21.         pika.ConnectionParameters(
  22.             host=args.rmq_server,
  23.             port=5672,
  24.             virtual_host='/megabot',
  25.             credentials=pika.PlainCredentials('megabot', '12345678'),
  26.             heartbeat=600
  27.         )
  28.     )
  29.  
  30. # Function to publish message to RabbitMQ
  31. def publish_message(channel, message):
  32.     try:
  33.         channel.basic_publish(exchange='amq.topic', routing_key=args.queue_name, body=message)
  34.         print("Message published:", message)
  35.     except pika.exceptions.ChannelClosed:
  36.         print("Channel is closed, trying to reconnect...")
  37.         reconnect_channel()
  38.  
  39. # Global variables
  40. connection = create_connection()
  41. channel = connection.channel()
  42. channel.queue_declare(queue=args.queue_name, durable=True, auto_delete=True)
  43. channel.queue_bind(exchange='amq.topic', queue=args.queue_name, routing_key=args.queue_name)
  44.  
  45. # Bind the camera queue
  46. channel.queue_declare(queue=args.queue_name_camera, durable=True, auto_delete=True)
  47. channel.queue_bind(exchange='amq.direct', queue=args.queue_name_camera, routing_key=args.queue_name_camera)
  48.  
  49. # Gear settings
  50. gear_settings = {
  51.     1: {'max_pwm_straight': 180, 'max_pwm_turn': 180, 'max_pwm_backward': 180, 'max_pwm_braking': 90},
  52.     2: {'max_pwm_straight': 215, 'max_pwm_turn': 200, 'max_pwm_backward': 215, 'max_pwm_braking': 125},
  53.     3: {'max_pwm_straight': 230, 'max_pwm_turn': 215, 'max_pwm_backward': 230, 'max_pwm_braking': 150}
  54. }
  55.  
  56. # Motor Direction
  57. motor_direction = "e"
  58. current_gear = 1
  59.  
# Function to handle incoming messages from RabbitMQ
def callback(ch, method, properties, body) -> None:
    """Store the body of the most recent message in ``camera_data``.

    Passed as ``on_message_callback`` in ``start_consuming()``. Only ``body``
    is used; ``ch``, ``method`` and ``properties`` are accepted and ignored.
    Each call overwrites the previous value, so no history is kept.
    """
    global camera_data
    camera_data = body
  64.  
  65. # Function to start consuming messages
  66. def start_consuming():
  67.     channel.basic_consume(queue=args.queue_name_camera, on_message_callback=callback, auto_ack=True)
  68.     print("Waiting for messages. To exit press F4")
  69.     try:
  70.         channel.start_consuming()
  71.     except pika.exceptions.AMQPConnectionError:
  72.         print("Connection lost, attempting to reconnect...")
  73.         reconnect_channel()
  74.  
  75. # Function to handle reconnection
  76. def reconnect_channel():
  77.     global connection, channel
  78.     while True:
  79.         try:
  80.             connection = create_connection()
  81.             channel = connection.channel()
  82.             channel.queue_declare(queue=args.queue_name, durable=True, auto_delete=True)
  83.             channel.queue_bind(exchange='amq.topic', queue=args.queue_name, routing_key=args.queue_name)
  84.             print("Reconnected to RabbitMQ")
  85.             break
  86.         except Exception as e:
  87.             print(f"Reconnection failed: {e}")
  88.             time.sleep(5)  # Wait before trying to reconnect
  89.  
  90. # Function to handle keyboard input
  91. def handle_keyboard_input(event):
  92.     global camera_data
  93.     global current_gear, motor_direction
  94.     max_pwm_straight = gear_settings[current_gear]['max_pwm_straight']
  95.     max_pwm_turn = gear_settings[current_gear]['max_pwm_turn']
  96.     max_pwm_backward = gear_settings[current_gear]['max_pwm_backward']
  97.     max_pwm_braking = gear_settings[current_gear]['max_pwm_braking']
  98.  
  99.     print(camera_data)
  100.    
  101.     message = b''  # Initialize message as bytes
  102.    
  103.     if event.event_type == keyboard.KEY_DOWN:
  104.         if event.name == 'w':
  105.             if motor_direction == "e":
  106.                 message = bytes([max_pwm_straight, max_pwm_straight, 101, 101])
  107.                 publish_message(channel, message)
  108.                 print(f"Forward using Power L = {max_pwm_straight} R = {max_pwm_straight}")
  109.             elif motor_direction == "r":
  110.                 message = bytes([0, 0, 114, 114])
  111.                 publish_message(channel, message)
  112.                 print(f"Forward using Power L = 0 R = 0")
  113.         elif event.name == 's':
  114.             if motor_direction == "e":
  115.                 message = bytes([0, 0, 101, 101])
  116.                 publish_message(channel, message)
  117.                 print(f"Backward using Power L = 0 R = 0")
  118.             elif motor_direction == "r":
  119.                 message = bytes([max_pwm_backward, max_pwm_backward, 114, 114])
  120.                 publish_message(channel, message)
  121.                 print(f"Backward using Power L = {max_pwm_backward} R = {max_pwm_backward}")
  122.         elif event.name == 'a':
  123.             if motor_direction == "e":
  124.                 message = bytes([0, max_pwm_turn, 101, 101])
  125.                 publish_message(channel, message)
  126.                 print(f"Left using Power L = 0 R = {max_pwm_turn}")
  127.             elif motor_direction == "r":
  128.                 message = bytes([max_pwm_backward, 0, 114, 114])
  129.                 publish_message(channel, message)
  130.                 print(f"Left using Power L = {max_pwm_backward} R = 0")
  131.         elif event.name == 'd':
  132.             if motor_direction == "e":
  133.                 message = bytes([max_pwm_turn, 0, 101, 101])
  134.                 publish_message(channel, message)
  135.                 print(f"Right using Power L = {max_pwm_turn} R = 0")
  136.             elif motor_direction == "r":
  137.                 message = bytes([0, max_pwm_backward, 114, 114])
  138.                 publish_message(channel, message)
  139.                 print(f"Right using Power L = 0 R = {max_pwm_backward}")
  140.         elif event.name == 'shift':
  141.             if motor_direction == "e":
  142.                 message = bytes([max_pwm_braking, max_pwm_braking, 114, 114])
  143.                 publish_message(channel, message)
  144.                 print(f"Hard Braking Using Backward Direction with power {max_pwm_braking}")
  145.             elif motor_direction == "r":
  146.                 message = bytes([max_pwm_braking, max_pwm_braking, 101, 101])
  147.                 publish_message(channel, message)
  148.                 print(f"Hard Braking Using Forward Direction with power {max_pwm_braking}")
  149.         elif event.name in ['1', '2', '3']:
  150.             current_gear = int(event.name)
  151.             print(f"Gear changed to: {current_gear}")
  152.             print(f"Gear Settings {current_gear} = {gear_settings[current_gear]}")
  153.         elif event.name in ['e', 'r']:
  154.             motor_direction = event.name
  155.             print(f"Motor direction {event.name}")
  156.         elif event.name == 'f4':  # Check for F4 key press
  157.             global exit_program
  158.             exit_program = True  # Set the exit flag
  159.  
  160. # Start the consumer in a separate thread
  161. # consumer_thread = threading.Thread(target=start_consuming)
  162. # consumer_thread.start()
  163.  
  164. # Initiation
  165. print(f"Program Start using Gear Settings {current_gear} = {gear_settings[current_gear]} and Direction = {motor_direction}")
  166.  
  167. # Register keyboard event handler
  168. keyboard.on_press(handle_keyboard_input)
  169.  
  170. # Main loop to check for exit condition
  171. while not exit_program:
  172.     time.sleep(0.1)  # Sleep briefly to avoid busy waiting
  173.  
  174. # Cleanup before exiting
  175. print("Exiting program...")
  176. channel.stop_consuming()
  177. connection.close()
  178. sys.exit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement