QuarkusでKafka ConsumerとProducerを始める方法を徹底解説!初心者向けガイド
生徒
「先生、QuarkusでKafkaを使ったイベント駆動開発を始めたいのですが、何から手をつければいいですか?」
先生
「まずはQuarkusプロジェクトを作成して、Kafkaの依存関係を追加することが最初のステップです。」
生徒
「KafkaのProducerとConsumerはどのように実装すればいいですか?」
先生
「順番にProducerとConsumerを作成して、メッセージ送受信を確認する方法を紹介します。」
1. Quarkusとは?基本と歴史
QuarkusはJava向けの高速で軽量なフレームワークで、特にクラウド環境やコンテナ環境でのマイクロサービス開発に最適化されています。GraalVMやNative Imageと組み合わせることで、起動速度が非常に速く、メモリ消費も少ないのが特徴です。KafkaやAMQPなどのイベント駆動開発にも対応しており、リアクティブプログラミングのサポートも充実しています。
2. Quarkusの開発環境を整える
QuarkusでKafkaを使うには、まず開発環境を整える必要があります。IntelliJ IDEAやVSCodeを使い、MavenまたはGradleでプロジェクトを作成します。Kafka Clientを使用するために、Quarkus Kafka拡張を追加します。
./mvn quarkus:create \
-DprojectGroupId=com.example \
-DprojectArtifactId=quarkus-kafka-demo \
-DclassName="com.example.KafkaResource" \
-Dpath="/kafka"
3. Quarkusプロジェクト構成と依存管理
QuarkusのMavenプロジェクトでは、pom.xmlで依存管理を行います。Kafka用にはquarkus-smallrye-reactive-messaging-kafkaを追加することで、ProducerとConsumerを簡単に実装できます。
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
4. Kafka Producerの実装方法
Kafka ProducerはメッセージをKafkaトピックに送信する役割を持っています。Quarkusでは@ChannelとEmitterを使うと簡単に送信できます。
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/send")
public class KafkaProducerResource {
@Inject
@Channel("my-topic")
Emitter<String> emitter;
@POST
public void sendMessage(String message) {
emitter.send(message);
}
}
このコードでは、HTTP POSTリクエストで受け取った文字列をKafkaのmy-topicトピックに送信します。
5. Kafka Consumerの実装方法
Kafka Consumerはトピックからメッセージを受信します。Quarkusでは@Incomingアノテーションを使うことでリアクティブに処理できます。
import io.smallrye.reactive.messaging.annotations.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class KafkaConsumer {
@Incoming("my-topic")
public void receive(String message) {
System.out.println("Received message: " + message);
}
}
このコードを実行すると、Producerが送信したメッセージをリアルタイムで受信して処理できます。
6. application.propertiesでのKafka設定
KafkaをQuarkusで使用する際には、application.propertiesにブローカーの情報やトピック名を設定する必要があります。例えば、ローカルKafkaの場合は次のように設定します。
mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.topic=my-topic
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.topic=my-topic
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
この設定により、Quarkusは指定したトピックにメッセージを送受信できるようになります。
7. 実際にProducerとConsumerを動かしてみる
Quarkusアプリを起動した後、ブラウザやcurlからHTTP POSTでメッセージを送信すると、Consumer側で受信されることを確認できます。
curl -X POST http://localhost:8080/send -d "Hello Quarkus Kafka!"
Received message: Hello Quarkus Kafka!
この手順でKafkaを使ったイベント駆動アプリケーションをQuarkus上で簡単に体験できます。
8. Kafkaメッセージの非同期処理とリアクティブのメリット
QuarkusとSmallRye Reactive Messagingを使うと、Kafkaのメッセージ処理が非同期で行われ、スケーラブルなシステム構築が可能です。大量のメッセージを効率的に処理したり、バックプレッシャーを管理したりできる点が大きな利点です。
@Incoming("my-topic")
public void process(String message) {
Uni.createFrom().item(message)
.onItem().invoke(m -> System.out.println("Processing: " + m))
.subscribe().with(m -> {}, failure -> failure.printStackTrace());
}
このようにUniやReactive Streamsを活用することで、よりリアクティブなKafkaアプリケーションを構築できます。