За да завършите този урок, трябва да имате активна инсталация за Kafka на вашата машина. Прочетете Инсталиране на Apache Kafka на Ubuntu, за да знаете как да направите това.
Инсталиране на клиент на Python за Apache Kafka
Преди да започнем работа с Apache Kafka в програмата Python, трябва да инсталираме клиента на Python за Apache Kafka. Това може да стане с помощта на пип (Индекс на пакета на Python). Ето команда за постигане на това:
pip3 инсталирайте kafka-pythonТова ще бъде бърза инсталация на терминала:
Инсталиране на клиент на Python Kafka с помощта на PIP
След като имаме активна инсталация за Apache Kafka и също така инсталирахме клиента на Python Kafka, ние сме готови да започнем да кодираме.
Създаване на продуцент
Първото нещо, което трябва да публикувате съобщения в Kafka, е приложение на производител, което може да изпраща съобщения до теми в Kafka.
Имайте предвид, че производителите на Kafka са асинхронни производители на съобщения. Това означава, че операциите, извършени по време на публикуване на съобщение в дяла на Kafka Topic, не са блокиращи. За да улесним нещата, ще напишем прост JSON издател за този урок.
За начало направете екземпляр за Kafka Producer:
от kafka import KafkaProducerимпортиране json
внос pprint
производител = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.сметища (v).encode ('utf-8'))
Атрибутът bootstrap_servers информира за хоста и порта за сървъра Kafka. Атрибутът value_serializer е само за целите на JSON сериализацията на срещнатите JSON стойности.
За да играем с Kafka Producer, нека опитаме да отпечатаме показателите, свързани с клъстера Producer и Kafka:
метрика = производител.метрика()отпечатък.pprint (метрики)
Сега ще видим следното:
Кафка Мтерикс
Сега, нека най-накрая се опитаме да изпратим съобщение до опашката на Кафка. Един прост JSON обект ще бъде добър пример:
продуцент.изпрати ('linuxhint', 'topic': 'kafka')The linuxhint е дялът на темата, на който ще бъде изпратен JSON обектът. Когато стартирате скрипта, няма да получите изход, тъй като съобщението е изпратено до дяла на темата. Време е да напишем потребител, за да можем да тестваме нашето приложение.
Осъществяване на потребител
Сега сме готови да осъществим нова връзка като потребителско приложение и да получим съобщенията от темата Kafka. Започнете с създаването на нов екземпляр за потребителя:
от kafka import KafkaConsumerот kafka import TopicPartition
print ('Осъществяване на връзка.')
потребител = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Сега задайте тема на тази връзка и също така възможна стойност на отместването.
print ('Присвояване на тема.')консуматор.присвояване ([TopicPartition ('linuxhint', 2)])
И накрая, ние сме готови да отпечатаме mssage:
print ('Получаване на съобщение.')за съобщение в потребител:
print ("OFFSET:" + str (съобщение [0]) + "\ t MSG:" + str (съобщение))
Чрез това ще получим списък с всички публикувани съобщения в раздела за потребителски теми на Kafka. Резултатът за тази програма ще бъде:
Потребител на Kafka
Само за бърза справка, ето пълният скрипт на Producer:
от kafka import KafkaProducerимпортиране json
внос pprint
производител = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.сметища (v).encode ('utf-8'))
продуцент.изпрати ('linuxhint', 'topic': 'kafka')
# метрики = производител.метрика()
# pprint.pprint (метрики)
И ето пълната потребителска програма, която използвахме:
от kafka import KafkaConsumerот kafka import TopicPartition
print ('Осъществяване на връзка.')
потребител = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
print ('Присвояване на тема.')
консуматор.присвояване ([TopicPartition ('linuxhint', 2)])
print ('Получаване на съобщение.')
за съобщение в потребител:
print ("OFFSET:" + str (съобщение [0]) + "\ t MSG:" + str (съобщение))
Заключение
В този урок разгледахме как можем да инсталираме и започнем да използваме Apache Kafka в нашите програми на Python. Показахме колко лесно е да изпълняваме прости задачи, свързани с Kafka в Python с демонстрирания клиент на Kafka за Python.