gRPC with SpringBoot and Kotlin
gRPC for Spring Boot with Kotlin - 完全ガイド
目次
- はじめに
- gRPCの基本概念
- Spring BootとgRPCの統合
- KotlinでのgRPC実装
- アーキテクチャ設計
- 実装例とベストプラクティス
- パフォーマンス最適化
- トラブルシューティングとデバッグ
1. はじめに
1.1 gRPCとは
gRPC(gRPC Remote Procedure Call)は、Googleが開発した高性能なオープンソースRPCフレームワークです。HTTP/2をベースとしており、Protocol Buffersをデフォルトのシリアライゼーション形式として使用します。
主な特徴:
- 高速通信: HTTP/2による多重化とバイナリプロトコル
- 言語中立: 複数のプログラミング言語をサポート
- 厳密な型定義: Protocol Buffersによる明確なスキーマ定義
- ストリーミング対応: 双方向ストリーミング通信をサポート
1.2 Spring BootとgRPCの相性
Spring Bootは、Javaエコシステムで最も人気のあるマイクロサービスフレームワークです。gRPC for Spring Bootライブラリにより、Spring Bootの習慣的な開発スタイルを保ちながら、gRPCの高性能さを活用できます。
1.3 Kotlinを選ぶ理由
Kotlinは、Java仮想機械(JVM)上で動作する現代的なプログラミング言語です。
- 簡潔性: Javaと比べてボイラープレート코드が少ない
- Null安全性: null参照エラーを排除する設計
- 相互運用性: 既存のJavaコードとの完全な互換性
- 関数型プログラミング: First-classの関数サポート
2. gRPCの基本概念
2.1 Protocol Buffers
Protocol Buffers(protobuf)は、Googleが開発した構造化データをシリアライズするための方法です。
基本的なスキーマ定義例(.proto ファイル):
syntax = "proto3";
package com.example.grpc;
option java_multiple_files = true;
option java_package = "com.example.grpc.generated";
option java_outer_classname = "UserServiceProto";
// ユーザー情報を表すメッセージ
message User {
int32 id = 1;
string name = 2;
string email = 3;
repeated string phone_numbers = 4;
enum Status {
UNKNOWN = 0;
ACTIVE = 1;
INACTIVE = 2;
}
Status status = 5;
google.protobuf.Timestamp created_at = 6;
}
// ページング情報
message PageInfo {
int32 page = 1;
int32 page_size = 2;
}
// リクエストメッセージ
message GetUserRequest {
int32 user_id = 1;
}
// リスポンスメッセージ
message GetUserResponse {
User user = 1;
}
// リスト取得リクエスト
message ListUsersRequest {
PageInfo pagination = 1;
}
// リスト取得レスポンス
message ListUsersResponse {
repeated User users = 1;
int32 total_count = 2;
}
// ユーザー作成リクエスト
message CreateUserRequest {
string name = 1;
string email = 2;
}
// ユーザー作成レスポンス
message CreateUserResponse {
User user = 1;
string message = 2;
}
// サービス定義
service UserService {
// 単一RPC
rpc GetUser(GetUserRequest) returns (GetUserResponse);
// サーバーストリーミング
rpc ListUsers(ListUsersRequest) returns (stream GetUserResponse);
// クライアントストリーミング
rpc BulkCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);
// 双方向ストリーミング
rpc SyncUsers(stream User) returns (stream User);
}
2.2 RPC通信パターン
2.2.1 Unary RPC(単一RPC)
クライアントが1つのリクエストを送信し、サーバーが1つのレスポンスを返す最も基本的なパターンです。
Client Server
| |
|--- Request ------->|
| |
|<---- Response -----|
| |
2.2.2 Server Streaming RPC
クライアントが1つのリクエストを送信し、サーバーが複数のレスポンスをストリーミングで返すパターンです。
Client Server
| |
|--- Request ------->|
| |
|<---- Response 1 ----|
|<---- Response 2 ----|
|<---- Response 3 ----|
| |
2.2.3 Client Streaming RPC
クライアントが複数のリクエストをストリーミングで送信し、サーバーが1つのレスポンスを返すパターンです。
Client Server
| |
|--- Request 1 ----->|
|--- Request 2 ----->|
|--- Request 3 ----->|
| |
|<---- Response -----|
| |
2.2.4 Bidirectional Streaming RPC
クライアントとサーバーが双方向でストリーミング通信をするパターンです。
Client Server
| |
|--- Request 1 ----->|
|<---- Response 1 ----|
|--- Request 2 ----->|
|<---- Response 2 ----|
| |
2.3 gRPCのメタデータ
gRPCではHTTP/2ヘッダーを活用したメタデータの送受信が可能です。認証情報やトレーシング情報などを送信できます。
// メタデータの例
// クライアント側で送信
context.withCompression("gzip")
.withDeadlineAfter(5, TimeUnit.SECONDS)
3. Spring BootとgRPCの統合
3.1 依存関係の設定
build.gradle.kts:
import com.google.protobuf.gradle.id
plugins {
id("org.springframework.boot") version "3.2.0"
id("io.spring.dependency-management") version "1.1.4"
id("com.google.protobuf") version "0.9.4"
kotlin("jvm") version "1.9.20"
kotlin("plugin.spring") version "1.9.20"
}
group = "com.example"
version = "1.0.0"
java {
sourceCompatibility = JavaVersion.VERSION_17
}
repositories {
mavenCentral()
}
dependencies {
// Spring Boot
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-actuator")
// gRPC for Spring Boot
implementation("net.devh:grpc-server-spring-boot-starter:2.15.0.RELEASE")
implementation("net.devh:grpc-client-spring-boot-starter:2.15.0.RELEASE")
// Protocol Buffers
implementation("com.google.protobuf:protobuf-java:3.24.4")
implementation("com.google.protobuf:protobuf-kotlin:3.24.4")
// Kotlin
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// ロギング
implementation("org.springframework.boot:spring-boot-starter-logging")
// テスト
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.grpc:grpc-testing:1.59.0")
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.24.4"
}
plugins {
id("grpc") {
artifact = "io.grpc:protoc-gen-grpc-java:1.59.0"
}
id("grpckt") {
artifact = "io.grpc:protoc-gen-grpc-kotlin:1.3.1:jdk8@jar"
}
}
generateProtoTasks {
all().forEach { task ->
task.plugins {
id("grpc")
id("grpckt")
}
}
}
}
3.2 application.yml 設定
spring:
application:
name: grpc-service
grpc:
server:
port: 9090
enable-keep-alive: true
keep-alive-time: 30s
keep-alive-timeout: 10s
permit-keep-alive-without-calls: true
max-inbound-message-size: 4194304 # 4MB
max-concurrent-streams: 100
client:
# クライアント設定は個別に指定可能
user-service:
address: static://localhost:9090
enable-keep-alive: true
keep-alive-time: 30s
keep-alive-timeout: 10s
idle-timeout: 300s
server:
port: 8080
logging:
level:
net.devh.boot.grpc: INFO
com.example: DEBUG
io.grpc: DEBUG
4. KotlinでのgRPC実装
4.1 プロジェクト構造
project/
├── src/main/kotlin/
│ └── com/example/grpc/
│ ├── config/ # gRPC設定
│ ├── service/ # ビジネスロジック
│ ├── controller/ # gRPCサービス実装
│ ├── repository/ # データアクセス層
│ └── GrpcServiceApplication.kt
├── src/main/proto/ # Protocol Buffer定義
│ └── com/example/grpc/
│ ├── user_service.proto
│ └── common.proto
├── src/test/kotlin/
├── build.gradle.kts
└── application.yml
4.2 Protocol Buffer ファイルの構成
src/main/proto/com/example/grpc/common.proto:
syntax = "proto3";
package com.example.grpc.common;
option java_multiple_files = true;
option java_package = "com.example.grpc.common";
import "google/protobuf/timestamp.proto";
message ErrorDetail {
string code = 1;
string message = 2;
string details = 3;
}
message PageRequest {
int32 page = 1;
int32 size = 2;
string sort_by = 3;
}
message PageResponse {
int32 current_page = 1;
int32 total_pages = 2;
int32 total_elements = 3;
int32 elements_in_page = 4;
}
src/main/proto/com/example/grpc/user_service.proto:
syntax = "proto3";
package com.example.grpc.user;
option java_multiple_files = true;
option java_package = "com.example.grpc.user";
option java_outer_classname = "UserServiceProto";
import "google/protobuf/timestamp.proto";
import "common.proto";
message User {
int64 id = 1;
string username = 2;
string email = 3;
string full_name = 4;
repeated string tags = 5;
bool active = 6;
google.protobuf.Timestamp created_at = 7;
google.protobuf.Timestamp updated_at = 8;
}
message GetUserRequest {
int64 user_id = 1;
}
message CreateUserRequest {
string username = 1;
string email = 2;
string full_name = 3;
repeated string tags = 4;
}
message UpdateUserRequest {
int64 user_id = 1;
string email = 2;
string full_name = 3;
}
message DeleteUserRequest {
int64 user_id = 1;
}
message ListUsersRequest {
common.PageRequest pagination = 1;
}
message ListUsersResponse {
repeated User users = 1;
common.PageResponse page = 2;
}
message BatchCreateUsersRequest {
repeated CreateUserRequest users = 1;
}
message BatchCreateUsersResponse {
repeated User created_users = 1;
repeated common.ErrorDetail errors = 2;
}
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
rpc UpdateUser(UpdateUserRequest) returns (User);
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
rpc ListUsers(ListUsersRequest) returns (stream User);
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);
rpc SyncUsers(stream User) returns (stream User);
}
4.3 gRPC サービスの実装
com/example/grpc/entity/User.kt:
package com.example.grpc.entity
import java.time.LocalDateTime
import jakarta.persistence.*
@Entity
@Table(name = "users")
class User(
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long = 0,
@Column(unique = true, nullable = false)
var username: String,
@Column(unique = true, nullable = false)
var email: String,
@Column(nullable = false)
var fullName: String,
@ElementCollection
@CollectionTable(name = "user_tags", joinColumns = [JoinColumn(name = "user_id")])
@Column(name = "tag")
var tags: MutableSet<String> = mutableSetOf(),
@Column(nullable = false)
var active: Boolean = true,
@Column(nullable = false, updatable = false)
val createdAt: LocalDateTime = LocalDateTime.now(),
@Column(nullable = false)
var updatedAt: LocalDateTime = LocalDateTime.now()
)
com/example/grpc/repository/UserRepository.kt:
package com.example.grpc.repository
import com.example.grpc.entity.User
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.stereotype.Repository
@Repository
interface UserRepository : JpaRepository<User, Long> {
fun findByUsername(username: String): User?
fun findByEmail(email: String): User?
fun findByActiveTrue(pageable: Pageable): Page<User>
}
com/example/grpc/service/UserService.kt:
package com.example.grpc.service
import com.example.grpc.entity.User
import com.example.grpc.repository.UserRepository
import org.springframework.data.domain.Pageable
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDateTime
@Service
@Transactional
class UserService(
private val userRepository: UserRepository
) {
fun getUserById(id: Long): User? = userRepository.findById(id).orElse(null)
fun createUser(username: String, email: String, fullName: String, tags: List<String> = emptyList()): User {
val user = User(
username = username,
email = email,
fullName = fullName,
tags = tags.toMutableSet()
)
return userRepository.save(user)
}
fun updateUser(id: Long, email: String?, fullName: String?): User? {
val user = userRepository.findById(id).orElse(null) ?: return null
email?.let { user.email = it }
fullName?.let { user.fullName = it }
user.updatedAt = LocalDateTime.now()
return userRepository.save(user)
}
fun deleteUser(id: Long) {
userRepository.deleteById(id)
}
fun listActiveUsers(pageable: Pageable): List<User> {
return userRepository.findByActiveTrue(pageable).content
}
fun getAllUsers(pageable: Pageable): List<User> {
return userRepository.findAll(pageable).content
}
}
com/example/grpc/controller/UserServiceGrpcImpl.kt:
package com.example.grpc.controller
import com.example.grpc.service.UserService
import com.example.grpc.user.*
import io.grpc.stub.StreamObserver
import net.devh.boot.grpc.server.service.GrpcService
import org.springframework.data.domain.PageRequest
import java.time.Instant
import com.google.protobuf.Empty
import com.google.protobuf.Timestamp as ProtoTimestamp
@GrpcService
class UserServiceGrpcImpl(
private val userService: UserService
) : UserServiceGrpc.UserServiceImplBase() {
// Unary RPC: 単一ユーザーの取得
override fun getUser(request: GetUserRequest, responseObserver: StreamObserver<User>) {
try {
val user = userService.getUserById(request.userId)
if (user != null) {
responseObserver.onNext(user.toProto())
responseObserver.onCompleted()
} else {
responseObserver.onError(
io.grpc.Status.NOT_FOUND
.withDescription("User not found with ID: ${request.userId}")
.asException()
)
}
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Error fetching user: ${e.message}")
.asException()
)
}
}
// Unary RPC: ユーザー作成
override fun createUser(request: CreateUserRequest, responseObserver: StreamObserver<User>) {
try {
val user = userService.createUser(
username = request.username,
email = request.email,
fullName = request.fullName,
tags = request.tagsList
)
responseObserver.onNext(user.toProto())
responseObserver.onCompleted()
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INVALID_ARGUMENT
.withDescription("Error creating user: ${e.message}")
.asException()
)
}
}
// Unary RPC: ユーザー更新
override fun updateUser(request: UpdateUserRequest, responseObserver: StreamObserver<User>) {
try {
val user = userService.updateUser(
id = request.userId,
email = request.email.takeIf { it.isNotEmpty() },
fullName = request.fullName.takeIf { it.isNotEmpty() }
)
if (user != null) {
responseObserver.onNext(user.toProto())
responseObserver.onCompleted()
} else {
responseObserver.onError(
io.grpc.Status.NOT_FOUND
.withDescription("User not found with ID: ${request.userId}")
.asException()
)
}
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Error updating user: ${e.message}")
.asException()
)
}
}
// Unary RPC: ユーザー削除
override fun deleteUser(request: DeleteUserRequest, responseObserver: StreamObserver<Empty>) {
try {
userService.deleteUser(request.userId)
responseObserver.onNext(Empty.getDefaultInstance())
responseObserver.onCompleted()
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Error deleting user: ${e.message}")
.asException()
)
}
}
// Server Streaming RPC: ユーザー一覧の取得
override fun listUsers(request: ListUsersRequest, responseObserver: StreamObserver<User>) {
try {
val pageRequest = PageRequest.of(
request.pagination.page.coerceAtLeast(1) - 1,
request.pagination.size.coerceIn(1, 100)
)
val users = userService.listActiveUsers(pageRequest)
users.forEach { user ->
responseObserver.onNext(user.toProto())
}
responseObserver.onCompleted()
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Error listing users: ${e.message}")
.asException()
)
}
}
// Client Streaming RPC: バッチユーザー作成
override fun batchCreateUsers(responseObserver: StreamObserver<BatchCreateUsersResponse>): StreamObserver<CreateUserRequest> {
return object : StreamObserver<CreateUserRequest> {
private val createdUsers = mutableListOf<User>()
private val errors = mutableListOf<com.example.grpc.common.ErrorDetail>()
override fun onNext(request: CreateUserRequest) {
try {
val user = userService.createUser(
username = request.username,
email = request.email,
fullName = request.fullName,
tags = request.tagsList
)
createdUsers.add(user)
} catch (e: Exception) {
errors.add(
com.example.grpc.common.ErrorDetail.newBuilder()
.setCode("CREATE_ERROR")
.setMessage("Failed to create user: ${request.username}")
.setDetails(e.message ?: "Unknown error")
.build()
)
}
}
override fun onError(t: Throwable) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Streaming error: ${t.message}")
.asException()
)
}
override fun onCompleted() {
val response = BatchCreateUsersResponse.newBuilder()
.addAllCreatedUsers(createdUsers.map { it.toProto() })
.addAllErrors(errors)
.build()
responseObserver.onNext(response)
responseObserver.onCompleted()
}
}
}
// Bidirectional Streaming RPC: ユーザー同期
override fun syncUsers(responseObserver: StreamObserver<User>): StreamObserver<User> {
return object : StreamObserver<User> {
override fun onNext(request: User) {
try {
// Protocol Bufferのメッセージをドメインエンティティに変換
val user = if (request.id > 0) {
userService.updateUser(
id = request.id,
email = request.email.takeIf { it.isNotEmpty() },
fullName = request.fullName.takeIf { it.isNotEmpty() }
)
} else {
userService.createUser(
username = request.username,
email = request.email,
fullName = request.fullName,
tags = request.tagsList
)
}
if (user != null) {
responseObserver.onNext(user.toProto())
}
} catch (e: Exception) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Sync error: ${e.message}")
.asException()
)
}
}
override fun onError(t: Throwable) {
responseObserver.onError(
io.grpc.Status.INTERNAL
.withDescription("Streaming error: ${t.message}")
.asException()
)
}
override fun onCompleted() {
responseObserver.onCompleted()
}
}
}
// ドメインエンティティをProtocol Bufferメッセージに変換
private fun com.example.grpc.entity.User.toProto(): User {
return User.newBuilder()
.setId(this.id)
.setUsername(this.username)
.setEmail(this.email)
.setFullName(this.fullName)
.addAllTags(this.tags)
.setActive(this.active)
.setCreatedAt(this.createdAt.toProtoTimestamp())
.setUpdatedAt(this.updatedAt.toProtoTimestamp())
.build()
}
// LocalDateTimeをProto Timestampに変換
private fun java.time.LocalDateTime.toProtoTimestamp(): ProtoTimestamp {
val instant = this.atZone(java.time.ZoneId.systemDefault()).toInstant()
return ProtoTimestamp.newBuilder()
.setSeconds(instant.epochSecond)
.setNanos(instant.nano)
.build()
}
}
4.4 gRPC クライアントの実装
com/example/grpc/client/UserServiceClient.kt:
package com.example.grpc.client
import com.example.grpc.user.*
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
class UserServiceClient(
private val channel: Channel
) {
private val stub: UserServiceGrpc.UserServiceStub = UserServiceGrpc.newStub(channel)
private val blockingStub: UserServiceGrpc.UserServiceBlockingStub = UserServiceGrpc.newBlockingStub(channel)
private val logger = LoggerFactory.getLogger(javaClass)
// Unary RPC呼び出し(ブロッキング)
fun getUser(userId: Long): User? {
return try {
val request = GetUserRequest.newBuilder().setUserId(userId).build()
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).getUser(request)
} catch (e: Exception) {
logger.error("Error getting user: ${e.message}", e)
null
}
}
// Unary RPC呼び出し(非ブロッキング)
suspend fun getUserAsync(userId: Long): User? = withContext(Dispatchers.IO) {
suspendCancellableCoroutine { continuation ->
val request = GetUserRequest.newBuilder().setUserId(userId).build()
stub.getUser(request, object : StreamObserver<User> {
override fun onNext(value: User) {
continuation.resume(value)
}
override fun onError(t: Throwable) {
continuation.resumeWithException(t)
}
override fun onCompleted() {}
})
}
}
// Server Streaming RPC呼び出し
fun listUsers(page: Int = 1, pageSize: Int = 10): List<User> {
val users = mutableListOf<User>()
val countDownLatch = java.util.concurrent.CountDownLatch(1)
val request = ListUsersRequest.newBuilder()
.setPagination(
com.example.grpc.common.PageRequest.newBuilder()
.setPage(page)
.setSize(pageSize)
.build()
)
.build()
stub.listUsers(request, object : StreamObserver<User> {
override fun onNext(value: User) {
users.add(value)
}
override fun onError(t: Throwable) {
logger.error("Error listing users: ${t.message}", t)
countDownLatch.countDown()
}
override fun onCompleted() {
countDownLatch.countDown()
}
})
countDownLatch.await(10, TimeUnit.SECONDS)
return users
}
// Client Streaming RPC呼び出し
fun batchCreateUsers(requests: List<CreateUserRequest>): BatchCreateUsersResponse? {
val countDownLatch = java.util.concurrent.CountDownLatch(1)
var response: BatchCreateUsersResponse? = null
val requestObserver = stub.batchCreateUsers(object : StreamObserver<BatchCreateUsersResponse> {
override fun onNext(value: BatchCreateUsersResponse) {
response = value
}
override fun onError(t: Throwable) {
logger.error("Error in batch create: ${t.message}", t)
countDownLatch.countDown()
}
override fun onCompleted() {
countDownLatch.countDown()
}
})
try {
requests.forEach { request ->
requestObserver.onNext(request)
}
requestObserver.onCompleted()
} catch (e: Exception) {
requestObserver.onError(e)
}
countDownLatch.await(30, TimeUnit.SECONDS)
return response
}
// Bidirectional Streaming RPC呼び出し
fun syncUsers(users: List<User>): List<User> {
val syncedUsers = mutableListOf<User>()
val countDownLatch = java.util.concurrent.CountDownLatch(1)
val requestObserver = stub.syncUsers(object : StreamObserver<User> {
override fun onNext(value: User) {
syncedUsers.add(value)
}
override fun onError(t: Throwable) {
logger.error("Error in sync: ${t.message}", t)
countDownLatch.countDown()
}
override fun onCompleted() {
countDownLatch.countDown()
}
})
try {
users.forEach { user ->
requestObserver.onNext(user)
}
requestObserver.onCompleted()
} catch (e: Exception) {
requestObserver.onError(e)
}
countDownLatch.await(30, TimeUnit.SECONDS)
return syncedUsers
}
fun shutdown() {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
}
}
// クライアント使用例
suspend fun main() {
val channel = ManagedChannelBuilder.forAddress("localhost", 9090)
.usePlaintext()
.build()
val client = UserServiceClient(channel)
try {
// Unary RPC
val user = client.getUserAsync(1)
println("User: $user")
// Server Streaming
val users = client.listUsers()
println("Users: $users")
// Client Streaming
val createRequests = listOf(
CreateUserRequest.newBuilder().setUsername("user1").setEmail("user1@example.com").setFullName("User One").build(),
CreateUserRequest.newBuilder().setUsername("user2").setEmail("user2@example.com").setFullName("User Two").build()
)
val batchResponse = client.batchCreateUsers(createRequests)
println("Batch Response: $batchResponse")
} finally {
client.shutdown()
}
}
4.5 Spring Boot統合クライアント
com/example/grpc/config/GrpcClientConfig.kt:
package com.example.grpc.config
import io.grpc.ManagedChannel
import net.devh.boot.grpc.client.inject.GrpcClient
import org.springframework.context.annotation.Configuration
@Configuration
class GrpcClientConfig {
// @GrpcClient アノテーションを使用してクライアントを注入
// application.ymlで設定された接続情報を自動的に使用
}
com/example/grpc/component/UserServiceConsumer.kt:
package com.example.grpc.component
import com.example.grpc.user.*
import net.devh.boot.grpc.client.inject.GrpcClient
import org.springframework.stereotype.Component
import org.slf4j.LoggerFactory
@Component
class UserServiceConsumer {
@GrpcClient("user-service")
private lateinit var userServiceStub: UserServiceGrpc.UserServiceBlockingStub
private val logger = LoggerFactory.getLogger(javaClass)
fun fetchRemoteUser(userId: Long): User? {
return try {
val request = GetUserRequest.newBuilder().setUserId(userId).build()
userServiceStub.getUser(request)
} catch (e: Exception) {
logger.error("Error fetching remote user: ${e.message}", e)
null
}
}
}
5. アーキテクチャ設計
5.1 マイクロサービスアーキテクチャ
gRPCはマイクロサービス間通信に最適化されています。
┌─────────────────────────────────────────────────────────┐
│ API Gateway │
│ (gRPC or REST/HTTP) │
└───────────────────┬─────────────────────────────────────┘
│
┌───────────┼───────────┐
│ │ │
v v v
┌─────────┐ ┌─────────┐ ┌─────────┐
│User │ │Order │ │Payment │
│Service │ │Service │ │Service │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└───────────┼───────────┘
│
┌───────┴───────┐
│ │
┌─────────┐ ┌─────────┐
│Database │ │Cache │
└─────────┘ └─────────┘
5.2 サービスディスカバリー
com/example/grpc/config/ServiceDiscoveryConfig.kt:
package com.example.grpc.config
import org.springframework.cloud.client.discovery.EnableDiscoveryClient
import org.springframework.context.annotation.Configuration
@Configuration
@EnableDiscoveryClient
class ServiceDiscoveryConfig
application.ymlでのサービスディスカバリー設定:
spring:
cloud:
consul:
host: localhost
port: 8500
discovery:
service-name: grpc-user-service
port: 9090
prefer-ip-address: true
grpc:
client:
# Consulから自動的にサービス情報を取得
user-service:
address: consul://localhost:8500
enable-keep-alive: true
5.3 負荷分散
gRPCの負荷分散は複数の戦略をサポートしています:
grpc:
client:
user-service:
address: static://server1:9090,static://server2:9090,static://server3:9090
load-balancing-policy: round_robin # round_robin, pick_first, grpclb
6. 実装例とベストプラクティス
6.1 エラーハンドリング
package com.example.grpc.exception
import io.grpc.Status
import io.grpc.StatusRuntimeException
// カスタム例外
sealed class GrpcException(message: String) : Exception(message) {
class UserNotFoundException(userId: Long) : GrpcException("User not found: $userId")
class UserAlreadyExistsException(username: String) : GrpcException("User already exists: $username")
class ValidationException(message: String) : GrpcException(message)
class InternalServerException(message: String) : GrpcException(message)
}
// 例外マッピング
fun Throwable.toGrpcStatus(): Status = when (this) {
is GrpcException.UserNotFoundException -> Status.NOT_FOUND.withDescription(this.message)
is GrpcException.UserAlreadyExistsException -> Status.ALREADY_EXISTS.withDescription(this.message)
is GrpcException.ValidationException -> Status.INVALID_ARGUMENT.withDescription(this.message)
is GrpcException.InternalServerException -> Status.INTERNAL.withDescription(this.message)
else -> Status.UNKNOWN.withDescription(this.message)
}
// インターセプタでエラーハンドリング
override fun onError(t: Throwable) {
val status = t.toGrpcStatus()
responseObserver.onError(status.asException())
}
6.2 インターセプタとミドルウェア
com/example/grpc/interceptor/LoggingInterceptor.kt:
package com.example.grpc.interceptor
import io.grpc.*
import org.slf4j.LoggerFactory
class LoggingInterceptor : ServerInterceptor {
private val logger = LoggerFactory.getLogger(javaClass)
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
logger.info("Method: ${call.methodDescriptor.fullMethodName}")
logger.info("Headers: $headers")
return object : ServerCall.Listener<ReqT>() {
override fun onMessage(message: ReqT) {
logger.debug("Request: $message")
super.onMessage(message)
}
override fun onHalfClose() {
logger.debug("Half close")
super.onHalfClose()
}
}.run {
next.startCall(
object : ServerCall<ReqT, RespT>() {
override fun request(numMessages: Int) {
call.request(numMessages)
}
override fun sendHeaders(headers: Metadata) {
call.sendHeaders(headers)
}
override fun sendMessage(message: RespT) {
logger.debug("Response: $message")
call.sendMessage(message)
}
override fun close(status: Status, trailers: Metadata) {
logger.info("Status: $status")
call.close(status, trailers)
}
override fun isCancelled(): Boolean = call.isCancelled
override fun getMethodDescriptor(): MethodDescriptor<ReqT, RespT> = call.methodDescriptor
},
headers
)
}
}
}
// インターセプタの登録
@Configuration
class GrpcServerConfiguration {
@Bean
fun loggingInterceptor(): LoggingInterceptor = LoggingInterceptor()
}
com/example/grpc/interceptor/AuthenticationInterceptor.kt:
package com.example.grpc.interceptor
import io.grpc.*
import org.slf4j.LoggerFactory
class AuthenticationInterceptor : ServerInterceptor {
private val logger = LoggerFactory.getLogger(javaClass)
companion object {
private val AUTHORIZATION_METADATA_KEY = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)
}
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
val authToken = headers.get(AUTHORIZATION_METADATA_KEY)
if (authToken.isNullOrEmpty()) {
val status = Status.UNAUTHENTICATED.withDescription("Missing authorization token")
call.close(status, Metadata())
return object : ServerCall.Listener<ReqT>() {}
}
// トークンの検証
if (!validateToken(authToken)) {
val status = Status.PERMISSION_DENIED.withDescription("Invalid token")
call.close(status, Metadata())
return object : ServerCall.Listener<ReqT>() {}
}
logger.debug("Authentication successful for token: $authToken")
return next.startCall(call, headers)
}
private fun validateToken(token: String): Boolean {
// トークン検証ロジック
return token.startsWith("Bearer ")
}
}
6.3 テスト
com/example/grpc/controller/UserServiceGrpcImplTest.kt:
package com.example.grpc.controller
import com.example.grpc.entity.User as UserEntity
import com.example.grpc.service.UserService
import com.example.grpc.user.*
import io.grpc.testing.GrpcCleanupRule
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.mockito.Mockito
import java.time.LocalDateTime
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
class UserServiceGrpcImplTest {
@get:Rule
val grpcCleanup = GrpcCleanupRule()
private lateinit var userService: UserService
private lateinit var stub: UserServiceGrpc.UserServiceBlockingStub
@Before
fun setUp() {
userService = Mockito.mock(UserService::class.java)
val serviceImpl = UserServiceGrpcImpl(userService)
val serverName = InProcessServerBuilder.generateName()
grpcCleanup.register(
InProcessServerBuilder(serverName)
.addService(serviceImpl)
.directExecutor()
.build()
.start()
)
val channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName)
.directExecutor()
.build()
)
stub = UserServiceGrpc.newBlockingStub(channel)
}
@Test
fun testGetUserSuccess() {
val mockUser = UserEntity(
id = 1L,
username = "testuser",
email = "test@example.com",
fullName = "Test User",
tags = mutableSetOf("tag1"),
active = true,
createdAt = LocalDateTime.now(),
updatedAt = LocalDateTime.now()
)
Mockito.`when`(userService.getUserById(1L)).thenReturn(mockUser)
val request = GetUserRequest.newBuilder().setUserId(1L).build()
val response = stub.getUser(request)
assertNotNull(response)
assertEquals("testuser", response.username)
assertEquals("test@example.com", response.email)
}
@Test
fun testCreateUserSuccess() {
val mockUser = UserEntity(
id = 1L,
username = "newuser",
email = "new@example.com",
fullName = "New User",
tags = mutableSetOf()
)
Mockito.`when`(userService.createUser("newuser", "new@example.com", "New User", emptyList()))
.thenReturn(mockUser)
val request = CreateUserRequest.newBuilder()
.setUsername("newuser")
.setEmail("new@example.com")
.setFullName("New User")
.build()
val response = stub.createUser(request)
assertNotNull(response)
assertEquals("newuser", response.username)
}
}
7. パフォーマンス最適化
7.1 接続管理
grpc:
server:
# スレッドプール設定
max-concurrent-streams: 100
# Keep-Alive設定
enable-keep-alive: true
keep-alive-time: 30s
keep-alive-timeout: 10s
permit-keep-alive-without-calls: true
# バッファ設定
max-inbound-message-size: 4194304 # 4MB
max-header-list-size: 16384 # 16KB
client:
keep-alive-time: 30s
keep-alive-timeout: 10s
idle-timeout: 300s
max-retry-attempts: 3
7.2 圧縮
// サーバー側
override fun getUser(request: GetUserRequest, responseObserver: StreamObserver<User>) {
// 圧縮を適用
responseObserver.setCompression("gzip")
// ...
}
// クライアント側
val request = GetUserRequest.newBuilder().setUserId(1L).build()
val call = stub.withCompression("gzip").getUser(request)
7.3 バッチ処理
// バッチ処理の効率的な実装
override fun batchCreateUsers(responseObserver: StreamObserver<BatchCreateUsersResponse>): StreamObserver<CreateUserRequest> {
return object : StreamObserver<CreateUserRequest> {
private val batch = mutableListOf<CreateUserRequest>()
private val BATCH_SIZE = 100
override fun onNext(request: CreateUserRequest) {
batch.add(request)
if (batch.size >= BATCH_SIZE) {
processBatch()
}
}
override fun onCompleted() {
if (batch.isNotEmpty()) {
processBatch()
}
responseObserver.onCompleted()
}
private fun processBatch() {
// バッチ処理ロジック
batch.clear()
}
override fun onError(t: Throwable) {
responseObserver.onError(t)
}
}
}
7.4 モニタリングとメトリクス
com/example/grpc/config/MetricsConfig.kt:
package com.example.grpc.config
import io.grpc.health.v1.HealthCheckResponse
import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.Health
import org.springframework.boot.actuate.health.HealthIndicator
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class MetricsConfig {
@Bean
fun grpcHealthIndicator(): HealthIndicator {
return HealthIndicator {
try {
// gRPCサービスのヘルスチェック
Health.up()
.withDetail("grpc.port", 9090)
.withDetail("grpc.status", "running")
.build()
} catch (e: Exception) {
Health.down()
.withDetail("error", e.message)
.build()
}
}
}
}
application.ymlでのメトリクス設定:
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
export:
prometheus:
enabled: true
tags:
application: grpc-user-service
8. トラブルシューティングとデバッグ
8.1 一般的な問題と解決策
問題1: Connection refused
io.grpc.StatusRuntimeException: UNAVAILABLE: Unable to resolve host
解決策:
- サーバーが起動しているか確認
- ポート番号が正しいか確認
- ファイアウォール設定を確認
# サーバーの起動確認
lsof -i :9090
# ネットワーク接続の確認
netstat -an | grep 9090
問題2: Deadline exceeded
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED
解決策:
- タイムアウト時間を延長
- バックエンド処理を最適化
stub.withDeadlineAfter(30, TimeUnit.SECONDS).getUser(request)
問題3: Resource exhausted
io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED
解決策:
- バッファサイズを増やす
- コネクションプーリングを確認
grpc:
server:
max-inbound-message-size: 8388608 # 8MB
max-concurrent-streams: 200
8.2 デバッグ設定
logging:
level:
io.grpc: DEBUG
net.devh.boot.grpc: DEBUG
com.example.grpc: DEBUG
8.3 gRPC CLI ツール
# gRPC serviceの確認
grpcurl -plaintext localhost:9090 list
# RPC呼び出し
grpcurl -plaintext \
-d '{"user_id": 1}' \
localhost:9090 com.example.grpc.user.UserService.GetUser
# streaming RPC
grpcurl -plaintext \
-d '{"pagination": {"page": 1, "size": 10}}' \
localhost:9090 com.example.grpc.user.UserService.ListUsers
まとめ
gRPC for Spring Boot with Kotlinは、高性能で信頼性の高いマイクロサービス通信を実現するための強力なソリューションです。
主なポイント
-
Protocol Buffersによる型安全性: スキーマファーストなアプローチにより、クライアント・サーバー間の契約を明確に定義
-
HTTP/2による高速通信: バイナリプロトコルと多重化により、RESTful APIと比べて高速で効率的な通信
-
ストリーミング対応: 単一RPC、サーバーストリーミング、クライアントストリーミング、双方向ストリーミングの4つのパターンをサポート
-
Spring Bootの統合: 既存のSpring Bootエコシステムとシームレスに統合され、習慣的な開発が可能
-
Kotlinの簡潔性: Null安全性と関数型プログラミング機能により、堅牢で保守性の高いコードが実現
次のステップ
- トレーシング: Jaegerなどの分散トレーシングツールを導入
- セキュリティ: TLS/SSL通信とトークンベース認証の実装
- スケーリング: Kubernetes環境でのデプロイと管理
- テスト: 統合テストとパフォーマンステストの充実
gRPCは、現代的なマイクロサービスアーキテクチャにおいて、RESTful APIの代替手段として、またはそれ以上の価値を提供します。Spring BootとKotlinの組み合わせにより、開発者は生産性を損なわずに、高性能なシステムを構築できます。