gRPC with SpringBoot and Kotlin

gRPC for Spring Boot with Kotlin - 完全ガイド

目次

  1. はじめに
  2. gRPCの基本概念
  3. Spring BootとgRPCの統合
  4. KotlinでのgRPC実装
  5. アーキテクチャ設計
  6. 実装例とベストプラクティス
  7. パフォーマンス最適化
  8. トラブルシューティングとデバッグ

1. はじめに

1.1 gRPCとは

gRPC(gRPC Remote Procedure Call)は、Googleが開発した高性能なオープンソースRPCフレームワークです。HTTP/2をベースとしており、Protocol Buffersをデフォルトのシリアライゼーション形式として使用します。

主な特徴:

  • 高速通信: HTTP/2による多重化とバイナリプロトコル
  • 言語中立: 複数のプログラミング言語をサポート
  • 厳密な型定義: Protocol Buffersによる明確なスキーマ定義
  • ストリーミング対応: 双方向ストリーミング通信をサポート

1.2 Spring BootとgRPCの相性

Spring Bootは、Javaエコシステムで最も人気のあるマイクロサービスフレームワークです。gRPC for Spring Bootライブラリにより、Spring Bootの習慣的な開発スタイルを保ちながら、gRPCの高性能さを活用できます。

1.3 Kotlinを選ぶ理由

Kotlinは、Java仮想機械(JVM)上で動作する現代的なプログラミング言語です。

  • 簡潔性: Javaと比べてボイラープレート코드が少ない
  • Null安全性: null参照エラーを排除する設計
  • 相互運用性: 既存のJavaコードとの完全な互換性
  • 関数型プログラミング: First-classの関数サポート

2. gRPCの基本概念

2.1 Protocol Buffers

Protocol Buffers(protobuf)は、Googleが開発した構造化データをシリアライズするための方法です。

基本的なスキーマ定義例(.proto ファイル):

syntax = "proto3";

package com.example.grpc;

option java_multiple_files = true;
option java_package = "com.example.grpc.generated";
option java_outer_classname = "UserServiceProto";

// ユーザー情報を表すメッセージ
message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
  repeated string phone_numbers = 4;
  enum Status {
    UNKNOWN = 0;
    ACTIVE = 1;
    INACTIVE = 2;
  }
  Status status = 5;
  google.protobuf.Timestamp created_at = 6;
}

// ページング情報
message PageInfo {
  int32 page = 1;
  int32 page_size = 2;
}

// リクエストメッセージ
message GetUserRequest {
  int32 user_id = 1;
}

// リスポンスメッセージ
message GetUserResponse {
  User user = 1;
}

// リスト取得リクエスト
message ListUsersRequest {
  PageInfo pagination = 1;
}

// リスト取得レスポンス
message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
}

// ユーザー作成リクエスト
message CreateUserRequest {
  string name = 1;
  string email = 2;
}

// ユーザー作成レスポンス
message CreateUserResponse {
  User user = 1;
  string message = 2;
}

// サービス定義
service UserService {
  // 単一RPC
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  
  // サーバーストリーミング
  rpc ListUsers(ListUsersRequest) returns (stream GetUserResponse);
  
  // クライアントストリーミング
  rpc BulkCreateUsers(stream CreateUserRequest) returns (CreateUserResponse);
  
  // 双方向ストリーミング
  rpc SyncUsers(stream User) returns (stream User);
}

2.2 RPC通信パターン

2.2.1 Unary RPC(単一RPC)

クライアントが1つのリクエストを送信し、サーバーが1つのレスポンスを返す最も基本的なパターンです。

Client                Server
  |                    |
  |--- Request ------->|
  |                    |
  |<---- Response -----|
  |                    |

2.2.2 Server Streaming RPC

クライアントが1つのリクエストを送信し、サーバーが複数のレスポンスをストリーミングで返すパターンです。

Client                Server
  |                    |
  |--- Request ------->|
  |                    |
  |<---- Response 1 ----|
  |<---- Response 2 ----|
  |<---- Response 3 ----|
  |                    |

2.2.3 Client Streaming RPC

クライアントが複数のリクエストをストリーミングで送信し、サーバーが1つのレスポンスを返すパターンです。

Client                Server
  |                    |
  |--- Request 1 ----->|
  |--- Request 2 ----->|
  |--- Request 3 ----->|
  |                    |
  |<---- Response -----|
  |                    |

2.2.4 Bidirectional Streaming RPC

クライアントとサーバーが双方向でストリーミング通信をするパターンです。

Client                Server
  |                    |
  |--- Request 1 ----->|
  |<---- Response 1 ----|
  |--- Request 2 ----->|
  |<---- Response 2 ----|
  |                    |

2.3 gRPCのメタデータ

gRPCではHTTP/2ヘッダーを活用したメタデータの送受信が可能です。認証情報やトレーシング情報などを送信できます。

// メタデータの例
// クライアント側で送信
context.withCompression("gzip")
  .withDeadlineAfter(5, TimeUnit.SECONDS)

3. Spring BootとgRPCの統合

3.1 依存関係の設定

build.gradle.kts:

import com.google.protobuf.gradle.id

plugins {
    id("org.springframework.boot") version "3.2.0"
    id("io.spring.dependency-management") version "1.1.4"
    id("com.google.protobuf") version "0.9.4"
    kotlin("jvm") version "1.9.20"
    kotlin("plugin.spring") version "1.9.20"
}

group = "com.example"
version = "1.0.0"

java {
    sourceCompatibility = JavaVersion.VERSION_17
}

repositories {
    mavenCentral()
}

dependencies {
    // Spring Boot
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    
    // gRPC for Spring Boot
    implementation("net.devh:grpc-server-spring-boot-starter:2.15.0.RELEASE")
    implementation("net.devh:grpc-client-spring-boot-starter:2.15.0.RELEASE")
    
    // Protocol Buffers
    implementation("com.google.protobuf:protobuf-java:3.24.4")
    implementation("com.google.protobuf:protobuf-kotlin:3.24.4")
    
    // Kotlin
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    
    // ロギング
    implementation("org.springframework.boot:spring-boot-starter-logging")
    
    // テスト
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.grpc:grpc-testing:1.59.0")
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.24.4"
    }
    plugins {
        id("grpc") {
            artifact = "io.grpc:protoc-gen-grpc-java:1.59.0"
        }
        id("grpckt") {
            artifact = "io.grpc:protoc-gen-grpc-kotlin:1.3.1:jdk8@jar"
        }
    }
    generateProtoTasks {
        all().forEach { task ->
            task.plugins {
                id("grpc")
                id("grpckt")
            }
        }
    }
}

3.2 application.yml 設定

spring:
  application:
    name: grpc-service
  
grpc:
  server:
    port: 9090
    enable-keep-alive: true
    keep-alive-time: 30s
    keep-alive-timeout: 10s
    permit-keep-alive-without-calls: true
    max-inbound-message-size: 4194304  # 4MB
    max-concurrent-streams: 100
    
  client:
    # クライアント設定は個別に指定可能
    user-service:
      address: static://localhost:9090
      enable-keep-alive: true
      keep-alive-time: 30s
      keep-alive-timeout: 10s
      idle-timeout: 300s
      
server:
  port: 8080
  
logging:
  level:
    net.devh.boot.grpc: INFO
    com.example: DEBUG
    io.grpc: DEBUG

4. KotlinでのgRPC実装

4.1 プロジェクト構造

project/
├── src/main/kotlin/
│   └── com/example/grpc/
│       ├── config/                 # gRPC設定
│       ├── service/                # ビジネスロジック
│       ├── controller/             # gRPCサービス実装
│       ├── repository/             # データアクセス層
│       └── GrpcServiceApplication.kt
├── src/main/proto/                 # Protocol Buffer定義
│   └── com/example/grpc/
│       ├── user_service.proto
│       └── common.proto
├── src/test/kotlin/
├── build.gradle.kts
└── application.yml

4.2 Protocol Buffer ファイルの構成

src/main/proto/com/example/grpc/common.proto:

syntax = "proto3";

package com.example.grpc.common;

option java_multiple_files = true;
option java_package = "com.example.grpc.common";

import "google/protobuf/timestamp.proto";

message ErrorDetail {
  string code = 1;
  string message = 2;
  string details = 3;
}

message PageRequest {
  int32 page = 1;
  int32 size = 2;
  string sort_by = 3;
}

message PageResponse {
  int32 current_page = 1;
  int32 total_pages = 2;
  int32 total_elements = 3;
  int32 elements_in_page = 4;
}

src/main/proto/com/example/grpc/user_service.proto:

syntax = "proto3";

package com.example.grpc.user;

option java_multiple_files = true;
option java_package = "com.example.grpc.user";
option java_outer_classname = "UserServiceProto";

import "google/protobuf/timestamp.proto";
import "common.proto";

message User {
  int64 id = 1;
  string username = 2;
  string email = 3;
  string full_name = 4;
  repeated string tags = 5;
  bool active = 6;
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
}

message GetUserRequest {
  int64 user_id = 1;
}

message CreateUserRequest {
  string username = 1;
  string email = 2;
  string full_name = 3;
  repeated string tags = 4;
}

message UpdateUserRequest {
  int64 user_id = 1;
  string email = 2;
  string full_name = 3;
}

message DeleteUserRequest {
  int64 user_id = 1;
}

message ListUsersRequest {
  common.PageRequest pagination = 1;
}

message ListUsersResponse {
  repeated User users = 1;
  common.PageResponse page = 2;
}

message BatchCreateUsersRequest {
  repeated CreateUserRequest users = 1;
}

message BatchCreateUsersResponse {
  repeated User created_users = 1;
  repeated common.ErrorDetail errors = 2;
}

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc UpdateUser(UpdateUserRequest) returns (User);
  rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty);
  rpc ListUsers(ListUsersRequest) returns (stream User);
  rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateUsersResponse);
  rpc SyncUsers(stream User) returns (stream User);
}

4.3 gRPC サービスの実装

com/example/grpc/entity/User.kt:

package com.example.grpc.entity

