Лабораторная работа №10¶
Цель работы:¶
Исследовать методы оптимизации вычисления кода, используя потоки, процессы, Cython и отключение GIL на основе сравнения времени вычисления функции численного интегрирования методом прямоугольников, реализованной на чистом Python.
Пример заданий:¶
- Написать полный и содержательный docstring для функции integrate(), включая описание назначения каждого аргумента, возвращаемого значения и возможных ограничений метода. Формат docstring должен соответствовать стандартам PEP 257.
- Использовать аннотации типов для аргументов и возвращаемых значений функции, применяя синтаксис типа подсказок согласно PEP 484. Убедитесь, что аннотации соответствуют типу передаваемых данных.
- Создать два примера для проверки правильности работы функции, размещённые непосредственно в docstring функции integrate(). Один пример должен содержать простую тригонометрическую функцию, второй — полиномиальную функцию второго порядка.
- Разработать дополнительные юнит-тесты, покрывающие хотя бы две ситуации: правильный расчёт известного интеграла и проверку устойчивости к изменению числа итераций. Для оценки точности используйте assertAlmostEqual или аналогичный механизм проверки близости чисел.
- Провести оценку производительности функции с помощью модуля timeit, выполнив серию замеров времени выполнения функции для различного числа итераций. Фиксировать полученные временные характеристики и включить их в итоговый отчет.
Отрывок кода из лабораторной, демонстрирующий оптимизацию для интегрирования:¶
import concurrent.futures as ftres
from functools import partial
from typing import Callable, Union
import math
import time
import timeit
def integrate(f: Callable[[float], float],
a: Union[int, float],
b: float,
*,
n_iter: int = 10000) -> float:
"""
Вычисляет площадь под графиком функции численным интегрированием
Недостаток: плохо работает для быстро меняющихся функций с малым количеством прямоугольников
Args:
f: Функция одного вещественного аргумента, интеграл которой вычисляется.
a: Левая граница интервала интегрирования.
b: Правая граница интервала интегрирования.
n_iter: Количество прямоугольников для аппроксимации интеграла.
Returns:
Приближенное значение определенного интеграла.
Examples:
>>> import math
>>> result = integrate(math.sin, 0, math.pi, n_iter=10000)
>>> abs(result - 2.0) < 0.01
True
>>> result = integrate(lambda x: x**2, 0, 1, n_iter=10000)
>>> abs(result - 1/3) < 0.001
True
"""
if a >= b:
raise ValueError(f"a ({a}) должно быть меньше b ({b})")
acc = 0.0
step = (b - a) / n_iter
f_local = f
a_local = a
for i in range(n_iter):
acc += f_local(a_local + i * step) * step
return acc
def integrate_async(f: Callable[[float], float],
a: Union[int, float],
b: float,
*,
n_jobs: int = 2,
n_iter: int = 1000) -> float:
"""Параллельное интегрирование с потоками."""
executor = ftres.ThreadPoolExecutor(max_workers=n_jobs)
spawn = partial(executor.submit, integrate, f, n_iter=n_iter // n_jobs)
step = (b - a) / n_jobs
fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
executor.shutdown(wait=False)
return sum(f.result() for f in ftres.as_completed(fs))
def integrate_async_processes(f: Callable[[float], float],
a: Union[int, float],
b: float,
*,
n_jobs: int = 2,
n_iter: int = 1000) -> float:
"""Параллельное интегрирование с процессами."""
executor = ftres.ProcessPoolExecutor(max_workers=n_jobs)
spawn = partial(executor.submit, integrate, f, n_iter=n_iter // n_jobs)
step = (b - a) / n_jobs
fs = [spawn(a + i * step, a + (i + 1) * step) for i in range(n_jobs)]
executor.shutdown(wait=False)
return sum(f.result() for f in ftres.as_completed(fs))
def benchmark_iterations():
"""
Проводит замер времени выполнения функции integrate для различного количества итераций.
Returns:
list: Список с результатами замеров времени
"""
print("-" * 60)
print("Замеры integrate() с разным числом итераций:")
print("-" * 60)
test_cases = [
(1000, "1K итераций"),
(10000, "10K итераций"),
(100000, "100K итераций"),
(1000000, "1M итераций"),
]
results = []
for n_iter, description in test_cases:
time_taken = timeit.timeit(
lambda: integrate(math.cos, 0, math.pi, n_iter=n_iter),
number=10
) / 10
results.append(f"{description}: {time_taken:.6f} секунд")
return results
def benchmark_threads_processes():
n_jobs_list = [2, 4,6,8]
print()
print("-" * 75)
print("Сравнение потоков и процессов:")
print("-" * 75)
for n_jobs in n_jobs_list:
start = time.perf_counter()
result_threads = integrate_async(math.sin, 0, math.pi, n_jobs=n_jobs, n_iter=5000)
time_threads = time.perf_counter() - start
start = time.perf_counter()
result_processes = integrate_async_processes(math.sin, 0, math.pi, n_jobs=n_jobs, n_iter=5000)
time_processes = time.perf_counter() - start
print(f"n_jobs={n_jobs}:")
print(f" Потоки: {time_threads:.4f} сек, результат={result_threads:.6f}")
print(f" Процессы: {time_processes:.4f} сек, результат={result_processes:.6f}")
print()
if __name__ == "__main__":
benchmark_iterations()
benchmark_threads_processes()
Выводы: Сравнение потоков и процессов:¶
-
n_jobs=2: Потоки: 0.0032 сек, результат=2.000000 Процессы: 0.0809 сек, результат=2.000000
-
n_jobs=4: Потоки: 0.0010 сек, результат=2.000000 Процессы: 0.0758 сек, результат=2.000000
-
n_jobs=6: Потоки: 0.0008 сек, результат=2.000000 Процессы: 0.0853 сек, результат=2.000000
-
n_jobs=8: Потоки: 0.0015 сек, результат=2.000000 Процессы: 0.0946 сек, результат=2.000000
При численном интегрировании примитивы синхронизации (мьютексы, семафоры) не нужны, потому что: 1. Данные разделены (data parallelism): - Каждый поток работает со своим интервалом - Нет общей изменяемой структуры данных - Результаты объединяются только в конце 2. Нет состояния гонки (race condition):
- Каждый аккумулятор - локальная переменная
- Суммирование результатов происходит ПОСЛЕ вычислений
- Нет одновременной записи в одну переменную