Библиотеки Python Часть 2. Практическое применение. Джейд Картер

Читать онлайн.
Название Библиотеки Python Часть 2. Практическое применение
Автор произведения Джейд Картер
Жанр
Серия
Издательство
Год выпуска 2025
isbn



Скачать книгу

сортирует данные по указанному столбцу.

      – `write.csv` сохраняет результат в новом файле.

      Эти задачи демонстрируют, как использовать Dask и PySpark для работы с большими объемами данных.

      – Dask подходит для локальных задач и интеграции с Python-библиотеками.

      – PySpark эффективен для кластерной обработки данных и интеграции с экосистемой Hadoop.

      Обе библиотеки упрощают решение задач, которые сложно выполнить традиционными методами из-за ограничений памяти или мощности процессора.

1.2 Потоковая обработка данных с Apache Kafka

      Apache Kafka – это мощная платформа для обработки потоков данных в реальном времени. Она широко используется для обработки и анализа событий, поступающих из различных источников, таких как веб-серверы, базы данных, датчики IoT, системы мониторинга и многое другое. Kafka обеспечивает высокую производительность, надежность и масштабируемость, что делает её одним из лучших инструментов для потоковой обработки данных.

      В основе Apache Kafka лежат несколько ключевых компонентов:

      1. Брокеры – серверы, которые принимают, хранят и доставляют данные.

      2. Топики – логические каналы, через которые данные передаются.

      3. Продюсеры – приложения или устройства, которые отправляют данные в Kafka.

      4. Консьюмеры – приложения, которые получают данные из Kafka.

      Kafka организует поток данных в виде последовательностей сообщений. Сообщения записываются в топики и разделяются на партиции, что позволяет обрабатывать данные параллельно.

      Пример потока данных

      Представим, что у нас есть система интернет-магазина, где Kafka используется для обработки событий, таких как заказы, клики на странице, добавление товаров в корзину и платежи. Каждое из этих событий записывается в топик Kafka. Например, топик `orders` может содержать события, описывающие новые заказы.

      Установка и настройка Apache Kafka

      Перед началом работы убедитесь, что Kafka установлена. Для локальной работы используйте официальные сборки Kafka с сайта [Apache Kafka](https://kafka.apache.org/).

      1. Установите Kafka и запустите ZooKeeper, необходимый для управления брокерами.

      2. Запустите Kafka-брокер.

      3. Создайте топик с помощью команды:

      ```bash

      bin/kafka-topics.sh –create –topic orders –bootstrap-server localhost:9092 –partitions 3 –replication-factor 1

      ```

      Отправка данных в Kafka

      Теперь создадим простого продюсера на Python, который будет отправлять данные в топик `orders`. Для работы с Kafka на Python используется библиотека `confluent-kafka`. Установите её с помощью команды:

      ```bash

      pip install confluent-kafka

      ```

      Пример кода, который отправляет сообщения в топик:

      ```python

      from confluent_kafka import Producer

      import json

      import time

      # Настройки продюсера

      producer_config = {

      'bootstrap.servers': 'localhost:9092' # Адрес Kafka-брокера

      }

      # Создание продюсера

      producer = Producer(producer_config)

      # Функция для обратного вызова при успешной отправке сообщения

      def delivery_report(err, msg):

      if err is not None:

      print(f'Ошибка доставки сообщения: {err}')

      else:

      print(f'Сообщение отправлено: {msg.topic()} [{msg.partition()}]')

      # Отправка данных в Kafka

      orders = [

      {'order_id': 1, 'product': 'Laptop', 'price': 1000},

      {'order_id': 2, 'product': 'Phone', 'price': 500},

      {'order_id': 3, 'product': 'Headphones', 'price': 150}

      ]

      for order in orders:

      producer.produce(

      'orders',

      key=str(order['order_id']),

      value=json.dumps(order),

      callback=delivery_report

      )

      producer.flush()