GenStage
Σε αυτό το μάθημα θα δούμε πιο αναλυτικά το GenStage, τι ρόλο παίζει και πως θα το αξιοποιήσουμε στις εφαρμογές μας.
Εισαγωγή
Τι είναι το GenStage; Από την επίσημη τεκμηρίωση, είναι “ένας προσδιορισμός και μια επεξεργαστική ροή για την Elixir”, αλλά τι σημαίνει αυτό για εμάς;
Σημαίνει ότι το GenStage μας παρέχει έναν τρόπο να ορίζουμε έναν αγωγό εργασίας που εκτελείται από ανεξάρτητα βήματα (ή στάδια) σε ξεχωριστές διεργασίες· Αν έχετε δουλέψει με αγωγούς στο παρελθόν τότε μερικές από τις έννοιες θα σας είναι γνώριμες.
Για να κατανοήσετε καλύτερα πως δουλεύει, ας σκεφτούμε μια απλή ροή παραγωγού-καταναλωτή:
[A] -> [B] -> [C]
Σε αυτό το παράδειγμα έχουμε τρία στάδια: το A
είναι ένας παραγωγός, το B
είναι ένας παραγωγός-καταναλωτής και το C
είναι ένας καταναλωτής.
Το A
παράγει μια τιμή η οποία καταναλώνεται από το B
, το B
κάνει κάποια εργασία και επιστρέφει μια νέα τιμή η οποία λαμβάνεται από τον καταναλωτή μας C
· Ο ρόλος του σταδίου μας είναι σημαντικός, όπως θα δούμε στον επόμενο τομέα.
Παρόλο που το παράδειγμά μας είναι 1-προς-1 παραγωγός-προς-καταναλωτή, είναι δυνατό να έχουμε πολλαπλούς παραγωγούς και πολλαπλούς καταναλωτές σε οποιοδήποτε στάδιο.
Για να παρουσιάσουμε καλύτερα αυτές τις έννοιες θα κατασκευάσουμε έναν αγωγό με το GenStage, αλλά πρώτα ας εξερευνήσουμε περισσότερο τους ρόλους στους οποίους βασίζεται το GenStage.
Καταναλωτές και Παραγωγοί
Όπως έχουμε διαβάσει, ο ρόλος που δίνουμε στο στάδιό μας είναι σημαντικός. Οι προδιαγραφές του GenStage αναγνωρίζουν τρεις ρόλους:
-
:producer
— Μια πηγή. Οι παραγωγοί περιμένουν για ζήτηση από τους καταναλωτές και ανταποκρίνονται με τα συμβάντα που ζητήθηκαν. -
:producer_consumer
— Ταυτόχρονα μια πηγή και ένας προορισμός. Οι παραγωγοί-καταναλωτές μπορούν να ανταποκριθούν στη ζήτηση από άλλους καταναλωτές και να ζητήσουν συμβάντα από παραγωγούς. -
:consumer
— Ένας προορισμός. Ένας καταναλωτής ζητάει και λαμβάνει δεδομένα από παραγωγούς.
Παρατηρείτε ότι οι παραγωγοί μας περιμένουν για ζήτηση; Με το GenStage οι καταναλωτές στέλνουν αιτήσεις προς τα πάνω και επεξεργάζονται τα δεδομένα από τον παραγωγό. Αυτό διευκολύνει ένα μηχανισμό γνωστό ως αντίθλιψη. Η αντίθλιψη βάζει το βάρος στον παραγωγό ώστε να μην πιέζει υπερβολικά όταν οι καταναλωτές είναι απασχολημένοι.
Τώρα που καλύψαμε τους ρόλους στο GenStage, ας ξεκινήσουμε με την εφαρμογή μας.
Ξεκινώντας
Σε αυτό το παράδειγμα θα κατασκευάσουμε μια εφαρμογή GenStage που στέλνει αριθμούς, ταξινομεί τους ζυγούς και τέλος τους τυπώνει.
Στην εφαρμογή μας, θα χρησιμοποιήσουμε και τους τρεις ρόλους GenStage. Ο παραγωγός μας θα είναι υπεύθυνος για το μέτρημα και την αποστολή των αριθμών. Θα χρησιμοποιήσουμε ένα παραγωγό-καταναλωτή για να φιλτράρουμε τους ζυγούς αριθμούς και αργότερα να ανταποκρινόμαστε στη ζήτηση από κάτω. Τέλος θα χτίσουμε ένα καταναλωτή για να εμφανίσουμε τους εναπομείναντες αριθμούς.
Θα ξεκινήσουμε δημιουργώντας ένα project με ένα δέντρο παρακολούθησης:
mix new genstage_example --sup
cd genstage_example
Ας αναβαθμίσουμε τις εξαρτήσεις μας στο mix.exs
για να συμπεριλάβουμε το gen_stage
:
defp deps do
[
{:gen_stage, "~> 1.0.0"},
]
end
Θα πρέπει να κατεβάσουμε τις εξαρτήσεις μας και να τις συντάξουμε πριν προχωρήσουμε:
mix do deps.get, compile
Τώρα είμαστε έτοιμοι να χτίστουμε τον παραγωγό μας!
Παραγωγός
Το πρώτο βήμα της εφαρμογής GenStage μας είναι να δημιουργήσουμε τον παραγωγό μας. Όπως συζητήσαμε πριν, θέλουμε να δημιουργήσουμε ένα παραγωγό που στέλνει μια συνεχή ροή αριθμών. Ας δημιουργήσουμε το αρχείο παραγωγού:
touch lib/genstage_example/producer.ex
Τώρα μπορούμε να προσθέσουμε τον κώδικα:
defmodule GenstageExample.Producer do
use GenStage
def start_link(initial \\ 0) do
GenStage.start_link(__MODULE__, initial, name: __MODULE__)
end
def init(counter), do: {:producer, counter}
def handle_demand(demand, state) do
events = Enum.to_list(state..(state + demand - 1))
{:noreply, events, state + demand}
end
end
Τα δύο πιο σημαντικά μέρη που πρέπει να σημειώσουμε εδώ είναι οι init/1
και handle_demand/2
.
Στην init/1
ορίζουμε την αρχική κατάσταση όπως θα κάναμε στους GenServers μας αλλά το πιο σημαντικό είναι ότι μας βάζουμε την ταμπέλα του παραγωγού.
Η απάντηση της συνάρτησής μας init/1
είναι αυτή στην οποία στηρίζεται το GenStage για να ταξινομήσει την διεργασία μας.
Η συνάρτηση handle_demand/2
είναι εκεί που ορίζεται το μεγαλύτερο μέρος του παραγωγού μας.
Πρέπει να υλοποιηθεί από όλους τους GenStage παραγωγούς.
Εδώ επιστρέφουμε το σετ των αριθμών που ζητούνται από τους καταναλωτές μας και αυξάνουμε το μετρητή μας.
Η ζήτηση από τους καταναλωτές, η μεταβλητή demand
στον κώδικα από πάνω, είναι ένας ακέραιος που αναπαριστά τον αριθμό των συμβάντων που μπορούν να χειριστούν· με προκαθορισμένο το 1000.
Παραγωγός Καταναλωτής
Τώρα που έχουμε έναν παραγωγό-γεννήτρια αριθμών ας μεταβούμε στον παραγωγό-καταναλωτή μας. Θα θέλουμε να ζητήσουμε αριθμούς από τον παραγωγό μας, να αφαιρέσουμε τους μονούς και να απαντήσουμε στη ζήτηση.
touch lib/genstage_example/producer_consumer.ex
Ας αναβαθμίσουμε το αρχείο μας για να δείχνει όπως ο κώδικας στο παράδειγμα:
defmodule GenstageExample.ProducerConsumer do
use GenStage
require Integer
def start_link(_initial) do
GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
end
def init(state) do
{:producer_consumer, state, subscribe_to: [GenstageExample.Producer]}
end
def handle_events(events, _from, state) do
numbers =
events
|> Enum.filter(&Integer.is_even/1)
{:noreply, numbers, state}
end
end
Όπως ίσως παρατηρήσατε, με τον παραγωγό-καταναλωτή μας παρουσιάζουμε μια νέα συνάρτηση, την handle_events/3
και μια νέα επιλογή στην init/1
.
Με την επιλογή subscribe_to
καθοδηγούμε το GenStage να μας βάλει σε επικοινωνία με ένα συγκεκριμμένο παραγωγό.
Η μέθοδος handle_events/3
είναι η κινητήριος δύναμή μας, εκεί όπου λαμβάνουμε τα εισερχόμενα συμβάντα, τα επεξεργαζόμαστε και επιστρέφουμε το μεταμορφωμένο σετ.
Όπως θα δούμε οι καταναλωτές υλοποιούνται με περίπου τον ίδιο τρόπο αλλά η σημαντική διαφορά είναι στο τι επιστρέφει η μέθοδος handle_events/3
και στο πως χρησιμοποιείται.
Όταν μαρκάρουμε την διεργασία μας σαν παραγωγό-καταναλωτή το δεύτερο όρισμα της τούπλας μας, η numbers
σε αυτή την περίπτωση, χρησιμοποιείται για να ανταποκριθεί στη ζήτηση προς τα κάτω.
Στους καταναλωτές αυτή η τιμή απορρίπτεται.
Καταναλωτής
Τελευταίο αλλά εξίσου σημαντικό έχουμε τον καταναλωτή μας. Ας ξεκινήσουμε:
touch lib/genstage_example/consumer.ex
Από τη στιγμή που οι καταναλωτές και οι παραγωγοί-καταναλωτές είναι τόσο όμοιοι ο κώδικάς μας δεν θα δείχνει τόσο διαφορετικός:
defmodule GenstageExample.Consumer do
use GenStage
def start_link(_initial) do
GenStage.start_link(__MODULE__, :state_doesnt_matter)
end
def init(state) do
{:consumer, state, subscribe_to: [GenstageExample.ProducerConsumer]}
end
def handle_events(events, _from, state) do
for event <- events do
IO.inspect({self(), event, state})
end
# As a consumer we never emit events
{:noreply, [], state}
end
end
Όπως είδαμε στον προηγούμενο τομέα ο καταναλωτής μας δεν στέλνει συμβάντα, έτσι η δεύτερη τιμή στην τούπλα μας θα απορριφθεί.
Συνδεση
Τώρα που έχουμε τον παραγωγό μας, τον παραγωγό-καταναλωτή και τον καταναλωτή έτοιμους είμαστε έτοιμοι να τους συνδέσουμε.
Ας ξεκινήσουμε ανοίγοντας το lib/genstage_example/application.ex
και προσθέτοντας τις διεργασίες μας στο δέντρο παρακολούθησης:
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
{GenstageExample.Producer, 0},
{GenstageExample.ProducerConsumer, []},
{GenstageExample.Consumer, []}
]
opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
Supervisor.start_link(children, opts)
end
Αν όλα είναι σωστά, μπορούμε να τρέξουμε το project και θα πρέπει να δούμε τα πάντα να δουλέυουν:
$ mix run --no-halt
{#PID<0.109.0>, 0, :state_doesnt_matter}
{#PID<0.109.0>, 2, :state_doesnt_matter}
{#PID<0.109.0>, 4, :state_doesnt_matter}
{#PID<0.109.0>, 6, :state_doesnt_matter}
...
{#PID<0.109.0>, 229062, :state_doesnt_matter}
{#PID<0.109.0>, 229064, :state_doesnt_matter}
{#PID<0.109.0>, 229066, :state_doesnt_matter}
Τα καταφέραμε! Όπως αναμέναμε η εφαρμογή μας στέλνει μόνο ζυγούς αριθμούς και το κάνει γρήγορα.
Σε αυτό το σημείο έχουμε έναν αγωγό που λειτουργεί. Υπαρχει ένας παραγωγός που στέλνει αριθμούς, ένας παραγωγός καταναλωτής που διαγράφει τους μονούς αριθμούς, και ένας καταναλωτής που τα εμφανίζει όλα αυτά και συνεχίζει τη ροή.
Πολλαπλοί Παραγωγοί ή Καταναλωτές
Αναφέραμε στην εισαγωγή ότι θα μπορούσαμε να έχουμε πάνω από ένα παραγωγούς ή καταναλωτές. Ας ρίξουμε μια ματιά σε αυτά.
Αν εξετάσουμε την έξοδο της IO.inspect/1
από το παράδειγμά μας θα δούμε ότι κάθε συμβάν το χειρίζεται μια μοναδική PID.
Ας κάνουμε κάποιες αλλαγές για πολλαπλούς εργάτες αλλάζοντας το lib/genstage_example/application.ex
:
children = [
{GenstageExample.Producer, 0},
{GenstageExample.ProducerConsumer, []},
%{
id: 1,
start: {GenstageExample.Consumer, :start_link, [[]]}
},
%{
id: 2,
start: {GenstageExample.Consumer, :start_link, [[]]}
},
]
Τώρα που ορίσαμε δύο καταναλωτές ας δούμε τι παίρνουμε αν τρέξουμε τώρα την εφαρμογή μας:
$ mix run --no-halt
{#PID<0.120.0>, 0, :state_doesnt_matter}
{#PID<0.120.0>, 2, :state_doesnt_matter}
{#PID<0.120.0>, 4, :state_doesnt_matter}
{#PID<0.120.0>, 6, :state_doesnt_matter}
...
{#PID<0.120.0>, 86478, :state_doesnt_matter}
{#PID<0.121.0>, 87338, :state_doesnt_matter}
{#PID<0.120.0>, 86480, :state_doesnt_matter}
{#PID<0.120.0>, 86482, :state_doesnt_matter}
Όπως θα δείτε τώρα έχουμε πολλαπλά PID, απλά προσθέτοντας μια γραμμή κώδικα και δίνοντας ID στους καταναλωτές μας.
Περιπτώσεις Χρήσης
Τώρα που καλύψαμε το GenStage και χτίσαμε την πρώτη μας δοκιμαστική εφαρμογή, ποιές είναι μερικές από τις πραγματικές περιπτώσεις χρήσης για το GenStage;
-
Αγωγός Μετατροπής Δεδομένων — Οι παραγωγοί δεν χρειάζεται να είναι απλές γεννήτριες αριθμών. Θα μπορούσαμε να παράγουμε συμβάντα από μια βάση δεδομένων ή ακόμα από μια άλλη πηγή όπως η Apache Kafka. Με ένα συνδυασμό παραγωγών-καταναλωτών και καταναλωτών θα μπορούσαμε να παράγουμε, ταξινομήσουμε, καταγράψουμε και αποθηκεύσουμε μετρήσεις καθώς γίνονται διαθέσιμες.
-
Ουρά Εργασίας — Αφού τα συμβάντα μπορούν να είναι οτιδήποτε θα μπορούσαμε να παράγουμε εργασίες μονάδων για να ολοκληρωθούν από μια σειρά καταναλωτών.
-
Επεξεργασία Συμβάντων — Παρόμοια στον αγωγό δεδομένων θα μπορούσαμε να λαμβάνουμε, επεξεργαζόμαστε, ταξινομούμε και να δρούμε πάνω σε συμβάντα που στέλνονται σε πραγματικό χρόνο από τις πηγές μας.
Αυτές είναι μόλις μερικές από τις δυνατότητες για το GenStage.
Έπιασες λάθος ή θέλεις να συνεισφέρεις στο μάθημα; Επεξεργαστείτε αυτό το μάθημα στο GitHub!