Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # backend.py
- import os
- import time
- import threading
- import logging
- from datetime import datetime
- from fastapi import FastAPI, Body
- from fastapi.responses import JSONResponse
- from fastapi.middleware.cors import CORSMiddleware
- from pydantic import BaseModel
- import RPi.GPIO as GPIO
- import smbus2 as smbus
- import board
- import busio
- import adafruit_tcs34725
- from simple_pid import PID
- import openpyxl
- from openpyxl import Workbook
- # ---------------------------- Configuration Constants ----------------------------
- # I2C setup for TCS34725 sensor
- i2c = busio.I2C(board.SCL, board.SDA)
- TCS34725_ADDRESS = 0x29
- # Register addresses for color data
- CDATAL = 0x14 # Clear (ambient light) channel
- # Define pin numbers for PIR, IR sensor, and LEDs
- PIR_LED_PIN = 18 # LED for PIR sensor (GPIO 18, Physical Pin 12)
- IR_LED_PIN = 22 # LED for IR sensor (GPIO 22, Physical Pin 15)
- PIR_PIN = 17 # PIR sensor pin (GPIO 17, Physical Pin 11)
- IR_PIN = 27 # IR sensor pin (GPIO 27, Physical Pin 13)
- TCS_LED_PIN = 26 # TCS sensor-controlled LED (GPIO 26, Physical Pin 37)
- RED_LED_PIN = 12 # Red LED for fault indication (GPIO 12, Physical Pin 32)
- # Define pin numbers and physical pins for additional LEDs
- ADDITIONAL_LED_PINS = {
- "LED1": {"gpio": 5, "physical": 29}, # Additional LED 1 (GPIO 5, Physical Pin 29)
- "LED2": {"gpio": 6, "detection_gpio": 21, "physical": 31}, # Additional LED 2 (GPIO 6, Physical Pin 31)
- "LED3": {"gpio": 13, "physical": 33}, # Additional LED 3 (GPIO 13, Physical Pin 33)
- }
- # Fault simulation options
- FAULT_MODES = {
- '1': 'Normal Operation',
- '2': 'Simulate PIR Sensor Failure',
- '3': 'Simulate IR Sensor Failure',
- '4': 'Simulate TCS Sensor Failure',
- '5': 'Simulate I2C Communication Failure',
- '6': 'Simulate GPIO Output Failure',
- '7': 'Simulate Power Issues',
- '8': 'Simulate Delayed Response',
- '9': 'Simulate Sensor Cross-Talk',
- '10': 'Simulate LED1 Failure',
- '11': 'Simulate LED2 Failure',
- '12': 'Simulate LED3 Failure',
- # Add more fault modes as needed
- }
- # Light intensity thresholds
- LOW_LIGHT_THRESHOLD = 1000 # Below this lux, LEDs should be on
- HIGH_LIGHT_THRESHOLD = 10000 # Above this lux, LEDs should be off
- # Time to keep the LEDs on after detecting motion or object (in seconds)
- LED_ON_TIME = 10
- # Excel setup
- EXCEL_FILE_PATH = "/home/iiitg/sensor_data.xlsx"
- # PID Controller Parameters
- INITIAL_TARGET_LUX = 500 # Initial desired lux level
- MAX_BRIGHTNESS = 100 # Max PWM duty cycle (0-100)
- PID_KP = 1.0 # Proportional gain
- PID_KI = 0.1 # Integral gain
- PID_KD = 0.05 # Derivative gain
- # LED Power Calculation (Assume LED draws 20mA at full brightness, voltage is 5V)
- LED_VOLTAGE = 5.0 # Assuming 5V supply for the LED
- LED_MAX_CURRENT = 0.02 # Max current of 20mA at 100% duty cycle
- # ---------------------------- Pydantic Models ----------------------------
- class FaultModeRequest(BaseModel):
- mode: str
- class SetTargetLuxRequest(BaseModel):
- target_lux: float
- class SetPIDRequest(BaseModel):
- Kp: float = None
- Ki: float = None
- Kd: float = None
- class SetLEDRequest(BaseModel):
- led: str
- state: bool
- # ---------------------------- FastAPI Setup ----------------------------
- app = FastAPI()
- # Configure CORS to allow requests from your frontend
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["http://localhost:3000"], # Update with your frontend's URL
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # Configure logging
- logging.basicConfig(
- level=logging.DEBUG,
- format='%(asctime)s - %(levelname)s - %(message)s',
- filename='backend.log',
- filemode='a'
- )
- # ---------------------------- Shared Variables and Locks ----------------------------
- # Fault mode management
- fault_mode = '1' # Default to Normal Operation
- fault_mode_lock = threading.Lock()
- # Faults dictionary
- faults = {
- "PIR_Sensor_Failure": False,
- "IR_Sensor_Failure": False,
- "TCS_Sensor_Failure": False,
- "I2C_Communication_Failure": False,
- "Sensor_CrossTalk": False,
- "PIR_LED_Failure": False,
- "IR_LED_Failure": False,
- "TCS_LED_Failure": False,
- "LED1_Failure": False,
- "LED2_Failure": False,
- "LED3_Failure": False,
- "GPIO_Output_Failure": False,
- "Power_Issues": False,
- "Delayed_Response": False,
- }
- faults_lock = threading.Lock()
- # Manual override flags
- manual_override = {
- 'LED2': False # Only LED2 has manual override
- }
- # Dimming parameters
- DIM_STEP = 5 # Duty cycle increment/decrement step
- DIM_DELAY = 0.05 # Delay between dimming steps in seconds
- # Duty cycle trackers (excluding LED2 as it's not PWM controlled)
- current_duty = {
- 'PIR': 0,
- 'IR': 0,
- 'TCS': 0,
- 'LED1': 0,
- 'LED3': 0
- }
- # Fade control flags to prevent multiple fade threads (excluding LED2)
- fading = {
- 'PIR': False,
- 'IR': False,
- 'TCS': False,
- 'LED1': False,
- 'LED3': False
- }
- # LED2 Fault Flag
- led2_fault_flag = False
- led2_fault_lock = threading.Lock()
- # Global variable to store additional PWM instances
- additional_pwms = {}
- # ---------------------------- PID Controller Setup ----------------------------
- pid = PID(PID_KP, PID_KI, PID_KD, setpoint=INITIAL_TARGET_LUX)
- pid.output_limits = (0, MAX_BRIGHTNESS) # Restrict output to valid PWM range
- # ---------------------------- Excel Logging Setup ----------------------------
- # Ensure the directory exists
- directory = os.path.dirname(EXCEL_FILE_PATH)
- if not os.path.exists(directory):
- os.makedirs(directory)
- # Try loading existing workbook or create a new one
- try:
- workbook = openpyxl.load_workbook(EXCEL_FILE_PATH)
- sheet = workbook.active
- except FileNotFoundError:
- workbook = Workbook()
- sheet = workbook.active
- sheet.append(["Timestamp", "Lux", "Red", "Green", "Blue", "CCT (K)", "LED Duty Cycle (%)", "Power Consumption (W)"])
- # ---------------------------- Helper Functions ----------------------------
- def calculate_power_consumption(duty_cycle):
- """Calculate power consumption based on duty cycle."""
- current = LED_MAX_CURRENT * (duty_cycle / 100)
- power = LED_VOLTAGE * current
- return power
- def calculate_cct(r, g, b):
- """Calculate Correlated Color Temperature (CCT) from RGB values using McCamy's formula."""
- try:
- # Normalize RGB values
- r_norm = r / 65535
- g_norm = g / 65535
- b_norm = b / 65535
- # Calculate the chromaticity coordinates
- X = -0.14282 * r_norm + 1.54924 * g_norm + -0.95641 * b_norm
- Y = -0.32466 * r_norm + 1.57837 * g_norm + -0.73191 * b_norm
- Z = -0.68202 * r_norm + 0.77073 * g_norm + 0.56332 * b_norm
- # Avoid division by zero
- if (X + Y + Z) == 0:
- return None
- # Calculate chromaticity coordinates
- xc = X / (X + Y + Z)
- yc = Y / (X + Y + Z)
- # Calculate n
- n = (xc - 0.3320) / (0.1858 - yc)
- # Calculate CCT using McCamy's formula
- cct = -449 * (n * 3) + 3525 * (n * 2) - 6823.3 * n + 5520.33
- return round(cct, 2)
- except Exception as e:
- # Handle unexpected errors
- logging.error(f"Error calculating CCT: {e}")
- return None
- # ---------------------------- GPIO and PWM Initialization ----------------------------
- def initialize_gpio():
- global PIR_PWM, IR_PWM, TCS_PWM, additional_pwms, sensor
- GPIO.setwarnings(False)
- GPIO.setmode(GPIO.BCM)
- # Set up sensor input pins with pull-down resistors
- GPIO.setup(PIR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
- GPIO.setup(IR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
- # Set up LED output pins with initial LOW
- GPIO.setup(PIR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
- GPIO.setup(IR_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
- GPIO.setup(TCS_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
- GPIO.setup(RED_LED_PIN, GPIO.OUT, initial=GPIO.LOW)
- # Set up power pins and detection pins for additional LEDs
- for led_name, led_info in ADDITIONAL_LED_PINS.items():
- gpio_pin = led_info["gpio"]
- GPIO.setup(gpio_pin, GPIO.OUT, initial=GPIO.LOW)
- if "detection_gpio" in led_info:
- detection_pin = led_info["detection_gpio"]
- GPIO.setup(detection_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
- # Set up PWM for PIR and IR LEDs
- PIR_PWM = GPIO.PWM(PIR_LED_PIN, 1000) # 1 kHz
- IR_PWM = GPIO.PWM(IR_LED_PIN, 1000) # 1 kHz
- PIR_PWM.start(0) # Start with LEDs off
- IR_PWM.start(0)
- # Set up PWM on the TCS LED pin with 1 kHz frequency
- TCS_PWM = GPIO.PWM(TCS_LED_PIN, 1000)
- TCS_PWM.start(0) # Start PWM with 0% duty cycle (LED off)
- # Initialize the red LED state to off
- GPIO.output(RED_LED_PIN, GPIO.LOW)
- # Set up PWM for additional LEDs (excluding LED2)
- additional_pwms = {}
- for name, led_info in ADDITIONAL_LED_PINS.items():
- if name != "LED2": # Exclude LED2 from PWM control
- pwm_instance = GPIO.PWM(led_info["gpio"], 1000) # 1 kHz
- pwm_instance.start(0)
- additional_pwms[name] = pwm_instance
- # Initialize TCS34725 sensor
- initialize_tcs34725()
- logging.info("GPIO and PWM initialized successfully.")
- def initialize_tcs34725():
- global sensor
- with fault_mode_lock:
- current_mode = fault_mode
- if current_mode == '4':
- logging.warning("Simulating TCS sensor failure. Skipping initialization.")
- print("Simulating TCS sensor failure. Skipping initialization.")
- return
- try:
- sensor = adafruit_tcs34725.TCS34725(i2c)
- sensor.integration_time = 700 # Maximum integration time (~700ms)
- sensor.gain = 60 # 60x gain for increased sensitivity
- logging.info("TCS34725 color sensor initialized with higher sensitivity settings.")
- print("TCS34725 color sensor initialized with higher sensitivity settings.")
- except Exception as e:
- logging.error(f"Error initializing TCS34725: {e}")
- print(f"Error initializing TCS34725: {e}")
- with faults_lock:
- faults["TCS_Sensor_Failure"] = True
- faults["I2C_Communication_Failure"] = True
- def read_sensor_data():
- with fault_mode_lock:
- current_mode = fault_mode
- if current_mode == '4':
- # Simulating TCS sensor failure
- logging.warning("Simulating TCS sensor failure. Returning fixed clear value.")
- return {
- "lux": 5000, # Fixed high lux value to simulate sensor failure
- "r": 100,
- "g": 100,
- "b": 100,
- "cct": 5000
- }
- if current_mode == '5':
- # Simulating I2C communication failure
- logging.error("Simulating I2C communication failure.")
- raise IOError("I2C communication error")
- try:
- # Read sensor data
- r, g, b, c = sensor.color_raw
- lux = sensor.lux
- # Calculate CCT
- cct = calculate_cct(r, g, b)
- with faults_lock:
- faults["TCS_Sensor_Failure"] = False
- faults["I2C_Communication_Failure"] = False
- logging.debug(f"Sensor Data - Lux: {lux}, R: {r}, G: {g}, B: {b}, CCT: {cct}")
- return {
- "lux": lux,
- "r": r,
- "g": g,
- "b": b,
- "cct": cct
- }
- except Exception as e:
- logging.error(f"Error reading TCS34725 data: {e}")
- print(f"Error reading TCS34725 data: {e}")
- with faults_lock:
- faults["TCS_Sensor_Failure"] = True
- faults["I2C_Communication_Failure"] = True
- return {
- "lux": HIGH_LIGHT_THRESHOLD, # Assume it's bright to turn off LEDs
- "r": 0,
- "g": 0,
- "b": 0,
- "cct": None
- }
- def map_lux_to_duty_cycle(lux):
- """Map the lux value to a PWM duty cycle percentage."""
- if lux > HIGH_LIGHT_THRESHOLD:
- return 0 # Day mode, LEDs off
- elif lux < LOW_LIGHT_THRESHOLD:
- target_lux = 500 # Night mode
- else:
- target_lux = 350 # Moderate light mode
- pid.setpoint = target_lux
- duty_cycle = pid(lux)
- duty_cycle = max(0, min(100, duty_cycle)) # Clamp between 0 and 100
- logging.debug(f"Mapped Lux {lux} to Duty Cycle {duty_cycle}% with Target Lux {pid.setpoint}")
- return duty_cycle
- def calculate_power_consumption(duty_cycle):
- """Calculate power consumption based on duty cycle."""
- current = LED_MAX_CURRENT * (duty_cycle / 100)
- power = LED_VOLTAGE * current
- return power
- def control_led_brightness(duty_cycle, led_name):
- """Control the brightness of a specific LED using PWM."""
- if led_name in additional_pwms:
- pwm_instance = additional_pwms[led_name]
- elif led_name == 'TCS':
- pwm_instance = TCS_PWM
- elif led_name == 'PIR':
- pwm_instance = PIR_PWM
- elif led_name == 'IR':
- pwm_instance = IR_PWM
- else:
- logging.error(f"Invalid LED name: {led_name}")
- return
- # Apply fading to prevent abrupt changes
- threading.Thread(target=fade_to_duty_cycle, args=(pwm_instance, led_name, duty_cycle)).start()
- def fade_to_duty_cycle(pwm_instance, led_name, target_dc):
- """Fade to a specific duty cycle smoothly."""
- global current_duty, fading
- with faults_lock:
- if faults.get(f"{led_name}_Failure", False):
- logging.error(f"Cannot change duty cycle of {led_name} LED due to a detected fault.")
- return
- if fading.get(led_name, False):
- return # Prevent multiple fade threads
- fading[led_name] = True
- logging.debug(f"Starting fade to {target_dc}% duty cycle for {led_name}")
- step = DIM_STEP if target_dc > current_duty.get(led_name, 0) else -DIM_STEP
- while (step > 0 and current_duty.get(led_name, 0) < target_dc) or \
- (step < 0 and current_duty.get(led_name, 0) > target_dc):
- current = current_duty.get(led_name, 0)
- new_dc = current + step
- if step > 0:
- new_dc = min(new_dc, target_dc)
- else:
- new_dc = max(new_dc, target_dc)
- pwm_instance.ChangeDutyCycle(new_dc)
- current_duty[led_name] = new_dc
- time.sleep(DIM_DELAY)
- fading[led_name] = False
- logging.debug(f"{led_name} duty cycle set to {target_dc}%.")
- def log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed):
- """Log sensor data and LED status to Excel."""
- sheet.append([timestamp, lux, r, g, b, cct, duty_cycle, power_consumed])
- workbook.save(EXCEL_FILE_PATH)
- logging.debug(f"Logged data at {timestamp}")
- def handle_individual_led_faults(led_faults):
- """Handles faults for individual additional LEDs."""
- for led_name, is_faulty in led_faults.items():
- if is_faulty:
- if led_name in additional_pwms and current_duty[led_name] != 0:
- fade_out(additional_pwms[led_name], led_name)
- # Ensure the LED is off
- if led_name in additional_pwms:
- additional_pwms[led_name].ChangeDutyCycle(0)
- current_duty[led_name] = 0
- logging.error(f"{led_name} LED has a fault and has been turned off.")
- def fade_out(pwm_instance, led_name):
- """Gradually decrease duty cycle to 0."""
- global current_duty, fading
- with faults_lock:
- if faults.get(f"{led_name}_Failure", False):
- logging.error(f"Cannot fade out {led_name} LED due to a detected fault.")
- return
- if fading.get(led_name, False):
- return # Prevent multiple fade_out threads
- fading[led_name] = True
- logging.debug(f"Starting fade out for {led_name}")
- while current_duty.get(led_name, 0) > 0:
- current_duty[led_name] = max(current_duty.get(led_name, 0) - DIM_STEP, 0)
- pwm_instance.ChangeDutyCycle(current_duty[led_name])
- time.sleep(DIM_DELAY)
- fading[led_name] = False
- logging.debug(f"{led_name} faded out to 0% duty cycle.")
- def fade_in(pwm_instance, led_name, target_dc=100):
- """Gradually increase duty cycle to target_dc."""
- global current_duty, fading
- with faults_lock:
- if faults.get(f"{led_name}_Failure", False):
- logging.error(f"Cannot fade in {led_name} LED due to a detected fault.")
- return
- if fading.get(led_name, False):
- return # Prevent multiple fade_in threads
- fading[led_name] = True
- logging.debug(f"Starting fade in for {led_name} to {target_dc}% duty cycle.")
- while current_duty.get(led_name, 0) < target_dc:
- current_duty[led_name] = min(current_duty.get(led_name, 0) + DIM_STEP, target_dc)
- pwm_instance.ChangeDutyCycle(current_duty[led_name])
- time.sleep(DIM_DELAY)
- fading[led_name] = False
- logging.debug(f"{led_name} faded in to {target_dc}% duty cycle.")
- # ---------------------------- Control Loop ----------------------------
- def control_loop():
- global current_duty_cycle
- while True:
- try:
- sensor_data = read_sensor_data()
- lux = sensor_data["lux"]
- r = sensor_data["r"]
- g = sensor_data["g"]
- b = sensor_data["b"]
- cct = sensor_data["cct"]
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- # Determine duty cycle based on lux
- duty_cycle = map_lux_to_duty_cycle(lux)
- # Calculate power consumption
- power_consumed = calculate_power_consumption(duty_cycle)
- # Log data to Excel
- log_sensor_data(timestamp, lux, r, g, b, cct, duty_cycle, power_consumed)
- # Adaptive control - adjust target lux based on ambient light conditions
- if lux < 100:
- pid.setpoint = 500 # Increase target lux in very low light
- elif 100 <= lux < 300:
- pid.setpoint = 450 # Moderate increase in target lux
- elif 300 <= lux < LOW_LIGHT_THRESHOLD:
- pid.setpoint = 350 # Lower intensity for medium light
- else:
- pid.setpoint = 0 # Day mode, LEDs off
- # Update duty cycle after adjusting setpoint
- duty_cycle = map_lux_to_duty_cycle(lux)
- # Control LEDs based on duty cycle
- if lux > HIGH_LIGHT_THRESHOLD:
- # Day Mode: Ensure LEDs are off
- for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
- if led_name in current_duty and current_duty[led_name] != 0:
- control_led_brightness(0, led_name)
- with faults_lock:
- if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
- GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
- else:
- # Night or Moderate Light Mode: Adjust LEDs based on sensor detections
- # Read PIR and IR sensor states unless in a fault mode
- with fault_mode_lock:
- current_mode = fault_mode
- if current_mode in ['2', '9']: # Simulate PIR Sensor Failure or Sensor Cross-Talk
- pir_detected = False if current_mode == '2' else True
- ir_detected = False if current_mode == '3' else True
- else:
- pir_detected = GPIO.input(PIR_PIN)
- ir_detected = GPIO.input(IR_PIN)
- if pir_detected or not ir_detected:
- # Turn on LEDs based on duty cycle
- for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
- if led_name in current_duty:
- control_led_brightness(duty_cycle, led_name)
- # Turn on LED2 if not in manual override and not faulty
- with faults_lock:
- if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
- GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH)
- else:
- # Turn off LEDs
- for led_name in ['PIR', 'IR', 'TCS', 'LED1', 'LED3']:
- if led_name in current_duty and current_duty[led_name] != 0:
- control_led_brightness(0, led_name)
- # Turn off LED2 if not in manual override and not faulty
- with faults_lock:
- if not manual_override['LED2'] and not faults.get("LED2_Failure", False):
- GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
- # Handle individual LED faults
- led_faults = {
- 'LED1': faults.get("LED1_Failure", False),
- 'LED2': faults.get("LED2_Failure", False),
- 'LED3': faults.get("LED3_Failure", False)
- }
- handle_individual_led_faults(led_faults)
- # Sleep before next iteration
- time.sleep(5) # Adjust as needed
- except Exception as e:
- logging.error(f"Error in control loop: {e}")
- time.sleep(5) # Wait before retrying
- # ---------------------------- API Endpoints ----------------------------
- @app.get("/")
- def read_root():
- return {"message": "Backend server is running."}
- @app.get("/status")
- def get_status():
- with fault_mode_lock:
- current_mode = fault_mode
- # Read LED2's detection pin (GPIO 21) to determine its state
- led2_state = False
- try:
- led2_state = GPIO.input(ADDITIONAL_LED_PINS["LED2"]["detection_gpio"]) == GPIO.HIGH
- except Exception as e:
- logging.error(f"Error reading LED2's detection pin: {e}")
- with faults_lock:
- faults["LED2_Failure"] = True
- status = {
- "fault_mode": FAULT_MODES.get(current_mode, "Unknown"),
- "current_duty": current_duty,
- "LED2_state": led2_state, # Include LED2's ON/OFF state
- "faults": faults.copy()
- }
- return JSONResponse(status)
- @app.post("/set_fault_mode")
- def set_fault_mode(request: FaultModeRequest):
- mode = request.mode
- if mode not in FAULT_MODES:
- logging.error(f"Invalid fault mode attempted: {mode}")
- return JSONResponse(status_code=400, content={"error": "Invalid fault mode."})
- with fault_mode_lock:
- global fault_mode
- fault_mode = mode
- if mode == '1':
- # Reset all fault states
- with faults_lock:
- for key in faults:
- faults[key] = False
- logging.info("Switched to Normal Operation. All faults cleared.")
- print("Switched to Normal Operation. All faults cleared.")
- else:
- # Simulate faults based on the selected mode
- with faults_lock:
- # First, clear all faults
- for key in faults:
- faults[key] = False
- # Then, set the specific fault
- if mode == '2':
- faults["PIR_Sensor_Failure"] = True
- elif mode == '3':
- faults["IR_Sensor_Failure"] = True
- elif mode == '4':
- faults["TCS_Sensor_Failure"] = True
- elif mode == '5':
- faults["I2C_Communication_Failure"] = True
- elif mode == '6':
- faults["GPIO_Output_Failure"] = True
- elif mode == '7':
- faults["Power_Issues"] = True
- elif mode == '8':
- faults["Delayed_Response"] = True
- elif mode == '9':
- faults["Sensor_CrossTalk"] = True
- elif mode == '10':
- faults["LED1_Failure"] = True
- elif mode == '11':
- faults["LED2_Failure"] = True
- elif mode == '12':
- faults["LED3_Failure"] = True
- # Add more fault simulations as needed
- logging.info(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
- print(f"Simulated Fault Mode: {FAULT_MODES[mode]}")
- return {"message": FAULT_MODES[mode]}
- @app.post("/set_target_lux")
- def set_target_lux(request: SetTargetLuxRequest):
- target_lux = request.target_lux
- pid.setpoint = target_lux
- logging.info(f"Target lux set to {target_lux}")
- print(f"Target lux set to {target_lux}")
- return {"status": "success", "target_lux": pid.setpoint}
- @app.post("/set_pid")
- def set_pid(request: SetPIDRequest):
- global pid
- if request.Kp is not None:
- pid.Kp = request.Kp
- logging.info(f"PID Kp set to {pid.Kp}")
- if request.Ki is not None:
- pid.Ki = request.Ki
- logging.info(f"PID Ki set to {pid.Ki}")
- if request.Kd is not None:
- pid.Kd = request.Kd
- logging.info(f"PID Kd set to {pid.Kd}")
- return JSONResponse({
- "status": "success",
- "Kp": pid.Kp,
- "Ki": pid.Ki,
- "Kd": pid.Kd
- })
- @app.post("/set_led")
- def set_led(request: SetLEDRequest):
- led = request.led.upper()
- state = request.state
- if led not in ADDITIONAL_LED_PINS and led not in current_duty and led != "TCS":
- logging.error(f"Invalid LED name attempted: {led}")
- return JSONResponse(status_code=400, content={"error": "Invalid LED name."})
- # Prevent controlling LEDs that are in fault mode
- fault_prevent = False
- with fault_mode_lock:
- if led == "LED1" and fault_mode == '10':
- fault_prevent = True
- elif led == "LED2" and fault_mode == '11':
- fault_prevent = True
- elif led == "LED3" and fault_mode == '12':
- fault_prevent = True
- elif led == "PIR" and fault_mode == '2':
- fault_prevent = True
- elif led == "IR" and fault_mode == '3':
- fault_prevent = True
- elif led == "TCS" and fault_mode == '4':
- fault_prevent = True
- elif fault_mode in ['6', '7', '8', '9'] and led in ['LED1', 'LED2', 'LED3']:
- fault_prevent = True
- if fault_prevent:
- logging.warning(f"Attempted to control {led} while in fault mode.")
- return JSONResponse(status_code=400, content={"error": f"Cannot control {led} in current fault mode."})
- if led == "LED2":
- # Control LED2 directly via GPIO6
- GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.HIGH if state else GPIO.LOW)
- with faults_lock:
- manual_override['LED2'] = True # Activate manual override
- # Reset LED2 Fault Flag if manual control is restored
- if faults.get("LED2_Failure", False):
- faults["LED2_Failure"] = False
- logging.info("Manual control restored for LED2. Fault flag cleared.")
- print("Manual control restored for LED2. Fault flag cleared.")
- logging.info(f"{led} LED set to {'on' if state else 'off'} via manual control.")
- return {"message": f"{led} LED turned {'on' if state else 'off'} via manual control"}
- else:
- # For PWM-controlled LEDs
- duty_cycle = 100 if state else 0
- pwm_instance = None
- if led in ['PIR', 'IR', 'TCS']:
- pwm_instance = globals().get(f"{led}_PWM")
- elif led in additional_pwms:
- pwm_instance = additional_pwms.get(led)
- if pwm_instance:
- control_led_brightness(duty_cycle, led)
- logging.info(f"{led} LED set to {'on' if state else 'off'}.")
- return {"message": f"{led} LED turned {'on' if state else 'off'}."}
- logging.error(f"Failed to set LED: {led}")
- return JSONResponse(status_code=500, content={"error": "Failed to set LED."})
- # ---------------------------- Application Lifecycle Events ----------------------------
- @app.on_event("startup")
- def startup_event():
- initialize_gpio()
- # Start the control loop in a separate daemon thread
- control_thread = threading.Thread(target=control_loop, daemon=True)
- control_thread.start()
- logging.info("Backend server started and control loop initiated.")
- print("Backend server started and control loop initiated.")
- @app.on_event("shutdown")
- def shutdown_event():
- # Stop PWM and clean up GPIO settings
- PIR_PWM.stop()
- IR_PWM.stop()
- TCS_PWM.stop()
- for pwm_instance in additional_pwms.values():
- pwm_instance.stop()
- # Turn off the red LED
- GPIO.output(RED_LED_PIN, GPIO.LOW)
- # Turn off LED2
- GPIO.output(ADDITIONAL_LED_PINS["LED2"]["gpio"], GPIO.LOW)
- GPIO.cleanup()
- logging.info("Backend server shutdown and GPIO cleaned up.")
- print("Backend server shutdown and GPIO cleaned up.")
- # ---------------------------- Run the FastAPI App ----------------------------
- # To run the app, use the following command:
- # uvicorn backend:app --host 0.0.0.0 --port 8000
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement