Библиотеки Python Часть 2. Практическое применение. Джейд Картер

Читать онлайн.
Название Библиотеки Python Часть 2. Практическое применение
Автор произведения Джейд Картер
Жанр
Серия
Издательство
Год выпуска 2025
isbn



Скачать книгу

на топик

      consumer.subscribe(['orders'])

      # Чтение сообщений из Kafka

      try:

      while True:

      msg = consumer.poll(1.0) # Ожидание сообщения (1 секунда)

      if msg is None:

      continue

      if msg.error():

      if msg.error().code() == KafkaException._PARTITION_EOF:

      # Конец партиции

      continue

      else:

      print(f"Ошибка: {msg.error()}")

      break

      # Обработка сообщения

      print(f"Получено сообщение: {msg.value().decode('utf-8')}")

      except KeyboardInterrupt:

      print("Завершение работы…")

      finally:

      # Закрытие консьюмера

      consumer.close()

      ```

      В этом примере консьюмер подключается к Kafka, читает сообщения из топика `orders` и выводит их на экран.

      Потоковая обработка данных

      Kafka часто используется совместно с платформами потоковой обработки, такими как Apache Spark или Apache Flink, для анализа данных в реальном времени. Однако вы также можете обрабатывать данные прямо в Python.

      Например, предположим, что мы хотим обработать события из топика `orders` и рассчитать суммарную стоимость всех заказов:

      ```python

      from confluent_kafka import Consumer

      import json

      # Настройки консьюмера

      consumer_config = {

      'bootstrap.servers': 'localhost:9092',

      'group.id': 'order-sum-group',

      'auto.offset.reset': 'earliest'

      }

      # Создание консьюмера

      consumer = Consumer(consumer_config)

      consumer.subscribe(['orders'])

      # Суммарная стоимость заказов

      total_sales = 0

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Обработка сообщения

      order = json.loads(msg.value().decode('utf-8'))

      total_sales += order['price']

      print(f"Обработан заказ: {order['order_id']}, текущая сумма: {total_sales}")

      except KeyboardInterrupt:

      print(f"Общая сумма всех заказов: {total_sales}")

      finally:

      consumer.close()

      ```

      Преимущества использования Kafka

      1. Высокая производительность. Kafka поддерживает миллионы событий в секунду благодаря своей архитектуре и использованию партиций.

      2. Надежность. Данные хранятся в Kafka до тех пор, пока их не обработают все подписчики.

      3. Масштабируемость. Kafka легко масштабируется путем добавления новых брокеров.

      4. Универсальность. Kafka поддерживает интеграцию с большинством современных инструментов обработки данных.

      Apache Kafka предоставляет мощный набор инструментов для потоковой обработки данных. Используя Python, вы можете легко настроить передачу данных, их обработку и анализ в реальном времени. Это особенно полезно для систем, где требуется высокая производительность и минимальная задержка при обработке больших потоков данных.

Задачи для практики

      Задача 1: Фильтрация событий по условию

      Описание:

      У вас есть топик `clickstream`, содержащий события о кликах на веб-сайте. Каждое событие содержит следующие поля:

      – `user_id` – идентификатор пользователя.

      – `url` – URL-адрес, на который был клик.

      – `timestamp` – время клика.

      Ваша задача: создать консьюмера, который будет читать события из Kafka, фильтровать только события с URL-адресами, содержащими слово "product", и сохранять их в новый топик `filtered_clicks`.

      Решение:

      ```python

      from confluent_kafka import Producer, Consumer

      import json

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание продюсера для записи в новый топик

      producer = Producer({'bootstrap.servers':