Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import mysql.connector
- from datetime import datetime
- class Database:
- def __init__(self):
- self.connection = mysql.connector.connect(
- host="192.168.1.2",
- user="root",
- password="Perla17*+",
- database="kardex"
- )
- self.cursor = self.connection.cursor(dictionary=True)
- def execute_query(self, query, params=None):
- self.cursor.execute(query, params or ())
- return self.cursor.fetchall()
- def execute_insert(self, query, params=None):
- self.cursor.execute(query, params or ())
- self.connection.commit()
- return self.cursor.lastrowid
- def get_next_auto_increment(self, table_name):
- self.cursor.execute(f"SHOW TABLE STATUS LIKE '{table_name}'")
- result = self.cursor.fetchone()
- return result['Auto_increment']
- class UnidadProducto:
- def __init__(self, db):
- self.db = db
- def listar(self):
- return self.db.execute_query("SELECT * FROM unidadesproducto")
- def obtener_por_id(self, id_unidad):
- result = self.db.execute_query(
- "SELECT * FROM unidadesproducto WHERE idUnidad = %s",
- (id_unidad,)
- )
- return result[0] if result else None
- class Producto:
- def __init__(self, db):
- self.db = db
- def listar(self):
- return self.db.execute_query("""
- SELECT p.*, u.nombreUnidad
- FROM productos p
- JOIN unidadesproducto u ON p.idUnidadBase = u.idUnidad
- """)
- def obtener(self, id_producto):
- result = self.db.execute_query(
- "SELECT * FROM productos WHERE idProducto = %s",
- (id_producto,)
- )
- return result[0] if result else None
- def obtener_equivalencias(self, id_producto):
- return self.db.execute_query(
- "SELECT * FROM equivalencias WHERE idProducto = %s",
- (id_producto,)
- )
- def existe_nombre(self, nombre):
- result = self.db.execute_query(
- "SELECT idProducto FROM productos WHERE nombreProducto = %s",
- (nombre,)
- )
- return bool(result)
- def insertar(self, codigo, nombre, id_unidad_base, stock):
- return self.db.execute_insert(
- """INSERT INTO productos
- (codigoProducto, nombreProducto, idUnidadBase, stockInicial)
- VALUES (%s, %s, %s, %s)""",
- (codigo, nombre, id_unidad_base, stock)
- )
- def obtener_movimientos(self):
- return self.db.execute_query("""
- SELECT p.idProducto,
- SUM(CASE WHEN m.tipoMovimiento = 'ENTRADA' THEN m.cantidad ELSE 0 END) as entradas,
- SUM(CASE WHEN m.tipoMovimiento = 'SALIDA' THEN m.cantidad ELSE 0 END) as salidas
- FROM productos p
- LEFT JOIN movimientos m ON p.idProducto = m.idProducto
- GROUP BY p.idProducto
- """)
- def print_tabla(header, rows):
- if not rows:
- print("No hay datos para mostrar")
- return
- column_widths = [len(str(campo)) for campo in header]
- for row in rows:
- for i, campo in enumerate(row):
- column_widths[i] = max(column_widths[i], len(str(campo)))
- border = "+" + "+".join(["-" * (w + 2) for w in column_widths]) + "+"
- row_format = "| " + " | ".join(["{:<" + str(w) + "}" for w in column_widths]) + " |"
- print(border)
- print(row_format.format(*header))
- print(border)
- for row in rows:
- print(row_format.format(*row))
- print(border)
- def registrar_unidades(db):
- print("\nREGISTRO DE UNIDADES (0 para volver)")
- nombre = input("Nombre de la unidad: ").strip()
- if nombre == "0":
- return
- if not nombre:
- print("El nombre no puede estar vacío")
- return
- try:
- db.execute_insert(
- "INSERT INTO unidadesproducto (nombreUnidad) VALUES (%s)",
- (nombre,)
- )
- print("\n¡Unidad registrada exitosamente!")
- except Exception as e:
- print(f"\nError al registrar unidad: {str(e)}")
- def listar_unidades(db):
- unidades = UnidadProducto(db).listar()
- if not unidades:
- print("\nNo hay unidades registradas")
- return
- header = ["N°", "NOMBRE DE UNIDAD"]
- rows = [(str(i+1), u['nombreUnidad']) for i, u in enumerate(unidades)]
- print("\nLISTADO DE UNIDADES REGISTRADAS")
- print_tabla(header, rows)
- def listar_productos(db):
- productos = Producto(db).listar()
- if not productos:
- print("\nNo hay productos registrados")
- return
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTOS", "UNIDAD BASE"]
- rows = []
- for i, p in enumerate(productos, 1):
- rows.append((
- f"{i:02d}",
- p['codigoProducto'],
- p['nombreProducto'],
- p['nombreUnidad']
- ))
- print("\nLISTADO DE PRODUCTOS REGISTRADOS")
- print_tabla(header, rows)
- def registrar_producto(db):
- print("\nREGISTRO DE PRODUCTO (0 para volver)")
- producto = Producto(db)
- unidades = UnidadProducto(db)
- while True:
- nombre = input("Nombre del producto: ").strip()
- if nombre == "0":
- return
- if not nombre:
- print("El nombre no puede estar vacío")
- continue
- if producto.existe_nombre(nombre):
- print("¡El producto ya existe!")
- else:
- break
- # Continúa en la siguiente parte...
- while True:
- try:
- num_medidas = int(input("\n¿Cuántas medidas para cuantificar el producto? (1-4): "))
- if 1 <= num_medidas <= 4:
- break
- print("Debe ser un número entre 1 y 4")
- except ValueError:
- print("Entrada inválida, debe ser un número")
- unidades_seleccionadas = []
- for i in range(num_medidas):
- while True:
- unidades_disponibles = unidades.listar()
- print(f"\nSelección de medida {i+1}:")
- print("Unidades disponibles:")
- for idx, u in enumerate(unidades_disponibles, 1):
- print(f"{idx}. {u['nombreUnidad']}")
- print(f"{len(unidades_disponibles)+1}. Agregar nueva unidad")
- try:
- opcion = int(input("Seleccione una opción: "))
- if 1 <= opcion <= len(unidades_disponibles):
- unidades_seleccionadas.append(unidades_disponibles[opcion-1])
- break
- elif opcion == len(unidades_disponibles)+1:
- nombre_unidad = input("Nombre de la nueva unidad: ").strip()
- if not nombre_unidad:
- print("El nombre no puede estar vacío")
- continue
- db.execute_insert(
- "INSERT INTO unidadesproducto (nombreUnidad) VALUES (%s)",
- (nombre_unidad,)
- )
- print("¡Nueva unidad registrada!")
- else:
- print("Opción inválida")
- except ValueError:
- print("Entrada inválida, debe ser un número")
- equivalencias = []
- for i in range(num_medidas-1):
- while True:
- try:
- cantidad = int(input(
- f"\n1 {unidades_seleccionadas[i+1]['nombreUnidad']} = ¿Cuántas {unidades_seleccionadas[i]['nombreUnidad']}? "
- ))
- if cantidad > 0:
- equivalencias.append((
- unidades_seleccionadas[i+1]['idUnidad'],
- unidades_seleccionadas[i]['idUnidad'],
- cantidad
- ))
- break
- print("La cantidad debe ser mayor a 0")
- except ValueError:
- print("Entrada inválida, debe ser un número")
- while True:
- print("\nSelección de unidad para stock inicial:")
- for idx, u in enumerate(unidades_seleccionadas, 1):
- print(f"{idx}. {u['nombreUnidad']}")
- try:
- opcion = int(input("Seleccione una opción: ")) - 1
- if 0 <= opcion < len(unidades_seleccionadas):
- unidad_stock = unidades_seleccionadas[opcion]
- stock = int(input("Stock inicial: "))
- if stock < 0:
- print("El stock no puede ser negativo")
- continue
- break
- except ValueError:
- print("Entrada inválida")
- factor = 1
- for i in range(opcion):
- factor *= equivalencias[i][2]
- stock_base = stock * factor
- next_id = db.get_next_auto_increment('productos')
- codigo = f"ACU-{next_id:02d}"
- id_producto = producto.insertar(
- codigo,
- nombre,
- unidades_seleccionadas[0]['idUnidad'],
- stock_base
- )
- for sup, inf, cant in equivalencias:
- db.execute_insert(
- """INSERT INTO equivalencias
- (idProducto, idUnidadSuperior, idUnidadInferior, cantidadEquivalente)
- VALUES (%s, %s, %s, %s)""",
- (id_producto, sup, inf, cant)
- )
- fecha_registro = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- header = ["CÓDIGO", "NOMBRE DEL PRODUCTO", "STOCK", "UNIDAD", "F. REGISTRO"]
- row = [codigo, nombre, str(stock), unidad_stock['nombreUnidad'], fecha_registro]
- print("\n¡PRODUCTO REGISTRADO EXITOSAMENTE!")
- print_tabla(header, [row])
- def registrar_entrada(db):
- print("\nREGISTRO DE ENTRADA DE PRODUCTOS (0 para volver)")
- productos = Producto(db).listar()
- if not productos:
- print("No hay productos registrados")
- return
- print("\nSeleccione un producto:")
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO", "STOCK", "UNIDAD"]
- rows = [(str(i+1), p['codigoProducto'], p['nombreProducto'], str(p['stockInicial']), p['nombreUnidad'])
- for i, p in enumerate(productos)]
- print_tabla(header, rows)
- try:
- opcion = input("\nIngrese el número de producto (0 para volver): ").strip()
- if opcion == "0":
- return
- opcion = int(opcion) - 1
- producto = productos[opcion]
- except (ValueError, IndexError):
- print("Selección inválida")
- return
- unidades_ids = {producto['idUnidadBase']}
- equivalencias = db.execute_query(
- "SELECT idUnidadSuperior, idUnidadInferior FROM equivalencias WHERE idProducto = %s",
- (producto['idProducto'],)
- )
- for eq in equivalencias:
- unidades_ids.add(eq['idUnidadSuperior'])
- unidades_ids.add(eq['idUnidadInferior'])
- unidades = []
- for uid in unidades_ids:
- unidad = db.execute_query(
- "SELECT * FROM unidadesproducto WHERE idUnidad = %s",
- (uid,)
- )
- if unidad:
- unidades.append(unidad[0])
- if not unidades:
- print("No hay unidades asociadas al producto")
- return
- print("\nUnidades disponibles para este producto:")
- header = ["N°", "NOMBRE DE UNIDAD"]
- rows = [(str(i+1), u['nombreUnidad']) for i, u in enumerate(unidades)]
- print_tabla(header, rows)
- try:
- opcion_unidad = int(input("\nIngrese el número de unidad: ")) - 1
- unidad = unidades[opcion_unidad]
- except (ValueError, IndexError):
- print("Selección inválida")
- return
- factor = 1
- current_unit = unidad['idUnidad']
- base_unit = producto['idUnidadBase']
- equivalencias = db.execute_query(
- "SELECT * FROM equivalencias WHERE idProducto = %s",
- (producto['idProducto'],)
- )
- while current_unit != base_unit:
- encontrado = False
- for eq in equivalencias:
- if eq['idUnidadSuperior'] == current_unit:
- factor *= eq['cantidadEquivalente']
- current_unit = eq['idUnidadInferior']
- encontrado = True
- break
- if not encontrado:
- print("No se puede convertir a la unidad base")
- return
- try:
- stock = int(input("\nStock de ingreso: "))
- if stock <= 0:
- print("El stock debe ser mayor a 0")
- return
- except ValueError:
- print("Valor inválido")
- return
- documentos = db.execute_query("SELECT * FROM documentos")
- print("\nSeleccione tipo de documento:")
- for i, doc in enumerate(documentos, 1):
- print(f"{i}. {doc['nombreDocumento']}")
- try:
- doc_opcion = int(input("Opción: ")) - 1
- documento = documentos[doc_opcion]
- except (ValueError, IndexError):
- print("Selección inválida")
- return
- numero_doc = input("Número de documento: ").strip()
- # Seleccionar proveedor
- proveedores = db.execute_query(
- "SELECT * FROM entidades WHERE tipoEntidad = 'EMPRESA'"
- )
- if not proveedores:
- print("\nNo hay proveedores registrados. Registre uno primero.")
- return
- print("\nLISTADO DE PROVEEDORES REGISTRADOS")
- header = ["N°", "TIPO", "DOCUMENTO", "NOMBRE"]
- rows = [(str(i+1), p['tipoDocumento'], p['numeroDocumento'], p['nomEntidad'])
- for i, p in enumerate(proveedores)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nSeleccione proveedor (0 para cancelar): "))
- if opcion == 0:
- return
- proveedor = proveedores[opcion-1]['numeroDocumento']
- except:
- print("Selección inválida")
- return
- fecha = input("Ingrese fecha (DD-MM-AA) o 0 para fecha actual: ").strip()
- if fecha == '0':
- fecha_movimiento = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- else:
- try:
- fecha_dt = datetime.strptime(fecha, '%d-%m-%y')
- fecha_movimiento = fecha_dt.strftime('%Y-%m-%d %H:%M:%S')
- except ValueError:
- print("Formato de fecha inválido, usando fecha actual")
- fecha_movimiento = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- stock_base = stock * factor
- try:
- db.execute_insert(
- """INSERT INTO movimientos
- (tipoMovimiento, idProducto, cantidad, idDocumento,
- numeroDocumento, proveedor, fechaMovimiento)
- VALUES (%s, %s, %s, %s, %s, %s, %s)""",
- ('ENTRADA', producto['idProducto'], stock_base,
- documento['idDocumento'], numero_doc, proveedor, fecha_movimiento)
- )
- print("\n¡Entrada registrada exitosamente!")
- except Exception as e:
- print(f"Error al registrar entrada: {str(e)}")
- def registrar_salida(db):
- print("\nREGISTRO DE SALIDA DE PRODUCTOS (0 para volver)")
- productos = Producto(db).listar()
- if not productos:
- print("No hay productos registrados")
- return
- print("\nSeleccione un producto:")
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO", "STOCK", "UNIDAD"]
- rows = [(str(i+1), p['codigoProducto'], p['nombreProducto'], str(p['stockInicial']), p['nombreUnidad'])
- for i, p in enumerate(productos)]
- print_tabla(header, rows)
- try:
- opcion = input("\nIngrese el número de producto (0 para volver): ").strip()
- if opcion == "0":
- return
- opcion = int(opcion) - 1
- producto = productos[opcion]
- except (ValueError, IndexError):
- print("Selección inválida")
- return
- unidades_ids = {producto['idUnidadBase']}
- equivalencias = db.execute_query(
- "SELECT idUnidadSuperior, idUnidadInferior FROM equivalencias WHERE idProducto = %s",
- (producto['idProducto'],)
- )
- for eq in equivalencias:
- unidades_ids.add(eq['idUnidadSuperior'])
- unidades_ids.add(eq['idUnidadInferior'])
- unidades = []
- for uid in unidades_ids:
- unidad = db.execute_query(
- "SELECT * FROM unidadesproducto WHERE idUnidad = %s",
- (uid,)
- )
- if unidad:
- unidades.append(unidad[0])
- if not unidades:
- print("No hay unidades asociadas al producto")
- return
- print("\nUnidades disponibles para este producto:")
- header = ["N°", "NOMBRE DE UNIDAD"]
- rows = [(str(i+1), u['nombreUnidad']) for i, u in enumerate(unidades)]
- print_tabla(header, rows)
- try:
- opcion_unidad = int(input("\nIngrese el número de unidad: ")) - 1
- unidad = unidades[opcion_unidad]
- except (ValueError, IndexError):
- print("Selección inválida")
- return
- factor = 1
- current_unit = unidad['idUnidad']
- base_unit = producto['idUnidadBase']
- equivalencias = db.execute_query(
- "SELECT * FROM equivalencias WHERE idProducto = %s",
- (producto['idProducto'],)
- )
- while current_unit != base_unit:
- encontrado = False
- for eq in equivalencias:
- if eq['idUnidadSuperior'] == current_unit:
- factor *= eq['cantidadEquivalente']
- current_unit = eq['idUnidadInferior']
- encontrado = True
- break
- if not encontrado:
- print("No se puede convertir a la unidad base")
- return
- try:
- stock = int(input("\nStock de salida: "))
- if stock <= 0:
- print("El stock debe ser mayor a 0")
- return
- stock_base_total = producto['stockInicial'] + sum(
- m['cantidad'] for m in db.execute_query(
- "SELECT cantidad FROM movimientos WHERE idProducto = %s AND tipoMovimiento = 'ENTRADA'",
- (producto['idProducto'],)
- )
- ) - sum(
- m['cantidad'] for m in db.execute_query(
- "SELECT cantidad FROM movimientos WHERE idProducto = %s AND tipoMovimiento = 'SALIDA'",
- (producto['idProducto'],)
- )
- )
- if (stock * factor) > stock_base_total:
- print("No hay suficiente stock disponible")
- return
- except ValueError:
- print("Valor inválido")
- return
- # Seleccionar usuario
- usuarios = db.execute_query(
- "SELECT * FROM entidades WHERE tipoEntidad = 'USUARIO'"
- )
- if not usuarios:
- print("\nNo hay usuarios registrados. Registre uno primero.")
- return
- print("\nLISTADO DE USUARIOS REGISTRADOS")
- header = ["N°", "TIPO", "DOCUMENTO", "NOMBRE"]
- rows = [(str(i+1), u['tipoDocumento'], u['numeroDocumento'], u['nomEntidad'])
- for i, u in enumerate(usuarios)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nSeleccione usuario (0 para cancelar): "))
- if opcion == 0:
- return
- usuario = usuarios[opcion-1]['numeroDocumento']
- except:
- print("Selección inválida")
- return
- fecha = input("Ingrese fecha (DD-MM-AA) o 0 para fecha actual: ").strip()
- if fecha == '0':
- fecha_movimiento = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- else:
- try:
- fecha_dt = datetime.strptime(fecha, '%d-%m-%y')
- fecha_movimiento = fecha_dt.strftime('%Y-%m-%d %H:%M:%S')
- except ValueError:
- print("Formato de fecha inválido, usando fecha actual")
- fecha_movimiento = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- stock_base = stock * factor
- try:
- db.execute_insert(
- """INSERT INTO movimientos
- (tipoMovimiento, idProducto, cantidad, proveedor, fechaMovimiento)
- VALUES (%s, %s, %s, %s, %s)""",
- ('SALIDA', producto['idProducto'], stock_base, usuario, fecha_movimiento)
- )
- print("\n¡Salida registrada exitosamente!")
- except Exception as e:
- print(f"Error al registrar salida: {str(e)}")
- def reporte_movimientos(db):
- productos = Producto(db).listar()
- movimientos = Producto(db).obtener_movimientos()
- if not productos:
- print("\nNo hay productos registrados")
- return
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO", "STOCK BASE",
- "STOCK INGRESO", "STOCK SALIDA", "STOCK REAL", "UNIDAD"]
- rows = []
- for i, p in enumerate(productos, 1):
- mov = next((m for m in movimientos if m['idProducto'] == p['idProducto']), None)
- entradas = mov['entradas'] if mov else 0
- salidas = mov['salidas'] if mov else 0
- stock_real = p['stockInicial'] + entradas - salidas
- rows.append((
- f"{i:02d}",
- p['codigoProducto'],
- p['nombreProducto'],
- str(p['stockInicial']),
- str(entradas),
- str(salidas),
- str(stock_real),
- p['nombreUnidad']
- ))
- print("\nREPORTE DE MOVIMIENTOS")
- print_tabla(header, rows)
- def reporte_ingresos(db, por_producto=False):
- query = """
- SELECT m.*, p.codigoProducto, p.nombreProducto,
- u.nombreUnidad, e.nomEntidad
- FROM movimientos m
- JOIN productos p ON m.idProducto = p.idProducto
- JOIN unidadesproducto u ON p.idUnidadBase = u.idUnidad
- LEFT JOIN entidades e ON m.proveedor = e.numeroDocumento
- WHERE m.tipoMovimiento = 'ENTRADA'
- """
- if por_producto:
- productos = db.execute_query("SELECT * FROM productos")
- print("\nSeleccione un producto:")
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO"]
- rows = [(str(i+1), p['codigoProducto'], p['nombreProducto'])
- for i, p in enumerate(productos)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nIngrese el número de producto: ")) - 1
- id_producto = productos[opcion]['idProducto']
- query += f" AND m.idProducto = {id_producto}"
- except:
- print("Selección inválida")
- return
- movimientos = db.execute_query(query)
- if not movimientos:
- print("\nNo hay movimientos registrados")
- return
- header = ["N°", "CÓDIGO", "PRODUCTO", "CANTIDAD", "UNIDAD", "PROVEEDOR", "FECHA"]
- rows = []
- for i, m in enumerate(movimientos, 1):
- rows.append((
- f"{i:02d}",
- m['codigoProducto'],
- m['nombreProducto'],
- str(m['cantidad']),
- m['nombreUnidad'],
- m['nomEntidad'] or 'N/A',
- m['fechaMovimiento'].strftime('%d/%m/%Y')
- ))
- print("\nREPORTE DE INGRESOS" + (" POR PRODUCTO" if por_producto else ""))
- print_tabla(header, rows)
- def reporte_salidas(db, por_producto=False):
- query = """
- SELECT m.*, p.codigoProducto, p.nombreProducto,
- u.nombreUnidad, e.nomEntidad
- FROM movimientos m
- JOIN productos p ON m.idProducto = p.idProducto
- JOIN unidadesproducto u ON p.idUnidadBase = u.idUnidad
- LEFT JOIN entidades e ON m.proveedor = e.numeroDocumento
- WHERE m.tipoMovimiento = 'SALIDA'
- ORDER BY m.fechaMovimiento DESC
- """
- if por_producto:
- productos = db.execute_query("SELECT * FROM productos")
- print("\nSeleccione un producto:")
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO"]
- rows = [(str(i+1), p['codigoProducto'], p['nombreProducto'])
- for i, p in enumerate(productos)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nIngrese el número de producto: ")) - 1
- id_producto = productos[opcion]['idProducto']
- query += f" AND m.idProducto = {id_producto}"
- except:
- print("Selección inválida")
- return
- movimientos = db.execute_query(query)
- if not movimientos:
- print("\nNo hay movimientos registrados")
- return
- header = ["N°", "CÓDIGO", "PRODUCTO", "CANTIDAD", "UNIDAD", "USUARIO", "FECHA"]
- rows = []
- for i, m in enumerate(movimientos, 1):
- rows.append((
- f"{i:02d}",
- m['codigoProducto'],
- m['nombreProducto'],
- str(m['cantidad']),
- m['nombreUnidad'],
- m['nomEntidad'] or 'N/A',
- m['fechaMovimiento'].strftime('%d/%m/%Y')
- ))
- print("\nREPORTE DE SALIDAS" + (" POR PRODUCTO" if por_producto else ""))
- print_tabla(header, rows)
- def reporte_salidas_por_producto(db):
- """
- Genera un reporte de salidas agrupadas por producto.
- """
- print("\nSeleccione un producto:")
- productos = db.execute_query("SELECT DISTINCT p.idProducto, p.codigoProducto, p.nombreProducto FROM movimientos m "
- "JOIN productos p ON m.idProducto = p.idProducto "
- "WHERE m.tipoMovimiento = 'SALIDA'")
- if not productos:
- print("No hay registros de salidas por producto.")
- return
- # Mostrar lista de productos
- header = ["N°", "CÓDIGO", "NOMBRE DEL PRODUCTO"]
- rows = [(str(i+1), p['codigoProducto'], p['nombreProducto']) for i, p in enumerate(productos)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nIngrese el número de producto: ")) - 1
- if opcion < 0 or opcion >= len(productos):
- print("Selección inválida")
- return
- producto_seleccionado = productos[opcion]['idProducto']
- except ValueError:
- print("Entrada inválida")
- return
- # Obtener movimientos del producto seleccionado
- query = """
- SELECT m.*, p.codigoProducto, p.nombreProducto, u.nombreUnidad, e.nomEntidad, m.fechaMovimiento
- FROM movimientos m
- JOIN productos p ON m.idProducto = p.idProducto
- JOIN unidadesproducto u ON p.idUnidadBase = u.idUnidad
- LEFT JOIN entidades e ON m.proveedor = e.numeroDocumento
- WHERE m.tipoMovimiento = 'SALIDA' AND m.idProducto = %s
- """
- movimientos = db.execute_query(query, (producto_seleccionado,))
- if not movimientos:
- print("No hay salidas registradas para este producto.")
- return
- # Ordenar los resultados por fecha de manera descendente
- movimientos.sort(key=lambda x: x['fechaMovimiento'], reverse=True)
- # Mostrar resultados
- header = ["N°", "CÓDIGO", "PRODUCTO", "CANTIDAD", "UNIDAD", "USUARIO", "FECHA"]
- rows = [(str(i+1), m['codigoProducto'], m['nombreProducto'], str(m['cantidad']), m['nombreUnidad'],
- m['nomEntidad'] if m['nomEntidad'] else "N/A", m['fechaMovimiento'].strftime('%d/%m/%Y')) for i, m in enumerate(movimientos)]
- print("\nREPORTE DE SALIDAS POR PRODUCTO")
- print_tabla(header, rows)
- def reporte_salidas_por_usuario(db):
- """
- Genera un reporte de salidas agrupadas por usuario.
- """
- print("\nSeleccione un usuario:")
- usuarios = db.execute_query("SELECT DISTINCT e.numeroDocumento, e.nomEntidad FROM movimientos m "
- "JOIN entidades e ON m.proveedor = e.numeroDocumento "
- "WHERE m.tipoMovimiento = 'SALIDA'")
- if not usuarios:
- print("No hay registros de salidas por usuario.")
- return
- # Mostrar lista de usuarios
- header = ["N°", "DOCUMENTO", "NOMBRE"]
- rows = [(str(i+1), u['numeroDocumento'], u['nomEntidad']) for i, u in enumerate(usuarios)]
- print_tabla(header, rows)
- try:
- opcion = int(input("\nIngrese el número de usuario: ")) - 1
- if opcion < 0 or opcion >= len(usuarios):
- print("Selección inválida")
- return
- usuario_seleccionado = usuarios[opcion]['numeroDocumento']
- except ValueError:
- print("Entrada inválida")
- return
- # Obtener movimientos del usuario seleccionado
- query = """
- SELECT m.*, p.codigoProducto, p.nombreProducto, u.nombreUnidad, m.fechaMovimiento
- FROM movimientos m
- JOIN productos p ON m.idProducto = p.idProducto
- JOIN unidadesproducto u ON p.idUnidadBase = u.idUnidad
- WHERE m.tipoMovimiento = 'SALIDA' AND m.proveedor = %s
- ORDER BY m.fechaMovimiento DESC
- """
- movimientos = db.execute_query(query, (usuario_seleccionado,))
- if not movimientos:
- print("No hay salidas registradas para este usuario.")
- return
- # Ordenar los resultados por fecha de manera descendente
- movimientos.sort(key=lambda x: x['fechaMovimiento'], reverse=True)
- # Mostrar resultados
- header = ["N°", "CÓDIGO", "PRODUCTO", "CANTIDAD", "UNIDAD", "FECHA"]
- rows = [(str(i+1), m['codigoProducto'], m['nombreProducto'], str(m['cantidad']), m['nombreUnidad'],
- m['fechaMovimiento'].strftime('%d/%m/%Y')) for i, m in enumerate(movimientos)]
- print("\nREPORTE DE SALIDAS POR USUARIO")
- print_tabla(header, rows)
- def registrar_entidad(db):
- print("\nREGISTRO DE ENTIDAD (0 para volver)")
- while True:
- print("Tipo de entidad:")
- print("1. USUARIO")
- print("2. EMPRESA")
- print("0. Volver")
- opcion_tipo = input("Seleccione una opción: ").strip()
- if opcion_tipo == "0":
- return
- if opcion_tipo == '1':
- tipo_entidad = 'USUARIO'
- tipo_doc = 'DNI'
- break
- elif opcion_tipo == '2':
- tipo_entidad = 'EMPRESA'
- tipo_doc = 'RUC'
- break
- else:
- print("Opción inválida")
- while True:
- numero_doc = input(f"Ingrese {tipo_doc} (1 para generar DNI especial): ").strip()
- if tipo_doc == 'DNI':
- if numero_doc == "1":
- # Se consulta el máximo DNI especial (los que comienzan con '999')
- resultado = db.execute_query(
- "SELECT MAX(numeroDocumento) AS maxDNI FROM entidades WHERE numeroDocumento LIKE '999%'"
- )
- if resultado and resultado[0]['maxDNI'] is not None:
- nuevo_dni = int(resultado[0]['maxDNI']) + 1
- else:
- nuevo_dni = 99900001 # Valor inicial si no existe ningún DNI especial
- numero_doc = str(nuevo_dni)
- elif numero_doc == "0":
- return
- # Si el número ingresado NO es un DNI especial (no comienza con "999"),
- # se verifica que no exista ya en la base de datos.
- if not numero_doc.startswith("999"):
- existe = db.execute_query(
- "SELECT idEntidad FROM entidades WHERE numeroDocumento = %s",
- (numero_doc,)
- )
- if existe:
- print(f"¡El {tipo_doc} ya existe en el sistema!")
- continue
- if tipo_doc == 'DNI' and len(numero_doc) == 8 and numero_doc.isdigit():
- break
- elif tipo_doc == 'RUC' and len(numero_doc) == 11 and numero_doc.isdigit():
- break
- print(f"{tipo_doc} inválido. Debe tener {8 if tipo_doc == 'DNI' else 11} dígitos")
- nombre_entidad = input("Nombre de la entidad: ").strip()
- try:
- db.execute_insert(
- """INSERT INTO entidades
- (tipoEntidad, tipoDocumento, numeroDocumento, nomEntidad)
- VALUES (%s, %s, %s, %s)""",
- (tipo_entidad, tipo_doc, numero_doc, nombre_entidad)
- )
- print("\n¡Entidad registrada exitosamente!")
- entidad = db.execute_query(
- "SELECT * FROM entidades ORDER BY idEntidad DESC LIMIT 1"
- )[0]
- header = ["N°", "TIPOENTIDAD", "TIPODOCUMENTO", "NUMERODOCUMENTO", "NOMENTIDAD", "FECHAREGISTRO"]
- rows = [(
- "01",
- entidad['tipoEntidad'],
- entidad['tipoDocumento'],
- entidad['numeroDocumento'],
- entidad['nomEntidad'],
- entidad['fechaRegistro'].strftime('%d/%m/%Y')
- )]
- print_tabla(header, rows)
- except mysql.connector.IntegrityError:
- print("\nError: El número de documento ya existe en el sistema")
- except Exception as e:
- print(f"\nError al registrar entidad: {str(e)}")
- def listar_entidades(db):
- entidades = db.execute_query("SELECT * FROM entidades")
- if not entidades:
- print("\nNo hay entidades registradas")
- return
- header = ["N°", "TIPOENTIDAD", "TIPODOCUMENTO", "NUMERODOCUMENTO", "NOMENTIDAD", "FECHAREGISTRO"]
- rows = []
- for i, entidad in enumerate(entidades, 1):
- rows.append((
- f"{i:02d}",
- entidad['tipoEntidad'],
- entidad['tipoDocumento'],
- entidad['numeroDocumento'],
- entidad['nomEntidad'],
- entidad['fechaRegistro'].strftime('%d/%m/%Y')
- ))
- print("\nLISTADO DE ENTIDADES REGISTRADAS")
- print_tabla(header, rows)
- def menu_reportes_ingresos(db):
- while True:
- print("\nREPORTES DE INGRESOS")
- print("1. Reporte general de todos los ingresos")
- print("2. Reporte de ingresos por producto")
- print("3. Volver al menú principal")
- opcion = input("Seleccione una opción: ").strip()
- if opcion == '1':
- reporte_ingresos(db)
- elif opcion == '2':
- reporte_ingresos(db, por_producto=True)
- elif opcion == '3':
- break
- else:
- print("Opción inválida")
- def menu_reportes_salidas(db):
- while True:
- print("\nREPORTES DE SALIDAS")
- print("1. Reporte general de todas las salidas")
- print("2. Reporte de salidas por producto")
- print("3. Reporte de salidas por usuario") # Nueva opción
- print("4. Volver al menú principal")
- opcion = input("Seleccione una opción: ").strip()
- if opcion == '1':
- reporte_salidas(db)
- elif opcion == '2':
- reporte_salidas_por_producto(db)
- elif opcion == '3':
- reporte_salidas_por_usuario(db)
- elif opcion == '4':
- break
- else:
- print("Opción inválida")
- def main():
- db = Database()
- while True:
- print("\n" + "=" * 40)
- print("SISTEMA DE GESTIÓN DE PRODUCTOS")
- print("=" * 40)
- print("1. Registrar unidad de medida")
- print("2. Listar unidades registradas")
- print("3. Registrar nuevo producto")
- print("4. Listar productos registrados")
- print("5. Registrar entidad (proveedor/usuario)")
- print("6. Listar entidades registradas")
- print("7. Registrar entrada de productos")
- print("8. Registrar salida de productos")
- print("9. Reporte general de movimientos")
- print("10. Reportes de ingresos")
- print("11. Reportes de salidas")
- print("12. Salir del sistema")
- opcion = input("\nSeleccione una opción: ").strip()
- if opcion == '1':
- registrar_unidades(db)
- elif opcion == '2':
- listar_unidades(db)
- elif opcion == '3':
- registrar_producto(db)
- elif opcion == '4':
- listar_productos(db)
- elif opcion == '5':
- registrar_entidad(db)
- elif opcion == '6':
- listar_entidades(db)
- elif opcion == '7':
- registrar_entrada(db)
- elif opcion == '8':
- registrar_salida(db)
- elif opcion == '9':
- reporte_movimientos(db)
- elif opcion == '10':
- menu_reportes_ingresos(db)
- elif opcion == '11':
- menu_reportes_salidas(db)
- elif opcion == '12':
- print("\n¡Gracias por usar el sistema!")
- break
- else:
- print("\nOpción inválida, intente nuevamente")
- input("\nPresione Enter para continuar...")
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement