Writing a Kafka Avro Producer in Java

A free video tutorial from Stephane Maarek | AWS Certified Cloud Practitioner, Solutions Architect, Developer

Lecture description

Learn how to use Java to write your first Avro Producer


Apache Kafka Series - Confluent Schema Registry & REST Proxy

Kafka - Master Avro, the Confluent Schema Registry and Kafka REST Proxy. Build Avro Producers/Consumers, Evolve Schemas

04:24:21 of on-demand video • Updated March 2024

Write simple and complex Avro Schemas
Create, Write and Read Avro objects in Java
Write a Java Producer and Consumer leveraging Avro data and the Schema Registry
Learn about Schema Evolution
Perform Schema evolution using the command line and in Java
Utilize the REST Proxy using a REST Client
Okay, so in this lecture we are going to write an Avro producer in Java. This is exciting: so far we've used the console producer and the console consumer, which are very helpful for debugging, but now we're going to write something in Java, something that looks more like an application you would deploy in your production environment. Everything we do here leverages the knowledge we've acquired so far, as you'll see, and we'll be using SpecificRecord as our way to create Avro objects. So let's get started.

We are in our project, and what I'm going to do now is create a new project. It's going to be Maven. Click Next. The package, again, can be whatever you want; I'll just use com.example for now. The project is kafka-avro-v1, and I'll be coding along with you, but all the code is available in the GitHub repository. I'll open it in this window. As we get started, we definitely want to enable auto-import. We are in our Maven project, just like usual, and the first thing I want to do is get the pom.xml right. For this, I'll go directly to the kafka-avro-v1 folder from the code download on GitHub. Everything from the properties block down is what you should copy. I'll paste it in, and then we'll go over what this pom.xml contains.

First, we have Avro version 1.8.2, but I've also added Kafka version 0.11.0.1 and Confluent version 3.3.1. Those are the latest at the time I'm writing this course. Kafka 1.0 is actually out, but we want to match Confluent, and Confluent 4.0 is not out yet.
So I'm using the older Kafka version, 0.11.0.1, which is plenty for this course. Because we are going to resolve dependencies from Confluent, we need to add a repository: there's a repositories block with a repository whose ID is confluent and whose URL is Confluent's Maven repository. For the dependencies, we have Apache Avro, which we'll use to create our specific records. Then we have the Kafka client, just like before, referencing the Kafka version from the properties, 0.11.0.1. Finally, we have a Confluent dependency whose artifact is kafka-avro-serializer. That's how we will write to the Schema Registry and to Kafka in Avro.

Regarding the build plugins, we have Java 8 enabled right here. We still have the Avro plugin that generates the specific record classes we'll be using, and another plugin to force the discovery of these generated sources. So it's fairly simple: the only thing that has changed is these two blocks.

So let's get started. In main, I'm going to create a class, com.example.KafkaAvroProducerV1. I named it V1, and we add a public static void main. Let's create our first producer. First of all, same as before, we create some properties, and they contain what you would expect. We have bootstrap.servers; please set it to your IP and port 9092. Then we can set some properties such as acks being 1 and retries being 10. But we know we also have to set the key serializer and the value serializer. For the key, we'll use StringSerializer.class.getName(), so we'll serialize our key as a string. For the value serializer, we use KafkaAvroSerializer.class.getName(). That's our new value serializer, and you can see from the import statement that it comes from the Confluent package.
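A minimal sketch of the producer properties described above, assuming the standard Kafka producer configuration keys (bootstrap.servers, acks, retries, key.serializer, value.serializer). To keep it compilable with only the JDK, the serializers are given as fully qualified class-name strings rather than StringSerializer.class.getName() and KafkaAvroSerializer.class.getName(); Kafka accepts either form. The class name AvroProducerConfig is hypothetical, not from the course code.

```java
import java.util.Properties;

// Hypothetical helper (not from the course code) that builds the producer
// properties described above. Serializer classes are given as fully
// qualified names so the sketch compiles with only the JDK.
final class AvroProducerConfig {

    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String KAFKA_AVRO_SERIALIZER =
            "io.confluent.kafka.serializers.KafkaAvroSerializer";

    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        // Broker address: replace with your own IP, port 9092.
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // acks=1: the partition leader must acknowledge each write.
        properties.setProperty("acks", "1");
        // Retry transient send failures up to 10 times.
        properties.setProperty("retries", "10");
        // Keys are plain strings; values go through Confluent's Avro serializer.
        properties.setProperty("key.serializer", STRING_SERIALIZER);
        properties.setProperty("value.serializer", KAFKA_AVRO_SERIALIZER);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("127.0.0.1:9092");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the lecture's producer, a Properties object like this is what gets passed to new KafkaProducer<String, Customer>(properties).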
Because we use the Avro value serializer, we also have to set schema.registry.url to http://127.0.0.1:8081. Make sure you have it like this, or, again, change the IP to match your setup.

Next up, we create a KafkaProducer. The key is going to be a String and the value is going to be a Customer. We don't have the Customer class yet, but we'll have it very soon. It's a new KafkaProducer that takes the properties as input. By the way, the topic we write to, stored in a String topic variable, will be customer-avro.

Now we need to create a customer, and we haven't done that yet, so we'll just put question marks for now. Then we create a ProducerRecord<String, Customer>: producerRecord equals new ProducerRecord, where the topic is topic. The key could be whatever you want, but we won't use one here; we'll just pass the customer as the value. So far, so good. But we still don't have the Customer class, so for now I'll comment this code out so that it compiles.

So we need to create that Customer class. Do you know how? The answer is yes, you do. Just like before, in resources we create an avro directory and a customer-v1.avsc file. Because I'm lazy, I'll just use the one from my project: copy this, paste that. So here's our customer, just like before: first name, last name, age, height, weight, and automated email. You can see there's no target folder yet, so I'll open the Maven panel and, under Lifecycle, run clean first and then package. That compiles my Avro schema into a class, and now, under target/generated-sources/avro, we have our Customer. Excellent. If it didn't work, make sure your producer code compiles; that's why I commented out this code.
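Why schema.registry.url matters: the Confluent Avro serializer, as I understand it, registers or looks up the value's schema in the Schema Registry and writes the resulting schema ID into each message, so it cannot work without the registry's address. The sketch below is a hypothetical pre-flight check (SchemaRegistryConfigCheck is not part of Kafka or Confluent) that flags a configuration using KafkaAvroSerializer without that setting, plus the likely mistake of leaving off the http:// scheme. It uses only the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check (not part of Kafka or Confluent): a producer
// that uses KafkaAvroSerializer also needs schema.registry.url.
final class SchemaRegistryConfigCheck {

    static final String KAFKA_AVRO_SERIALIZER =
            "io.confluent.kafka.serializers.KafkaAvroSerializer";

    static List<String> problems(Properties properties) {
        List<String> problems = new ArrayList<>();
        boolean usesAvro =
                KAFKA_AVRO_SERIALIZER.equals(properties.getProperty("key.serializer"))
                        || KAFKA_AVRO_SERIALIZER.equals(properties.getProperty("value.serializer"));
        String url = properties.getProperty("schema.registry.url");
        if (usesAvro && (url == null || url.trim().isEmpty())) {
            problems.add("schema.registry.url is required when using KafkaAvroSerializer");
        } else if (url != null && !url.startsWith("http://") && !url.startsWith("https://")) {
            // e.g. "127.0.0.1:8081" without the scheme
            problems.add("schema.registry.url should start with http:// or https://");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("value.serializer", KAFKA_AVRO_SERIALIZER);
        System.out.println(problems(properties)); // reports the missing URL
        properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
        System.out.println(problems(properties)); // []
    }
}
```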
Now I'm going to uncomment my code right here, and as you can see, Customer is now resolved. That's because it's in the same package, com.example; if it were in another package, you would just need to write the import statement.

Here we go: we have a KafkaProducer, we have a String, and now we need to create a customer. For this we use Customer.newBuilder(), with .build() at the very end. In between, we set the first name, our good old John, and the last name, Doe. We set his age (he's still 26), his height (he's grown a little bit), and his weight (he has also put on some weight). And we set automated email to false. So here's our customer, and now we have a producer record that takes the topic and, as the value, the customer.

Now we can call kafkaProducer.send() with the producer record, and this time we'll add a callback: new Callback, and in onCompletion we check the exception. If the exception is null, we println "Success" and, with another println, metadata.toString(), which prints the record's topic, partition, and offset. If there is an error, we call exception.printStackTrace(). Finally, to make sure the record does indeed get sent, we flush the producer and then close it.

And here we go: we have our first Kafka producer that uses Avro. But let's be convinced and run it. I click here and run it, and I get an error: the StringSerializer is not an instance of Serializer. It's actually great to get errors, because you have to learn to work through them too. Look at the import: it's a Jackson class under org.codehaus.jackson.map. That's not the right one. I remove this import, re-import, and get a choice; I'll use the first one, org.apache.kafka.common.serialization.StringSerializer.
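Customer.newBuilder()...build() is the fluent builder that Avro generates for each specific record. The real class is produced by the Avro Maven plugin from customer-v1.avsc and is not reproduced here; below is a hypothetical hand-written stand-in, CustomerSketch, that only mimics the calling style. Its field types (int age, float height and weight, boolean automatedEmail) are assumptions. Its build() rejects unset fields, similar in spirit to Avro's generated builders, which as far as I know refuse to build when a field without a default value was never set.

```java
// Hypothetical stand-in for the Customer class that the Avro Maven plugin
// generates from customer-v1.avsc. It only mimics the fluent
// newBuilder()...build() style; field types are assumptions.
final class CustomerSketch {

    final String firstName;
    final String lastName;
    final int age;
    final float height;
    final float weight;
    final boolean automatedEmail;

    private CustomerSketch(Builder builder) {
        this.firstName = builder.firstName;
        this.lastName = builder.lastName;
        this.age = builder.age;
        this.height = builder.height;
        this.weight = builder.weight;
        this.automatedEmail = builder.automatedEmail;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    static final class Builder {
        // Boxed types so that "never set" (null) can be told apart from a value.
        private String firstName;
        private String lastName;
        private Integer age;
        private Float height;
        private Float weight;
        private Boolean automatedEmail;

        Builder setFirstName(String value) { firstName = value; return this; }
        Builder setLastName(String value) { lastName = value; return this; }
        Builder setAge(int value) { age = value; return this; }
        Builder setHeight(float value) { height = value; return this; }
        Builder setWeight(float value) { weight = value; return this; }
        Builder setAutomatedEmail(boolean value) { automatedEmail = value; return this; }

        CustomerSketch build() {
            // Refuse to build a record with unset fields (this stand-in has no defaults).
            if (firstName == null || lastName == null || age == null
                    || height == null || weight == null || automatedEmail == null) {
                throw new IllegalStateException("every Customer field must be set before build()");
            }
            return new CustomerSketch(this);
        }
    }

    public static void main(String[] args) {
        // Height and weight values are made up for illustration.
        CustomerSketch customer = CustomerSketch.newBuilder()
                .setFirstName("John")
                .setLastName("Doe")
                .setAge(26)
                .setHeight(185.5f)
                .setWeight(80.5f)
                .setAutomatedEmail(false)
                .build();
        System.out.println(customer.firstName + " " + customer.lastName + ", " + customer.age);
    }
}
```

The builder style keeps construction readable when a record has many fields, and validating in build() means a half-filled record fails at creation time rather than later in the serializer.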
Okay, much better. Let's run our code again. I'm keeping these errors in the recording because errors are valuable: I make errors all the time, and they keep you learning. The course is not perfect; I'm a programmer like everyone else. So here we go: "Success" and customer-avro-0@0, which means partition 0 and offset 0. If I run this again, we get partition 0 and offset 1. Amazing, so it's been written.

Let's double-check that using the UI. If I refresh the Schema Registry UI, we now have a customer-avro-value subject, and here is our schema. If we want to check the data, we can go to the topics view and open customer-avro. After a second, we should see it: here's our customer, John Doe.

There's one last way of checking, and as you know, that's the terminal. I do my docker run to get a bash shell in the Schema Registry container. I'm in, and then I run kafka-avro-console-consumer with --bootstrap-server set, for me, to 127.0.0.1:9092 (change it to your IP), --topic customer-avro, and --from-beginning. We also need to add a property, and since I'm lazy I'll just copy it from here: --property schema.registry.url=http://127.0.0.1:8081. Paste it, press Enter, and we see our John Doe in the console consumer. Amazing. Now if you run the producer once more and go back to the terminal, you see that a third John Doe has appeared.

So congratulations, you just wrote your first Java Kafka Avro producer. That was fairly easy, right? Everything we learned before still applies: build a producer record, then use a callback, flush, and close. I hope you loved the learning, and remember to add schema.registry.url as a property. I'll see you in the next lecture.
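The callback's output, customer-avro-0@0, is RecordMetadata.toString(), which formats as <topic>-<partition>@<offset>, and the customer-avro-value subject in the Schema Registry comes from the Confluent serializer's default subject naming, <topic>-value for value schemas. These hypothetical helpers (not part of any Kafka or Confluent API) decode both. Note that the topic itself contains a dash, so the partition has to be read after the last dash before the @.

```java
// Hypothetical helpers (not part of any Kafka or Confluent API) for reading
// the producer's output and the Schema Registry subject name.
final class ProducerOutputSketch {

    // Splits RecordMetadata.toString() output, "<topic>-<partition>@<offset>",
    // into its parts. Topic names may contain '-', so the partition is read
    // after the LAST '-' that precedes '@'.
    static String[] parseMetadata(String text) {
        int at = text.lastIndexOf('@');
        int dash = at < 0 ? -1 : text.lastIndexOf('-', at);
        if (at < 0 || dash < 0) {
            throw new IllegalArgumentException("unexpected metadata format: " + text);
        }
        return new String[] {
                text.substring(0, dash),       // topic
                text.substring(dash + 1, at),  // partition
                text.substring(at + 1)         // offset
        };
    }

    // Default subject for a topic's value schema: "<topic>-value".
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    public static void main(String[] args) {
        String[] parts = parseMetadata("customer-avro-0@1");
        // prints: topic=customer-avro partition=0 offset=1
        System.out.println("topic=" + parts[0] + " partition=" + parts[1] + " offset=" + parts[2]);
        // prints: customer-avro-value
        System.out.println(valueSubject("customer-avro"));
    }
}
```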