Добавить chek_shifr_mail

This commit is contained in:
serov 2025-03-11 16:54:00 +00:00
commit 115fa7263a

402
chek_shifr_mail Normal file
View File

@ -0,0 +1,402 @@
import sys
import json
import os
import logging
import time
import re
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QFileDialog, QInputDialog
from PyQt5.QtCore import QThread, pyqtSignal
from exchangelib import Q, DELEGATE, Account, Credentials, Configuration, HTMLBody, EWSTimeZone, Folder
from mattermostdriver import Driver
from html2text import HTML2Text
from cryptography.fernet import Fernet
from PyQt5.QtGui import QClipboard
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Установка московского часового пояса
EWS_MOSCOW_TZ = EWSTimeZone('Europe/Moscow')
CREDENTIALS_FILE = 'credentials.json'
class ScriptThread(QThread):
finished = pyqtSignal()
error = pyqtSignal(str)
def __init__(self, interval, key):
super().__init__()
self.interval = interval
self.key = key
self.running = True
def run(self):
while self.running:
try:
check_outlook_messages(self.key)
except Exception as e:
self.error.emit(str(e))
time.sleep(self.interval)
logging.info("Script thread has been stopped.")
def stop(self):
self.running = False
class LoginWindow(QWidget):
def __init__(self):
super().__init__()
self.key = None
self.script_thread = None
if not os.path.exists(CREDENTIALS_FILE):
self.generate_and_request_key()
else:
self.request_key()
def generate_and_request_key(self):
self.key = Fernet.generate_key()
key_str = self.key.decode()
msg_box = QMessageBox()
msg_box.setWindowTitle('Новый ключ')
msg_box.setText(f'Ваш ключ шифрования: {key_str}\n\nСохраните его в безопасном месте!')
copy_button = QPushButton('Копировать ключ')
copy_button.clicked.connect(lambda: QApplication.clipboard().setText(key_str))
msg_box.layout().addWidget(copy_button)
msg_box.exec_()
self.request_credentials()
def request_key(self):
key, ok = QInputDialog.getText(self, 'Ввод ключа', 'Введите ваш ключ шифрования:')
if ok:
try:
self.key = key.encode()
self.initUI()
except Exception as e:
QMessageBox.warning(self, 'Ошибка', 'Неверный ключ. Попробуйте снова.')
self.request_key()
else:
sys.exit()
def request_credentials(self):
self.initUI()
def initUI(self):
layout = QVBoxLayout()
self.fields = {
'username': 'Username:',
'password': 'Password:',
'domain': 'Domain:',
'email': 'Email:',
'mattermost_token': 'Mattermost Token:',
'mattermost_url': 'Mattermost URL:',
'mattermost_channel_id': 'Mattermost Channel ID:',
'user_agent': 'User Agent:',
}
self.inputs = {}
for key, label in self.fields.items():
self.inputs[key] = QLineEdit()
layout.addWidget(QLabel(label))
layout.addWidget(self.inputs[key])
self.inputs['password'].setEchoMode(QLineEdit.Password)
self.inputs['mattermost_token'].setEchoMode(QLineEdit.Password)
self.start_button = QPushButton('Start')
self.start_button.clicked.connect(self.start_script)
layout.addWidget(self.start_button)
self.stop_button = QPushButton('Stop')
self.stop_button.setEnabled(False)
self.stop_button.clicked.connect(self.stop_script)
layout.addWidget(self.stop_button)
self.setLayout(layout)
self.setWindowTitle('Login')
self.setGeometry(300, 300, 300, 500)
self.load_existing_credentials()
self.show()
def encrypt_data(self, data):
f = Fernet(self.key)
return f.encrypt(data.encode()).decode()
def decrypt_data(self, data):
f = Fernet(self.key)
return f.decrypt(data.encode()).decode()
def load_existing_credentials(self):
if os.path.exists(CREDENTIALS_FILE):
try:
with open(CREDENTIALS_FILE, 'r') as f:
encrypted_data = json.load(f)
decrypted_data = {k: self.decrypt_data(v) for k, v in encrypted_data.items()}
for key, value in decrypted_data.items():
if key in self.inputs:
self.inputs[key].setText(value)
logging.info("Existing credentials loaded successfully.")
except Exception as e:
QMessageBox.warning(self, 'Warning', f'Failed to load existing credentials: {str(e)}')
def validate_inputs(self):
for key, input_field in self.inputs.items():
if not input_field.text().strip():
return False, f'{self.fields[key]} cannot be empty'
return True, ''
def start_script(self):
is_valid, error_message = self.validate_inputs()
if not is_valid:
QMessageBox.warning(self, 'Validation Error', error_message)
return
credentials = {key: self.encrypt_data(input_field.text()) for key, input_field in self.inputs.items()}
with open(CREDENTIALS_FILE, 'w') as f:
json.dump(credentials, f)
try:
logging.info("Starting the main script.")
self.script_thread = ScriptThread(interval=10, key=self.key)
self.script_thread.finished.connect(self.on_script_finished)
self.script_thread.error.connect(self.on_script_error)
self.script_thread.start()
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
except Exception as e:
QMessageBox.critical(self, 'Error', f'Failed to start the main script: {str(e)}')
def stop_script(self):
if self.script_thread and self.script_thread.isRunning():
logging.info("Stopping the script thread.")
self.script_thread.stop()
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.on_script_finished()
def on_script_finished(self):
logging.info("Script finished successfully.")
def on_script_error(self, error_message):
logging.error(f"Script error: {error_message}")
QMessageBox.critical(self, 'Script Error', error_message)
def load_credentials(filename: str, key: bytes) -> dict:
with open(filename, 'r') as file:
encrypted_data = json.load(file)
f = Fernet(key)
decrypted_data = {k: f.decrypt(v.encode()).decode() for k, v in encrypted_data.items()}
del f
return decrypted_data
class TiMeManager(object):
def __init__(self, token: str, key: bytes):
credentials_data = load_credentials(CREDENTIALS_FILE, key)
url = credentials_data['mattermost_url']
self.mmDriver = Driver(options={
'url': url,
'token': token,
'scheme': 'https',
'port': 443,
'headers': {"User-Agent": credentials_data['user_agent']},
'debug': False
})
self.mmDriver.login()
def post_message(self, channel_id: str, message: str):
post_log = self.mmDriver.posts.create_post(options={
'channel_id': channel_id,
'message': message
})
return post_log
def format_text(text: str, words_to_bold) -> str:
for word in words_to_bold:
pattern = r'\b' + re.escape(word) + r'\b'
text = re.sub(pattern, f'**{word}**\n', text, flags=re.IGNORECASE)
return text
def extract_text_from_html(html_content: str) -> str:
h = HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.ignore_emphasis = True
h.body_width = 0
text = h.handle(html_content)
lines = [line.strip() for line in text.split('\n') if line.strip()]
uniform_text = ' '.join(lines)
max_length = 4000
if len(uniform_text) > max_length:
uniform_text = uniform_text[:max_length] + "...\n(Содержимое сокращено)"
return uniform_text
def send_message_to_mattermost(message: str, key: bytes):
credentials_data = load_credentials(CREDENTIALS_FILE, key)
token = credentials_data['mattermost_token']
channel_id = credentials_data['mattermost_channel_id']
time_driver = TiMeManager(token=token, key=key)
try:
post_log = time_driver.post_message(channel_id=channel_id, message=message)
logging.info(f"Message sent to Mattermost: {post_log['message']}")
except Exception as e:
logging.error(f"Failed to send message to Mattermost: {str(e)}")
def move_message_to_mysd(account, message):
try:
# Ищем папку "mysd" среди всех доступных папок
mysd_folder = next((folder for folder in account.root.walk() if folder.name.lower() == "mysd"), None)
if not mysd_folder:
logging.error("Папка 'mysd' не найдена")
return
# Перемещаем сообщение в найденную папку
message.move(to_folder=mysd_folder)
logging.info(f"Сообщение '{message.subject}' успешно перемещено в папку 'mysd'")
except Exception as e:
logging.error(f"Ошибка при перемещении сообщения в папку 'mysd': {str(e)}")
def check_outlook_messages(key: bytes):
"""Проверка непрочитанных сообщений в Outlook и их отправка в Mattermost"""
try:
credentials_data = load_credentials(CREDENTIALS_FILE, key)
username = credentials_data['username']
password = credentials_data['password']
domain = credentials_data['domain']
email = credentials_data['email']
subject_filters = [
"Вы назначены ответственным по обращению",
"В вашу ответственность поступил запрос"
]
words_to_remove = [
"Группа", "Критичность", "Низкая", "Средняя", "Высокая", "Техническая поддержка сотрудников",
"Тема", "Вы назначены ответственным по обращению",
"Отдел технической поддержки пользователей",
"Ваш ответ на это письмо добавим комментарием к обращению",
"Forge — больше, чем платформа поддержки",
"Сведения об устройстве: Приложение:", "InformerModule", "Версия:", "v4.80",
"From: support_in support_in @ tinkoff.ru Sent:",
"PM To: Dmitriy Serov d.m.serov@tbank.ru Subject: [Forge] (19519859)",
"Назначил", "SupportAgent","Software"
]
words_to_bold = ["Описание","проблемы:", "Подробное",
"Инструкция для первой линии:",
"Рабочее время Решения в категории:",
"Сведения о сотруднике:", "Руководитель:",
"Почта для нотификации:", "Тип действия с ПО:",
"Какая ошибка возникает ?:", "Дополнительное описание:",
"Напишите","InformerModule v4.81"
]
try:
credentials = Credentials(username=f"{domain}\\{username}", password=password)
config = Configuration(server='mail.tinkoff.ru', credentials=credentials)
account = Account(primary_smtp_address=email,
config=config,
autodiscover=False,
access_type=DELEGATE)
# Добавляем новую функцию здесь
def add_at_to_logins(text: str) -> str:
text = re.sub(r'(Login: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(сотрудника: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(сотрудника )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(логин.: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(Инициатор: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(Логин: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(доступ\?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(проблема\?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(оператора: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(доступ:: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(dACL: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(wologin: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
text = re.sub(r'(Outlook?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
return text
def format_text(text: str, words_to_bold: list) -> str:
for word in words_to_bold:
pattern = r'\b' + re.escape(word) + r'\b'
text = re.sub(pattern, f'\n**{word}**', text, flags=re.IGNORECASE)
return text
filter_query = Q(is_read=False) & (
Q(subject__contains=subject_filters[0]) |
Q(subject__contains=subject_filters[1])
)
unread_messages = account.inbox.filter(filter_query).order_by('-datetime_received')
for item in unread_messages:
message_datetime = item.datetime_received.astimezone(EWS_MOSCOW_TZ)
if isinstance(item.body, HTMLBody):
content = extract_text_from_html(item.body)
else:
content = item.text_body if hasattr(item, 'text_body') else str(item.body)
for word in words_to_remove:
content = content.replace(word, '')
content = format_text(content, words_to_bold)
content = re.sub(r'\|', '', content)
content = re.sub(r'-+', '', content)
content = re.sub(r'\s+', ' ', content).strip()
processed_content = add_at_to_logins(content)
message_body = (
f"Новое сообщение: @all\n"
f"Тема: {item.subject}\n"
f"От: {item.sender.email_address}\n"
f"Дата: {message_datetime.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
f"Содержимое:\n{processed_content}"
)
logging.info(f"Найдено новое сообщение: {item.subject}")
send_message_to_mattermost(message_body, key)
item.is_read = True
item.save()
logging.info(f"Сообщение отмечено как прочитанное: {item.subject}")
move_message_to_mysd(account, item)
if not unread_messages:
logging.info("Новых сообщений не найдено")
except Exception as e:
logging.error(f"Ошибка при проверке сообщений Outlook: {str(e)}")
except FileNotFoundError:
logging.error("Файл с учетными данными не найден. Требуется повторный ввод данных.")
if __name__ == '__main__':
app = QApplication(sys.argv)
ex = LoginWindow()
sys.exit(app.exec_())