Persistent Actors in PostgreSQL

Daniel Ciocîrlan
A free video tutorial from Daniel Ciocîrlan
Software Engineer & Best-Selling Instructor

Learn more from the full course

Akka Persistence with Scala | Rock the JVM

A must-have for Akka developers: write long-term reactive systems with Akka Persistence and PostgreSQL or Cassandra!

07:05:46 of on-demand video • Updated March 2021

  • Learn advanced Akka with Persistent Actors
  • Write long-lived, fault-tolerant distributed systems
  • Use Akka Persistence in production with PostgreSQL or Cassandra
  • Adopt a new mental model with Event Sourcing
All right, welcome back. I'm Daniel, and in this video we're going to talk about persistent actors. This is where the rubber meets the road with event sourcing and persistence. So in our IDE, now that we've started a new chapter, let's go ahead and create a new package under the scala folder in our little project. Let's create a package called part2_event_sourcing, and under this package I'm going to create a new Scala class. I'm going to name this PersistentActors, make it an object and have it extend App as usual. Because in the last video we already talked about event sourcing and how it works, in this lecture we get right to how we implement a persistent actor.

So let's start by defining our first persistent actor. For the purpose of this lecture, let's say we have an accountant that keeps track of our invoices for our business and sums up the total income. So this is the scenario: we have a business and an accountant which keeps track of our invoices. This is the kind of scenario that we would encounter in real life, and for the purpose of this lecture I'm going to create a simple actor which we will use as an accountant. So I'm going to create a little class called Accountant which, instead of extending Actor, extends PersistentActor, which is part of the Akka Persistence library that we imported way at the beginning of the course. I'm also going to mix in ActorLogging so that we see things in our console.

Because we extend PersistentActor, we need to implement some methods, and in this case we need to implement not one but three fundamental methods. The first method is called persistenceId, which is a String. The second method is called receiveCommand, and the third method is called receiveRecover. I'm going to talk about each of these in turn.
So persistenceId is how the events persisted by this actor will be identified in the persistent store. We'll have lots and lots of events grouped into one place, so we'll have to know which actor wrote what event. This persistence ID should be unique per actor, so I'm going to name this simple-accountant. Uniqueness is not enforced by Akka, but the best practice is to make it unique.

The second method is receiveCommand, and this is the quote-unquote normal receive method. When you send a message to this actor, receiveCommand will be called. In fact, this is so simple that I can just go to PersistentActor and show you how the receive handler is actually implemented. You see PersistentActor extends some traits here, and the def receive is actually receiveCommand. It's as simple as that. All right, so receiveCommand is the handler that will be called when you send a normal message to this actor.

Third is receiveRecover, which is the handler that will be called on recovery. On recovery, if you remember the previous explanation, the actor will query the persistent store for all the events associated with this persistence ID, and all the events will be replayed by this actor in the exact order in which they were originally written. They will be sent to this actor as simple messages, and this receiveRecover handler will handle them.

Now, the way you construct an instance of this persistent actor is the way you would normally construct any actor at all. So we need to create a system as an ActorSystem; let's call it PersistentActors. And we create a simple accountant as system.actorOf(Props[Accountant]), so we need to import Props, and we give this actor a name, simpleAccountant. Then you're free to send messages to this accountant as you normally would to any actor at all.
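For reference, the skeleton described so far might look roughly like the sketch below. It uses the classic Akka API (PersistentActor, ActorLogging, Props); the catch-all handlers and the "hello" message are placeholders added only so the file compiles on its own. Running it assumes akka-persistence is on the classpath and a journal plugin is configured.

```scala
import akka.actor.{ActorLogging, ActorSystem, Props}
import akka.persistence.PersistentActor

object PersistentActors extends App {

  class Accountant extends PersistentActor with ActorLogging {
    // Identifies this actor's events in the journal; best practice is one unique ID per actor.
    override def persistenceId: String = "simple-accountant"

    // The "normal" receive handler: called for every message sent to this actor.
    // Placeholder body: it only logs what arrives.
    override def receiveCommand: Receive = {
      case message => log.info(s"Received command: $message")
    }

    // Called on recovery with the events previously persisted under persistenceId.
    // Placeholder body: it only logs what is replayed.
    override def receiveRecover: Receive = {
      case event => log.info(s"Recovered: $event")
    }
  }

  // Constructed like any other actor.
  val system = ActorSystem("PersistentActors")
  val accountant = system.actorOf(Props[Accountant](), "simpleAccountant")

  accountant ! "hello" // placeholder message, handled by receiveCommand
}
```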
Now, let's assume that we can send invoices to this accountant actor. So let's create a super simple case class. Let's call this Invoice, with the recipient as a String, the date as a Date, as a java.util.Date, and the amount, which is an Int. So let's say this accountant receives invoices; this Invoice will be the command for this accountant. And let's assume that in our application we want to send a couple of invoices. So, for example, for i in 1 to 10, let's say we have accountant tell Invoice, with some recipient company like the software company or something like that, the date being today, like right now, so a new Date, and the amount being i times one thousand or something like that. So basically ten simple invoice messages.

Now, how would we handle these messages? Well, we need to implement the receiveCommand method, which is the normal receive method that normal actors would use. So we create cases here to handle invoices. In case we have an Invoice with a recipient, a date and an amount, we need to do a few things. The pattern goes like this, and I'm going to write it as a comment. When you receive a command, first you create an event to persist into the store. Then you persist that event, passing in a callback that will get triggered once the event is written.

Specifically, I'm going to put a couple of logs here and I'm going to create this event. So I'm going to say log.info, let's say "received invoice for amount", and I'm going to inject the amount inside. Then I'm going to create a new event. So event equals, let's say, another case class, InvoiceRecorded. Let's give this an ID, which we'll pass to the database, as an Int, and then also the fields from the invoice: recipient as a String, date as a Date and amount as an Int. So this is an event.
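As a plain-Scala sketch, the command and the event from this part of the lecture could be modelled as below. The wrapping object name InvoiceModel and the recipient string are placeholders, and the ten invoices are built as a collection here rather than sent to an actor, so the snippet runs without Akka.

```scala
import java.util.Date

object InvoiceModel {
  // COMMAND: the normal message we send to the accountant
  case class Invoice(recipient: String, date: Date, amount: Int)

  // EVENT: the data structure the accountant writes to the journal
  case class InvoiceRecorded(id: Int, recipient: String, date: Date, amount: Int)

  // The ten invoices from the loop: amounts 1000, 2000, ..., 10000
  val invoices: Seq[Invoice] =
    (1 to 10).map(i => Invoice("The Software Company", new Date, i * 1000))
}
```

In the lecture, each of these invoices is sent to the accountant with the tell operator instead of being collected.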
So we have a section for commands, which are the normal messages that we programmers send to this actor, and a section for events, which are the data structures that the persistent actor will send to the persistent store, to the journal, if you remember. So we create an event as InvoiceRecorded, with some ID that we don't have yet and the data from this invoice: recipient, date and amount. Now let's say that our little accountant keeps track of the IDs that it persists. So I'm going to create a small variable here, latestInvoiceId, which I'm initializing with zero. And let's also declare a mutable totalAmount, which starts at zero. I'm going to kindly ask you to ignore the variables for now. I know they might bother you, but making the actor stateless with context.become or other tricks would needlessly complicate the persistence concepts that I want to show in this lecture. So just to recap, this accountant has mutable state: the latest invoice ID and the total amount of money that this accountant has registered.

OK, so back to creating the event. When we receive the invoice, we log something, and then we create the event that we want to send to the journal: an InvoiceRecorded event with the latest invoice ID. Then step two is to persist that event. We call a method called persist and pass in the event. Notice that the persist method takes two argument lists: first the event, and then a handler that will be called after the persist has succeeded. So I'm going to persist the event and then pass in a callback, which takes an event, which happens to be the same event that we are persisting into the journal. And after the event has persisted, I'm going to write a step three here: we update the actor's state when the event has persisted.
So after the event has persisted, we need to update our mutable state, that is, the latest invoice ID and the total amount. The latest invoice ID I'm going to simply increment by one, and the total amount I'm going to increment by the amount in this invoice. So this is the total amount of money that this accountant has registered. And just for completeness, let's also log.info something like "persisted", and let's pass in the event here, "as invoice number", and let's pass in e.id, the event being persisted being of type InvoiceRecorded, "for total amount", and I'm going to inject totalAmount in this interpolated string.

Now, we can compress steps one and two, creating the event and persisting the event, into a single expression by moving the InvoiceRecorded constructor inside the actual persist call. And this is the pattern that we use in practice: as a reaction to a command, a persistent actor will persist an event, and once the event has persisted, it can use the event to do something with it, like update its mutable state, log something or send messages to other actors. So this was the implementation of a simple receiveCommand method.

Let's also look at receiveRecover, which will be called, as I said, when the actor starts or is restarted as a result of a supervisor strategy. The best practice for receiveRecover is to follow the logic in the persist steps of receiveCommand. What do I mean by that? As a reaction to commands, this actor will persist InvoiceRecorded messages. So these are the messages that we should handle during receiveRecover, because the journal will send these things back to the actor when the actor starts up. So I'm going to handle InvoiceRecorded, and I only care about the ID and the amount. I don't really care about the recipient or the date because I don't need them for my state.
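Put together, the command handler described above might look roughly like this sketch. It is a reconstruction from the spoken walkthrough rather than the lecture's exact source: the log wording and the recipient string are approximations, and the recovery handler is included mainly so that the class compiles. It assumes akka-persistence on the classpath and a configured journal.

```scala
import java.util.Date

import akka.actor.{ActorLogging, ActorSystem, Props}
import akka.persistence.PersistentActor

object PersistentActors extends App {
  // COMMAND
  case class Invoice(recipient: String, date: Date, amount: Int)
  // EVENT
  case class InvoiceRecorded(id: Int, recipient: String, date: Date, amount: Int)

  class Accountant extends PersistentActor with ActorLogging {
    // mutable state, kept as vars for simplicity as in the lecture
    var latestInvoiceId = 0
    var totalAmount = 0

    override def persistenceId: String = "simple-accountant"

    override def receiveCommand: Receive = {
      case Invoice(recipient, date, amount) =>
        log.info(s"Received invoice for amount: $amount")
        // Steps 1 and 2 compressed: create the event and persist it in one expression.
        persist(InvoiceRecorded(latestInvoiceId, recipient, date, amount)) { e =>
          // Step 3: update the actor's state once the event has been persisted.
          latestInvoiceId += 1
          totalAmount += amount
          log.info(s"Persisted $e as invoice #${e.id}, for total amount $totalAmount")
        }
    }

    // Recovery applies the same kind of state change as the persist callback.
    override def receiveRecover: Receive = {
      case InvoiceRecorded(id, _, _, amount) =>
        latestInvoiceId = id
        totalAmount += amount
    }
  }

  val system = ActorSystem("PersistentActors")
  val accountant = system.actorOf(Props[Accountant](), "simpleAccountant")

  // ten invoices; the recipient name is a placeholder
  for (i <- 1 to 10) accountant ! Invoice("The Software Company", new Date, i * 1000)
}
```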
OK, and then the best practice is to follow the same state change that I do in the callback of persist. So latestInvoiceId is going to be id, the one that was persisted by the actor, and totalAmount plus-equals the amount in the recorded event. So let's recap. We implemented our first persistent actor: we implemented its persistenceId, its receiveCommand, which persists events as a reaction to commands, and we also implemented the receiveRecover handler, which handles the recorded persisted events when the actor starts up.

Now let's try to run this application. But before that, we need to have a persistent store, or journal, configured for storing events. There are a lot of options and we will discuss them in a later section, but let's configure the simplest store, a local LevelDB database that writes to a file. So I'm going to exit presentation mode and create the application.conf configuration file. Under the src/main folder, create a directory called resources, and under resources create a small file called application.conf. In this application.conf we will configure the persistent store that this simple actor will use, and the configuration goes like this. I'm going to say akka.persistence.journal.plugin equals, so write exactly what I'm writing here on the screen. This is the configuration name that configures the journal we're going to use in the application. The value for akka.persistence.journal.plugin is going to be akka.persistence.journal.leveldb. This is a simple, lightweight database from Google. I'm also going to specify which files this LevelDB database is going to write to. So I'm going to say akka.persistence.journal.leveldb.dir, so the directory where LevelDB is going to write is going to be, and I'm going to write a simple path here, target/rtjvm/journal.
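Written out, the two settings dictated above would look something like the fragment below. The directory name rtjvm is inferred from the spoken "Rock the JVM" abbreviation, so adjust it if your project uses a different folder.

```hocon
# src/main/resources/application.conf
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/rtjvm/journal"
```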
And this folder will be under this orange target folder right over here. So go to this target folder and create a new directory called rtjvm, so Rock the JVM. Once you specify target/rtjvm/journal, Akka will create the journal directory for us under rtjvm, but make sure you have the rtjvm directory set up here in the file system of this project. If you're wondering how this LevelDB journal is known to Akka, we set that up in build.sbt via these two configuration lines. These library dependencies make LevelDB known to Akka so we can use it as a persistent store.

Having said that, let's get back to our little code and try to run this application. So if I right-click and run this, let's see what happens. We have a number of logs here. We have "received invoice for amount 1000", which is this log line here, and then we have "persisted invoice as invoice 0 for total amount 1000", which means our event was persisted, because the callback was called. And we have this a bunch of times for all the invoices that we sent: invoice zero, one, two, three, four, up to nine, for total amount fifty-five thousand. So this persistent actor works correctly.

Now, notice what happens if I comment out these lines. For the next run of this application, I'm not going to send any more messages to this accountant, but I am going to log the recovery of every single event that I persisted in the previous run. So I'm going to say log.info, "recovered invoice #", and I'm going to inject the id here, "for amount", amount, and I'm also going to say "total amount", totalAmount, and I'm going to put this log line right at the end. OK, so I'm going to right-click and rerun this application, and watch what happens. We have a bunch of log lines, so let's take a look.
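One way to picture what this second run does, without Akka at all, is a plain fold over the stored events. The sketch below is not Akka API: the list stands in for the journal contents after the first run, the recover function mirrors the state change in receiveRecover, and the names ReplaySketch and AccountantState are made up for illustration.

```scala
object ReplaySketch {
  // Same idea as the lecture's event, reduced to the fields the state needs.
  case class InvoiceRecorded(id: Int, amount: Int)
  case class AccountantState(latestInvoiceId: Int, totalAmount: Int)

  // Stand-in for the journal after the first run: ids 0..9, amounts 1000..10000.
  val journal: List[InvoiceRecorded] =
    (0 until 10).map(id => InvoiceRecorded(id, (id + 1) * 1000)).toList

  // Mirrors receiveRecover: take the event's id, add the event's amount.
  def recover(state: AccountantState, event: InvoiceRecorded): AccountantState =
    AccountantState(event.id, state.totalAmount + event.amount)

  // Replay the events in the order they were written, starting from the initial state.
  val recovered: AccountantState =
    journal.foldLeft(AccountantState(0, 0))(recover)
}
```

Because the events are applied in their original order, the fold ends with the last id and the full total, which is the state the accountant rebuilds on startup.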
So we see that our persistent actor, although it didn't receive any more messages, recovered all of the events that were persisted in the previous run. We have "recovered invoice for amount 1000, total amount 1000" and so on and so forth, all the way up to invoice nine, for amount ten thousand, total amount fifty-five thousand. Notice that all of these recoveries happen in the same order in which the events were originally persisted. So without sending a single message, our accountant actor retrieved the events from the persistent store and was able to automatically recreate its state.

So how does this all work? Well, in the previous run, this actor wrote a bunch of objects to the journal; remember the persist calls. These persisted events were tagged with this actor's persistence ID, which is why the persistence ID is very important. On start, that is, on the next run, Akka queried the journal for all events tagged with this persistence ID on behalf of the actor that's just starting. Because the journal had ten events for this persistence ID, the actor calls the receiveRecover handler for every single one of those events. It replays all of the events and recreates its state.

Now, there are some important things to discuss. Let's go to the specifics of persist. As I mentioned in the previous videos, persisting is asynchronous, and it's worth stressing that. persist is a non-blocking call: it will be executed at some point in the future, and the handler will also be executed at some point in the future, after the event has been successfully persisted into the journal. Now, what do we know about asynchronous callbacks and mutable actor state, I wonder? Normally, we would know from Akka Essentials never to access mutable state or call methods in async callbacks, because we know that this can break the actor encapsulation. But, and here is a big but, here it's OK.
You don't have race conditions, so it's safe to access mutable state here. Akka Persistence guarantees that no other threads are accessing the actor during a callback. Behind the scenes, Akka Persistence is also message-based, which probably helps. We can even safely access the right sender of the command during the callback. So you can say sender tell something like "persistence acknowledged" or something like that, and the call to sender will correctly identify the actor reference that originally sent the command, even though the persist callback is executed at some point in the future. All right, so this is a very nice guarantee. Let's just mention here that we can correctly identify the sender of the command.

Now, the very astute listener and watcher may notice a time gap between the original call to persist and the callback, which is triggered after the event has been persisted. So right here, we have a time gap. So the question from the astute watcher is this: even without the race conditions that we said we wouldn't have, there is a gap of time between persisting and handling the event. What if other messages are handled in the meantime, between persisting and the callback? What if other commands are being sent in the meantime? That would be an excellent question, and the answer is that with persist, Akka guarantees that all the messages between persisting and the handling of the persisted event are stashed. So I'm just going to mention that in this time gap, all other messages sent to this actor are stashed, and I'm going to put this on a new line.

Another thing that I wanted to mention, which is not incredibly important but I do want to stress, is that you're not obliged to persist any event. If you receive messages, you can act like a normal actor. So, for example, if you get a message called Print, you can simply do whatever a normal actor can do. You don't need to persist events; you're not in any way forced to.
So we can say log.info, "latest invoice ID", and you can inject your state here. You can even send replies to the sender of these messages. So, of course, you can act like a normal actor if you want, but you also have the capability to persist events if you wish. So this is how persistent actors work in a nutshell. Now, there are more things to discuss, which I'm going to do in the next video.
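As a closing sketch, here is how the accountant might look with the last few points folded in: the reply to the sender inside the persist callback, the recovery log line, and a non-persisting command. The "PersistenceACK" reply and the "print" string message are assumptions about the exact values used on screen, and the enclosing object name is made up so the snippet stands alone. It assumes akka-persistence on the classpath.

```scala
import java.util.Date

import akka.actor.ActorLogging
import akka.persistence.PersistentActor

object AccountantSketch {
  case class Invoice(recipient: String, date: Date, amount: Int)
  case class InvoiceRecorded(id: Int, recipient: String, date: Date, amount: Int)

  class Accountant extends PersistentActor with ActorLogging {
    var latestInvoiceId = 0
    var totalAmount = 0

    override def persistenceId: String = "simple-accountant"

    override def receiveCommand: Receive = {
      case Invoice(recipient, date, amount) =>
        log.info(s"Received invoice for amount: $amount")
        persist(InvoiceRecorded(latestInvoiceId, recipient, date, amount)) { e =>
          // Time gap: messages arriving between persist and this callback are stashed.
          // Safe to touch mutable state here; no other thread accesses the actor.
          latestInvoiceId += 1
          totalAmount += amount
          // sender() still refers to whoever sent the original Invoice command.
          sender() ! "PersistenceACK" // assumed reply value
          log.info(s"Persisted $e as invoice #${e.id}, for total amount $totalAmount")
        }

      // Acting like a normal actor: no event is persisted for this command.
      case "print" => // assumed message shape
        log.info(s"Latest invoice id: $latestInvoiceId, total amount: $totalAmount")
    }

    override def receiveRecover: Receive = {
      case InvoiceRecorded(id, _, _, amount) =>
        latestInvoiceId = id
        totalAmount += amount
        log.info(s"Recovered invoice #$id for amount $amount, total amount: $totalAmount")
    }
  }
}
```

If the command is sent from outside an actor, as in the lecture's main application, the acknowledgement has no real recipient and ends up in dead letters.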