import java.time.LocalDateTime
import jakarta.persistence.*

@Entity
@Table(name = "users")
class User(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long = 0,
    
    @Column(unique = true, nullable = false)
    var username: String,
    
    @Column(unique = true, nullable = false)
    var email: String,
    
    @Column(nullable = false)
    var fullName: String,
    
    @ElementCollection
    @CollectionTable(name = "user_tags", joinColumns = [JoinColumn(name = "user_id")])
    @Column(name = "tag")
    var tags: MutableSet<String> = mutableSetOf(),
    
    @Column(nullable = false)
    var active: Boolean = true,
    
    @Column(nullable = false, updatable = false)
    val createdAt: LocalDateTime = LocalDateTime.now(),
    
    @Column(nullable = false)
    var updatedAt: LocalDateTime = LocalDateTime.now()
)

com/example/grpc/repository/UserRepository.kt:

package com.example.grpc.repository

import com.example.grpc.entity.User
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
import org.springframework.data.jpa.repository.JpaRepository
import org.springframework.stereotype.Repository

@Repository
interface UserRepository : JpaRepository<User, Long> {
    fun findByUsername(username: String): User?
    fun findByEmail(email: String): User?
    fun findByActiveTrue(pageable: Pageable): Page<User>
}

com/example/grpc/service/UserService.kt:

package com.example.grpc.service

import com.example.grpc.entity.User
import com.example.grpc.repository.UserRepository
import org.springframework.data.domain.Pageable
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.LocalDateTime

@Service
@Transactional
class UserService(
    private val userRepository: UserRepository
) {
    
    fun getUserById(id: Long): User? = userRepository.findById(id).orElse(null)
    
    fun createUser(username: String, email: String, fullName: String, tags: List<String> = emptyList()): User {
        val user = User(
            username = username,
            email = email,
            fullName = fullName,
            tags = tags.toMutableSet()
        )
        return userRepository.save(user)
    }
    
    fun updateUser(id: Long, email: String?, fullName: String?): User? {
        val user = userRepository.findById(id).orElse(null) ?: return null
        email?.let { user.email = it }
        fullName?.let { user.fullName = it }
        user.updatedAt = LocalDateTime.now()
        return userRepository.save(user)
    }
    
    fun deleteUser(id: Long) {
        userRepository.deleteById(id)
    }
    
    fun listActiveUsers(pageable: Pageable): List<User> {
        return userRepository.findByActiveTrue(pageable).content
    }
    
    fun getAllUsers(pageable: Pageable): List<User> {
        return userRepository.findAll(pageable).content
    }
}

com/example/grpc/controller/UserServiceGrpcImpl.kt:

package com.example.grpc.controller

import com.example.grpc.service.UserService
import com.example.grpc.user.*
import io.grpc.stub.StreamObserver
import net.devh.boot.grpc.server.service.GrpcService
import org.springframework.data.domain.PageRequest
import java.time.Instant
import com.google.protobuf.Empty
import com.google.protobuf.Timestamp as ProtoTimestamp

