Προγραμματισμός

Τρόπος δημιουργίας εφαρμογών συνεχούς ροής με το Apache Flink

Ο Fabian Hueske είναι μέλος και μέλος του PMC του έργου Apache Flink και συνιδρυτής της Data Artisans.

Το Apache Flink είναι ένα πλαίσιο για την εφαρμογή κρατικών εφαρμογών επεξεργασίας ροής και την εκτέλεση τους σε κλίμακα σε ένα σύμπλεγμα υπολογιστών. Σε ένα προηγούμενο άρθρο, εξετάσαμε ποια είναι η κατάσταση επεξεργασίας ροής, ποιες περιπτώσεις χρήσης αντιμετωπίζει και γιατί πρέπει να εφαρμόσετε και να εκτελέσετε τις εφαρμογές ροής με το Apache Flink.

Σε αυτό το άρθρο, θα παρουσιάσω παραδείγματα για δύο κοινές περιπτώσεις χρήσης της επεξεργασίας stateful stream και θα συζητήσω πώς μπορούν να εφαρμοστούν με το Flink. Η πρώτη περίπτωση χρήσης είναι εφαρμογές που βασίζονται σε συμβάντα, δηλαδή εφαρμογές που λαμβάνουν συνεχείς ροές συμβάντων και εφαρμόζουν κάποια επιχειρηματική λογική σε αυτά τα συμβάντα. Το δεύτερο είναι η περίπτωση χρήσης ροής αναλυτικών στοιχείων, όπου θα παρουσιάσω δύο αναλυτικά ερωτήματα που εφαρμόζονται με το SQL API του Flink, τα οποία συγκεντρώνουν δεδομένα ροής σε πραγματικό χρόνο. Εμείς στην Data Artisans παρέχουμε τον πηγαίο κώδικα όλων των παραδειγμάτων μας σε ένα δημόσιο αποθετήριο GitHub.

Πριν βυθίσουμε τις λεπτομέρειες των παραδειγμάτων, θα παρουσιάσω τη ροή συμβάντων που απορροφάται από τις εφαρμογές παραδειγμάτων και θα εξηγήσω πώς μπορείτε να εκτελέσετε τον κώδικα που παρέχουμε.

Μια ροή εκδηλώσεων με ταξί

Οι παραδειγμένες εφαρμογές μας βασίζονται σε ένα δημόσιο σύνολο δεδομένων σχετικά με τις βόλτες με ταξί που συνέβησαν στη Νέα Υόρκη το 2013. Οι διοργανωτές της Μεγάλης Πρόκλησης του 2015 DEBS (ACM International Conference on Distributed Event-based Systems) αναδιάταξαν το αρχικό σύνολο δεδομένων και το μετέτρεψαν ένα μόνο αρχείο CSV από το οποίο διαβάζουμε τα ακόλουθα εννέα πεδία.

  • Μενταγιόν — ένα αναγνωριστικό αθροίσματος MD5 του ταξί
  • Hack_license — ένα αναγνωριστικό αθροίσματος MD5 της άδειας ταξί
  • Pickup_datetime - τη στιγμή που οι επιβάτες παραλήφθηκαν
  • Dropoff_datetime - τη στιγμή που οι επιβάτες έπεσαν
  • Pickup_longitude — το μήκος της τοποθεσίας παραλαβής
  • Pickup_latitude — το πλάτος της θέσης παραλαβής
  • Dropoff_longitude — το γεωγραφικό μήκος της τοποθεσίας απόρριψης
  • Dropoff_latitude — το γεωγραφικό πλάτος της τοποθεσίας απόρριψης
  • Total_amount — σύνολο που καταβάλλεται σε δολάρια

Το αρχείο CSV αποθηκεύει τις εγγραφές με αύξουσα σειρά του χαρακτηριστικού χρόνου αποχώρησης. Ως εκ τούτου, το αρχείο μπορεί να αντιμετωπιστεί ως καταγεγραμμένο αρχείο καταγραφής συμβάντων που δημοσιεύθηκαν όταν τελείωσε ένα ταξίδι. Για να εκτελέσετε τα παραδείγματα που παρέχουμε στο GitHub, πρέπει να κατεβάσετε το σύνολο δεδομένων της πρόκλησης DEBS από το Google Drive.

Όλες οι παραδείγματα εφαρμογών διαβάζουν διαδοχικά το αρχείο CSV και το απορροφούν ως ροή εκδηλώσεων με ταξί. Από εκεί και πέρα, οι εφαρμογές επεξεργάζονται τα συμβάντα όπως και κάθε άλλη ροή, δηλαδή, όπως μια ροή που απορροφάται από ένα σύστημα εγγραφής-δημοσίευσης που βασίζεται σε αρχεία καταγραφής, όπως το Apache Kafka ή το Kinesis. Στην πραγματικότητα, η ανάγνωση ενός αρχείου (ή οποιουδήποτε άλλου τύπου δεδομένων που παραμένει) και η αντιμετώπισή του ως ροής είναι ο ακρογωνιαίος λίθος της προσέγγισης του Flink στην ενοποίηση της επεξεργασίας παρτίδας και ροής.

Εκτέλεση των παραδειγμάτων Flink

Όπως αναφέρθηκε προηγουμένως, δημοσιεύσαμε τον πηγαίο κώδικα των παραδειγμάτων εφαρμογών μας σε ένα αποθετήριο GitHub. Σας ενθαρρύνουμε να διακλαδώσετε και να κλωνοποιήσετε το αποθετήριο. Τα παραδείγματα μπορούν να εκτελεστούν εύκολα μέσα από το IDE της επιλογής σας. δεν χρειάζεται να ρυθμίσετε και να διαμορφώσετε ένα σύμπλεγμα Flink για να τα εκτελέσετε. Αρχικά, εισαγάγετε τον πηγαίο κώδικα των παραδειγμάτων ως έργο Maven. Στη συνέχεια, εκτελέστε την κύρια κλάση μιας εφαρμογής και καταχωρίστε τη θέση αποθήκευσης του αρχείου δεδομένων (δείτε παραπάνω για τον σύνδεσμο για τη λήψη των δεδομένων) ως παράμετρο προγράμματος.

Μόλις ξεκινήσετε μια εφαρμογή, θα ξεκινήσει μια τοπική, ενσωματωμένη παρουσία Flink μέσα στη διαδικασία JVM της εφαρμογής και θα υποβάλει την εφαρμογή για να την εκτελέσει. Θα δείτε μια δέσμη δηλώσεων καταγραφής ενώ το Flink ξεκινά και οι εργασίες της εργασίας προγραμματίζονται. Μόλις εκτελεστεί η εφαρμογή, η έξοδος της θα γραφτεί στην τυπική έξοδο.

Δημιουργία μιας εφαρμογής βάσει εκδήλωσης στο Flink

Τώρα, ας συζητήσουμε την πρώτη μας περίπτωση χρήσης, η οποία είναι μια εφαρμογή βάσει συμβάντων. Οι εφαρμογές βάσει συμβάντων λαμβάνουν ροές συμβάντων, εκτελούν υπολογισμούς καθώς λαμβάνονται τα συμβάντα και ενδέχεται να εκπέμπουν νέα συμβάντα ή να προκαλούν εξωτερικές ενέργειες. Πολλές εφαρμογές που βασίζονται σε συμβάντα μπορούν να συντεθούν συνδέοντάς τις μεταξύ τους μέσω συστημάτων καταγραφής συμβάντων, παρόμοια με το πόσο μεγάλα συστήματα μπορούν να αποτελούνται από μικροϋπηρεσίες. Οι εφαρμογές που βασίζονται σε συμβάντα, τα αρχεία καταγραφής συμβάντων και τα στιγμιότυπα κατάστασης εφαρμογής (γνωστά ως σημεία αποθήκευσης στο Flink) περιλαμβάνουν ένα πολύ ισχυρό σχέδιο σχεδίασης, επειδή μπορείτε να επαναφέρετε την κατάστασή τους και να επαναλάβετε την είσοδό τους για να ανακτήσετε από μια αποτυχία, να διορθώσετε ένα σφάλμα ή να μετακινήσετε ένα εφαρμογή σε διαφορετικό σύμπλεγμα.

Σε αυτό το άρθρο θα εξετάσουμε μια εφαρμογή βάσει συμβάντων που υποστηρίζει μια υπηρεσία, η οποία παρακολουθεί τις ώρες εργασίας των οδηγών ταξί. Το 2016, η Επιτροπή ταξί και λιμουζίνα της Νέας Υόρκης αποφάσισε να περιορίσει τις ώρες εργασίας των οδηγών ταξί σε βάρδιες 12 ωρών και να απαιτήσει διάλειμμα τουλάχιστον οκτώ ωρών πριν ξεκινήσει η επόμενη βάρδια. Μια αλλαγή ξεκινά με την αρχή της πρώτης διαδρομής. Από τότε, ο οδηγός μπορεί να ξεκινήσει νέες βόλτες μέσα σε ένα παράθυρο 12 ωρών. Η εφαρμογή μας παρακολουθεί τις διαδρομές των οδηγών, σηματοδοτεί την ώρα λήξης του παραθύρου των 12 ωρών (δηλ. Την ώρα που ενδέχεται να ξεκινήσουν την τελευταία διαδρομή) και επισημαίνει βόλτες που παραβίασαν τον κανονισμό. Μπορείτε να βρείτε τον πλήρη πηγαίο κώδικα αυτού του παραδείγματος στο αποθετήριο GitHub.

Η εφαρμογή μας υλοποιείται με το Flink's DataStream API και a KeyedProcessFunction. Το DataStream API είναι ένα λειτουργικό API και βασίζεται στην έννοια των δακτυλογραφημένων ροών δεδομένων. ΕΝΑ Ροή δεδομένων είναι η λογική αναπαράσταση μιας ροής γεγονότων τύπου Τ. Μια ροή υποβάλλεται σε επεξεργασία εφαρμόζοντας μια συνάρτηση σε αυτήν που παράγει μια άλλη ροή δεδομένων, πιθανώς διαφορετικού τύπου. Το Flink επεξεργάζεται τις ροές παράλληλα, διανέμοντας συμβάντα σε διαμερίσματα ροής και εφαρμόζοντας διαφορετικές παρουσίες συναρτήσεων σε κάθε διαμέρισμα.

Το παρακάτω απόσπασμα κώδικα δείχνει τη ροή υψηλού επιπέδου της εφαρμογής παρακολούθησης.

// απορρόφηση ροής ταξί.

DataStream rides = TaxiRides.getRides (env, inputPath);

Ροή δεδομένων ειδοποιήσεις = βόλτες

// ροή διαμερισμάτων από το αναγνωριστικό άδειας οδήγησης

.keyBy (r -> r.licenseId)

// παρακολουθήστε συμβάντα οδήγησης και δημιουργήστε ειδοποιήσεις

.process (νέο MonitorWorkTime ());

// εκτυπώσεις ειδοποιήσεων

notifications.print ();

Η εφαρμογή αρχίζει να απορροφά μια ροή εκδηλώσεων με ταξί. Στο παράδειγμά μας, τα συμβάντα διαβάζονται από ένα αρχείο κειμένου, αναλύονται και αποθηκεύονται Ταξί Αντικείμενα POJO. Μια πραγματική εφαρμογή τυπικά λαμβάνει τα συμβάντα από μια ουρά μηνυμάτων ή ένα αρχείο καταγραφής συμβάντων, όπως το Apache Kafka ή το Pravega. Το επόμενο βήμα είναι να πληκτρολογήσετε το Ταξί εκδηλώσεις από το άδεια ταυτότητας του οδηγού. ο keyBy Η λειτουργία χωρίζει τη ροή στο δηλωμένο πεδίο, έτσι ώστε όλα τα συμβάντα με το ίδιο κλειδί να υποβάλλονται σε επεξεργασία με την ίδια παράλληλη παρουσία της ακόλουθης συνάρτησης. Στην περίπτωσή μας, χωρίζουμε στο άδεια ταυτότητας πεδίο επειδή θέλουμε να παρακολουθούμε τον χρόνο εργασίας κάθε μεμονωμένου προγράμματος οδήγησης.

Στη συνέχεια, εφαρμόζουμε το MonitorWorkTime λειτουργία στο διαμέρισμα Ταξί εκδηλώσεις. Η λειτουργία παρακολουθεί τις διαδρομές ανά οδηγό και παρακολουθεί τις αλλαγές και τους χρόνους διακοπής. Εκπέμπει γεγονότα τύπου 2, όπου κάθε πλειάδα αντιπροσωπεύει μια ειδοποίηση που αποτελείται από το αναγνωριστικό άδειας χρήσης του προγράμματος οδήγησης και ένα μήνυμα. Τέλος, η εφαρμογή μας εκπέμπει τα μηνύματα εκτυπώνοντάς τα στην τυπική έξοδο. Μια εφαρμογή πραγματικού κόσμου θα έγραφε τις ειδοποιήσεις σε ένα εξωτερικό μήνυμα ή σύστημα αποθήκευσης, όπως το Apache Kafka, το HDFS ή ένα σύστημα βάσης δεδομένων, ή θα πυροδότησε μια εξωτερική κλήση για να τις σβήσει αμέσως.

Τώρα που έχουμε συζητήσει τη συνολική ροή της εφαρμογής, ας ρίξουμε μια ματιά στο MonitorWorkTime συνάρτηση, η οποία περιέχει το μεγαλύτερο μέρος της πραγματικής επιχειρηματικής λογικής της εφαρμογής. ο MonitorWorkTime η λειτουργία είναι μια κατάσταση KeyedProcessFunction που απορροφά Ταξί εκδηλώσεις και εκπομπές 2 εγγραφές. ο KeyedProcessFunction Η διεπαφή διαθέτει δύο μεθόδους για την επεξεργασία δεδομένων: processElement () και onTimer (). ο processElement () καλείται μέθοδος για κάθε συμβάν άφιξης. ο onTimer () Η μέθοδος καλείται όταν ενεργοποιηθεί ένα χρονόμετρο που έχει ήδη καταχωρηθεί. Το παρακάτω απόσπασμα δείχνει τον σκελετό του MonitorWorkTime λειτουργία και όλα όσα δηλώνονται εκτός των μεθόδων επεξεργασίας.

δημόσια στατική τάξη MonitorWorkTime

επεκτείνει το KeyedProcessFunction {

// σταθερές χρόνου σε χιλιοστά του δευτερολέπτου

ιδιωτικό στατικό τελικό ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 ΩΡΕΣ

ιδιωτικό στατικό τελικό μακρύ REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 ώρες

ιδιωτικό στατικό τελικό CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 ώρες

ιδιωτικό προσωρινό μορφοποιητή DateTimeFormatter;

// κατάσταση λαβής για αποθήκευση του χρόνου έναρξης μιας βάρδιας

ValueState shiftStart;

@Καταπατώ

ανοιχτό δημόσιο κενό (Configuration conf) {

// εγγραφή λαβής κατάστασης

shiftStart = getRuntimeContext (). getState (

νέο ValueStateDescriptor ("shiftStart", Types.LONG));

// αρχικοποίηση του χρονοδιαμορφωτή

this.formatter = DateTimeFormat.forPattern ("εεεε-ΜΜ-ηηΗΗ: mm: δδ");

  }

// processElement () και onTimer () συζητούνται λεπτομερώς παρακάτω.

}

Η συνάρτηση δηλώνει μερικές σταθερές για χρονικά διαστήματα σε χιλιοστά του δευτερολέπτου, ένα μορφοποιητή χρόνου και μια λαβή κατάστασης για την κατάσταση κλειδώματος που διαχειρίζεται ο Flink. Η διαχειριζόμενη κατάσταση ελέγχεται περιοδικά και επαναφέρεται αυτόματα σε περίπτωση βλάβης. Η κατάσταση πληκτρολογίου είναι οργανωμένη ανά κλειδί, πράγμα που σημαίνει ότι μια συνάρτηση θα διατηρήσει μία τιμή ανά λαβή και κλειδί. Στην περίπτωσή μας, το MonitorWorkTime η συντήρηση διατηρεί ένα Μακρύς τιμή για κάθε κλειδί, δηλαδή για κάθε κλειδί άδεια ταυτότητας. ο shiftStart Η κατάσταση αποθηκεύει την ώρα έναρξης της αλλαγής οδηγού. Η λαβή κατάστασης αρχικοποιείται στο Άνοιξε() μέθοδος, η οποία καλείται μία φορά πριν από την επεξεργασία του πρώτου συμβάντος.

Τώρα, ας ρίξουμε μια ματιά στο processElement () μέθοδος.

@Καταπατώ

δημόσια διαδικασία άκυρου Στοιχείο (

Βόλτα με ταξί,

Περιεχόμενο ctx,

Συλλέκτης έξω) ρίχνει την εξαίρεση {

// αναζητήστε την ώρα έναρξης της τελευταίας βάρδιας

Long startTs = shiftStart.value ();

αν (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// αυτή είναι η πρώτη διαδρομή μιας νέας βάρδιας.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

long endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

"Επιτρέπεται η αποδοχή νέων επιβατών έως το" + formatter.print (endTs)));

// καταχωρήστε χρονοδιακόπτη για να καθαρίσετε την κατάσταση σε 24 ώρες

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} αλλιώς εάν (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// αυτή η διαδρομή ξεκίνησε μετά το τέλος του επιτρεπόμενου χρόνου εργασίας.

// παραβιάζει τους κανονισμούς!

out.collect (Tuple2.of (naik.licenseId,

"Αυτή η διαδρομή παραβίασε τους κανονισμούς για το χρόνο εργασίας."));

  }

}

ο processElement () καλείται μέθοδος για καθένα Ταξί Εκδήλωση. Πρώτον, η μέθοδος παίρνει την ώρα έναρξης της αλλαγής του προγράμματος οδήγησης από τη λαβή κατάστασης. Εάν η κατάσταση δεν περιέχει ώρα έναρξης (startTs == μηδέν) ή εάν η τελευταία αλλαγή ξεκίνησε περισσότερο από 20 ώρες (ALLOWED_WORK_TIME + REQ_BREAK_TIMEνωρίτερα από την τρέχουσα διαδρομή, η τρέχουσα διαδρομή είναι η πρώτη διαδρομή μιας νέας βάρδιας. Σε κάθε περίπτωση, η συνάρτηση ξεκινά μια νέα βάρδια ενημερώνοντας την ώρα έναρξης της αλλαγής στην ώρα έναρξης της τρέχουσας διαδρομής, εκπέμπει ένα μήνυμα στον οδηγό με την ώρα λήξης της νέας βάρδιας και καταγράφει ένα χρονόμετρο για να καθαρίσει το κατάσταση σε 24 ώρες.

Εάν η τρέχουσα διαδρομή δεν είναι η πρώτη διαδρομή μιας νέας βάρδιας, η συνάρτηση ελέγχει εάν παραβαίνει τον κανονισμό για τον χρόνο εργασίας, δηλαδή αν ξεκίνησε περισσότερες από 12 ώρες αργότερα από την έναρξη της τρέχουσας βάρδιας του οδηγού. Σε αυτήν την περίπτωση, η συνάρτηση εκπέμπει ένα μήνυμα για να ενημερώσει το πρόγραμμα οδήγησης για την παραβίαση.

ο processElement () μέθοδος του MonitorWorkTime Η συνάρτηση καταγράφει ένα χρονοδιακόπτη για να καθαρίσει την κατάσταση 24 ώρες μετά την έναρξη μιας βάρδιας. Η κατάργηση της κατάστασης που δεν απαιτείται πλέον είναι σημαντική για την αποφυγή αύξησης των μεγεθών κατάστασης λόγω της διαρροής. Ένα χρονόμετρο ενεργοποιείται όταν ο χρόνος της εφαρμογής περάσει τη χρονική σήμανση του χρονοδιακόπτη. Σε αυτό το σημείο, το onTimer () καλείται μέθοδος. Παρόμοια με την κατάσταση, τα χρονόμετρα διατηρούνται ανά πλήκτρο και η συνάρτηση τοποθετείται στο πλαίσιο του σχετικού κλειδιού πριν από το onTimer () καλείται μέθοδος. Ως εκ τούτου, όλη η πρόσβαση κατάστασης κατευθύνεται στο κλειδί που ήταν ενεργό κατά την εγγραφή του χρονοδιακόπτη.

Ας ρίξουμε μια ματιά στο onTimer () μέθοδος για MonitorWorkTime.

@Καταπατώ

δημόσιο άκυρο onTimer (

μεγάλα χρονόμετρα,

OnTimerContext ctx,

Συλλέκτης έξω) ρίχνει την εξαίρεση {

// καταργήστε την κατάσταση αλλαγής εάν δεν έχει ήδη ξεκινήσει νέα αλλαγή.

Long startTs = shiftStart.value ();

εάν (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

ο processElement () Η μέθοδος καταγράφει τους χρονομετρητές για 24 ώρες μετά από μια αλλαγή που άρχισε να καθαρίζει την κατάσταση που δεν χρειάζεται πλέον. Η εκκαθάριση του κράτους είναι η μόνη λογική που το onTimer () εφαρμόζει μέθοδο. Όταν ενεργοποιείται ένας χρονοδιακόπτης, ελέγχουμε αν το πρόγραμμα οδήγησης ξεκίνησε μια νέα αλλαγή εν τω μεταξύ, δηλαδή αν άλλαξε ο χρόνος έναρξης της αλλαγής. Εάν δεν συμβαίνει αυτό, διαγράφουμε την κατάσταση αλλαγής για τον οδηγό.