319 lines
12 KiB
Python
319 lines
12 KiB
Python
import sys
|
||
import json
|
||
import os
|
||
import logging
|
||
import time
|
||
import re
|
||
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QInputDialog
|
||
from PyQt5.QtCore import QThread, pyqtSignal
|
||
from exchangelib import Q, DELEGATE, Account, Credentials, Configuration, HTMLBody, CalendarItem, EWSDateTime, EWSTimeZone
|
||
from mattermostdriver import Driver
|
||
from html2text import HTML2Text
|
||
from cryptography.fernet import Fernet
|
||
from PyQt5.QtGui import QClipboard
|
||
from datetime import timedelta
|
||
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||
|
||
# Установка московского часового поя
|
||
moscow_tz = EWSTimeZone('Europe/Moscow')
|
||
|
||
last_notification_times = {}
|
||
|
||
class ScriptThread(QThread):
|
||
finished = pyqtSignal()
|
||
error = pyqtSignal(str)
|
||
|
||
def __init__(self, interval, key):
|
||
super().__init__()
|
||
self.interval = interval
|
||
self.key = key
|
||
self.running = True
|
||
|
||
def run(self):
|
||
while self.running:
|
||
try:
|
||
check_outlook_calendar_only(self.key) # Вызываем функцию только для календаря
|
||
except Exception as e:
|
||
self.error.emit(str(e))
|
||
time.sleep(self.interval)
|
||
|
||
logging.info("Script thread has been stopped.")
|
||
|
||
def stop(self):
|
||
self.running = False
|
||
|
||
class LoginWindow(QWidget):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.key = None
|
||
self.script_thread = None
|
||
|
||
if not os.path.exists('credentials.json'):
|
||
self.generate_and_request_key()
|
||
else:
|
||
self.request_key()
|
||
|
||
def generate_and_request_key(self):
|
||
self.key = Fernet.generate_key()
|
||
key_str = self.key.decode()
|
||
|
||
msg_box = QMessageBox()
|
||
msg_box.setWindowTitle('Новый ключ')
|
||
msg_box.setText(f'Ваш ключ шифрования: {key_str}\n\nСохраните его в безопасном месте!')
|
||
|
||
copy_button = QPushButton('Копировать ключ')
|
||
copy_button.clicked.connect(lambda: QApplication.clipboard().setText(key_str))
|
||
|
||
msg_box.layout().addWidget(copy_button)
|
||
msg_box.exec_()
|
||
|
||
self.request_credentials()
|
||
|
||
def request_key(self):
|
||
key, ok = QInputDialog.getText(self, 'Ввод ключа', 'Введите ваш ключ шифрования:')
|
||
if ok:
|
||
try:
|
||
self.key = key.encode()
|
||
self.initUI()
|
||
except Exception as e:
|
||
QMessageBox.warning(self, 'Ошибка', 'Неверный ключ. Попробуйте снова.')
|
||
self.request_key()
|
||
else:
|
||
sys.exit()
|
||
|
||
def request_credentials(self):
|
||
self.initUI()
|
||
|
||
def initUI(self):
|
||
layout = QVBoxLayout()
|
||
self.fields = {
|
||
'username': 'Username:',
|
||
'password': 'Password:',
|
||
'domain': 'Domain:',
|
||
'email': 'Email:',
|
||
'mattermost_token': 'Mattermost Token:',
|
||
'mattermost_url': 'Mattermost URL:',
|
||
'mattermost_channel_id': 'Mattermost Channel ID:',
|
||
'user_agent': 'User Agent:',
|
||
}
|
||
|
||
self.inputs = {}
|
||
for key, label in self.fields.items():
|
||
self.inputs[key] = QLineEdit()
|
||
layout.addWidget(QLabel(label))
|
||
layout.addWidget(self.inputs[key])
|
||
|
||
self.inputs['password'].setEchoMode(QLineEdit.Password)
|
||
self.inputs['mattermost_token'].setEchoMode(QLineEdit.Password)
|
||
|
||
self.start_button = QPushButton('Start')
|
||
self.start_button.clicked.connect(self.start_script)
|
||
layout.addWidget(self.start_button)
|
||
|
||
self.stop_button = QPushButton('Stop')
|
||
self.stop_button.setEnabled(False)
|
||
self.stop_button.clicked.connect(self.stop_script)
|
||
layout.addWidget(self.stop_button)
|
||
|
||
self.setLayout(layout)
|
||
self.setWindowTitle('Login')
|
||
self.setGeometry(300, 300, 300, 500)
|
||
|
||
self.load_existing_credentials()
|
||
|
||
self.show()
|
||
|
||
def encrypt_data(self, data):
|
||
f = Fernet(self.key)
|
||
return f.encrypt(data.encode()).decode()
|
||
|
||
def decrypt_data(self, data):
|
||
f = Fernet(self.key)
|
||
return f.decrypt(data.encode()).decode()
|
||
|
||
def load_existing_credentials(self):
|
||
if os.path.exists('credentials.json'):
|
||
try:
|
||
with open('credentials.json', 'r') as f:
|
||
encrypted_data = json.load(f)
|
||
decrypted_data = {k: self.decrypt_data(v) for k, v in encrypted_data.items()}
|
||
for key, value in decrypted_data.items():
|
||
if key in self.inputs:
|
||
self.inputs[key].setText(value)
|
||
logging.info("Existing credentials loaded successfully.")
|
||
except Exception as e:
|
||
QMessageBox.warning(self, 'Warning', f'Failed to load existing credentials: {str(e)}')
|
||
|
||
def validate_inputs(self):
|
||
for key, input_field in self.inputs.items():
|
||
if not input_field.text().strip():
|
||
return False, f'{self.fields[key]} cannot be empty'
|
||
|
||
return True, ''
|
||
|
||
def start_script(self):
|
||
is_valid, error_message = self.validate_inputs()
|
||
|
||
if not is_valid:
|
||
QMessageBox.warning(self, 'Validation Error', error_message)
|
||
return
|
||
|
||
credentials = {key: self.encrypt_data(input_field.text()) for key, input_field in self.inputs.items()}
|
||
|
||
with open('credentials.json', 'w') as f:
|
||
json.dump(credentials, f)
|
||
|
||
try:
|
||
logging.info("Starting the main script.")
|
||
self.script_thread = ScriptThread(interval=10, key=self.key)
|
||
|
||
self.script_thread.finished.connect(self.on_script_finished)
|
||
self.script_thread.error.connect(self.on_script_error)
|
||
|
||
self.script_thread.start()
|
||
|
||
self.start_button.setEnabled(False)
|
||
self.stop_button.setEnabled(True)
|
||
|
||
except Exception as e:
|
||
QMessageBox.critical(self, 'Error', f'Failed to start the main script: {str(e)}')
|
||
|
||
def stop_script(self):
|
||
if self.script_thread and self.script_thread.isRunning():
|
||
logging.info("Stopping the script thread.")
|
||
self.script_thread.stop()
|
||
self.start_button.setEnabled(True)
|
||
self.stop_button.setEnabled(False)
|
||
|
||
self.on_script_finished()
|
||
|
||
def on_script_finished(self):
|
||
logging.info("Script finished successfully.")
|
||
|
||
def on_script_error(self, error_message):
|
||
logging.error(f"Script error: {error_message}")
|
||
|
||
def load_credentials(filename: str, key: bytes) -> dict:
|
||
with open(filename, 'r') as file:
|
||
encrypted_data = json.load(file)
|
||
|
||
f = Fernet(key)
|
||
decrypted_data = {k: f.decrypt(v.encode()).decode() for k, v in encrypted_data.items()}
|
||
return decrypted_data
|
||
|
||
class TiMeManager(object):
|
||
def __init__(self, token: str, key: bytes):
|
||
credentials_data = load_credentials('credentials.json', key)
|
||
url = credentials_data['mattermost_url']
|
||
|
||
self.mmDriver = Driver(options={
|
||
'url': url,
|
||
'token': token,
|
||
'scheme': 'https',
|
||
'port': 443,
|
||
'headers': {"User-Agent": credentials_data['user_agent']},
|
||
'debug': False
|
||
})
|
||
self.mmDriver.login()
|
||
|
||
def post_message(self, channel_id: str, message: str):
|
||
post_log = self.mmDriver.posts.create_post(options={
|
||
'channel_id': channel_id,
|
||
'message': message
|
||
})
|
||
return post_log
|
||
|
||
def send_message_to_mattermost(message: str, key: bytes):
|
||
credentials_data = load_credentials('credentials.json', key)
|
||
token = credentials_data['mattermost_token']
|
||
channel_id = credentials_data['mattermost_channel_id']
|
||
|
||
time_driver = TiMeManager(token=token, key=key)
|
||
|
||
try:
|
||
post_log = time_driver.post_message(channel_id=channel_id, message=message)
|
||
logging.info(f"Message sent to Mattermost: {post_log['message']}")
|
||
|
||
except Exception as e:
|
||
logging.error(f"Failed to send message to Mattermost: {str(e)}")
|
||
|
||
def check_outlook_calendar(account, time_before_event, key: bytes):
|
||
"""Проверяет календарь Outlook на наличие встреч в ближайшее время и отправляет уведомления."""
|
||
global last_notification_times
|
||
|
||
moscow_tz = EWSTimeZone('Europe/Moscow')
|
||
now = EWSDateTime.now(tz=moscow_tz)
|
||
start = now
|
||
end = now + timedelta(minutes=time_before_event) + timedelta(minutes=1)
|
||
|
||
calendar_items = list(account.calendar.view(start=start, end=end))
|
||
|
||
for item in calendar_items:
|
||
if isinstance(item, CalendarItem):
|
||
time_diff = item.start - now
|
||
minutes_until_start = int(time_diff.total_seconds() / 60)
|
||
|
||
# Создаем уникальный идентификатор для встречи
|
||
meeting_id = f"{item.subject}_{item.start.strftime('%Y%m%d%H%M')}"
|
||
|
||
# Проверяем, было ли уже отправлено уведомление в последние 30 минут
|
||
if meeting_id in last_notification_times:
|
||
last_notification = last_notification_times[meeting_id]
|
||
if (EWSDateTime.now(tz=moscow_tz) - last_notification).total_seconds() < 1800:
|
||
logging.info(f"Уведомление о встрече '{item.subject}' уже было отправлено недавно, пропускаем.")
|
||
continue
|
||
|
||
if 0 <= minutes_until_start <= time_before_event:
|
||
subject = item.subject
|
||
start_time = item.start.astimezone(moscow_tz).strftime('%Y-%m-%d %H:%M:%S')
|
||
end_time = item.end.astimezone(moscow_tz).strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
message_body = (
|
||
f"@all Внимание! Скоро начнется встреча:\n"
|
||
f"Тема: {subject}\n"
|
||
f"Начало: {start_time}\n"
|
||
f"Окончание: {end_time}\n"
|
||
f"Место: {item.location}\n"
|
||
)
|
||
|
||
logging.info(f"Отправка уведомления о встрече: {subject} в {start_time}")
|
||
send_message_to_mattermost(message_body, key)
|
||
|
||
# Обновляем время последнего уведомления
|
||
last_notification_times[meeting_id] = EWSDateTime.now(tz=moscow_tz)
|
||
|
||
# Очищаем старые записи из словаря
|
||
current_time = EWSDateTime.now(tz=moscow_tz)
|
||
last_notification_times = {k: v for k, v in last_notification_times.items()
|
||
if (current_time - v.astimezone(moscow_tz)).total_seconds() < 3600} #удаляем старше часа
|
||
|
||
def check_outlook_calendar_only(key: bytes):
|
||
"""Функция для проверки только календаря."""
|
||
credentials_data = load_credentials('credentials.json', key)
|
||
|
||
username = credentials_data['username']
|
||
password = credentials_data['password']
|
||
domain = credentials_data['domain']
|
||
email = credentials_data['email']
|
||
|
||
try:
|
||
credentials = Credentials(username=f"{domain}\\{username}", password=password)
|
||
config = Configuration(server='mail.tinkoff.ru', credentials=credentials)
|
||
|
||
account = Account(primary_smtp_address=email,
|
||
config=config,
|
||
autodiscover=False,
|
||
access_type=DELEGATE)
|
||
|
||
# Проверяем календарь основного пользователя
|
||
check_outlook_calendar(account, 15, key)
|
||
|
||
except Exception as e:
|
||
logging.error(f"Ошибка при проверке календаря: {str(e)}")
|
||
|
||
if __name__ == '__main__':
|
||
app = QApplication(sys.argv)
|
||
login_window = LoginWindow()
|
||
sys.exit(app.exec_())
|