Προγραμματισμός

Κατασκευασμένο για πραγματικό χρόνο: Μεγάλη ανταλλαγή δεδομένων με το Apache Kafka, Μέρος 2

Στο πρώτο μισό αυτής της εισαγωγής JavaWorld στην Apache Kafka, αναπτύξατε μερικές μικρής κλίμακας εφαρμογές παραγωγών / καταναλωτών χρησιμοποιώντας το Kafka. Από αυτές τις ασκήσεις θα πρέπει να είστε εξοικειωμένοι με τα βασικά στοιχεία του συστήματος ανταλλαγής μηνυμάτων Apache Kafka. Σε αυτό το δεύτερο εξάμηνο, θα μάθετε πώς να χρησιμοποιείτε διαμερίσματα για τη διανομή φόρτωσης και την κλίμακα της εφαρμογής σας οριζόντια, διαχειρίζοντας έως και εκατομμύρια μηνύματα την ημέρα. Θα μάθετε επίσης πώς η Kafka χρησιμοποιεί αντισταθμίσεις μηνυμάτων για να παρακολουθεί και να διαχειρίζεται την περίπλοκη επεξεργασία μηνυμάτων και πώς να προστατεύετε το σύστημα ανταλλαγής μηνυμάτων Apache Kafka από αποτυχία σε περίπτωση που ένας καταναλωτής πέσει. Θα αναπτύξουμε την παραδειγματική εφαρμογή από το Μέρος 1 τόσο για περιπτώσεις χρήσης δημοσίευσης-εγγραφής όσο και από σημείο σε σημείο.

Χωρίσματα στο Apache Kafka

Τα θέματα στην Κάφκα μπορούν να χωριστούν σε διαμερίσματα. Για παράδειγμα, κατά τη δημιουργία ενός θέματος που ονομάζεται Επίδειξη, μπορείτε να το ρυθμίσετε ώστε να έχει τρία διαμερίσματα. Ο διακομιστής θα δημιουργούσε τρία αρχεία καταγραφής, ένα για κάθε ένα από τα διαμερίσματα επίδειξης. Όταν ένας παραγωγός δημοσίευσε ένα μήνυμα στο θέμα, θα εκχωρούσε ένα αναγνωριστικό κατάτμησης για αυτό το μήνυμα. Στη συνέχεια, ο διακομιστής θα προσθέσει το μήνυμα στο αρχείο καταγραφής μόνο για αυτό το διαμέρισμα.

Εάν ξεκινήσατε τότε δύο καταναλωτές, ο διακομιστής ενδέχεται να εκχωρήσει τα διαμερίσματα 1 και 2 στον πρώτο καταναλωτή και το διαμέρισμα 3 στον δεύτερο καταναλωτή. Κάθε καταναλωτής θα μπορούσε να διαβάσει μόνο από τα καθορισμένα διαμερίσματα. Μπορείτε να δείτε το θέμα της επίδειξης που έχει διαμορφωθεί για τρία διαμερίσματα στο Σχήμα 1.

Για να επεκτείνετε το σενάριο, φανταστείτε ένα σύμπλεγμα Kafka με δύο μεσίτες, που στεγάζεται σε δύο μηχανήματα. Όταν χωρίσατε το θέμα επίδειξης, θα το διαμορφώσετε ώστε να έχει δύο διαμερίσματα και δύο αντίγραφα. Για αυτόν τον τύπο διαμόρφωσης, ο διακομιστής Kafka θα εκχωρήσει τα δύο διαμερίσματα στους δύο μεσίτες του συμπλέγματος σας. Κάθε μεσίτης θα ήταν ο ηγέτης για ένα από τα διαμερίσματα.

Όταν ένας παραγωγός δημοσίευσε ένα μήνυμα, θα πήγαινε στον αρχηγό των διαμερισμάτων. Ο αρχηγός θα λάβει το μήνυμα και θα το προσθέσει στο αρχείο καταγραφής του τοπικού υπολογιστή. Ο δεύτερος μεσίτης θα αναπαράγει παθητικά αυτό το αρχείο καταγραφής στο δικό του μηχάνημα. Εάν ο αρχηγός διαμερισμάτων έπεσε, ο δεύτερος μεσίτης θα γίνει ο νέος ηγέτης και θα αρχίσει να εξυπηρετεί αιτήματα πελατών. Με τον ίδιο τρόπο, όταν ένας καταναλωτής έστειλε ένα αίτημα σε ένα διαμέρισμα, αυτό το αίτημα θα πήγαινε πρώτα στον αρχηγό του διαμερίσματος, ο οποίος θα επέστρεφε τα ζητούμενα μηνύματα.

Οφέλη του διαμερίσματος

Εξετάστε τα οφέλη του διαμερίσματος ενός συστήματος ανταλλαγής μηνυμάτων που βασίζεται στο Kafka:

  1. Επεκτασιμότητα: Σε ένα σύστημα με ένα μόνο διαμέρισμα, τα μηνύματα που δημοσιεύονται σε ένα θέμα αποθηκεύονται σε ένα αρχείο καταγραφής, το οποίο υπάρχει σε ένα μόνο μηχάνημα. Ο αριθμός των μηνυμάτων για ένα θέμα πρέπει να χωράει σε ένα αρχείο καταγραφής δεσμεύσεων και το μέγεθος των αποθηκευμένων μηνυμάτων δεν μπορεί ποτέ να υπερβαίνει τον χώρο στο δίσκο αυτού του μηχανήματος. Η κατανομή ενός θέματος σάς επιτρέπει να κλιμακώσετε το σύστημά σας αποθηκεύοντας μηνύματα σε διαφορετικά μηχανήματα σε ένα σύμπλεγμα. Εάν θέλετε να αποθηκεύσετε 30 gigabytes (GB) μηνυμάτων για το θέμα της επίδειξης, για παράδειγμα, θα μπορούσατε να δημιουργήσετε ένα σύμπλεγμα Kafka τριών μηχανών, το καθένα με 10 GB χώρου στο δίσκο. Τότε θα ρυθμίσετε το θέμα ώστε να έχει τρία διαμερίσματα.
  2. Εξισορρόπηση φορτίου διακομιστή: Η ύπαρξη πολλαπλών κατατμήσεων σας επιτρέπει να διαδίδετε αιτήματα μηνυμάτων σε μεσίτες. Για παράδειγμα, εάν είχατε ένα θέμα που επεξεργάστηκε 1 εκατομμύριο μηνύματα ανά δευτερόλεπτο, θα μπορούσατε να το διαιρέσετε σε 100 διαμερίσματα και να προσθέσετε 100 μεσίτες στο σύμπλεγμα. Κάθε μεσίτης θα ήταν ο ηγέτης για ένα μόνο διαμέρισμα, υπεύθυνος για την απάντηση σε 10.000 αιτήματα πελατών ανά δευτερόλεπτο.
  3. Εξισορρόπηση φορτίου καταναλωτή: Παρόμοια με την εξισορρόπηση φορτίου διακομιστή, η φιλοξενία πολλών καταναλωτών σε διαφορετικό μηχάνημα σάς επιτρέπει να κατανείμετε το φορτίο του καταναλωτή. Ας υποθέσουμε ότι θέλετε να καταναλώσετε 1 εκατομμύριο μηνύματα ανά δευτερόλεπτο από ένα θέμα με 100 διαμερίσματα. Θα μπορούσατε να δημιουργήσετε 100 καταναλωτές και να τους τρέχετε παράλληλα. Ο διακομιστής Kafka θα εκχωρήσει ένα διαμέρισμα σε κάθε έναν από τους καταναλωτές και κάθε καταναλωτής θα επεξεργάζεται 10.000 μηνύματα παράλληλα. Δεδομένου ότι η Kafka εκχωρεί κάθε διαμέρισμα σε έναν μόνο καταναλωτή, εντός του διαμερίσματος κάθε μήνυμα θα καταναλώνεται με τη σειρά.

Δύο τρόποι κατάτμησης

Ο παραγωγός είναι υπεύθυνος για να αποφασίσει σε ποιο διαμέρισμα θα πάει ένα μήνυμα. Ο παραγωγός έχει δύο επιλογές για τον έλεγχο αυτής της ανάθεσης:

  • Προσαρμοσμένο διαμέρισμα: Μπορείτε να δημιουργήσετε μια τάξη που εφαρμόζει το org.apache.kafka.clients.producer.Partitioner διεπαφή. Αυτό το έθιμο Χωρίζων θα εφαρμόσει τη λογική της επιχείρησης για να αποφασίσει πού στέλνονται τα μηνύματα.
  • DefaultPartitioner: Εάν δεν δημιουργήσετε μια προσαρμοσμένη κλάση διαμερισμάτων, τότε από προεπιλογή το org.apache.kafka.clients.producer.internals.DefaultPartitioner θα χρησιμοποιηθεί η τάξη. Το προεπιλεγμένο διαμέρισμα είναι αρκετά καλό για τις περισσότερες περιπτώσεις, παρέχοντας τρεις επιλογές:
    1. Εγχειρίδιο: Όταν δημιουργείτε ένα Καταγραφή παραγωγού, χρησιμοποιήστε τον υπερφορτωμένο κατασκευαστή νέο ProducerRecord (topicName, partitionId, messageKey, message) για να καθορίσετε ένα αναγνωριστικό διαμερίσματος.
    2. Κατακερματισμός (ευαίσθητο στην τοποθεσία): Όταν δημιουργείτε ένα Καταγραφή παραγωγού, καθορίστε ένα μήνυμαKey, τηλεφωνώντας νέο ProducerRecord (topicName, messageKey, message). DefaultPartitioner θα χρησιμοποιήσει τον κατακερματισμό του κλειδιού για να διασφαλίσει ότι όλα τα μηνύματα για το ίδιο κλειδί πηγαίνουν στον ίδιο παραγωγό. Αυτή είναι η ευκολότερη και πιο κοινή προσέγγιση.
    3. Ψεκασμός (εξισορρόπηση τυχαίου φορτίου): Εάν δεν θέλετε να ελέγξετε τα μηνύματα κατάτμησης, απλώς καλέστε νέο ProducerRecord (topicName, message) για να δημιουργήσετε το δικό σας Καταγραφή παραγωγού. Σε αυτήν την περίπτωση, ο διαχωριστής θα στείλει μηνύματα σε όλα τα διαμερίσματα με τρόπο round-robin, διασφαλίζοντας ένα ισορροπημένο φορτίο διακομιστή.

Διαχωρισμός μιας εφαρμογής Apache Kafka

Για το απλό παράδειγμα παραγωγού / καταναλωτή στο Μέρος 1, χρησιμοποιήσαμε ένα DefaultPartitioner. Τώρα θα προσπαθήσουμε να δημιουργήσουμε ένα προσαρμοσμένο διαμέρισμα. Για αυτό το παράδειγμα, ας υποθέσουμε ότι έχουμε έναν ιστότοπο λιανικής που οι καταναλωτές μπορούν να χρησιμοποιήσουν για να παραγγείλουν προϊόντα οπουδήποτε στον κόσμο. Με βάση τη χρήση, γνωρίζουμε ότι οι περισσότεροι καταναλωτές βρίσκονται είτε στις Ηνωμένες Πολιτείες είτε στην Ινδία. Θέλουμε να χωρίσουμε την αίτησή μας για να στείλουμε παραγγελίες από τις ΗΠΑ ή την Ινδία στους αντίστοιχους καταναλωτές τους, ενώ οι παραγγελίες από οπουδήποτε αλλού θα μεταφερθούν σε έναν τρίτο καταναλωτή.

Για να ξεκινήσουμε, θα δημιουργήσουμε ένα Χωριστή που εφαρμόζει το org.apache.kafka.clients.producer.Partitioner διεπαφή. Πρέπει να εφαρμόσουμε τις ακόλουθες μεθόδους:

  1. Η Κάφκα θα τηλεφωνήσει Διαμορφώστε() όταν αρχικοποιούμε το Χωρίζων τάξη, με ένα Χάρτης ιδιοτήτων διαμόρφωσης. Αυτή η μέθοδος προετοιμάζει συναρτήσεις ειδικά για την επιχειρηματική λογική της εφαρμογής, όπως η σύνδεση σε μια βάση δεδομένων. Σε αυτήν την περίπτωση θέλουμε ένα αρκετά γενικό διαμέρισμα που χρειάζεται όνομα χώρας ως ιδιοκτησία. Στη συνέχεια μπορούμε να χρησιμοποιήσουμε configProperties.put ("partitions.0", "USA") για να χαρτογραφήσετε τη ροή των μηνυμάτων σε διαμερίσματα. Στο μέλλον μπορούμε να χρησιμοποιήσουμε αυτήν τη μορφή για να αλλάξουμε ποιες χώρες έχουν το δικό τους διαμέρισμα.
  2. ο Παραγωγός Κλήσεις API χώρισμα() μία φορά για κάθε μήνυμα. Σε αυτήν την περίπτωση θα το χρησιμοποιήσουμε για να διαβάσουμε το μήνυμα και να αναλύσουμε το όνομα της χώρας από το μήνυμα. Εάν το όνομα της χώρας βρίσκεται στο countryToPartitionMap, θα επιστρέψει partitionId αποθηκευμένο στο Χάρτης. Εάν όχι, θα κατακερματιστεί η τιμή της χώρας και θα τη χρησιμοποιήσει για να υπολογίσει σε ποιο διαμέρισμα πρέπει να πάει.
  3. Καλούμε Κλείσε() για να κλείσετε το διαμέρισμα. Η χρήση αυτής της μεθόδου διασφαλίζει ότι όλοι οι πόροι που αποκτήθηκαν κατά την προετοιμασία καθαρίζονται κατά τον τερματισμό.

Σημειώστε ότι όταν καλεί η Κάφκα Διαμορφώστε(), ο παραγωγός Kafka θα μεταβιβάσει όλες τις ιδιότητες που έχουμε διαμορφώσει για τον παραγωγό στο Χωρίζων τάξη. Είναι σημαντικό να διαβάζουμε μόνο εκείνες τις ιδιότητες που ξεκινούν με χωρίσματα., αναλύστε τα για να πάρετε το partitionIdκαι αποθηκεύστε το αναγνωριστικό στο countryToPartitionMap.

Παρακάτω είναι η προσαρμοσμένη εφαρμογή μας του Χωρίζων διεπαφή.

Λίστα 1. CountryPartitioner

 Δημόσια τάξη CountryPartitioner εφαρμόζει το Partitioner {ιδιωτικό στατικό χάρτη countryToPartitionMap; public void configigure (Map configs) {System.out.println ("Inside CountryPartitioner.configure" + configs); countryToPartitionMap = νέο HashMap (); για (Map.Entry entry: configs.entrySet ()) {if (entry.getKey (). beginWith ("partitions.")) {String keyName = entry.getKey () Τιμή συμβολοσειράς = (String) entry.getValue (); System.out.println (keyName.substring (11)); int paritionId = Integer.parseInt (keyName.substring (11)); countryToPartitionMap.put (τιμή, paritionId); }}} δημόσιο διαμέρισμα int (θέμα συμβολοσειράς, κλειδί αντικειμένου, byte [] keyBytes, τιμή αντικειμένου, byte [] valueBytes, σύμπλεγμα συμπλέγματος) {List partitions = cluster.availablePartitionsForTopic (topic); Τιμή συμβολοσειράς Str = (συμβολοσειρά) τιμή; String countryName = ((String) τιμή) .split (":") [0]; if (countryToPartitionMap.containsKey (countryName)) {// Εάν η χώρα χαρτογραφηθεί σε συγκεκριμένο διαμέρισμα επιστρέψτε την επιστροφή countryToPartitionMap.get (countryName); } αλλιώς {// Εάν καμία χώρα δεν έχει αντιστοιχιστεί σε συγκεκριμένη κατανομή διαμερισμάτων μεταξύ των υπόλοιπων διαμερισμάτων int noOfPartitions = cluster.topics (). size (); return value.hashCode ()% noOfPartitions + countryToPartitionMap.size (); }} κλείσιμο δημόσιου κενού () {}} 

ο Παραγωγός Το μάθημα στην Λίστα 2 (παρακάτω) μοιάζει πολύ με τον απλό μας παραγωγό από το Μέρος 1, με δύο αλλαγές με έντονη γραφή:

  1. Ορίζουμε μια ιδιότητα config με ένα κλειδί ίσο με την τιμή του ΠαραγωγόςConfig.PARTITIONER_CLASS_CONFIG, που ταιριάζει με το πλήρως αναγνωρισμένο όνομα του Χωριστή τάξη. Ορίσαμε επίσης όνομα χώρας προς την partitionIdχαρτογραφώντας έτσι τις ιδιότητες στις οποίες θέλουμε να περάσουμε Χωριστή.
  2. Περνάμε μια παρουσία μιας κλάσης που εφαρμόζει το org.apache.kafka.clients.producer.Callback διεπαφή ως δεύτερο επιχείρημα για το παραγωγός. αποστολή () μέθοδος. Ο πελάτης της Kafka θα το καλέσει onCompletion () μέθοδος όταν ένα μήνυμα δημοσιεύεται με επιτυχία, επισυνάπτοντας ένα Εγγραφή μεταδεδομένων αντικείμενο. Θα μπορέσουμε να χρησιμοποιήσουμε αυτό το αντικείμενο για να μάθουμε σε ποιο διαμέρισμα αποστέλλεται ένα μήνυμα, καθώς και για την αντιστάθμιση που έχει εκχωρηθεί στο δημοσιευμένο μήνυμα.

Λίστα 2. Ένας διαχωρισμένος παραγωγός

 Δημόσιος παραγωγός κατηγορίας {ιδιωτικό στατικό σαρωτή στο; public static void main (String [] argv) ρίχνει την Εξαίρεση {if (argv.length! = 1) {System.err.println ("Παρακαλώ προσδιορίστε 1 παραμέτρους"); System.exit (-1); } String topicName = argv [0]; σε = νέο σαρωτή (System.in); System.out.println ("Εισαγάγετε μήνυμα (πληκτρολογήστε έξοδο για έξοδο)"); // Ρυθμίστε τις παραμέτρους του Properter Properties configProperties = new Properties (); configProperties.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost: 9092"); configProperties.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");  configProperties.put (ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getCanonicalName ()); configProperties.put ("partition.1", "USA"); configProperties.put ("partition.2", "India");  org.apache.kafka.clients.producer.Producer παραγωγός = νέο KafkaProducer (configProperties); Γραμμή συμβολοσειράς = in.nextLine (); ενώ (! line.equals ("έξοδος")) {ProducerRecord rec = new ProducerRecord (topicName, null, line); παραγωγός. αποστολή (rec, νέο Callback () {public void onCompletion (RecordMetadata metadata, Exception εξαίρεση) {System.out.println ("Μήνυμα εστάλη στο θέμα ->" + metadata.topic () + ", parition->" + metadata.partition () + "αποθηκευμένο στο offset->" + metadata.offset ()); ; }}); line = in.nextLine (); } στο.κλείσιμο (); παραγωγός.κλείσιμο (); }} 

Ανάθεση κατατμήσεων στους καταναλωτές

Ο διακομιστής Kafka εγγυάται ότι ένα διαμέρισμα εκχωρείται σε έναν μόνο καταναλωτή, διασφαλίζοντας έτσι τη σειρά κατανάλωσης μηνυμάτων. Μπορείτε να αντιστοιχίσετε χειροκίνητα ένα διαμέρισμα ή να το αντιστοιχίσετε αυτόματα.

Εάν η λογική της επιχείρησής σας απαιτεί περισσότερο έλεγχο, τότε θα πρέπει να αντιστοιχίσετε χειροκίνητα διαμερίσματα. Σε αυτήν την περίπτωση θα χρησιμοποιούσατε KafkaConsumer.assign () για να περάσει μια λίστα κατατμήσεων που κάθε καταναλωτής ενδιαφερόταν στον διακομιστή Kakfa.

Η αυτόματη εκχώρηση κατατμήσεων είναι η προεπιλεγμένη και πιο κοινή επιλογή. Σε αυτήν την περίπτωση, ο διακομιστής Kafka θα εκχωρήσει ένα διαμέρισμα σε κάθε καταναλωτή και θα εκχωρήσει εκ νέου διαμερίσματα σε κλίμακα για νέους καταναλωτές.

Ας πούμε ότι δημιουργείτε ένα νέο θέμα με τρία διαμερίσματα. Όταν ξεκινάτε τον πρώτο καταναλωτή για το νέο θέμα, η Kafka θα εκχωρήσει και τα τρία διαμερίσματα στον ίδιο καταναλωτή. Εάν ξεκινήσετε στη συνέχεια έναν δεύτερο καταναλωτή, η Kafka θα εκχωρήσει εκ νέου όλα τα διαμερίσματα, εκχωρώντας ένα διαμέρισμα στον πρώτο καταναλωτή και τα υπόλοιπα δύο διαμερίσματα στον δεύτερο καταναλωτή. Εάν προσθέσετε έναν τρίτο καταναλωτή, η Kafka θα εκχωρήσει εκ νέου τα διαμερίσματα, έτσι ώστε σε κάθε καταναλωτή να έχει ένα μόνο διαμέρισμα. Τέλος, εάν ξεκινήσετε τον τέταρτο και τον πέμπτο καταναλωτή, τότε τρεις από τους καταναλωτές θα έχουν ένα εκχωρημένο διαμέρισμα, αλλά οι άλλοι δεν θα λάβουν μηνύματα. Εάν ένα από τα πρώτα τρία διαμερίσματα κατεβεί, η Kafka θα χρησιμοποιήσει την ίδια λογική διαμέρισης για να εκχωρήσει εκ νέου το διαμέρισμα του καταναλωτή σε έναν από τους επιπλέον καταναλωτές.