Вы сказали:
Холст import requests
import time
import json
import pygetwindow as gw
from ratelimit import limits, sleep_and_retry
from selenium.webdriver.common.action_chains import ActionChains
import re
from time import sleep
from loguru import logger
import urllib.parse
import threading
import traceback
import concurrent.futures
import os
import customtkinter as ctk
from tkinter import Menu
import threading
import uuid
import random
from threading import Lock
from selenium import webdriver
from tabulate import tabulate
from selenium.webdriver.chrome.service import Service as ChromeService
from seleniumwire import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.support import expected_conditions as EC
logger = logger.bind(name="main")
logger.add("file.log", format="{time} {level} {message}", level="DEBUG")
logger.info("Запустили скрипт !")
# Класс для взаимодействия с Airtable
class SaleBotAPI:
def __init__(self):
self.api_key = "8cb45340568c5bf8958215827b419882"
self.current_whatsapp_bot_id = None # Текущий выбранный канал WhatsApp
self.whatsapp_channels = {} # Словарь каналов WhatsApp
self.clients = {} # Словарь клиентов
def check_whatsapp_number(self, phone):
"""
Проверяет номер телефона в WhatsApp через Salebot API.
:param phone: Номер телефона для проверки.
:return: Ответ API с результатом проверки.
"""
url = f"https://chatter.salebot.pro/api/{self.api_key}/check_whatsapp"
payload = {
"phone": phone
}
try:
logger.info(f"POST запрос к {url} с параметрами {payload}")
response = requests.post(url, json=payload)
response.raise_for_status() # Проверяем статус ответа
data = response.json()
logger.info(f"Ответ сервера: {data}")
return data
except requests.exceptions.RequestException as e:
logger.error(f"Ошибка при выполнении запроса: {e}")
return None
def get_whatsapp_client_id(self, phone, group_id):
"""
Выполняет запрос для получения идентификатора клиента по номеру телефона и группе.
:param phone: Номер телефона клиента в WhatsApp.
:param group_id: Идентификатор группы (канала).
:return: Идентификатор клиента или None, если клиент не найден.
"""
url = f"https://chatter.salebot.pro/api/{self.api_key}/whatsapp_client_id"
params = {
"phone": phone,
"group_id": group_id
}
try:
logger.info(f"GET запрос к {url} с параметрами {params}")
response = requests.get(url, params=params)
if response.status_code == 404:
logger.warning(f"Клиент с номером телефона {phone} в группе {group_id} не найден.")
return None
response.raise_for_status()
data = response.json()
logger.info(f"Ответ сервера: {data}")
return data.get("id") # Возвращаем идентификатор клиента, если он есть в ответе
except requests.exceptions.RequestException as e:
logger.error(f"Ошибка при выполнении запроса: {e}")
return None
def find_client_by_platform_and_group(self, platform_ids, group_id):
"""
Ищет клиентов по идентификаторам платформы и группе (каналу).
:param platform_ids: Список идентификаторов платформ.
:param group_id: Идентификатор группы (канала).
:return: Список найденных клиентов или None.
"""
if not platform_ids or not isinstance(platform_ids, list):
logger.error("platform_ids должен быть списком идентификаторов платформ.")
return None
url = f"https://chatter.salebot.pro/api/{self.api_key}/find_client_id_by_platform_id"
payload = {
"platform_ids": platform_ids,
"group_id": group_id
}
response = self.make_request("post", url, json_data=payload)
if response and isinstance(response, list):
logger.info(f"Найдено {len(response)} клиентов для группы {group_id}.")
return response
else:
logger.warning(f"Не удалось найти клиентов для группы {group_id}. Ответ: {response}")
return None
def make_request(self, method, url, params=None, data=None, json_data=None, retries=1, delay=1):
"""Выполняет запрос с логикой повторных попыток."""
for attempt in range(1, retries + 1):
try:
logger.info(
f"Попытка {attempt}: {method.upper()} запрос к {url} с params={params}, data={data}, json={json_data}")
if method == "get":
response = requests.get(url, params=params)
elif method == "post":
response = requests.post(url, data=data, json=json_data)
else:
raise ValueError(f"Неподдерживаемый метод HTTP: {method}")
# logger.debug(f"Ответ сервера (status_code={response.status_code}): {response.text}")
response.raise_for_status()
try:
return response.json()
except json.JSONDecodeError:
logger.error(f"Не удалось преобразовать ответ в JSON: {response.text}")
return {"error": "Invalid JSON response", "response_text": response.text}
except requests.exceptions.RequestException as e:
logger.error(f"Ошибка при выполнении запроса: {e}")
if attempt < retries:
logger.info(f"Повторная попытка через {delay} секунд...")
sleep(delay)
else:
logger.error(f"Все {retries} попытки завершились неудачей.")
return None
def load_whatsapp_channels(self):
"""Загружает подключенные каналы WhatsApp и сохраняет их в словарь."""
url = f"https://chatter.salebot.pro/api/{self.api_key}/connected_channels"
response = self.make_request("get", url)
if response and "whatsapp" in response:
for channel in response["whatsapp"]:
self.whatsapp_channels[channel["name"]] = channel
logger.info(f"Загружено {len(self.whatsapp_channels)} каналов WhatsApp.")
else:
logger.warning("Не удалось загрузить каналы WhatsApp.")
def set_current_whatsapp_channel(self, channel_identifier):
"""
Устанавливает текущий канал WhatsApp по идентификатору.
channel_identifier может быть номером (name) или id.
"""
if isinstance(channel_identifier, str):
channel = self.whatsapp_channels.get(channel_identifier)
elif isinstance(channel_identifier, int):
channel = next((ch for ch in self.whatsapp_channels.values() if ch["id"] == channel_identifier), None)
else:
logger.error("Неверный тип channel_identifier. Используйте строку (номер) или число (id).")
return False
if channel:
self.current_whatsapp_bot_id = channel["id"]
logger.info(f"Текущий канал WhatsApp установлен: id={self.current_whatsapp_bot_id}, name={channel['name']}")
return True
else:
logger.warning("Канал WhatsApp не найден.")
return False
def get_all_clients(self, list_id=None, reverse=False):
"""
Получение всех клиентов из API SaleBot с использованием смещения (offset).
"""
all_clients = []
offset = 0
limit = 500 # Максимальное количество клиентов за один запрос
while True:
url = f"https://chatter.salebot.pro/api/{self.api_key}/get_clients"
params = {
"offset": offset,
"limit": limit,
"reverse": int(reverse) # Преобразование булевого значения в int (0 или 1)
}
if list_id is not None:
params["list"] = list_id
response = self.make_request("get", url, params=params)
if response and "clients" in response and response["clients"]:
logger.info(f"Загружено {len(response['clients'])} клиентов с offset={offset}.")
all_clients.extend(response["clients"])
offset += limit # Увеличиваем смещение
else:
logger.info("Данные клиентов больше не доступны или загрузка завершена.")
break
logger.info(f"Всего загружено {len(all_clients)} клиентов.")
return all_clients
def send_whatsapp_message(self, phone, message):
"""Отправка сообщения на указанный телефон с текущего канала WhatsApp."""
if not self.current_whatsapp_bot_id:
logger.error("Не установлен текущий канал WhatsApp. Сначала вызовите set_current_whatsapp_channel.")
return None
url = f"https://chatter.salebot.pro/api/{self.api_key}/whatsapp_message"
payload = {
"whatsapp_bot_id": self.current_whatsapp_bot_id,
"phone": phone,
"message": message
}
return self.make_request("post", url, json_data=payload)
def find_client_id_by_phone(self, phone):
"""Поиск client_id по номеру телефона и сохранение в словарь клиентов."""
if phone in self.clients:
logger.info(f"Клиент {phone} уже загружен.")
return self.clients[phone]
url = f"https://chatter.salebot.pro/api/{self.api_key}/find_client_id_by_phone"
params = {"phone": phone}
response = self.make_request("get", url, params=params)
if response and "client_id" in response:
self.clients[phone] = response
logger.info(f"Клиент {phone} добавлен в локальный словарь.")
return response
else:
logger.warning(f"Не удалось найти client_id для телефона {phone}.")
return None
def get_history_for_phone_and_channel(self, client_phone, channel_phone, limit=2000):
"""
Получает историю сообщений для указанного телефона клиента, канала и идентификатора канала.
"""
# Проверяем и устанавливаем текущий канал
if not self.set_current_whatsapp_channel(channel_phone):
logger.error(f"Не удалось установить канал WhatsApp: {channel_phone}")
return None
# Находим клиента с указанным телефоном в указанной группе (канале)
clients = self.get_all_clients(reverse=True)
logger.info(f"Поиск клиента с телефоном {client_phone} и группой {self.current_whatsapp_bot_id}")
matching_clients = [
client for client in clients
if str(client["platform_id"]) == str(client_phone) and str(client["group"]) == str(
self.current_whatsapp_bot_id)
]
if not matching_clients:
logger.warning(f"Клиент с телефоном {client_phone} в канале {channel_phone} не найден.")
return None
# Получаем историю для каждого подходящего клиента
histories = {}
for client in matching_clients:
client_id = client["id"]
logger.info(f"Запрашиваем историю для клиента {client_phone} (ID: {client_id}) в канале {channel_phone}")
url = f"https://chatter.salebot.pro/api/{self.api_key}/get_history"
params = {
"client_id": client_id,
"limit": limit
}
history = self.make_request("get", url, params=params)
histories[client_id] = history or {}
return histories
def get_new_messages(self):
# Получение списка блоков из схемы бота
url = f"https://chatter.salebot.pro/api/{self.api_key}/get_messages"
return self.make_request("get", url)
def send_message_by_client_id(self, client_id, message):
"""Отправка сообщения клиенту по его client_id."""
url = f"https://chatter.salebot.pro/api/{self.api_key}/message"
payload = {
"client_id": client_id,
"message": message
}
return self.make_request("post", url, json_data=payload)
class ChannelManagerGUI:
def __init__(self, channel_manager):
"""
Инициализация GUI для ChannelManager.
:param channel_manager: Объект ChannelManager для управления каналами.
"""
self.channel_manager = channel_manager
self.root = None
self.phone_entry = None
self.monitoring_label = None
def activate_window(self, window_title):
"""Активирует окно с заданным заголовком."""
try:
# Получаем список всех окон
windows = gw.getWindowsWithTitle(window_title)
if windows:
# Активируем первое найденное окно с этим заголовком
windows[0].activate()
logger.info(f"Окно '{window_title}' активировано.")
else:
self.update_monitoring_label(f"Окно '{window_title}' не найдено.", "red")
except Exception as e:
logger.error(f"Ошибка при активации окна '{window_title}': {e}")
def create_gui(self):
"""Создает GUI."""
ctk.set_appearance_mode("dark") # Темная тема
ctk.set_default_color_theme("blue") # Цветовая тема
self.root = ctk.CTk()
self.root.title("Управление каналами")
self.root.geometry("600x400") # Уменьшаем высоту окна до 400
# Поле ввода для номера телефона
phone_label = ctk.CTkLabel(
self.root,
text="Введите номер телефона:",
font=('Helvetica', 16)
)
phone_label.pack(pady=(10, 0))
self.phone_entry = ctk.CTkEntry(
self.root,
placeholder_text="Номер телефона",
font=('Helvetica', 14)
)
self.phone_entry.pack(pady=(0, 10), padx=20, fill="x")
# Кнопка поиска канала
find_button = ctk.CTkButton(
self.root,
text="Найти канал",
command=self.find_channel,
font=('Helvetica', 16)
)
find_button.pack(pady=10, padx=20, fill="x")
# Кнопка для управления каналами
start_button = ctk.CTkButton(
self.root,
text="Управление каналами",
command=self.manage_channels,
font=('Helvetica', 16)
)
start_button.pack(pady=(5, 5), padx=20, fill="x")
# Кнопка выхода
exit_button = ctk.CTkButton(
self.root,
text="Выход",
command=self.on_closing,
font=('Helvetica', 16)
)
exit_button.pack(pady=(5, 10), padx=20, fill="x")
# Метка для отображения состояния
self.monitoring_label = ctk.CTkLabel(
self.root,
text="",
font=('Helvetica', 18, 'bold'), # Крупный шрифт
text_color="red" # Красный цвет текста
)
self.monitoring_label.pack(pady=(20, 10)) # Отступы сверху и снизу
# Запуск основного цикла
self.root.mainloop()
def find_channel(self):
"""Находит канал по номеру телефона и делает окно активным."""
phone_number = self.phone_entry.get()
if not phone_number:
self.update_monitoring_label("Введите номер телефона!", "red")
return
try:
channel = self.channel_manager.channels.get(phone_number)
if channel:
self.update_monitoring_label(f"Канал найден: {phone_number}", "green")
# Делаем окно Selenium WebDriver активным
driver_title = channel.driver_manager.driver.title # Получаем заголовок окна
self.activate_window(driver_title) # Активируем окно
else:
self.update_monitoring_label("Канал не найден!", "red")
except Exception as e:
self.update_monitoring_label(f"Ошибка: {e}", "red")
def manage_channels(self):
"""Управляет каналами."""
self.update_monitoring_label("Управление каналами запущено!", "green")
threading.Thread(target=self.channel_manager.manage_channels, daemon=True).start()
def on_closing(self):
"""Закрывает приложение."""
self.root.destroy()
def update_monitoring_label(self, text, color):
"""Обновляет текст и цвет метки мониторинга."""
self.monitoring_label.configure(text=text, text_color=color)
class TablePrinter:
def __init__(self, logger_instance=None):
"""
Инициализирует TablePrinter с логгером Loguru.
:param logger_instance: Логгер Loguru (по умолчанию используется глобальный
logger
).
"""
self.logger = logger_instance or logger
def print_table(self, data, headers=None, tablefmt='grid'):
"""
Выводит данные в табличном формате с использованием loguru.
:param data: Список словарей или списков для отображения.
:param headers: Заголовки таблицы. Если None, берутся ключи первого словаря.
:param tablefmt: Формат таблицы (по умолчанию 'grid').
"""
if not data:
self.logger.warning("Нет данных для отображения.")
return
# Убедимся, что data — это список
if isinstance(data, dict):
data = [data] # Преобразуем словарь в список с одним элементом
# Если данные содержат ключ "fields", извлекаем их
if isinstance(data[0], dict) and 'fields' in data[0]:
data = [record['fields'] for record in data]
# Если данные - список словарей, автоматически извлекаем заголовки
if isinstance(data[0], dict):
headers = headers or data[0].keys()
table_data = [[record.get(header, '') for header in headers] for record in data]
else:
# Если данные - список списков, используем их напрямую
table_data = data
# Создаем строку таблицы
table_str = tabulate(table_data, headers=headers, tablefmt=tablefmt)
# Логируем с добавлением \n перед таблицей
self.logger.info(f"\n{table_str}")
class AirtableDatabase:
def __init__(self, api_key, base_id):
self.api_key = api_key
self.base_id = base_id
self.headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
self.request_lock = Lock()
self.schema = self._fetch_schema()
@sleep_and_retry
@limits(calls=5, period=1) # Ограничение: 5 запросов в секунду
def _send_request(self, url, params=None, json_data=None, headers=None, method="GET"):
if headers is None:
headers = self.headers
max_retries = 5 # Максимальное количество попыток
attempts = 0
while attempts < max_retries:
try:
attempts += 1
logger.debug(f"Попытка {attempts} отправки запроса {method} {url}")
response = requests.request(
method=method.upper(),
url=url,
params=params,
json=json_data,
headers=headers,
timeout=30
)
if response.status_code == 429:
logger.warning("Превышен лимит запросов. Ожидание 30 секунд...")
time.sleep(30) # Ожидание перед повтором
continue
response.raise_for_status() # Вызывает исключение для кодов ошибок HTTP
return response.json() # Успешный ответ, возвращаем JSON
except requests.exceptions.RequestException as e:
logger.error(f"Ошибка при запросе {url}: {e}")
if attempts < max_retries:
logger.info("Повтор запроса через 5 секунд...")
time.sleep(5) # Ожидание перед повторной попыткой
else:
logger.error(f"Не удалось выполнить запрос после {max_retries} попыток.")
return None
return None
def update_salebot_field(self, phone, status):
"""
Обновляет поле 'Salebot' в таблице 'Каналы' для указанного телефона.
:param phone: Номер телефона канала.
:param status: Новый статус ('Авторизован', 'Не авторизован', 'Нет').
"""
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {'filterByFormula': f"{{телефон}}='{phone}'"}
response = self._send_request(url, params=params)
if response and response.get('records'):
record_id = response['records'][0]['id']
update_url = f'https://api.airtable.com/v0/{self.base_id}/каналы/{record_id}'
data = {"fields": {"Salebot": status}}
update_response = self._send_request(update_url, json_data=data, method="PATCH")
if update_response:
logger.info(f"Поле 'Salebot' успешно обновлено для телефона {phone} на {status}.")
else:
logger.error(f"Ошибка при обновлении поля 'Salebot' для телефона {phone}.")
else:
logger.warning(f"Канал с телефоном {phone} не найден.")
def _fetch_schema(self):
url = f'https://api.airtable.com/v0/meta/bases/{self.base_id}/tables'
logger.info("Получение схемы базы данных...")
return self._send_request(url)
def update_incoming_sms(self, client_number, channel_phone, incoming_messages):
"""
Обновляет поле "вх. смс" в таблице "расписание" для записи, где клиент и канал совпадают.
:param client_number: Номер клиента.
:param channel_phone: Номер телефона канала.
:param incoming_messages: Текст входящих сообщений для сохранения.
"""
try:
filter_formula = (
f"AND({{клиенты}}='{client_number}', ARRAYJOIN({{каналы}})='{channel_phone}')"
)
# Получаем запись для обновления
records = self._send_request(
url=f"https://api.airtable.com/v0/{self.base_id}/расписание",
params={"filterByFormula": filter_formula},
method="GET"
)
if records and records.get("records"):
record_id = records["records"][0]["id"]
update_data = {
"fields": {
"вх. смс": incoming_messages
}
}
# Обновляем запись
response = self._send_request(
url=f"https://api.airtable.com/v0/{self.base_id}/расписание/{record_id}",
json_data=update_data,
method="PATCH"
)
if response:
logger.info(f"Поле 'вх. смс' успешно обновлено для клиента {client_number}.")
else:
logger.error(f"Ошибка при обновлении поля 'вх. смс' для клиента {client_number}.")
else:
logger.warning(f"Запись для клиента {client_number} и канала {channel_phone} не найдена.")
except Exception as e:
logger.error(f"Ошибка при обновлении входящих сообщений: {e}")
def get_channel_status_options(self):
table_schema = next((t for t in self.schema['tables'] if t['name'] == 'каналы'), None)
if not table_schema:
raise ValueError("Таблица 'каналы' не найдена в схеме.")
field_schema = next((f for f in table_schema['fields'] if f['name'] == 'статус'), None)
if not field_schema or 'options' not in field_schema:
raise ValueError("Поле 'статус' не найдено или не содержит опций.")
return [choice['name'] for choice in field_schema['options']['choices']]
def update_channel_status(self, phone, status):
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {
'filterByFormula': f"{{телефон}}='{phone}'"
}
response = self._send_request(url, params=params)
if response and response.get('records'):
channel = response['records'][0]
record_id = channel['id']
else:
raise ValueError(f"Канал с телефоном {phone} не найден.")
update_url = f'https://api.airtable.com/v0/{self.base_id}/каналы/{record_id}'
data = {
"fields": {
"статус": status
}
}
response = self._send_request(update_url, json_data=data, method="PATCH")
if response:
logger.info(f"Статус успешно обновлён на '{status}' для телефона {phone}.")
else:
logger.error(f"Ошибка при обновлении статуса для телефона {phone}.")
def update_field_cookies_by_phone(self, phone, text):
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {
'filterByFormula': f"{{телефон}}='{phone}'"
}
response = self._send_request(url, params=params)
if response and response.get('records'):
channel = response['records'][0]
record_id = channel['id']
else:
raise ValueError(f"Канал с телефоном {phone} не найден.")
update_url = f'https://api.airtable.com/v0/{self.base_id}/каналы/{record_id}'
data = {
"fields": {
"куки": text
}
}
response = self._send_request(update_url, json_data=data, method="PATCH")
if response:
logger.info(f"Поле 'куки' успешно обновлено для телефона {phone}.")
else:
logger.error(f"Ошибка при обновлении поля 'куки' для телефона {phone}.")
def get_cookies_by_phone(self, phone):
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {
'filterByFormula': f"{{телефон}}='{phone}'"
}
response = self._send_request(url, params=params)
if response and response.get('records'):
channel = response['records'][0]
cookies = channel['fields'].get('куки')
if cookies is None:
logger.info(f"Поле 'куки' для телефона {phone} пустое или отсутствует.")
else:
logger.info(f"Поле 'куки' успешно получено для телефона {phone}.")
return cookies
else:
raise ValueError(f"Канал с телефоном {phone} не найден.")
def get_channels_by_status_and_proxy(self, include_status=None, exclude_status=None, filter_proxy=True):
filters = []
if include_status:
status_conditions = [f"{{статус}}='{status}'" for status in include_status]
include_status_formula = f"OR({', '.join(status_conditions)})"
filters.append(include_status_formula)
if exclude_status:
status_conditions = [f"{{статус}}!='{status}'" for status in exclude_status]
exclude_status_formula = f"AND({', '.join(status_conditions)})"
filters.append(exclude_status_formula)
if filter_proxy:
proxy_formula = "NOT({прокси}='')"
filters.append(proxy_formula)
if filters:
filter_formula = f"AND({', '.join(filters)})" if len(filters) > 1 else filters[0]
else:
filter_formula = None
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {}
if filter_formula:
params['filterByFormula'] = filter_formula
all_records = []
while True:
response = self._send_request(url, params=params)
if response:
all_records.extend(response.get('records', []))
if 'offset' in response:
params['offset'] = response['offset']
else:
break
else:
logger.error("Не удалось получить записи из таблицы 'каналы'.")
break
return all_records
def generate_channel_report_by_phone(self, phone):
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {
'filterByFormula': f"{{телефон}}='{phone}'"
}
response = self._send_request(url, params=params)
if response and response.get('records'):
channel = response['records'][0]
channel_fields = channel.get('fields', {})
channel_id = channel.get('id')
else:
raise ValueError(f"Канал с телефоном {phone} не найден.")
def fetch_all_records(url, params=None):
"""
Fetch all records from Airtable, handling pagination via offset
.
"""
records = []
params = params or {}
while True:
response = self._send_request(url, params=params)
if response:
records.extend(response.get('records', []))
if 'offset' in response:
params['offset'] = response['offset']
else:
break
else:
logger.error(f"Не удалось получить данные из {url}")
break
return records
def fetch_rk_data(rk_id):
rk_url = f'https://api.airtable.com/v0/{self.base_id}/рк'
rk_params = {'filterByFormula': f"RECORD_ID()='{rk_id}'"}
rk_records = fetch_all_records(rk_url, params=rk_params)
if not rk_records:
return {}
rk_fields = rk_records[0].get('fields', {})
# Fetching voronka data in parallel
voronka_ids = rk_fields.get('воронка', [])
with concurrent.futures.ThreadPoolExecutor() as executor:
voronka_data = list(executor.map(fetch_voronka_data, voronka_ids))
rk_fields['воронка'] = voronka_data
return rk_fields
def fetch_voronka_data(voronka_id):
voronka_url = f'https://api.airtable.com/v0/{self.base_id}/воронка'
voronka_params = {'filterByFormula': f"RECORD_ID()='{voronka_id}'"}
voronka_records = fetch_all_records(voronka_url, params=voronka_params)
return voronka_records[0].get('fields', {}) if voronka_records else {}
# Fetching RK data in parallel
rk_ids = channel_fields.get('рк', [])
with concurrent.futures.ThreadPoolExecutor() as executor:
rk_data_list = list(executor.map(fetch_rk_data, rk_ids))
# Fetching schedule data
schedule_url = f'https://api.airtable.com/v0/{self.base_id}/расписание'
schedule_params = {
'filterByFormula': f"FIND('{channel_id}', ARRAYJOIN({{каналы}}))"
}
schedule_records = fetch_all_records(schedule_url, params=schedule_params)
# Assembling the report
channel_data = {
"channel_info": channel_fields,
"rk": rk_data_list,
"schedule": [record.get('fields', {}) for record in schedule_records]
}
report = {
phone: channel_data
}
return json.dumps(report, ensure_ascii=False, indent=4)
def export_schema_to_json(self):
schema_details = []
for table in self.schema['tables']:
table_info = {
"table_name": table["name"],
"table_id": table["id"],
"fields": []
}
for field in table["fields"]:
field_info = {
"field_name": field["name"],
"field_id": field["id"],
"field_type": field["type"]
}
if "options" in field:
field_info["options"] = field["options"]
table_info["fields"].append(field_info)
schema_details.append(table_info)
return schema_details
def print_schema(self):
schema_json = self.export_schema_to_json()
logger.info("Схема базы данных:")
print(json.dumps(schema_json, ensure_ascii=False, separators=(',', ':')))
def save_cookies_by_phone(self, phone, cookies):
"""
Сохраняет куки в таблицу Airtable для записи с указанным номером телефона.
:param phone: Номер телефона записи.
:param cookies: Куки для сохранения (в формате JSON).
"""
url = f'https://api.airtable.com/v0/{self.base_id}/каналы'
params = {
'filterByFormula': f"{{телефон}}='{phone}'"
}
response = self._send_request(url, params=params)
if response and response.get('records'):
channel = response['records'][0]
record_id = channel['id']
else:
raise ValueError(f"Канал с телефоном {phone} не найден.")
update_url = f'https://api.airtable.com/v0/{self.base_id}/каналы/{record_id}'
data = {
"fields": {
"куки": cookies # Сохраняем куки
}
}
response = self._send_request(update_url, json_data=data, method="PATCH")
if response:
logger.info(f"Куки успешно сохранены в таблице для телефона {phone}.")
else:
logger.error(f"Ошибка при сохранении кук для телефона {phone}.")
# Класс для отправки уведомлений в Telegram
class Telegram_cls:
def __init__(self, who=None):
if who is not None:
self.chat_id = '6673887542'
self.token = 'bot7420892625:AAHtsLqQMRngHRyIPo_hqevFko7G3--zhKQ'
else:
self.chat_id = '6673887542'
self.token = 'bot7420892625:AAHtsLqQMRngHRyIPo_hqevFko7G3--zhKQ'
def send_message(self, text):
url = f"https://api.telegram.org/{self.token}/sendMessage"
payload = {"chat_id": self.chat_id, "text": text, "parse_mode": "html"}
try:
response = requests.post(url, json=payload)
return response.json()
except Exception as e:
logger.error(f"Ошибка при отправке сообщения в Telegram: {e}")
def send_photo(self, photo, caption=None):
url = f"https://api.telegram.org/{self.token}/sendPhoto"
payload = {"chat_id": self.chat_id}
if caption:
payload["caption"] = caption
files = {"photo": open(photo, "rb")}
try:
response = requests.post(url, data=payload, files=files)
return response.json()
except Exception as e:
logger.error(f"Ошибка при отправке фото в Telegram: {e}")
def send_file(self, file_path):
url = f"https://api.telegram.org/{self.token}/sendDocument"
try:
with open(file_path, "rb") as f:
response = requests.post(url, data={"chat_id": self.chat_id}, files={"document": f})
return response.json()
except Exception as e:
logger.error(f"Ошибка при отправке файла в Telegram: {e}")
# Класс для управления веб-драйвером Selenium
class DriverManager:
def __init__(self, proxy=None,user_agent=None, phone_number=None):
self.logger = logger
self.driver_id = str(uuid.uuid4())
self.user_agent = user_agent or "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"
self.proxy = proxy # Прокси может быть None
self.phone_number = phone_number
self.driver = self.create_driver()
self.cookies_file = f"cookies_{self.driver_id}.txt"
def wait_element_from_dict(self, elements_dict, time_out=30):
"""
Ожидает элемент, указанный в словаре элементов, и возвращает название ключа найденного элемента.
:param elements_dict: Словарь, где ключи — названия элементов, а значения — их XPath.
:param time_out: Время ожидания в секундах.
:return: Название ключа найденного элемента или None, если элемент не найден.
"""
count = 0
while count < time_out:
for key, xpath in elements_dict.items():
try:
# Пытаемся найти элемент по текущему XPath
element = self.driver.find_element(By.XPATH, xpath)
if element:
self.logger.info(f"Найден элемент '{key}' по XPath: {xpath}")
return key # Возвращаем ключ найденного элемента
except Exception:
# Если элемент не найден, продолжаем цикл
continue
# Ждем 1 секунду перед следующей проверкой
time.sleep(1)
count += 1
self.logger.warning(f"Не удалось найти элементы из словаря: {list(elements_dict.keys())}")
return None
def wait_elements(self,locator_type, locator, new_elem=None, time_out=120):
# счетчик попыток
count = 0
elements = []
while not elements:
try:
# попытка найти элементы на странице
if new_elem == None:
elements = self.driver.find_elements(locator_type, locator)
else:
elements = new_elem.find_elements(locator_type, locator)
# Если список не пуст, прерываем цикл
if elements:
break
# Если список пуст, увеличиваем счетчик попыток
else:
count += 1
except:
count += 1
# Если счетчик превышает time_out, возвращаем пустой список
if count > time_out:
return []
# Ждем 1 секунду перед следующей попыткой
time.sleep(1)
return elements
def wait_element(self,type1, elem, new_elem=None, time_out=10):
# счетчик попыток
count = 0
next1 = False
while next1 == False:
try:
# попытка найти элемент на странице
if new_elem == None:
element1 = self.driver.find_element(type1, elem)
else:
element1 = new_elem.find_element(type1, elem)
next1 = True
count = count + 1
except:
if count > time_out:
# _log("Не нашел элемент", "красный")
return False
count = count + 1
time.sleep(1)
next1 = False
if next1 == False:
return False
else:
return element1
def create_driver(self):
port = random.randint(9223, 30500)
with open('path_to_chromedriver.txt', 'r', encoding='utf-8') as file:
chromedriver_path = file.read().strip()
chrome_service = ChromeService(chromedriver_path)
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument(f'--remote-debugging-port={port}')
chrome_options.add_argument(f'--user-agent={self.user_agent}')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
profiles_base_path = "C:\\Users\\regis\\Downloads\\profiles" # Здесь укажите ваш путь
profile_dir = os.path.join(profiles_base_path, self.phone_number)
if not os.path.exists(profile_dir):
os.makedirs(profile_dir, exist_ok=True)
# profile_dir = os.path.join("profiles", self.phone_number)
# if not os.path.exists(profile_dir):
# os.makedirs(profile_dir, exist_ok=True)
chrome_options.add_argument('--allow-profiles-outside-user-dir')
chrome_options.add_argument('--enable-profile-shortcut-manager')
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
chrome_options.add_argument('--profile-directory=Profile 1')
chrome_options.add_argument('--profiling-flush=n')
chrome_options.add_argument('--enable-aggressive-domstorage-flushing')
# Настройки прокси для Selenium Wire
if self.proxy:
proxy_host, proxy_port, proxy_user, proxy_pass = self.proxy.split(':')
seleniumwire_options = {
'proxy': {
'http': f'http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
'https': f'https://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}',
'no_proxy': 'localhost,127.0.0.1' # Исключаем локальные адреса из прокси
}
}
else:
seleniumwire_options = {}
while True:
try:
self.driver = webdriver.Chrome(
service=chrome_service,
options=chrome_options,
seleniumwire_options=seleniumwire_options
)
break
except Exception as e:
trace = traceback.format_exc()
telegram_bot = Telegram_cls(who="me")
telegram_bot.send_message(f"An error occurred: {trace}")
telegram_bot.send_message(f"Ошибка при создании веб-драйвера. Возможно, версия chromedriver устарела.")
time.sleep(3)
continue
# Выполняем необходимые команды CDP
self.driver.execute_cdp_cmd('Network.enable', {})
self.driver.execute_cdp_cmd('Page.enable', {})
self.driver.execute_cdp_cmd('Browser.getVersion', {})
return self.driver
def open_page(self, url):
try:
self.driver.get(url)
logger.info(f"Страница {url} успешно открыта.")
except Exception as e:
logger.error(f"Ошибка при открытии страницы {url}: {e}")
self.handle_error(e, "open_page")
def save_cookies(self):
"""
Возвращает куки драйвера в формате JSON.
"""
try:
cookies = self.driver.get_cookies()
cookies_json = json.dumps(cookies) # Преобразуем куки в JSON
logger.info("Куки успешно получены.")
return cookies_json # Возвращаем JSON вместо сохранения в файл
except Exception as e:
logger.error(f"Ошибка при получении куков: {e}")
return None
# def save_cookies(self):
# try:
# cookies = self.driver.get_cookies()
# with open(self.cookies_file, 'w') as file:
# json.dump(cookies, file)
# logger.info(f"Cookies сохранены в {self.cookies_file}")
# except Exception as e:
# logger.error(f"Ошибка при сохранении cookies: {e}")
def load_cookies(self):
if os.path.exists(self.cookies_file):
try:
with open(self.cookies_file, 'r') as file:
cookies = json.load(file)
for cookie in cookies:
self.driver.add_cookie(cookie)
logger.info(f"Cookies загружены из {self.cookies_file}")
self.driver.refresh()
except Exception as e:
logger.error(f"Ошибка при загрузке cookies: {e}")
else:
logger.warning(f"Файл cookies {self.cookies_file} не найден.")
def intercept_request(self, keyword):
# Метод для перехвата запросов (реализуется по необходимости)
pass
def execute_request_with_fetch(self, url, headers, data_json, request_type='POST'):
# Метод для выполнения запроса с помощью JavaScript fetch
pass
def save_to_excel(self, data):
# Метод для сохранения данных в Excel
pass
def load_existing_data(self):
# Метод для загрузки существующих данных из Excel
pass
def handle_error(self, error, context):
logger.error(f"Ошибка в {context}: {error}")
Telegram_cls().send_message(f"Ошибка в {context}: {error}")
def quit_driver(self):
if self.driver:
self.driver.quit()
logger.info("Веб-драйвер закрыт.")
# Класс, представляющий отдельный канал
class Channel:
DEFAULT_PROXY = "127.0.0.1:8080:default_user:default_pass"
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36"
def __init__(self, phone_number, airtable_db, mode='selenium'):
self.phone_number = phone_number
self.airtable_db = airtable_db
self.mode = mode # Уст
self.data = None
self.logger = logger
self.proxy = None
self.cookies = None
self.salebot_api = SaleBotAPI()
self.user_agent = None
self.chat_fetch_thread = None
self.poller_thread = None
self.auth_check_thread = None
self.lock = threading.Lock() # Добавляем Lock для синхронизации
if mode == 'selenium':
self.update_data()
if mode == 'salebot':
self.update_data()
self.update_salebot_data()
def update_salebot_data(self):
"""
Обновляет данные авторизации и статус канала в Salebot.
"""
try:
self.salebot_api.load_whatsapp_channels()
salebot_channels = self.salebot_api.whatsapp_channels
channel_data = salebot_channels.get(self.phone_number, {})
self.data['salebot'] = channel_data
# Определяем статус авторизации
is_authorized = channel_data.get("authorized", False)
status = "Авторизован" if is_authorized else "Не авторизован" if channel_data else "Нет"
# Обновляем поле Salebot в Airtable
self.airtable_db.update_salebot_field(self.phone_number, status)
self.logger.info(f"Salebot: Канал {self.phone_number} обновлен, статус - {status}.")
except Exception as e:
self.logger.error(f"Ошибка при обновлении данных Salebot для канала {self.phone_number}: {e}")
self.start_auth_check()
def start_auth_check(self):
"""
Запускает периодическую проверку статуса авторизации.
"""
if self.auth_check_thread and self.auth_check_thread.is_alive():
return # Если поток уже запущен, ничего не делаем
def auth_checker():
while True:
self.update_salebot_data()
time.sleep(180) # Проверка каждые 3 минуты
self.auth_check_thread = threading.Thread(target=auth_checker, daemon=True)
self.auth_check_thread.start()
def start_chat_fetcher(self):
"""
Запускает отдельный поток для регулярного сбора истории чатов.
"""
if self.chat_fetch_thread is not None and self.chat_fetch_thread.is_alive():
return # Уже запущен
def fetcher():
while True:
with self.lock: # Используем блокировку, чтобы не пересекаться с отправкой сообщений
self.fetch_chat_history()
time.sleep(60) # Сбор каждые 60 секунд
self.chat_fetch_thread = threading.Thread(target=fetcher, daemon=True)
self.chat_fetch_thread.start()
def fetch_chat_history(self):
def format_phone_number(phone):
digits = re.sub(r'\D', '', phone)
if len(digits) == 11 and digits.startswith('8'):
digits = '7' + digits[1:]
if len(digits) == 11 and digits.startswith('7'):
return digits
return None
try:
driver = self.driver_manager.driver
chat_elements = driver.find_elements(By.CSS_SELECTOR, '[role="grid"] [role="listitem"]')
self.logger.info(f"Найдено чатов: {len(chat_elements)}")
all_chats = []
for i, chat in enumerate(chat_elements):
self.logger.info(f"Обрабатываем чат №{i + 1}")
# Извлекаем номер и имя из списка чатов
try:
chatNameElement = chat.find_element(By.CSS_SELECTOR, 'span[title]')
chatName = chatNameElement.text.strip()
chatNumber = chatNameElement.get_attribute('title').strip()
except Exception as e:
self.logger.warning(f"Чат №{i + 1}: Не удалось извлечь номер/имя из списка. Ошибка: {e}")
chatName = f"Chat {i + 1}"
chatNumber = "Unknown"
self.logger.info(f"Чат №{i + 1}: Имя из списка: {chatName}, Номер из списка: {chatNumber}")
# Проверяем наличие непрочитанных сообщений
try:
unreadBadge = chat.find_element(By.CSS_SELECTOR, 'span[aria-label*="непрочитанное сообщение"]')
hasUnreadMessage = True
except:
hasUnreadMessage = False
self.logger.info(f"Чат №{i + 1}: Непрочитанные сообщения: {hasUnreadMessage}")
# Скроллим к чату и кликаем
driver.execute_script("arguments[0].scrollIntoView();", chat)
self.logger.info(f"Чат №{i + 1}: Прокрутка в видимую область...")
from selenium.webdriver import ActionChains
actions = ActionChains(driver)
actions.move_to_element(chat).click().perform()
self.logger.info(f"Чат №{i + 1}: Клик по чату выполнен.")
# Ждем подгрузки чата
time.sleep(3)
# Извлекаем сообщения из открытого чата
messages = []
message_elements = driver.find_elements(By.CSS_SELECTOR, '#main .copyable-text')
self.logger.info(f"Чат №{i + 1}: Найдено сообщений: {len(message_elements)}")
for index, element in enumerate(message_elements, start=1):
prePlainText = element.get_attribute('data-pre-plain-text')
try:
content_element = element.find_element(By.CSS_SELECTOR, 'span.selectable-text')
content = content_element.text.strip()
except:
content = ''
if prePlainText and content:
timestampMatch = re.search(r'\[(.*?)\]', prePlainText)
timestamp = timestampMatch.group(1) if timestampMatch else ''
isOutgoing = 'Вы:' in prePlainText
messages.append({
'id': index,
'type': 'out' if isOutgoing else 'in',
'content': content,
'timestamp': timestamp
})
self.logger.info(f"Чат №{i + 1}: Извлечено сообщений: {len(messages)}")
all_chats.append({
"client": {
"name": chatName,
"number": chatNumber
},
"messages": messages,
"hasUnreadMessage": hasUnreadMessage
})
self.chat_history = all_chats
self.logger.info("Все чаты обработаны. Результат:")
self.logger.info(json.dumps(self.chat_history, ensure_ascii=False, indent=2))
# Пытаемся сохранить сообщения в Airtable
for chat in self.chat_history:
raw_number = chat["client"]["number"]
self.logger.info(f"Исходный номер из списка: {raw_number}")
client_number = format_phone_number(raw_number)
self.logger.info(f"Отформатированный номер: {client_number}")
formatted_messages = []
for msg in chat["messages"]:
timestamp = f"[{msg['timestamp']}]"
content = msg["content"]
if msg['type'] == 'in':
# Входящее сообщение - без отступов
line = f"{timestamp} {content}"
else:
# Исходящее сообщение - с большим отступом слева
line = f"{' ' * 50}{timestamp} {content}"
formatted_messages.append(line)
incoming_messages = "\n".join(formatted_messages)
if incoming_messages and client_number:
self.logger.info(f"Сохраняем сообщения для клиента {client_number}.")
self.airtable_db.update_incoming_sms(
client_number=client_number,
channel_phone=self.phone_number,
incoming_messages=incoming_messages
)
else:
self.logger.info("Нет сообщений или клиентский номер не определён. Пропускаем запись.")
except Exception as e:
self.logger.error(f"Ошибка при извлечении истории переписки: {e}")
self.handle_errors(e)
def check_and_handle_qr_code(self):
chats_xpath = "//h1[text()='Чаты']"
loading_xpath = "//*[contains(text(),'Защищено сквозным шифрованием')]" # Примерный локатор загрузочного экрана
qr_xpath = "//canvas[@aria-label='Scan this QR code to link a device!']"
# Шаг 1: Проверяем, доступны ли чаты сразу
logger.info("Проверяем доступность чатов.")
chats_loaded = self.driver_manager.wait_element(By.XPATH, chats_xpath, time_out=5)
if chats_loaded:
self.logger.info("Чаты обнаружены. Канал уже авторизован.")
self.airtable_db.update_channel_status(self.phone_number, "ready")
self.start_message_poller()
# self.start_chat_fetcher()
return
# Шаг 2: Если чатов нет, проверяем наличие загрузочного экрана
logger.info("Проверяем наличие загрузочного экрана")
loading_present = self.driver_manager.wait_element(By.XPATH, loading_xpath, time_out=3)
if loading_present:
self.logger.info(
"Отображается загрузочный экран WhatsApp. Ждем завершения загрузки или появления чатов.")
# Периодически проверяем, не появились ли чаты или не исчез ли загрузочный экран.
# Задаем общий таймаут ожидания, например, 120 секунд.
timeout = 120
elapsed = 0
while elapsed < timeout:
# Снова пробуем найти чаты
logger.info("Проверяем наличие чатов в цикле.")
chats_loaded = self.driver_manager.wait_element(By.XPATH, chats_xpath, time_out=2)
if chats_loaded:
self.logger.info("Чаты появились после ожидания загрузки!")
self.airtable_db.update_channel_status(self.phone_number, "ready")
self.start_message_poller()
# self.start_chat_fetcher()
return
# Проверяем, не исчез ли загрузочный экран
try:
# Если элемент всё еще есть — просто ждём
self.driver_manager.driver.find_element(By.XPATH, loading_xpath)
except:
# Если тут исключение — значит элемент исчез. Переходим к проверке QR-кода
self.logger.info("Загрузочный экран исчез, но чаты не появились. Проверяем QR-код.")
break
time.sleep(2)
elapsed += 2
# Шаг 3: Если дошли сюда, значит чаты не найдены даже после исчезновения загрузки. Проверяем QR-код.
logger.info("Проверяем QR-код")
qr_found = self.driver_manager.wait_element(By.XPATH, qr_xpath, time_out=5)
if qr_found:
self.logger.info("QR-код обнаружен, аккаунт разлогинен. Ожидаем повторной авторизации...")
self.airtable_db.update_channel_status(self.phone_number, "авторизаваться")
# Ждём появления чатов долго, т.к. нужно авторизоваться заново
chats_loaded = self.driver_manager.wait_element(By.XPATH, chats_xpath, time_out=1200)
if chats_loaded:
self.logger.info("Чаты успешно загружены после повторной авторизации!")
self.airtable_db.update_channel_status(self.phone_number, "ready")
self.start_message_poller()
# self.start_chat_fetcher()
else:
self.logger.warning("Чаты не загрузились после повторной попытки авторизации.")
self.handle_errors("Не удалось авторизоваться повторно - чаты не появились.")
else:
# Нет ни чатов, ни QR-кода и загрузочный экран исчез.
# Это непредвиденная ситуация — возможно, стоит дополнительно обработать.
self.logger.warning("Нет чатов, нет QR-кода, загрузочный экран исчез. Неизвестное состояние.")
self.handle_errors("Не удалось авторизоваться повторно - чаты не появились в канале ")
# При необходимости можно вызвать handle_errors или повторить попытку проверки.
def update_data(self):
try:
report = self.airtable_db.generate_channel_report_by_phone(self.phone_number)
self.data = json.loads(report)
logger.debug(f"Обновлённые данные: {self.data}")
first_key = next(iter(self.data))
self.data = self.data.get(first_key)
self.data = self.data.get('channel_info')
if not isinstance(self.data, dict):
raise ValueError(f"Ожидается словарь, но получен {type(self.data).__name__}")
logger.info(f"Данные канала {self.phone_number} успешно обновлены.")
printer = TablePrinter()
# Преобразуем словарь в список, чтобы корректно отобразить таблицу
printer.print_table([self.data], headers=self.data.keys())
# Извлекаем прокси и User-Agent или устанавливаем значения по умолчанию
self.proxy = self.data.get('прокси', None)
self.proxy, self.user_agent = self.extract_proxy_and_user_agent(self.data.get('прокси', None))
if not self.user_agent:
self.user_agent = self.DEFAULT_USER_AGENT
logger.info(f"Используется стандартный User-Agent: {self.user_agent}")
logger.info(f"Используется прокси: {self.proxy}")
logger.info(f"Используется User-Agent: {self.user_agent}")
self.driver_manager = DriverManager(proxy= None, user_agent= self.user_agent,phone_number = self.phone_number)
# self.driver_manager = DriverManager(self.proxy, self.user_agent)
except Exception as e:
logger.error(f"Ошибка при обновлении данных: {e}", exc_info=True)
self.handle_errors(e)
@staticmethod
def extract_proxy_and_user_agent(proxy_string):
if not proxy_string:
return None, None
# Разбиваем строку по пробелу один раз
parts = proxy_string.split(" ", 1)
def is_proxy_format(s):
return s.count(':') >= 1
if len(parts) == 2:
possible_proxy = parts[0].strip()
possible_ua = parts[1].strip()
# Если первая часть выглядит как прокси, тогда возвращаем её и юзер-агент
if is_proxy_format(possible_proxy):
proxy = possible_proxy
user_agent = possible_ua
return proxy, user_agent
else:
# Первая часть не является прокси, значит вся строка — это User-Agent
# Прокси нет
return None, proxy_string.strip()
else:
# Нет пробела или не удалось разделить
# Значит вся строка — это User-Agent, без прокси
return None, proxy_string.strip()
def process_incoming_messages(self):
"""
Функция, которая получает сообщения с сервера и отправляет их клиентам.
"""
grouped_messages = self.fetch_messages() # Получаем сгруппированные сообщения
if not grouped_messages: # Проверяем, пуст ли словарь
# self.logger.info(f"Нет сообщений для отправки для номера канала {self.phone_number}.")
return # Если сообщений нет, завершить функцию
for client_phone, messages in grouped_messages.items():
if not client_phone or not isinstance(messages, list): # Проверяем корректность ключа и значений
self.logger.warning(f"Некорректные данные для клиента: {client_phone} -> {messages}")
continue
for msg in messages:
if not isinstance(msg, dict): # Проверяем корректность структуры сообщения
self.logger.warning(f"Пропущено сообщение с некорректным форматом: {msg}")
continue
message_text = msg.get('message_text')
if message_text:
self.send_whatsapp_message(client_phone, message_text)
else:
self.logger.warning(f"Пропущено сообщение с некорректным текстом: {msg}")
def send_whatsapp_message(self, client_phone, message_text):
logger.info(f"Отправка сообщения клиенту {client_phone}: {message_text}")
with self.lock: # Используем блокировку для синхронизации
encoded_text = urllib.parse.quote(message_text)
send_url = f"https://web.whatsapp.com/send?phone={client_phone}&text={encoded_text}"
self.driver_manager.open_page(send_url)
self.check_and_handle_qr_code()
try:
# Ждём появления кнопки отправки
logger.info(f"Ждём появления кнопки отправки сообщения клиенту {client_phone}")
send_button = self.driver_manager.wait_element(By.XPATH,
"//button[@aria-label='Отправить']",
time_out=120)
if send_button:
# Используем ActionChains для клика
actions = ActionChains(self.driver_manager.driver)
actions.move_to_element(send_button).click().perform()
time.sleep(1)
self.logger.info(f"Сообщение клиенту {client_phone} отправлено.")
else:
self.logger.warning("Не удалось найти кнопку отправки сообщения.")
self.handle_errors("Не удалось найти кнопку отправки сообщения.")
self.check_and_handle_qr_code()
except Exception as e:
self.check_and_handle_qr_code()
self.handle_errors(e)
self.logger.error(f"Ошибка при отправке сообщения клиенту {client_phone}: {e}")
def fetch_messages(self):
"""
Получаем сообщения с удалённого PHP-скрипта для заданного номера канала.
"""
url = "http://94.241.171.167/whatsapp_answer.php"
try:
params = {'channel_phone': self.phone_number} # Передаём channel_phone как параметр
response = requests.get(url, params=params)
if response.ok:
data = response.json()
if isinstance(data, dict): # Проверяем, что данные пришли в ожидаемом формате
return data # Возвращаем объект, где ключи - client_phone, значения - массив сообщений
else:
self.logger.error(f"Неверный формат данных: {data}")
return {}
else:
self.logger.error(f"Ошибка при запросе: {response.status_code}")
return {}
except Exception as e:
self.logger.error(f"Ошибка при получении сообщений: {e}")
return {}
def start_message_poller(self):
"""
Запускает отдельный поток, который каждые 10 секунд опрашивает сервер и отправляет сообщения.
"""
if self.poller_thread is not None and self.poller_thread.is_alive():
return # Уже запущен
def poll():
while True:
self.process_incoming_messages()
time.sleep(10) # Пауза 10 секунд
self.poller_thread = threading.Thread(target=poll, daemon=True)
self.poller_thread.start()
def perform_salebot_actions(self):
self.logger.info("Выполняем действия с помощью Salebot.")
def perform_actions(self):
"""
Выполняет действия в зависимости от выбранного режима.
"""
try:
if self.mode == 'selenium':
self.perform_selenium_actions()
elif self.mode == 'salebot':
pass
# self.perform_salebot_actions()
else:
raise ValueError(f"Неизвестный режим: {self.mode}")
except Exception as e:
self.handle_errors(e)
def perform_selenium_actions(self):
try:
target_url = 'https://web.whatsapp.com/'
self.driver_manager.open_page(target_url)
self.logger.info(f"Открываем страницу {target_url}")
elements = {
"login_button": "//a[span[contains(text(),'Войти')]]",
"qr_code": "//canvas[@aria-label='Scan this QR code to link a device!']",
"chats_header": "//h1[text()='Чаты']"
}
found_key = self.driver_manager.wait_element_from_dict(elements, time_out=30)
if not found_key:
# Если ни один из элементов не найден – логируем и обрабатываем ошибку
self.logger.warning("Не удалось найти элементы из словаря.")
self.handle_errors("Не удалось найти элементы 'Войти', 'Чаты' или 'QR-код'")
self.airtable_db.update_channel_status(self.phone_number, "авторизаваться")
return
# Определим отдельный метод для завершения процесса и запуска поллера
def finalize_authorization():
self.logger.info("Чаты успешно загружены!")
self.airtable_db.update_channel_status(self.phone_number, "ready")
self.start_message_poller()
# self.start_chat_fetcher()
# Определим отдельный метод ожидания полной авторизации (появления чатов)
def wait_for_chats():
self.driver_manager.wait_element(
By.XPATH,
elements["chats_header"],
time_out=1200000
)
if found_key == "login_button":
# Логика для кнопки "Войти"
elem = self.driver_manager.driver.find_element(By.XPATH, elements["login_button"])
href = elem.get_attribute("href")
if href:
self.driver_manager.open_page(href)
time.sleep(10)
self.logger.info("QR-код обнаружен, ожидаем завершения авторизации...")
self.airtable_db.update_channel_status(self.phone_number, "авторизаваться")
wait_for_chats()
finalize_authorization()
elif found_key == "qr_code":
# Логика для QR-кода
self.logger.info("QR-код обнаружен, ожидаем завершения авторизации...")
self.airtable_db.update_channel_status(self.phone_number, "авторизаваться")
wait_for_chats()
finalize_authorization()
elif found_key == "chats_header":
# Если сразу заголовок "Чаты" - значит уже авторизованы
finalize_authorization()
except Exception as e:
self.handle_errors(e)
def handle_errors(self, error):
traceback_info = traceback.format_exc()
logger.error(f"Ошибка в канале {self.phone_number}: {error}")
logger.error(traceback_info)
Telegram_cls().send_message(f"Ошибка в канале {self.phone_number}: {error} и трасе {traceback_info}")
# Дополнительная обработка ошибки
def close(self):
self.driver_manager.quit_driver()
logger.info(f"Канал {self.phone_number} закрыт.")
# Класс для управления множеством каналов
class ChannelManager:
def __init__(self, airtable_db, mode='selenium'):
self.airtable_db = airtable_db
self.channels = {}
self.logger = logger
self.mode = mode
self.load_channels()
def start_command_poller(self):
"""
Запускает отдельный поток, который каждые 10 секунд отправляет запрос
по адресу http://94.241.171.167/command_answer.php.
Если ответ содержит {"channel_phone": "XXX", "command": True},
то вызываем perform_selenium_actions() для указанного канала.
"""
def poll_commands():
while True:
try:
response = requests.get("http://94.241.171.167/command_answer.php")
if response.ok:
data = response.json()
# Предполагаемый формат ответа:
# {"channel_phone": "79998887766", "command": True}
# При необходимости изменить логику разбора в соответствии с реальным ответом
channel_phone = data.get("channel_phone")
command = data.get("command", False)
if command and channel_phone in self.channels:
channel = self.channels[channel_phone]
self.logger.info(
f"Получена команда: для канала {channel_phone} возвращено True. Открываем страницу.")
channel.perform_selenium_actions()
if command['type'] == 'text':
channel.send_whatsapp_message(command['phone'],command['message'])
else:
self.logger.warning(f"Запрос к command_answer.php вернул статус {response.status_code}")
except Exception as e:
self.logger.error(f"Ошибка при опросе command_answer.php: {e}")
time.sleep(10) # Опрашиваем каждые 10 секунд
threading.Thread(target=poll_commands, daemon=True).start()
def check_whatsapp_channels_authorization(self):
"""
Проверяет статус авторизации всех каналов WhatsApp каждые 3 минуты.
Если канал из таблицы отсутствует в whatsapp_channels, устанавливает статус "Нет".
"""
sale_bot_api = SaleBotAPI()
def check_authorization():
while True:
try:
# Загрузка всех подключенных каналов WhatsApp через SaleBotAPI
sale_bot_api.load_whatsapp_channels()
whatsapp_channels = sale_bot_api.whatsapp_channels
# Получаем все каналы из Airtable
airtable_records = self.airtable_db.get_channels_by_status_and_proxy()
airtable_phones = [record['fields']['телефон'] for record in airtable_records if 'fields' in record]
# Проверяем статус для каждого канала из Airtable
for phone in airtable_phones:
channel_data = whatsapp_channels.get(phone)
if channel_data:
# Если канал найден, обновляем статус в зависимости от авторизации
is_authorized = channel_data.get('authorized', False)
status = "Авторизован" if is_authorized else "Не авторизован"
else:
# Если канала нет в whatsapp_channels, ставим статус "Нет"
status = "Нет"
# Обновляем поле Salebot в Airtable
self.airtable_db.update_salebot_field(phone, status)
logger.info(f"Канал {phone}: Статус авторизации - {status}")
# Логируем, если есть каналы в WhatsApp, которых нет в Airtable
whatsapp_only_phones = set(whatsapp_channels.keys()) - set(airtable_phones)
if whatsapp_only_phones:
logger.info(f"Каналы, присутствующие только в WhatsApp: {whatsapp_only_phones}")
time.sleep(180) # Ожидаем 3 минуты перед следующим циклом
except Exception as e:
logger.error(f"Ошибка в check_whatsapp_channels_authorization: {e}")
threading.Thread(target=check_authorization, daemon=True).start()
def load_channels(self):
try:
# Получаем записи из Airtable
if self.mode == 'salebot':
records = self.airtable_db.get_channels_by_status_and_proxy()
if self.mode == 'selenium':
records = self.airtable_db.get_channels_by_status_and_proxy(
include_status = ['в работе'], filter_proxy = False
)
logger.debug(f"Загружено {len(records)} записей из Airtable.")
# Печать таблицы
printer = TablePrinter()
printer.print_table(
records, # Данные из Airtable
headers=["телефон", "статус", "прокси"] # Ключи из fields
)
def process_channel(record):
"""
Обрабатывает запись и добавляет канал в self.channels
.
"""
fields = record.get('fields', {})
phone = fields.get('телефон')
if phone and phone not in self.channels:
self.channels[phone] = Channel(phone, self.airtable_db, self.mode)
logger.debug(f"Канал {phone} успешно добавлен.")
# Обработка записей с задержкой запуска
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for i, record in enumerate(records):
# Задержка запуска следующей функции
# time.sleep(5 * i)
futures.append(executor.submit(process_channel, record))
# Ожидание завершения всех потоков
concurrent.futures.wait(futures)
logger.info("Каналы успешно загружены.")
except Exception as e:
self.handle_errors(e)
def update_channels(self):
try:
current_phones = set(self.channels.keys())
records = self.airtable_db.get_channels_by_status_and_proxy(include_status=['Активен'])
new_phones = set()
for record in records:
phone = record['fields'].get('телефон')
if phone:
new_phones.add(phone)
if phone not in self.channels:
self.channels[phone] = Channel(phone, self.airtable_db)
# Удаляем каналы, которых больше нет в базе
phones_to_remove = current_phones - new_phones
for phone in phones_to_remove:
self.channels[phone].close()
del self.channels[phone]
logger.info("Каналы успешно обновлены.")
except Exception as e:
self.handle_errors(e)
def manage_channels(self):
try:
for channel in self.channels.values():
channel.perform_actions()
except Exception as e:
self.handle_errors(e)
def handle_errors(self, error):
traceback_info = traceback.format_exc()
logger.error(f"Ошибка в ChannelManager: {error}")
logger.error(f"Traceback: {traceback_info}")
message = f"Ошибка в ChannelManager: {error}\n\nTraceback:\n{traceback_info}"
Telegram_cls().send_message(message)
# Дополнительная обработка ошибки
def close_all_channels(self):
for channel in self.channels.values():
channel.close()
logger.info("Все каналы закрыты.")
# Пример использования
if __name__ == "__main__":
API_KEY = 'patLUEptBlh6yASUs.be5b15b1c3390f5d1c35e28d5cff7016531bc02fdb3c634a343d3060684010a7'
BASE_ID = 'appz2O6S5Y0TsiAsn'
airtable_db = AirtableDatabase(API_KEY, BASE_ID)
channel_manager = ChannelManager(airtable_db,mode='salebot') #ВАРИАНТ 1
# channel_manager = ChannelManager(airtable_db,mode='selenium') #ВАРИАНТ 2
# gui = ChannelManagerGUI(channel_manager) # ТУТ запускается все в ChannelManager в manage_channels
def run_channel_manager():
"""Функция для запуска управления каналами в отдельном потоке."""
try:
channel_manager.manage_channels()
except Exception as e:
logger.error(f"Ошибка в manage_channels: {e}")
#
try:
threading.Thread(target=run_channel_manager, daemon=True).start()
# Запуск GUI
# gui.create_gui()
except KeyboardInterrupt:
logger.info("Остановка программы пользователем.")
finally:
channel_manager.close_all_channels()