Библиотеки Python Часть 2. Практическое применение. Джейд Картер

Читать онлайн.
Название Библиотеки Python Часть 2. Практическое применение
Автор произведения Джейд Картер
Жанр
Серия
Издательство
Год выпуска 2025
isbn



Скачать книгу

Чтение данных из топика orders

      order_msg = order_consumer.poll(0.1)

      if order_msg and not order_msg.error():

      order = json.loads(order_msg.value().decode('utf-8'))

      product_id = order['product_id']

      # Объединение данных о заказе и товаре

      if product_id in product_catalog:

      product = product_catalog[product_id]

      total_price = order['quantity'] * product['price']

      print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")

      else:

      print(f"Информация о товаре {product_id} отсутствует.")

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      order_consumer.close()

      product_consumer.close()

      ```

      Объяснение:

      – Данные из топика `products` кэшируются в словаре `product_catalog`.

      – При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.

      Задача 7: Потоковая обработка с вычислением скользящего среднего

      Описание:

      В топик `stock_prices` поступают данные о ценах акций:

      – `symbol` – тикер акции.

      – `price` – текущая цена.

      – `timestamp` – время.

      Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      from collections import defaultdict, deque

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'stocks-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['stock_prices'])

      # Дек для хранения последних цен по тикерам

      price_window = defaultdict(lambda: deque(maxlen=5))

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      stock_data = json.loads(msg.value().decode('utf-8'))

      # Добавляем цену в окно

      symbol = stock_data['symbol']

      price_window[symbol].append(stock_data['price'])

      # Вычисляем скользящее среднее

      moving_average = sum(price_window[symbol]) / len(price_window[symbol])

      print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Используется `deque` для хранения последних 5 цен.

      – Скользящее среднее вычисляется как сумма значений, делённая на их количество.

      Задача 8: Генерация уведомлений

      Описание:

      В топик `user_actions` поступают данные о действиях пользователей:

      – `user_id` – идентификатор пользователя.

      – `action` – выполненное действие (например, "login", "purchase").

      Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.

      Решение:

      ```python

      from confluent_kafka import Consumer, Producer

      import json

      from datetime import datetime, timedelta

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'user-actions-group',

      'auto.offset.reset': 'earliest'

      })

      producer = Producer({'bootstrap.servers': broker})

      consumer.subscribe(['user_actions'])

      # Словарь для отслеживания пользователей

      user_login_time = {}

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      action = json.loads(msg.value().decode('utf-8'))

      user_id = action['user_id']

      action_type