Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import mimetypes
- import os
- import re
- import smtplib
- import socket
- import sys
- import traceback
- from copy import deepcopy
- from email.headerregistry import Address
- from email.message import EmailMessage
- from itertools import groupby
- from operator import itemgetter
- from pathlib import Path
- from typing import Dict, List
- import pyexcel
- import tabulate
- __version__ = "1.2.0 (20231019)"
- __author__ = "科技部 · 牛逼科 · 数智组"
- __email__ = "mumu@mumu.com"
- class BatchEmailParseError(Exception):
- pass
- class BatchEmailSendError(Exception):
- pass
- def _filter_row(row_index, row):
- """
- 过滤空行 helper 函数
- """
- result = [element for element in row if element != ""]
- return len(result) == 0
- class BatchEmailSender:
- """
- 批量邮件发送
- """
- LOG_FORMAT = "[%(asctime)s.%(msecs)03d] [%(levelname)5s] | %(message)s"
- LOG_DATE_FMT = "%Y-%m-%d %H:%M:%S"
- LOG_LEVEL = logging.DEBUG
- SENDMAIL_ALL_OPTIONS = {"是否发送", "附件干预"}
- def __init__(self, /, **kwargs):
- """
- 批量邮件发送类初始化
- @kwparam config_file : 邮件分发配置文件(xlsx 文件格式)
- @kwparam [opt] log_file : 日志文件路径
- """
- # 运行环境相关信息
- self.cur_dir = Path(sys.argv[0]).parent
- # 方便起见,构造函数中直接对所有命名参数赋给实例
- for arg_k, arg_v in kwargs.items():
- setattr(self, arg_k, arg_v)
- self.set_logger()
- # SMTP 配置组,键为参数配置中的配置名,值为包括 SMTP server 信息的登录信息字典
- self.smtp_servers: Dict[str, Dict] = {}
- def set_logger(self):
- """
- 日志设置
- """
- if not hasattr(self, "log_file"):
- cur_file = Path(sys.argv[0])
- self.log_file = cur_file.parent / (cur_file.stem + ".log")
- self.o_logfile = Path(self.log_file)
- self.o_logfile.parent.mkdir(parents=True, exist_ok=True)
- self.logger = logging.getLogger(self.o_logfile.stem)
- self.logger.setLevel(self.LOG_LEVEL)
- handler = logging.FileHandler(self.o_logfile, encoding="UTF-8")
- handler.setFormatter(logging.Formatter(self.LOG_FORMAT, self.LOG_DATE_FMT))
- self.logger.addHandler(handler)
- self.logger.info("-" * 80)
- self.logger.info(f"{Path(sys.argv[0]).stem}: 程序开始运行")
- def parse_config(self):
- """
- 解析 Excel 配置文件
- """
- if not Path(self.config_file).is_file():
- self.logger.error(f"配置文件 [{self.config_file}] 不存在!")
- raise BatchEmailParseError(f"配置文件解析错误:{self.config_file}")
- sys.exit(1)
- self.logger.info(f"配置文件解析中 [{self.config_file}]")
- book = pyexcel.get_book(file_name=self.config_file)
- # 参数表 Sheet 设置
- self.sheet_config = book["参数配置"]
- self.sheet_config.name_columns_by_row(0)
- del self.sheet_config.row[_filter_row]
- # 仅供日志输出用
- sheet_config_view = list(self.sheet_config.records)
- for d in sheet_config_view:
- d["登录密码"] = "******"
- self.logger.debug(
- "参数配置信息:\n"
- + tabulate.tabulate(sheet_config_view, headers="keys", tablefmt="simple")
- )
- # 邮件分发信息
- self.sheet_data = book["邮件分发"]
- self.sheet_data.name_columns_by_row(0)
- del self.sheet_data.row[_filter_row]
- self.logger.info(f"配置文件解析完成")
- print(f"配置文件解析完成")
- def smtp_login(self, timeout=5, /, **login_info):
- """
- 连接至邮箱服务器
- """
- (
- profile,
- smtp_host,
- smtp_port,
- smtp_ssl,
- sender_mail,
- sender_password,
- ) = itemgetter("配置名称", "邮箱服务器", "邮箱服务端口", "SSL加密", "发件人邮箱", "登录密码")(login_info)
- auth_status = False
- smtp_info = f"{sender_mail}/******@{smtp_host}:{smtp_port}"
- smtp_method = smtplib.SMTP_SSL if smtp_ssl == "是" else smtplib.SMTP
- try:
- self.logger.info(f"尝试登录至 SMTP 服务器 ({smtp_info})")
- server = smtp_method(smtp_host, smtp_port, timeout=timeout)
- server.login(sender_mail, sender_password)
- except smtplib.SMTPAuthenticationError:
- self.logger.error(f"发件人信息配置错误:{smtp_info}")
- print(f"发件人信息配置错误,详见日志 {self.o_logfile.resolve()}")
- except (smtplib.SMTPConnectError, socket.gaierror):
- self.logger.error(f"邮箱服务器配置错误:{smtp_info}")
- print(f"邮件服务器配置错误,详见日志 {self.o_logfile.resolve()}")
- except (socket.timeout, smtplib.SMTPServerDisconnected):
- self.logger.error(f"连接邮箱服务器超时:{smtp_info}")
- print(f"连接邮箱服务器超时,详见日志 {self.o_logfile.resolve()}")
- else:
- print(f"成功连接邮箱服务器 ({smtp_info})\n")
- self.logger.info(f"成功连接邮箱服务器 ({smtp_info})")
- self.smtp_servers[profile] = {
- "server": server,
- **login_info,
- }
- auth_status = True
- return auth_status
- @staticmethod
- def _parse_single_option(option_str):
- """
- 解析单条选项
- """
- mat = re.search(r"(\S+)\s*[::]\s*(\S+)", option_str)
- if (
- mat
- and len(mat.groups()) == 2
- and mat.group(1) in BatchEmailSender.SENDMAIL_ALL_OPTIONS
- and mat.group(2)
- ):
- option = {mat.group(1): mat.group(2)}
- return option
- else:
- return dict()
- @staticmethod
- def parse_options(option_str):
- """
- 解析单元格选项(可为多个)
- """
- options_res = {}
- raw_options = list(map(str.strip, re.split(r"[\n;;]+", option_str.strip())))
- for option in raw_options:
- options_res.update(BatchEmailSender._parse_single_option(option))
- return options_res
- def run(self):
- """
- 批量发送邮件
- """
- self.parse_config()
- # 发送顺序:对 (发件配置, 序号) 进行排列,每个发件配置内依次发件
- send_data = list(
- map(
- itemgetter(1),
- (
- sorted(
- enumerate(self.sheet_data.records),
- key=lambda x: (x[1]["发件配置"], x[0]),
- )
- ),
- )
- )
- # 无待发邮件
- if not send_data:
- print("无待发邮件,退出程序")
- self.logger.warning("无待发邮件,退出程序")
- return
- send_data_view = deepcopy(send_data)
- for record in send_data_view:
- if len(record["邮件正文"]) > 60:
- record["邮件正文"] = record["邮件正文"][:30] + " ... " + record["邮件正文"][-30:]
- self.logger.info(
- "发件信息:\n"
- + tabulate.tabulate(
- send_data_view,
- headers="keys",
- tablefmt="simple",
- maxcolwidths=[40] * len(self.sheet_data.colnames),
- )
- )
- # 邮件发送结果
- send_result = {}
- self.logger.info("邮件准备分发中...")
- self.logger.info("")
- print("邮件准备分发...\n")
- # 分组发送邮件
- for profile, send_info in groupby(send_data, key=itemgetter("发件配置")):
- # 根据 profile 获取发送配置信息
- for profile_record in self.sheet_config.records:
- if profile_record["配置名称"] == profile:
- login_info = profile_record
- break
- else:
- print(f"没有找到 <{profile}> 的配置信息")
- continue
- # 尝试登录
- login_status = self.smtp_login(**login_info)
- # 如登录失败,尝试登录下一组配置
- if not login_status:
- continue
- # 防止迭代器失效,提前展开并存储
- send_info = list(send_info)
- for info in send_info:
- # 发件人
- sender_username, sender_domain = self.smtp_servers[profile][
- "发件人邮箱"
- ].split("@")
- sender_display_name = self.smtp_servers[profile]["发件人显示名"].strip()
- if sender_display_name:
- mail_sender_address = Address(
- sender_display_name, sender_username, sender_domain
- )
- else:
- mail_sender_address = Address(sender_username, sender_domain)
- # 收件人组
- mail_receiver_address = self.parse_address(info["收件人"])
- mail_receiver_view = "; ".join(
- f"({i}) {c}" for i, c in enumerate(mail_receiver_address, 1)
- )
- # 抄送组
- mail_cc_address = self.parse_address(info["抄送"])
- mail_cc_view = "; ".join(
- f"({i}) {c}" for i, c in enumerate(mail_cc_address, 1)
- )
- # 密送组
- mail_bcc_address = self.parse_address(info["密送"])
- mail_bcc_view = "; ".join(
- f"({i}) {c}" for i, c in enumerate(mail_bcc_address, 1)
- )
- # 主题
- mail_subject = info["邮件主题"]
- # 正文
- mail_content_view = mail_content = info["邮件正文"]
- if len(mail_content) > 100:
- mail_content_view = mail_content[:50] + " ... " + mail_content[-50:]
- # 附件路径
- mail_attaches = []
- if info["附件路径"].strip():
- mail_attaches = list(
- map(
- lambda s: s.strip(' "'),
- re.split(r"\s*[,;]\s*", info["附件路径"].strip()),
- )
- )
- # 发送选项
- mail_options = BatchEmailSender.parse_options(info["发送选项"])
- # -------------------------------------------------------
- # 日志输出发件信息
- self.logger.info(f"发件人:{mail_sender_address}")
- self.logger.info(f"收件人:{mail_receiver_view}")
- if mail_cc_address:
- self.logger.info(f"抄送:{mail_cc_view}")
- if mail_bcc_address:
- self.logger.info(f"密送:{mail_bcc_view}")
- self.logger.info(f"主题:{mail_subject}")
- self.logger.info(f"正文:{mail_content_view}")
- if mail_attaches:
- self.logger.info("附件:" + "; ".join(mail_attaches))
- for option_idx, (option_key, option_val) in enumerate(
- mail_options.items(), 1
- ):
- self.logger.info(
- f"选项 #{option_idx} | [{option_key}]: [{option_val}]"
- )
- # -------------------------------------------------------
- send_info_abstract = "\n".join(
- filter(
- len,
- [
- f"配置:{profile}",
- f"发件:{mail_sender_address}",
- "收件:" + "; ".join(map(str, mail_receiver_address)),
- ("抄送:" + "; ".join(map(str, mail_cc_address)))
- if mail_cc_address
- else "",
- ("密送:" + "; ".join(map(str, mail_bcc_address)))
- if mail_bcc_address
- else "",
- f"主题:{mail_subject}",
- ],
- )
- )
- print(f"邮件发送中...\n{send_info_abstract}")
- # 邮件发送干预
- # -----------
- # 问题附件路径(不符文件名规范或该路径下文件不存在)
- problem_attaches = []
- # :: 是否发送 :: 如该选项为「否」,直接停止发送
- if mail_options.get("是否发送", "是") == "否":
- print("发送选项中该邮件设置为不发送!")
- print("-" * 40, end="\n\n")
- self.logger.warning("邮件发送失败")
- self.logger.info("")
- send_result["失败"] = send_result.get("失败", 0) + 1
- continue
- # :: 附件干预 :: 如该选项为「否」,直接停止发送
- if mail_options.get("附件干预", "否") == "是":
- for attach_file in mail_attaches:
- if not (
- Path(attach_file).is_file() and Path(attach_file).exists()
- ):
- problem_attaches.append(attach_file)
- if problem_attaches:
- problem_attaches_str = "、".join(problem_attaches)
- print(f"附件 {problem_attaches_str} 不存在,不发送该邮件")
- print("-" * 40, end="\n\n")
- send_result["失败"] = send_result.get("失败", 0) + 1
- self.logger.warning("邮件发送失败")
- self.logger.info("")
- continue
- # -------------------------------------------------------
- try:
- self.send_email(
- self.smtp_servers[profile]["server"],
- mail_sender_address,
- mail_receiver_address,
- mail_cc_address,
- mail_bcc_address,
- mail_subject,
- mail_content,
- mail_attaches,
- )
- except Exception as e:
- send_result["失败"] = send_result.get("失败", 0) + 1
- self.logger.exception(e)
- self.logger.error("邮件发送失败")
- print(f"邮件发送失败,详见日志 {self.log_file}")
- else:
- send_result["成功"] = send_result.get("成功", 0) + 1
- self.logger.info("邮件发送成功")
- print("邮件发送成功")
- print("-" * 40, end="\n\n")
- finally:
- self.logger.info("")
- send_conclusion = (
- f"批量发送完成,成功发件 {send_result.get('成功', 0)} 封,失败 {send_result.get('失败', 0)} 封"
- )
- self.logger.info(send_conclusion)
- print("\n" + send_conclusion)
- self.logger.info(f"{Path(sys.argv[0]).stem}: 程序运行结束")
- self.logger.info("=" * 80)
- self.logger.info("")
- @staticmethod
- def parse_address(addr_str: str) -> List[Address]:
- """
- 将收件 / 抄送 / 密送组人员解析为 Address 列表
- """
- addr_list = []
- split_addrs = re.split(r"\s*[,;]\s*", addr_str)
- for addr in map(str.strip, split_addrs):
- # 匹配两种类型邮箱地址:
- # - display name <rec1@xx.com>
- # - rec2@xx.com
- mat = re.search(
- r"^(?P<display_name>.*?)\s*<(?P<comp_username>\S+)@(?P<comp_domain>\S+)>"
- r"|^(?P<username>\S+)@(?P<domain>\S+)",
- addr,
- re.VERBOSE,
- )
- # 错误地址类型,忽略
- if not mat:
- continue
- # 带有显示名称类型地址
- if mat["display_name"]:
- addr_list.append(
- Address(
- mat["display_name"], mat["comp_username"], mat["comp_domain"]
- )
- )
- # 单纯邮箱地址
- elif mat["username"]:
- addr_list.append(
- Address(username=mat["username"], domain=mat["domain"])
- )
- return tuple(addr_list)
- def send_email(
- self,
- server: smtplib.SMTP,
- sender: Address,
- receiver: List[Address],
- cc: List[Address],
- bcc: List[Address],
- subject: str,
- content: str,
- attach_files=None,
- ):
- """
- 单封邮件发送
- """
- msg = EmailMessage()
- msg["Subject"] = subject
- msg["From"] = sender
- msg["To"] = receiver
- if cc:
- msg["Cc"] = cc
- if bcc:
- msg["Bcc"] = bcc
- msg.set_content(content, charset="utf-8", subtype="html")
- for attach_file in attach_files:
- o_attach_file = Path(attach_file)
- # 附件路径需存在
- if not o_attach_file.is_file():
- continue
- ctype, encoding = mimetypes.guess_type(o_attach_file)
- if ctype is None or encoding is not None:
- ctype = "application/octet-stream"
- maintype, subtype = ctype.split("/", 1)
- msg.add_attachment(
- o_attach_file.read_bytes(),
- maintype=maintype,
- subtype=subtype,
- filename=o_attach_file.name,
- )
- try:
- server.send_message(msg)
- except Exception:
- traceback.print_exc()
- if __name__ == "__main__":
- print("\n")
- print(f"----------------------------------------------------------------")
- print(f" ")
- print(f" 邮件批量分发工具 ")
- print(f" ==================== ")
- print(f" ")
- print(f" 版本信息:{__version__}")
- print(f" 开发作者:{__author__}")
- print(f" 问题反馈:{__email__}")
- print(f" ")
- print(f"----------------------------------------------------------------")
- print("\n")
- if len(sys.argv) == 2:
- config_file = sys.argv[1]
- else:
- # 防止路径中包含空格报错
- while True:
- config_file = input("邮件配置文件路径:").strip("\" \t'")
- if config_file:
- break
- sender = BatchEmailSender(config_file=config_file)
- try:
- sender.run()
- except BatchEmailParseError as e:
- print(e)
- except Exception:
- traceback.print_exc()
- print(f"发生错误,详见日志:{Path(sender.log_file).resolve()}")
- print("\n按任意键继续. . . ", end="")
- os.system("pause > nul")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement