Only_calendar/calendar_ews_shifr.py

319 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import json
import os
import logging
import time
import re
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QInputDialog
from PyQt5.QtCore import QThread, pyqtSignal
from exchangelib import Q, DELEGATE, Account, Credentials, Configuration, HTMLBody, CalendarItem, EWSDateTime, EWSTimeZone
from mattermostdriver import Driver
from html2text import HTML2Text
from cryptography.fernet import Fernet
from PyQt5.QtGui import QClipboard
from datetime import timedelta
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Установка московского часового поя
moscow_tz = EWSTimeZone('Europe/Moscow')
last_notification_times = {}
class ScriptThread(QThread):
finished = pyqtSignal()
error = pyqtSignal(str)
def __init__(self, interval, key):
super().__init__()
self.interval = interval
self.key = key
self.running = True
def run(self):
while self.running:
try:
check_outlook_calendar_only(self.key) # Вызываем функцию только для календаря
except Exception as e:
self.error.emit(str(e))
time.sleep(self.interval)
logging.info("Script thread has been stopped.")
def stop(self):
self.running = False
class LoginWindow(QWidget):
def __init__(self):
super().__init__()
self.key = None
self.script_thread = None
if not os.path.exists('credentials.json'):
self.generate_and_request_key()
else:
self.request_key()
def generate_and_request_key(self):
self.key = Fernet.generate_key()
key_str = self.key.decode()
msg_box = QMessageBox()
msg_box.setWindowTitle('Новый ключ')
msg_box.setText(f'Ваш ключ шифрования: {key_str}\n\nСохраните его в безопасном месте!')
copy_button = QPushButton('Копировать ключ')
copy_button.clicked.connect(lambda: QApplication.clipboard().setText(key_str))
msg_box.layout().addWidget(copy_button)
msg_box.exec_()
self.request_credentials()
def request_key(self):
key, ok = QInputDialog.getText(self, 'Ввод ключа', 'Введите ваш ключ шифрования:')
if ok:
try:
self.key = key.encode()
self.initUI()
except Exception as e:
QMessageBox.warning(self, 'Ошибка', 'Неверный ключ. Попробуйте снова.')
self.request_key()
else:
sys.exit()
def request_credentials(self):
self.initUI()
def initUI(self):
layout = QVBoxLayout()
self.fields = {
'username': 'Username:',
'password': 'Password:',
'domain': 'Domain:',
'email': 'Email:',
'mattermost_token': 'Mattermost Token:',
'mattermost_url': 'Mattermost URL:',
'mattermost_channel_id': 'Mattermost Channel ID:',
'user_agent': 'User Agent:',
}
self.inputs = {}
for key, label in self.fields.items():
self.inputs[key] = QLineEdit()
layout.addWidget(QLabel(label))
layout.addWidget(self.inputs[key])
self.inputs['password'].setEchoMode(QLineEdit.Password)
self.inputs['mattermost_token'].setEchoMode(QLineEdit.Password)
self.start_button = QPushButton('Start')
self.start_button.clicked.connect(self.start_script)
layout.addWidget(self.start_button)
self.stop_button = QPushButton('Stop')
self.stop_button.setEnabled(False)
self.stop_button.clicked.connect(self.stop_script)
layout.addWidget(self.stop_button)
self.setLayout(layout)
self.setWindowTitle('Login')
self.setGeometry(300, 300, 300, 500)
self.load_existing_credentials()
self.show()
def encrypt_data(self, data):
f = Fernet(self.key)
return f.encrypt(data.encode()).decode()
def decrypt_data(self, data):
f = Fernet(self.key)
return f.decrypt(data.encode()).decode()
def load_existing_credentials(self):
if os.path.exists('credentials.json'):
try:
with open('credentials.json', 'r') as f:
encrypted_data = json.load(f)
decrypted_data = {k: self.decrypt_data(v) for k, v in encrypted_data.items()}
for key, value in decrypted_data.items():
if key in self.inputs:
self.inputs[key].setText(value)
logging.info("Existing credentials loaded successfully.")
except Exception as e:
QMessageBox.warning(self, 'Warning', f'Failed to load existing credentials: {str(e)}')
def validate_inputs(self):
for key, input_field in self.inputs.items():
if not input_field.text().strip():
return False, f'{self.fields[key]} cannot be empty'
return True, ''
def start_script(self):
is_valid, error_message = self.validate_inputs()
if not is_valid:
QMessageBox.warning(self, 'Validation Error', error_message)
return
credentials = {key: self.encrypt_data(input_field.text()) for key, input_field in self.inputs.items()}
with open('credentials.json', 'w') as f:
json.dump(credentials, f)
try:
logging.info("Starting the main script.")
self.script_thread = ScriptThread(interval=10, key=self.key)
self.script_thread.finished.connect(self.on_script_finished)
self.script_thread.error.connect(self.on_script_error)
self.script_thread.start()
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
except Exception as e:
QMessageBox.critical(self, 'Error', f'Failed to start the main script: {str(e)}')
def stop_script(self):
if self.script_thread and self.script_thread.isRunning():
logging.info("Stopping the script thread.")
self.script_thread.stop()
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.on_script_finished()
def on_script_finished(self):
logging.info("Script finished successfully.")
def on_script_error(self, error_message):
logging.error(f"Script error: {error_message}")
def load_credentials(filename: str, key: bytes) -> dict:
with open(filename, 'r') as file:
encrypted_data = json.load(file)
f = Fernet(key)
decrypted_data = {k: f.decrypt(v.encode()).decode() for k, v in encrypted_data.items()}
return decrypted_data
class TiMeManager(object):
def __init__(self, token: str, key: bytes):
credentials_data = load_credentials('credentials.json', key)
url = credentials_data['mattermost_url']
self.mmDriver = Driver(options={
'url': url,
'token': token,
'scheme': 'https',
'port': 443,
'headers': {"User-Agent": credentials_data['user_agent']},
'debug': False
})
self.mmDriver.login()
def post_message(self, channel_id: str, message: str):
post_log = self.mmDriver.posts.create_post(options={
'channel_id': channel_id,
'message': message
})
return post_log
def send_message_to_mattermost(message: str, key: bytes):
credentials_data = load_credentials('credentials.json', key)
token = credentials_data['mattermost_token']
channel_id = credentials_data['mattermost_channel_id']
time_driver = TiMeManager(token=token, key=key)
try:
post_log = time_driver.post_message(channel_id=channel_id, message=message)
logging.info(f"Message sent to Mattermost: {post_log['message']}")
except Exception as e:
logging.error(f"Failed to send message to Mattermost: {str(e)}")
def check_outlook_calendar(account, time_before_event, key: bytes):
"""Проверяет календарь Outlook на наличие встреч в ближайшее время и отправляет уведомления."""
global last_notification_times
moscow_tz = EWSTimeZone('Europe/Moscow')
now = EWSDateTime.now(tz=moscow_tz)
start = now
end = now + timedelta(minutes=time_before_event) + timedelta(minutes=1)
calendar_items = list(account.calendar.view(start=start, end=end))
for item in calendar_items:
if isinstance(item, CalendarItem):
time_diff = item.start - now
minutes_until_start = int(time_diff.total_seconds() / 60)
# Создаем уникальный идентификатор для встречи
meeting_id = f"{item.subject}_{item.start.strftime('%Y%m%d%H%M')}"
# Проверяем, было ли уже отправлено уведомление в последние 30 минут
if meeting_id in last_notification_times:
last_notification = last_notification_times[meeting_id]
if (EWSDateTime.now(tz=moscow_tz) - last_notification).total_seconds() < 1800:
logging.info(f"Уведомление о встрече '{item.subject}' уже было отправлено недавно, пропускаем.")
continue
if 0 <= minutes_until_start <= time_before_event:
subject = item.subject
start_time = item.start.astimezone(moscow_tz).strftime('%Y-%m-%d %H:%M:%S')
end_time = item.end.astimezone(moscow_tz).strftime('%Y-%m-%d %H:%M:%S')
message_body = (
f"@all Внимание! Скоро начнется встреча:\n"
f"Тема: {subject}\n"
f"Начало: {start_time}\n"
f"Окончание: {end_time}\n"
f"Место: {item.location}\n"
)
logging.info(f"Отправка уведомления о встрече: {subject} в {start_time}")
send_message_to_mattermost(message_body, key)
# Обновляем время последнего уведомления
last_notification_times[meeting_id] = EWSDateTime.now(tz=moscow_tz)
# Очищаем старые записи из словаря
current_time = EWSDateTime.now(tz=moscow_tz)
last_notification_times = {k: v for k, v in last_notification_times.items()
if (current_time - v.astimezone(moscow_tz)).total_seconds() < 3600} #удаляем старше часа
def check_outlook_calendar_only(key: bytes):
"""Функция для проверки только календаря."""
credentials_data = load_credentials('credentials.json', key)
username = credentials_data['username']
password = credentials_data['password']
domain = credentials_data['domain']
email = credentials_data['email']
try:
credentials = Credentials(username=f"{domain}\\{username}", password=password)
config = Configuration(server='mail.tinkoff.ru', credentials=credentials)
account = Account(primary_smtp_address=email,
config=config,
autodiscover=False,
access_type=DELEGATE)
# Проверяем календарь основного пользователя
check_outlook_calendar(account, 15, key)
except Exception as e:
logging.error(f"Ошибка при проверке календаря: {str(e)}")
if __name__ == '__main__':
app = QApplication(sys.argv)
login_window = LoginWindow()
sys.exit(app.exec_())