Προγραμματισμός

Java - Ανίχνευση και Χειρισμός Κλωστού Νήματος

Από τον Άλεξ. Γ. Πένεν

Αρχιτέκτονας - Δίκτυα Nokia Siemens

Μπανγκαλόρ

Κρεμαστά νήματα είναι μια κοινή πρόκληση για την ανάπτυξη λογισμικού που πρέπει να διασυνδέεται με ιδιόκτητες συσκευές χρησιμοποιώντας ιδιόκτητες ή τυποποιημένες διεπαφές όπως SNMP, Q3 ή Telnet. Αυτό το πρόβλημα δεν περιορίζεται στη διαχείριση του δικτύου, αλλά παρουσιάζεται σε ένα ευρύ φάσμα πεδίων, όπως διακομιστές ιστού, διεργασίες που πραγματοποιούν κλήσεις απομακρυσμένης διαδικασίας και ούτω καθεξής.

Ένα νήμα που ξεκινά ένα αίτημα σε μια συσκευή χρειάζεται έναν μηχανισμό για τον εντοπισμό σε περίπτωση που η συσκευή δεν αποκρίνεται ή αποκρίνεται μόνο εν μέρει. Σε ορισμένες περιπτώσεις όπου εντοπίζεται τέτοια αναστολή, πρέπει να αναληφθεί συγκεκριμένη ενέργεια. Η συγκεκριμένη ενέργεια θα μπορούσε να είναι είτε επανάληψη δοκιμής ή ενημέρωση του τελικού χρήστη σχετικά με την αποτυχία της εργασίας ή κάποια άλλη επιλογή ανάκτησης. Σε ορισμένες περιπτώσεις όπου ένας μεγάλος αριθμός εργασιών πρέπει να εκτελεστεί σε μεγάλο αριθμό στοιχείων δικτύου από ένα στοιχείο, η ανίχνευση νήματος κρέμονται είναι σημαντική, ώστε να μην αποτελεί εμπόδιο για την επεξεργασία άλλων εργασιών. Υπάρχουν λοιπόν δύο πτυχές στη διαχείριση των κρεμαστών νημάτων: εκτέλεση και Γνωστοποίηση.

Για το πτυχή ειδοποίησης μπορούμε να προσαρμόσουμε το μοτίβο Java Observer για να ταιριάζει στον κόσμο των πολλαπλών νημάτων.

Προσαρμογή του προτύπου Java Observer σε συστήματα πολλαπλών νημάτων

Λόγω αναστολής εργασιών, χρησιμοποιώντας το Java ThreadPool τάξη με μια κατάλληλη στρατηγική είναι η πρώτη λύση που έρχεται στο μυαλό. Ωστόσο, χρησιμοποιώντας το Java ThreadPool στο πλαίσιο ορισμένων νημάτων που κρέμονται τυχαία για μια χρονική περίοδο δίνει ανεπιθύμητη συμπεριφορά με βάση τη συγκεκριμένη στρατηγική που χρησιμοποιείται, όπως το λιμό νήματος στην περίπτωση της στρατηγικής σταθερού νήματος. Αυτό οφείλεται κυρίως στο γεγονός ότι η Java ThreadPool δεν διαθέτει μηχανισμό για την ανίχνευση ενός νήματος.

Θα μπορούσαμε να δοκιμάσουμε ένα Cached thread pool, αλλά έχει επίσης προβλήματα. Εάν υπάρχει υψηλός ρυθμός ενεργοποίησης εργασιών και κάποια νήματα κρέμονται, ο αριθμός των νημάτων θα μπορούσε να αυξηθεί, προκαλώντας τελικά λιμοκτονία πόρων και εξαιρέσεις εκτός μνήμης. Ή θα μπορούσαμε να χρησιμοποιήσουμε ένα Custom ThreadPool στρατηγική επίκληση α CallerRunsPolicy. Σε αυτήν την περίπτωση, επίσης, μια ανάρτηση νήματος θα μπορούσε να κάνει όλα τα νήματα να κολλήσουν τελικά. (Το κύριο νήμα δεν πρέπει ποτέ να είναι ο καλούντος, καθώς υπάρχει πιθανότητα να κολλήσει οποιαδήποτε εργασία που μεταβιβάζεται στο κύριο νήμα, προκαλώντας σταμάτημα των πάντων.)