@GrpcService
class UserServiceGrpcImpl(
    private val userService: UserService
) : UserServiceGrpc.UserServiceImplBase() {
    
    // Unary RPC: 単一ユーザーの取得
    override fun getUser(request: GetUserRequest, responseObserver: StreamObserver<User>) {
        try {
            val user = userService.getUserById(request.userId)
            if (user != null) {
                responseObserver.onNext(user.toProto())
                responseObserver.onCompleted()
            } else {
                responseObserver.onError(
                    io.grpc.Status.NOT_FOUND
                        .withDescription("User not found with ID: ${request.userId}")
                        .asException()
                )
            }
        } catch (e: Exception) {
            responseObserver.onError(
                io.grpc.Status.INTERNAL
                    .withDescription("Error fetching user: ${e.message}")
                    .asException()
            )
        }
    }
    
    // Unary RPC: ユーザー作成
    override fun createUser(request: CreateUserRequest, responseObserver: StreamObserver<User>) {
        try {
            val user = userService.createUser(
                username = request.username,
                email = request.email,
                fullName = request.fullName,
                tags = request.tagsList
            )
            responseObserver.onNext(user.toProto())
            responseObserver.onCompleted()
        } catch (e: Exception) {
            responseObserver.onError(
                io.grpc.Status.INVALID_ARGUMENT
                    .withDescription("Error creating user: ${e.message}")
                    .asException()
            )
        }
    }
    
    // Unary RPC: ユーザー更新
    override fun updateUser(request: UpdateUserRequest, responseObserver: StreamObserver<User>) {
        try {
            val user = userService.updateUser(
                id = request.userId,
                email = request.email.takeIf { it.isNotEmpty() },
                fullName = request.fullName.takeIf { it.isNotEmpty() }
            )
            if (user != null) {
                responseObserver.onNext(user.toProto())
                responseObserver.onCompleted()
            } else {
                responseObserver.onError(
                    io.grpc.Status.NOT_FOUND
                        .withDescription("User not found with ID: ${request.userId}")
                        .asException()
                )
            }
        } catch (e: Exception) {
            responseObserver.onError(
                io.grpc.Status.INTERNAL
                    .withDescription("Error updating user: ${e.message}")
                    .asException()
            )
        }
    }
    
    // Unary RPC: ユーザー削除
    override fun deleteUser(request: DeleteUserRequest, responseObserver: StreamObserver<Empty>) {
        try {
            userService.deleteUser(request.userId)
            responseObserver.onNext(Empty.getDefaultInstance())
            responseObserver.onCompleted()
        } catch (e: Exception) {
            responseObserver.onError(
                io.grpc.Status.INTERNAL
                    .withDescription("Error deleting user: ${e.message}")
                    .asException()
            )
        }
    }
    
    // Server Streaming RPC: ユーザー一覧の取得
    override fun listUsers(request: ListUsersRequest, responseObserver: StreamObserver<User>) {
        try {
            val pageRequest = PageRequest.of(
                request.pagination.page.coerceAtLeast(1) - 1,
                request.pagination.size.coerceIn(1, 100)
            )
            val users = userService.listActiveUsers(pageRequest)
            
            users.forEach { user ->
                responseObserver.onNext(user.toProto())
            }
            responseObserver.onCompleted()
        } catch (e: Exception) {
            responseObserver.onError(
                io.grpc.Status.INTERNAL
                    .withDescription("Error listing users: ${e.message}")
                    .asException()
            )
        }
    }
    
    // Client Streaming RPC: バッチユーザー作成
    override fun batchCreateUsers(responseObserver: StreamObserver<BatchCreateUsersResponse>): StreamObserver<CreateUserRequest> {
        return object : StreamObserver<CreateUserRequest> {
            private val createdUsers = mutableListOf<User>()
            private val errors = mutableListOf<com.example.grpc.common.ErrorDetail>()
            
            override fun onNext(request: CreateUserRequest) {
                try {
                    val user = userService.createUser(
                        username = request.username,
                        email = request.email,
                        fullName = request.fullName,
                        tags = request.tagsList
                    )
                    createdUsers.add(user)
                } catch (e: Exception) {
                    errors.add(
                        com.example.grpc.common.ErrorDetail.newBuilder()
                            .setCode("CREATE_ERROR")
                            .setMessage("Failed to create user: ${request.username}")
                            .setDetails(e.message ?: "Unknown error")
                            .build()
                    )
                }
            }
            
            override fun onError(t: Throwable) {
                responseObserver.onError(
                    io.grpc.Status.INTERNAL
                        .withDescription("Streaming error: ${t.message}")
                        .asException()
                )
            }
            
            override fun onCompleted() {
                val response = BatchCreateUsersResponse.newBuilder()
                    .addAllCreatedUsers(createdUsers.map { it.toProto() })
                    .addAllErrors(errors)
                    .build()
                responseObserver.onNext(response)
                responseObserver.onCompleted()
            }
        }
    }
    
    // Bidirectional Streaming RPC: ユーザー同期
    override fun syncUsers(responseObserver: StreamObserver<User>): StreamObserver<User> {
        return object : StreamObserver<User> {
            override fun onNext(request: User) {
                try {
                    // Protocol Bufferのメッセージをドメインエンティティに変換
                    val user = if (request.id > 0) {
                        userService.updateUser(
                            id = request.id,
                            email = request.email.takeIf { it.isNotEmpty() },
                            fullName = request.fullName.takeIf { it.isNotEmpty() }
                        )
                    } else {
                        userService.createUser(
                            username = request.username,
                            email = request.email,
                            fullName = request.fullName,
                            tags = request.tagsList
                        )
                    }
                    
                    if (user != null) {
                        responseObserver.onNext(user.toProto())
                    }
                } catch (e: Exception) {
                    responseObserver.onError(
                        io.grpc.Status.INTERNAL
                            .withDescription("Sync error: ${e.message}")
                            .asException()
                    )
                }
            }
            
            override fun onError(t: Throwable) {
                responseObserver.onError(
                    io.grpc.Status.INTERNAL
                        .withDescription("Streaming error: ${t.message}")
                        .asException()
                )
            }
            
            override fun onCompleted() {
                responseObserver.onCompleted()
            }
        }
    }
    
    // ドメインエンティティをProtocol Bufferメッセージに変換
    private fun com.example.grpc.entity.User.toProto(): User {
        return User.newBuilder()
            .setId(this.id)
            .setUsername(this.username)
            .setEmail(this.email)
            .setFullName(this.fullName)
            .addAllTags(this.tags)
            .setActive(this.active)
            .setCreatedAt(this.createdAt.toProtoTimestamp())
            .setUpdatedAt(this.updatedAt.toProtoTimestamp())
            .build()
    }
    
    // LocalDateTimeをProto Timestampに変換
    private fun java.time.LocalDateTime.toProtoTimestamp(): ProtoTimestamp {
        val instant = this.atZone(java.time.ZoneId.systemDefault()).toInstant()
        return ProtoTimestamp.newBuilder()
            .setSeconds(instant.epochSecond)
            .setNanos(instant.nano)
            .build()
    }
}

