Библиотеки Python Часть 2. Практическое применение. Джейд Картер

Читать онлайн.
Название Библиотеки Python Часть 2. Практическое применение
Автор произведения Джейд Картер
Жанр
Серия
Издательство
Год выпуска 2025
isbn



Скачать книгу

Преобразуем сообщение из Kafka в Python-объект

      event = json.loads(msg.value().decode('utf-8'))

      # Фильтруем события с URL, содержащими "product"

      if 'product' in event['url']:

      print(f"Фильтруем событие: {event}")

      produce_filtered_event(event)

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает события из топика `clickstream`.

      – Каждое сообщение проверяется на наличие слова "product" в поле `url`.

      – Отфильтрованные события отправляются в новый топик `filtered_clicks` через продюсера.

      Задача 2: Подсчет количества событий в реальном времени

      Описание:

      Топик `log_events` содержит логи системы. Каждое сообщение содержит:

      – `log_level` (например, "INFO", "ERROR", "DEBUG").

      – `message` (текст лога).

      Ваша задача: написать программу, которая считает количество событий уровня "ERROR" в реальном времени и каждые 10 секунд выводит их общее количество.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import time

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'log-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['log_events'])

      error_count = 0

      start_time = time.time()

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      log_event = json.loads(msg.value().decode('utf-8'))

      # Увеличиваем счетчик, если уровень лога "ERROR"

      if log_event['log_level'] == 'ERROR':

      error_count += 1

      # Каждые 10 секунд выводим текущий счетчик

      if time.time() – start_time >= 10:

      print(f"Количество ошибок за последние 10 секунд: {error_count}")

      error_count = 0

      start_time = time.time()

      except KeyboardInterrupt:

      print("Завершение работы.")

      finally:

      consumer.close()

      ```

      Объяснение:

      – Консьюмер читает события из топика `log_events`.

      – Если уровень лога "ERROR", увеличивается счетчик `error_count`.

      – Каждые 10 секунд программа выводит количество событий "ERROR" и сбрасывает счетчик.

      Задача 3: Агрегация данных по группам

      Описание:

      Топик `transactions` содержит данные о финансовых транзакциях:

      – `user_id` – идентификатор пользователя.

      – `amount` – сумма транзакции.

      Ваша задача: написать программу, которая подсчитывает общую сумму транзакций для каждого пользователя и выводит результаты в реальном времени.

      Решение:

      ```python

      from confluent_kafka import Consumer

      import json

      from collections import defaultdict

      # Настройки Kafka

      broker = 'localhost:9092'

      # Создание консьюмера

      consumer = Consumer({

      'bootstrap.servers': broker,

      'group.id': 'transaction-group',

      'auto.offset.reset': 'earliest'

      })

      consumer.subscribe(['transactions'])

      # Словарь для хранения сумм по пользователям

      user_totals = defaultdict(float)

      try:

      while True:

      msg = consumer.poll(1.0)

      if msg is None:

      continue

      if msg.error():

      continue

      # Преобразуем сообщение в Python-объект

      transaction = json.loads(msg.value().decode('utf-8'))

      # Обновляем сумму для пользователя

      user_id = transaction['user_id']

      user_totals[user_id] += transaction['amount']

      # Вывод текущих сумм

      print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

      except