Λοιπόν, ποια είναι η λύση; Θα δείξω ένα όχι τόσο απλό μοτίβο ThreadPool που προσαρμόζει το μέγεθος της πισίνας σύμφωνα με το ρυθμό εργασίας και με βάση τον αριθμό των νήματος που κρέμονται. Ας πάμε πρώτα στο πρόβλημα της ανίχνευσης κρεμαστών νημάτων.

Εντοπισμός Κρεμασμένων νημάτων

Το σχήμα 1 δείχνει μια αφαίρεση του μοτίβου:

Υπάρχουν δύο σημαντικές τάξεις εδώ: ThreadManager και Διαχειριζόμενο νήμα. Και οι δύο εκτείνονται από την Java Νήμα τάξη. ο ThreadManager κρατά ένα δοχείο που κρατά το Διαχειριζόμενα νήματα. Όταν ένα νέο Διαχειριζόμενο νήμα δημιουργείται προσθέτει τον εαυτό του σε αυτό το κοντέινερ.

 ThreadHangTester testthread = νέο ThreadHangTester ("threadhangertest", 2000, false); testthread.start (); thrdManger.manage (testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start (); 

ο ThreadManager επαναλαμβάνεται μέσω αυτής της λίστας και καλεί το Διαχειριζόμενο νήμα'μικρό isHung () μέθοδος. Αυτή είναι βασικά μια λογική ελέγχου χρονικής σήμανσης.

 if (System.currentTimeMillis () - lastprocessingtime.get ()> maxprocessingtime) {logger.debug ("Το νήμα είναι κρεμασμένο"); επιστροφή αληθινή? } 

Εάν διαπιστώσει ότι ένα νήμα έχει περάσει σε ένα βρόχο εργασιών και δεν έχει ενημερώσει ποτέ τα αποτελέσματά του, απαιτείται ένας μηχανισμός ανάκτησης όπως ορίζεται από το Διαχείριση νήματος.

 while (isRunning) {για (Iterator iterator = manageThreads.iterator (); iterator.hasNext ();) {ManagedThreadData thrddata = (ManagedThreadData) iterator.next (); if (thrddata.getManagedThread (). isHung ()) {logger.warn ("Εντοπίστηκε νήμα Hang για ThreadName =" + thrddata.getManagedThread (). getName ()); switch (thrddata.getManagedAction ()) {case RESTART_THREAD: // Η ενέργεια εδώ είναι να κάνετε επανεκκίνηση του νήματος // κατάργηση από το διαχειριστή iterator.remove (); // σταματήστε την επεξεργασία αυτού του νήματος, αν είναι δυνατόν, thrddata.getManagedThread (). stopProcessing (); if (thrddata.getManagedThread (). getClass () == ThreadHangTester.class) // Για να μάθετε ποιο τύπο νήματος θα δημιουργήσετε {ThreadHangTester newThread = new ThreadHangTester ("restarted_ThrdHangTest", 5000, true); // Δημιουργήστε ένα νέο νήμα newThread.start (); // προσθέστε το για να διαχειριστείτε τη διαχείριση (newThread, thrddata.getManagedAction (), thrddata.getThreadChecktime ()); } Διακοπή; ......... 

Για ένα νέο Διαχειριζόμενο νήμα για να δημιουργηθεί και να χρησιμοποιηθεί στη θέση του κρεμασμένου, δεν θα πρέπει να διατηρεί καμία κατάσταση ή κανένα δοχείο. Γι 'αυτό το δοχείο στο οποίο Διαχειριζόμενο νήμα οι πράξεις πρέπει να διαχωρίζονται. Εδώ χρησιμοποιούμε το μοτίβο Singleton που βασίζεται στο ENUM για να κρατήσουμε τη λίστα εργασιών. Έτσι, το κοντέινερ που κρατά τις εργασίες είναι ανεξάρτητο από το νήμα που επεξεργάζεται τις εργασίες. Κάντε κλικ στον παρακάτω σύνδεσμο για να κατεβάσετε την πηγή για το μοτίβο που περιγράφεται: Java Thread Manager Source.

Κρεμασμένα νήματα και στρατηγικές Java ThreadPool

Η Ιάβα ThreadPool δεν διαθέτει μηχανισμό ανίχνευσης κρεμασμένων νημάτων. Χρησιμοποιώντας μια στρατηγική όπως το σταθερό threadpool (Executors.newFixedThreadPool ()) δεν θα λειτουργήσει γιατί αν κάποιες εργασίες κρέμονται με την πάροδο του χρόνου, όλα τα νήματα θα είναι τελικά σε κατάσταση αναμονής. Μια άλλη επιλογή είναι η χρήση μιας προσωρινής αποθήκευσης πολιτικής ThreadPool (Executors.newCachedThreadPool ()). Αυτό θα μπορούσε να διασφαλίσει ότι θα υπάρχουν πάντα διαθέσιμα νήματα για την επεξεργασία μιας εργασίας, περιοριζόμενο μόνο από τα όρια μνήμης VM, CPU και νήματος. Ωστόσο, με αυτήν την πολιτική δεν υπάρχει έλεγχος του αριθμού των νημάτων που δημιουργούνται. Ανεξάρτητα από το αν ένα νήμα επεξεργασίας κρέμεται ή όχι, η χρήση αυτής της πολιτικής ενώ ο ρυθμός εργασιών είναι υψηλός οδηγεί στη δημιουργία τεράστιου αριθμού νημάτων. Εάν δεν έχετε αρκετούς πόρους για το JVM πολύ σύντομα θα φτάσετε το μέγιστο όριο μνήμης ή το υψηλό CPU. Είναι πολύ συνηθισμένο να βλέπουμε τον αριθμό των νημάτων να χτυπούν εκατοντάδες ή χιλιάδες. Παρόλο που απελευθερώνονται μόλις ολοκληρωθεί η επεξεργασία της εργασίας, μερικές φορές κατά τη διάρκεια του χειρισμού ριπής, ο μεγάλος αριθμός νημάτων θα κατακλύσει τους πόρους του συστήματος.

Μια τρίτη επιλογή είναι η χρήση προσαρμοσμένων στρατηγικών ή πολιτικών. Μία τέτοια επιλογή είναι να υπάρχει ένα νήμα που ξεκινά από 0 έως κάποιο μέγιστο αριθμό. Έτσι, ακόμη και αν ένα νήμα κρεμαστεί, θα δημιουργηθεί ένα νέο νήμα όσο επιτυγχάνεται ο μέγιστος αριθμός νημάτων:

 execexec = νέο ThreadPoolExecutor (0, 3, 60, TimeUnit.SECONDS, νέο SynchronousQueue ()); 

Εδώ 3 είναι ο μέγιστος αριθμός νημάτων και ο χρόνος διατήρησης είναι 60 δευτερόλεπτα, καθώς αυτή είναι μια διαδικασία εντατικής εργασίας. Εάν δώσουμε έναν αρκετά υψηλό μέγιστο αριθμό νήματος, αυτή είναι λίγο πολύ μια λογική πολιτική που πρέπει να χρησιμοποιείται στο πλαίσιο αναστολής εργασιών. Το μόνο πρόβλημα είναι ότι εάν τα κρεμαστά νήματα δεν απελευθερωθούν τελικά υπάρχει μια μικρή πιθανότητα ότι όλα τα νήματα θα μπορούσαν κάποια στιγμή να κολλήσουν. Εάν τα μέγιστα νήματα είναι αρκετά υψηλά και υποθέτοντας ότι μια αναστολή εργασιών είναι ένα σπάνιο φαινόμενο, τότε αυτή η πολιτική θα ταιριάζει στο λογαριασμό.

Θα ήταν γλυκό αν το ThreadPool είχε επίσης ένα μηχανισμό ανίχνευσης κρεμασμένων νημάτων. Θα συζητήσω ένα τέτοιο σχέδιο αργότερα. Φυσικά, εάν όλα τα νήματα έχουν παγώσει, θα μπορούσατε να διαμορφώσετε και να χρησιμοποιήσετε την πολιτική απορριφθέντων εργασιών του ομαδικού νήματος. Εάν δεν θέλετε να απορρίψετε τις εργασίες θα πρέπει να χρησιμοποιήσετε το CallerRunsPolicy:

 execexec = νέο ThreadPoolExecutor (0, 20, 20, TimeUnit.MILLISECONDS, νέο SynchronousQueue () νέο ThreadPoolExecutor.CallerRunsPolicy ()); 

Σε αυτήν την περίπτωση, εάν μια αναστολή νήματος προκάλεσε την απόρριψη μιας εργασίας, αυτή η εργασία θα δοθεί στο νήμα κλήσης για χειρισμό. Υπάρχει πάντα η πιθανότητα αυτού του έργου να κρέμεται. Σε αυτήν την περίπτωση ολόκληρη η διαδικασία θα παγώσει. Επομένως, είναι καλύτερο να μην προσθέσετε μια τέτοια πολιτική σε αυτό το πλαίσιο.

 Δημόσια τάξη NotificationProcessor υλοποιεί Runnable {private final NotificationOriginator notificationOrginator; boolean isRunning = true; ιδιωτικό τελικό ExecutorService execexec; AlarmNotificationProcessor (NotificationOriginator norginator) {// ctor // execexec = Executors.newCachedThreadPool (); // Πάρα πολλά νήματα // execexec = Executors.newFixedThreadPool (2); //, ανίχνευση καθυστέρησης execexec = νέο ThreadPoolExecutor (0 , 250, TimeUnit.MILLISECONDS, νέο SynchronousQueue (), νέο ThreadPoolExecutor.CallerRunsPolicy ()); } δημόσια εκτέλεση κενού () {while (isRunning) {try {final Task task = TaskQueue.INSTANCE.getTask (); Runnable thisTrap = new Runnable () {public void run () {++ alarmid; notificaionOrginator.notify (νέο OctetString (), // Επεξεργασία εργασιών nbialarmnew.getOID (), nbialarmnew.createVariableBindingPayload ()); É ........}}; execexec.execute (thisTrap); } 

Ένα προσαρμοσμένο ThreadPool με ανίχνευση αναμονής

Θα ήταν υπέροχο να υπάρχει βιβλιοθήκη νήματος με δυνατότητα ανίχνευσης και χειρισμού αναμονής εργασίας. Έχω αναπτύξει ένα και θα το δείξω παρακάτω. Αυτό είναι στην πραγματικότητα μια θύρα από μια ομάδα νήματος C ++ που σχεδίασα και χρησιμοποίησα λίγο καιρό πίσω (βλ. Αναφορές). Βασικά, αυτή η λύση χρησιμοποιεί το μοτίβο Εντολών και την αλυσίδα Ευθύνης. Ωστόσο, η υλοποίηση του μοτίβου εντολών σε Java χωρίς τη βοήθεια του Function Object Support είναι λίγο δύσκολη. Για αυτό έπρεπε να αλλάξω ελαφρώς την εφαρμογή για να χρησιμοποιήσω την αντανάκλαση Java. Σημειώστε ότι το πλαίσιο στο οποίο σχεδιάστηκε αυτό το μοτίβο ήταν εκεί όπου έπρεπε να τοποθετηθεί / συνδεθεί μια δεξαμενή νήματος χωρίς να τροποποιηθεί καμία από τις υπάρχουσες τάξεις. (Πιστεύω ότι το μεγάλο πλεονέκτημα του αντικειμενοστρεφούς προγραμματισμού είναι ότι μας δίνει έναν τρόπο να σχεδιάζουμε μαθήματα έτσι ώστε να κάνουμε αποτελεσματική χρήση του Open Closed Principle. Αυτό ισχύει ιδιαίτερα για τον περίπλοκο παλιό κώδικα παλαιού τύπου και μπορεί να έχει λιγότερο σημασία για ανάπτυξη νέων προϊόντων.) Ως εκ τούτου, χρησιμοποίησα το προβληματισμό αντί να χρησιμοποιήσω μια διεπαφή για να εφαρμόσω το μοτίβο εντολών. Ο υπόλοιπος κώδικας θα μπορούσε να μεταφερθεί χωρίς σημαντική αλλαγή, καθώς σχεδόν όλα τα πρωτότυπα συγχρονισμού και σηματοδότησης νημάτων είναι διαθέσιμα στην Java 1.5 και μετά.

 δημόσια τάξη Command {private Object [] argParameter; ........ // Ctor για μια μέθοδο με δύο args Command (T pObj, String methodName, longoutout, String key, int arg1, int arg2) {m_objptr = pObj; m_methodName = mthodName; m_timeout = χρονικό όριο; m_key = κλειδί; argParameter = νέο αντικείμενο [2]; argParameter [0] = arg1; argParameter [1] = arg2; } // Καλεί τη μέθοδο του αντικειμένου void execute () {Class klass = m_objptr.getClass (); Class [] paramTypes = new Class [] {int.class, int.class}; δοκιμάστε το {Method methodName = klass.getMethod (m_methodName, paramTypes); //System.out.println("Βρείτε τη μέθοδο -> "+ methodName); if (argParameter.length == 2) {methodName.invoke (m_objptr, (Object) argParameter [0], (Object) argParameter [1]); } 

Παράδειγμα χρήσης αυτού του μοτίβου:

 δημόσια τάξη CTask {.. public int DoSomething (int a, int b) {...}} 

Command cmd4 = new Command (task4, "DoMultiplication", 1, "key2", 2,5);

Τώρα έχουμε δύο πιο σημαντικά μαθήματα εδώ. Το ένα είναι το Αλυσίδα νήματος τάξη, η οποία εφαρμόζει το μοτίβο Αλυσίδα Ευθύνης:

 δημόσια τάξη ThreadChain υλοποιεί Runnable {public ThreadChain (ThreadChain p, ThreadPool pool, String name) {AddRef (); deleteMe = false; απασχολημένος = false; // -> πολύ σημαντικό επόμενο = p; // ορίστε την αλυσίδα νημάτων - σημειώστε ότι είναι σαν μια συνδεδεμένη λίστα impl threadpool = pool; // ορίστε το νήμα - Ρίζα του νήματος ........ threadId = ++ ThreadId; ...... // ξεκινήστε το νήμα thisThread = νέο νήμα (αυτό, όνομα + inttid.toString ()); thisThread.start (); } 

Αυτή η τάξη έχει δύο κύριες μεθόδους. Το ένα είναι Boolean Μπορω να χειριστω() που ξεκινά από το ThreadPool τάξη και στη συνέχεια προχωρά αναδρομικά. Αυτό ελέγχει εάν το τρέχον νήμα (τρέχον Αλυσίδα νήματος παράδειγμα) είναι ελεύθερος να χειριστεί την εργασία. Εάν χειρίζεται ήδη μια εργασία καλεί την επόμενη στην αλυσίδα.

 public Boolean canHandle () {if (! busy) {// Εάν δεν είναι απασχολημένος System.out.println ("Can Handle This Event in id =" + threadId); // πρέπει να σηματοδοτήσετε ένα συμβάν δοκιμάστε το {condLock.lock (); condWait.signal (); // Σηματοδοτήστε το HandleRequest που το περιμένει στη μέθοδο εκτέλεσης .................................... ..... επιστρέψτε αληθινό? } ......................................... /// Διαφορετικά δείτε αν το επόμενο το αντικείμενο στην αλυσίδα είναι δωρεάν /// για να χειριστεί το αίτημα επιστροφής next.canHandle (); 

Σημειώστε ότι το HandleRequest είναι μια μέθοδος Αλυσίδα νήματος που επικαλούνται από το Διαδρομή σπειρώματος () μέθοδο και περιμένει το σήμα από το μπορω να χειριστω μέθοδος. Σημειώστε επίσης τον τρόπο χειρισμού της εργασίας μέσω του μοτίβου εντολών.