Προγραμματισμός

Πώς να χρησιμοποιήσετε το Redis για επεξεργασία ροής σε πραγματικό χρόνο

Ο Roshan Kumar είναι ανώτερος διευθυντής προϊόντων στο Redis Labs.

Η απορρόφηση δεδομένων ροής σε πραγματικό χρόνο είναι μια κοινή απαίτηση για πολλές μεγάλες περιπτώσεις χρήσης δεδομένων. Σε τομείς όπως το IoT, το ηλεκτρονικό εμπόριο, η ασφάλεια, οι επικοινωνίες, η ψυχαγωγία, τα χρηματοοικονομικά και το λιανικό εμπόριο, όπου εξαρτώνται πολλά από την έγκαιρη και ακριβή λήψη αποφάσεων βάσει δεδομένων, η συλλογή και ανάλυση δεδομένων σε πραγματικό χρόνο είναι στην πραγματικότητα πυρήνας της επιχείρησης.

Ωστόσο, η συλλογή, αποθήκευση και επεξεργασία δεδομένων ροής σε μεγάλους όγκους και με μεγάλη ταχύτητα παρουσιάζει αρχιτεκτονικές προκλήσεις. Ένα σημαντικό πρώτο βήμα στην παροχή ανάλυσης δεδομένων σε πραγματικό χρόνο είναι να διασφαλιστεί ότι υπάρχουν επαρκείς πόροι δικτύου, υπολογισμού, αποθήκευσης και μνήμης για τη λήψη γρήγορων ροών δεδομένων. Αλλά η στοίβα λογισμικού μιας εταιρείας πρέπει να ταιριάζει με την απόδοση της φυσικής της υποδομής. Διαφορετικά, οι επιχειρήσεις θα αντιμετωπίσουν τεράστια καθυστέρηση δεδομένων ή, χειρότερα, ελλιπή ή ελλιπή δεδομένα.

Το Redis έχει γίνει μια δημοφιλής επιλογή για τέτοια σενάρια απορρόφησης δεδομένων. Μια ελαφριά πλατφόρμα βάσης δεδομένων στη μνήμη, το Redis επιτυγχάνει απόδοση σε εκατομμύρια λειτουργίες ανά δευτερόλεπτο με καθυστέρηση δευτερολέπτων, ενώ αντλεί ελάχιστους πόρους. Προσφέρει επίσης απλές υλοποιήσεις, που ενεργοποιούνται από τις πολλαπλές δομές και λειτουργίες δεδομένων.

Σε αυτό το άρθρο, θα δείξω πώς η Redis Enterprise μπορεί να λύσει κοινές προκλήσεις που σχετίζονται με την κατάποση και την επεξεργασία μεγάλων όγκων δεδομένων υψηλής ταχύτητας. Θα ακολουθήσουμε τρεις διαφορετικές προσεγγίσεις (συμπεριλαμβανομένου του κώδικα) για την επεξεργασία μιας ροής Twitter σε πραγματικό χρόνο, χρησιμοποιώντας Redis Pub / Sub, Redis Lists και Redis Sorted Sets, αντίστοιχα. Όπως θα δούμε, και οι τρεις μέθοδοι μπορούν να παίξουν ρόλο στη γρήγορη απορρόφηση δεδομένων, ανάλογα με την περίπτωση χρήσης.

Προκλήσεις στο σχεδιασμό γρήγορων λύσεων απορρόφησης δεδομένων

Η απορρόφηση δεδομένων υψηλής ταχύτητας συχνά περιλαμβάνει πολλούς διαφορετικούς τύπους πολυπλοκότητας:

  • Μεγάλοι όγκοι δεδομένων φτάνουν μερικές φορές σε εκρήξεις. Τα δεδομένα Bursty απαιτούν μια λύση που είναι σε θέση να επεξεργάζεται μεγάλους όγκους δεδομένων με ελάχιστη καθυστέρηση. Στην ιδανική περίπτωση, θα πρέπει να μπορεί να εκτελεί εκατομμύρια εγγραφές ανά δευτερόλεπτο με καθυστέρηση δευτερολέπτου, χρησιμοποιώντας ελάχιστους πόρους.
  • Δεδομένα από πολλές πηγές. Οι λύσεις απορρόφησης δεδομένων πρέπει να είναι αρκετά ευέλικτες για να χειρίζονται δεδομένα σε πολλές διαφορετικές μορφές, διατηρώντας την ταυτότητα της πηγής, εάν χρειάζεται και μετατρέπουν ή ομαλοποιούν σε πραγματικό χρόνο.
  • Δεδομένα που πρέπει να φιλτραριστούν, να αναλυθούν ή να προωθηθούν. Οι περισσότερες λύσεις απορρόφησης δεδομένων έχουν έναν ή περισσότερους συνδρομητές που καταναλώνουν τα δεδομένα. Αυτές είναι συχνά διαφορετικές εφαρμογές που λειτουργούν στις ίδιες ή διαφορετικές τοποθεσίες με ποικίλο σύνολο υποθέσεων. Σε τέτοιες περιπτώσεις, η βάση δεδομένων όχι μόνο χρειάζεται να μετασχηματίσει τα δεδομένα, αλλά επίσης να φιλτράρει ή να συγκεντρώσει ανάλογα με τις απαιτήσεις των καταναλωτικών εφαρμογών.
  • Δεδομένα που προέρχονται από γεωγραφικά κατανεμημένες πηγές. Σε αυτό το σενάριο, είναι συχνά βολικό να διανέμετε τους κόμβους συλλογής δεδομένων, τοποθετώντας τους κοντά στις πηγές. Οι ίδιοι οι κόμβοι γίνονται μέρος της λύσης γρήγορης απορρόφησης δεδομένων, για τη συλλογή, επεξεργασία, προώθηση ή αλλαγή διαδρομής δεδομένων.

Χειρισμός γρήγορης απορρόφησης δεδομένων στο Redis

Πολλές λύσεις που υποστηρίζουν τη γρήγορη απορρόφηση δεδομένων είναι πολύπλοκες, πλούσιες σε χαρακτηριστικά και υπερβολικά σχεδιασμένες για απλές απαιτήσεις. Το Redis, από την άλλη πλευρά, είναι εξαιρετικά ελαφρύ, γρήγορο και εύκολο στη χρήση. Με πελάτες διαθέσιμους σε περισσότερες από 60 γλώσσες, το Redis μπορεί εύκολα να ενσωματωθεί στις δημοφιλείς στοίβες λογισμικού.

Το Redis προσφέρει δομές δεδομένων όπως λίστες, σύνολα, ταξινομημένα σύνολα και κατακερματισμούς που προσφέρουν απλή και ευέλικτη επεξεργασία δεδομένων. Το Redis παρέχει περισσότερες από ένα εκατομμύριο λειτουργίες ανάγνωσης / εγγραφής ανά δευτερόλεπτο, με καθυστέρηση δευτερολέπτου δευτερολέπτου σε μια παρουσία cloud μεσαίου μεγέθους, καθιστώντας την εξαιρετικά αποδοτική από πόρους για μεγάλους όγκους δεδομένων. Η Redis υποστηρίζει επίσης υπηρεσίες ανταλλαγής μηνυμάτων και βιβλιοθήκες πελατών σε όλες τις δημοφιλείς γλώσσες προγραμματισμού, καθιστώντας την κατάλληλη για το συνδυασμό απορρόφησης δεδομένων υψηλής ταχύτητας και αναλυτικών στοιχείων σε πραγματικό χρόνο. Οι εντολές Redis Pub / Sub του επιτρέπουν να παίζει το ρόλο ενός μεσίτη μηνυμάτων μεταξύ εκδοτών και συνδρομητών, μια λειτουργία που χρησιμοποιείται συχνά για την αποστολή ειδοποιήσεων ή μηνυμάτων μεταξύ των διανεμημένων κόμβων δεδομένων.

Το Redis Enterprise βελτιώνει το Redis με απρόσκοπτη κλιμάκωση, διαθεσιμότητα πάντα, αυτοματοποιημένη ανάπτυξη και δυνατότητα χρήσης οικονομικής μνήμης flash ως επέκτασης RAM, έτσι ώστε η επεξεργασία μεγάλων συνόλων δεδομένων να μπορεί να επιτευχθεί οικονομικά.

Στις παρακάτω ενότητες, θα περιγράψω τον τρόπο χρήσης του Redis Enterprise για την αντιμετώπιση κοινών προκλήσεων απορρόφησης δεδομένων.

Επαναλάβετε την ταχύτητα του Twitter

Για να δείξουμε την απλότητα του Redis, θα διερευνήσουμε ένα δείγμα λύσης γρήγορης απορρόφησης δεδομένων που συγκεντρώνει μηνύματα από μια ροή Twitter. Ο στόχος αυτής της λύσης είναι να επεξεργαστείτε tweets σε πραγματικό χρόνο και να τα σπρώξετε προς τα κάτω κατά την επεξεργασία τους.

Στη συνέχεια, τα δεδομένα Twitter που απορροφούνται από τη λύση καταναλώνονται από πολλούς επεξεργαστές. Όπως φαίνεται στο Σχήμα 1, αυτό το παράδειγμα ασχολείται με δύο επεξεργαστές - τον Αγγλικό Tweet Processor και τον Influencer Processor. Κάθε επεξεργαστής φιλτράρει τα tweets και τα διαβιβάζει στα αντίστοιχα κανάλια του σε άλλους καταναλωτές. Αυτή η αλυσίδα μπορεί να φτάσει όσο απαιτείται η λύση. Ωστόσο, στο παράδειγμά μας, σταματάμε στο τρίτο επίπεδο, όπου συγκεντρώνουμε δημοφιλείς συζητήσεις μεταξύ Αγγλικών ομιλητών και κορυφαίων επιρροών.

Εργαστήρια Redis

Λάβετε υπόψη ότι χρησιμοποιούμε το παράδειγμα επεξεργασίας ροών Twitter λόγω της ταχύτητας άφιξης και απλότητας των δεδομένων. Σημειώστε επίσης ότι τα δεδομένα Twitter φτάνουν στη γρήγορη λήψη δεδομένων μέσω ενός καναλιού. Σε πολλές περιπτώσεις, όπως το IoT, ενδέχεται να υπάρχουν πολλές πηγές δεδομένων που στέλνουν δεδομένα στον κύριο δέκτη.

Υπάρχουν τρεις πιθανοί τρόποι για την εφαρμογή αυτής της λύσης χρησιμοποιώντας το Redis: απορρόφηση με Redis Pub / Sub, λήψη με τη δομή δεδομένων λίστας ή απορρόφηση με τη δομή δεδομένων του ταξινομημένου συνόλου. Ας εξετάσουμε καθεμία από αυτές τις επιλογές.

Κατάποση με Redis Pub / Sub

Αυτή είναι η απλούστερη εφαρμογή γρήγορης απορρόφησης δεδομένων. Αυτή η λύση χρησιμοποιεί τη λειτουργία Pub / Sub του Redis, η οποία επιτρέπει στις εφαρμογές να δημοσιεύουν και να εγγραφούν σε μηνύματα. Όπως φαίνεται στο σχήμα 2, κάθε στάδιο επεξεργάζεται τα δεδομένα και τα δημοσιεύει σε ένα κανάλι. Το επόμενο στάδιο εγγράφεται στο κανάλι και λαμβάνει τα μηνύματα για περαιτέρω επεξεργασία ή φιλτράρισμα.

Εργαστήρια Redis

Πλεονεκτήματα

  • Εύκολο στην εφαρμογή.
  • Λειτουργεί καλά όταν οι πηγές δεδομένων και οι επεξεργαστές κατανέμονται γεωγραφικά.

Μειονεκτήματα

  • Η λύση απαιτεί από τους εκδότες και τους συνδρομητές να είναι συνεχώς ενεργοί. Οι συνδρομητές χάνουν δεδομένα όταν σταματούν ή όταν η σύνδεση χάνεται.
  • Απαιτεί περισσότερες συνδέσεις. Ένα πρόγραμμα δεν μπορεί να δημοσιεύσει και να εγγραφεί στην ίδια σύνδεση, επομένως κάθε ενδιάμεσος επεξεργαστής δεδομένων απαιτεί δύο συνδέσεις - μία για εγγραφή και μία για δημοσίευση. Εάν εκτελείτε το Redis σε πλατφόρμα DBaaS, είναι σημαντικό να επαληθεύσετε εάν το πακέτο ή το επίπεδο υπηρεσίας σας έχει όρια στον αριθμό των συνδέσεων.

Μια σημείωση για τις συνδέσεις

Εάν περισσότεροι από ένας πελάτες εγγράφονται σε ένα κανάλι, ο Redis ωθεί τα δεδομένα σε κάθε πελάτη γραμμικά, το ένα μετά το άλλο. Τα μεγάλα ωφέλιμα φορτία δεδομένων και πολλές συνδέσεις ενδέχεται να προκαλούν καθυστέρηση μεταξύ ενός εκδότη και των συνδρομητών του. Αν και το προεπιλεγμένο σκληρό όριο για τον μέγιστο αριθμό συνδέσεων είναι 10.000, πρέπει να ελέγξετε και να αξιολογήσετε πόσες συνδέσεις είναι κατάλληλες για το ωφέλιμο φορτίο σας.

Το Redis διατηρεί ένα buffer εξόδου πελάτη για κάθε πελάτη. Τα προεπιλεγμένα όρια για το buffer εξόδου πελάτη για Pub / Sub ορίζονται ως:

πελάτης-έξοδος-buffer-όριο pubsub 32mb 8mb 60

Με αυτήν τη ρύθμιση, το Redis θα αναγκάσει τους πελάτες να αποσυνδεθούν υπό δύο συνθήκες: εάν το buffer εξόδου μεγαλώσει πέρα ​​από τα 32MB ή εάν το buffer εξόδου διατηρεί 8MB δεδομένων με συνέπεια για 60 δευτερόλεπτα.

Αυτές είναι ενδείξεις ότι οι πελάτες καταναλώνουν τα δεδομένα πιο αργά από ό, τι δημοσιεύονται. Εάν προκύψει τέτοια κατάσταση, δοκιμάστε πρώτα να βελτιστοποιήσετε τους καταναλωτές έτσι ώστε να μην προσθέτουν καθυστέρηση κατά την κατανάλωση των δεδομένων. Εάν παρατηρήσετε ότι οι πελάτες σας εξακολουθούν να αποσυνδέονται, τότε μπορείτε να αυξήσετε τα όρια για το πελάτης-έξοδος-buffer-όριο pubsub ιδιοκτησία στο redis.conf. Λάβετε υπόψη ότι τυχόν αλλαγές στις ρυθμίσεις ενδέχεται να αυξήσουν την καθυστέρηση μεταξύ του εκδότη και του συνδρομητή. Οποιεσδήποτε αλλαγές πρέπει να δοκιμαστούν και να επαληθευτούν διεξοδικά.

Σχεδιασμός κώδικα για τη λύση Redis Pub / Sub

Εργαστήρια Redis

Αυτή είναι η απλούστερη από τις τρεις λύσεις που περιγράφονται σε αυτό το έγγραφο. Εδώ είναι οι σημαντικές τάξεις Java που εφαρμόζονται για αυτήν τη λύση. Κάντε λήψη του πηγαίου κώδικα με πλήρη εφαρμογή εδώ: //github.com/redislabsdemo/IngestPubSub.

ο Συνδρομητής class είναι η βασική κατηγορία αυτού του σχεδιασμού. Κάθε Συνδρομητής το αντικείμενο διατηρεί μια νέα σύνδεση με το Redis.

Ο συνδρομητής κλάσης επεκτείνει τα εργαλεία JedisPubSub Runnable {

ιδιωτικό όνομα συμβολοσειράς;

ιδιωτικό RedisConnection conn = null;

ιδιωτικό Jedis jedis = null;

ιδιωτικός συνδρομητής StringChannel;

δημόσιος συνδρομητής (String subscriberName, String channelName) ρίχνει την εξαίρεση {

name = συνδρομητήςName;

subscriberChannel = όνομα καναλιού;

Νήμα t = νέο νήμα (αυτό);

t.start ();

       }

@Καταπατώ

δημόσια άκυρη εκτέλεση () {

δοκιμάστε{

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

ενώ (αληθινό) {

jedis.subscribe (this, this.subscriberChannel);

                      }

} αλίευση (Εξαίρεση ε) {

e.printStackTrace ();

              }

       }

@Καταπατώ

public void onMessage (String channel, String message) {

super.onMessage (κανάλι, μήνυμα);

       }

}

ο Εκδότης Η τάξη διατηρεί μια ξεχωριστή σύνδεση με το Redis για τη δημοσίευση μηνυμάτων σε ένα κανάλι.

