Нелогичный подход к оптимизации ИИ, который меняет то, как мы внедряем модели
Делиться

TL;DR
Большинство компаний сталкиваются с трудностями, связанными с затратами и задержками, связанными с внедрением ИИ. В этой статье показано, как создать гибридную систему, которая:
- Обрабатывает 94,9% запросов на периферийных устройствах (время отклика менее 20 мс)
- Снижает затраты на вывод на 93,5% по сравнению с решениями, использующими только облако.
- Сохраняет точность исходной модели на уровне 99,1% благодаря интеллектуальному квантованию
- Хранит конфиденциальные данные локально, что упрощает соблюдение требований
Мы рассмотрим полную реализацию с помощью кода: от адаптации домена до мониторинга производства.
Настоящая проблема, о которой никто не говорит
Представьте себе: вы создали отличную модель ИИ для поддержки клиентов. Она отлично работает в вашем блокноте Jupyter. Но когда вы разворачиваете её в рабочей среде, вы обнаруживаете:
- Облачный вывод стоит 2900 долларов в месяц при приличных объемах трафика
- Время отклика составляет около 200 мс (клиенты замечают задержку)
- Данные пересекают международные границы (команда по обеспечению соответствия недовольна)
- Расходы непредсказуемо растут в зависимости от пиков трафика
Знакомо? Вы не одиноки. По данным Forbes Tech Council (2024), до 85% моделей ИИ могут не быть успешно развернуты, главными препятствиями являются стоимость и задержка.
Решение: думайте как служба безопасности аэропорта
Вместо того, чтобы отправлять каждый запрос в огромную облачную модель, что если бы мы могли:
- Обрабатывайте 95% повседневных запросов локально (например, в режиме ускоренного прохождения контроля безопасности в аэропорту)
- Передавайте в облако только сложные случаи (вторичная проверка)
- Ведите четкий учет решений о маршрутизации (для аудита)
Этот «самый передовой» подход отражает естественный подход людей к обработке запросов на поддержку. Опытные специалисты могут быстро решить большинство проблем, передавая специалистам только самые сложные.

Что мы построим вместе
К концу этой статьи вы узнаете:
- Адаптированная к предметной области модель , которая понимает язык обслуживания клиентов
- На 84% уменьшенная квантованная версия , которая быстро работает на ЦП
- Интеллектуальный маршрутизатор , который выбирает между периферией и облаком в зависимости от запроса
- Мониторинг производства для поддержания здоровья всех участников
Давайте начнем кодировать.
Настройка среды: все правильно с первого дня
Для начала давайте создадим воспроизводимую среду. Ничто так не убивает импульс, как целый день, потраченный на отладку конфликтов библиотек.
import os import warnings import numpy as np import pandas as pd import torch import tensorflow as tf from transformers import ( DistilBertTokenizerFast, DistilBertForMaskedLM, Trainer, TrainingArguments, TFDistilBertForSequenceClassification ) from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder import onnxruntime as ort import time from collections import deque def setup_reproducible_environment(seed=42): «»»Сделать результаты воспроизводимыми между запусками»»» np.random.seed(seed) torch.manual_seed(seed) tf.random.set_seed(seed) torch.backends.cudnn.deterministic = True tf.config.experimental.enable_op_determinism() warnings.filterwarnings('ignore') print(f»✅ Окружение настроено (seed: {seed})») setup_reproducible_environment() # Аппаратные характеристики для воспроизведения SYSTEM_CONFIG = { «cpu»: «Intel Xeon Silver 4314 @ 2.4GHz», «memory»: «64GB DDR4», «os»: «Ubuntu 22.04», «python»: «3.10.12», «key_libs»: { «torch»: «2.7.1», «tensorflow»: «2.14.0», «transformers»: «4.52.4», «onnxruntime»: «1.17.3» } } # Структура проекта PATHS = { «data»: «./data», «models»: { «domain_adapted»: «./models/dapt», «classifier»: «./models/classifier», «onnx_fp32»: «./models/onnx/model_fp32.onnx», «onnx_quantized»: «./models/onnx/model_quantized.onnx» }, «logs»: «./logs» } # Создаём каталоги для path в PATHS.values(): if isinstance(path, dict): for p in path.values(): os.makedirs(os.path.dirname(p) if '.' in os.path.basename(p) else p, exist_ok=True) else: os.makedirs(path, exist_ok=True) print(«📁 Структура проекта готова») # УЛУЧШЕНО: Добавлены эмодзи для согласованности
Шаг 1: Адаптация к предметной области — обучение ИИ речевой поддержке
Обычные модели знают английский, но не умеют поддерживать связь на английском. Между «Мне нужна помощь» и «Это совершенно неприемлемо — я требую немедленной связи с менеджером» — огромная разница!
Адаптивное предварительное обучение (DAPT) решает эту проблему, продолжая изучение языка модели на основе разговоров в службе поддержки клиентов перед ее обучением классификации.
class CustomerServiceTrainer: «»»Полный конвейер для адаптации домена + классификации»»» def __init__(self, base_model=»distilbert-base-uncased»): self.base_model = base_model self.tokenizer = DistilBertTokenizerFast.from_pretrained(base_model) print(f»🤖 Инициализировано с помощью {base_model}») def domain_adaptation(self, texts, output_path, epochs=2, batch_size=32): «»» Фаза 1: Адаптация модели к шаблонам языка обслуживания клиентов Это похоже на погружение в язык — модель изучает специфичную для поддержки лексику, фразы эскалации и общие шаблоны взаимодействия. «»» from datasets import Dataset from transformers import DataCollatorForLanguageModeling print(f»📚 Запуск адаптации домена для {len(texts):,} разговоров…») # Создание набора данных для моделирования маскированного языка dataset = Dataset.from_dict({«text»: texts}).map( lambda examples: self.tokenizer( examples[«text»], padding=»max_length», truncation=True, max_length=128 # Оставьте разумный для памяти ), batched=True, remove_columns=[«text»] ) # Инициализируйте модель для продолжения предварительного обучения model = DistilBertForMaskedLM.from_pretrained(self.base_model) print(f» 📊 Параметры модели: {model.num_parameters():,}») # Настройка обучения training_args = TrainingArguments( output_dir=output_path, num_train_epochs=epochs, per_device_train_batch_size=batch_size, logging_steps=200, save_steps=1000, fp16=torch.cuda.is_available(), # Используйте смешанную точность, если доступен графический процессор ) trainer = Trainer( model=model, args=training_args, train_dataset=dataset, data_collator=DataCollatorForLanguageModeling( self.tokenizer, mlm=True, mlm_probability=0.15 ) ) # Обучаем и сохраняем trainer.train() trainer.save_model(output_path) self.tokenizer.save_pretrained(output_path) print(f»✅ Адаптация домена завершена: {output_path}») return output_path def train_classifier(self, X_train, X_val, y_train, y_val, dapt_model_path, output_path, epochs=8): «»» Фаза 2: Двухэтапное обучение классификации Этап 1: Разогрев головы классификатора (основная часть заморожена) Этап 2: Тонкая настройка всей модели с меньшей скоростью обучения «»» from transformers import create_optimizer print(f»🎯 Обучение классификатора включено {len(X_train):,} образцов…») # Кодируем метки self.label_encoder = LabelEncoder() y_train_enc = self.label_encoder.fit_transform(y_train) y_val_enc = self.label_encoder.transform(y_val) print(f» 📊 Классы: {list(self.label_encoder.classes_)}») # Создаем наборы данных TensorFlow def make_dataset(texts, labels, batch_size=128, shuffle=False): encodings = self.tokenizer( texts, padding=»max_length», truncation=True, max_length=256, return_tensors=»tf» # Длиннее для классификации ) dataset = tf.data.Dataset.from_tensor_slices((dict(encodings), labels)) if shuffle: dataset = dataset.shuffle(10000, seed=42) return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE) train_dataset = make_dataset(X_train, y_train_enc, shuffle=True) val_dataset = make_dataset(X_val, y_val_enc) # Загрузка адаптированной к домену модели для классификации model = TFDistilBertForSequenceClassification.from_pretrained( dapt_model_path, num_labels=len(self.label_encoder.classes_) ) # Оптимизатор с прогревом total_steps = len(train_dataset) * epochs optimizer, _ = create_optimizer( init_lr=3e-5, num_train_steps=total_steps, num_warmup_steps=int(0.1 * total_steps) ) model.compile( optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'] ) # Этап 1: Разогрев головки классификатора print(» 🔥 Этап 1: Разогрев головки классификатора…») model.distilbert.trainable = False model.fit(train_dataset, validation_data=val_dataset, epochs=1, verbose=1) # Этап 2: Полная тонкая настройка print(» 🔥 Этап 2: Полная тонкая настройка модели…») model.distilbert.trainable = True model.optimizer.learning_rate = 3e-6 # Уменьшение LR для стабильности # Добавление обратных вызовов для лучшего обучения callbacks = [ tf.keras.callbacks.EarlyStopping(patience=2, restore_best_weights=True), tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patient=1) ] history = model.fit( train_dataset, validation_data=val_dataset, epochs=epochs-1, # Уже выполнил 1 epoch callbacks=callbacks, verbose=1 ) # Сохраним все model.save_pretrained(output_path) self.tokenizer.save_pretrained(output_path) import joblib joblib.dump(self.label_encoder, f»{output_path}/label_encoder.pkl») best_acc = max(history.history['val_accuracy']) print(f»✅ Обучение завершено! Наилучшая точность: {best_acc:.4f}») return model, history # Давайте создадим несколько демонстрационных данных для демонстрации def create_sample_data(n_samples=5000): «»»Создайте реалистичные данные по обслуживанию клиентов для демонстрации»»» np.random.seed(42) # Шаблоны примеров разговоров templates = { 'positive': [ «Большое спасибо за превосходное обслуживание клиентов сегодня!», «Отличная работа по быстрому и профессиональному решению моей проблемы.», «Я очень ценю помощь с моей учетной записью.», «Служба поддержки была фантастической и очень компетентной.», «Идеальное обслуживание, именно то, что мне было нужно.» ], 'negative': [ «Это совершенно неприемлемо, и я требую поговорить с менеджером!», «Я крайне расстроен низким качеством обслуживания.», «Эта проблема не решается уже несколько недель.», «Ужасный опыт, худшее обслуживание клиентов в моей жизни.», «Я требую немедленного полного возврата средств, это просто смешно.» ], 'neutral': [ «Мне нужна помощь с настройками моей учетной записи, пожалуйста.», «Можете ли вы проверить статус моего последнего заказа?», «Каковы ваши часы работы и контактная информация?», «У меня есть вопрос о вариантах выставления счетов и оплаты.», «Пожалуйста, помогите мне разобраться в процессе возврата средств.» ] } data = [] for _ in range(n_samples): sentiment = np.random.choice(['positive', 'negative', 'neutral'], p=[0.4, 0.3, 0.3]) # Реалистичное распределение template = np.random.choice(templates[sentiment]) # Добавить несколько вариаций if np.random.random() < 0.2: # 20% get account numbers template += f" Номер моего счета {np.random.randint(100000, 999999)}." data.append({ 'transcript': template, 'sentiment': sentiment }) df = pd.DataFrame(data) print(f"📊 Created {len(df):,} sample conversations") print(f"📊 Sentiment Distribution:n{df['sentiment'].value_counts()}") return df # Выполнить адаптацию домена и обучение классификации trainer = CustomerServiceTrainer() # Создать образцы данных (заменить вашими фактическими данными) df = create_sample_data(5000) # Разделить данные X_train, X_val, y_train, y_val = train_test_split( df['transcript'], df['sentiment'], test_size=0.2, stratify=df['sentiment'], random_state=42 ) # Запустить адаптацию домена dapt_path = trainer.domain_adaptation( df['transcript'].tolist(), PATHS['models']['domain_adapted'], epochs=2 ) # Модель классификатора обучения, история = trainer.train_classifier( X_train.tolist(), X_val.tolist(), y_train.tolist(), y_val.tolist(), dapt_path, PATHS['models']['classifier'], epochs=6 )
Шаг 2: Сжатие модели – уменьшение размера на 84%
А теперь волшебный трюк: мы сожмём нашу модель на 84%, сохранив при этом практически всю её точность. Именно это делает возможным развёртывание на периферии.
Ключевой вывод заключается в том, что большинство нейронных сетей слишком сложны в разработке. Они используют 32-битные числа с плавающей запятой, тогда как 8-битные целые числа прекрасно подходят для большинства задач. Это как использовать камеру высокого разрешения, когда камера телефона даёт тот же результат для социальных сетей.
class ModelCompressor: «»»Сжатие модели на основе ONNX с комплексной проверкой»»» def __init__(self, model_path): self.model_path = model_path self.tokenizer = DistilBertTokenizerFast.from_pretrained(model_path) print(f»🗜️ Компрессор готов к {model_path}») def compress_to_onnx(self, fp32_output, quantized_output): «»» Двухэтапный процесс: 1. Преобразование модели TensorFlow в ONNX (кроссплатформенный формат) 2. Применение динамического квантования INT8 (калибровка не требуется) «»» from optimal.onnxruntime import ORTModelForSequenceClassification from onnxruntime.quantization import quantize_dynamic, QuantType print(«📋 Шаг 1: Преобразование в формат ONNX…») # Экспорт в ONNX (это делает модель portable across platform) ort_model = ORTModelForSequenceClassification.from_pretrained( self.model_path, export=True, provider=»CPUExecutionProvider» ) ort_model.save_pretrained(os.path.dirname(fp32_output)) # Переименовываем в нужный нам путь generated_path = os.path.join(os.path.dirname(fp32_output), «model.onnx») if os.path.exists(generated_path): os.rename(generated_path, fp32_output) fp32_size = os.path.getsize(fp32_output) / (1024**2) # МБ print(f» 📏 Исходная модель ONNX: {fp32_size:.2f}MB») print(«⚡ Шаг 2: Применение динамического квантования INT8…») # Динамическое квантование — без калибровки Нужен набор данных! quantize_dynamic( model_input=fp32_output, model_output=quantized_output, op_types_to_quantize=[QuantType.QInt8, QuantType.QUInt8], weight_type=QuantType.QInt8, optimize_model=False # Разделите оптимизацию ) quantized_size = os.path.getsize(quantized_output) / (1024**2) # Коэффициент_сжатия в МБ = (fp32_size — квантованный_размер) / fp32_size * 100 print(f» 📏 Квантованная модель: {quantized_size:.2f}MB») print(f» 🎯 Сжатие: {compression_ratio:.1f}% уменьшения размера») return fp32_output, quantized_output, compression_ratio def benchmark_models(self, fp32_path, quantized_path, test_texts, test_labels): «»» Сравнение моделей FP32 и INT8 по точности, скорости и размеру. Это крайне важно — нам нужно убедиться, что сжатие ничего не сломало! «»» print(«🧪 Тестирование производительности модели…») results = {} for name, model_path in [(«FP32 Original», fp32_path), («INT8 Quantized», quantized_path)]: print(f» Тестирование {name}…») # Загрузка модели для сеанса вывода = ort.InferenceSession(model_path, providers=[«CPUExecutionProvider»]) # Тест на репрезентативной выборке (500 примеров для скорости) test_sample = min(500, len(test_texts)) correct_predictions = 0 latencies = [] # Прогрев модели (важно для справедливого расчета времени!) warmup_text = «Спасибо за помощь с моим заказом сегодня» warmup_encoding = self.tokenizer( warmup_text, padding=»max_length», truncation=True, max_length=256, return_tensors=»np» ) for _ in range(10): # 10 прогонов прогрева _ = session.run(None, { «input_ids»: warmup_encoding[«input_ids»], «attention_mask»: warmup_encoding[«attention_mask»] }) # Фактический бенчмаркинг для i in range(test_sample): text, true_label = test_texts[i], test_labels[i] encoding = self.tokenizer( text, padding=»max_length», truncation=True, max_length=256, return_tensors=»np» ) # Время вывода start_time = time.perf_counter() outputs = session.run(None, { «input_ids»: encoding[«input_ids»], «attention_mask»: encoding[«attention_mask»] }) latency_ms = (time.perf_counter() — start_time) * 1000 latency.append(latency_ms) # Проверка точности predicted_class = np.argmax(outputs[0]) if predicted_class == true_label: correct_predictions += 1 # Рассчитать метрики precision = correct_predictions / test_sample mean_latency = np.mean(latencies) p95_latency = np.percentile(latencies, 95) model_size_mb = os.path.getsize(model_path) / (1024**2) results[name] = { «accuracy»: precision, «mean_latency_ms»: mean_latency, «p95_latency_ms»: p95_latency, «model_size_mb»: model_size_mb, «throughput_qps»: 1000 / mean_latency # Запросов в секунду } print(f» ✓ Точность: {accuracy:.4f}») print(f» ✓ Средняя задержка: {mean_latency:.2f}ms») print(f» ✓ Задержка P95: {p95_latency:.2f}ms») print(f» ✓ Размер модели: {model_size_mb:.2f}MB») print(f» ✓ Пропускная способность: {results[name]['throughput_qps']:.1f} QPS») # Покажите сравнение, если len(results) == 2: fp32_results = results[«FP32 Original»] int8_results = results[«INT8 Quantized»] size_reduction = (1 — int8_results[«model_size_mb»] / fp32_results[«model_size_mb»]) * 100 precision_retention = int8_results[«accuracy»] / fp32_results[«accuracy»] latency_change = ((int8_results[«mean_latency_ms»] — fp32_results[«mean_latency_ms»]) / fp32_results[«mean_latency_ms»]) * 100 print(f»n🎯 Сводка влияния квантования:») print(f» 📦 Уменьшение размера: {size_reduction:.1f}%») print(f» 🎯 Сохранение точности: {accuracy_retention:.1%}») print(f» ⚡ Изменение задержки: {latency_change:+.1f}%») print(f» 💾 Сохранено памяти: {fp32_results['model_size_mb'] — int8_results['model_size_mb']:.1f}MB») return results # Выполнить сжатие модели compressor = ModelCompressor(PATHS['models']['classifier']) # Сжать модель fp32_path, quantized_path, compression_ratio = compressor.compress_to_onnx( PATHS['models']['onnx_fp32'], PATHS['models']['onnx_quantized'] ) # Загрузить тестовые данные и кодировщик меток для сравнительного анализа import joblib label_encoder = joblib.load(f»{PATHS['models']['classifier']}/label_encoder.pkl») test_labels_encoded = label_encoder.transform(y_val[:500]) # Тестирование моделей benchmark_results = compressor.benchmark_models( fp32_path, quantized_path, X_val[:500].tolist(), test_labels_encoded )
Шаг 3: Умный маршрутизатор — выбор между Edge и Cloud
Именно здесь и происходит гибридная магия. Наш маршрутизатор анализирует каждый запрос клиента и определяет, обрабатывать его локально (на периферии) или перенаправлять в облако. Представьте себе интеллектуальный контроллер трафика.
Маршрутизатор учитывает пять факторов:
- Длина текста — более длинные запросы часто означают более сложные проблемы.
- Структура предложения : множественные предложения указывают на тонкие проблемы
- Эмоциональные индикаторы – такие слова, как «разочарование», сигнализируют о необходимости эскалации.
- Модель доверия — если ИИ не уверен, перенаправьте его в облако
- Ключевые слова для эскалации – «менеджер», «жалоба» и т. д.
class IntelligentRouter: «»» Интеллектуальная система маршрутизации, которая максимально использует периферийные ресурсы, сохраняя при этом качество. Основная идея: 95% запросов клиентов являются типовыми и могут быть обработаны небольшой быстрой моделью. Оставшимся 5% требуется вся мощь облака. «»» def __init__(self, edge_model_path, cloud_model_path, tokenizer_path): # Загрузить обе модели self.edge_session = ort.InferenceSession( edge_model_path, providers=[«CPUExecutionProvider»] ) self.cloud_session = ort.InferenceSession( cloud_model_path, providers=[«CPUExecutionProvider»] # Также можно использовать GPU ) # Загрузить токенизатор и кодировщик меток self.tokenizer = DistilBertTokenizerFast.from_pretrained(tokenizer_path) import joblib self.label_encoder = joblib.load(f»{tokenizer_path}/label_encoder.pkl») # Конфигурация маршрутизации (настроена экспериментально) self.complexity_threshold = 0.75 # Маршрут в облако, если сложность > 0.75 self.confidence_threshold = 0.90 # Маршрут в облако, если уверенность < 0.90 self.edge_preference = 0.95 # 95% предпочтение периферии, когда это возможно # Отслеживание затрат (реалистичные цены в облаке) self.costs = { "edge": 0.001, # 0.001 $ за вывод на периферии "cloud": 0.0136 # 0.0136 $ за вывод в облаке (ценообразование как в OpenAI) } # Метрики производительности self.metrics = { "total_requests": 0, "edge_requests": 0, "cloud_requests": 0, "total_cost": 0.0, "routing_reasons": {} } print("🧠 Умный маршрутизатор инициализирован") print(f" Порог сложности: {self.complexity_threshold}") print(f" Порог уверенности: {self.confidence_threshold}") print(f" Соотношение стоимости облака/периферии: {self.costs['cloud']/self.costs['edge']:.1f}x") def analyze_complexity(self, text, model_confidence): """ Многомерный анализ сложности Это сердце нашей логики маршрутизации. Мы рассматриваем несколько сигналов, чтобы определить, нужна ли запросу вся мощь облачной модели. """ # Фактор 1: Сложность длины (нормализованная по типичным сообщениям клиентов) # Более длинные сообщения часто указывают на более сложные проблемы length_score = min(len(text) / 200, 1.0) # 200 символов = типичное сообщение # Фактор 2: Синтаксическая сложность (структура предложения) sentences = [s.strip() for s in text.split('.') if s.strip()] words = text.split() if sentences and words: avg_sentence_length = len(words) / len(sentences) syntax_score = min(avg_sentence_length / 15, 1.0) # 15 words = среднее else: syntax_score = 0.0 # Фактор 3: Неопределенность модели (обратная величина уверенности) # Если модель не уверена, это, вероятно, сложный случай certainty_score = 1 - abs(2 * model_confidence - 1) # Фактор 4: Ключевые слова эскалации/эмоциональности escalation_keywords = [ 'разочарованный', 'сердитый', 'неприемлемый', 'менеджер', 'руководитель', 'жалоба', 'ужасный', 'ужасный', 'отвращение', 'яростный' ] keywords_matches = sum(1 for word in escalation_keywords if word in text.lower()) emotional_score = min(keyword_matches / 3, 1.0) # Нормализовать до 0-1 # Взвешенная комбинация (веса настраиваются экспериментальным путем) difficulty = ( 0.3 * length_score + # Длина имеет наибольшее значение 0.3 * syntax_score + # Структура важна 0.2 * certain_score + # Уверенность модели 0.2 * emotional_score # Эмоциональные индикаторы ) return difficulty, { 'length': length_score, 'syntax': syntax_score, 'uncertainty': certain_score, 'emotion': emotional_score, 'keyword_matches': keyword_matches } def route_queries(self, requests): """ Основной конвейер маршрутизации 1. Получить начальные прогнозы из облачной модели (для оценок уверенности) 2. Проанализировать сложность каждого запроса 3. Направить простые запросы в Edge, сложные запросы остаются в облаке 4. Верните результаты с записанными решениями о маршрутизации """ print(f" Маршрутизация {len(queries)} запросов клиентов...") # Шаг 1: Получите прогнозы облака для анализа сложности cloud_predictions = self._run_inference(self.cloud_session, requests, "cloud") # Шаг 2: Проанализируйте каждый запрос и примите решения о маршрутизации edge_queries = [] edge_indices = [] routing_decisions = [] for i, (query, cloud_result) in enumerate(zip(queries, cloud_predictions)): if "error" in cloud_result: # Если облако не удалось, принудительно переключиться на Edge в качестве запасного decision = { "route": "edge", "reason": "cloud_error", "complexity": 0.0, "confidence": 0.0 } edge_queries.append(query) edge_indices.append(i) else: # Проанализируйте сложность difficulty, breakdown = self.analyze_complexity( query, cloud_result["confidence"] ) # Принять решение о маршрутизации should_use_edge = ( difficulty <= self.complexity_threshold and cloud_result["confidence"] >= self.confidence_threshold and np.random.random() < self.edge_preference ) # Определить причину решения о маршрутизации if should_use_edge: reason = "optimal_edge" edge_queries.append(query) edge_indices.append(i) else: if difficulty > self.complexity_threshold: reason = «high_complexity» elif cloud_result[«confidence»] < self.confidence_threshold: reason = "low_confidence" else: reason = "random_cloud" decision = { "route": "edge" if should_use_edge else "cloud", "reason": reason, "complexity": difficulty, "confidence": cloud_result["confidence"], "breakdown": breakdown } routing_decisions.append(decision) # Шаг 3: Запустить вывод границ для выбранных запросов if edge_queries: edge_results = self._run_inference(self.edge_session, edge_queries, "edge") # Заменить результаты облака на результаты границ для маршрутизированных запросов for idx, edge_result in zip(edge_indices, edge_results): cloud_predictions[idx] = edge_result # Шаг 4: Добавить метаданные маршрутизации и стоимость for i, (result, decision) in enumerate(zip(cloud_predictions, routing_decisions)): result.update(decision) result["cost"] = self.costs[decision["route"]] # Шаг 5: Обновить метрики edge_count = len(edge_queries) cloud_count = len(queries) - edge_count self.metrics["total_requests"] += len(queries) self.metrics["edge_requests"] += edge_count self.metrics["cloud_requests"] += cloud_count batch_cost = edge_count * self.costs["edge"] + cloud_count * self.costs["cloud"] self.metrics["total_cost"] += batch_cost # Отслеживать причины маршрутизации для принятия решения в routing_decisions: reason = decision["reason"] self.metrics["routing_reasons"][reason] = ( self.metrics["routing_reasons"].get(reason, 0) + 1 ) print(f" Направлено: {edge_count} edge, {cloud_count} cloud") print(f" Стоимость пакета: ${batch_cost:.4f}") print(f" Использование Edge: {edge_count/len(queries):.1%}") return cloud_predictions, { "total_queries": len(queries), "edge_utilization": edge_count / len(queries), "batch_cost": batch_cost, "avg_complexity": np.mean([d["complexity"] for d in routing_decisions]) } def _run_inference(self, session, texts, source): """Запустить пакетный вывод с обработкой ошибок""" try: # Токенизировать все тексты encodings = self.tokenizer( texts, padding="max_length", truncation=True, max_length=256, return_tensors="np" ) # Запустить вывод outputs = session.run(None, { "input_ids": encodings["input_ids"], "attention_mask": encodings["attention_mask"] }) # Обработать результаты results = [] for i, logits in enumerate(outputs[0]): predicted_class = int(np.argmax(logits)) trust = float(np.max(self._softmax(logits))) predicted_sentiment = self.label_encoder.inverse_transform([predicted_class])[0] results.append({ "text": texts[i], "predicted_class": predicted_class, "predicted_sentiment": predicted_sentiment, "confidence": trust, "processing_location": source }) return results except Exception as e: # Return error results return [{"text": text, "error": str(e), "processing_location": source} for text in texts] def _softmax(self, x): """Преобразовать логиты в вероятности""" exp_x = np.exp(x - np.max(x)) return exp_x / np.sum(exp_x) def get_system_stats(self): """Получите полную системную статистику""" if self.metrics["total_requests"] == 0: return {"error": "Запросы не обработаны"} # Рассчитайте экономию средств по сравнению с использованием только облака cloud_only_cost = self.metrics["total_requests"] * self.costs["cloud"] actual_cost = self.metrics["total_cost"] savings_percent = (cloud_only_cost - actual_cost) / cloud_only_cost * 100 return { "total_queries_processed": self.metrics["total_requests"], "edge_utilization": self.metrics["edge_requests"] / self.metrics["total_requests"], "cloud_utilization": self.metrics["cloud_requests"] / self.metrics["total_requests"], "total_cost": self.metrics["total_cost"], "cost_per_query": self.metrics["total_cost"] / self.metrics["total_requests"], "cost_savings_percent": savings_percent, "routing_reasons": dict(self.metrics["routing_reasons"]), "estimated_monthly_savings": (cloud_only_cost - actual_cost) * 30 } # Инициализируем маршрутизатор router = IntelligentRouter( edge_model_path=PATHS['models']['onnx_quantized'], cloud_model_path=PATHS['models']['onnx_fp32'], tokenizer_path=PATHS['models']['classifier'] ) # Тест с реалистичными запросами клиентов test_queries = [ "Большое спасибо за отличное обслуживание клиентов сегодня!", "Я крайне расстроен этим постоянным выставлением счетов Проблема, которая продолжается уже три месяца, несмотря на многочисленные обращения в вашу службу поддержки, которая, похоже, совершенно неспособна решить эти сложные проблемы с синхронизацией учётных записей.», «Не могли бы вы помочь мне проверить статус моего заказа?», «Какова ваша политика возврата бракованных товаров?», «Это совершенно неприемлемо, и я требую немедленно поговорить с менеджером по поводу этих ошибок в счёте!», «Номер моей учётной записи — 123456789, и мне нужна помощь с процессом обновления.», «Здравствуйте, у меня есть небольшой вопрос по моей недавней покупке.», «Служба технической поддержки не смогла решить мою проблему с подключением, и мне нужно обратиться к старшему специалисту, который может решить проблемы с настройкой корпоративной сети». ] # Маршрутизация результатов запросов, batch_metrics = router.route_queries(test_queries) # Отображение подробных результатов print(f"n ПОДРОБНЫЙ АНАЛИЗ МАРШРУТИЗАЦИИ:") for i, (query, result) in enumerate(zip(test_queries, results)): route = result.get("processing_location", "unknown").upper() sentiment = result.get("predicted_sentiment", "unknown") trust = result.get("confidence", 0) difficulty = result.get("complexity", 0) reason = result.get("reason", "unknown") cost = result.get("cost", 0) print(f"nQuery {i+1}: "{query[:60]}..."") print(f" Route: {route} (reason: {reason})") print(f" Sentiment: {sentiment} (confidence: {confidence:.3f})") print(f" Сложность: {complexity:.3f}") print(f" Стоимость: ${cost:.6f}") # Показать общесистемную производительность system_stats = router.get_system_stats() print(f"n СВОДКА ПО ПРОИЗВОДИТЕЛЬНОСТИ СИСТЕМЫ:") print(f" Всего запросов: {system_stats['total_queries_processed']}") print(f" Использование Edge: {system_stats['edge_utilization']:.1%}") print(f" Стоимость за запрос: ${system_stats['cost_per_query']:.6f}") print(f" Экономия средств: {system_stats['cost_savings_percent']:.1f}%") print(f" Оценка ежемесячной экономии: ${system_stats['estimated_monthly_savings']:.2f}")
Шаг 4: Мониторинг производства – поддержание его в рабочем состоянии
Система без мониторинга — это система, обречённая на сбой. Наша система мониторинга проста в использовании, но эффективна для выявления важных проблем: снижения точности, резких скачков затрат и проблем с маршрутизацией.
class ProductionMonitor: «»» Облегченный мониторинг производства для гибридных систем ИИ Отслеживает метрики, которые действительно важны для бизнес-результатов: — Использование периферии (влияние на стоимость) — Тенденции точности (влияние на качество) — Распределение задержки (влияние на пользовательский опыт) — Стоимость за запрос (влияние на бюджет) «»» def __init__(self, alert_thresholds=None): # Установите разумные значения по умолчанию для оповещений self.thresholds = alert_thresholds or { «min_edge_utilization»: 0.80, # Оповещение, если использование периферии < 80% "min_accuracy": 0.85, # Оповещение, если точность падает ниже 85% "max_cost_per_query": 0.01, # Оповещение, если стоимость > 0.01 доллара за запрос «max_p95_latency»: 150 # Оповещение, если задержка P95 > 150 мс } # Эффективное хранилище с кольцевыми буферами (с ограничением по памяти) self.metrics_history = deque(maxlen=10000) # ~1 неделя при 1 пакете в минуту self.alerts = [] print(» Мониторинг производства инициализирован») print(f» Пороговые значения: {self.thresholds}») def log_batch(self, batch_metrics, precision=None, latencies=None): «»» Запись производительности пакета и проверка на наличие проблем Эта функция вызывается после обработки каждого пакета запросов. «»» timestamp = time.time() # Создание записи о производительности record = { «timestamp»: timestamp, «edge_utilization»: batch_metrics[«edge_utilization»], «total_cost»: batch_metrics[«batch_cost»], «avg_complexity»: batch_metrics.get(«avg_complexity», 0), «query_count»: batch_metrics[«total_queries»], «accuracy»: precision } # Добавить статистику по задержкам, если она указана if latencies: record.update({ «mean_latency»: np.mean(latencies), «p95_latency»: np.percentile(latencies, 95), «p99_latency»: np.percentile(latencies, 99) }) self.metrics_history.append(record) # Проверка на наличие оповещений alerts = self._check_alerts(record) self.alerts.extend(alerts) if alerts: for alert in alerts: print(f» ALERT: {alert}») def _check_alerts(self, record): «»»Проверка текущих метрик на соответствие пороговым значениям»»» alerts = [] # Оповещение об использовании периферии if record[«edge_utilization»] < self.thresholds["min_edge_utilization"]: alerts.append( f"Низкое использование периферии: {record['edge_utilization']:.1%} " f"< {self.thresholds['min_edge_utilization']:.1%}" ) # Оповещение о точности if record.get("accuracy") и record["accuracy"] < self.thresholds["min_accuracy"]: alerts.append( f"Низкая точность: {record['accuracy']:.3f} " f"< {self.thresholds['min_accuracy']:.3f}" ) # Оповещение о стоимости cost_per_query = record["total_cost"] / record["query_count"] if cost_per_query > self.thresholds[«max_cost_per_query»]: alerts.append( f»Высокая стоимость за запрос: ${cost_per_query:.4f} » f»> ${self.thresholds['max_cost_per_query']:.4f}» ) # Оповещение о задержке if record.get(«p95_latency») и record[«p95_latency»] > self.thresholds[«max_p95_latency»]: alerts.append( f»Высокая задержка P95: {record['p95_latency']:.1f}мс » f»> {self.thresholds['max_p95_latency']}мс» ) return alerts def generate_health_report(self): «»»Создать полный отчет о работоспособности системы»»» if not self.metrics_history: return {«status»: «Данные недоступны»} # Анализируем недавнюю производительность (последние 100 пакетов или 24 часа) now = time.time() recent_cutoff = now — (24 * 3600) # 24 часа назад recent_records = [ r for r in self.metrics_history if r[«timestamp»] > recent_cutoff ] if not recent_records: recent_records = list(self.metrics_history)[-100:] # Последние 100 пакетов # Рассчитать ключевые метрики total_queries = sum(r[«query_count»] for r in recent_records) total_cost = sum(r[«total_cost»] for r in recent_records) # Средние показатели производительности avg_metrics = { «edge_utilization»: np.mean([r[«edge_utilization»] for r in recent_records]), «cost_per_query»: total_cost / total_queries if total_queries > 0 else 0, «avg_complexity»: np.mean([r.get(«avg_complexity», 0) for r in recent_records]) } # Анализ точности (если доступен) precision_records = [r[«accuracy»] for r in recent_records if r.get(«accuracy»)] if precision_records: avg_metrics.update({ «current_accuracy»: precision_records[-1], «avg_accuracy»: np.mean(accuracy_records), «accuracy_trend»: self._calculate_trend(accuracy_records[-10:]) }) # Анализ задержки (если доступен) latency_records = [r.get(«p95_latency») for r in recent_records if r.get(«p95_latency»)] if latency_records: avg_metrics.update({ «current_p95_latency»: latency_records[-1], «avg_p95_latency»: np.mean(latency_records), «latency_trend»: self._calculate_trend(latency_records[-10:]) }) # Недавние оповещения recent_alert_count = len(self.alerts) if self.alerts else 0 # Общая оценка состояния здоровья health_score = self._calculate_health_score(avg_metrics, recent_alert_count) return { «timestamp»: now, «period_analyzed»: f»{len(recent_records)} batches ({total_queries:,} requests)», «health_score»: health_score, «health_status»: self._get_health_status(health_score), «performance_metrics»: avg_metrics, «recent_alerts»: recent_alert_count, «recommendations»: self._generate_recommendations(avg_metrics, recent_alert_count), «cost_analysis»: { «total_cost_analyzed»: total_cost, «daily_cost_estimate»: total_cost * (86400 / (24 * 3600)), # Масштабируем до ежедневной «monthly_cost_estimate»: total_cost * (86400 * 30 / (24 * 3600)) } } def _calculate_trend(self, values, min_samples=3): «»»Рассчитать, улучшаются ли метрики, стабилизируются или снижаются»»» if len(values) < min_samples: return "insufficient_data" # Наклон простой линейной регрессии x = np.arange(len(values)) slope = np.polyfit(x, values, 1)[0] # Определить значимость std_dev = np.std(values) threshold = std_dev * 0.1 # 10% от std dev if abs(slope) < threshold: return "stable" elif slope > 0: return «improving» else: return «declining» def _calculate_health_score(self, metrics, alert_count): «»»Рассчитать общее состояние системы (0-100)»»» score = 100 # Наложить штраф на основе метрик if metrics[«edge_utilization»] < 0.9: score -= 10 # Штраф за использование периферии if metrics["edge_utilization"] < 0.8: score -= 20 # Серьёзный штраф за использование периферии if metrics.get("current_accuracy", 1.0) < 0.9: score -= 15 # Штраф за точность if metrics.get("current_accuracy", 1.0) < 0.8: score -= 30 # Серьёзный штраф за точность # Штраф за оповещение score -= min(alert_count * 5, 30) # Максимальный штраф за оповещения — 30 баллов return max(0, score) def _get_health_status(self, score): """Преобразовать числовое состояние Оценка статуса """ if score >= 90: return «excellent» elif score >= 75: return «good» elif score >= 60: return «fair» elif score >= 40: return «poor» else: return «critical» def _generate_recommendations(self, metrics, alert_count): «»»Сгенерировать рекомендации для выполнения»»» suggestions = [] if metrics[«edge_utilization»] < 0.8: suggestions.append( f"Низкая загрузка периферии ({metrics['edge_utilization']:.1%}): " "Рассмотрите возможность снижения порога сложности или порога уверенности" ) if metrics.get("current_accuracy", 1.0) < 0.85: suggestions.append( f"Низкая точность ({metrics.get('current_accuracy', 0):.3f}): " "Проверьте производительность модели и рассмотрите возможность переобучения" ) if metrics["cost_per_query"] > 0.005: # > $0.005 за запрос suggestions.append( f»Высокая стоимость запроса (${metrics['cost_per_query']:.4f}): » «Увеличьте использование периферии для снижения затрат» ) if alert_count > 5: suggestions.append( f»Высокий объем оповещений ({alert_count}): » «Проверьте пороговые значения оповещений и устраните основные проблемы» ) if not suggestions: suggestions.append(«Система работает в пределах нормальных параметров») return suggestions # Инициализируем мониторинг monitor = ProductionMonitor() # Записываем в журнал производительности нашего пакета monitor.log_batch(batch_metrics) # Сгенерируем отчет о работоспособности health_report = monitor.generate_health_report() print(f»n ОТЧЕТ О СОСТОЯНИИ СИСТЕМЫ:») print(f» Состояние работоспособности: {health_report['health_status'].upper()} ({health_report['health_score']}/100)») print(f» Период: {health_report['period_analyzed']}») print(f»n Ключевые метрики:») для метрики, значение в health_report['performance_metrics'].items(): if isinstance(value, float): if 'utilization' в метрике: print(f» {metric}: {value:.1%}») elif 'cost' в метрике: print(f» {metric}: ${value:.4f}») else: print(f» {metric}: {value:.3f}») else: print(f» {metric}: {value}») print(f»n Анализ затрат:») для метрики, значение в health_report['cost_analysis'].items(): print(f» {metric}: ${value:.4f}») print(f»n Рекомендации:») for i, rec в enumerate(health_report['recommendations'], 1): print(f» {i}. {rec}»)
Что мы создали: готовую к производству систему
Давайте сделаем шаг назад и оценим то, чего мы достигли:
- Адаптированная к предметной области модель , которая понимает язык обслуживания клиентов
- Квантованная модель на 84% меньше , работает на стандартном процессорном оборудовании
- Умный маршрутизатор , который обрабатывает 95% запросов локально
- Мониторинг производства , который выявляет проблемы до того, как они повлияют на пользователей
Вот как эти цифры выглядят на практике:
# Подведем итоги производительности нашей системы print(«🎯 ПРОИЗВОДИТЕЛЬНОСТЬ ГИБРИДНОЙ EDGE-CLOUD ИИ-СИСТЕМЫ») print(«=» * 50) # Результаты сжатия модели fp32_size = benchmark_results[«FP32 Original»][«model_size_mb»] int8_size = benchmark_results[«INT8 Quantized»][«model_size_mb»] compression_ratio = (1 — int8_size/fp32_size) * 100 print(f» Сжатие модели:») print(f» Исходный размер: {fp32_size:.1f}MB») print(f» Квантованный размер: {int8_size:.1f}MB») print(f» Сжатие: {compression_ratio:.1f}%») # Сохранение точности fp32_acc = benchmark_results[«FP32 Original»][«accuracy»] int8_acc = benchmark_results[«INT8 Quantized»][«accuracy»] precision_retention = int8_acc / fp32_acc * 100 print(f»n Точность:») print(f» Исходная точность: {fp32_acc:.3f}») print(f» Квантованная точность: {int8_acc:.3f}») print(f» Удержание: {accuracy_retention:.1f}%») # Метрики производительности fp32_latency = benchmark_results[«FP32 Original»][«mean_latency_ms»] int8_latency = benchmark_results[«INT8 Quantized»][«mean_latency_ms»] print(f»n Производительность:») print(f» Средняя задержка FP32: {fp32_latency:.1f}ms») print(f» Средняя задержка INT8: {int8_latency:.1f}ms») print(f» Задержка FP32 P95: {benchmark_results['FP32 Original']['p95_latency_ms']:.1f}ms») print(f» Задержка INT8 P95: {benchmark_results['INT8 Quantized']['p95_latency_ms']:.1f}ms») # Метрики маршрутизации и стоимости system_stats = router.get_system_stats() print(f»n Эффективность маршрутизации:») print(f» Использование периферии: {system_stats['edge_utilization']:.1%}») print(f» Экономия средств: {system_stats['cost_savings_percent']:.1f}%») print(f» Стоимость за запрос: ${system_stats['cost_per_query']:.6f}») # Работоспособность системы print(f»n Состояние системы:») print(f» Состояние: {health_report['health_status'].upper()}») print(f» Оценка: {health_report['health_score']}/100″) print(f» Последние оповещения: {health_report['recent_alerts']}») print(«n» + «=» * 50)
Основные выводы и дальнейшие шаги
Мы создали нечто практичное: гибридную систему искусственного интеллекта, которая обеспечивает результаты облачного качества при минимальных затратах и задержках на уровне периферийных вычислений. Вот что делает её эффективной:
Правило 95/5 : большинство запросов клиентов — рутинные. Хорошо настроенная небольшая модель может справиться с ними идеально, оставив облаку только действительно сложные случаи.
Сжатие без компромиссов : динамическое квантование INT8 позволяет сократить размер на 84% с минимальной потерей точности, устраняя необходимость в калибровочных наборах данных.
Интеллектуальная маршрутизация : наш многомерный анализ сложности гарантирует, что запросы будут направляться в нужное место по правильным причинам.
Мониторинг производства : простые оповещения по ключевым показателям поддерживают работоспособность системы в процессе производства.
Куда идти дальше
Начните с малого : сначала разверните решение на подмножестве вашего трафика. Убедитесь, что результаты соответствуют вашим ожиданиям, прежде чем масштабировать решение.
Настраивайте постепенно : еженедельно корректируйте пороговые значения маршрутизации в зависимости от ваших конкретных компромиссов между качеством и стоимостью.
Масштабируйте с умом : добавляйте периферийные узлы по мере роста трафика. Архитектура масштабируется горизонтально.
Продолжайте учиться : отслеживайте решения по маршрутизации и тенденции точности. Эти данные помогут вам в дальнейшей оптимизации.
Общая картина
Это касается не только контакт-центров или обслуживания клиентов. Этот принцип работает везде, где есть:
- Крупномасштабные стандартные запросы в сочетании с иногда сложными случаями
- Требования к чувствительности к стоимости и задержке
- Проблемы соблюдения требований или суверенитета данных
Подумайте о своих собственных ИИ-приложениях. Сколько из них действительно сложные, а сколько — рутинные? Мы уверены, что большинство из них следуют правилу 95/5, что делает их идеальными кандидатами для этого гибридного подхода.
Будущее ИИ — не в более крупных моделях, а в более интеллектуальных архитектурах. Системы, которые делают больше с меньшими затратами, хранят данные там, где им место, и стоят столько, сколько вы можете себе позволить.
Готовы попробовать сами? Полный код доступен в этой статье. Начните с собственных данных, следуйте инструкциям по настройке и посмотрите, как будет выглядеть ваше распределение 95/5.
*Все изображения, если не указано иное, принадлежат автору.
Ссылки и ресурсы
- Научная работа : «Сравнительный анализ развертываний периферийных и облачных контакт-центров: техническая и архитектурная перспектива» – IEEE ICECCE 2025
- Полный блокнот : весь код из этой статьи доступен в виде воспроизводимого блокнота Jupyter.
- Характеристики среды : Intel Xeon Silver 4314, 64 ГБ ОЗУ, Ubuntu 22.04, Python 3.10
Описанная здесь система представляет собой результат независимого исследования и не связана с каким-либо работодателем или коммерческой организацией. Результаты могут различаться в зависимости от оборудования, характеристик данных и специфики предметной области.
Хотите обсудить детали реализации или поделиться результатами? Свяжитесь со мной в комментариях ниже.
Источник: towardsdatascience.com



