4.4 gRPC クライアントの実装

com/example/grpc/client/UserServiceClient.kt:

package com.example.grpc.client

import com.example.grpc.user.*
import io.grpc.Channel
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

class UserServiceClient(
    private val channel: Channel
) {
    private val stub: UserServiceGrpc.UserServiceStub = UserServiceGrpc.newStub(channel)
    private val blockingStub: UserServiceGrpc.UserServiceBlockingStub = UserServiceGrpc.newBlockingStub(channel)
    private val logger = LoggerFactory.getLogger(javaClass)
    
    // Unary RPC呼び出し(ブロッキング)
    fun getUser(userId: Long): User? {
        return try {
            val request = GetUserRequest.newBuilder().setUserId(userId).build()
            blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).getUser(request)
        } catch (e: Exception) {
            logger.error("Error getting user: ${e.message}", e)
            null
        }
    }
    
    // Unary RPC呼び出し(非ブロッキング)
    suspend fun getUserAsync(userId: Long): User? = withContext(Dispatchers.IO) {
        suspendCancellableCoroutine { continuation ->
            val request = GetUserRequest.newBuilder().setUserId(userId).build()
            stub.getUser(request, object : StreamObserver<User> {
                override fun onNext(value: User) {
                    continuation.resume(value)
                }
                
                override fun onError(t: Throwable) {
                    continuation.resumeWithException(t)
                }
                
                override fun onCompleted() {}
            })
        }
    }
    
    // Server Streaming RPC呼び出し
    fun listUsers(page: Int = 1, pageSize: Int = 10): List<User> {
        val users = mutableListOf<User>()
        val countDownLatch = java.util.concurrent.CountDownLatch(1)
        
        val request = ListUsersRequest.newBuilder()
            .setPagination(
                com.example.grpc.common.PageRequest.newBuilder()
                    .setPage(page)
                    .setSize(pageSize)
                    .build()
            )
            .build()
        
        stub.listUsers(request, object : StreamObserver<User> {
            override fun onNext(value: User) {
                users.add(value)
            }
            
            override fun onError(t: Throwable) {
                logger.error("Error listing users: ${t.message}", t)
                countDownLatch.countDown()
            }
            
            override fun onCompleted() {
                countDownLatch.countDown()
            }
        })
        
        countDownLatch.await(10, TimeUnit.SECONDS)
        return users
    }
    
    // Client Streaming RPC呼び出し
    fun batchCreateUsers(requests: List<CreateUserRequest>): BatchCreateUsersResponse? {
        val countDownLatch = java.util.concurrent.CountDownLatch(1)
        var response: BatchCreateUsersResponse? = null
        
        val requestObserver = stub.batchCreateUsers(object : StreamObserver<BatchCreateUsersResponse> {
            override fun onNext(value: BatchCreateUsersResponse) {
                response = value
            }
            
            override fun onError(t: Throwable) {
                logger.error("Error in batch create: ${t.message}", t)
                countDownLatch.countDown()
            }
            
            override fun onCompleted() {
                countDownLatch.countDown()
            }
        })
        
        try {
            requests.forEach { request ->
                requestObserver.onNext(request)
            }
            requestObserver.onCompleted()
        } catch (e: Exception) {
            requestObserver.onError(e)
        }
        
        countDownLatch.await(30, TimeUnit.SECONDS)
        return response
    }
    
    // Bidirectional Streaming RPC呼び出し
    fun syncUsers(users: List<User>): List<User> {
        val syncedUsers = mutableListOf<User>()
        val countDownLatch = java.util.concurrent.CountDownLatch(1)
        
        val requestObserver = stub.syncUsers(object : StreamObserver<User> {
            override fun onNext(value: User) {
                syncedUsers.add(value)
            }
            
            override fun onError(t: Throwable) {
                logger.error("Error in sync: ${t.message}", t)
                countDownLatch.countDown()
            }
            
            override fun onCompleted() {
                countDownLatch.countDown()
            }
        })
        
        try {
            users.forEach { user ->
                requestObserver.onNext(user)
            }
            requestObserver.onCompleted()
        } catch (e: Exception) {
            requestObserver.onError(e)
        }
        
        countDownLatch.await(30, TimeUnit.SECONDS)
        return syncedUsers
    }
    
    fun shutdown() {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
    }
}

// クライアント使用例
suspend fun main() {
    val channel = ManagedChannelBuilder.forAddress("localhost", 9090)
        .usePlaintext()
        .build()
    
    val client = UserServiceClient(channel)
    
    try {
        // Unary RPC
        val user = client.getUserAsync(1)
        println("User: $user")
        
        // Server Streaming
        val users = client.listUsers()
        println("Users: $users")
        
        // Client Streaming
        val createRequests = listOf(
            CreateUserRequest.newBuilder().setUsername("user1").setEmail("user1@example.com").setFullName("User One").build(),
            CreateUserRequest.newBuilder().setUsername("user2").setEmail("user2@example.com").setFullName("User Two").build()
        )
        val batchResponse = client.batchCreateUsers(createRequests)
        println("Batch Response: $batchResponse")
    } finally {
        client.shutdown()
    }
}

