Advertisement
Shriyam

Untitled

Nov 11th, 2024
121
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 28.90 KB | None | 0 0
  1. # backend.py
  2.  
  3. import os
  4. import time
  5. import threading
  6. import logging
  7. from datetime import datetime
  8.  
  9. from fastapi import FastAPI, Body
  10. from fastapi.responses import JSONResponse
  11. from fastapi.middleware.cors import CORSMiddleware
  12. from pydantic import BaseModel
  13.  
  14. import RPi.GPIO as GPIO
  15. import smbus2 as smbus
  16. import board
  17. import busio
  18. import adafruit_tcs34725
  19. from simple_pid import PID
  20. import openpyxl
  21. from openpyxl import Workbook
  22.  
  23. # ---------------------------- Configuration Constants ----------------------------
  24.  
  25. # I2C setup for TCS34725 sensor
  26. i2c = busio.I2C(board.SCL, board.SDA)
  27. TCS34725_ADDRESS = 0x29
  28.  
  29. # Register addresses for color data
  30. CDATAL = 0x14  # Clear (ambient light) channel
  31.  
  32. # Define pin numbers for PIR, IR sensor, and LEDs
  33. PIR_LED_PIN = 18    # LED for PIR sensor (GPIO 18, Physical Pin 12)
  34. IR_LED_PIN = 22     # LED for IR sensor (GPIO 22, Physical Pin 15)
  35. PIR_PIN = 17        # PIR sensor pin (GPIO 17, Physical Pin 11)
  36. IR_PIN = 27         # IR sensor pin (GPIO 27, Physical Pin 13)
  37. TCS_LED_PIN = 26    # TCS sensor-controlled LED (GPIO 26, Physical Pin 37)
  38. RED_LED_PIN = 12    # Red LED for fault indication (GPIO 12, Physical Pin 32)
  39.  
  40. # Define pin numbers and physical pins for additional LEDs
  41. ADDITIONAL_LED_PINS = {
  42.     "LED1": {"gpio": 5, "physical": 29},    # Additional LED 1 (GPIO 5, Physical Pin 29)
  43.     "LED2": {"gpio": 6, "detection_gpio": 21, "physical": 31},  # Additional LED 2 (GPIO 6, Physical Pin 31)
  44.     "LED3": {"gpio": 13, "physical": 33},   # Additional LED 3 (GPIO 13, Physical Pin 33)
  45. }
  46.  
  47. # Fault simulation options
  48. FAULT_MODES = {
  49.     '1': 'Normal Operation',
  50.     '2': 'Simulate PIR Sensor Failure',
  51.     '3': 'Simulate IR Sensor Failure',
  52.     '4': 'Simulate TCS Sensor Failure',
  53.     '5': 'Simulate I2C Communication Failure',
  54.     '6': 'Simulate GPIO Output Failure',
  55.     '7': 'Simulate Power Issues',
  56.     '8': 'Simulate Delayed Response',
  57.     '9': 'Simulate Sensor Cross-Talk',
  58.     '10': 'Simulate LED1 Failure',
  59.     '11': 'Simulate LED2 Failure',
  60.     '12': 'Simulate LED3 Failure',
  61.     # Add more fault modes as needed
  62. }
  63.  
  64. # Light intensity thresholds
  65. LOW_LIGHT_THRESHOLD = 1000    # Below this lux, LEDs should be on
  66. HIGH_LIGHT_THRESHOLD = 10000  # Above this lux, LEDs should be off
  67.  
  68. # Time to keep the LEDs on after detecting motion or object (in seconds)
  69. LED_ON_TIME = 10
  70.  
  71. # Excel setup
  72. EXCEL_FILE_PATH = "/home/iiitg/sensor_data.xlsx"
  73.  
  74. # PID Controller Parameters
  75. INITIAL_TARGET_LUX = 500  # Initial desired lux level
  76. MAX_BRIGHTNESS = 100      # Max PWM duty cycle (0-100)
  77. PID_KP = 1.0              # Proportional gain
  78. PID_KI = 0.1              # Integral gain
  79. PID_KD = 0.05             # Derivative gain
  80.  
  81. # LED Power Calculation (Assume LED draws 20mA at full brightness, voltage is 5V)
  82. LED_VOLTAGE = 5.0        # Assuming 5V supply for the LED
  83. LED_MAX_CURRENT = 0.02   # Max current of 20mA at 100% duty cycle
  84.  
  85. # ---------------------------- Pydantic Models ----------------------------
  86.  
  87. class FaultModeRequest(BaseModel):
  88.     mode: str
  89.  
  90. class SetTargetLuxRequest(BaseModel):
  91.     target_lux: float
  92.  
  93. class SetPIDRequest(BaseModel):
  94.     Kp: float = None
  95.     Ki: float = None
  96.     Kd: float = None
  97.  
  98. class SetLEDRequest(BaseModel):
  99.     led: str
  100.     state: bool
  101.  
  102. # ---------------------------- FastAPI Setup ----------------------------
  103.  
  104. app = FastAPI()
  105.  
  106. # Configure CORS to allow requests from your frontend
  107. app.add_middleware(
  108.     CORSMiddleware,
  109.     allow_origins=["http://localhost:3000"],  # Update with your frontend's URL
  110.     allow_credentials=True,
  111.     allow_methods=["*"],
  112.     allow_headers=["*"],
  113. )
  114.  
  115. # Configure logging
  116. logging.basicConfig(
  117.     level=logging.DEBUG,
  118.     format='%(asctime)s - %(levelname)s - %(message)s',
  119.     filename='backend.log',
  120.     filemode='a'
  121. )
  122.  
  123. # ---------------------------- Shared Variables and Locks ----------------------------
  124.  
  125. # Fault mode management
  126. fault_mode = '1'  # Default to Normal Operation
  127. fault_mode_lock = threading.Lock()
  128.  
  129. # Faults dictionary
  130. faults = {
  131.     "PIR_Sensor_Failure": False,
  132.     "IR_Sensor_Failure": False,
  133.     "TCS_Sensor_Failure": False,
  134.     "I2C_Communication_Failure": False,
  135.     "Sensor_CrossTalk": False,
  136.     "PIR_LED_Failure": False,
  137.     "IR_LED_Failure": False,
  138.     "TCS_LED_Failure": False,
  139.     "LED1_Failure": False,
  140.     "LED2_Failure": False,
  141.     "LED3_Failure": False,
  142.     "GPIO_Output_Failure": False,
  143.     "Power_Issues": False,
  144.     "Delayed_Response": False,
  145. }
  146. faults_lock = threading.Lock()
  147.  
  148. # Manual override flags
  149. manual_override = {
  150.     'LED2': False  # Only LED2 has manual override
  151. }
  152.  
  153. # Dimming parameters
  154. DIM_STEP = 5        # Duty cycle increment/decrement step
  155. DIM_DELAY = 0.05    # Delay between dimming steps in seconds
  156.  
  157. # Duty cycle trackers (excluding LED2 as it's not PWM controlled)
  158. current_duty = {
  159.     'PIR': 0,
  160.     'IR': 0,
  161.     'TCS': 0,
  162.     'LED1': 0,
  163.     'LED3': 0
  164. }
  165.  
  166. # Fade control flags to prevent multiple fade threads (excluding LED2)
  167. fading = {
  168.     'PIR': False,
  169.     'IR': False,
  170.     'TCS': False,
  171.     'LED1': False,
  172.     'LED3': False
  173. }
  174.  
  175. # LED2 Fault Flag
  176. led2_fault_flag = False
  177. led2_fault_lock = threading.Lock()
  178.  
  179. # Global variable to store additional PWM instances
  180. additional_pwms = {}
  181.  
  182. # ---------------------------- PID Controller Setup ----------------------------
  183.  
  184. pid = PID(PID_KP, PID_KI, PID_KD, setpoint=INITIAL_TARGET_LUX)
  185. pid.output_limits = (0, MAX_BRIGHTNESS)  # Restrict output to valid PWM range
  186.  
  187. # ---------------------------- Excel Logging Setup ----------------------------
  188.  
  189. # Ensure the directory exists
  190. directory = os.path.dirname(EXCEL_FILE_PATH)
  191. if not os.path.exists(directory):
  192.     os.makedirs(directory)
  193.  
  194. # Try loading existing workbook or create a new one
  195. try:
  196.     workbook = openpyxl.load_workbook(EXCEL_FILE_PATH)
  197.     sheet = workbook.active
  198. except FileNotFoundError:
  199.     workbook = Workbook()
  200.     sheet = workbook.active
  201.     sheet.append(["Timestamp", "Lux", "Red", "Green", "Blue", "CCT (K)", "LED Duty Cycle (%)", "Power Consumption (W)"])
  202.  
  203. # ---------------------------- Helper Functions ----------------------------
  204.  
  205. def calculate_power_consumption(duty_cycle):
  206.     """Calculate power consumption based on duty cycle."""
  207.     current = LED_MAX_CURRENT * (duty_cycle / 100)
  208.     power = LED_VOLTAGE * current
  209.     return power
  210.  
  211. def calculate_cct(r, g, b):
  212.     """Calculate Correlated Color Temperature (CCT) from RGB values using McCamy's formula."""
  213.     try:
  214.         # Normalize RGB values
  215.         r_norm = r / 65535
  216.         g_norm = g / 65535
  217.         b_norm = b / 65535
  218.  
  219.         # Calculate the chromaticity coordinates
  220.         X = -0.14282 * r_norm + 1.54924 * g_norm + -0.95641 * b_norm
  221.         Y = -0.32466 * r_norm + 1.57837 * g_norm + -0.73191 * b_norm
  222.         Z = -0.68202 * r_norm + 0.77073 * g_norm + 0.56332 * b_norm
  223.  
  224.         # Avoid division by zero
  225.         if (X + Y + Z) == 0:
  226.             return None
  227.  
  228.         # Calculate chromaticity coordinates
  229.         xc = X / (X + Y + Z)
  230.         yc = Y / (X + Y + Z)
  231.  
  232.         # Calculate n
  233.         n = (xc - 0.3320) / (0.1858 - yc)
  234.  
  235.         # Calculate CCT using McCamy's formula
  236.         cct = -449 * (n * 3) + 3525 * (n * 2) - 6823.3 * n + 5520.33
  237.         return round(cct, 2)
  238.     except Exception as e:
  239.         # Handle unexpected errors
  240.         logging.error(f"Error calculating CCT: {e}")
  241.         return None
  242.  
  243. # ---------------------------- GPIO and PWM Initialization ----------------------------
  244.  
  245. def initialize_gpio():
  246.     global PIR_PWM, IR_PWM, TCS_PWM, additional_pwms, sensor
  247.  
  248.     GPIO.setwarnings(False)
  249.     GPIO.setmode(GPIO.BCM)
  250.  
  251.     # Set up sensor input pins with pull-down resistors
  252.     GPIO.setup(PIR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
  253.     GPIO.setup(IR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
  254.  
  255.     # Set up LED output pins with initial LOW
  256.     GPIO.setup(PIR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
  257.     GPIO.setup(IR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
  258.     GPIO.setup(TCS_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
  259.     GPIO.setup(RED_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
  260.  
  261.     # Set up power pins and detection pins for additional LEDs
  262.     for led_name, led_info in ADDITIONAL_LED_PINS.items():
  263.         gpio_pin = led_info["gpio"]
  264.         GPIO.setup(gpio_pin, GPIO.OUT, initial=GPIO.LOW)
  265.         if "detection_gpio" in led_info:
  266.             detection_pin = led_info["detection_gpio"]
  267.             GPIO.setup(detection_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
  268.  
  269.     # Set up PWM for PIR and IR LEDs
  270.     PIR_PWM = GPIO.PWM(PIR_LED_PIN, 1000)  # 1 kHz
  271.     IR_PWM = GPIO.PWM(IR_LED_PIN, 1000)    # 1 kHz
  272.     PIR_PWM.start(0)  # Start with LEDs off
  273.     IR_PWM.start(0)
  274.  
  275.     # Set up PWM on the TCS LED pin with 1 kHz frequency
  276.     TCS_PWM = GPIO.PWM(TCS_LED_PIN, 1000)
  277.     TCS_PWM.start(0)  # Start PWM with 0% duty cycle (LED off)
  278.  
  279.     # Initialize the red LED state to off
  280.     GPIO.output(RED_LED_PIN, GPIO.LOW)
  281.  
  282.     # Set up PWM for additional LEDs (excluding LED2)
  283.     additional_pwms = {}
  284.     for name, led_info in ADDITIONAL_LED_PINS.items():
  285.         if name != "LED2":  # Exclude LED2 from PWM control
  286.             pwm_instance = GPIO.PWM(led_info["gpio"], 1000)  # 1 kHz
  287.             pwm_instance.start(0)
  288.             additional_pwms[name] = pwm_instance
  289.  
  290.     # Initialize TCS34725 sensor
  291.     initialize_tcs34725()
  292.  
  293.     logging.info("GPIO and PWM initialized successfully.")
  294.  
  295. def initialize_tcs34725():
  296.     global sensor
  297.  
  298.     with fault_mode_lock:
  299.         current_mode = fault_mode
  300.  
  301.     if current_mode == '4':
  302.         logging.warning("Simulating TCS sensor failure. Skipping initialization.")
  303.         print("Simulating TCS sensor failure. Skipping initialization.")
  304.         return
  305.  
  306.     try:
  307.         sensor = adafruit_tcs34725.TCS34725(i2c)
  308.         sensor.integration_time = 700  # Maximum integration time (~700ms)
  309.         sensor.gain = 60               # 60x gain for increased sensitivity
  310.         logging.info("TCS34725 color sensor initialized with higher sensitivity settings.")
  311.         print("TCS34725 color sensor initialized with higher sensitivity settings.")
  312.     except Exception as e:
  313.         logging.error(f"Error initializing TCS34725: {e}")
  314.         print(f"Error initializing TCS34725: {e}")
  315.         with faults_lock:
  316.             faults["TCS_Sensor_Failure"] = True
  317.             faults["I2C_Communication_Failure"] = True
  318.  
  319. def read_sensor_data():
  320.     with fault_mode_lock:
  321.         current_mode = fault_mode
  322.  
  323.     if current_mode == '4':
  324.         # Simulating TCS sensor failure
  325.         logging.warning("Simulating TCS sensor failure. Returning fixed clear value.")
  326.         return {
  327.             "lux": 5000,  # Fixed high lux value to simulate sensor failure
  328.             "r": 100,
  329.             "g": 100,
  330.             "b": 100,
  331.             "cct": 5000
  332.         }
  333.  
  334.     if current_mode == '5':
  335.         # Simulating I2C communication failure
  336.         logging.error("Simulating I2C communication failure.")
  337.         raise IOError("I2C communication error")
  338.  
  339.     try:
  340.         # Read sensor data
  341.         r, g, b, c = sensor.color_raw
  342.         lux = sensor.lux
  343.  
  344.         # Calculate CCT
  345.         cct = calculate_cct(r, g, b)
  346.  
  347.         with faults_lock:
  348.             faults["TCS_Sensor_Failure"] = False
  349.             faults["I2C_Communication_Failure"] = False
  350.  
  351.         logging.debug(f"Sensor Data - Lux: {lux}, R: {r}, G: {g}, B: {b}, CCT: {cct}")
  352.         return {
  353.             "lux": lux,
  354.             "r": r,
  355.             "g": g,
  356.             "b": b,
  357.             "cct": cct
  358.         }
  359.     except Exception as e:
  360.         logging.error(f"Error reading TCS34725 data: {e}")
  361.         print(f"Error reading TCS34725 data: {e}")
  362.         with faults_lock:
  363.             faults["TCS_Sensor_Failure"] = True
  364.             faults["I2C_Communication_Failure"] = True
  365.         return {
  366.             "lux": HIGH_LIGHT_THRESHOLD,  # Assume it's bright to turn off LEDs
  367.             "r": 0,
  368.             "g": 0,
  369.             "b": 0,
  370.             "cct": None
  371.         }
  372.  
  373. def map_lux_to_duty_cycle(lux):
  374.     """Map the lux value to a PWM duty cycle percentage."""
  375.     if lux > HIGH_LIGHT_THRESHOLD:
  376.         return 0  # Day mode, LEDs off
  377.     elif lux < LOW_LIGHT_THRESHOLD:
  378.         target_lux = 500  # Night mode
  379.     else:
  380.         target_lux = 350  # Moderate light mode
  381.  
  382.     pid.setpoint = target_lux
  383.     duty_cycle = pid(lux)
  384.     duty_cycle = max(0, min(100, duty_cycle))  # Clamp between 0 and 100
  385.     logging.debug(f"Mapped Lux {lux} to Duty Cycle {duty_cycle}% with Target Lux {pid.setpoint}")
  386.     return duty_cycle
  387.  
  388. def calculate_power_consumption(duty_cycle):
  389.     """Calculate power consumption based on duty cycle."""
  390.     current = LED_MAX_CURRENT * (duty_cycle / 100)
  391.     power = LED_VOLTAGE * current
  392.     return power
  393.  
  394. def control_led_brightness(duty_cycle, led_name):
  395.     """Control the brightness of a specific LED using PWM."""
  396.     if led_name in additional_pwms:
  397.         pwm_instance = additional_pwms[led_name]
  398.     elif led_name == 'TCS':
  399.         pwm_instance = TCS_PWM
  400.     elif led_name == 'PIR':
  401.         pwm_instance = PIR_PWM
  402.     elif led_name == 'IR':
  403.         pwm_instance = IR_PWM
  404.     else:
  405.         logging.error(f"Invalid LED name: {led_name}")
  406.         return
  407.  
  408.     # Apply fading to prevent abrupt changes
  409.     threading.Thread(target=fade_to_duty_cycle, args=(pwm_instance, led_name, duty_cycle)).start()
  410.  
  411. def fade_to_duty_cycle(pwm_instance, led_name, target_dc):
  412.     """Fade to a specific duty cycle smoothly."""
  413.     global current_duty, fading
  414.     with faults_lock:
  415.         if faults.get(f"{led_name}_Failure", False):
  416.             logging.error(f"Cannot change duty cycle of {led_name} LED due to a detected fault.")
  417.             return
  418.     if fading.get(led_name, False):
  419.         return  # Prevent multiple fade threads
  420.     fading[led_name] = True
  421.     logging.debug(f"Starting fade to {target_dc}% duty cycle for {led_name}")
  422.    
  423.     step = DIM_STEP if target_dc > current_duty.get(led_name, 0) else -DIM_STEP
  424.     while (step > 0 and current_duty.get(led_name, 0) < target_dc) or \
  425.           (step < 0 and current_duty.get(led_name, 0) > target_dc):
  426.         current = current_duty.get(led_name, 0)
  427.         new_dc = current + step
  428.         if step > 0:
  429.             new_dc = min(new_dc, target_dc)
  430.         else:
  431.             new_dc = max(new_dc, target_dc)
  432.         pwm_instance.ChangeDutyCycle(new_dc)
  433.         current_duty[led_name] = new_dc
  434.         time.sleep(DIM_DELAY)
  435.     fading[led_name] = False
  436.     logging.debug(f"{led_name} duty cycle set to {target_dc}%.")
  437.  
  438. def log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed):
  439.     """Log sensor data and LED status to Excel."""
  440.     sheet.append([timestamp, lux, r, g, b, cct, duty_cycle, power_consumed])
  441.     workbook.save(EXCEL_FILE_PATH)
  442.     logging.debug(f"Logged data at {timestamp}")
  443.  
  444. def handle_individual_led_faults(led_faults):
  445.     """Handles faults for individual additional LEDs."""
  446.     for led_name, is_faulty in led_faults.items():
  447.         if is_faulty:
  448.             if led_name in additional_pwms and current_duty[led_name] != 0:
  449.                 fade_out(additional_pwms[led_name], led_name)
  450.             # Ensure the LED is off
  451.             if led_name in additional_pwms:
  452.                 additional_pwms[led_name].ChangeDutyCycle(0)
  453.                 current_duty[led_name] = 0
  454.             logging.error(f"{led_name} LED has a fault and has been turned off.")
  455.  
  456. def fade_out(pwm_instance, led_name):
  457.     """Gradually decrease duty cycle to 0."""
  458.     global current_duty, fading
  459.     with faults_lock:
  460.         if faults.get(f"{led_name}_Failure", False):
  461.             logging.error(f"Cannot fade out {led_name} LED due to a detected fault.")
  462.             return
  463.     if fading.get(led_name, False):
  464.         return  # Prevent multiple fade_out threads
  465.     fading[led_name] = True
  466.     logging.debug(f"Starting fade out for {led_name}")
  467.     while current_duty.get(led_name, 0) > 0:
  468.         current_duty[led_name] = max(current_duty.get(led_name, 0) - DIM_STEP, 0)
  469.         pwm_instance.ChangeDutyCycle(current_duty[led_name])
  470.         time.sleep(DIM_DELAY)
  471.     fading[led_name] = False
  472.     logging.debug(f"{led_name} faded out to 0% duty cycle.")
  473.  
  474. def fade_in(pwm_instance, led_name, target_dc=100):
  475.     """Gradually increase duty cycle to target_dc."""
  476.     global current_duty, fading
  477.     with faults_lock:
  478.         if faults.get(f"{led_name}_Failure", False):
  479.             logging.error(f"Cannot fade in {led_name} LED due to a detected fault.")
  480.             return
  481.     if fading.get(led_name, False):
  482.         return  # Prevent multiple fade_in threads
  483.     fading[led_name] = True
  484.     logging.debug(f"Starting fade in for {led_name} to {target_dc}% duty cycle.")
  485.     while current_duty.get(led_name, 0) < target_dc:
  486.         current_duty[led_name] = min(current_duty.get(led_name, 0) + DIM_STEP, target_dc)
  487.         pwm_instance.ChangeDutyCycle(current_duty[led_name])
  488.         time.sleep(DIM_DELAY)
  489.     fading[led_name] = False
  490.     logging.debug(f"{led_name} faded in to {target_dc}% duty cycle.")
  491.  
  492. # ---------------------------- Control Loop ----------------------------
  493.  
  494. def control_loop():
  495.     global current_duty_cycle
  496.     while True:
  497.         try:
  498.             sensor_data = read_sensor_data()
  499.             lux = sensor_data["lux"]
  500.             r = sensor_data["r"]
  501.             g = sensor_data["g"]
  502.             b = sensor_data["b"]
  503.             cct = sensor_data["cct"]
  504.  
  505.             timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  506.  
  507.             # Determine duty cycle based on lux
  508.             duty_cycle = map_lux_to_duty_cycle(lux)
  509.  
  510.             # Calculate power consumption
  511.             power_consumed = calculate_power_consumption(duty_cycle)
  512.  
  513.             # Log data to Excel
  514.             log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed)
  515.  
  516.             # Adaptive control - adjust target lux based on ambient light conditions
  517.             if lux < 100:
  518.                 pid.setpoint = 500  # Increase target lux in very low light
  519.             elif 100 <= lux < 300:
  520.                 pid.setpoint = 450  # Moderate increase in target lux
  521.             elif 300 <= lux < LOW_LIGHT_THRESHOLD:
  522.                 pid.setpoint = 350  # Lower intensity for medium light
  523.             else:
  524.                 pid.setpoint = 0  # Day mode, LEDs off
  525.  
  526.             # Update duty cycle after adjusting setpoint
  527.             duty_cycle = map_lux_to_duty_cycle(lux)
  528.  
  529.             # Control LEDs based on duty cycle
  530.             if lux > HIGH_LIGHT_THRESHOLD:
  531.                 # Day Mode: Ensure LEDs are off
  532.                 for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
  533.                     if led_name in current_duty and current_duty[led_name] != 0:
  534.                         control_led_brightness(0, led_name)
  535.                 with faults_lock:
  536.                     if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
  537.                         GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
  538.             else:
  539.                 # Night or Moderate Light Mode: Adjust LEDs based on sensor detections
  540.                 # Read PIR and IR sensor states unless in a fault mode
  541.                 with fault_mode_lock:
  542.                     current_mode = fault_mode
  543.  
  544.                 if current_mode in ['2', '9']:  # Simulate PIR Sensor Failure or Sensor Cross-Talk
  545.                     pir_detected = False if current_mode == '2' else True
  546.                     ir_detected = False if current_mode == '3' else True
  547.                 else:
  548.                     pir_detected = GPIO.input(PIR_PIN)
  549.                     ir_detected = GPIO.input(IR_PIN)
  550.  
  551.                 if pir_detected or not ir_detected:
  552.                     # Turn on LEDs based on duty cycle
  553.                     for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
  554.                         if led_name in current_duty:
  555.                             control_led_brightness(duty_cycle, led_name)
  556.                     # Turn on LED2 if not in manual override and not faulty
  557.                     with faults_lock:
  558.                         if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
  559.                             GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH)
  560.                 else:
  561.                     # Turn off LEDs
  562.                     for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
  563.                         if led_name in current_duty and current_duty[led_name] != 0:
  564.                             control_led_brightness(0, led_name)
  565.                     # Turn off LED2 if not in manual override and not faulty
  566.                     with faults_lock:
  567.                         if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
  568.                             GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
  569.  
  570.             # Handle individual LED faults
  571.             led_faults = {
  572.                 'LED1': faults.get("LED1_Failure", False),
  573.                 'LED2': faults.get("LED2_Failure", False),
  574.                 'LED3': faults.get("LED3_Failure", False)
  575.             }
  576.             handle_individual_led_faults(led_faults)
  577.  
  578.             # Sleep before next iteration
  579.             time.sleep(5)  # Adjust as needed
  580.  
  581.         except Exception as e:
  582.             logging.error(f"Error in control loop: {e}")
  583.             time.sleep(5)  # Wait before retrying
  584.  
  585. # ---------------------------- API Endpoints ----------------------------
  586.  
  587. @app.get("/")
  588. def read_root():
  589.     return {"message": "Backend server is running."}
  590.  
  591. @app.get("/status")
  592. def get_status():
  593.     with fault_mode_lock:
  594.         current_mode = fault_mode
  595.  
  596.     # Read LED2's detection pin (GPIO 21) to determine its state
  597.     led2_state = False
  598.     try:
  599.         led2_state = GPIO.input(ADDITIONAL_LED_PINS["LED2"]["detection_gpio"]) == GPIO.HIGH
  600.     except Exception as e:
  601.         logging.error(f"Error reading LED2's detection pin: {e}")
  602.         with faults_lock:
  603.             faults["LED2_Failure"] = True
  604.  
  605.     status = {
  606.         "fault_mode": FAULT_MODES.get(current_mode, "Unknown"),
  607.         "current_duty": current_duty,
  608.         "LED2_state": led2_state,  # Include LED2's ON/OFF state
  609.         "faults": faults.copy()
  610.     }
  611.  
  612.     return JSONResponse(status)
  613.  
  614. @app.post("/set_fault_mode")
  615. def set_fault_mode(request: FaultModeRequest):
  616.     mode = request.mode
  617.     if mode not in FAULT_MODES:
  618.         logging.error(f"Invalid fault mode attempted: {mode}")
  619.         return JSONResponse(status_code=400, content={"error": "Invalid fault mode."})
  620.    
  621.     with fault_mode_lock:
  622.         global fault_mode
  623.         fault_mode = mode
  624.  
  625.         if mode == '1':
  626.             # Reset all fault states
  627.             with faults_lock:
  628.                 for key in faults:
  629.                     faults[key] = False
  630.             logging.info("Switched to Normal Operation. All faults cleared.")
  631.             print("Switched to Normal Operation. All faults cleared.")
  632.         else:
  633.             # Simulate faults based on the selected mode
  634.             with faults_lock:
  635.                 # First, clear all faults
  636.                 for key in faults:
  637.                     faults[key] = False
  638.                
  639.                 # Then, set the specific fault
  640.                 if mode == '2':
  641.                     faults["PIR_Sensor_Failure"] = True
  642.                 elif mode == '3':
  643.                     faults["IR_Sensor_Failure"] = True
  644.                 elif mode == '4':
  645.                     faults["TCS_Sensor_Failure"] = True
  646.                 elif mode == '5':
  647.                     faults["I2C_Communication_Failure"] = True
  648.                 elif mode == '6':
  649.                     faults["GPIO_Output_Failure"] = True
  650.                 elif mode == '7':
  651.                     faults["Power_Issues"] = True
  652.                 elif mode == '8':
  653.                     faults["Delayed_Response"] = True
  654.                 elif mode == '9':
  655.                     faults["Sensor_CrossTalk"] = True
  656.                 elif mode == '10':
  657.                     faults["LED1_Failure"] = True
  658.                 elif mode == '11':
  659.                     faults["LED2_Failure"] = True
  660.                 elif mode == '12':
  661.                     faults["LED3_Failure"] = True
  662.                 # Add more fault simulations as needed
  663.  
  664.             logging.info(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
  665.             print(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
  666.  
  667.     return {"message": FAULT_MODES[mode]}
  668.  
  669. @app.post("/set_target_lux")
  670. def set_target_lux(request: SetTargetLuxRequest):
  671.     target_lux = request.target_lux
  672.     pid.setpoint = target_lux
  673.     logging.info(f"Target lux set to {target_lux}")
  674.     print(f"Target lux set to {target_lux}")
  675.     return {"status": "success", "target_lux": pid.setpoint}
  676.  
  677. @app.post("/set_pid")
  678. def set_pid(request: SetPIDRequest):
  679.     global pid
  680.     if request.Kp is not None:
  681.         pid.Kp = request.Kp
  682.         logging.info(f"PID Kp set to {pid.Kp}")
  683.     if request.Ki is not None:
  684.         pid.Ki = request.Ki
  685.         logging.info(f"PID Ki set to {pid.Ki}")
  686.     if request.Kd is not None:
  687.         pid.Kd = request.Kd
  688.         logging.info(f"PID Kd set to {pid.Kd}")
  689.     return JSONResponse({
  690.         "status": "success",
  691.         "Kp": pid.Kp,
  692.         "Ki": pid.Ki,
  693.         "Kd": pid.Kd
  694.     })
  695.  
  696. @app.post("/set_led")
  697. def set_led(request: SetLEDRequest):
  698.     led = request.led.upper()
  699.     state = request.state
  700.  
  701.     if led not in ADDITIONAL_LED_PINS and led not in current_duty and led != "TCS":
  702.         logging.error(f"Invalid LED name attempted: {led}")
  703.         return JSONResponse(status_code=400, content={"error": "Invalid LED name."})
  704.  
  705.     # Prevent controlling LEDs that are in fault mode
  706.     fault_prevent = False
  707.     with fault_mode_lock:
  708.         if led == "LED1" and fault_mode == '10':
  709.             fault_prevent = True
  710.         elif led == "LED2" and fault_mode == '11':
  711.             fault_prevent = True
  712.         elif led == "LED3" and fault_mode == '12':
  713.             fault_prevent = True
  714.         elif led == "PIR" and fault_mode == '2':
  715.             fault_prevent = True
  716.         elif led == "IR" and fault_mode == '3':
  717.             fault_prevent = True
  718.         elif led == "TCS" and fault_mode == '4':
  719.             fault_prevent = True
  720.         elif fault_mode in ['6', '7', '8', '9'] and led in ['LED1', 'LED2', 'LED3']:
  721.             fault_prevent = True
  722.  
  723.     if fault_prevent:
  724.         logging.warning(f"Attempted to control {led} while in fault mode.")
  725.         return JSONResponse(status_code=400, content={"error": f"Cannot control {led} in current fault mode."})
  726.  
  727.     if led == "LED2":
  728.         # Control LED2 directly via GPIO6
  729.         GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH if state else GPIO.LOW)
  730.         with faults_lock:
  731.             manual_override['LED2'] = True  # Activate manual override
  732.             # Reset LED2 Fault Flag if manual control is restored
  733.             if faults.get("LED2_Failure", False):
  734.                 faults["LED2_Failure"] = False
  735.                 logging.info("Manual control restored for LED2. Fault flag cleared.")
  736.                 print("Manual control restored for LED2. Fault flag cleared.")
  737.         logging.info(f"{led} LED set to {'on' if state else 'off'} via manual control.")
  738.         return {"message": f"{led} LED turned {'on' if state else 'off'} via manual control"}
  739.     else:
  740.         # For PWM-controlled LEDs
  741.         duty_cycle = 100 if state else 0
  742.         pwm_instance = None
  743.         if led in ['PIR', 'IR', 'TCS']:
  744.             pwm_instance = globals().get(f"{led}_PWM")
  745.         elif led in additional_pwms:
  746.             pwm_instance = additional_pwms.get(led)
  747.        
  748.         if pwm_instance:
  749.             control_led_brightness(duty_cycle, led)
  750.             logging.info(f"{led} LED set to {'on' if state else 'off'}.")
  751.             return {"message": f"{led} LED turned {'on' if state else 'off'}."}
  752.  
  753.     logging.error(f"Failed to set LED: {led}")
  754.     return JSONResponse(status_code=500, content={"error": "Failed to set LED."})
  755.  
  756. # ---------------------------- Application Lifecycle Events ----------------------------
  757.  
  758. @app.on_event("startup")
  759. def startup_event():
  760.     initialize_gpio()
  761.     # Start the control loop in a separate daemon thread
  762.     control_thread = threading.Thread(target=control_loop, daemon=True)
  763.     control_thread.start()
  764.     logging.info("Backend server started and control loop initiated.")
  765.     print("Backend server started and control loop initiated.")
  766.  
  767. @app.on_event("shutdown")
  768. def shutdown_event():
  769.     # Stop PWM and clean up GPIO settings
  770.     PIR_PWM.stop()
  771.     IR_PWM.stop()
  772.     TCS_PWM.stop()
  773.     for pwm_instance in additional_pwms.values():
  774.         pwm_instance.stop()
  775.     # Turn off the red LED
  776.     GPIO.output(RED_LED_PIN, GPIO.LOW)
  777.     # Turn off LED2
  778.     GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
  779.     GPIO.cleanup()
  780.     logging.info("Backend server shutdown and GPIO cleaned up.")
  781.     print("Backend server shutdown and GPIO cleaned up.")
  782.  
  783. # ---------------------------- Run the FastAPI App ----------------------------
  784.  
  785. # To run the app, use the following command:
  786. # uvicorn backend:app --host 0.0.0.0 --port 8000
  787.  
  788.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement