От одноядерных до многоядерных процессоров на вашем локальном ПК и не только.
Делиться

Это первая часть двухсерийного цикла статей о распределенных вычислениях с использованием Ray. В этой части показано, как использовать Ray на локальном компьютере, а во второй части — как масштабировать Ray до многосерверных кластеров в облаке.
Представьте, что вы только что приобрели новый 16-ядерный ноутбук или настольный компьютер и вам не терпится проверить его возможности, выполнив несколько ресурсоемких вычислений.
Вы программист на Python, хотя ещё не эксперт, поэтому открываете свой любимый учебник по LLM и задаёте примерно такой вопрос.
«Мне нужно подсчитать количество простых чисел в заданном диапазоне. Пожалуйста, предоставьте мне код на Python для этой задачи».
Через несколько секунд LLM выдаст вам некоторый код. Вы можете немного подкорректировать его в ходе короткого обмена сообщениями, и в итоге получите что-то вроде этого:
import math, time, os def is_prime(n: int) -> bool: if n < 2: return False if n == 2: return True if n % 2 == 0: return False r = int(math.isqrt(n)) + 1 for i in range(3, r, 2): if n % i == 0: return False return True def count_primes(a: int, b: int) -> int: c = 0 for n in range(a, b): if is_prime(n): c += 1 return c if __name__ == «__main__»: A, B = 10_000_000, 20_000_000 total_cpus = os.cpu_count() or 1 # Start «chunky»; Мы можем выполнить эту операцию позже. chunks = max(4, total_cpus * 2) step = (B — A) // chunks print(f»CPUs~{total_cpus}, chunks={chunks}») t0 = time.time() results = [] for i in range(chunks): s = A + i * step e = s + step if i < chunks - 1 else B results.append(count_primes(s, e)) total = sum(results) print(f"total={total}, time={time.time() - t0:.2f}s")
Вы запускаете программу, и она работает идеально. Единственная проблема в том, что она выполняется довольно долго, возможно, от тридцати до шестидесяти секунд, в зависимости от размера входного диапазона. Это, вероятно, неприемлемо.
Что вам теперь делать? У вас есть несколько вариантов, и, вероятно, три наиболее распространенных:
– Распараллелите код, используя потоки или многопроцессорную обработку.
– Перепишите код на «быстром» языке, таком как C или Rust.
– Попробуйте использовать такие библиотеки, как Cython, Numba или NumPy.
Все эти варианты жизнеспособны, но у каждого есть свои недостатки. Варианты 1 и 3 значительно увеличивают сложность кода, а средний вариант может потребовать изучения нового языка программирования.
А что, если я скажу вам, что есть другой способ? Способ, при котором изменения, необходимые для вашего существующего кода, будут сведены к абсолютному минимуму. Способ, при котором ваша среда выполнения автоматически распределяется по всем доступным ядрам.
Именно это и обещает сделать сторонняя библиотека Ray .
Кто такой Рэй?
Библиотека Ray Python — это платформа для распределенных вычислений с открытым исходным кодом. Разработан для упрощения масштабирования программ на Python от ноутбука до кластера с минимальными изменениями в коде.
Ray упрощает масштабирование и распределение ресурсоемких рабочих нагрузок приложений — от глубокого обучения до обработки данных — по кластерам удаленных компьютеров, а также обеспечивает практическое улучшение производительности приложений на вашем ноутбуке, настольном компьютере или даже в удаленном облачном вычислительном кластере.
Ray предоставляет богатый набор библиотек и интеграций, построенных на гибкой распределенной среде выполнения, что делает распределенные вычисления простыми и доступными для всех.
Вкратце, Ray позволяет распараллеливать и распределять ваш код на Python с минимальными усилиями, независимо от того, выполняется ли он локально на ноутбуке или на огромном облачном кластере.
Используя луч
В оставшейся части этой статьи я расскажу вам об основах использования Ray для ускорения ресурсоемкого кода на Python, и мы настроим несколько примеров кода, чтобы показать, как легко интегрировать возможности Ray в ваши собственные задачи.
Чтобы максимально эффективно использовать Ray, если вы являетесь специалистом по анализу данных или инженером по машинному обучению, вам необходимо сначала понять несколько ключевых концепций. Ray состоит из нескольких компонентов.
Ray Data — это масштабируемая библиотека, разработанная для обработки данных в задачах машинного обучения и искусственного интеллекта. Она предлагает гибкие и высокопроизводительные API для задач ИИ, включая пакетный вывод, предварительную обработку данных и загрузку данных для обучения машинного обучения.
Ray Train — это гибкая, масштабируемая библиотека, разработанная для распределенного обучения и тонкой настройки моделей машинного обучения.
Ray Tune используется для настройки гиперпараметров.
Ray Serve — это масштабируемая библиотека для развертывания моделей, обеспечивающая работу API для онлайн-вывода данных.
Библиотека Ray RLlib используется для масштабируемого обучения с подкреплением.
Как видите, Рэй уделяет большое внимание большим языковым моделям и приложениям искусственного интеллекта, но есть еще один важный компонент, о котором я еще не упомянул, и именно его я буду использовать в этой статье.
Ray Core предназначен для масштабирования ресурсоемких приложений на Python общего назначения. Он разработан для распределения рабочей нагрузки Python по всем доступным ядрам независимо от того, на какой системе вы его запускаете.
В этой статье речь пойдёт исключительно о Ray Core.
В Ray Core необходимо освоить две важнейшие концепции: задачи и акторы.
Задачи представляют собой безсостоятельные рабочие процессы или сервисы, реализованные с помощью Ray путем добавления аннотаций к обычным функциям Python.
Акторы (или работники с сохранением состояния ) используются, например, когда необходимо отслеживать и поддерживать состояние зависимых переменных в распределенном кластере. Акторы реализуются путем добавления декораторов к обычным классам Python.
И акторы, и задачи определяются с помощью одного и того же декоратора `@ray .remote` . После определения эти задачи выполняются с помощью специального метода `.remote()`, предоставляемого Ray. Пример этого мы рассмотрим далее.
Настройка среды разработки
Прежде чем начать кодирование, нам следует настроить среду разработки, чтобы наши проекты были изолированными и не мешали друг другу. Для этого я буду использовать conda, но вы можете использовать любой инструмент по своему усмотрению. Я буду запускать свой код с помощью Jupyter Notebook в оболочке WSL2 Ubuntu на Windows.
$ conda create -n ray-test python=3.13 -y $ conda activate ray-test (ray-test) $ conda install ray[default]
Пример кода – подсчет простых чисел
Давайте вернемся к примеру, который я привел в начале: подсчет количества простых чисел в интервале от 10 000 000 до 20 000 000.
Мы запустим наш исходный код на Python и замерим время его выполнения.
import math, time, os def is_prime(n: int) -> bool: if n < 2: return False if n == 2: return True if n % 2 == 0: return False r = int(math.isqrt(n)) + 1 for i in range(3, r, 2): if n % i == 0: return False return True def count_primes(a: int, b: int) -> int: c = 0 for n in range(a, b): if is_prime(n): c += 1 return c if __name__ == «__main__»: A, B = 10_000_000, 20_000_000 total_cpus = os.cpu_count() or 1 # Start «chunky»; Мы можем выполнить эту операцию позже. chunks = max(4, total_cpus * 2) step = (B — A) // chunks print(f»CPUs~{total_cpus}, chunks={chunks}») t0 = time.time() results = [] for i in range(chunks): s = A + i * step e = s + step if i < chunks - 1 else B results.append(count_primes(s, e)) total = sum(results) print(f"total={total}, time={time.time() - t0:.2f}s")
А что в итоге?
Количество процессоров: 32, блоков: 64, всего: 606028, время: 31,17 с.
Можно ли улучшить это с помощью Ray? Да, следуя этому простому четырехэтапному процессу.
Шаг 1 — Инициализация Ray . Добавьте эти две строки в начало вашего кода.
import ray ray.init()
Шаг 2 — Создайте нашу удалённую функцию . Это легко. Просто добавьте декоратор `@ray.remote` к функции, которую вы хотите оптимизировать. Декорируемая функция — это та, которая выполняет наибольшую работу. В нашем примере это функция `count_primes`.
@ray.remote(num_cpus=1) def count_primes(start: int, end: int) -> int: … …
Шаг 3 — Запустите параллельные задачи. Вызовите удаленную функцию, используя директиву Ray ` .remote` .
refs.append(count_primes.remote(s, e))
Шаг 4 — Дождитесь завершения всех задач. Каждая задача в Ray возвращает ObjectRef при вызове . Это промис от Ray. Это означает, что Ray запустил задачу в удаленном режиме, и Ray вернет ее значение в какой-то момент в будущем. Мы отслеживаем все ObjectRef, возвращаемые выполняющимися задачами, с помощью функции ray.get() . Это блокирует выполнение до тех пор, пока все задачи не завершатся.
результаты = ray.get(tasks)
Давайте соберем все воедино. Как вы увидите, изменения в нашем исходном коде минимальны — добавлено всего четыре строки кода и оператор print для отображения количества узлов и ядер, на которых мы работаем.
import math import time # —————————————— # Изменение № 1 # —————————————— import ray ray.init(auto) def is_prime(n: int) -> bool: if n < 2: return False if n == 2: return True if n % 2 == 0: return False r = int(math.isqrt(n)) + 1 for i in range(3, r, 2): if n % i == 0: return False return True # ----------------------------------------- # Изменение № 2 # ----------------------------------------- @ray.remote(num_cpus=1) # цикл на чистом Python → 1 CPU на задачу def count_primes(a: int, b: int) -> int: c = 0 for n in range(a, b): if is_prime(n): c += 1 return c if __name__ == «__main__»: A, B = 10_000_000, 60_000_000 total_cpus = int(ray.cluster_resources().get(«CPU», 1)) # Начало «блокового» режима; Мы можем выполнить перебор позже. chunks = max(4, total_cpus * 2) step = (B — A) // chunks print(f»nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}») t0 = time.time() refs = [] for i in range(chunks): s = A + i * step e = s + step if i < chunks - 1 else B # ----------------------------------------- # Изменение № 3 # ----------------------------------------- refs.append(count_primes.remote(s, e)) # ----------------------------------------- # Изменение № 4 # ----------------------------------------- total = sum(ray.get(refs)) print(f"total={total}, time={time.time() - t0:.2f}s")
Итак, стоило ли всё это того? Давайте запустим новый код и посмотрим, что получится.
2025-11-01 13:36:30,650 INFO worker.py:2004 — Запущен локальный экземпляр Ray. Просмотреть панель управления можно по адресу 127.0.0.1:8265 /home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Совет: В будущих версиях Ray Ray больше не будет переопределять переменную окружения видимых устройств акселератора, если num_gpus=0 или num_gpus=None (по умолчанию). Чтобы включить это поведение и отключить это сообщение об ошибке, установите RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0 warnings.warn( nodes=1, CPUs~32, chunks=64 total=606028, time=3.04s
Что ж, результат говорит сам за себя. Код Ray на Python работает в 10 раз быстрее , чем обычный код на Python. Неплохо, правда?
Откуда берется это увеличение скорости? Дело в том, что Ray может распределить вашу рабочую нагрузку на все ядра вашей системы. Ядро — это как мини-процессор. Когда мы запускали наш исходный код на Python, он использовал только одно ядро. Это хорошо, но если у вашего процессора больше одного ядра, а у большинства современных ПК они есть, то вы, так сказать, теряете деньги.
В моем случае процессор имеет 24 ядра, поэтому неудивительно, что мой код на Ray оказался намного быстрее, чем код без Ray.
Мониторинг заданий Ray
Ещё один важный момент: Ray значительно упрощает мониторинг выполнения заданий с помощью панели мониторинга. Обратите внимание на результат, полученный при запуске примера кода Ray:
… — Запущен локальный экземпляр Ray. Просмотреть панель управления можно по адресу 127.0.0.1:8265
Отображается локальная ссылка, потому что я запускаю это на своем настольном компьютере. Если бы вы запускали это в кластере, URL-адрес указывал бы на местоположение на головном узле кластера.
При нажатии на указанную ссылку вы должны увидеть что-то подобное:

С главного экрана вы можете детально отслеживать многие аспекты ваших программ Ray, используя ссылки меню в верхней части страницы.
Использование акторов Рэя
Ранее я упоминал, что акторы являются неотъемлемой частью основной обработки Ray. Акторы используются для координации и обмена данными между задачами Ray. Например, предположим, вы хотите установить глобальное ограничение для ВСЕХ запущенных задач, которому они должны соответствовать. Допустим, у вас есть пул рабочих задач, но вы хотите гарантировать, что одновременно могут выполняться не более пяти таких задач. Вот пример кода, который, как вам кажется, подойдет.
import math, time, os def is_prime(n: int) -> bool: if n < 2: return False if n == 2: return True if n % 2 == 0: return False r = int(math.isqrt(n)) + 1 for i in range(3, r, 2): if n % i == 0: return False return True def count_primes(a: int, b: int) -> int: c = 0 for n in range(a, b): if is_prime(n): c += 1 return c if __name__ == «__main__»: A, B = 10_000_000, 20_000_000 total_cpus = os.cpu_count() or 1 # Start «chunky»; Мы можем выполнить эту операцию позже. chunks = max(4, total_cpus * 2) step = (B — A) // chunks print(f»CPUs~{total_cpus}, chunks={chunks}») t0 = time.time() results = [] for i in range(chunks): s = A + i * step e = s + step if i < chunks - 1 else B results.append(count_primes(s, e)) total = sum(results) print(f"total={total}, time={time.time() - t0:.2f}s")
Мы использовали глобальную переменную для ограничения количества запущенных задач, и код синтаксически корректен и выполняется без ошибок. К сожалению, вы не получите ожидаемого результата. Это потому, что каждая задача Ray выполняется в собственном пространстве процессов и имеет свою собственную копию глобальной переменной. Глобальная переменная НЕ используется совместно функциями. Поэтому при запуске приведенного выше кода мы увидим примерно такой вывод:
Общее количество вызовов: 200 Планируемое значение GLOBAL_QPS: 5.0 Ожидаемое время при действительно глобальном ограничении: ~40.00 с Фактическое время с 'глобальной переменной' (не работает): 3.80 с Наблюдаемое значение QPS кластера: ~52.6 (должно было быть ~5.0)
Чтобы это исправить, мы используем актора. Напомним, что актор — это просто класс Python, помеченный атрибутом Ray. Вот код с акторами.
import time, ray ray.init(ignore_reinit_error=True, log_to_driver=False) # Это наш актор @ray.remote class GlobalPacer: «»»Сериализуйте вызовы так, чтобы скорость обработки в масштабе всего кластера была <= qps.""" def __init__(self, qps: float): self.interval = 1.0 / qps self.next_time = time.time() def acquire(self): # Ждем внутри актора, пока не сможем продолжить now = time.time() if now < self.next_time: time.sleep(self.next_time - now) # Резервируем следующий слот; защита от дрейфа self.next_time = max(self.next_time + self.interval, time.time()) return True @ray.remote def call_api_with_limit(n_calls: int, pacer): done = 0 for _ in range(n_calls): # Ожидание глобального разрешения ray.get(pacer.acquire.remote()) # имитация вызова API (без дополнительной задержки) done += 1 return done if __name__ == "__main__": NUM_WORKERS = 10 CALLS_EACH = 20 GLOBAL_QPS = 5.0 # ограничение для всего кластера total_calls = NUM_WORKERS * CALLS_EACH expected_min_time = total_calls / GLOBAL_QPS pacer = GlobalPacer.remote(GLOBAL_QPS) t0 = time.time() ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)]) dt = time.time() - t0 print(f"Всего звонков: {total_calls}") print(f"Глобальное ограничение QPS: {GLOBAL_QPS}") print(f"Ожидаемое время (если ограничено {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s") print(f"Фактическое время с актором: {dt:.2f}s") print(f"Наблюдаемое кластерное QPS: ~{total_calls/dt:.1f}")
Наш код ограничителя инкапсулирован в класс (GlobalPacer) и помечен атрибутом ray.remote, что означает, что он применяется ко всем запущенным задачам. Мы можем увидеть разницу в результате, запустив обновленный код.
Общее количество вызовов: 200 Глобальное ограничение QPS: 5,0 Ожидаемое время (при ограничении до 5,0 QPS): ~40,00 с Фактическое время работы актора: 39,86 с Наблюдаемое значение QPS кластера: ~5,0
Краткое содержание
В этой статье был представлен Ray — открытый фреймворк на Python, который упрощает масштабирование ресурсоемких программ с одного ядра на несколько ядер или даже кластер с минимальными изменениями в коде.
Я кратко упомянул ключевые компоненты Ray — Ray Data, Ray Train, Ray Tune, Ray Serve и Ray Core — подчеркнув, что Ray Core идеально подходит для масштабирования ЦП общего назначения.
Я объяснил некоторые основные концепции Ray Core, такие как введение задач (параллельных функций без состояния), акторов (работников с состоянием для совместного использования состояния и координации) и ObjectRefs (обещание получения возвращаемого значения задачи в будущем).
Чтобы продемонстрировать преимущества использования Ray, я начал с простого, ресурсоемкого примера — подсчета простых чисел в заданном диапазоне — и показал, как его выполнение на одном ядре может быть медленным при использовании простой реализации на Python.
Вместо того чтобы переписывать код на другом языке или использовать сложные библиотеки для многопроцессорной обработки, Ray позволяет распараллелить рабочую нагрузку всего за четыре простых шага и всего несколько дополнительных строк кода:
- Чтобы запустить Ray, используйте ray.init().
- Добавьте к своим функциям атрибут `@ray.remote`, чтобы превратить их в параллельные задачи.
- Метод `.remote()` позволяет запускать задачи одновременно, и
- ray.get() используется для сбора результатов выполнения задачи.
Этот подход позволил сократить время выполнения примера подсчета простых чисел с ~30 секунд до ~3 секунд на 24-ядерном компьютере.
Я также упомянул, насколько легко отслеживать выполняющиеся задания в Ray с помощью встроенной панели управления, и показал, как получить к ней доступ.
В заключение я привёл пример использования актора Ray , показав, почему глобальные переменные не подходят для координации между задачами, поскольку у каждого рабочего процесса есть собственное пространство в памяти.
Во второй части этой серии мы рассмотрим, как вывести задачу на новый уровень, позволив заданиям Ray использовать еще больше вычислительной мощности по мере масштабирования до крупных многоузловых серверов в облаке через Amazon Web Services.
Источник: towardsdatascience.com



























