Logistics Bot Documentation

Telegram-bot with OCR for logistics system

Руководство по внедрению Telegram-бота с OCR для логистической системы

Данное руководство содержит пошаговые инструкции по созданию и настройке Telegram-бота с функциями OCR для автоматической обработки товарных накладных в групповом чате логистической компании.

Содержание

  1. Подготовка окружения
  2. Создание Telegram-бота
  3. Настройка Google Cloud Vision API
  4. Настройка Google Sheets API
  5. Разработка базовой структуры бота
  6. Реализация модуля OCR
  7. Разработка модуля обработки данных
  8. Интеграция с Google Sheets
  9. Тестирование и отладка
  10. Развертывание и запуск

1. Подготовка окружения

1.1. Установка необходимого ПО

# Обновление пакетов
sudo apt update
sudo apt upgrade -y

# Установка Python и необходимых инструментов
sudo apt install -y python3 python3-pip python3-venv git

# Создание директории проекта
mkdir -p ~/logistics_bot
cd ~/logistics_bot

# Создание виртуального окружения
python3 -m venv venv
source venv/bin/activate

# Создание файла requirements.txt
cat > requirements.txt << EOF
python-telegram-bot==13.15
google-cloud-vision==3.4.0
google-api-python-client==2.86.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==1.0.0
opencv-python==4.7.0.72
numpy==1.24.3
pillow==9.5.0
spacy==3.5.3
SQLAlchemy==2.0.15
python-dotenv==1.0.0
EOF

# Установка зависимостей
pip install -r requirements.txt

# Установка языковой модели для spaCy (для обработки текстовых сообщений)
python -m spacy download ru_core_news_sm

1.2. Настройка структуры проекта

# Создание структуры директорий
mkdir -p src/{bot,ocr,data_processing,db,sheets,utils,config}
mkdir -p logs
mkdir -p data/{images,processed,failed}
mkdir -p tests

# Создание файла .env для хранения конфиденциальных данных
cat > .env << EOF
# Telegram Bot
TELEGRAM_BOT_TOKEN=your_bot_token_here

# Google Cloud
GOOGLE_APPLICATION_CREDENTIALS=path_to_credentials.json

# Google Sheets
GOOGLE_SHEETS_ID=your_sheet_id_here

# Database
DATABASE_URL=sqlite:///data/logistics_bot.db

# Logging
LOG_LEVEL=INFO
EOF

# Создание .gitignore
cat > .gitignore << EOF
# Виртуальное окружение
venv/

# Конфиденциальные данные
.env
*.json
!example_credentials.json

# Кэш Python
__pycache__/
*.py[cod]
*$py.class

# Логи
logs/

# Данные
data/images/
data/processed/
data/failed/
*.db

# IDE
.vscode/
.idea/
EOF

# Инициализация Git репозитория
git init

2. Создание Telegram-бота

2.1. Регистрация бота через BotFather

  1. Откройте Telegram и найдите бота @BotFather
  2. Отправьте команду /newbot
  3. Следуйте инструкциям для создания нового бота:
    • Введите имя бота (например, "Logistics OCR Bot")
    • Введите username бота (должен заканчиваться на "bot", например, "logistics_ocr_bot")
  4. BotFather предоставит вам токен бота - сохраните его в файле .env

2.2. Настройка прав бота для групповых чатов

  1. Отправьте команду /setprivacy боту @BotFather
  2. Выберите вашего бота
  3. Выберите опцию "Disable" для разрешения боту видеть все сообщения в групповом чате

2.3. Создание базового файла бота

Создайте файл src/bot/bot.py:

import logging
import os
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext

# Загрузка переменных окружения
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")

# Настройка логирования
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='logs/bot.log'
)
logger = logging.getLogger(__name__)

def start(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /start"""
    user = update.effective_user
    update.message.reply_text(f'Привет, {user.first_name}! Я бот для обработки товарных накладных.')

def help_command(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /help"""
    update.message.reply_text('Отправьте фотографию товарной накладной в чат, и я автоматически обработаю её.')

def status(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /status"""
    update.message.reply_text('Бот работает нормально и готов к обработке изображений.')

def handle_photo(update: Update, context: CallbackContext) -> None:
    """Обработчик получения фотографий"""
    user = update.effective_user
    chat_id = update.effective_chat.id
    message_id = update.message.message_id
    date = update.message.date
    
    logger.info(f"Получено изображение от {user.first_name} (ID: {user.id}) в чате {chat_id}")
    
    # Получение файла с наибольшим разрешением
    photo_file = update.message.photo[-1].get_file()
    
    # Создание директории для сохранения, если её нет
    os.makedirs('data/images', exist_ok=True)
    
    # Формирование имени файла с использованием ID сообщения и даты
    file_path = f"data/images/img_{chat_id}_{message_id}_{date.strftime('%Y%m%d_%H%M%S')}.jpg"
    
    # Скачивание файла
    photo_file.download(file_path)
    logger.info(f"Изображение сохранено: {file_path}")
    
    # Здесь будет вызов функции обработки изображения
    # process_image(file_path, user.id, date)
    
    # Временный ответ для тестирования
    update.message.reply_text('Изображение получено и сохранено для обработки.')

def handle_text(update: Update, context: CallbackContext) -> None:
    """Обработчик текстовых сообщений"""
    user = update.effective_user
    text = update.message.text
    date = update.message.date
    
    logger.info(f"Получено текстовое сообщение от {user.first_name} (ID: {user.id}): {text[:50]}...")
    
    # Здесь будет вызов функции обработки текста
    # process_text_message(text, user.id, date)

def error_handler(update: Update, context: CallbackContext) -> None:
    """Обработчик ошибок"""
    logger.error(f"Ошибка: {context.error} при обработке {update}")

def main() -> None:
    """Основная функция запуска бота"""
    # Создание Updater и передача ему токена бота
    updater = Updater(TELEGRAM_BOT_TOKEN)
    
    # Получение диспетчера для регистрации обработчиков
    dispatcher = updater.dispatcher
    
    # Регистрация обработчиков команд
    dispatcher.add_handler(CommandHandler("start", start))
    dispatcher.add_handler(CommandHandler("help", help_command))
    dispatcher.add_handler(CommandHandler("status", status))
    
    # Регистрация обработчиков сообщений
    dispatcher.add_handler(MessageHandler(Filters.photo, handle_photo))
    dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
    
    # Регистрация обработчика ошибок
    dispatcher.add_error_handler(error_handler)
    
    # Запуск бота
    updater.start_polling()
    logger.info("Бот запущен")
    
    # Бот работает до нажатия Ctrl-C
    updater.idle()

if __name__ == '__main__':
    main()

2.4. Создание файла запуска

Создайте файл run.py в корневой директории проекта:

from src.bot.bot import main

if __name__ == '__main__':
    main()

3. Настройка Google Cloud Vision API

3.1. Создание проекта в Google Cloud Platform

  1. Перейдите на Google Cloud Console
  2. Создайте новый проект (или выберите существующий)
  3. Запишите ID проекта для дальнейшего использования

3.2. Активация Google Cloud Vision API

  1. В меню навигации выберите "APIs & Services" > "Library"
  2. Найдите "Cloud Vision API" и нажмите на него
  3. Нажмите кнопку "Enable" для активации API

3.3. Создание учетных данных

  1. В меню навигации выберите "APIs & Services" > "Credentials"
  2. Нажмите "Create Credentials" и выберите "Service Account"
  3. Заполните необходимую информацию и нажмите "Create"
  4. Добавьте роль "Cloud Vision API User" для сервисного аккаунта
  5. Нажмите "Continue" и затем "Done"
  6. В списке сервисных аккаунтов найдите созданный аккаунт и нажмите на его email
  7. Перейдите на вкладку "Keys" и нажмите "Add Key" > "Create new key"
  8. Выберите формат JSON и нажмите "Create"
  9. Файл с ключом будет автоматически загружен на ваш компьютер
  10. Переместите этот файл в директорию проекта и обновите путь в файле .env

3.4. Создание модуля для работы с Vision API

Создайте файл src/ocr/vision_api.py:

import os
import io
import logging
from google.cloud import vision
from dotenv import load_dotenv
import cv2
import numpy as np

# Загрузка переменных окружения
load_dotenv()

# Настройка логирования
logger = logging.getLogger(__name__)

class VisionOCR:
    def __init__(self):
        """Инициализация клиента Google Cloud Vision"""
        self.client = vision.ImageAnnotatorClient()
        logger.info("Инициализирован клиент Google Cloud Vision")
    
    def preprocess_image(self, image_path):
        """Предобработка изображения для улучшения OCR"""
        try:
            # Чтение изображения
            img = cv2.imread(image_path)
            if img is None:
                logger.error(f"Не удалось прочитать изображение: {image_path}")
                return None
            
            # Преобразование в оттенки серого
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            
            # Применение адаптивного порогового преобразования
            thresh = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, 
                cv2.THRESH_BINARY, 11, 2
            )
            
            # Удаление шума
            kernel = np.ones((1, 1), np.uint8)
            opening = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
            
            # Сохранение обработанного изображения
            processed_path = image_path.replace('/images/', '/processed/')
            os.makedirs(os.path.dirname(processed_path), exist_ok=True)
            cv2.imwrite(processed_path, opening)
            
            logger.info(f"Изображение предобработано и сохранено: {processed_path}")
            return processed_path
        
        except Exception as e:
            logger.error(f"Ошибка при предобработке изображения: {e}")
            return None
    
    def detect_text(self, image_path):
        """Распознавание текста на изображении"""
        try:
            # Предобработка изображения
            processed_path = self.preprocess_image(image_path)
            if not processed_path:
                processed_path = image_path  # Используем оригинал, если предобработка не удалась
            
            # Чтение изображения
            with io.open(processed_path, 'rb') as image_file:
                content = image_file.read()
            
            # Создание объекта изображения
            image = vision.Image(content=content)
            
            # Распознавание текста
            response = self.client.text_detection(image=image)
            texts = response.text_annotations
            
            if not texts:
                logger.warning(f"Текст не обнаружен на изображении: {image_path}")
                return None
            
            # Получение полного текста
            full_text = texts[0].description
            
            # Логирование результата
            logger.info(f"Текст успешно распознан, {len(full_text)} символов")
            
            # Проверка наличия ошибок
            if response.error.message:
                logger.error(f"Ошибка при распознавании: {response.error.message}")
                return None
            
            return full_text
        
        except Exception as e:
            logger.error(f"Ошибка при распознавании текста: {e}")
            return None

# Пример использования
if __name__ == "__main__":
    ocr = VisionOCR()
    result = ocr.detect_text("path/to/test/image.jpg")
    print(result)

4. Настройка Google Sheets API

4.1. Активация Google Sheets API

  1. В Google Cloud Console перейдите в "APIs & Services" > "Library"
  2. Найдите "Google Sheets API" и нажмите на него
  3. Нажмите кнопку "Enable" для активации API

4.2. Настройка учетных данных

Если вы уже создали сервисный аккаунт для Vision API, вы можете использовать его и для Sheets API:

  1. В меню навигации выберите "APIs & Services" > "Credentials"
  2. Найдите созданный ранее сервисный аккаунт и нажмите на его email
  3. Перейдите на вкладку "Keys" и убедитесь, что у вас есть ключ в формате JSON
  4. Если нет, создайте новый ключ как описано в разделе 3.3

4.3. Создание и настройка Google Sheets

  1. Перейдите на Google Sheets
  2. Создайте новую таблицу
  3. Настройте структуру таблицы (например, столбцы: Дата, Водитель, Нетто, Брутто, Тара, Статус)
  4. Скопируйте ID таблицы из URL (часть между /d/ и /edit)
  5. Добавьте сервисный аккаунт как редактора таблицы:
    • Нажмите кнопку "Поделиться" в правом верхнем углу
    • Введите email сервисного аккаунта
    • Установите права "Редактор"
    • Нажмите "Готово"
  6. Обновите GOOGLE_SHEETS_ID в файле .env

4.4. Создание модуля для работы с Google Sheets

Создайте файл src/sheets/sheets_api.py:

import os
import logging
from dotenv import load_dotenv
from googleapiclient.discovery import build
from google.oauth2 import service_account
from datetime import datetime

# Загрузка переменных окружения
load_dotenv()
GOOGLE_SHEETS_ID = os.getenv("GOOGLE_SHEETS_ID")
GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

# Настройка логирования
logger = logging.getLogger(__name__)

class SheetsAPI:
    def __init__(self):
        """Инициализация клиента Google Sheets API"""
        try:
            # Настройка учетных данных
            credentials = service_account.Credentials.from_service_account_file(
                GOOGLE_APPLICATION_CREDENTIALS,
                scopes=['https://www.googleapis.com/auth/spreadsheets']
            )
            
            # Создание сервиса
            self.service = build('sheets', 'v4', credentials=credentials)
            self.sheet = self.service.spreadsheets()
            logger.info("Инициализирован клиент Google Sheets API")
        
        except Exception as e:
            logger.error(f"Ошибка при инициализации Google Sheets API: {e}")
            raise
    
    def append_data(self, data, range_name='Sheet1!A:F'):
        """Добавление данных в таблицу"""
        try:
            # Подготовка данных для вставки
            values = [data]
            body = {
                'values': values
            }
            
            # Вставка данных
            result = self.sheet.values().append(
                spreadsheetId=GOOGLE_SHEETS_ID,
                range=range_name,
                valueInputOption='USER_ENTERED',
                insertDataOption='INSERT_ROWS',
                body=body
            ).execute()
            
            logger.info(f"Данные добавлены в таблицу: {result.get('updates').get('updatedRange')}")
            return True
        
        except Exception as e:
            logger.error(f"Ошибка при добавлении данных в таблицу: {e}")
            return False
    
    def get_last_row(self, range_name='Sheet1!A:A'):
        """Получение номера последней заполненной строки"""
        try:
            result = self.sheet.values().get(
                spreadsheetId=GOOGLE_SHEETS_ID,
                range=range_name
            ).execute()
            
            values = result.get('values', [])
            return len(values)
        
        except Exception as e:
            logger.error(f"Ошибка при получении последней строки: {e}")
            return 0

# Пример использования
if __name__ == "__main__":
    sheets = SheetsAPI()
    
    # Пример данных для добавления
    data = [
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Дата и время
        "Иванов И.И.",  # Водитель
        "1000",  # Нетто
        "1200",  # Брутто
        "200",   # Тара
        "Обработано"  # Статус
    ]
    
    sheets.append_data(data)

5. Разработка базовой структуры бота

5.1. Создание базы данных

Создайте файл src/db/database.py:

import os
import logging
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float, Boolean, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
from datetime import datetime

# Загрузка переменных окружения
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")

# Настройка логирования
logger = logging.getLogger(__name__)

# Создание базового класса для моделей
Base = declarative_base()

class ProcessedImage(Base):
    """Модель для хранения информации об обработанных изображениях"""
    __tablename__ = 'processed_images'
    
    id = Column(Integer, primary_key=True)
    file_path = Column(String(255), nullable=False)
    file_hash = Column(String(64), nullable=False, unique=True)
    user_id = Column(Integer, nullable=False)
    chat_id = Column(Integer, nullable=False)
    message_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)
    processed = Column(Boolean, default=False)
    netto = Column(Float, nullable=True)
    brutto = Column(Float, nullable=True)
    tara = Column(Float, nullable=True)
    confidence = Column(Float, nullable=True)
    raw_text = Column(Text, nullable=True)
    status = Column(String(50), default='pending')
    
    def __repr__(self):
        return f"<ProcessedImage(id={self.id}, user_id={self.user_id}, processed={self.processed})>"

class UserStatus(Base):
    """Модель для хранения статусов водителей"""
    __tablename__ = 'user_statuses'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False, unique=True)
    username = Column(String(100), nullable=True)
    full_name = Column(String(255), nullable=True)
    status = Column(String(50), default='inactive')
    last_activity = Column(DateTime, default=datetime.utcnow)
    location = Column(String(255), nullable=True)
    
    def __repr__(self):
        return f"<UserStatus(id={self.id}, user_id={self.user_id}, status={self.status})>"

class Database:
    def __init__(self):
        """Инициализация базы данных"""
        try:
            self.engine = create_engine(DATABASE_URL)
            Base.metadata.create_all(self.engine)
            self.Session = sessionmaker(bind=self.engine)
            logger.info("База данных инициализирована")
        
        except Exception as e:
            logger.error(f"Ошибка при инициализации базы данных: {e}")
            raise
    
    def add_processed_image(self, file_path, file_hash, user_id, chat_id, message_id):
        """Добавление информации об обработанном изображении"""
        try:
            session = self.Session()
            image = ProcessedImage(
                file_path=file_path,
                file_hash=file_hash,
                user_id=user_id,
                chat_id=chat_id,
                message_id=message_id
            )
            session.add(image)
            session.commit()
            logger.info(f"Добавлена информация об изображении: {file_path}")
            return image.id
        
        except Exception as e:
            session.rollback()
            logger.error(f"Ошибка при добавлении информации об изображении: {e}")
            return None
        
        finally:
            session.close()
    
    def update_image_processing_result(self, image_id, netto=None, brutto=None, tara=None, 
                                      confidence=None, raw_text=None, status='processed'):
        """Обновление результатов обработки изображения"""
        try:
            session = self.Session()
            image = session.query(ProcessedImage).filter_by(id=image_id).first()
            
            if not image:
                logger.error(f"Изображение с ID {image_id} не найдено")
                return False
            
            image.processed = True
            if netto is not None:
                image.netto = netto
            if brutto is not None:
                image.brutto = brutto
            if tara is not None:
                image.tara = tara
            if confidence is not None:
                image.confidence = confidence
            if raw_text is not None:
                image.raw_text = raw_text
            
            image.status = status
            session.commit()
            logger.info(f"Обновлены результаты обработки изображения ID {image_id}")
            return True
        
        except Exception as e:
            session.rollback()
            logger.error(f"Ошибка при обновлении результатов обработки: {e}")
            return False
        
        finally:
            session.close()
    
    def check_image_hash(self, file_hash):
        """Проверка, было ли изображение уже обработано"""
        try:
            session = self.Session()
            image = session.query(ProcessedImage).filter_by(file_hash=file_hash).first()
            return image is not None
        
        except Exception as e:
            logger.error(f"Ошибка при проверке хеша изображения: {e}")
            return False
        
        finally:
            session.close()
    
    def update_user_status(self, user_id, username, full_name, status='active', location=None):
        """Обновление статуса пользователя"""
        try:
            session = self.Session()
            user = session.query(UserStatus).filter_by(user_id=user_id).first()
            
            if not user:
                user = UserStatus(
                    user_id=user_id,
                    username=username,
                    full_name=full_name
                )
                session.add(user)
            
            user.status = status
            user.last_activity = datetime.utcnow()
            if location:
                user.location = location
            
            session.commit()
            logger.info(f"Обновлен статус пользователя ID {user_id}: {status}")
            return True
        
        except Exception as e:
            session.rollback()
            logger.error(f"Ошибка при обновлении статуса пользователя: {e}")
            return False
        
        finally:
            session.close()

# Пример использования
if __name__ == "__main__":
    db = Database()
    # Пример добавления изображения
    image_id = db.add_processed_image(
        file_path="data/images/test.jpg",
        file_hash="abcdef123456",
        user_id=12345,
        chat_id=67890,
        message_id=111
    )
    
    # Пример обновления результатов
    db.update_image_processing_result(
        image_id=image_id,
        netto=1000,
        brutto=1200,
        tara=200,
        confidence=0.95,
        raw_text="Накладная №123\nНетто: 1000 кг\nБрутто: 1200 кг\nТара: 200 кг"
    )

5.2. Создание утилит для обработки изображений

Создайте файл src/utils/image_utils.py:

import os
import hashlib
import logging
import cv2
import numpy as np

# Настройка логирования
logger = logging.getLogger(__name__)

def calculate_image_hash(image_path):
    """Вычисление хеша изображения для предотвращения повторной обработки"""
    try:
        with open(image_path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()
    except Exception as e:
        logger.error(f"Ошибка при вычислении хеша изображения: {e}")
        return None

def is_waybill_image(image_path):
    """Определение, является ли изображение товарной накладной"""
    try:
        # Это простая заглушка. В реальном проекте здесь должна быть
        # более сложная логика определения типа документа
        # Например, можно использовать машинное обучение или поиск ключевых слов
        
        # Пока просто возвращаем True для всех изображений
        return True
    except Exception as e:
        logger.error(f"Ошибка при определении типа изображения: {e}")
        return False

def enhance_image(image_path, output_path=None):
    """Улучшение качества изображения для OCR"""
    try:
        # Чтение изображения
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"Не удалось прочитать изображение: {image_path}")
            return None
        
        # Преобразование в оттенки серого
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        
        # Применение гауссовского размытия для удаления шума
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        
        # Применение адаптивного порогового преобразования
        thresh = cv2.adaptiveThreshold(
            blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, 
            cv2.THRESH_BINARY, 11, 2
        )
        
        # Если выходной путь не указан, создаем его на основе входного
        if output_path is None:
            output_dir = os.path.dirname(image_path).replace('/images/', '/processed/')
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(
                output_dir, 
                os.path.basename(image_path)
            )
        
        # Сохранение обработанного изображения
        cv2.imwrite(output_path, thresh)
        logger.info(f"Изображение улучшено и сохранено: {output_path}")
        
        return output_path
    
    except Exception as e:
        logger.error(f"Ошибка при улучшении изображения: {e}")
        return None

def detect_document_boundaries(image_path):
    """Обнаружение границ документа на изображении"""
    try:
        # Чтение изображения
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"Не удалось прочитать изображение: {image_path}")
            return None
        
        # Преобразование в оттенки серого
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        
        # Размытие для уменьшения шума
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        
        # Обнаружение краев
        edged = cv2.Canny(blurred, 75, 200)
        
        # Поиск контуров
        contours, _ = cv2.findContours(edged.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # Сортировка контуров по площади (от большего к меньшему)
        contours = sorted(contours, key=cv2.contourArea, reverse=True)
        
        # Переменная для хранения контура документа
        doc_contour = None
        
        # Перебор контуров
        for contour in contours:
            # Аппроксимация контура
            perimeter = cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
            
            # Если контур имеет 4 точки, предполагаем, что это документ
            if len(approx) == 4:
                doc_contour = approx
                break
        
        # Если контур документа не найден, возвращаем None
        if doc_contour is None:
            logger.warning(f"Границы документа не обнаружены: {image_path}")
            return None
        
        # Возвращаем координаты углов документа
        return doc_contour.reshape(4, 2)
    
    except Exception as e:
        logger.error(f"Ошибка при обнаружении границ документа: {e}")
        return None

# Пример использования
if __name__ == "__main__":
    image_path = "data/images/test.jpg"
    hash_value = calculate_image_hash(image_path)
    print(f"Hash: {hash_value}")
    
    enhanced_path = enhance_image(image_path)
    print(f"Enhanced image: {enhanced_path}")
    
    boundaries = detect_document_boundaries(image_path)
    print(f"Document boundaries: {boundaries}")

5.3. Создание модуля обработки данных

Создайте файл src/data_processing/text_processor.py:

import re
import logging
from datetime import datetime

# Настройка логирования
logger = logging.getLogger(__name__)

class TextProcessor:
    def __init__(self):
        """Инициализация обработчика текста"""
        # Регулярные выражения для извлечения данных
        self.netto_patterns = [
            r'нетто[:\s]*(\d+(?:[.,]\d+)?)',
            r'масса\s*нетто[:\s]*(\d+(?:[.,]\d+)?)',
            r'вес\s*нетто[:\s]*(\d+(?:[.,]\d+)?)'
        ]
        
        self.brutto_patterns = [
            r'брутто[:\s]*(\d+(?:[.,]\d+)?)',
            r'масса\s*брутто[:\s]*(\d+(?:[.,]\d+)?)',
            r'вес\s*брутто[:\s]*(\d+(?:[.,]\d+)?)'
        ]
        
        self.tara_patterns = [
            r'тара[:\s]*(\d+(?:[.,]\d+)?)',
            r'масса\s*тары[:\s]*(\d+(?:[.,]\d+)?)',
            r'вес\s*тары[:\s]*(\d+(?:[.,]\d+)?)'
        ]
        
        logger.info("Инициализирован обработчик текста")
    
    def extract_netto(self, text):
        """Извлечение значения НЕТТО из текста"""
        text = text.lower()
        
        for pattern in self.netto_patterns:
            match = re.search(pattern, text)
            if match:
                value_str = match.group(1).replace(',', '.')
                try:
                    return float(value_str)
                except ValueError:
                    continue
        
        logger.warning("Не удалось извлечь значение НЕТТО из текста")
        return None
    
    def extract_brutto(self, text):
        """Извлечение значения БРУТТО из текста"""
        text = text.lower()
        
        for pattern in self.brutto_patterns:
            match = re.search(pattern, text)
            if match:
                value_str = match.group(1).replace(',', '.')
                try:
                    return float(value_str)
                except ValueError:
                    continue
        
        logger.warning("Не удалось извлечь значение БРУТТО из текста")
        return None
    
    def extract_tara(self, text):
        """Извлечение значения ТАРА из текста"""
        text = text.lower()
        
        for pattern in self.tara_patterns:
            match = re.search(pattern, text)
            if match:
                value_str = match.group(1).replace(',', '.')
                try:
                    return float(value_str)
                except ValueError:
                    continue
        
        logger.warning("Не удалось извлечь значение ТАРА из текста")
        return None
    
    def validate_values(self, netto, brutto, tara):
        """Валидация извлеченных значений"""
        # Проверка наличия всех значений
        if netto is None and (brutto is None or tara is None):
            logger.warning("Недостаточно данных для валидации")
            return False, None
        
        # Если нетто отсутствует, но есть брутто и тара, вычисляем нетто
        if netto is None and brutto is not None and tara is not None:
            netto = brutto - tara
            logger.info(f"Нетто вычислено: {netto} = {brutto} - {tara}")
        
        # Если есть все три значения, проверяем соответствие
        if netto is not None and brutto is not None and tara is not None:
            # Допустимая погрешность (0.5%)
            tolerance = max(netto, brutto, tara) * 0.005
            
            if abs((brutto - tara) - netto) <= tolerance:
                logger.info("Значения прошли валидацию")
                return True, {
                    'netto': netto,
                    'brutto': brutto,
                    'tara': tara
                }
            else:
                logger.warning(f"Значения не прошли валидацию: {netto} != {brutto} - {tara}")
                return False, {
                    'netto': netto,
                    'brutto': brutto,
                    'tara': tara
                }
        
        # Если есть только нетто
        if netto is not None:
            logger.info("Доступно только значение нетто")
            return True, {
                'netto': netto,
                'brutto': None,
                'tara': None
            }
        
        logger.warning("Не удалось валидировать значения")
        return False, None
    
    def process_text(self, text):
        """Обработка распознанного текста"""
        try:
            # Извлечение значений
            netto = self.extract_netto(text)
            brutto = self.extract_brutto(text)
            tara = self.extract_tara(text)
            
            # Валидация значений
            is_valid, values = self.validate_values(netto, brutto, tara)
            
            # Расчет уровня достоверности (простая эвристика)
            confidence = 0.0
            if is_valid and values:
                if values['netto'] is not None:
                    confidence += 0.4
                if values['brutto'] is not None:
                    confidence += 0.3
                if values['tara'] is not None:
                    confidence += 0.3
            
            return {
                'is_valid': is_valid,
                'values': values,
                'confidence': confidence
            }
        
        except Exception as e:
            logger.error(f"Ошибка при обработке текста: {e}")
            return {
                'is_valid': False,
                'values': None,
                'confidence': 0.0
            }
    
    def analyze_message_text(self, text):
        """Анализ текстового сообщения для определения типа"""
        text = text.lower()
        
        # Определение типа сообщения
        if re.search(r'загруз|погруз', text):
            return 'loading', 0.8
        elif re.search(r'выгруз|разгруз', text):
            return 'unloading', 0.8
        elif re.search(r'готов|свобод|доступ', text):
            return 'ready', 0.7
        else:
            return 'unknown', 0.3

# Пример использования
if __name__ == "__main__":
    processor = TextProcessor()
    
    # Пример текста накладной
    sample_text = """
    Товарная накладная №123
    Дата: 15.03.2023
    
    Масса брутто: 1200 кг
    Масса тары: 200 кг
    Масса нетто: 1000 кг
    """
    
    result = processor.process_text(sample_text)
    print(f"Результат обработки: {result}")
    
    # Пример текстового сообщения
    message_text = "Загрузился на складе №5, выезжаю"
    message_type, confidence = processor.analyze_message_text(message_text)
    print(f"Тип сообщения: {message_type}, уверенность: {confidence}")

5.4. Интеграция компонентов в основной модуль бота

Обновите файл src/bot/bot.py:

import logging
import os
import hashlib
from datetime import datetime
from dotenv import load_dotenv
from telegram import Update, ParseMode
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext

# Импорт модулей проекта
from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor
from src.sheets.sheets_api import SheetsAPI
from src.db.database import Database
from src.utils.image_utils import calculate_image_hash, is_waybill_image, enhance_image

# Загрузка переменных окружения
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")

# Настройка логирования
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename='logs/bot.log'
)
logger = logging.getLogger(__name__)

# Инициализация компонентов
ocr = VisionOCR()
text_processor = TextProcessor()
sheets_api = SheetsAPI()
db = Database()

def start(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /start"""
    user = update.effective_user
    update.message.reply_text(
        f'Привет, {user.first_name}! Я бот для обработки товарных накладных.\n\n'
        f'Я автоматически обрабатываю все фотографии накладных, отправленные в чат, '
        f'и извлекаю из них данные о весе (нетто, брутто, тара).\n\n'
        f'Для получения справки используйте команду /help.'
    )

def help_command(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /help"""
    update.message.reply_text(
        'Я автоматически обрабатываю все фотографии товарных накладных, '
        'отправленные в групповой чат.\n\n'
        'Доступные команды:\n'
        '/start - Начало работы с ботом\n'
        '/help - Вывод этой справки\n'
        '/status - Проверка статуса работы бота\n'
        '/report - Получение статистики обработанных накладных (только для администраторов)\n'
        '/settings - Настройка параметров бота (только для администраторов)'
    )

def status(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /status"""
    update.message.reply_text(
        '✅ Бот работает нормально и готов к обработке изображений.\n\n'
        'Система автоматически обрабатывает все фотографии накладных, '
        'отправленные в чат, и извлекает из них данные о весе.'
    )

def report(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /report - статистика обработанных накладных"""
    # Здесь должна быть проверка прав администратора
    # Для простоты примера она опущена
    
    # Получение статистики из базы данных
    # Это заглушка, в реальном проекте здесь будет запрос к БД
    total_processed = 100
    successful = 85
    failed = 15
    success_rate = (successful / total_processed) * 100
    
    update.message.reply_text(
        f'📊 *Статистика обработки накладных*\n\n'
        f'Всего обработано: {total_processed}\n'
        f'Успешно: {successful}\n'
        f'С ошибками: {failed}\n'
        f'Процент успеха: {success_rate:.1f}%',
        parse_mode=ParseMode.MARKDOWN
    )

def settings(update: Update, context: CallbackContext) -> None:
    """Обработчик команды /settings - настройка параметров бота"""
    # Здесь должна быть проверка прав администратора
    # Для простоты примера она опущена
    
    update.message.reply_text(
        '⚙️ *Настройки бота*\n\n'
        'Текущие настройки:\n'
        '- Автоматическая обработка изображений: Включено\n'
        '- Уведомления об ошибках: Включено\n'
        '- Минимальный уровень достоверности: 0.6\n\n'
        'Для изменения настроек обратитесь к администратору системы.',
        parse_mode=ParseMode.MARKDOWN
    )

def process_image(file_path, user_id, chat_id, message_id, timestamp):
    """Обработка изображения накладной"""
    try:
        # Вычисление хеша изображения
        file_hash = calculate_image_hash(file_path)
        
        # Проверка, было ли изображение уже обработано
        if db.check_image_hash(file_hash):
            logger.info(f"Изображение уже было обработано: {file_path}")
            return "duplicate", None
        
        # Проверка, является ли изображение накладной
        if not is_waybill_image(file_path):
            logger.info(f"Изображение не является накладной: {file_path}")
            return "not_waybill", None
        
        # Добавление информации об изображении в базу данных
        image_id = db.add_processed_image(file_path, file_hash, user_id, chat_id, message_id)
        
        # Распознавание текста
        text = ocr.detect_text(file_path)
        if not text:
            logger.warning(f"Не удалось распознать текст: {file_path}")
            db.update_image_processing_result(
                image_id=image_id,
                status='failed_ocr',
                raw_text=None
            )
            return "ocr_failed", None
        
        # Обработка распознанного текста
        result = text_processor.process_text(text)
        
        # Обновление результатов в базе данных
        if result['is_valid'] and result['values']:
            values = result['values']
            db.update_image_processing_result(
                image_id=image_id,
                netto=values.get('netto'),
                brutto=values.get('brutto'),
                tara=values.get('tara'),
                confidence=result['confidence'],
                raw_text=text,
                status='processed'
            )
            
            # Подготовка данных для записи в Google Sheets
            sheet_data = [
                timestamp.strftime("%Y-%m-%d %H:%M:%S"),  # Дата и время отправки изображения
                str(user_id),  # ID пользователя
                str(values.get('netto', '')),  # Нетто
                str(values.get('brutto', '')),  # Брутто
                str(values.get('tara', '')),  # Тара
                "Обработано"  # Статус
            ]
            
            # Запись данных в Google Sheets
            sheets_api.append_data(sheet_data)
            
            return "success", values
        else:
            db.update_image_processing_result(
                image_id=image_id,
                raw_text=text,
                status='invalid_data'
            )
            return "invalid_data", None
    
    except Exception as e:
        logger.error(f"Ошибка при обработке изображения: {e}")
        return "error", None

def handle_photo(update: Update, context: CallbackContext) -> None:
    """Обработчик получения фотографий"""
    user = update.effective_user
    chat_id = update.effective_chat.id
    message_id = update.message.message_id
    timestamp = update.message.date
    
    logger.info(f"Получено изображение от {user.first_name} (ID: {user.id}) в чате {chat_id}")
    
    # Получение файла с наибольшим разрешением
    photo_file = update.message.photo[-1].get_file()
    
    # Создание директории для сохранения, если её нет
    os.makedirs('data/images', exist_ok=True)
    
    # Формирование имени файла с использованием ID сообщения и даты
    file_path = f"data/images/img_{chat_id}_{message_id}_{timestamp.strftime('%Y%m%d_%H%M%S')}.jpg"
    
    # Скачивание файла
    photo_file.download(file_path)
    logger.info(f"Изображение сохранено: {file_path}")
    
    # Обработка изображения
    status, values = process_image(file_path, user.id, chat_id, message_id, timestamp)
    
    # Отправка ответа в зависимости от результата обработки
    if status == "success":
        netto = values.get('netto', 'Н/Д')
        brutto = values.get('brutto', 'Н/Д')
        tara = values.get('tara', 'Н/Д')
        
        update.message.reply_text(
            f"✅ Накладная успешно обработана\n"
            f"Нетто: {netto}\n"
            f"Брутто: {brutto}\n"
            f"Тара: {tara}\n"
            f"Дата: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}"
        )
    elif status == "duplicate":
        update.message.reply_text("⚠️ Это изображение уже было обработано ранее.")
    elif status == "not_waybill":
        # Для пассивного мониторинга лучше не отвечать на изображения, не являющиеся накладными
        pass
    elif status == "ocr_failed":
        # Уведомление только для администраторов
        # В реальном проекте здесь должна быть проверка прав администратора
        context.bot.send_message(
            chat_id=user.id,  # Отправка личного сообщения пользователю
            text="⚠️ Не удалось распознать текст на изображении. Изображение сохранено для ручной обработки."
        )
    elif status == "invalid_data":
        # Уведомление только для администраторов
        context.bot.send_message(
            chat_id=user.id,  # Отправка личного сообщения пользователю
            text="⚠️ Не удалось извлечь корректные данные из накладной. Требуется ручная проверка."
        )
    else:
        # Уведомление только для администраторов
        context.bot.send_message(
            chat_id=user.id,  # Отправка личного сообщения пользователю
            text="❌ Произошла ошибка при обработке изображения. Проверьте логи."
        )

def handle_text(update: Update, context: CallbackContext) -> None:
    """Обработчик текстовых сообщений"""
    user = update.effective_user
    text = update.message.text
    timestamp = update.message.date
    
    logger.info(f"Получено текстовое сообщение от {user.first_name} (ID: {user.id}): {text[:50]}...")
    
    # Анализ текста сообщения
    message_type, confidence = text_processor.analyze_message_text(text)
    
    # Если уверенность в типе сообщения достаточно высокая
    if confidence >= 0.6:
        # Обновление статуса пользователя
        db.update_user_status(
            user_id=user.id,
            username=user.username,
            full_name=f"{user.first_name} {user.last_name or ''}",
            status=message_type
        )
        
        # Подготовка данных для записи в Google Sheets
        sheet_data = [
            timestamp.strftime("%Y-%m-%d %H:%M:%S"),  # Дата и время сообщения
            str(user.id),  # ID пользователя
            "",  # Нетто (пусто для текстовых сообщений)
            "",  # Брутто (пусто для текстовых сообщений)
            "",  # Тара (пусто для текстовых сообщений)
            message_type.capitalize()  # Статус
        ]
        
        # Запись данных в Google Sheets
        sheets_api.append_data(sheet_data)
        
        # Добавление реакции на сообщение (если поддерживается API)
        try:
            if message_type == 'loading':
                update.message.reply_text("👍 Загрузка зафиксирована")
            elif message_type == 'unloading':
                update.message.reply_text("👍 Выгрузка зафиксирована")
            elif message_type == 'ready':
                update.message.reply_text("👍 Статус готовности обновлен")
        except Exception as e:
            logger.error(f"Ошибка при добавлении реакции: {e}")

def error_handler(update: Update, context: CallbackContext) -> None:
    """Обработчик ошибок"""
    logger.error(f"Ошибка: {context.error} при обработке {update}")

def main() -> None:
    """Основная функция запуска бота"""
    # Создание Updater и передача ему токена бота
    updater = Updater(TELEGRAM_BOT_TOKEN)
    
    # Получение диспетчера для регистрации обработчиков
    dispatcher = updater.dispatcher
    
    # Регистрация обработчиков команд
    dispatcher.add_handler(CommandHandler("start", start))
    dispatcher.add_handler(CommandHandler("help", help_command))
    dispatcher.add_handler(CommandHandler("status", status))
    dispatcher.add_handler(CommandHandler("report", report))
    dispatcher.add_handler(CommandHandler("settings", settings))
    
    # Регистрация обработчиков сообщений
    dispatcher.add_handler(MessageHandler(Filters.photo, handle_photo))
    dispatcher.add_handler(MessageHandler(Filters.text & ~Filters.command, handle_text))
    
    # Регистрация обработчика ошибок
    dispatcher.add_error_handler(error_handler)
    
    # Запуск бота
    updater.start_polling()
    logger.info("Бот запущен")
    
    # Бот работает до нажатия Ctrl-C
    updater.idle()

if __name__ == '__main__':
    main()

6. Реализация модуля OCR

Модуль OCR уже был создан в разделе 3.4 (src/ocr/vision_api.py). Он включает в себя функции для предобработки изображений и распознавания текста с использованием Google Cloud Vision API.

7. Разработка модуля обработки данных

Модуль обработки данных уже был создан в разделе 5.3 (src/data_processing/text_processor.py). Он включает в себя функции для извлечения значений нетто, брутто и тары из распознанного текста, а также для валидации этих значений.

8. Интеграция с Google Sheets

Модуль интеграции с Google Sheets уже был создан в разделе 4.4 (src/sheets/sheets_api.py). Он включает в себя функции для подключения к Google Sheets API и записи данных в таблицу.

9. Тестирование и отладка

9.1. Создание тестовых скриптов

Создайте файл tests/test_ocr.py:

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor

def test_ocr_pipeline():
    """Тестирование полного цикла OCR"""
    # Путь к тестовому изображению
    test_image = "tests/test_data/sample_waybill.jpg"
    
    # Проверка наличия тестового изображения
    if not os.path.exists(test_image):
        print(f"Тестовое изображение не найдено: {test_image}")
        return False
    
    # Инициализация OCR
    ocr = VisionOCR()
    
    # Распознавание текста
    text = ocr.detect_text(test_image)
    if not text:
        print("Не удалось распознать текст")
        return False
    
    print(f"Распознанный текст:\n{text[:500]}...")
    
    # Инициализация обработчика текста
    processor = TextProcessor()
    
    # Обработка текста
    result = processor.process_text(text)
    
    print(f"Результат обработки: {result}")
    
    # Проверка результата
    if result['is_valid'] and result['values']:
        print("Тест пройден успешно!")
        return True
    else:
        print("Тест не пройден: не удалось извлечь валидные данные")
        return False

if __name__ == "__main__":
    # Создание директории для тестовых данных, если её нет
    os.makedirs("tests/test_data", exist_ok=True)
    
    # Проверка наличия тестового изображения
    test_image = "tests/test_data/sample_waybill.jpg"
    if not os.path.exists(test_image):
        print(f"Пожалуйста, поместите тестовое изображение накладной в {test_image}")
        print("Затем запустите этот скрипт снова")
        exit(1)
    
    # Запуск теста
    test_ocr_pipeline()

Создайте файл tests/test_sheets.py:

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.sheets.sheets_api import SheetsAPI
from datetime import datetime

def test_sheets_api():
    """Тестирование интеграции с Google Sheets"""
    try:
        # Инициализация API
        sheets = SheetsAPI()
        
        # Тестовые данные
        test_data = [
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Дата и время
            "Test User",  # Водитель
            "1000",  # Нетто
            "1200",  # Брутто
            "200",   # Тара
            "Test"   # Статус
        ]
        
        # Запись данных
        result = sheets.append_data(test_data)
        
        if result:
            print("Тест пройден успешно! Данные записаны в таблицу.")
            return True
        else:
            print("Тест не пройден: не удалось записать данные в таблицу")
            return False
    
    except Exception as e:
        print(f"Ошибка при тестировании: {e}")
        return False

if __name__ == "__main__":
    test_sheets_api()

9.2. Создание скрипта для тестирования всей системы

Создайте файл tests/test_full_system.py:

import os
import sys
import time
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.ocr.vision_api import VisionOCR
from src.data_processing.text_processor import TextProcessor
from src.sheets.sheets_api import SheetsAPI
from src.db.database import Database
from src.utils.image_utils import calculate_image_hash
from datetime import datetime

def test_full_system():
    """Тестирование полного цикла обработки изображения"""
    try:
        # Инициализация компонентов
        ocr = VisionOCR()
        text_processor = TextProcessor()
        sheets_api = SheetsAPI()
        db = Database()
        
        # Путь к тестовому изображению
        test_image = "tests/test_data/sample_waybill.jpg"
        
        # Проверка наличия тестового изображения
        if not os.path.exists(test_image):
            print(f"Тестовое изображение не найдено: {test_image}")
            return False
        
        # Вычисление хеша изображения
        file_hash = calculate_image_hash(test_image)
        
        # Добавление информации об изображении в базу данных
        image_id = db.add_processed_image(
            file_path=test_image,
            file_hash=file_hash,
            user_id=12345,
            chat_id=67890,
            message_id=111
        )
        
        if not image_id:
            print("Не удалось добавить информацию об изображении в базу данных")
            return False
        
        print(f"Изображение добавлено в базу данных, ID: {image_id}")
        
        # Распознавание текста
        text = ocr.detect_text(test_image)
        if not text:
            print("Не удалось распознать текст")
            return False
        
        print(f"Распознанный текст:\n{text[:500]}...")
        
        # Обработка текста
        result = text_processor.process_text(text)
        
        if not result['is_valid'] or not result['values']:
            print("Не удалось извлечь валидные данные из текста")
            return False
        
        print(f"Результат обработки: {result}")
        
        # Обновление результатов в базе данных
        values = result['values']
        db.update_image_processing_result(
            image_id=image_id,
            netto=values.get('netto'),
            brutto=values.get('brutto'),
            tara=values.get('tara'),
            confidence=result['confidence'],
            raw_text=text,
            status='processed'
        )
        
        # Подготовка данных для записи в Google Sheets
        sheet_data = [
            datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Дата и время
            "12345",  # ID пользователя
            str(values.get('netto', '')),  # Нетто
            str(values.get('brutto', '')),  # Брутто
            str(values.get('tara', '')),  # Тара
            "Test"  # Статус
        ]
        
        # Запись данных в Google Sheets
        sheets_result = sheets_api.append_data(sheet_data)
        
        if not sheets_result:
            print("Не удалось записать данные в Google Sheets")
            return False
        
        print("Данные успешно записаны в Google Sheets")
        
        print("\nТест полного цикла обработки пройден успешно!")
        return True
    
    except Exception as e:
        print(f"Ошибка при тестировании: {e}")
        return False

if __name__ == "__main__":
    test_full_system()

9.3. Запуск тестов

# Создание директории для тестовых данных
mkdir -p tests/test_data

# Запуск тестов
python tests/test_ocr.py
python tests/test_sheets.py
python tests/test_full_system.py

10. Развертывание и запуск

10.1. Создание Dockerfile

Создайте файл Dockerfile в корневой директории проекта:

FROM python:3.10-slim

WORKDIR /app

# Установка необходимых пакетов
RUN apt-get update && apt-get install -y \
    libgl1-mesa-glx \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*

# Копирование файлов проекта
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Установка языковой модели для spaCy
RUN python -m spacy download ru_core_news_sm

# Копирование остальных файлов
COPY . .

# Создание необходимых директорий
RUN mkdir -p data/images data/processed data/failed logs

# Запуск бота
CMD ["python", "run.py"]

10.2. Создание docker-compose.yml

Создайте файл docker-compose.yml в корневой директории проекта:

version: '3'

services:
  logistics-bot:
    build: .
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
      - ./.env:/app/.env
      - ./credentials.json:/app/credentials.json
    restart: always

10.3. Запуск бота

# Запуск в режиме разработки
python run.py

# Запуск с использованием Docker
docker-compose up -d

10.4. Мониторинг логов

# Просмотр логов бота
tail -f logs/bot.log

# Просмотр логов Docker-контейнера
docker-compose logs -f

Заключение

В этом руководстве мы рассмотрели все этапы создания Telegram-бота с OCR для логистической системы, который автоматически обрабатывает изображения товарных накладных в групповом чате и извлекает из них данные о весе (нетто, брутто, тара).

Бот использует пассивный мониторинг чата, не требуя специальных команд от водителей, и автоматически записывает извлеченные данные в Google Sheets, используя дату и время отправки изображения как дату загрузки/выгрузки.

Для повышения точности распознавания используется Google Cloud Vision API, который хорошо справляется с распознаванием рукописных цифр и изображений низкого качества.

Система также включает в себя механизмы валидации данных, обработки ошибок и предотвращения повторной обработки одних и тех же изображений.

Для развертывания бота можно использовать Docker, что упрощает процесс установки и обновления.