403 lines
16 KiB
Plaintext
403 lines
16 KiB
Plaintext
import sys
|
||
import json
|
||
import os
|
||
import logging
|
||
import time
|
||
import re
|
||
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QFileDialog, QInputDialog
|
||
from PyQt5.QtCore import QThread, pyqtSignal
|
||
from exchangelib import Q, DELEGATE, Account, Credentials, Configuration, HTMLBody, EWSTimeZone, Folder
|
||
from mattermostdriver import Driver
|
||
from html2text import HTML2Text
|
||
from cryptography.fernet import Fernet
|
||
from PyQt5.QtGui import QClipboard
|
||
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
# Установка московского часового пояса
|
||
EWS_MOSCOW_TZ = EWSTimeZone('Europe/Moscow')
|
||
CREDENTIALS_FILE = 'credentials.json'
|
||
|
||
class ScriptThread(QThread):
|
||
finished = pyqtSignal()
|
||
error = pyqtSignal(str)
|
||
|
||
def __init__(self, interval, key):
|
||
super().__init__()
|
||
self.interval = interval
|
||
self.key = key
|
||
self.running = True
|
||
|
||
def run(self):
|
||
while self.running:
|
||
try:
|
||
check_outlook_messages(self.key)
|
||
except Exception as e:
|
||
self.error.emit(str(e))
|
||
time.sleep(self.interval)
|
||
|
||
logging.info("Script thread has been stopped.")
|
||
|
||
def stop(self):
|
||
self.running = False
|
||
|
||
class LoginWindow(QWidget):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.key = None
|
||
self.script_thread = None
|
||
|
||
if not os.path.exists(CREDENTIALS_FILE):
|
||
self.generate_and_request_key()
|
||
else:
|
||
self.request_key()
|
||
|
||
def generate_and_request_key(self):
|
||
self.key = Fernet.generate_key()
|
||
key_str = self.key.decode()
|
||
|
||
msg_box = QMessageBox()
|
||
msg_box.setWindowTitle('Новый ключ')
|
||
msg_box.setText(f'Ваш ключ шифрования: {key_str}\n\nСохраните его в безопасном месте!')
|
||
|
||
copy_button = QPushButton('Копировать ключ')
|
||
copy_button.clicked.connect(lambda: QApplication.clipboard().setText(key_str))
|
||
|
||
msg_box.layout().addWidget(copy_button)
|
||
msg_box.exec_()
|
||
|
||
self.request_credentials()
|
||
|
||
def request_key(self):
|
||
key, ok = QInputDialog.getText(self, 'Ввод ключа', 'Введите ваш ключ шифрования:')
|
||
if ok:
|
||
try:
|
||
self.key = key.encode()
|
||
self.initUI()
|
||
except Exception as e:
|
||
QMessageBox.warning(self, 'Ошибка', 'Неверный ключ. Попробуйте снова.')
|
||
self.request_key()
|
||
else:
|
||
sys.exit()
|
||
|
||
def request_credentials(self):
|
||
self.initUI()
|
||
|
||
def initUI(self):
|
||
layout = QVBoxLayout()
|
||
self.fields = {
|
||
'username': 'Username:',
|
||
'password': 'Password:',
|
||
'domain': 'Domain:',
|
||
'email': 'Email:',
|
||
'mattermost_token': 'Mattermost Token:',
|
||
'mattermost_url': 'Mattermost URL:',
|
||
'mattermost_channel_id': 'Mattermost Channel ID:',
|
||
'user_agent': 'User Agent:',
|
||
}
|
||
|
||
self.inputs = {}
|
||
for key, label in self.fields.items():
|
||
self.inputs[key] = QLineEdit()
|
||
layout.addWidget(QLabel(label))
|
||
layout.addWidget(self.inputs[key])
|
||
|
||
self.inputs['password'].setEchoMode(QLineEdit.Password)
|
||
self.inputs['mattermost_token'].setEchoMode(QLineEdit.Password)
|
||
|
||
self.start_button = QPushButton('Start')
|
||
self.start_button.clicked.connect(self.start_script)
|
||
layout.addWidget(self.start_button)
|
||
|
||
self.stop_button = QPushButton('Stop')
|
||
self.stop_button.setEnabled(False)
|
||
self.stop_button.clicked.connect(self.stop_script)
|
||
layout.addWidget(self.stop_button)
|
||
|
||
self.setLayout(layout)
|
||
self.setWindowTitle('Login')
|
||
self.setGeometry(300, 300, 300, 500)
|
||
|
||
self.load_existing_credentials()
|
||
|
||
self.show()
|
||
|
||
def encrypt_data(self, data):
|
||
f = Fernet(self.key)
|
||
return f.encrypt(data.encode()).decode()
|
||
|
||
def decrypt_data(self, data):
|
||
f = Fernet(self.key)
|
||
return f.decrypt(data.encode()).decode()
|
||
|
||
def load_existing_credentials(self):
|
||
if os.path.exists(CREDENTIALS_FILE):
|
||
try:
|
||
with open(CREDENTIALS_FILE, 'r') as f:
|
||
encrypted_data = json.load(f)
|
||
decrypted_data = {k: self.decrypt_data(v) for k, v in encrypted_data.items()}
|
||
for key, value in decrypted_data.items():
|
||
if key in self.inputs:
|
||
self.inputs[key].setText(value)
|
||
logging.info("Existing credentials loaded successfully.")
|
||
except Exception as e:
|
||
QMessageBox.warning(self, 'Warning', f'Failed to load existing credentials: {str(e)}')
|
||
|
||
def validate_inputs(self):
|
||
for key, input_field in self.inputs.items():
|
||
if not input_field.text().strip():
|
||
return False, f'{self.fields[key]} cannot be empty'
|
||
|
||
return True, ''
|
||
|
||
def start_script(self):
|
||
is_valid, error_message = self.validate_inputs()
|
||
|
||
if not is_valid:
|
||
QMessageBox.warning(self, 'Validation Error', error_message)
|
||
return
|
||
|
||
credentials = {key: self.encrypt_data(input_field.text()) for key, input_field in self.inputs.items()}
|
||
|
||
with open(CREDENTIALS_FILE, 'w') as f:
|
||
json.dump(credentials, f)
|
||
|
||
try:
|
||
logging.info("Starting the main script.")
|
||
self.script_thread = ScriptThread(interval=10, key=self.key)
|
||
|
||
self.script_thread.finished.connect(self.on_script_finished)
|
||
self.script_thread.error.connect(self.on_script_error)
|
||
|
||
self.script_thread.start()
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(True)
|
||
|
||
except Exception as e:
|
||
QMessageBox.critical(self, 'Error', f'Failed to start the main script: {str(e)}')
|
||
|
||
def stop_script(self):
|
||
if self.script_thread and self.script_thread.isRunning():
|
||
logging.info("Stopping the script thread.")
|
||
self.script_thread.stop()
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
self.on_script_finished()
|
||
|
||
def on_script_finished(self):
|
||
logging.info("Script finished successfully.")
|
||
|
||
def on_script_error(self, error_message):
|
||
logging.error(f"Script error: {error_message}")
|
||
QMessageBox.critical(self, 'Script Error', error_message)
|
||
|
||
def load_credentials(filename: str, key: bytes) -> dict:
|
||
with open(filename, 'r') as file:
|
||
encrypted_data = json.load(file)
|
||
|
||
f = Fernet(key)
|
||
decrypted_data = {k: f.decrypt(v.encode()).decode() for k, v in encrypted_data.items()}
|
||
del f
|
||
return decrypted_data
|
||
|
||
class TiMeManager(object):
|
||
def __init__(self, token: str, key: bytes):
|
||
credentials_data = load_credentials(CREDENTIALS_FILE, key)
|
||
url = credentials_data['mattermost_url']
|
||
|
||
self.mmDriver = Driver(options={
|
||
'url': url,
|
||
'token': token,
|
||
'scheme': 'https',
|
||
'port': 443,
|
||
'headers': {"User-Agent": credentials_data['user_agent']},
|
||
'debug': False
|
||
})
|
||
self.mmDriver.login()
|
||
|
||
def post_message(self, channel_id: str, message: str):
|
||
post_log = self.mmDriver.posts.create_post(options={
|
||
'channel_id': channel_id,
|
||
'message': message
|
||
})
|
||
return post_log
|
||
|
||
def format_text(text: str, words_to_bold) -> str:
|
||
for word in words_to_bold:
|
||
pattern = r'\b' + re.escape(word) + r'\b'
|
||
text = re.sub(pattern, f'**{word}**\n', text, flags=re.IGNORECASE)
|
||
return text
|
||
|
||
def extract_text_from_html(html_content: str) -> str:
|
||
h = HTML2Text()
|
||
h.ignore_links = False
|
||
h.ignore_images = True
|
||
h.ignore_emphasis = True
|
||
h.body_width = 0
|
||
|
||
text = h.handle(html_content)
|
||
lines = [line.strip() for line in text.split('\n') if line.strip()]
|
||
uniform_text = ' '.join(lines)
|
||
|
||
max_length = 4000
|
||
if len(uniform_text) > max_length:
|
||
uniform_text = uniform_text[:max_length] + "...\n(Содержимое сокращено)"
|
||
|
||
return uniform_text
|
||
|
||
def send_message_to_mattermost(message: str, key: bytes):
|
||
credentials_data = load_credentials(CREDENTIALS_FILE, key)
|
||
token = credentials_data['mattermost_token']
|
||
channel_id = credentials_data['mattermost_channel_id']
|
||
|
||
time_driver = TiMeManager(token=token, key=key)
|
||
|
||
try:
|
||
post_log = time_driver.post_message(channel_id=channel_id, message=message)
|
||
logging.info(f"Message sent to Mattermost: {post_log['message']}")
|
||
|
||
except Exception as e:
|
||
logging.error(f"Failed to send message to Mattermost: {str(e)}")
|
||
|
||
def move_message_to_mysd(account, message):
|
||
try:
|
||
# Ищем папку "mysd" среди всех доступных папок
|
||
mysd_folder = next((folder for folder in account.root.walk() if folder.name.lower() == "mysd"), None)
|
||
|
||
if not mysd_folder:
|
||
logging.error("Папка 'mysd' не найдена")
|
||
return
|
||
|
||
# Перемещаем сообщение в найденную папку
|
||
message.move(to_folder=mysd_folder)
|
||
logging.info(f"Сообщение '{message.subject}' успешно перемещено в папку 'mysd'")
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при перемещении сообщения в папку 'mysd': {str(e)}")
|
||
|
||
def check_outlook_messages(key: bytes):
|
||
"""Проверка непрочитанных сообщений в Outlook и их отправка в Mattermost"""
|
||
try:
|
||
credentials_data = load_credentials(CREDENTIALS_FILE, key)
|
||
|
||
username = credentials_data['username']
|
||
password = credentials_data['password']
|
||
domain = credentials_data['domain']
|
||
email = credentials_data['email']
|
||
|
||
subject_filters = [
|
||
"Вы назначены ответственным по обращению",
|
||
"В вашу ответственность поступил запрос"
|
||
]
|
||
|
||
words_to_remove = [
|
||
"Группа", "Критичность", "Низкая", "Средняя", "Высокая", "Техническая поддержка сотрудников",
|
||
"Тема", "Вы назначены ответственным по обращению",
|
||
"Отдел технической поддержки пользователей",
|
||
"Ваш ответ на это письмо добавим комментарием к обращению",
|
||
"Forge — больше, чем платформа поддержки",
|
||
"Сведения об устройстве: Приложение:", "InformerModule", "Версия:", "v4.80",
|
||
"From: support_in support_in @ tinkoff.ru Sent:",
|
||
"PM To: Dmitriy Serov d.m.serov@tbank.ru Subject: [Forge] (19519859)",
|
||
"Назначил", "SupportAgent","Software"
|
||
]
|
||
|
||
words_to_bold = ["Описание","проблемы:", "Подробное",
|
||
"Инструкция для первой линии:",
|
||
"Рабочее время Решения в категории:",
|
||
"Сведения о сотруднике:", "Руководитель:",
|
||
"Почта для нотификации:", "Тип действия с ПО:",
|
||
"Какая ошибка возникает ?:", "Дополнительное описание:",
|
||
"Напишите","InformerModule v4.81"
|
||
]
|
||
|
||
try:
|
||
credentials = Credentials(username=f"{domain}\\{username}", password=password)
|
||
config = Configuration(server='mail.tinkoff.ru', credentials=credentials)
|
||
|
||
account = Account(primary_smtp_address=email,
|
||
config=config,
|
||
autodiscover=False,
|
||
access_type=DELEGATE)
|
||
|
||
# Добавляем новую функцию здесь
|
||
def add_at_to_logins(text: str) -> str:
|
||
text = re.sub(r'(Login: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(сотрудника: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(сотрудника )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(логин.: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(Инициатор: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(Логин: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(доступ\?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(проблема\?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(оператора: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(доступ:: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(dACL: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(wologin: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
text = re.sub(r'(Outlook?: )([a-zA-Z]+\.[a-zA-Z]+)', r'\1@\2', text)
|
||
return text
|
||
|
||
def format_text(text: str, words_to_bold: list) -> str:
|
||
for word in words_to_bold:
|
||
pattern = r'\b' + re.escape(word) + r'\b'
|
||
text = re.sub(pattern, f'\n**{word}**', text, flags=re.IGNORECASE)
|
||
return text
|
||
|
||
filter_query = Q(is_read=False) & (
|
||
Q(subject__contains=subject_filters[0]) |
|
||
Q(subject__contains=subject_filters[1])
|
||
)
|
||
|
||
unread_messages = account.inbox.filter(filter_query).order_by('-datetime_received')
|
||
|
||
for item in unread_messages:
|
||
message_datetime = item.datetime_received.astimezone(EWS_MOSCOW_TZ)
|
||
|
||
if isinstance(item.body, HTMLBody):
|
||
content = extract_text_from_html(item.body)
|
||
else:
|
||
content = item.text_body if hasattr(item, 'text_body') else str(item.body)
|
||
|
||
for word in words_to_remove:
|
||
content = content.replace(word, '')
|
||
|
||
content = format_text(content, words_to_bold)
|
||
content = re.sub(r'\|', '', content)
|
||
content = re.sub(r'-+', '', content)
|
||
content = re.sub(r'\s+', ' ', content).strip()
|
||
|
||
processed_content = add_at_to_logins(content)
|
||
|
||
message_body = (
|
||
f"Новое сообщение: @all\n"
|
||
f"Тема: {item.subject}\n"
|
||
f"От: {item.sender.email_address}\n"
|
||
f"Дата: {message_datetime.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
|
||
f"Содержимое:\n{processed_content}"
|
||
)
|
||
|
||
logging.info(f"Найдено новое сообщение: {item.subject}")
|
||
|
||
send_message_to_mattermost(message_body, key)
|
||
|
||
item.is_read = True
|
||
item.save()
|
||
logging.info(f"Сообщение отмечено как прочитанное: {item.subject}")
|
||
|
||
move_message_to_mysd(account, item)
|
||
|
||
if not unread_messages:
|
||
logging.info("Новых сообщений не найдено")
|
||
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при проверке сообщений Outlook: {str(e)}")
|
||
|
||
except FileNotFoundError:
|
||
logging.error("Файл с учетными данными не найден. Требуется повторный ввод данных.")
|
||
|
||
if __name__ == '__main__':
|
||
app = QApplication(sys.argv)
|
||
ex = LoginWindow()
|
||
sys.exit(app.exec_())
|