Добавить 11

This commit is contained in:
serov 2025-03-06 09:59:29 +00:00
commit 8a766d5fb0

319
11 Normal file
View File

@ -0,0 +1,319 @@
import sys
import json
import os
import logging
import time
import pytz
import re
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QMessageBox, QInputDialog
from PyQt5.QtCore import QThread, pyqtSignal
from exchangelib import Q, DELEGATE, Account, Credentials, Configuration, HTMLBody, CalendarItem, EWSDateTime, EWSTimeZone
from mattermostdriver import Driver
from html2text import HTML2Text
from cryptography.fernet import Fernet
from PyQt5.QtGui import QClipboard
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Установка московского часового поя
moscow_tz = pytz.timezone('Europe/Moscow')
last_notification_times = {}
class ScriptThread(QThread):
finished = pyqtSignal()
error = pyqtSignal(str)
def __init__(self, interval, key):
super().__init__()
self.interval = interval
self.key = key
self.running = True
def run(self):
while self.running:
try:
check_outlook_calendar_only(self.key) # Вызываем функцию только для календаря
except Exception as e:
self.error.emit(str(e))
time.sleep(self.interval)
logging.info("Script thread has been stopped.")
def stop(self):
self.running = False
class LoginWindow(QWidget):
def __init__(self):
super().__init__()
self.key = None
self.script_thread = None
if not os.path.exists('credentials.json'):
self.generate_and_request_key()
else:
self.request_key()
def generate_and_request_key(self):
self.key = Fernet.generate_key()
key_str = self.key.decode()
msg_box = QMessageBox()
msg_box.setWindowTitle('Новый ключ')
msg_box.setText(f'Ваш ключ шифрования: {key_str}\n\nСохраните его в безопасном месте!')
copy_button = QPushButton('Копировать ключ')
copy_button.clicked.connect(lambda: QApplication.clipboard().setText(key_str))
msg_box.layout().addWidget(copy_button)
msg_box.exec_()
self.request_credentials()
def request_key(self):
key, ok = QInputDialog.getText(self, 'Ввод ключа', 'Введите ваш ключ шифрования:')
if ok:
try:
self.key = key.encode()
self.initUI()
except Exception as e:
QMessageBox.warning(self, 'Ошибка', 'Неверный ключ. Попробуйте снова.')
self.request_key()
else:
sys.exit()
def request_credentials(self):
self.initUI()
def initUI(self):
layout = QVBoxLayout()
self.fields = {
'username': 'Username:',
'password': 'Password:',
'domain': 'Domain:',
'email': 'Email:',
'mattermost_token': 'Mattermost Token:',
'mattermost_url': 'Mattermost URL:',
'mattermost_channel_id': 'Mattermost Channel ID:',
'user_agent': 'User Agent:',
}
self.inputs = {}
for key, label in self.fields.items():
self.inputs[key] = QLineEdit()
layout.addWidget(QLabel(label))
layout.addWidget(self.inputs[key])
self.inputs['password'].setEchoMode(QLineEdit.Password)
self.inputs['mattermost_token'].setEchoMode(QLineEdit.Password)
self.start_button = QPushButton('Start')
self.start_button.clicked.connect(self.start_script)
layout.addWidget(self.start_button)
self.stop_button = QPushButton('Stop')
self.stop_button.setEnabled(False)
self.stop_button.clicked.connect(self.stop_script)
layout.addWidget(self.stop_button)
self.setLayout(layout)
self.setWindowTitle('Login')
self.setGeometry(300, 300, 300, 500)
self.load_existing_credentials()
self.show()
def encrypt_data(self, data):
f = Fernet(self.key)
return f.encrypt(data.encode()).decode()
def decrypt_data(self, data):
f = Fernet(self.key)
return f.decrypt(data.encode()).decode()
def load_existing_credentials(self):
if os.path.exists('credentials.json'):
try:
with open('credentials.json', 'r') as f:
encrypted_data = json.load(f)
decrypted_data = {k: self.decrypt_data(v) for k, v in encrypted_data.items()}
for key, value in decrypted_data.items():
if key in self.inputs:
self.inputs[key].setText(value)
logging.info("Existing credentials loaded successfully.")
except Exception as e:
QMessageBox.warning(self, 'Warning', f'Failed to load existing credentials: {str(e)}')
def validate_inputs(self):
for key, input_field in self.inputs.items():
if not input_field.text().strip():
return False, f'{self.fields[key]} cannot be empty'
return True, ''
def start_script(self):
is_valid, error_message = self.validate_inputs()
if not is_valid:
QMessageBox.warning(self, 'Validation Error', error_message)
return
credentials = {key: self.encrypt_data(input_field.text()) for key, input_field in self.inputs.items()}
with open('credentials.json', 'w') as f:
json.dump(credentials, f)
try:
logging.info("Starting the main script.")
self.script_thread = ScriptThread(interval=10, key=self.key)
self.script_thread.finished.connect(self.on_script_finished)
self.script_thread.error.connect(self.on_script_error)
self.script_thread.start()
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
except Exception as e:
QMessageBox.critical(self, 'Error', f'Failed to start the main script: {str(e)}')
def stop_script(self):
if self.script_thread and self.script_thread.isRunning():
logging.info("Stopping the script thread.")
self.script_thread.stop()
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.on_script_finished()
def on_script_finished(self):
logging.info("Script finished successfully.")
def on_script_error(self, error_message):
logging.error(f"Script error: {error_message}")
def load_credentials(filename: str, key: bytes) -> dict:
with open(filename, 'r') as file:
encrypted_data = json.load(file)
f = Fernet(key)
decrypted_data = {k: f.decrypt(v.encode()).decode() for k, v in encrypted_data.items()}
return decrypted_data
class TiMeManager(object):
def __init__(self, token: str, key: bytes):
credentials_data = load_credentials('credentials.json', key)
url = credentials_data['mattermost_url']
self.mmDriver = Driver(options={
'url': url,
'token': token,
'scheme': 'https',
'port': 443,
'headers': {"User-Agent": credentials_data['user_agent']},
'debug': False
})
self.mmDriver.login()
def post_message(self, channel_id: str, message: str):
post_log = self.mmDriver.posts.create_post(options={
'channel_id': channel_id,
'message': message
})
return post_log
def send_message_to_mattermost(message: str, key: bytes):
credentials_data = load_credentials('credentials.json', key)
token = credentials_data['mattermost_token']
channel_id = credentials_data['mattermost_channel_id']
time_driver = TiMeManager(token=token, key=key)
try:
post_log = time_driver.post_message(channel_id=channel_id, message=message)
logging.info(f"Message sent to Mattermost: {post_log['message']}")
except Exception as e:
logging.error(f"Failed to send message to Mattermost: {str(e)}")
def check_outlook_calendar(account, time_before_event, key: bytes):
"""Проверяет календарь Outlook на наличие встреч в ближайшее время и отправляет уведомления."""
global last_notification_times
moscow_tz = EWSTimeZone('Europe/Moscow')
now = EWSDateTime.now(tz=moscow_tz)
start = now
end = now + timedelta(minutes=time_before_event) + timedelta(minutes=1)
calendar_items = list(account.calendar.view(start=start, end=end))
for item in calendar_items:
if isinstance(item, CalendarItem):
time_diff = item.start - now
minutes_until_start = int(time_diff.total_seconds() / 60)
# Создаем уникальный идентификатор для встречи
meeting_id = f"{item.subject}_{item.start.strftime('%Y%m%d%H%M')}"
# Проверяем, было ли уже отправлено уведомление в последние 30 минут
if meeting_id in last_notification_times:
last_notification = last_notification_times[meeting_id]
if (datetime.now() - last_notification).total_seconds() < 1800: # 1800 секунд = 30 минут
logging.info(f"Уведомление о встрече '{item.subject}' уже было отправлено недавно, пропускаем.")
continue
if 0 <= minutes_until_start <= time_before_event:
subject = item.subject
start_time = item.start.astimezone(pytz.timezone('Europe/Moscow')).strftime('%Y-%m-%d %H:%M:%S')
end_time = item.end.astimezone(pytz.timezone('Europe/Moscow')).strftime('%Y-%m-%d %H:%M:%S')
message_body = (
f"@all Внимание! Скоро начнется встреча:\n"
f"Тема: {subject}\n"
f"Начало: {start_time}\n"
f"Окончание: {end_time}\n"
f"Место: {item.location}\n"
)
logging.info(f"Отправка уведомления о встрече: {subject} в {start_time}")
send_message_to_mattermost(message_body, key)
# Обновляем время последнего уведомления
last_notification_times[meeting_id] = datetime.now()
# Очищаем старые записи из словаря
current_time = datetime.now()
last_notification_times = {k: v for k, v in last_notification_times.items()
if (current_time - v).total_seconds() < 3600} # Удаляем записи старше 1 часа
def check_outlook_calendar_only(key: bytes):
"""Функция для проверки только календаря."""
credentials_data = load_credentials('credentials.json', key)
username = credentials_data['username']
password = credentials_data['password']
domain = credentials_data['domain']
email = credentials_data['email']
try:
credentials = Credentials(username=f"{domain}\\{username}", password=password)
config = Configuration(server='mail.tinkoff.ru', credentials=credentials)
account = Account(primary_smtp_address=email,
config=config,
autodiscover=False,
access_type=DELEGATE)
# Проверяем календарь основного пользователя
check_outlook_calendar(account, 15, key)
except Exception as e:
logging.error(f"Ошибка при проверке календаря: {str(e)}")
if __name__ == '__main__':
app = QApplication(sys.argv)
login_window = LoginWindow()
sys.exit(app.exec_())