Creating a Kafka Consumer with Spring Boot
A free video tutorial from Arbi Elezi
Software Developer | Computer Engineer | Instructor
4.4 instructor rating • 2 courses • 26,996 students
Learn more from the full course: Apache Kafka and Spring Boot (Consumer, Producer)
In this course, Apache Kafka and Spring Boot are used together to set up communication between a producer and a consumer.
51:34 of on-demand video • Updated May 2019
- You will learn how to create a Kafka Consumer using Spring Boot
- You will learn how to create a Kafka Producer using Spring Boot
Hello guys, and welcome back to Creating a Kafka Producer and Consumer with Spring Boot, with me, Arbi Elezi. If you have watched the previous video, where I created a Kafka producer with Spring Boot, you may already be familiar with this code. If you haven't, this is basically the code for the Kafka producer. I have the configuration here with the ProducerFactory and the KafkaTemplate, and I also have a simple controller where I receive the JSON model from a POST endpoint. The model is a simple format with two random String fields, and I send it to a specific Kafka topic, which I have named myTopic.
Since the Kafka server is not up and running, let's start it and then continue coding the consumer. I am starting ZooKeeper first: zookeeper-server-start.sh and then config/zookeeper.properties. I made some typos here, so let me fix them. Yes, ZooKeeper is started now. Let's start Kafka the same way: kafka-server-start.sh, and here I'll type config/server.properties. The .properties files are similar to the .yaml files that are used for configuration. You can see here in the resources there is also an application.properties, where I can configure things like the server port, the database driver, etc. If you're familiar with Spring, you already know about it. Anyway, let's start this server too. It is now running on our machine, so let's go back to the code.
In order to consume from a Kafka topic, we first of all need to build two other beans: a consumer configuration bean, just like the producer configuration here, and a ConcurrentKafkaListener... I don't remember the long name, but it is used for configuration in order to perform the listening.
There is also this annotation here, @EnableKafka, which makes it possible for the Spring application to actually listen to Kafka events, and another annotation called @KafkaListener, which we are going to see later in this video.
So let's create the consumer configuration bean just like before: public ConsumerFactory<String, SimpleModel> consumerFactory(). Let's create a HashMap to keep the properties. I like to call it config; you can call it whatever you want. We are going to put in things that are similar to the producer's. The bootstrap servers setting is the same, but the key and value serializers become key and value deserializers, and the consumer factory also requires a group ID. So let's copy these puts, paste them here, and change ProducerConfig, as you may have guessed, to ConsumerConfig. Up here it is ConsumerConfig with the key deserializer and StringDeserializer, and yes, you use the one from Apache Kafka's common serialization package. For the value it is the JsonDeserializer class (in Spring for Apache Kafka this class lives in org.springframework.kafka.support.serializer). ConsumerConfig has similar constants for the deserializers and, of course, for the group ID.
As I said before, the group ID is required by the consumer factory, so let's put it here too: ConsumerConfig, group ID, and you can pick any name you want. It is just there to identify the consumer group, so that different Kafka listener configurations in your application can be told apart. Say you have different configurations for the same topics, one with the StringDeserializer for the value and another with a JsonDeserializer or an XML deserializer: you basically need different group IDs for those. So let's name this group ID myGroupId, and let's actually put this configuration to use with a new DefaultKafkaConsumerFactory.
The DefaultKafkaConsumerFactory takes the configuration as a constructor argument. But in order for the deserialization to work, you also need to explicitly create the deserializer objects, so a new JsonDeserializer, and inside the constructor of the JsonDeserializer you also need to specify what type of POJO you are going to deserialize. In our case that is SimpleModel.class, and we are done with the consumer factory.
Now let's create the public ConcurrentKafkaListenerContainerFactory. In fact, it is recommended to name the bean kafkaListenerContainerFactory, not concurrentKafkaListenerContainerFactory, in order to override the Kafka listener container factory that is instantiated by default. Of course the types here are String and SimpleModel, and the same here, String and SimpleModel, assigned to a new one. Why did I declare it like this? Because the ConcurrentKafkaListenerContainerFactory doesn't take the consumer configuration as a constructor argument, so you need to set it with a setter: not setConcurrency, setConsumerFactory. And that's all. Let's return this object, and we're done here. The configuration for consuming from a Kafka topic is ready. The last thing you need is this @EnableKafka, which enables listening to Kafka events in the Spring Boot application.
Now let's go to our controller. I would normally create a service class, but for simplicity let's use the current controller. Just like you have REST endpoints, or HTTP endpoints, like POST, GET, PUT, DELETE and PATCH, you create something similar: the @KafkaListener annotation, which takes as a parameter topics, the actual topic that is going to be listened to. In this case it is myTopic. Then public void, let's call it getFromKafka, and here, like a request body, we need the SimpleModel simpleModel.
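The consumer configuration and listener described so far might look roughly like the sketch below. It assumes spring-kafka is on the classpath and a broker on localhost:9092. The class names KafkaConsumerConfiguration and SimpleModelListener, the field names field1 and field2, and the spellings myTopic and myGroupId are assumptions based on what is said in the video, not the course's actual source. The listener sits in its own component here only to keep the sketch short; in the video it is added to the existing controller.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.stereotype.Component;

// Assumed shape of the model: two String fields, as described in the video.
class SimpleModel {
    private String field1;
    private String field2;

    public SimpleModel() { } // no-args constructor used by JSON deserialization

    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }
    public String getField2() { return field2; }
    public void setField2(String field2) { this.field2 = field2; }

    @Override
    public String toString() {
        return "SimpleModel{field1='" + field1 + "', field2='" + field2 + "'}";
    }
}

@EnableKafka // switches on detection of @KafkaListener methods
@Configuration
class KafkaConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, SimpleModel> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Assumption: a local broker on Kafka's default port.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The deserializer instances tell the factory which POJO type to produce.
        return new DefaultKafkaConsumerFactory<>(
                config, new StringDeserializer(), new JsonDeserializer<>(SimpleModel.class));
    }

    // Named kafkaListenerContainerFactory so it replaces the default factory bean.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SimpleModel> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SimpleModel> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // No constructor argument for the consumer factory, so it is set here.
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

@Component
class SimpleModelListener {

    // Called for every record that arrives on the topic.
    @KafkaListener(topics = "myTopic")
    public void getFromKafka(SimpleModel simpleModel) {
        System.out.println(simpleModel);
    }
}
```

Nothing in this sketch connects to the broker until the Spring context starts the listener container, so the beans can be constructed and inspected without Kafka running.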
I'm simply going to print this simple model. I have already built the toString method, and you can build your own. Anyway, this code should be ready to go, so let's run it, as the Kafka server is already running. As you can see here, there is the consumer client with the group ID we specified. It successfully joined the group with a generation, it was assigned partition 0 of the topic, and it's basically ready to go. So let's hit it with Postman. As you can see here, the POST endpoint sends whatever I send via Postman to the Kafka topic, and whatever I send I get back through the Kafka listener, because it listens to every change that happens to the Kafka topic on the server. Basically it's similar to MongoDB tailing or Redis tailing, if you're familiar with it. So let's try something different now. And it works perfectly.
What I also like to do, although it's not really recommended, is to use a third-party library and perform all of the serializations and deserializations for Kafka myself, in order not to encounter errors later. Basically, if you change the version of a library, there may be changes to its serializer and deserializer, so in order to counter that I like to perform them manually. So let's go to Maven Repository. It's a web page where you can get most, maybe all, Maven packages that you can then import into your own Java project. It doesn't matter if you're using Gradle, as this web page also gives the Gradle snippet for the dependency. I like to use the latest Gson, which is basically Google's JSON serializer and deserializer, and import it into my project. I actually have it imported already. So what I like to do is create a serialization bean from Google's Gson.
I call it jsonConverter and return it, and I use it to perform the serialization and deserialization. Whether I'm using MongoDB, Redis, Kafka or RabbitMQ, I like to perform the serialization myself. You can keep the current code and it will work for you. This is just a preference, and it's also a pretext to show you String serialization and deserialization of the value in the Kafka ConsumerConfig and ProducerConfig. So let's do it.
After I have done this, I basically need to change this JsonDeserializer to StringDeserializer, as above, and the same in the producer. StringDeserializer... no, not here. I just use the String serializer. And why do I like to do this? Because it allows me to use the same bean, the same configuration, for multiple types of models. I put StringDeserializer here, and then I can convert what I get as a String, via Gson by Google, into whatever POJO I may happen to use. So basically this is a form of abstraction.
So let's change it here to String, and also for the KafkaTemplate and the producer factory: String here, and String here. What else is left? Yes, the ConcurrentKafkaListenerContainerFactory. Let's change it to String too. There is some problem with the consumer factory: String, String. Yes, it's here. I also have to change this instantiation to String. Now that this is done, I simply inject this bean into my controller, just like I injected the KafkaTemplate. Here too it is String, String for the KafkaTemplate. What happens to be the problem here? Let's check it: it says no beans of this KafkaTemplate type are available. So we have to change it to String. Yes, here it is. Let's change this to String as well, so we have the producer factory with String, String.
The KafkaTemplate is String, String; in the consumer factory both of the types are String; and the Kafka listener container factory is also String. So I don't think there should be any problem with this KafkaTemplate. I think this is just an unintelligible error, and the problem is right here. I need to get a String... no, in fact I need the SimpleModel here. I need to get the SimpleModel and convert it via jsonConverter, the Gson library I just imported. I also need to create the field for it: Gson jsonConverter. Here the dependency is successfully injected, as detected by IntelliJ's version of IntelliSense. Here I have the SimpleModel instead of the String, and I'll use the jsonConverter to convert this SimpleModel to a String.
I'll do the same in the listener. I won't get a SimpleModel, I'll get a String, and I can just print the JSON, or, if you want, deserialize it and print its actual toString. We can do that too. So SimpleModel simpleModel1 is equal to jsonConverter.fromJson, given the actual String that is to be converted to a POJO and the class that corresponds to this String in JSON format. After that you can use System.out to print our own version of toString: simpleModel1.toString().
Let's check that this is correct. Let's run the application and check if everything is fine. So far so good. Now let's send this: field one is "the new field one" and field two is "the new field two". Let's send it, and there's an internal error. Let's see what it is. It basically says that StringDeserializer is not an instance of the common serialization Serializer. So we have done something wrong. Yes, right here in the producer: instead of a serializer we have put a deserializer. The other ones are correct.
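After these changes, the whole configuration deals only in String keys and values, with a Gson bean for the manual conversion. A minimal sketch of what that might look like follows, assuming spring-kafka and Gson are on the classpath and a broker on localhost:9092. The class name StringKafkaConfiguration and the bean method names other than kafkaListenerContainerFactory and jsonConverter are placeholders, not taken from the course code.

```java
import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@EnableKafka
@Configuration
class StringKafkaConfiguration {

    // Assumption: a local broker on Kafka's default port.
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // The producer side takes serializers. In the video, a deserializer
        // placed here by mistake caused the internal error on the first POST.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(
                config, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    // One converter bean shared by every endpoint and listener.
    @Bean
    public Gson jsonConverter() {
        return new Gson();
    }
}
```

Because no model type appears anywhere in this class, adding a new model later requires no change here; only the code that calls toJson and fromJson needs to know the type.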
So let's stop it, rerun it and check for other errors. Let's repost. OK, no errors. As you can see, this is the JSON stringification and this is our own stringification of the object. So we get the object here from the HTTP REST endpoint, where it is automatically converted in the background by the Spring framework, using the Jackson converter, from the String version of the JSON format to the actual POJO. Then I use Google's Gson to convert it back to the stringified version of the JSON and send it to the topic. Then I get this stringified JSON and convert it back to a POJO, and I print both the stringified JSON version and our toString method. So this was performed by us, but it is what the JsonSerializer and JsonDeserializer were actually doing. We have taken the abstract work and done it ourselves, manually. But this also allows us to use the very same configuration, the very same container factory, to listen to different topics with different POJO formats.
So let's try it. Let's create a new one: @PostMapping, and let's change the path here for version two, public void post. And let's create another POJO. In this case let's create a new Java class and call it MoreSimpleModel. It will have a private String field, let's call it title, and a private String description. Let's generate its constructors and getters and setters: the all-args constructor, also the no-args constructor for MoreSimpleModel, and the getters and setters. I've generated all of them here. Now let's receive this new body at the endpoint.
So, MoreSimpleModel moreSimpleModel, and let's send it to a topic. Let's call this topic myTopic2, and let's send it, of course using jsonConverter to convert it to JSON. That's done. Now let's listen to this new topic: topics is myTopic2, and public void getFromKafka2 with a String moreSimpleModel. Let's do the very same thing as we did up here, but for MoreSimpleModel: this is also MoreSimpleModel, and so is this. No, I don't have a toString, so let's create the toString too: insert toString, and that's all done.
Now let's run it and see what happens. It's not showing any errors, so far so good. Let's post a MoreSimpleModel: title "Kafka consumption" and description "Creating a Kafka Consumer with Spring Boot", which is the title of this video. Let's send it, and it says 200 OK. And as you can see here, via this other method, from topic two, we got our other POJO. So with the same Kafka configuration we could send and receive on two different Kafka topics with completely different models, by doing the JSON conversion ourselves. So this is basically a form of abstraction. Instead of creating another ConcurrentKafkaListenerContainerFactory and another KafkaTemplate, you use the existing ones and do the conversions yourself, because in different projects or microservices that you might encounter, it may not be possible for you to change this configuration. That's why it's better to do it like that. At least I believe so.
So this is the end of this video. Thank you for watching, and if you want more content like this, click the Subscribe button below and stay in touch. See you, hopefully, in the next videos.
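For reference, the second endpoint and listener from this video could be sketched as follows. It assumes spring-kafka, spring-web and Gson on the classpath, plus KafkaTemplate<String, String> and Gson beans defined elsewhere in the application. The class name MoreSimpleModelController, the path /v2 and the parse helper are hypothetical; the topic name myTopic2 is spelled as heard in the video.

```java
import com.google.gson.Gson;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Second model from the video: two String fields, both constructors,
// getters, setters and a toString.
class MoreSimpleModel {
    private String title;
    private String description;

    public MoreSimpleModel() { }

    public MoreSimpleModel(String title, String description) {
        this.title = title;
        this.description = description;
    }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    @Override
    public String toString() {
        return "MoreSimpleModel{title='" + title + "', description='" + description + "'}";
    }
}

@RestController
class MoreSimpleModelController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Gson jsonConverter;

    // Both dependencies are injected through the single constructor.
    MoreSimpleModelController(KafkaTemplate<String, String> kafkaTemplate, Gson jsonConverter) {
        this.kafkaTemplate = kafkaTemplate;
        this.jsonConverter = jsonConverter;
    }

    // Hypothetical path: the video only says the mapping is changed for version two.
    @PostMapping("/v2")
    public void post(@RequestBody MoreSimpleModel moreSimpleModel) {
        // Spring has already turned the request body into a POJO;
        // Gson turns it back into a JSON string for the String-typed template.
        kafkaTemplate.send("myTopic2", jsonConverter.toJson(moreSimpleModel));
    }

    // Uses the same String-based container factory as any other listener.
    @KafkaListener(topics = "myTopic2")
    public void getFromKafka2(String moreSimpleModel) {
        System.out.println(moreSimpleModel);        // raw JSON as received
        System.out.println(parse(moreSimpleModel)); // our own toString
    }

    // Hypothetical helper that isolates the manual deserialization step.
    MoreSimpleModel parse(String json) {
        return jsonConverter.fromJson(json, MoreSimpleModel.class);
    }
}
```

The same pattern applies to the first topic with SimpleModel: only the class passed to fromJson and the topic name change, while the Kafka configuration stays untouched.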