Files
tkp_robot/email_processor.py

410 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from imapclient import IMAPClient
import email
from email.header import decode_header
from pathlib import Path
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select
from datetime import datetime, timedelta, date
import pyautogui
import time, sys
import re
import logging
from telegram import Bot
import asyncio
import traceback
from selenium.webdriver.common.action_chains import ActionChains
# Настройка ведения журнала
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/home/specit/email_processor.log'),
logging.StreamHandler()
]
)
# Конфигурация Telegram
TELEGRAM_BOT_TOKEN = '5374522720:AAGoNUYCEyJ-7-bSAQPT7aV_W2GWcinnkQU'
TELEGRAM_CHAT_ID = '394151541'
# Конфигурация Email
# EMAIL = 'seligenenko.a@maverik.ru'
# PASSWORD = '9Jufd22Mwvzp07dvpELn'
EMAIL = 'info@maverik.ru'
PASSWORD = 'HWPwY9Ty7QHiusubV31x'
IMAP_SERVER = 'mail.maverik.ru'
SMTP_SERVER = 'mail.maverik.ru'
FOLDER = 'INBOX/0СУБСИДИИ'
SAVE_DIRECTORY = Path(r"/opt/autorun/auto_tkp/files")
# SAVE_DIRECTORY = Path(r"\\192.168.0.13\obmen\ТКПСубсидии")
# Web Конфигурация
WEB_URL = 'https://www.tch.ru/ru-ru/Pages/Home.aspx'
WEB_USERNAME = '12ROV_kornienko'
WEB_PASSWORD = 'Rita20152015'
WEB_PASSWORD = 'Tkmaverik2025*1'
yesterday = date.today() - timedelta(days=2)
# yesterday = date.today()
async def send_telegram_notification(message):
try:
bot = Bot(token=TELEGRAM_BOT_TOKEN)
await bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=message)
except Exception as e:
logging.error(f"Субсидии:\nНе удалось отправить уведомление Telegram: {str(e)}")
def handle_error(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
error_message = f"Субсидии:\nОшибка в {func.__name__}: {str(e)}\n{traceback.format_exc()}"
logging.error(error_message)
asyncio.run(send_telegram_notification(error_message))
raise
return wrapper
def connect_to_email():
server = IMAPClient(IMAP_SERVER, use_uid=True, ssl=True)
server.login(EMAIL, PASSWORD)
return server
def extract_number_from_subject(subject):
# Извлеките цифры из темы, например «субсидия 555 612086» или «субсидия555612086»
numbers = re.findall(r'\d+', subject)
if numbers and len(''.join(numbers)) > 9:
# return ' '.join(numbers)
# if len(''.join(numbers)) > 9:
print(''.join(numbers)[-10:])
return ''.join(numbers)[-10:]
return None
def decode_email_header(header):
"""Декодировать заголовок электронной почты для обработки различных кодировок"""
decoded_header = decode_header(header)
result = []
for part, encoding in decoded_header:
if isinstance(part, bytes):
if encoding:
result.append(part.decode(encoding))
else:
result.append(part.decode('utf-8', errors='replace'))
else:
result.append(part)
return ''.join(result)
@handle_error
def process_emails():
server = connect_to_email()
server.select_folder(FOLDER)
# Поиск непрочитанных сообщений
# messages = server.search(['UNSEEN'])
# Критерии поиска: непрочитанные сообщений перед текущей датой
# messages = server.search(['UNSEEN', 'BEFORE', yesterday.strftime('%d-%b-%Y')])
# Критерии поиска: письма, полученные вчера. Ищем письма конкретно за вчерашний день
messages = server.search(['ON', yesterday.strftime('%d-%b-%Y')])
processed_numbers = []
missing_files = []
error_extract_number = 0
multiple_attachments = [] # Список для хранения тем писем с несколькими вложениями
for uid, message_data in server.fetch(messages, ['RFC822']).items():
time.sleep(3)
email_message = email.message_from_bytes(message_data[b'RFC822'])
# Расшифровать тему
subject = decode_email_header(email_message['subject'])
number = extract_number_from_subject(subject)
if not number:
# Отметить сообщение как непрочитанное, если в теме не найдено число
server.remove_flags(uid, ['\\Seen'])
error_extract_number += 1
continue
# Количество вложений
attachment_count = 0
for part in email_message.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
if part.get_content_disposition() == 'inline':
continue
attachment_count += 1
# Пропустить, если вложений несколько
if attachment_count > 1:
multiple_attachments.append(subject)
server.remove_flags(uid, ['\\Seen']) # Отметить как непрочитанное
continue
has_attachment = False
for part in email_message.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
if part.get_content_disposition() == 'inline':
continue
filename = part.get_filename()
# Расшифровать имя файла, если оно закодировано
filename = decode_email_header(filename)
suffix = Path(filename).suffix if filename else ''
print('Имя файла:', filename)
if filename:
# Сохранение файл с номером из темы
new_filename = number + suffix
filepath = SAVE_DIRECTORY / new_filename
with open(filepath, 'wb') as f:
f.write(part.get_payload(decode=True))
# processed_numbers.append(number)
processed_numbers.append(new_filename)
has_attachment = True
break
if not has_attachment:
missing_files.append(number)
server.logout()
return processed_numbers, missing_files, error_extract_number, multiple_attachments
@handle_error
def upload_files_to_web(processed_numbers):
if not processed_numbers:
return 0, 0
disabled_passenger = [] # Пассажир инвалид
driver = webdriver.Chrome() # Убедитесь, что ChromeDriver установлен.
driver.get(WEB_URL)
driver.set_window_size(1920, 1080)
# Авторизоваться
username_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.NAME, "ctl00$PlaceHolderMain$lf_Login$tb_Login"))
)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.NAME, "ctl00$PlaceHolderMain$lf_Login$lb_Pass"))
)
username_field.send_keys(WEB_USERNAME)
password_field.send_keys(WEB_PASSWORD)
driver.find_element(By.XPATH, "//input[@type='submit']").click()
# Перейдите к "Подтверждение документов"
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CLASS_NAME, "a-login"))
)
next_button.click()
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.LINK_TEXT, "Управление сетью продажи, контроль и анализ данных, обмен электронными документами"))
)
next_button.click()
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.LINK_TEXT, "Стандартный вход по протоколу HTTPS"))
)
next_button.click()
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.LINK_TEXT, "Организация продаж"))
)
next_button.click()
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.LINK_TEXT, "Подтверждающие документы"))
)
next_button.click()
successful_uploads = []
failed_uploads = []
for number in processed_numbers:
print('Обрабатывается: ', Path(number).stem)
try:
time.sleep(10)
dropdown = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.TAG_NAME, "select"))
)
select = Select(dropdown)
select.select_by_visible_text("Отправлено в АВС")
# Ввод номера
number_field = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "input[data-bind='value: filter.ticketNo']"))
)
number_field.clear()
time.sleep(1)
number_field.send_keys(Path(number).stem)
current_date_obj = datetime.now().date()
new_date_obj = current_date_obj - timedelta(days=9)
new_date_str = new_date_obj.strftime("%d.%m.%Y")
input_field = driver.find_element(By.CSS_SELECTOR, "input[data-bind='date: filter.modifyDateFrom']")
input_field.clear()
time.sleep(0.5)
input_field.click()
driver.execute_script("arguments[0].value = arguments[1];", input_field, new_date_str)
# input_field.send_keys(new_date_str)
time.sleep(4)
form_button = driver.find_element(By.XPATH, "//button[contains(text(), 'Сформировать')]")
# actions = ActionChains(driver)
# actions.move_to_element(form_button).perform() # Перемещение курсора к элементу
# actions.click().perform() # Клик
form_button.click()
# driver.execute_script("arguments[0].click();", form_button)
print('Сформировать')
time.sleep(10)
edit_link = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "a.i-edit"))
)
edit_link.click()
try:
# Скрытый родительский эелемент
# driver.find_element(By.XPATH, "//td[contains(text(),'Справка об инвалидности')]")
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//td[contains(text(),'Справка об инвалидности')]/parent::tr[contains(@style, 'display: none')]"))
)
# print('Инвалид: ', edit_link.text, edit_link.is_displayed())
# print('Инвалид: ', disabled.text, disabled.is_displayed(), number)
# driver.back()
# time.sleep(5)
# continue
except:
# print('Закрываю')
disabled_passenger.append(number)
# time.sleep(5)
driver.back()
continue
# pass
# Загрузить файл
upload_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-bind='visible: model.addButtonVisible, click: attach']"))
)
upload_button.click()
time.sleep(3)
# file_path = os.path.join(SAVE_DIRECTORY, f"{number}.pdf")
# file_input.send_keys(file_path)
# pyautogui.typewrite(os.path.join(SAVE_DIRECTORY, f"{number}.pdf"))
pyautogui.typewrite(str(SAVE_DIRECTORY / number))
time.sleep(1)
pyautogui.press('enter')
time.sleep(7)
# Проверяется, подгружен ли файл
try:
present_element = WebDriverWait(driver, 10).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "a[data-bind='text: fileName, attr: { href: \\'/Attachments/Get?fileId=\\' + attachmentId() }']"))
)
print(present_element)
except:
error_message = f"Ссылка на файл отсутствует."
logging.error(error_message)
pyautogui.press('esc')
pyautogui.press('esc')
failed_uploads.append(number)
driver.back()
continue
send_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-bind='click: $parent.action, text: actionName, visible: actionName() != null']"))
)
send_button.click()
modal_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[modal='true']"))
)
modal_button.click()
successful_uploads.append(number)
time.sleep(2)
except Exception as e:
print(f"Не удалось загрузить файл для номера {number}: {str(e)}")
failed_uploads.append(number)
driver.quit()
return successful_uploads, failed_uploads, disabled_passenger
@handle_error
def send_report_email(successful_uploads, failed_uploads, missing_files, error_extract_number, multiple_attachments, disabled_passenger):
msg = MIMEMultipart()
msg['From'] = EMAIL
# msg['To'] = EMAIL
msg['To'] = '109@maverik.ru'
msg['Subject'] = 'Отчет по обработке субсидий'
body = f"""
Отчет по обработке субсидий:
Успешно загружены файлы для номеров:
{', '.join(successful_uploads) if successful_uploads else 'Нет'}
Не удалось загрузить файлы для номеров:
{', '.join(failed_uploads) if failed_uploads else 'Нет'}
Отсутствуют файлы для номеров:
{', '.join(missing_files) if missing_files else 'Нет'}
Количество писем, в теме которых нет билета:
{error_extract_number if error_extract_number else 'Нет'}
Пропущены письма с несколькими вложениями:
{', '.join(multiple_attachments) if multiple_attachments else 'Нет'}
В билетах пассажир с инвалидностью:
{', '.join(disabled_passenger) if disabled_passenger else 'Нет'}
"""
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(SMTP_SERVER)
server.starttls()
server.login(EMAIL, PASSWORD)
server.send_message(msg)
server.quit()
def main():
# Создайте папку для сохранения, если она не существует
SAVE_DIRECTORY.mkdir(parents=True, exist_ok=True)
try:
# Обработка электронных писем
processed_numbers, missing_files, error_extract_number, multiple_attachments = process_emails()
print(processed_numbers)
# Загрузить файлы на ресурс
successful_uploads, failed_uploads, disabled_passenger = upload_files_to_web(processed_numbers)
# Отправить отчет по электронной почте
send_report_email(successful_uploads, failed_uploads, missing_files, error_extract_number, multiple_attachments, disabled_passenger)
logging.info("Субсидии:\nСкрипт успешно завершен")
asyncio.run(send_telegram_notification("Субсидии:\nСкрипт успешно завершен"))
except Exception as e:
error_message = f"Субсидии:\nСкрипт не выполнен из-за ошибки: {str(e)}\n{traceback.format_exc()}"
logging.error(error_message)
asyncio.run(send_telegram_notification(error_message))
sys.exit(1)
if __name__ == "__main__":
main()