4.5 Spring Boot統合クライアント

com/example/grpc/config/GrpcClientConfig.kt:

package com.example.grpc.config

import io.grpc.ManagedChannel
import net.devh.boot.grpc.client.inject.GrpcClient
import org.springframework.context.annotation.Configuration

@Configuration
class GrpcClientConfig {
    
    // @GrpcClient アノテーションを使用してクライアントを注入
    // application.ymlで設定された接続情報を自動的に使用
}

com/example/grpc/component/UserServiceConsumer.kt:

package com.example.grpc.component

import com.example.grpc.user.*
import net.devh.boot.grpc.client.inject.GrpcClient
import org.springframework.stereotype.Component
import org.slf4j.LoggerFactory

@Component
class UserServiceConsumer {
    
    @GrpcClient("user-service")
    private lateinit var userServiceStub: UserServiceGrpc.UserServiceBlockingStub
    
    private val logger = LoggerFactory.getLogger(javaClass)
    
    fun fetchRemoteUser(userId: Long): User? {
        return try {
            val request = GetUserRequest.newBuilder().setUserId(userId).build()
            userServiceStub.getUser(request)
        } catch (e: Exception) {
            logger.error("Error fetching remote user: ${e.message}", e)
            null
        }
    }
}

5. アーキテクチャ設計

5.1 マイクロサービスアーキテクチャ

gRPCはマイクロサービス間通信に最適化されています。

┌─────────────────────────────────────────────────────────┐
│                   API Gateway                            │
│              (gRPC or REST/HTTP)                         │
└───────────────────┬─────────────────────────────────────┘
                    │
        ┌───────────┼───────────┐
        │           │           │
        v           v           v
    ┌─────────┐ ┌─────────┐ ┌─────────┐
    │User     │ │Order    │ │Payment  │
    │Service  │ │Service  │ │Service  │
    └─────────┘ └─────────┘ └─────────┘
        │           │           │
        └───────────┼───────────┘
                    │
            ┌───────┴───────┐
            │               │
        ┌─────────┐   ┌─────────┐
        │Database │   │Cache    │
        └─────────┘   └─────────┘

5.2 サービスディスカバリー

com/example/grpc/config/ServiceDiscoveryConfig.kt:

package com.example.grpc.config

import org.springframework.cloud.client.discovery.EnableDiscoveryClient
import org.springframework.context.annotation.Configuration

@Configuration
@EnableDiscoveryClient
class ServiceDiscoveryConfig

application.ymlでのサービスディスカバリー設定:

spring:
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        service-name: grpc-user-service
        port: 9090
        prefer-ip-address: true
        
grpc:
  client:
    # Consulから自動的にサービス情報を取得
    user-service:
      address: consul://localhost:8500
      enable-keep-alive: true

5.3 負荷分散

gRPCの負荷分散は複数の戦略をサポートしています:

grpc:
  client:
    user-service:
      address: static://server1:9090,static://server2:9090,static://server3:9090
      load-balancing-policy: round_robin  # round_robin, pick_first, grpclb

6. 実装例とベストプラクティス

6.1 エラーハンドリング

package com.example.grpc.exception

import io.grpc.Status
import io.grpc.StatusRuntimeException

// カスタム例外
sealed class GrpcException(message: String) : Exception(message) {
    class UserNotFoundException(userId: Long) : GrpcException("User not found: $userId")
    class UserAlreadyExistsException(username: String) : GrpcException("User already exists: $username")
    class ValidationException(message: String) : GrpcException(message)
    class InternalServerException(message: String) : GrpcException(message)
}

// 例外マッピング
fun Throwable.toGrpcStatus(): Status = when (this) {
    is GrpcException.UserNotFoundException -> Status.NOT_FOUND.withDescription(this.message)
    is GrpcException.UserAlreadyExistsException -> Status.ALREADY_EXISTS.withDescription(this.message)
    is GrpcException.ValidationException -> Status.INVALID_ARGUMENT.withDescription(this.message)
    is GrpcException.InternalServerException -> Status.INTERNAL.withDescription(this.message)
    else -> Status.UNKNOWN.withDescription(this.message)
}

// インターセプタでエラーハンドリング
override fun onError(t: Throwable) {
    val status = t.toGrpcStatus()
    responseObserver.onError(status.asException())
}

6.2 インターセプタとミドルウェア

com/example/grpc/interceptor/LoggingInterceptor.kt:

package com.example.grpc.interceptor

import io.grpc.*
import org.slf4j.LoggerFactory

class LoggingInterceptor : ServerInterceptor {
    private val logger = LoggerFactory.getLogger(javaClass)
    
    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        logger.info("Method: ${call.methodDescriptor.fullMethodName}")
        logger.info("Headers: $headers")
        
