Apache Kafka

Как да четем данни от Kafka с Python

Как да четем данни от Kafka с Python
Kafka е система за разпространение на съобщения с отворен код за изпращане на съобщението в разделени и различни теми. Стриймингът на данни в реално време може да бъде реализиран чрез използване на Kafka за получаване на данни между приложенията. Той има три основни части. Това са производител, потребител и теми. Производителят се използва за изпращане на съобщение до определена тема и всяко съобщение се прикачва с ключ. Потребителят се използва за четене на съобщение по определена тема от набора дялове. Данните, получени от производителя и съхранявани в дяловете въз основа на определена тема. Много библиотеки съществуват в python, за да създадат производител и потребител за изграждане на система за съобщения, използвайки Kafka. Как данните от Kafka могат да бъдат прочетени с помощта на python е показано в този урок.

Предпоставка

Трябва да инсталирате необходимата библиотека на python, за да четете данни от Kafka. Python3 се използва в този урок, за да напише скрипта на потребител и производител. Ако пакетът pip не е инсталиран преди във вашата операционна система Linux, тогава трябва да инсталирате pip, преди да инсталирате библиотеката Kafka за python. python3-kafka се използва в този урок за четене на данни от Kafka. Изпълнете следната команда, за да инсталирате библиотеката.

$ pip инсталирайте python3-kafka

Четене на прости текстови данни от Kafka

Различни видове данни могат да бъдат изпращани от производителя по определена тема, която може да бъде прочетена от потребителя. Как може да се изпращат и получават прости текстови данни от Kafka с помощта на производител и потребител е показано в тази част на този урок.

Създайте файл с име производител1.py със следния скрипт на python. KafkaProducer модул се импортира от библиотеката на Kafka. Списъкът на брокерите трябва да дефинира по време на инициализиране на обект на производител, за да се свърже със сървъра Kafka. Портът по подразбиране на Kafka е '9092'. Аргументът bootstrap_servers се използва за дефиниране на името на хоста с порта. 'First_Topic'е зададено като име на тема, с което текстовото съобщение ще бъде изпратено от производителя. След това, просто текстово съобщение, 'Здравей от Кафкасе изпраща с помощта на изпрати () метод на KafkaProducer към темата, 'First_Topic'.

производител1.py:

# Импортирайте KafkaProducer от библиотеката на Kafka
от kafka import KafkaProducer
# Определете сървър с порт
bootstrap_servers = ['localhost: 9092']
# Определете името на темата, където съобщението ще се публикува
topicName = 'Първа тема'
# Инициализира променливата на производителя
производител = KafkaProducer (bootstrap_servers = bootstrap_servers)
# Публикуване на текст в определена тема
продуцент.изпрати (topicName, b'Hello от kafka ... ')
# Печат на съобщение
print ("Съобщението е изпратено")

Създайте файл с име потребител1.py със следния скрипт на python. KafkaConsumer модул се импортира от библиотеката на Kafka за четене на данни от Kafka. sys тук се използва модул за прекратяване на скрипта. Едно и също име на хост и номер на порт на производителя се използват в скрипта на потребителя за четене на данни от Kafka. Името на темата на потребителя и производителя трябва да бъде същото, което е „Първа_тема'.  След това потребителският обект се инициализира с трите аргумента. Име на темата, идентификатор на група и информация за сървъра. за цикъл се използва тук за четене на текста, изпратен от производителя на Kafka.

потребител1.py:

# Импортирайте KafkaConsumer от библиотеката на Kafka
от kafka import KafkaConsumer
# Импортиране на модул sys
импортиране на sys
# Определете сървър с порт
bootstrap_servers = ['localhost: 9092']
# Определете името на темата, откъдето ще се получава съобщението
topicName = 'Първа тема'
# Инициализиране на потребителска променлива
потребител = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Прочетете и отпечатайте съобщение от потребителя
за съобщение в потребител:
print ("Име на темата =% s, Съобщение =% s"% (msg.тема, съобщ.стойност))
# Прекратяване на скрипта
sys.изход ()

Изход:

Изпълнете следната команда от един терминал, за да изпълните скрипта на производителя.

$ python3 производител1.py

След изпращане на съобщението ще се появи следният изход.

