Προγραμματισμός

Κατασκευασμένο για πραγματικό χρόνο: Μεγάλη ανταλλαγή δεδομένων με το Apache Kafka, Μέρος 1

Όταν ξεκίνησε η μεγάλη μετακίνηση δεδομένων επικεντρώθηκε κυρίως στην επεξεργασία παρτίδας. Τα κατανεμημένα εργαλεία αποθήκευσης και αναζήτησης δεδομένων όπως το MapReduce, το Hive και το Pig σχεδιάστηκαν για να επεξεργάζονται δεδομένα σε παρτίδες και όχι συνεχώς. Οι επιχειρήσεις εκτελούσαν πολλές εργασίες κάθε βράδυ για να εξαγάγουν δεδομένα από μια βάση δεδομένων, μετά να αναλύσουν, να μετασχηματίσουν και τελικά να αποθηκεύσουν τα δεδομένα. Πιο πρόσφατα οι επιχειρήσεις ανακάλυψαν τη δύναμη της ανάλυσης και της επεξεργασίας δεδομένων και συμβάντων καθώς συμβαίνουν, όχι μόνο μία φορά κάθε λίγες ώρες. Ωστόσο, τα περισσότερα παραδοσιακά συστήματα ανταλλαγής μηνυμάτων δεν αυξάνονται για να χειρίζονται μεγάλα δεδομένα σε πραγματικό χρόνο. Έτσι, οι μηχανικοί στο LinkedIn δημιούργησαν το Apache Kafka ανοιχτού κώδικα: ένα κατανεμημένο πλαίσιο ανταλλαγής μηνυμάτων που ανταποκρίνεται στις απαιτήσεις μεγάλων δεδομένων με κλιμάκωση σε υλικό εμπορευμάτων.

Τα τελευταία χρόνια, η Apache Kafka εμφανίστηκε για να λύσει μια ποικιλία περιπτώσεων χρήσης. Στην απλούστερη περίπτωση, θα μπορούσε να είναι ένα απλό buffer για την αποθήκευση αρχείων καταγραφής εφαρμογών. Σε συνδυασμό με μια τεχνολογία όπως το Spark Streaming, μπορεί να χρησιμοποιηθεί για την παρακολούθηση αλλαγών δεδομένων και την ανάληψη δράσης σε αυτά τα δεδομένα προτού τα αποθηκεύσετε στον τελικό προορισμό. Η προγνωστική λειτουργία της Kafka το καθιστά ένα ισχυρό εργαλείο για την ανίχνευση απάτης, όπως ο έλεγχος της εγκυρότητας μιας συναλλαγής με πιστωτική κάρτα όταν συμβαίνει και χωρίς να περιμένετε ώρες επεξεργασίας παρτίδας αργότερα.

Αυτό το σεμινάριο δύο μερών παρουσιάζει το Kafka, ξεκινώντας από τον τρόπο εγκατάστασης και εκτέλεσης του στο περιβάλλον ανάπτυξης. Θα λάβετε μια επισκόπηση της αρχιτεκτονικής του Kafka, ακολουθούμενη από μια εισαγωγή για την ανάπτυξη ενός συστήματος ανταλλαγής μηνυμάτων Apache Kafka. Τέλος, θα δημιουργήσετε μια προσαρμοσμένη εφαρμογή παραγωγού / καταναλωτή που στέλνει και καταναλώνει μηνύματα μέσω διακομιστή Kafka. Στο δεύτερο μισό του σεμιναρίου θα μάθετε πώς να χωρίζετε και να ομαδοποιείτε μηνύματα και πώς να ελέγχετε ποια μηνύματα θα καταναλώνει ένας καταναλωτής της Kafka.

Τι είναι το Apache Kafka;

Το Apache Kafka είναι ένα σύστημα ανταλλαγής μηνυμάτων που έχει κατασκευαστεί για να κλιμακώσει μεγάλα δεδομένα. Παρόμοια με το Apache ActiveMQ ή το RabbitMq, το Kafka επιτρέπει σε εφαρμογές που είναι κατασκευασμένες σε διαφορετικές πλατφόρμες να επικοινωνούν μέσω ασύγχρονης μετάδοσης μηνυμάτων. Αλλά η Kafka διαφέρει από αυτά τα πιο παραδοσιακά συστήματα ανταλλαγής μηνυμάτων με βασικούς τρόπους:

  • Έχει σχεδιαστεί για κλιμάκωση οριζόντια, προσθέτοντας περισσότερους διακομιστές εμπορευμάτων.
  • Παρέχει πολύ υψηλότερη απόδοση τόσο για παραγωγικές όσο και για καταναλωτικές διαδικασίες.
  • Μπορεί να χρησιμοποιηθεί για να υποστηρίξει και τις περιπτώσεις χρήσης παρτίδας και σε πραγματικό χρόνο.
  • Δεν υποστηρίζει JMS, API μεσαίου λογισμικού Java.

Η αρχιτεκτονική του Apache Kafka

Πριν εξερευνήσουμε την αρχιτεκτονική της Kafka, πρέπει να γνωρίζετε τη βασική της ορολογία:

  • ΕΝΑ παραγωγός είναι μια διαδικασία που μπορεί να δημοσιεύσει ένα μήνυμα σε ένα θέμα.
  • ένα καταναλωτής είναι μια διαδικασία που μπορεί να εγγραφεί σε ένα ή περισσότερα θέματα και να καταναλώσει μηνύματα που δημοσιεύονται σε θέματα.
  • ΕΝΑ κατηγορία θέματος είναι το όνομα της ροής στην οποία δημοσιεύονται τα μηνύματα.
  • ΕΝΑ μεσίτης είναι μια διαδικασία που εκτελείται σε ένα μηχάνημα.
  • ΕΝΑ σύμπλεγμα είναι μια ομάδα μεσιτών που συνεργάζονται.

Η αρχιτεκτονική του Apache Kafka είναι πολύ απλή, η οποία μπορεί να οδηγήσει σε καλύτερη απόδοση και απόδοση σε ορισμένα συστήματα. Κάθε θέμα στην Κάφκα είναι σαν ένα απλό αρχείο καταγραφής. Όταν ένας παραγωγός δημοσιεύει ένα μήνυμα, ο διακομιστής Kafka το προσθέτει στο τέλος του αρχείου καταγραφής για το συγκεκριμένο θέμα. Ο διακομιστής εκχωρεί επίσης ένα αντισταθμίζεται, που είναι ένας αριθμός που χρησιμοποιείται για την μόνιμη αναγνώριση κάθε μηνύματος. Καθώς ο αριθμός των μηνυμάτων αυξάνεται, αυξάνεται η τιμή κάθε μετατόπισης. Για παράδειγμα, εάν ο παραγωγός δημοσιεύει τρία μηνύματα, το πρώτο μπορεί να πάρει όφσετ 1, το δεύτερο όφσετ 2 και το τρίτο όφσετ 3.

Κατά την πρώτη εκκίνηση του καταναλωτή Kafka, θα στείλει ένα αίτημα έλξης στον διακομιστή, ζητώντας να ανακτήσει τυχόν μηνύματα για ένα συγκεκριμένο θέμα με τιμή μετατόπισης μεγαλύτερη από 0. Ο διακομιστής θα ελέγξει το αρχείο καταγραφής για αυτό το θέμα και θα επιστρέψει τα τρία νέα μηνύματα . Ο καταναλωτής θα επεξεργαστεί τα μηνύματα και, στη συνέχεια, θα στείλει ένα αίτημα για μηνύματα με όφσετ πιο ψηλά από 3 και ούτω καθεξής.

Στο Kafka, ο πελάτης είναι υπεύθυνος για την απομνημόνευση του αριθμού όφσετ και την ανάκτηση μηνυμάτων. Ο διακομιστής Kafka δεν παρακολουθεί ούτε διαχειρίζεται την κατανάλωση μηνυμάτων. Από προεπιλογή, ένας διακομιστής Kafka θα διατηρήσει ένα μήνυμα για επτά ημέρες. Ένα νήμα φόντου στο διακομιστή ελέγχει και διαγράφει μηνύματα που είναι επτά ημέρες ή μεγαλύτερα. Ένας καταναλωτής μπορεί να έχει πρόσβαση στα μηνύματα όσο βρίσκονται στο διακομιστή. Μπορεί να διαβάσει ένα μήνυμα πολλές φορές, ακόμη και να διαβάσει μηνύματα με αντίστροφη σειρά απόδειξης. Αλλά εάν ο καταναλωτής δεν ανακτήσει το μήνυμα πριν τελειώσουν οι επτά ημέρες, θα χάσει αυτό το μήνυμα.

Σημεία αναφοράς Kafka

Η χρήση της παραγωγής από το LinkedIn και άλλες επιχειρήσεις έχει δείξει ότι με τη σωστή διαμόρφωση το Apache Kafka είναι σε θέση να επεξεργάζεται εκατοντάδες gigabytes δεδομένων καθημερινά. Το 2011, τρεις μηχανικοί του LinkedIn χρησιμοποίησαν δοκιμές αναφοράς για να δείξουν ότι η Kafka θα μπορούσε να επιτύχει πολύ υψηλότερη απόδοση από τα ActiveMQ και RabbitMQ.

Γρήγορη ρύθμιση και επίδειξη του Apache Kafka

Θα δημιουργήσουμε μια προσαρμοσμένη εφαρμογή σε αυτό το σεμινάριο, αλλά ας ξεκινήσουμε εγκαθιστώντας και δοκιμάζοντας μια παρουσία Kafka με έναν παραγωγό και καταναλωτή που δεν είναι διαθέσιμος.

  1. Επισκεφθείτε τη σελίδα λήψης Kafka για να εγκαταστήσετε την πιο πρόσφατη έκδοση (0,9 από αυτήν τη γραφή).
  2. Εξαγάγετε τα δυαδικά αρχεία σε ένα λογισμικό / kafka ντοσιέ. Για την τρέχουσα έκδοση είναι λογισμικό / kafka_2.11-0.9.0.0.
  3. Αλλάξτε τον τρέχοντα κατάλογό σας για να μεταβείτε στον νέο φάκελο.
  4. Ξεκινήστε τον διακομιστή Zookeeper εκτελώντας την εντολή: bin / zookeeper-server-start.sh config / zookeeper.properties.
  5. Ξεκινήστε τον διακομιστή Kafka εκτελώντας: bin / kafka-server-start.sh config / server.properties.
  6. Δημιουργήστε ένα δοκιμαστικό θέμα που μπορείτε να χρησιμοποιήσετε για τη δοκιμή: bin / kafka-topics.sh --create --zookeeper localhost: 2181 - παράγοντας-παράγοντας 1 - διαμερίσματα 1 - τοπικό javaworld.
  7. Ξεκινήστε έναν απλό καταναλωτή κονσόλας που μπορεί να καταναλώσει μηνύματα που δημοσιεύονται σε ένα συγκεκριμένο θέμα, όπως javaworld: bin / kafka-console-consumer.sh --zookeeper localhost: 2181 - τοπικό javaworld - από την αρχή.
  8. Ξεκινήστε μια απλή κονσόλα παραγωγού που μπορεί να δημοσιεύσει μηνύματα στο θέμα της δοκιμής: bin / kafka-console-producer.sh - localhost λίστα χρηστών: 9092 - τοπική javaworld.
  9. Δοκιμάστε να πληκτρολογήσετε ένα ή δύο μηνύματα στην κονσόλα παραγωγού. Τα μηνύματά σας θα πρέπει να εμφανίζονται στην κονσόλα καταναλωτών.

Παράδειγμα εφαρμογής με Apache Kafka

Έχετε δει πώς λειτουργεί η Apache Kafka έξω από το κουτί. Στη συνέχεια, ας αναπτύξουμε μια προσαρμοσμένη εφαρμογή παραγωγού / καταναλωτή. Ο παραγωγός θα ανακτήσει την είσοδο χρήστη από την κονσόλα και θα στείλει κάθε νέα γραμμή ως μήνυμα σε έναν διακομιστή Kafka. Ο καταναλωτής θα ανακτήσει μηνύματα για ένα συγκεκριμένο θέμα και θα τα εκτυπώσει στην κονσόλα. Τα συστατικά του παραγωγού και του καταναλωτή σε αυτήν την περίπτωση είναι οι δικές σας εφαρμογές kafka-console-producer.sh και kafka-console-consumer.sh.

Ας ξεκινήσουμε δημιουργώντας ένα Παραγωγός. Java τάξη. Αυτή η κλάση πελάτη περιέχει λογική για την ανάγνωση εισόδου χρήστη από την κονσόλα και την αποστολή αυτής της εισόδου ως μήνυμα στον διακομιστή Kafka.