        return object : ServerCall.Listener<ReqT>() {
            override fun onMessage(message: ReqT) {
                logger.debug("Request: $message")
                super.onMessage(message)
            }
            
            override fun onHalfClose() {
                logger.debug("Half close")
                super.onHalfClose()
            }
        }.run {
            next.startCall(
                object : ServerCall<ReqT, RespT>() {
                    override fun request(numMessages: Int) {
                        call.request(numMessages)
                    }
                    
                    override fun sendHeaders(headers: Metadata) {
                        call.sendHeaders(headers)
                    }
                    
                    override fun sendMessage(message: RespT) {
                        logger.debug("Response: $message")
                        call.sendMessage(message)
                    }
                    
                    override fun close(status: Status, trailers: Metadata) {
                        logger.info("Status: $status")
                        call.close(status, trailers)
                    }
                    
                    override fun isCancelled(): Boolean = call.isCancelled
                    
                    override fun getMethodDescriptor(): MethodDescriptor<ReqT, RespT> = call.methodDescriptor
                },
                headers
            )
        }
    }
}

// インターセプタの登録
@Configuration
class GrpcServerConfiguration {
    
    @Bean
    fun loggingInterceptor(): LoggingInterceptor = LoggingInterceptor()
}

com/example/grpc/interceptor/AuthenticationInterceptor.kt:

package com.example.grpc.interceptor

import io.grpc.*
import org.slf4j.LoggerFactory

class AuthenticationInterceptor : ServerInterceptor {
    private val logger = LoggerFactory.getLogger(javaClass)
    
    companion object {
        private val AUTHORIZATION_METADATA_KEY = Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)
    }
    
    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val authToken = headers.get(AUTHORIZATION_METADATA_KEY)
        
        if (authToken.isNullOrEmpty()) {
            val status = Status.UNAUTHENTICATED.withDescription("Missing authorization token")
            call.close(status, Metadata())
            return object : ServerCall.Listener<ReqT>() {}
        }
        
        // トークンの検証
        if (!validateToken(authToken)) {
            val status = Status.PERMISSION_DENIED.withDescription("Invalid token")
            call.close(status, Metadata())
            return object : ServerCall.Listener<ReqT>() {}
        }
        
        logger.debug("Authentication successful for token: $authToken")
        return next.startCall(call, headers)
    }
    
    private fun validateToken(token: String): Boolean {
        // トークン検証ロジック
        return token.startsWith("Bearer ")
    }
}

6.3 テスト

com/example/grpc/controller/UserServiceGrpcImplTest.kt:

package com.example.grpc.controller

import com.example.grpc.entity.User as UserEntity
import com.example.grpc.service.UserService
import com.example.grpc.user.*
import io.grpc.testing.GrpcCleanupRule
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.mockito.Mockito
import java.time.LocalDateTime
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

class UserServiceGrpcImplTest {
    
    @get:Rule
    val grpcCleanup = GrpcCleanupRule()
    
    private lateinit var userService: UserService
    private lateinit var stub: UserServiceGrpc.UserServiceBlockingStub
    
    @Before
    fun setUp() {
        userService = Mockito.mock(UserService::class.java)
        
        val serviceImpl = UserServiceGrpcImpl(userService)
        
        val serverName = InProcessServerBuilder.generateName()
        grpcCleanup.register(
            InProcessServerBuilder(serverName)
                .addService(serviceImpl)
                .directExecutor()
                .build()
                .start()
        )
        
        val channel = grpcCleanup.register(
            InProcessChannelBuilder.forName(serverName)
                .directExecutor()
                .build()
        )
        
        stub = UserServiceGrpc.newBlockingStub(channel)
    }
    
    @Test
    fun testGetUserSuccess() {
        val mockUser = UserEntity(
            id = 1L,
            username = "testuser",
            email = "test@example.com",
            fullName = "Test User",
            tags = mutableSetOf("tag1"),
            active = true,
            createdAt = LocalDateTime.now(),
            updatedAt = LocalDateTime.now()
        )
        
        Mockito.`when`(userService.getUserById(1L)).thenReturn(mockUser)
        
        val request = GetUserRequest.newBuilder().setUserId(1L).build()
        val response = stub.getUser(request)
        
        assertNotNull(response)
        assertEquals("testuser", response.username)
        assertEquals("test@example.com", response.email)
    }
    
    @Test
    fun testCreateUserSuccess() {
        val mockUser = UserEntity(
            id = 1L,
            username = "newuser",
            email = "new@example.com",
            fullName = "New User",
            tags = mutableSetOf()
        )
        
        Mockito.`when`(userService.createUser("newuser", "new@example.com", "New User", emptyList()))
            .thenReturn(mockUser)
        
        val request = CreateUserRequest.newBuilder()
            .setUsername("newuser")
            .setEmail("new@example.com")
            .setFullName("New User")
            .build()
        
        val response = stub.createUser(request)
        
        assertNotNull(response)
        assertEquals("newuser", response.username)
    }
}

7. パフォーマンス最適化

7.1 接続管理

grpc:
  server:
    # スレッドプール設定
    max-concurrent-streams: 100
    
    # Keep-Alive設定
    enable-keep-alive: true
    keep-alive-time: 30s
    keep-alive-timeout: 10s
    permit-keep-alive-without-calls: true
    
    # バッファ設定
    max-inbound-message-size: 4194304  # 4MB
    max-header-list-size: 16384  # 16KB
    
  client:
    keep-alive-time: 30s
    keep-alive-timeout: 10s
    idle-timeout: 300s
    max-retry-attempts: 3