Изпълнете следната команда от друг терминал, за да изпълните потребителския скрипт.

$ python3 потребител1.py

Резултатът показва името на темата и текстовото съобщение, изпратено от производителя.

Четене на форматирани JSON данни от Kafka

Данните, форматирани в JSON, могат да се изпращат от производителя на Kafka и да се четат от потребителя на Kafka json модул на python. Как JSON данните могат да бъдат сериализирани и десериализирани преди изпращане и получаване на данните с помощта на модула python-kafka е показано в тази част на този урок.

Създайте python скрипт с име производител2.py със следния скрипт. Друг модул на име JSON се импортира с KafkaProducer модул тук. стойност_сериализатор аргумент се използва с bootstrap_servers аргумент тук за инициализиране на обекта на производителя на Kafka. Този аргумент показва, че JSON данните ще бъдат кодирани с помощта на 'utf-8'набор от символи по време на изпращане. След това данните, форматирани в JSON, се изпращат на имената тема JSONtopic.

производител2.py:

# Импортирайте KafkaProducer от библиотеката на Kafka
от kafka import KafkaProducer
# Импортирайте JSON модул за сериализиране на данни
импортиране json
# Инициализирайте производителската променлива и задайте параметър за JSON кодиране
производител = KafkaProducer (bootstrap_servers =
['localhost: 9092'], value_serializer = lambda v: json.сметища (v).encode ('utf-8'))
# Изпратете данни в JSON формат
продуцент.send ('JSONtopic', 'name': 'fahmida', 'email': '[имейл защитен]')
 
# Печат на съобщение
print ("Съобщение изпратено до JSONtopic")

Създайте скрипт на python с име потребител2.py със следния скрипт. KafkaConsumer, sys и JSON модулите се импортират в този скрипт. KafkaConsumer модул се използва за четене на JSON форматирани данни от Kafka. Модулът JSON се използва за декодиране на кодираните данни JSON, изпратени от производителя на Kafka. Sys модул се използва за прекратяване на скрипта. стойност_десетериализатор аргумент се използва с bootstrap_servers за да дефинира как ще се декодират JSON данни. Следващия, за цикъл се използва за отпечатване на всички потребителски записи и JSON данни, извлечени от Kafka.

потребител2.py:

# Импортирайте KafkaConsumer от библиотеката на Kafka
от kafka import KafkaConsumer
# Импортиране на модул sys
импортиране на sys
# Импортирайте json модул за сериализиране на данни
импортиране json
# Инициализирайте потребителска променлива и задайте свойство за JSON декодиране
потребител = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = ламбда m: json.товари (m.decode ('utf-8')))
# Прочетете данни от kafka
за съобщение в потребител:
print ("Потребителски записи: \ n")
печат (съобщение)
print ("\ nЧетене от JSON данни \ n")
print ("Име:", съобщение [6] ['name'])
print ("Email:", съобщение [6] ['email'])
# Прекратяване на скрипта
sys.изход ()

Изход:

Изпълнете следната команда от един терминал, за да изпълните скрипта на производителя.

$ python3 производител2.py

Скриптът ще отпечата следното съобщение след изпращане на JSON данни.

Изпълнете следната команда от друг терминал, за да изпълните потребителския скрипт.

$ python3 потребител2.py

След стартиране на скрипта ще се появи следният изход.

Заключение:

Данните могат да се изпращат и получават в различни формати от Kafka с помощта на python. Данните могат също да се съхраняват в базата данни и да се извличат от базата данни с помощта на Kafka и python. Вкъщи, този урок ще помогне на потребителя на python да започне да работи с Kafka.

Бутонът на левия бутон на мишката не работи в Windows 10
Ако използвате специална мишка с вашия лаптоп или настолен компютър, но бутонът на левия бутон на мишката не работи на Windows 10/8/7 по някаква причи...
Курсорът скача или се движи произволно, докато пишете в Windows 10
Ако установите, че курсорът на вашата мишка скача или се движи самостоятелно, автоматично, произволно, докато пишете в лаптоп или компютър на Windows,...
Как да обърнете посоката на превъртане на мишката и тъчпада в Windows 10
Мишка и Тъчпадs не само правят изчисленията лесни, но и по-ефективни и отнемат по-малко време. Не можем да си представим живот без тези устройства, но...