Διαμορφώνουμε τον παραγωγό δημιουργώντας ένα αντικείμενο από το java.util.Ακίνητα τάξη και ρύθμιση των ιδιοτήτων της. Η κλάση ProducerConfig καθορίζει όλες τις διαθέσιμες ιδιότητες, αλλά οι προεπιλεγμένες τιμές του Kafka επαρκούν για τις περισσότερες χρήσεις. Για την προεπιλεγμένη διαμόρφωση χρειάζεται μόνο να ορίσουμε τρεις υποχρεωτικές ιδιότητες:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) ορίζει μια λίστα κεντρικών υπολογιστών: ζεύγη θυρών που χρησιμοποιούνται για τον καθορισμό των αρχικών συνδέσεων με το σύμπλεγμα Kakfa στο host1: port1, host2: port2, ... μορφή. Ακόμα κι αν έχουμε περισσότερους από έναν μεσίτες στο σύμπλεγμα Kafka, πρέπει μόνο να καθορίσουμε την αξία του πρώτου μεσίτη κεντρικός υπολογιστής: λιμάνι. Ο πελάτης Kafka θα χρησιμοποιήσει αυτήν την τιμή για να κάνει μια κλήση ανακάλυψης στον μεσίτη, ο οποίος θα επιστρέψει μια λίστα με όλους τους μεσίτες του συμπλέγματος. Είναι καλή ιδέα να καθορίσετε περισσότερους από έναν μεσίτες στο BOOTSTRAP_SERVERS_CONFIG, έτσι ώστε εάν αυτός ο πρώτος μεσίτης είναι εκτός λειτουργίας, ο πελάτης θα μπορεί να δοκιμάσει άλλους μεσίτες.

Ο διακομιστής Kafka αναμένει μηνύματα κλειδί byte [], τιμή byte [] μορφή. Αντί να μετατρέπουμε κάθε κλειδί και αξία, η βιβλιοθήκη από την πλευρά του πελάτη της Kafka μας επιτρέπει να χρησιμοποιούμε πιο φιλικούς τύπους όπως Σειρά και int για αποστολή μηνυμάτων. Η βιβλιοθήκη θα τα μετατρέψει στον κατάλληλο τύπο. Για παράδειγμα, η εφαρμογή δείγματος δεν έχει κλειδί για συγκεκριμένο μήνυμα, επομένως θα το χρησιμοποιήσουμε μηδενικό για το κλειδί. Για την τιμή θα χρησιμοποιήσουμε ένα Σειρά, που είναι τα δεδομένα που εισήγαγε ο χρήστης στην κονσόλα.

Για να διαμορφώσετε το κλειδί μηνύματος, ορίζουμε μια τιμή KEY_SERIALIZER_CLASS_CONFIG στο org.apache.kafka.common.serialization.ByteArraySerializer. Αυτό λειτουργεί γιατί μηδενικό δεν χρειάζεται να μετατραπεί σε ψηφιόλεξη[]. Για το τιμή μηνύματος, ορίσαμε VALUE_SERIALIZER_CLASS_CONFIG στο org.apache.kafka.common.serialization.StringSerializer, επειδή αυτή η τάξη ξέρει πώς να μετατρέψει ένα Σειρά μέσα σε ψηφιόλεξη[].

Προσαρμοσμένα αντικείμενα κλειδιού / τιμής

Παρόμοιο με StringSerializer, Η Kafka παρέχει σειριοποιητές για άλλα πρωτόγονα όπως int και μακρύς. Για να χρησιμοποιήσουμε ένα προσαρμοσμένο αντικείμενο για το κλειδί ή την τιμή μας, θα πρέπει να δημιουργήσουμε μια κλάση υλοποίησης org.apache.kafka.common.serialization.Serializer. Θα μπορούσαμε τότε να προσθέσουμε λογική για να σειριοποιήσουμε την τάξη ψηφιόλεξη[]. Θα πρέπει επίσης να χρησιμοποιήσουμε έναν αντίστοιχο deserializer στον κωδικό καταναλωτή μας.

Ο παραγωγός της Κάφκα

Αφού συμπληρώσετε το Ιδιότητες class με τις απαραίτητες ιδιότητες διαμόρφωσης, μπορούμε να το χρησιμοποιήσουμε για να δημιουργήσουμε ένα αντικείμενο Παραγωγός Kafka. Όποτε θέλουμε να στείλουμε ένα μήνυμα στον διακομιστή Kafka μετά από αυτό, θα δημιουργούμε ένα αντικείμενο Καταγραφή παραγωγού και καλέστε το Παραγωγός Kafka'μικρό στείλετε() μέθοδο με αυτήν την εγγραφή για να στείλετε το μήνυμα. ο Καταγραφή παραγωγού παίρνει δύο παραμέτρους: το όνομα του θέματος στο οποίο πρέπει να δημοσιευτεί το μήνυμα και το πραγματικό μήνυμα. Μην ξεχάσετε να καλέσετε το Producer.close () μέθοδος όταν τελειώσετε χρησιμοποιώντας τον παραγωγό:

Λίστα 1. KafkaProducer

 Δημόσιος παραγωγός κατηγορίας {ιδιωτικό στατικό σαρωτή στο; public static void main (String [] argv) ρίχνει την Εξαίρεση {if (argv.length! = 1) {System.err.println ("Παρακαλώ προσδιορίστε 1 παραμέτρους"); System.exit (-1); } String topicName = argv [0]; σε = νέο σαρωτή (System.in); System.out.println ("Εισαγάγετε μήνυμα (πληκτρολογήστε έξοδο για έξοδο)"); // Ρυθμίστε τις παραμέτρους του Properter Properties configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer παραγωγός = νέο KafkaProducer (configProperties); Γραμμή συμβολοσειράς = in.nextLine (); ενώ (! line.equals ("έξοδος")) {ProducerRecord rec = new ProducerRecord (topicName, line); παραγωγός. αποστολή (rec) line = in.nextLine (); } στο.κλείσιμο (); παραγωγός.κλείσιμο (); }} 

Διαμόρφωση του καταναλωτή μηνυμάτων

Στη συνέχεια θα δημιουργήσουμε έναν απλό καταναλωτή που θα εγγραφεί σε ένα θέμα. Κάθε φορά που ένα νέο μήνυμα δημοσιεύεται στο θέμα, θα διαβάσει αυτό το μήνυμα και θα το εκτυπώσει στην κονσόλα. Ο κωδικός καταναλωτή είναι αρκετά παρόμοιος με τον κωδικό παραγωγού. Ξεκινάμε δημιουργώντας ένα αντικείμενο java.util.Ακίνητα, ορίζοντας τις ιδιότητες για συγκεκριμένους καταναλωτές και, στη συνέχεια, τη χρησιμοποίησή του για τη δημιουργία ενός νέου αντικειμένου KafkaΚαταναλωτής. Η κλάση ConsumerConfig καθορίζει όλες τις ιδιότητες που μπορούμε να ορίσουμε. Υπάρχουν μόνο τέσσερις υποχρεωτικές ιδιότητες:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Όπως κάναμε και για την κατηγορία παραγωγών, θα χρησιμοποιήσουμε BOOTSTRAP_SERVERS_CONFIG για να ρυθμίσετε τα ζεύγη κεντρικού υπολογιστή / θύρας για την κατηγορία καταναλωτών. Αυτή η διαμόρφωση μας επιτρέπει να δημιουργήσουμε τις αρχικές συνδέσεις με το σύμπλεγμα Kakfa στο host1: port1, host2: port2, ... μορφή.

Όπως ανέφερα προηγουμένως, ο διακομιστής Kafka αναμένει μηνύματα ψηφιόλεξη[] κλειδί και ψηφιόλεξη[] μορφές τιμών, και έχει τη δική του εφαρμογή για σειριοποίηση διαφορετικών τύπων σε ψηφιόλεξη[]. Όπως κάναμε και με τον παραγωγό, από την πλευρά των καταναλωτών θα πρέπει να χρησιμοποιήσουμε ένα προσαρμοσμένο deserializer για μετατροπή ψηφιόλεξη[] πίσω στον κατάλληλο τύπο.

Στην περίπτωση του παραδείγματος εφαρμογής, γνωρίζουμε ότι ο παραγωγός χρησιμοποιεί ByteArraySerializer για το κλειδί και StringSerializer για την τιμή. Επομένως, από την πλευρά του πελάτη πρέπει να χρησιμοποιήσουμε org.apache.kafka.common.serialization.ByteArrayDeserializer για το κλειδί και org.apache.kafka.common.serialization.StringDeserializer για την τιμή. Ορισμός αυτών των τάξεων ως τιμών για KEY_DESERIALIZER_CLASS_CONFIG και VALUE_DESERIALIZER_CLASS_CONFIG θα επιτρέψει στον καταναλωτή να αποστραγγίσει ψηφιόλεξη[] κωδικοποιημένοι τύποι που αποστέλλονται από τον παραγωγό.

Τέλος, πρέπει να ορίσουμε την τιμή του GROUP_ID_CONFIG. Αυτό πρέπει να είναι ένα όνομα ομάδας σε μορφή συμβολοσειράς. Θα εξηγήσω περισσότερα για αυτήν τη διαμόρφωση σε ένα λεπτό. Προς το παρόν, απλώς κοιτάξτε τον καταναλωτή της Kafka με τα τέσσερα υποχρεωτικά σετ ιδιοτήτων:

$config[zx-auto] not found$config[zx-overlay] not found