δημόσιας τάξης Εκδότης {

RedisConnection conn = null;

Jedis jedis = null;

ιδιωτικό κανάλι String;

δημόσιος εκδότης (String channelName) ρίχνει την εξαίρεση {

channel = channelName;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (String msg) ρίχνει την εξαίρεση {

jedis.publish (κανάλι, msg);

       }

}

ο ΑγγλικάTweetFilter, InfluencerTweetFilter, HashTagCollector, και Συλλέκτης Influencer επεκτείνονται τα φίλτρα Συνδρομητής, που τους επιτρέπει να ακούνε τα εισερχόμενα κανάλια. Εφόσον χρειάζεστε ξεχωριστές συνδέσεις Redis για εγγραφή και δημοσίευση, κάθε κατηγορία φίλτρων έχει τη δική της Επανασύνδεση αντικείμενο. Τα φίλτρα ακούνε τα νέα μηνύματα στα κανάλια τους με βρόχο. Εδώ είναι το δείγμα κώδικα του ΑγγλικάTweetFilter τάξη:

δημόσια τάξη EnglishTweetFilter επεκτείνει τον συνδρομητή

{

ιδιωτικό RedisConnection conn = null;

ιδιωτικό Jedis jedis = null;

ιδιωτικό String publisherChannel = null;

public EnglishTweetFilter (String name, String subscriberChannel, String publisherChannel) ρίχνει την εξαίρεση {

σούπερ (όνομα, συνδρομητήςChannel);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@Καταπατώ

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = νέο JsonParser ();

JsonElement jsonElement = jsonParser.parse (μήνυμα);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// φίλτρα μηνυμάτων: δημοσιεύστε μόνο αγγλικά tweets

αν (jsonObject.get ("lang")! = null &&

jsonObject.get ("lang"). getAsString (). ισούται με ("en")) {

jedis.publish (publisherChannel, μήνυμα);

              }

       }

}

ο Εκδότης Το class έχει μια μέθοδο δημοσίευσης που δημοσιεύει μηνύματα στο απαιτούμενο κανάλι.

δημόσιας τάξης Εκδότης {

.

.     

public void publish (String msg) ρίχνει την εξαίρεση {

jedis.publish (κανάλι, msg);

       }

.

}

Η κύρια τάξη διαβάζει δεδομένα από τη ροή απορρόφησης και τα δημοσιεύει στο Ολα τα δεδομένα Κανάλι. Η κύρια μέθοδος αυτής της κλάσης ξεκινά όλα τα αντικείμενα φίλτρου.

δημόσια τάξη IngestPubSub

{

.

δημόσια άκυρη εκκίνηση () ρίχνει την εξαίρεση {

       .

       .

publisher = νέος εκδότης ("AllData");

englishFilter = new EnglishTweetFilter ("Αγγλικό φίλτρο", "AllData",

"EnglishTweets");

influencerFilter = νέο InfluencerTweetFilter ("Influencer Filter",

"AllData", "InfluencerTweets");

hashtagCollector = νέο HashTagCollector ("Hashtag Collector",

"EnglishTweets");

influencerCollector = νέο InfluencerCollector ("Influencer Collector",

"InfluencerTweets");

       .

       .

}

Κατάποση με λίστες Redis

Η δομή δεδομένων λίστας στο Redis καθιστά εύκολη και απλή την εφαρμογή μιας λύσης ουράς. Σε αυτήν τη λύση, ο παραγωγός ωθεί κάθε μήνυμα στο πίσω μέρος της ουράς και ο συνδρομητής κάνει δημοσκοπήσεις στην ουρά και τραβά νέα μηνύματα από το άλλο άκρο.

Εργαστήρια Redis

Πλεονεκτήματα

  • Αυτή η μέθοδος είναι αξιόπιστη σε περιπτώσεις απώλειας σύνδεσης. Μόλις τα δεδομένα προωθηθούν στις λίστες, διατηρούνται εκεί μέχρι να το διαβάσουν οι συνδρομητές. Αυτό ισχύει ακόμη και αν οι συνδρομητές σταματήσουν ή χάσουν τη σύνδεσή τους με τον διακομιστή Redis.
  • Οι παραγωγοί και οι καταναλωτές δεν απαιτούν καμία σύνδεση μεταξύ τους.

Μειονεκτήματα

  • Μόλις ληφθούν τα δεδομένα από τη λίστα, καταργούνται και δεν μπορούν να ανακτηθούν ξανά. Εκτός αν οι καταναλωτές διατηρήσουν τα δεδομένα, θα χαθούν μόλις τα καταναλώσουν.
  • Κάθε καταναλωτής απαιτεί μια ξεχωριστή ουρά, η οποία απαιτεί την αποθήκευση πολλαπλών αντιγράφων των δεδομένων.

Σχεδιασμός κώδικα για τη λύση Redis Lists

Εργαστήρια Redis

Μπορείτε να κατεβάσετε τον πηγαίο κώδικα για τη λύση Redis Lists εδώ: //github.com/redislabsdemo/IngestList. Οι κύριες τάξεις αυτής της λύσης εξηγούνται παρακάτω.

Λίστα μηνυμάτων ενσωματώνει τη δομή δεδομένων της λίστας Redis. ο Σπρώξτε() Η μέθοδος ωθεί το νέο μήνυμα στα αριστερά της ουράς και κρότος() περιμένει ένα νέο μήνυμα από τα δεξιά, εάν η ουρά είναι κενή.

δημόσια λίστα μηνυμάτων λίστας {

προστατευμένο όνομα συμβολοσειράς = "MyList"; // Ονομα

.

.     

public void push (String msg) ρίχνει την εξαίρεση {

jedis.lpush (όνομα, μήνυμα); // Αριστερή ώθηση

       }

public String pop () ρίχνει την εξαίρεση {

επιστροφή jedis.brpop (0, όνομα) .toString ();

       }

.

.

}

Λίστα μηνυμάτων είναι μια αφηρημένη τάξη που εφαρμόζει τη λογική του ακροατή και του εκδότη. ΕΝΑ Λίστα μηνυμάτων το αντικείμενο ακούει μόνο μία λίστα, αλλά μπορεί να δημοσιεύσει σε πολλά κανάλια (Φίλτρο μηνυμάτων αντικείμενα). Αυτή η λύση απαιτεί ξεχωριστό Φίλτρο μηνυμάτων αντικείμενο για κάθε συνδρομητή κάτω από το σωλήνα.

class MessageListener υλοποιεί το Runnable {

ιδιωτικό όνομα συμβολοσειράς = null;

ιδιωτική λίστα μηνυμάτων inboundList = null;

Map outBoundMsgFilters = νέο HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

αν (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@Καταπατώ

δημόσια άκυρη εκτέλεση () {

.

ενώ (αληθινό) {

Συμβολοσειρά msg = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

προστατευμένο void pushMessage (String msg) ρίχνει την εξαίρεση {

Ορισμός outBoundMsgNames = outBoundMsgFilters.keySet ();

για (Όνομα συμβολοσειράς: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (όνομα);

msgList.filterAndPush (msg);

              }

       }

}

Φίλτρο μηνυμάτων είναι μια τάξη γονέα που διευκολύνει το filterAndPush () μέθοδος. Καθώς τα δεδομένα ρέουν μέσω του συστήματος απορρόφησης, συχνά φιλτράρονται ή μετασχηματίζονται πριν σταλούν στο επόμενο στάδιο. Μαθήματα που επεκτείνουν το Φίλτρο μηνυμάτων η τάξη παρακάμπτει το filterAndPush () μέθοδο, και εφαρμόστε τη δική τους λογική για να ωθήσει το φιλτραρισμένο μήνυμα στην επόμενη λίστα.

δημόσια τάξη MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) ρίχνει την εξαίρεση {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener είναι ένα δείγμα υλοποίησης ενός Λίστα μηνυμάτων τάξη. Αυτό ακούει όλα τα tweets στο Ολα τα δεδομένα κανάλι και δημοσιεύει τα δεδομένα στο ΑγγλικάTweetsFilter και Φίλτρο Influencer.

δημόσια τάξη AllTweetsListener επεκτείνει το MessageListener {

.

.     

public static void main (String [] args) ρίχνει την εξαίρεση {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (νέο

EnglishTweetsFilter ("EnglishTweetsFilter", "EnglishTweets"));

allTweetsProcessor.registerOutBoundMessageList (νέο

InfluencerFilter ("InfluencerFilter", "Influencers"));

allTweetsProcessor.start ();

       }

.

.

}

ΑγγλικάTweetsFilter εκτείνεται Φίλτρο μηνυμάτων. Αυτή η τάξη εφαρμόζει λογική για να επιλέξει μόνο εκείνα τα tweets που έχουν επισημανθεί ως αγγλικά tweets. Το φίλτρο απορρίπτει τα μη αγγλικά tweets και ωθεί τα αγγλικά tweets στην επόμενη λίστα.

δημόσια τάξη EnglishTweetsFilter επεκτείνει το MessageFilter {

public EnglishTweetsFilter (String name, String listName) ρίχνει την εξαίρεση {

σούπερ (όνομα, listName);

       }

@Καταπατώ

public void filterAndPush (String message) ρίχνει την εξαίρεση {

JsonParser jsonParser = νέο JsonParser ();

JsonElement jsonElement = jsonParser.parse (μήνυμα);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

αν (jsonObject.get ("lang")! = null &&

jsonObject.get ("lang"). getAsString (). ισούται με ("en")) {

Jedis jedis = super.getJedisInstance ();

αν (jedis! = null) {

jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found