7.2 圧縮

// サーバー側
override fun getUser(request: GetUserRequest, responseObserver: StreamObserver<User>) {
    // 圧縮を適用
    responseObserver.setCompression("gzip")
    // ...
}

// クライアント側
val request = GetUserRequest.newBuilder().setUserId(1L).build()
val call = stub.withCompression("gzip").getUser(request)

7.3 バッチ処理

// バッチ処理の効率的な実装
override fun batchCreateUsers(responseObserver: StreamObserver<BatchCreateUsersResponse>): StreamObserver<CreateUserRequest> {
    return object : StreamObserver<CreateUserRequest> {
        private val batch = mutableListOf<CreateUserRequest>()
        private val BATCH_SIZE = 100
        
        override fun onNext(request: CreateUserRequest) {
            batch.add(request)
            if (batch.size >= BATCH_SIZE) {
                processBatch()
            }
        }
        
        override fun onCompleted() {
            if (batch.isNotEmpty()) {
                processBatch()
            }
            responseObserver.onCompleted()
        }
        
        private fun processBatch() {
            // バッチ処理ロジック
            batch.clear()
        }
        
        override fun onError(t: Throwable) {
            responseObserver.onError(t)
        }
    }
}

7.4 モニタリングとメトリクス

com/example/grpc/config/MetricsConfig.kt:

package com.example.grpc.config

import io.grpc.health.v1.HealthCheckResponse
import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.actuate.health.Health
import org.springframework.boot.actuate.health.HealthIndicator
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class MetricsConfig {
    
    @Bean
    fun grpcHealthIndicator(): HealthIndicator {
        return HealthIndicator {
            try {
                // gRPCサービスのヘルスチェック
                Health.up()
                    .withDetail("grpc.port", 9090)
                    .withDetail("grpc.status", "running")
                    .build()
            } catch (e: Exception) {
                Health.down()
                    .withDetail("error", e.message)
                    .build()
            }
        }
    }
}

application.ymlでのメトリクス設定:

management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: grpc-user-service

8. トラブルシューティングとデバッグ

8.1 一般的な問題と解決策

問題1: Connection refused

io.grpc.StatusRuntimeException: UNAVAILABLE: Unable to resolve host

解決策:

  • サーバーが起動しているか確認
  • ポート番号が正しいか確認
  • ファイアウォール設定を確認
# サーバーの起動確認
lsof -i :9090

# ネットワーク接続の確認
netstat -an | grep 9090

問題2: Deadline exceeded

io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED

解決策:

  • タイムアウト時間を延長
  • バックエンド処理を最適化
stub.withDeadlineAfter(30, TimeUnit.SECONDS).getUser(request)

問題3: Resource exhausted

io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED

解決策:

  • バッファサイズを増やす
  • コネクションプーリングを確認
grpc:
  server:
    max-inbound-message-size: 8388608  # 8MB
    max-concurrent-streams: 200

8.2 デバッグ設定

logging:
  level:
    io.grpc: DEBUG
    net.devh.boot.grpc: DEBUG
    com.example.grpc: DEBUG

8.3 gRPC CLI ツール

# gRPC serviceの確認
grpcurl -plaintext localhost:9090 list

# RPC呼び出し
grpcurl -plaintext \
  -d '{"user_id": 1}' \
  localhost:9090 com.example.grpc.user.UserService.GetUser

# streaming RPC
grpcurl -plaintext \
  -d '{"pagination": {"page": 1, "size": 10}}' \
  localhost:9090 com.example.grpc.user.UserService.ListUsers

まとめ

gRPC for Spring Boot with Kotlinは、高性能で信頼性の高いマイクロサービス通信を実現するための強力なソリューションです。

主なポイント

  1. Protocol Buffersによる型安全性: スキーマファーストなアプローチにより、クライアント・サーバー間の契約を明確に定義

  2. HTTP/2による高速通信: バイナリプロトコルと多重化により、RESTful APIと比べて高速で効率的な通信

  3. ストリーミング対応: 単一RPC、サーバーストリーミング、クライアントストリーミング、双方向ストリーミングの4つのパターンをサポート

  4. Spring Bootの統合: 既存のSpring Bootエコシステムとシームレスに統合され、習慣的な開発が可能

  5. Kotlinの簡潔性: Null安全性と関数型プログラミング機能により、堅牢で保守性の高いコードが実現

次のステップ

  • トレーシング: Jaegerなどの分散トレーシングツールを導入
  • セキュリティ: TLS/SSL通信とトークンベース認証の実装
  • スケーリング: Kubernetes環境でのデプロイと管理
  • テスト: 統合テストとパフォーマンステストの充実

gRPCは、現代的なマイクロサービスアーキテクチャにおいて、RESTful APIの代替手段として、またはそれ以上の価値を提供します。Spring BootとKotlinの組み合わせにより、開発者は生産性を損なわずに、高性能なシステムを構築できます。