Руководство по внедрению Telegram-бота с OCR для логистической системы
Данное руководство содержит пошаговые инструкции по созданию и настройке Telegram-бота с функциями OCR для автоматической обработки товарных накладных в групповом чате логистической компании.
Содержание
- Подготовка окружения
- Создание Telegram-бота
- Настройка Google Cloud Vision API
- Настройка Google Sheets API
- Разработка базовой структуры бота
- Реализация модуля OCR
- Разработка модуля обработки данных
- Интеграция с Google Sheets
- Тестирование и отладка
- Развертывание и запуск
1. Подготовка окружения
1.1. Установка необходимого ПО
# Обновление пакетов
sudo apt update
sudo apt upgrade -y
# Установка Python и необходимых инструментов
sudo apt install -y python3 python3-pip python3-venv git
# Создание директории проекта
mkdir -p ~/logistics_bot
cd ~/logistics_bot
# Создание виртуального окружения
python3 -m venv venv
source venv/bin/activate
# Создание файла requirements.txt
cat > requirements.txt << EOF
python-telegram-bot==13.15
google-cloud-vision==3.4.0
google-api-python-client==2.86.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==1.0.0
opencv-python==4.7.0.72
numpy==1.24.3
pillow==9.5.0
spacy==3.5.3
SQLAlchemy==2.0.15
python-dotenv==1.0.0
EOF
# Установка зависимостей
pip install -r requirements.txt
# Установка языковой модели для spaCy (для обработки текстовых сообщений)
python -m spacy download ru_core_news_sm
1.2. Настройка структуры проекта
# Создание структуры директорий
mkdir -p src/{bot,ocr,data_processing,db,sheets,utils,config}
mkdir -p logs
mkdir -p data/{images,processed,failed}
mkdir -p tests
# Создание файла .env для хранения конфиденциальных данных
cat > .env << EOF
# Telegram Bot
TELEGRAM_BOT_TOKEN=your_bot_token_here
# Google Cloud
GOOGLE_APPLICATION_CREDENTIALS=path_to_credentials.json
# Google Sheets
GOOGLE_SHEETS_ID=your_sheet_id_here
# Database
DATABASE_URL=sqlite:///data/logistics_bot.db
# Logging
LOG_LEVEL=INFO
EOF
# Создание .gitignore
cat > .gitignore << EOF
# Виртуальное окружение
venv/
# Конфиденциальные данные
.env
*.json
!example_credentials.json
# Кэш Python
__pycache__/
*.py[cod]
*$py.class
# Логи
logs/
# Данные
data/images/
data/processed/
data/failed/
*.db
# IDE
.vscode/
.idea/
EOF
# Инициализация Git репозитория
git init
2. Создание Telegram-бота
2.1. Регистрация бота через BotFather
- Откройте Telegram и найдите бота @BotFather
- Отправьте команду
/newbot - Следуйте инструкциям для создания нового бота:
- Введите имя бота (например, "Logistics OCR Bot")
- Введите username бота (должен заканчиваться на "bot", например, "logistics_ocr_bot")
- BotFather предоставит вам токен бота - сохраните его в файле
.env
2.2. Настройка прав бота для групповых чатов
- Отправьте команду
/setprivacyботу @BotFather - Выберите вашего бота
- Выберите опцию "Disable" для разрешения боту видеть все сообщения в групповом чате
2.3. Создание базового файла бота
Создайте файл src/bot/bot.py:
import logging
import os
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
# Загрузка переменных окружения
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
filename='logs/bot.log'
)
logger = logging.getLogger(__name__)
def start(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /start"""
user = update.effective_user
update.message.reply_text(f'Привет, {user.first_name}! Я бот для обработки товарных накладных.')
def help_command(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /help"""
update.message.reply_text('Отправьте фотографию товарной накладной в чат, и я автоматически обработаю её.')
def status(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /status"""
update.message.reply_text('Бот работает нормально и готов к обработке изображений.')
def handle_photo(update: Update, context: CallbackContext) -> None:
"""Обработчик получения фотографий"""
user = update.effective_user
chat_id = update.effective_chat.id
message_id = update.message.message_id
date = update.message.date
logger.info(f"Получено изображение от {user.first_name} (ID: {user.id}) в чате {chat_id}")
# Получение файла с наибольшим разрешением
photo_file = update.message.photo[-1].get_file()
# Создание директории для сохранения, если её нет
os.makedirs('data/images', exist_ok=True)
# Формирование имени файла с использованием ID сообщения и даты
file_path = f"data/images/img_{chat_id}_{message_id}_{date.strftime('%Y%m%d_%H%M%S')}.jpg"
# Скачивание файла
photo_file.download(file_path)
logger.info(f"Изображение сохранено: {file_path}")
# Здесь будет вызов функции обработки изображения
# process_image(file_path, user.id, date)
# Временный ответ для тестирования
update.message.reply_text('Изображение получено и сохранено для обработки.')
def handle_text(update: Update, context: CallbackContext) -> None:
"""Обработчик текстовых сообщений"""
user = update.effective_user
text = update.message.text
date = update.message.date
logger.info(f"Получено текстовое сообщение от {user.first_name} (ID: {user.id}): {text[:50]}...")
# Здесь будет вызов функции обработки текста
# process_text_message(text, user.id, date)
def error_handler(update: Update, context: CallbackContext) -> None:
"""Обработчик ошибок"""
logger.error(f"Ошибка: {context.error} при обработке {update}")
def main() -> None:
"""Основная функция запуска бота"""
# Создание Updater и передача ему токена бота
updater = Updater(TELEGRAM_BOT_TOKEN)
# Получение диспетчера для регистрации обработчиков
dispatcher = updater.dispatcher
# Регистрация обработчиков команд
dispatcher.add_handler(CommandHandler("start", start))
dispatcher.add_handler(CommandHandler("help", help_command))
dispatcher.add_handler(CommandHandler("status", status))
# Регистрация обработчиков сообщений
dispatcher.add_handler(MessageHandler(Filters.photo, handle_photo))
dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
# Регистрация обработчика ошибок
dispatcher.add_error_handler(error_handler)
# Запуск бота
updater.start_polling()
logger.info("Бот запущен")
# Бот работает до нажатия Ctrl-C
updater.idle()
if __name__ == '__main__':
main()
2.4. Создание файла запуска
Создайте файл run.py в корневой директории проекта:
from src.bot.bot import main
if __name__ == '__main__':
main()
3. Настройка Google Cloud Vision API
3.1. Создание проекта в Google Cloud Platform
- Перейдите на Google Cloud Console
- Создайте новый проект (или выберите существующий)
- Запишите ID проекта для дальнейшего использования
3.2. Активация Google Cloud Vision API
- В меню навигации выберите "APIs & Services" > "Library"
- Найдите "Cloud Vision API" и нажмите на него
- Нажмите кнопку "Enable" для активации API
3.3. Создание учетных данных
- В меню навигации выберите "APIs & Services" > "Credentials"
- Нажмите "Create Credentials" и выберите "Service Account"
- Заполните необходимую информацию и нажмите "Create"
- Добавьте роль "Cloud Vision API User" для сервисного аккаунта
- Нажмите "Continue" и затем "Done"
- В списке сервисных аккаунтов найдите созданный аккаунт и нажмите на его email
- Перейдите на вкладку "Keys" и нажмите "Add Key" > "Create new key"
- Выберите формат JSON и нажмите "Create"
- Файл с ключом будет автоматически загружен на ваш компьютер
- Переместите этот файл в директорию проекта и обновите путь в файле
.env
3.4. Создание модуля для работы с Vision API
Создайте файл src/ocr/vision_api.py:
import os
import io
import logging
from google.cloud import vision
from dotenv import load_dotenv
import cv2
import numpy as np
# Загрузка переменных окружения
load_dotenv()
# Настройка логирования
logger = logging.getLogger(__name__)
class VisionOCR:
def __init__(self):
"""Инициализация клиента Google Cloud Vision"""
self.client = vision.ImageAnnotatorClient()
logger.info("Инициализирован клиент Google Cloud Vision")
def preprocess_image(self, image_path):
"""Предобработка изображения для улучшения OCR"""
try:
# Чтение изображения
img = cv2.imread(image_path)
if img is None:
logger.error(f"Не удалось прочитать изображение: {image_path}")
return None
# Преобразование в оттенки серого
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Применение адаптивного порогового преобразования
thresh = cv2.adaptiveThreshold(
gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
# Удаление шума
kernel = np.ones((1, 1), np.uint8)
opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
# Сохранение обработанного изображения
processed_path = image_path.replace('/images/', '/processed/')
os.makedirs(os.path.dirname(processed_path), exist_ok=True)
cv2.imwrite(processed_path, opening)
logger.info(f"Изображение предобработано и сохранено: {processed_path}")
return processed_path
except Exception as e:
logger.error(f"Ошибка при предобработке изображения: {e}")
return None
def detect_text(self, image_path):
"""Распознавание текста на изображении"""
try:
# Предобработка изображения
processed_path = self.preprocess_image(image_path)
if not processed_path:
processed_path = image_path # Используем оригинал, если предобработка не удалась
# Чтение изображения
with io.open(processed_path, 'rb') as image_file:
content = image_file.read()
# Создание объекта изображения
image = vision.Image(content=content)
# Распознавание текста
response = self.client.text_detection(image=image)
texts = response.text_annotations
if not texts:
logger.warning(f"Текст не обнаружен на изображении: {image_path}")
return None
# Получение полного текста
full_text = texts[0].description
# Логирование результата
logger.info(f"Текст успешно распознан, {len(full_text)} символов")
# Проверка наличия ошибок
if response.error.message:
logger.error(f"Ошибка при распознавании: {response.error.message}")
return None
return full_text
except Exception as e:
logger.error(f"Ошибка при распознавании текста: {e}")
return None
# Пример использования
if __name__ == "__main__":
ocr = VisionOCR()
result = ocr.detect_text("path/to/test/image.jpg")
print(result)
4. Настройка Google Sheets API
4.1. Активация Google Sheets API
- В Google Cloud Console перейдите в "APIs & Services" > "Library"
- Найдите "Google Sheets API" и нажмите на него
- Нажмите кнопку "Enable" для активации API
4.2. Настройка учетных данных
Если вы уже создали сервисный аккаунт для Vision API, вы можете использовать его и для Sheets API:
- В меню навигации выберите "APIs & Services" > "Credentials"
- Найдите созданный ранее сервисный аккаунт и нажмите на его email
- Перейдите на вкладку "Keys" и убедитесь, что у вас есть ключ в формате JSON
- Если нет, создайте новый ключ как описано в разделе 3.3
4.3. Создание и настройка Google Sheets
- Перейдите на Google Sheets
- Создайте новую таблицу
- Настройте структуру таблицы (например, столбцы: Дата, Водитель, Нетто, Брутто, Тара, Статус)
- Скопируйте ID таблицы из URL (часть между /d/ и /edit)
- Добавьте сервисный аккаунт как редактора таблицы:
- Нажмите кнопку "Поделиться" в правом верхнем углу
- Введите email сервисного аккаунта
- Установите права "Редактор"
- Нажмите "Готово"
- Обновите GOOGLE_SHEETS_ID в файле
.env
4.4. Создание модуля для работы с Google Sheets
Создайте файл src/sheets/sheets_api.py:
import os
import logging
from dotenv import load_dotenv
from googleapiclient.discovery import build
from google.oauth2 import service_account
from datetime import datetime
# Загрузка переменных окружения
load_dotenv()
GOOGLE_SHEETS_ID = os.getenv("GOOGLE_SHEETS_ID")
GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
# Настройка логирования
logger = logging.getLogger(__name__)
class SheetsAPI:
def __init__(self):
"""Инициализация клиента Google Sheets API"""
try:
# Настройка учетных данных
credentials = service_account.Credentials.from_service_account_file(
GOOGLE_APPLICATION_CREDENTIALS,
scopes=['https://www.googleapis.com/auth/spreadsheets']
)
# Создание сервиса
self.service = build('sheets', 'v4', credentials=credentials)
self.sheet = self.service.spreadsheets()
logger.info("Инициализирован клиент Google Sheets API")
except Exception as e:
logger.error(f"Ошибка при инициализации Google Sheets API: {e}")
raise
def append_data(self, data, range_name='Sheet1!A:F'):
"""Добавление данных в таблицу"""
try:
# Подготовка данных для вставки
values = [data]
body = {
'values': values
}
# Вставка данных
result = self.sheet.values().append(
spreadsheetId=GOOGLE_SHEETS_ID,
range=range_name,
valueInputOption='USER_ENTERED',
insertDataOption='INSERT_ROWS',
body=body
).execute()
logger.info(f"Данные добавлены в таблицу: {result.get('updates').get('updatedRange')}")
return True
except Exception as e:
logger.error(f"Ошибка при добавлении данных в таблицу: {e}")
return False
def get_last_row(self, range_name='Sheet1!A:A'):
"""Получение номера последней заполненной строки"""
try:
result = self.sheet.values().get(
spreadsheetId=GOOGLE_SHEETS_ID,
range=range_name
).execute()
values = result.get('values', [])
return len(values)
except Exception as e:
logger.error(f"Ошибка при получении последней строки: {e}")
return 0
# Пример использования
if __name__ == "__main__":
sheets = SheetsAPI()
# Пример данных для добавления
data = [
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # Дата и время
"Иванов И.И.", # Водитель
"1000", # Нетто
"1200", # Брутто
"200", # Тара
"Обработано" # Статус
]
sheets.append_data(data)
5. Разработка базовой структуры бота
5.1. Создание базы данных
Создайте файл src/db/database.py:
import os
import logging
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
from datetime import datetime
# Загрузка переменных окружения
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
# Настройка логирования
logger = logging.getLogger(__name__)
# Создание базового класса для моделей
Base = declarative_base()
class ProcessedImage(Base):
"""Модель для хранения информации об обработанных изображениях"""
__tablename__ = 'processed_images'
id = Column(Integer, primary_key=True)
file_path = Column(String(255), nullable=False)
file_hash = Column(String(64), nullable=False, unique=True)
user_id = Column(Integer, nullable=False)
chat_id = Column(Integer, nullable=False)
message_id = Column(Integer, nullable=False)
timestamp = Column(DateTime, default=datetime.utcnow)
processed = Column(Boolean, default=False)
netto = Column(Float, nullable=True)
brutto = Column(Float, nullable=True)
tara = Column(Float, nullable=True)
confidence = Column(Float, nullable=True)
raw_text = Column(Text, nullable=True)
status = Column(String(50), default='pending')
def __repr__(self):
return f"<ProcessedImage(id={self.id}, user_id={self.user_id}, processed={self.processed})>"
class UserStatus(Base):
"""Модель для хранения статусов водителей"""
__tablename__ = 'user_statuses'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False, unique=True)
username = Column(String(100), nullable=True)
full_name = Column(String(255), nullable=True)
status = Column(String(50), default='inactive')
last_activity = Column(DateTime, default=datetime.utcnow)
location = Column(String(255), nullable=True)
def __repr__(self):
return f"<UserStatus(id={self.id}, user_id={self.user_id}, status={self.status})>"
class Database:
def __init__(self):
"""Инициализация базы данных"""
try:
self.engine = create_engine(DATABASE_URL)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
logger.info("База данных инициализирована")
except Exception as e:
logger.error(f"Ошибка при инициализации базы данных: {e}")
raise
def add_processed_image(self, file_path, file_hash, user_id, chat_id, message_id):
"""Добавление информации об обработанном изображении"""
try:
session = self.Session()
image = ProcessedImage(
file_path=file_path,
file_hash=file_hash,
user_id=user_id,
chat_id=chat_id,
message_id=message_id
)
session.add(image)
session.commit()
logger.info(f"Добавлена информация об изображении: {file_path}")
return image.id
except Exception as e:
session.rollback()
logger.error(f"Ошибка при добавлении информации об изображении: {e}")
return None
finally:
session.close()
def update_image_processing_result(self, image_id, netto=None, brutto=None, tara=None,
confidence=None, raw_text=None, status='processed'):
"""Обновление результатов обработки изображения"""
try:
session = self.Session()
image = session.query(ProcessedImage).filter_by(id=image_id).first()
if not image:
logger.error(f"Изображение с ID {image_id} не найдено")
return False
image.processed = True
if netto is not None:
image.netto = netto
if brutto is not None:
image.brutto = brutto
if tara is not None:
image.tara = tara
if confidence is not None:
image.confidence = confidence
if raw_text is not None:
image.raw_text = raw_text
image.status = status
session.commit()
logger.info(f"Обновлены результаты обработки изображения ID {image_id}")
return True
except Exception as e:
session.rollback()
logger.error(f"Ошибка при обновлении результатов обработки: {e}")
return False
finally:
session.close()
def check_image_hash(self, file_hash):
"""Проверка, было ли изображение уже обработано"""
try:
session = self.Session()
image = session.query(ProcessedImage).filter_by(file_hash=file_hash).first()
return image is not None
except Exception as e:
logger.error(f"Ошибка при проверке хеша изображения: {e}")
return False
finally:
session.close()
def update_user_status(self, user_id, username, full_name, status='active', location=None):
"""Обновление статуса пользователя"""
try:
session = self.Session()
user = session.query(UserStatus).filter_by(user_id=user_id).first()
if not user:
user = UserStatus(
user_id=user_id,
username=username,
full_name=full_name
)
session.add(user)
user.status = status
user.last_activity = datetime.utcnow()
if location:
user.location = location
session.commit()
logger.info(f"Обновлен статус пользователя ID {user_id}: {status}")
return True
except Exception as e:
session.rollback()
logger.error(f"Ошибка при обновлении статуса пользователя: {e}")
return False
finally:
session.close()
# Пример использования
if __name__ == "__main__":
db = Database()
# Пример добавления изображения
image_id = db.add_processed_image(
file_path="data/images/test.jpg",
file_hash="abcdef123456",
user_id=12345,
chat_id=67890,
message_id=111
)
# Пример обновления результатов
db.update_image_processing_result(
image_id=image_id,
netto=1000,
brutto=1200,
tara=200,
confidence=0.95,
raw_text="Накладная №123\nНетто: 1000 кг\nБрутто: 1200 кг\nТара: 200 кг"
)
5.2. Создание утилит для обработки изображений
Создайте файл src/utils/image_utils.py:
import os
import hashlib
import logging
import cv2
import numpy as np
# Настройка логирования
logger = logging.getLogger(__name__)
def calculate_image_hash(image_path):
"""Вычисление хеша изображения для предотвращения повторной обработки"""
try:
with open(image_path, 'rb') as f:
return hashlib.md5(f.read()).hexdigest()
except Exception as e:
logger.error(f"Ошибка при вычислении хеша изображения: {e}")
return None
def is_waybill_image(image_path):
"""Определение, является ли изображение товарной накладной"""
try:
# Это простая заглушка. В реальном проекте здесь должна быть
# более сложная логика определения типа документа
# Например, можно использовать машинное обучение или поиск ключевых слов
# Пока просто возвращаем True для всех изображений
return True
except Exception as e:
logger.error(f"Ошибка при определении типа изображения: {e}")
return False
def enhance_image(image_path, output_path=None):
"""Улучшение качества изображения для OCR"""
try:
# Чтение изображения
img = cv2.imread(image_path)
if img is None:
logger.error(f"Не удалось прочитать изображение: {image_path}")
return None
# Преобразование в оттенки серого
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Применение гауссовского размытия для удаления шума
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# Применение адаптивного порогового преобразования
thresh = cv2.adaptiveThreshold(
blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
# Если выходной путь не указан, создаем его на основе входного
if output_path is None:
output_dir = os.path.dirname(image_path).replace('/images/', '/processed/')
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(
output_dir,
os.path.basename(image_path)
)
# Сохранение обработанного изображения
cv2.imwrite(output_path, thresh)
logger.info(f"Изображение улучшено и сохранено: {output_path}")
return output_path
except Exception as e:
logger.error(f"Ошибка при улучшении изображения: {e}")
return None
def detect_document_boundaries(image_path):
"""Обнаружение границ документа на изображении"""
try:
# Чтение изображения
img = cv2.imread(image_path)
if img is None:
logger.error(f"Не удалось прочитать изображение: {image_path}")
return None
# Преобразование в оттенки серого
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Размытие для уменьшения шума
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# Обнаружение краев
edged = cv2.Canny(blurred, 75, 200)
# Поиск контуров
contours, _ = cv2.findContours(edged.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# Сортировка контуров по площади (от большего к меньшему)
contours = sorted(contours, key=cv2.contourArea, reverse=True)
# Переменная для хранения контура документа
doc_contour = None
# Перебор контуров
for contour in contours:
# Аппроксимация контура
perimeter = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
# Если контур имеет 4 точки, предполагаем, что это документ
if len(approx) == 4:
doc_contour = approx
break
# Если контур документа не найден, возвращаем None
if doc_contour is None:
logger.warning(f"Границы документа не обнаружены: {image_path}")
return None
# Возвращаем координаты углов документа
return doc_contour.reshape(4, 2)
except Exception as e:
logger.error(f"Ошибка при обнаружении границ документа: {e}")
return None
# Пример использования
if __name__ == "__main__":
image_path = "data/images/test.jpg"
hash_value = calculate_image_hash(image_path)
print(f"Hash: {hash_value}")
enhanced_path = enhance_image(image_path)
print(f"Enhanced image: {enhanced_path}")
boundaries = detect_document_boundaries(image_path)
print(f"Document boundaries: {boundaries}")
5.3. Создание модуля обработки данных
Создайте файл src/data_processing/text_processor.py:
import re
import logging
from datetime import datetime
# Настройка логирования
logger = logging.getLogger(__name__)
class TextProcessor:
def __init__(self):
"""Инициализация обработчика текста"""
# Регулярные выражения для извлечения данных
self.netto_patterns = [
r'нетто[:\s]*(\d+(?:[.,]\d+)?)',
r'масса\s*нетто[:\s]*(\d+(?:[.,]\d+)?)',
r'вес\s*нетто[:\s]*(\d+(?:[.,]\d+)?)'
]
self.brutto_patterns = [
r'брутто[:\s]*(\d+(?:[.,]\d+)?)',
r'масса\s*брутто[:\s]*(\d+(?:[.,]\d+)?)',
r'вес\s*брутто[:\s]*(\d+(?:[.,]\d+)?)'
]
self.tara_patterns = [
r'тара[:\s]*(\d+(?:[.,]\d+)?)',
r'масса\s*тары[:\s]*(\d+(?:[.,]\d+)?)',
r'вес\s*тары[:\s]*(\d+(?:[.,]\d+)?)'
]
logger.info("Инициализирован обработчик текста")
def extract_netto(self, text):
"""Извлечение значения НЕТТО из текста"""
text = text.lower()
for pattern in self.netto_patterns:
match = re.search(pattern, text)
if match:
value_str = match.group(1).replace(',', '.')
try:
return float(value_str)
except ValueError:
continue
logger.warning("Не удалось извлечь значение НЕТТО из текста")
return None
def extract_brutto(self, text):
"""Извлечение значения БРУТТО из текста"""
text = text.lower()
for pattern in self.brutto_patterns:
match = re.search(pattern, text)
if match:
value_str = match.group(1).replace(',', '.')
try:
return float(value_str)
except ValueError:
continue
logger.warning("Не удалось извлечь значение БРУТТО из текста")
return None
def extract_tara(self, text):
"""Извлечение значения ТАРА из текста"""
text = text.lower()
for pattern in self.tara_patterns:
match = re.search(pattern, text)
if match:
value_str = match.group(1).replace(',', '.')
try:
return float(value_str)
except ValueError:
continue
logger.warning("Не удалось извлечь значение ТАРА из текста")
return None
def validate_values(self, netto, brutto, tara):
"""Валидация извлеченных значений"""
# Проверка наличия всех значений
if netto is None and (brutto is None or tara is None):
logger.warning("Недостаточно данных для валидации")
return False, None
# Если нетто отсутствует, но есть брутто и тара, вычисляем нетто
if netto is None and brutto is not None and tara is not None:
netto = brutto - tara
logger.info(f"Нетто вычислено: {netto} = {brutto} - {tara}")
# Если есть все три значения, проверяем соответствие
if netto is not None and brutto is not None and tara is not None:
# Допустимая погрешность (0.5%)
tolerance = max(netto, brutto, tara) * 0.005
if abs((brutto - tara) - netto) <= tolerance:
logger.info("Значения прошли валидацию")
return True, {
'netto': netto,
'brutto': brutto,
'tara': tara
}
else:
logger.warning(f"Значения не прошли валидацию: {netto} != {brutto} - {tara}")
return False, {
'netto': netto,
'brutto': brutto,
'tara': tara
}
# Если есть только нетто
if netto is not None:
logger.info("Доступно только значение нетто")
return True, {
'netto': netto,
'brutto': None,
'tara': None
}
logger.warning("Не удалось валидировать значения")
return False, None
def process_text(self, text):
"""Обработка распознанного текста"""
try:
# Извлечение значений
netto = self.extract_netto(text)
brutto = self.extract_brutto(text)
tara = self.extract_tara(text)
# Валидация значений
is_valid, values = self.validate_values(netto, brutto, tara)
# Расчет уровня достоверности (простая эвристика)
confidence = 0.0
if is_valid and values:
if values['netto'] is not None:
confidence += 0.4
if values['brutto'] is not None:
confidence += 0.3
if values['tara'] is not None:
confidence += 0.3
return {
'is_valid': is_valid,
'values': values,
'confidence': confidence
}
except Exception as e:
logger.error(f"Ошибка при обработке текста: {e}")
return {
'is_valid': False,
'values': None,
'confidence': 0.0
}
def analyze_message_text(self, text):
"""Анализ текстового сообщения для определения типа"""
text = text.lower()
# Определение типа сообщения
if re.search(r'загруз|погруз', text):
return 'loading', 0.8
elif re.search(r'выгруз|разгруз', text):
return 'unloading', 0.8
elif re.search(r'готов|свобод|доступ', text):
return 'ready', 0.7
else:
return 'unknown', 0.3
# Пример использования
if __name__ == "__main__":
processor = TextProcessor()
# Пример текста накладной
sample_text = """
Товарная накладная №123
Дата: 15.03.2023
Масса брутто: 1200 кг
Масса тары: 200 кг
Масса нетто: 1000 кг
"""
result = processor.process_text(sample_text)
print(f"Результат обработки: {result}")
# Пример текстового сообщения
message_text = "Загрузился на складе №5, выезжаю"
message_type, confidence = processor.analyze_message_text(message_text)
print(f"Тип сообщения: {message_type}, уверенность: {confidence}")
5.4. Интеграция компонентов в основной модуль бота
Обновите файл src/bot/bot.py:
import logging
import os
import hashlib
from datetime import datetime
from dotenv import load_dotenv
from telegram import Update, ParseMode
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
# Импорт модулей проекта
from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor
from src.sheets.sheets_api import SheetsAPI
from src.db.database import Database
from src.utils.image_utils import calculate_image_hash, is_waybill_image, enhance_image
# Загрузка переменных окружения
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Настройка логирования
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
filename='logs/bot.log'
)
logger = logging.getLogger(__name__)
# Инициализация компонентов
ocr = VisionOCR()
text_processor = TextProcessor()
sheets_api = SheetsAPI()
db = Database()
def start(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /start"""
user = update.effective_user
update.message.reply_text(
f'Привет, {user.first_name}! Я бот для обработки товарных накладных.\n\n'
f'Я автоматически обрабатываю все фотографии накладных, отправленные в чат, '
f'и извлекаю из них данные о весе (нетто, брутто, тара).\n\n'
f'Для получения справки используйте команду /help.'
)
def help_command(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /help"""
update.message.reply_text(
'Я автоматически обрабатываю все фотографии товарных накладных, '
'отправленные в групповой чат.\n\n'
'Доступные команды:\n'
'/start - Начало работы с ботом\n'
'/help - Вывод этой справки\n'
'/status - Проверка статуса работы бота\n'
'/report - Получение статистики обработанных накладных (только для администраторов)\n'
'/settings - Настройка параметров бота (только для администраторов)'
)
def status(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /status"""
update.message.reply_text(
'✅ Бот работает нормально и готов к обработке изображений.\n\n'
'Система автоматически обрабатывает все фотографии накладных, '
'отправленные в чат, и извлекает из них данные о весе.'
)
def report(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /report - статистика обработанных накладных"""
# Здесь должна быть проверка прав администратора
# Для простоты примера она опущена
# Получение статистики из базы данных
# Это заглушка, в реальном проекте здесь будет запрос к БД
total_processed = 100
successful = 85
failed = 15
success_rate = (successful / total_processed) * 100
update.message.reply_text(
f'📊 *Статистика обработки накладных*\n\n'
f'Всего обработано: {total_processed}\n'
f'Успешно: {successful}\n'
f'С ошибками: {failed}\n'
f'Процент успеха: {success_rate:.1f}%',
parse_mode=ParseMode.MARKDOWN
)
def settings(update: Update, context: CallbackContext) -> None:
"""Обработчик команды /settings - настройка параметров бота"""
# Здесь должна быть проверка прав администратора
# Для простоты примера она опущена
update.message.reply_text(
'⚙️ *Настройки бота*\n\n'
'Текущие настройки:\n'
'- Автоматическая обработка изображений: Включено\n'
'- Уведомления об ошибках: Включено\n'
'- Минимальный уровень достоверности: 0.6\n\n'
'Для изменения настроек обратитесь к администратору системы.',
parse_mode=ParseMode.MARKDOWN
)
def process_image(file_path, user_id, chat_id, message_id, timestamp):
"""Обработка изображения накладной"""
try:
# Вычисление хеша изображения
file_hash = calculate_image_hash(file_path)
# Проверка, было ли изображение уже обработано
if db.check_image_hash(file_hash):
logger.info(f"Изображение уже было обработано: {file_path}")
return "duplicate", None
# Проверка, является ли изображение накладной
if not is_waybill_image(file_path):
logger.info(f"Изображение не является накладной: {file_path}")
return "not_waybill", None
# Добавление информации об изображении в базу данных
image_id = db.add_processed_image(file_path, file_hash, user_id, chat_id, message_id)
# Распознавание текста
text = ocr.detect_text(file_path)
if not text:
logger.warning(f"Не удалось распознать текст: {file_path}")
db.update_image_processing_result(
image_id=image_id,
status='failed_ocr',
raw_text=None
)
return "ocr_failed", None
# Обработка распознанного текста
result = text_processor.process_text(text)
# Обновление результатов в базе данных
if result['is_valid'] and result['values']:
values = result['values']
db.update_image_processing_result(
image_id=image_id,
netto=values.get('netto'),
brutto=values.get('brutto'),
tara=values.get('tara'),
confidence=result['confidence'],
raw_text=text,
status='processed'
)
# Подготовка данных для записи в Google Sheets
sheet_data = [
timestamp.strftime("%Y-%m-%d %H:%M:%S"), # Дата и время отправки изображения
str(user_id), # ID пользователя
str(values.get('netto', '')), # Нетто
str(values.get('brutto', '')), # Брутто
str(values.get('tara', '')), # Тара
"Обработано" # Статус
]
# Запись данных в Google Sheets
sheets_api.append_data(sheet_data)
return "success", values
else:
db.update_image_processing_result(
image_id=image_id,
raw_text=text,
status='invalid_data'
)
return "invalid_data", None
except Exception as e:
logger.error(f"Ошибка при обработке изображения: {e}")
return "error", None
def handle_photo(update: Update, context: CallbackContext) -> None:
"""Обработчик получения фотографий"""
user = update.effective_user
chat_id = update.effective_chat.id
message_id = update.message.message_id
timestamp = update.message.date
logger.info(f"Получено изображение от {user.first_name} (ID: {user.id}) в чате {chat_id}")
# Получение файла с наибольшим разрешением
photo_file = update.message.photo[-1].get_file()
# Создание директории для сохранения, если её нет
os.makedirs('data/images', exist_ok=True)
# Формирование имени файла с использованием ID сообщения и даты
file_path = f"data/images/img_{chat_id}_{message_id}_{timestamp.strftime('%Y%m%d_%H%M%S')}.jpg"
# Скачивание файла
photo_file.download(file_path)
logger.info(f"Изображение сохранено: {file_path}")
# Обработка изображения
status, values = process_image(file_path, user.id, chat_id, message_id, timestamp)
# Отправка ответа в зависимости от результата обработки
if status == "success":
netto = values.get('netto', 'Н/Д')
brutto = values.get('brutto', 'Н/Д')
tara = values.get('tara', 'Н/Д')
update.message.reply_text(
f"✅ Накладная успешно обработана\n"
f"Нетто: {netto}\n"
f"Брутто: {brutto}\n"
f"Тара: {tara}\n"
f"Дата: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}"
)
elif status == "duplicate":
update.message.reply_text("⚠️ Это изображение уже было обработано ранее.")
elif status == "not_waybill":
# Для пассивного мониторинга лучше не отвечать на изображения, не являющиеся накладными
pass
elif status == "ocr_failed":
# Уведомление только для администраторов
# В реальном проекте здесь должна быть проверка прав администратора
context.bot.send_message(
chat_id=user.id, # Отправка личного сообщения пользователю
text="⚠️ Не удалось распознать текст на изображении. Изображение сохранено для ручной обработки."
)
elif status == "invalid_data":
# Уведомление только для администраторов
context.bot.send_message(
chat_id=user.id, # Отправка личного сообщения пользователю
text="⚠️ Не удалось извлечь корректные данные из накладной. Требуется ручная проверка."
)
else:
# Уведомление только для администраторов
context.bot.send_message(
chat_id=user.id, # Отправка личного сообщения пользователю
text="❌ Произошла ошибка при обработке изображения. Проверьте логи."
)
def handle_text(update: Update, context: CallbackContext) -> None:
"""Обработчик текстовых сообщений"""
user = update.effective_user
text = update.message.text
timestamp = update.message.date
logger.info(f"Получено текстовое сообщение от {user.first_name} (ID: {user.id}): {text[:50]}...")
# Анализ текста сообщения
message_type, confidence = text_processor.analyze_message_text(text)
# Если уверенность в типе сообщения достаточно высокая
if confidence >= 0.6:
# Обновление статуса пользователя
db.update_user_status(
user_id=user.id,
username=user.username,
full_name=f"{user.first_name} {user.last_name or ''}",
status=message_type
)
# Подготовка данных для записи в Google Sheets
sheet_data = [
timestamp.strftime("%Y-%m-%d %H:%M:%S"), # Дата и время сообщения
str(user.id), # ID пользователя
"", # Нетто (пусто для текстовых сообщений)
"", # Брутто (пусто для текстовых сообщений)
"", # Тара (пусто для текстовых сообщений)
message_type.capitalize() # Статус
]
# Запись данных в Google Sheets
sheets_api.append_data(sheet_data)
# Добавление реакции на сообщение (если поддерживается API)
try:
if message_type == 'loading':
update.message.reply_text("👍 Загрузка зафиксирована")
elif message_type == 'unloading':
update.message.reply_text("👍 Выгрузка зафиксирована")
elif message_type == 'ready':
update.message.reply_text("👍 Статус готовности обновлен")
except Exception as e:
logger.error(f"Ошибка при добавлении реакции: {e}")
def error_handler(update: Update, context: CallbackContext) -> None:
"""Обработчик ошибок"""
logger.error(f"Ошибка: {context.error} при обработке {update}")
def main() -> None:
"""Основная функция запуска бота"""
# Создание Updater и передача ему токена бота
updater = Updater(TELEGRAM_BOT_TOKEN)
# Получение диспетчера для регистрации обработчиков
dispatcher = updater.dispatcher
# Регистрация обработчиков команд
dispatcher.add_handler(CommandHandler("start", start))
dispatcher.add_handler(CommandHandler("help", help_command))
dispatcher.add_handler(CommandHandler("status", status))
dispatcher.add_handler(CommandHandler("report", report))
dispatcher.add_handler(CommandHandler("settings", settings))
# Регистрация обработчиков сообщений
dispatcher.add_handler(MessageHandler(Filters.photo, handle_photo))
dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
# Регистрация обработчика ошибок
dispatcher.add_error_handler(error_handler)
# Запуск бота
updater.start_polling()
logger.info("Бот запущен")
# Бот работает до нажатия Ctrl-C
updater.idle()
if __name__ == '__main__':
main()
6. Реализация модуля OCR
Модуль OCR уже был создан в разделе 3.4 (src/ocr/vision_api.py). Он включает в себя функции для предобработки изображений и распознавания текста с использованием Google Cloud Vision API.
7. Разработка модуля обработки данных
Модуль обработки данных уже был создан в разделе 5.3 (src/data_processing/text_processor.py). Он включает в себя функции для извлечения значений нетто, брутто и тары из распознанного текста, а также для валидации этих значений.
8. Интеграция с Google Sheets
Модуль интеграции с Google Sheets уже был создан в разделе 4.4 (src/sheets/sheets_api.py). Он включает в себя функции для подключения к Google Sheets API и записи данных в таблицу.
9. Тестирование и отладка
9.1. Создание тестовых скриптов
Создайте файл tests/test_ocr.py:
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor
def test_ocr_pipeline():
"""Тестирование полного цикла OCR"""
# Путь к тестовому изображению
test_image = "tests/test_data/sample_waybill.jpg"
# Проверка наличия тестового изображения
if not os.path.exists(test_image):
print(f"Тестовое изображение не найдено: {test_image}")
return False
# Инициализация OCR
ocr = VisionOCR()
# Распознавание текста
text = ocr.detect_text(test_image)
if not text:
print("Не удалось распознать текст")
return False
print(f"Распознанный текст:\n{text[:500]}...")
# Инициализация обработчика текста
processor = TextProcessor()
# Обработка текста
result = processor.process_text(text)
print(f"Результат обработки: {result}")
# Проверка результата
if result['is_valid'] and result['values']:
print("Тест пройден успешно!")
return True
else:
print("Тест не пройден: не удалось извлечь валидные данные")
return False
if __name__ == "__main__":
# Создание директории для тестовых данных, если её нет
os.makedirs("tests/test_data", exist_ok=True)
# Проверка наличия тестового изображения
test_image = "tests/test_data/sample_waybill.jpg"
if not os.path.exists(test_image):
print(f"Пожалуйста, поместите тестовое изображение накладной в {test_image}")
print("Затем запустите этот скрипт снова")
exit(1)
# Запуск теста
test_ocr_pipeline()
Создайте файл tests/test_sheets.py:
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.sheets.sheets_api import SheetsAPI
from datetime import datetime
def test_sheets_api():
"""Тестирование интеграции с Google Sheets"""
try:
# Инициализация API
sheets = SheetsAPI()
# Тестовые данные
test_data = [
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # Дата и время
"Test User", # Водитель
"1000", # Нетто
"1200", # Брутто
"200", # Тара
"Test" # Статус
]
# Запись данных
result = sheets.append_data(test_data)
if result:
print("Тест пройден успешно! Данные записаны в таблицу.")
return True
else:
print("Тест не пройден: не удалось записать данные в таблицу")
return False
except Exception as e:
print(f"Ошибка при тестировании: {e}")
return False
if __name__ == "__main__":
test_sheets_api()
9.2. Создание скрипта для тестирования всей системы
Создайте файл tests/test_full_system.py:
import os
import sys
import time
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor
from src.sheets.sheets_api import SheetsAPI
from src.db.database import Database
from src.utils.image_utils import calculate_image_hash
from datetime import datetime
def test_full_system():
"""Тестирование полного цикла обработки изображения"""
try:
# Инициализация компонентов
ocr = VisionOCR()
text_processor = TextProcessor()
sheets_api = SheetsAPI()
db = Database()
# Путь к тестовому изображению
test_image = "tests/test_data/sample_waybill.jpg"
# Проверка наличия тестового изображения
if not os.path.exists(test_image):
print(f"Тестовое изображение не найдено: {test_image}")
return False
# Вычисление хеша изображения
file_hash = calculate_image_hash(test_image)
# Добавление информации об изображении в базу данных
image_id = db.add_processed_image(
file_path=test_image,
file_hash=file_hash,
user_id=12345,
chat_id=67890,
message_id=111
)
if not image_id:
print("Не удалось добавить информацию об изображении в базу данных")
return False
print(f"Изображение добавлено в базу данных, ID: {image_id}")
# Распознавание текста
text = ocr.detect_text(test_image)
if not text:
print("Не удалось распознать текст")
return False
print(f"Распознанный текст:\n{text[:500]}...")
# Обработка текста
result = text_processor.process_text(text)
if not result['is_valid'] or not result['values']:
print("Не удалось извлечь валидные данные из текста")
return False
print(f"Результат обработки: {result}")
# Обновление результатов в базе данных
values = result['values']
db.update_image_processing_result(
image_id=image_id,
netto=values.get('netto'),
brutto=values.get('brutto'),
tara=values.get('tara'),
confidence=result['confidence'],
raw_text=text,
status='processed'
)
# Подготовка данных для записи в Google Sheets
sheet_data = [
datetime.now().strftime("%Y-%m-%d %H:%M:%S"), # Дата и время
"12345", # ID пользователя
str(values.get('netto', '')), # Нетто
str(values.get('brutto', '')), # Брутто
str(values.get('tara', '')), # Тара
"Test" # Статус
]
# Запись данных в Google Sheets
sheets_result = sheets_api.append_data(sheet_data)
if not sheets_result:
print("Не удалось записать данные в Google Sheets")
return False
print("Данные успешно записаны в Google Sheets")
print("\nТест полного цикла обработки пройден успешно!")
return True
except Exception as e:
print(f"Ошибка при тестировании: {e}")
return False
if __name__ == "__main__":
test_full_system()
9.3. Запуск тестов
# Создание директории для тестовых данных
mkdir -p tests/test_data
# Запуск тестов
python tests/test_ocr.py
python tests/test_sheets.py
python tests/test_full_system.py
10. Развертывание и запуск
10.1. Создание Dockerfile
Создайте файл Dockerfile в корневой директории проекта:
FROM python:3.10-slim
WORKDIR /app
# Установка необходимых пакетов
RUN apt-get update && apt-get install -y \
libgl1-mesa-glx \
libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
# Копирование файлов проекта
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Установка языковой модели для spaCy
RUN python -m spacy download ru_core_news_sm
# Копирование остальных файлов
COPY . .
# Создание необходимых директорий
RUN mkdir -p data/images data/processed data/failed logs
# Запуск бота
CMD ["python", "run.py"]
10.2. Создание docker-compose.yml
Создайте файл docker-compose.yml в корневой директории проекта:
version: '3'
services:
logistics-bot:
build: .
volumes:
- ./data:/app/data
- ./logs:/app/logs
- ./.env:/app/.env
- ./credentials.json:/app/credentials.json
restart: always
10.3. Запуск бота
# Запуск в режиме разработки
python run.py
# Запуск с использованием Docker
docker-compose up -d
10.4. Мониторинг логов
# Просмотр логов бота
tail -f logs/bot.log
# Просмотр логов Docker-контейнера
docker-compose logs -f
Заключение
В этом руководстве мы рассмотрели все этапы создания Telegram-бота с OCR для логистической системы, который автоматически обрабатывает изображения товарных накладных в групповом чате и извлекает из них данные о весе (нетто, брутто, тара).
Бот использует пассивный мониторинг чата, не требуя специальных команд от водителей, и автоматически записывает извлеченные данные в Google Sheets, используя дату и время отправки изображения как дату загрузки/выгрузки.
Для повышения точности распознавания используется Google Cloud Vision API, который хорошо справляется с распознаванием рукописных цифр и изображений низкого качества.
Система также включает в себя механизмы валидации данных, обработки ошибок и предотвращения повторной обработки одних и тех же изображений.
Для развертывания бота можно использовать Docker, что упрощает процесс установки и обновления.