Linux Kernel Inter Process Communication
Linux Kernel プロセス間通信 (IPC) 完全ガイド
目次
- はじめに
- IPC メカニズムの全体像
- パイプと FIFO (名前付きパイプ)
- System V IPC
- POSIX IPC
- Unix ドメインソケット
- シグナル
- eventfd, timerfd, signalfd
- メモリマップドファイル (mmap による IPC)
- D-Bus と kdbus の歴史
- Futex (高速ユーザ空間ミューテックス)
- io_uring による IPC
- Binder (Android IPC)
- Netlink ソケット
- proc ファイルシステムと IPC 情報
- ipcs / ipcrm ツール
- IPC メカニズムのパフォーマンス比較
- セキュリティ考慮事項
- トラブルシューティング
- まとめとベストプラクティス
1. はじめに
1.1 プロセス間通信とは
プロセス間通信 (Inter-Process Communication, IPC) は、オペレーティングシステム上で実行される複数のプロセスがデータを交換し、動作を同期するためのメカニズムの総称である。Linux カーネルは、歴史的な Unix の遺産から最新の高性能メカニズムまで、多様な IPC 手法を提供している。
各プロセスは独立したアドレス空間を持つため、あるプロセスのメモリに別のプロセスから直接アクセスすることはできない。この保護機構はセキュリティと安定性を確保する一方で、プロセス間でのデータ共有には何らかの仲介メカニズムが必要となる。それが IPC である。
1.2 なぜ IPC を深く理解する必要があるのか
現代のソフトウェアアーキテクチャでは、マイクロサービス、コンテナ、サーバレスなど、複数のプロセスが協調して動作する設計が主流である。IPC メカニズムの選択はアプリケーションの性能、スケーラビリティ、信頼性に直接影響する。
以下のようなシナリオで IPC の知識が不可欠となる:
- 高性能サーバ設計: Web サーバ、データベースサーバにおけるプロセス/スレッド間の効率的なデータ共有
- リアルタイムシステム: 制御系システムにおける低レイテンシ通信
- コンテナ技術: Docker / Kubernetes における名前空間分離と IPC
- デバッグ・障害解析: プロセスハングやデッドロックの原因特定
- セキュリティ: IPC メカニズムを介した情報漏洩の防止
1.3 Linux カーネルにおける IPC の歴史
年表:
1970年代 - Unix V6/V7: パイプの導入 (Ken Thompson)
1983年 - BSD 4.2: ソケットの導入
1987年 - System V IPC (メッセージキュー、セマフォ、共有メモリ)
1993年 - POSIX IPC 標準化 (IEEE Std 1003.1b)
2002年 - Linux 2.6: futex の導入
2005年 - Linux 2.6.14: inotify
2007年 - Linux 2.6.22: eventfd, signalfd, timerfd
2009年 - Linux 2.6.29: perf_event_open
2014年 - kdbus 提案 (最終的に却下)
2019年 - Linux 5.1: io_uring の導入
2020年代 - io_uring の IPC 利用拡大
1.4 本記事の構成
本記事では、各 IPC メカニズムについて以下の観点から解説する:
- 概念と設計思想: メカニズムの基本原理
- カーネル内部実装: 関連するカーネルデータ構造とシステムコール
- 実践的なコード例: C 言語による実装例
- 設定とチューニング: sysctl パラメータ、/proc エントリ
- パフォーマンス特性: レイテンシ、スループット、スケーラビリティ
- ユースケース: 実際のアプリケーションでの使用例
2. IPC メカニズムの全体像
2.1 分類体系
Linux の IPC メカニズムは、以下のように分類できる:
IPC メカニズム
├── データ転送型
│ ├── バイトストリーム
│ │ ├── パイプ (無名パイプ)
│ │ ├── FIFO (名前付きパイプ)
│ │ └── Unix ドメインソケット (SOCK_STREAM)
│ ├── メッセージ型
│ │ ├── System V メッセージキュー
│ │ ├── POSIX メッセージキュー
│ │ ├── Unix ドメインソケット (SOCK_DGRAM)
│ │ └── Netlink ソケット
│ └── 擬似データ転送
│ ├── eventfd
│ ├── signalfd
│ └── timerfd
├── 共有メモリ型
│ ├── System V 共有メモリ
│ ├── POSIX 共有メモリ
│ ├── mmap (ファイルバック/匿名)
│ └── memfd_create
├── 同期プリミティブ
│ ├── System V セマフォ
│ ├── POSIX セマフォ
│ ├── futex
│ └── ファイルロック (flock/fcntl)
├── シグナル
│ ├── 従来のシグナル
│ ├── リアルタイムシグナル
│ └── signalfd
└── 特殊メカニズム
├── Binder (Android)
├── io_uring
├── D-Bus (ユーザ空間)
└── Netlink
2.2 選択基準
| 基準 | パイプ | 共有メモリ | ソケット | メッセージキュー | シグナル |
|---|---|---|---|---|---|
| データ量 | 中 | 大 | 中-大 | 小-中 | 極小 |
| レイテンシ | 低 | 極低 | 中 | 中 | 低 |
| 方向性 | 単方向 | 双方向 | 双方向 | 双方向 | 単方向 |
| 関係性 | 親子 | 任意 | 任意 | 任意 | 任意 |
| 永続性 | なし | カーネル | なし | カーネル | なし |
| ネットワーク | 不可 | 不可 | 可 | 不可 | 不可 |
2.3 カーネルの IPC 名前空間
Linux の名前空間 (namespace) 機能により、IPC リソースをプロセスグループごとに隔離できる:
# IPC 名前空間の確認
ls -la /proc/self/ns/ipc
# lrwxrwxrwx 1 root root 0 Apr 10 09:00 /proc/self/ns/ipc -> 'ipc:[4026531839]'
# 新しい IPC 名前空間でシェルを起動
sudo unshare --ipc /bin/bash
# IPC 名前空間の確認 (番号が変わる)
ls -la /proc/self/ns/ipc
# lrwxrwxrwx 1 root root 0 Apr 10 09:00 /proc/self/ns/ipc -> 'ipc:[4026532XXX]'
# コンテナで IPC 名前空間を共有する例 (Docker)
docker run --ipc=host myimage # ホストの IPC 名前空間を共有
docker run --ipc=container:c1 myimage # 他のコンテナの IPC 名前空間を共有
3. パイプと FIFO (名前付きパイプ)
3.1 無名パイプ (Anonymous Pipe)
3.1.1 基本概念
パイプは Unix で最も古い IPC メカニズムの一つであり、1973年に Ken Thompson によって Unix V3 に導入された。パイプは単方向のバイトストリームであり、一方の端で書き込まれたデータがもう一方の端で読み取られる。
カーネル内部構造:
プロセスA カーネル プロセスB
┌──────────┐ ┌──────────────┐ ┌──────────┐
│ fd[1] │────────>│ パイプバッファ │────────>│ fd[0] │
│ (書込端) │ │ (リングバッファ)│ │ (読取端) │
└──────────┘ │ 16ページ=64KB │ └──────────┘
└──────────────┘
パイプバッファはカーネル内のリングバッファとして実装されており、デフォルトサイズは 16 ページ (通常 64KB) である。
3.1.2 システムコール
#include <unistd.h>
// パイプの作成
int pipe(int pipefd[2]);
// Linux 固有: フラグ付きパイプ作成
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>
int pipe2(int pipefd[2], int flags);
// flags: O_CLOEXEC, O_NONBLOCK, O_DIRECT (Linux 3.4+)
3.1.3 実践例: プロデューサ-コンシューマパターン
/*
* pipe_producer_consumer.c
* パイプを使ったプロデューサ-コンシューマの実装例
* コンパイル: gcc -o pipe_demo pipe_producer_consumer.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
#include <errno.h>
#define BUF_SIZE 4096
#define NUM_MESSAGES 10
int main(void) {
int pipefd[2];
pid_t pid;
/* O_CLOEXEC フラグ付きでパイプを作成 */
if (pipe2(pipefd, O_CLOEXEC) == -1) {
perror("pipe2");
exit(EXIT_FAILURE);
}
pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (pid == 0) {
/* 子プロセス: コンシューマ (読み取り側) */
close(pipefd[1]); /* 書き込み端を閉じる */
char buf[BUF_SIZE];
ssize_t n;
int count = 0;
while ((n = read(pipefd[0], buf, sizeof(buf) - 1)) > 0) {
buf[n] = '\0';
printf("[Consumer] Received (%zd bytes): %s", n, buf);
count++;
}
if (n == -1) {
perror("read");
exit(EXIT_FAILURE);
}
printf("[Consumer] Total messages received: %d\n", count);
close(pipefd[0]);
exit(EXIT_SUCCESS);
} else {
/* 親プロセス: プロデューサ (書き込み側) */
close(pipefd[0]); /* 読み取り端を閉じる */
for (int i = 0; i < NUM_MESSAGES; i++) {
char msg[256];
snprintf(msg, sizeof(msg), "Message #%d from producer (PID=%d)\n",
i + 1, getpid());
ssize_t written = write(pipefd[1], msg, strlen(msg));
if (written == -1) {
perror("write");
break;
}
printf("[Producer] Sent (%zd bytes): %s", written, msg);
usleep(100000); /* 100ms 待機 */
}
close(pipefd[1]); /* 書き込み端を閉じる -> 子プロセスで EOF */
wait(NULL); /* 子プロセスの終了を待つ */
}
return 0;
}
3.1.4 パイプバッファサイズの調整
# 現在のパイプバッファサイズの確認 (デフォルト: 65536 = 64KB)
cat /proc/sys/fs/pipe-max-size
# 1048576 (1MB)
# パイプの最大サイズを変更
sudo sysctl -w fs.pipe-max-size=2097152
# プログラム内でパイプバッファサイズを変更
# fcntl(fd, F_SETPIPE_SZ, size) を使用
/*
* パイプバッファサイズの動的変更
*/
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
int main(void) {
int pipefd[2];
pipe(pipefd);
/* 現在のパイプサイズを取得 */
int current_size = fcntl(pipefd[0], F_GETPIPE_SZ);
printf("Current pipe size: %d bytes\n", current_size);
/* パイプサイズを 1MB に変更 */
int new_size = fcntl(pipefd[0], F_SETPIPE_SZ, 1048576);
printf("New pipe size: %d bytes\n", new_size);
close(pipefd[0]);
close(pipefd[1]);
return 0;
}
3.1.5 O_DIRECT パイプ (Linux 3.4+)
/*
* O_DIRECT パイプ: パケットモードでの通信
* 各 write() がアトミックな単位として扱われる
*/
#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
int main(void) {
int pipefd[2];
/* O_DIRECT フラグ付きパイプ */
if (pipe2(pipefd, O_DIRECT) == -1) {
perror("pipe2 O_DIRECT");
return 1;
}
/* 書き込み: 各 write は独立したパケットとして扱われる */
write(pipefd[1], "Hello", 5);
write(pipefd[1], "World", 5);
/* 読み取り: 各 read はパケット境界を保持 */
char buf[100];
ssize_t n;
n = read(pipefd[0], buf, sizeof(buf));
buf[n] = '\0';
printf("First read: '%s' (%zd bytes)\n", buf, n);
/* 出力: First read: 'Hello' (5 bytes) */
n = read(pipefd[0], buf, sizeof(buf));
buf[n] = '\0';
printf("Second read: '%s' (%zd bytes)\n", buf, n);
/* 出力: Second read: 'World' (5 bytes) */
close(pipefd[0]);
close(pipefd[1]);
return 0;
}
3.2 FIFO (名前付きパイプ)
3.2.1 基本概念
FIFO (First In, First Out) は名前付きパイプとも呼ばれ、ファイルシステム上にエントリを持つパイプである。無名パイプが親子プロセス間でのみ使用できるのに対し、FIFO は無関係なプロセス間で使用できる。
プロセスA ファイルシステム プロセスB
┌──────────┐ ┌──────────────────┐ ┌──────────┐
│ open() │ │ /tmp/myfifo │ │ open() │
│ write() │────────>│ (特殊ファイル) │────────>│ read() │
└──────────┘ │ prw-r--r-- 1 ... │ └──────────┘
└──────────────────┘
3.2.2 FIFO の作成と使用
# コマンドラインでの FIFO 作成
mkfifo /tmp/myfifo
mkfifo -m 0666 /tmp/myfifo # パーミッション指定
# FIFO の確認
ls -la /tmp/myfifo
# prw-r--r-- 1 user group 0 Apr 10 09:00 /tmp/myfifo
# 先頭の 'p' が FIFO であることを示す
# FIFO を使った通信テスト
# ターミナル1 (読み取り側):
cat /tmp/myfifo
# ターミナル2 (書き込み側):
echo "Hello from another process" > /tmp/myfifo
3.2.3 実践例: FIFO を使ったログ集約システム
/*
* fifo_log_server.c
* FIFO を使ったログ集約サーバ
* コンパイル: gcc -o fifo_log_server fifo_log_server.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#define FIFO_PATH "/tmp/log_fifo"
#define LOG_FILE "/tmp/aggregated.log"
#define BUF_SIZE 4096
static volatile sig_atomic_t running = 1;
void handle_signal(int sig) {
(void)sig;
running = 0;
}
int main(void) {
int fifo_fd, log_fd;
char buf[BUF_SIZE];
/* シグナルハンドラ設定 */
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
/* 既存の FIFO を削除 */
unlink(FIFO_PATH);
/* FIFO を作成 (全ユーザが書き込み可能) */
if (mkfifo(FIFO_PATH, 0666) == -1) {
perror("mkfifo");
exit(EXIT_FAILURE);
}
printf("Log server started. FIFO: %s\n", FIFO_PATH);
printf("Send logs with: echo 'message' > %s\n", FIFO_PATH);
/* ログファイルを開く */
log_fd = open(LOG_FILE, O_WRONLY | O_CREAT | O_APPEND, 0644);
if (log_fd == -1) {
perror("open log file");
unlink(FIFO_PATH);
exit(EXIT_FAILURE);
}
while (running) {
/*
* FIFO を読み取り専用で開く
* 注意: 書き込み側がいない場合、open() はブロックする
* O_NONBLOCK を指定すると即座に返る
*/
fifo_fd = open(FIFO_PATH, O_RDONLY);
if (fifo_fd == -1) {
if (errno == EINTR) continue;
perror("open fifo");
break;
}
ssize_t n;
while ((n = read(fifo_fd, buf, sizeof(buf) - 1)) > 0) {
buf[n] = '\0';
/* タイムスタンプを付与してログファイルに書き込み */
time_t now = time(NULL);
char timestamp[64];
strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S",
localtime(&now));
char log_entry[BUF_SIZE + 128];
int len = snprintf(log_entry, sizeof(log_entry),
"[%s] %s", timestamp, buf);
write(log_fd, log_entry, len);
printf("%s", log_entry); /* 標準出力にも表示 */
}
close(fifo_fd);
}
close(log_fd);
unlink(FIFO_PATH);
printf("\nLog server stopped.\n");
return 0;
}
/*
* fifo_log_client.c
* FIFO ログクライアント
* コンパイル: gcc -o fifo_log_client fifo_log_client.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#define FIFO_PATH "/tmp/log_fifo"
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "Usage: %s <log message>\n", argv[0]);
exit(EXIT_FAILURE);
}
int fd = open(FIFO_PATH, O_WRONLY | O_NONBLOCK);
if (fd == -1) {
perror("open fifo (is the server running?)");
exit(EXIT_FAILURE);
}
/* プロセス情報付きでメッセージを送信 */
char msg[4096];
snprintf(msg, sizeof(msg), "[PID=%d] %s\n", getpid(), argv[1]);
write(fd, msg, strlen(msg));
close(fd);
return 0;
}
3.2.4 splice() によるゼロコピーパイプ転送
/*
* splice_demo.c
* splice() を使ったゼロコピーデータ転送
* ファイル -> パイプ -> ファイル をカーネル内で直接転送
* コンパイル: gcc -o splice_demo splice_demo.c
*/
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
int main(int argc, char *argv[]) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <source> <dest>\n", argv[0]);
exit(EXIT_FAILURE);
}
int src_fd = open(argv[1], O_RDONLY);
int dst_fd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
int pipefd[2];
pipe(pipefd);
ssize_t total = 0;
ssize_t n;
/* ファイル -> パイプ (ゼロコピー) */
while ((n = splice(src_fd, NULL, pipefd[1], NULL,
65536, SPLICE_F_MOVE | SPLICE_F_MORE)) > 0) {
/* パイプ -> ファイル (ゼロコピー) */
ssize_t written = splice(pipefd[0], NULL, dst_fd, NULL,
n, SPLICE_F_MOVE | SPLICE_F_MORE);
if (written == -1) {
perror("splice to file");
break;
}
total += written;
}
printf("Transferred %zd bytes (zero-copy via splice)\n", total);
close(src_fd);
close(dst_fd);
close(pipefd[0]);
close(pipefd[1]);
return 0;
}
3.3 パイプの制限事項と注意点
制限事項:
1. 単方向通信 (双方向にはパイプ2本必要)
2. バッファサイズ制限 (デフォルト 64KB)
3. PIPE_BUF (4096バイト) 以下の書き込みのみアトミック保証
4. プロセス終了時に自動的に破棄 (FIFO のファイルシステムエントリは残る)
5. ネットワーク透過性がない
PIPE_BUF のアトミック保証:
- write(fd, buf, n) で n <= PIPE_BUF の場合:
他の書き込みと混在しないことが POSIX で保証される
- n > PIPE_BUF の場合:
他の書き込みと混在する可能性がある
# PIPE_BUF の値を確認
getconf PIPE_BUF /
# 4096
# パイプバッファに関する sysctl パラメータ
sysctl fs.pipe-max-size
# fs.pipe-max-size = 1048576
sysctl fs.pipe-user-pages-hard
# fs.pipe-user-pages-hard = 0 (制限なし)
sysctl fs.pipe-user-pages-soft
# fs.pipe-user-pages-soft = 16384
4. System V IPC
4.1 概要
System V IPC は、AT&T の Unix System V で導入された IPC メカニズム群である。以下の3つのメカニズムから構成される:
- メッセージキュー (Message Queues)
- セマフォ (Semaphores)
- 共有メモリ (Shared Memory)
各メカニズムは IPC キー (key_t 型) と IPC 識別子 (int 型) によって管理される。
System V IPC の構造:
┌───────────────────────────────────────────┐
│ カーネル IPC サブシステム │
├─────────────┬─────────────┬───────────────┤
│ メッセージ │ セマフォ │ 共有メモリ │
│ キュー │ │ │
│ msqid_ds │ semid_ds │ shmid_ds │
│ msgget() │ semget() │ shmget() │
│ msgsnd() │ semop() │ shmat() │
│ msgrcv() │ semctl() │ shmdt() │
│ msgctl() │ │ shmctl() │
└─────────────┴─────────────┴───────────────┘
4.2 IPC キーの生成
#include <sys/ipc.h>
/*
* ftok() - ファイルパスとプロジェクト ID から IPC キーを生成
*
* 注意: ftok() は同じファイルの inode 番号と proj_id から
* キーを生成するため、ファイルが削除・再作成されると
* 異なるキーが生成される可能性がある
*/
key_t key = ftok("/tmp/myapp", 'A');
if (key == -1) {
perror("ftok");
exit(EXIT_FAILURE);
}
/* IPC_PRIVATE: 親子プロセス間でのみ使用する場合 */
/* key に IPC_PRIVATE を指定すると、常に新しい IPC オブジェクトが作成される */
4.3 System V メッセージキュー
4.3.1 カーネルデータ構造
/* include/uapi/linux/msg.h */
struct msqid_ds {
struct ipc_perm msg_perm; /* 所有権とパーミッション */
struct msg *msg_first; /* 最初のメッセージ (カーネル内部) */
struct msg *msg_last; /* 最後のメッセージ (カーネル内部) */
__kernel_time_t msg_stime; /* 最後の msgsnd() 時刻 */
__kernel_time_t msg_rtime; /* 最後の msgrcv() 時刻 */
__kernel_time_t msg_ctime; /* 最後の変更時刻 */
unsigned long msg_lcbytes; /* キュー内の現在バイト数 */
unsigned long msg_lqbytes; /* キューの最大バイト数 */
unsigned short msg_cbytes; /* キュー内の現在バイト数 */
unsigned short msg_qnum; /* キュー内のメッセージ数 */
unsigned short msg_qbytes; /* キューの最大バイト数 */
__kernel_ipc_pid_t msg_lspid; /* 最後の msgsnd() のPID */
__kernel_ipc_pid_t msg_lrpid; /* 最後の msgrcv() のPID */
};
4.3.2 実践例: 型付きメッセージキュー
/*
* sysv_msgq_server.c
* System V メッセージキューサーバ
* コンパイル: gcc -o msgq_server sysv_msgq_server.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <signal.h>
#define MSG_KEY_PATH "/tmp/msgq_key"
#define MSG_PROJ_ID 'M'
#define MSG_TEXT_SIZE 256
/* メッセージ型の定義 */
#define MSG_TYPE_REQUEST 1L
#define MSG_TYPE_RESPONSE 2L
#define MSG_TYPE_SHUTDOWN 99L
struct message {
long mtype; /* メッセージ型 (正の整数) */
char mtext[MSG_TEXT_SIZE]; /* メッセージ本文 */
pid_t sender_pid; /* 送信元 PID */
int request_id; /* リクエスト ID */
};
static volatile sig_atomic_t running = 1;
void cleanup_handler(int sig) {
(void)sig;
running = 0;
}
int main(void) {
key_t key;
int msqid;
struct message msg;
signal(SIGINT, cleanup_handler);
/* キーファイルの作成 */
FILE *fp = fopen(MSG_KEY_PATH, "w");
if (fp) fclose(fp);
/* IPC キーの生成 */
key = ftok(MSG_KEY_PATH, MSG_PROJ_ID);
if (key == -1) {
perror("ftok");
exit(EXIT_FAILURE);
}
/* メッセージキューの作成 */
msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
if (msqid == -1) {
perror("msgget");
/* 既存のキューを取得 */
msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget (existing)");
exit(EXIT_FAILURE);
}
}
printf("Message queue server started (msqid=%d)\n", msqid);
/* キュー情報の表示 */
struct msqid_ds buf;
if (msgctl(msqid, IPC_STAT, &buf) == 0) {
printf(" Max bytes in queue: %lu\n",
(unsigned long)buf.msg_qbytes);
printf(" Current messages: %lu\n",
(unsigned long)buf.msg_qnum);
}
while (running) {
/* リクエストメッセージを受信 (ブロッキング) */
ssize_t n = msgrcv(msqid, &msg, sizeof(msg) - sizeof(long),
MSG_TYPE_REQUEST, 0);
if (n == -1) {
if (!running) break;
perror("msgrcv");
continue;
}
printf("[Server] Received request #%d from PID %d: %s\n",
msg.request_id, msg.sender_pid, msg.mtext);
/* シャットダウンメッセージのチェック */
if (strcmp(msg.mtext, "SHUTDOWN") == 0) {
printf("[Server] Shutdown requested.\n");
break;
}
/* レスポンスの作成と送信 */
struct message response;
response.mtype = MSG_TYPE_RESPONSE;
snprintf(response.mtext, MSG_TEXT_SIZE,
"ACK for request #%d: '%s' processed",
msg.request_id, msg.mtext);
response.sender_pid = getpid();
response.request_id = msg.request_id;
if (msgsnd(msqid, &response, sizeof(response) - sizeof(long), 0) == -1) {
perror("msgsnd");
}
}
/* メッセージキューの削除 */
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl IPC_RMID");
} else {
printf("[Server] Message queue removed.\n");
}
return 0;
}
/*
* sysv_msgq_client.c
* System V メッセージキュークライアント
* コンパイル: gcc -o msgq_client sysv_msgq_client.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#define MSG_KEY_PATH "/tmp/msgq_key"
#define MSG_PROJ_ID 'M'
#define MSG_TEXT_SIZE 256
#define MSG_TYPE_REQUEST 1L
#define MSG_TYPE_RESPONSE 2L
struct message {
long mtype;
char mtext[MSG_TEXT_SIZE];
pid_t sender_pid;
int request_id;
};
int main(int argc, char *argv[]) {
if (argc < 2) {
fprintf(stderr, "Usage: %s <message>\n", argv[0]);
exit(EXIT_FAILURE);
}
key_t key = ftok(MSG_KEY_PATH, MSG_PROJ_ID);
if (key == -1) {
perror("ftok");
exit(EXIT_FAILURE);
}
int msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget (is the server running?)");
exit(EXIT_FAILURE);
}
/* リクエスト送信 */
struct message msg;
msg.mtype = MSG_TYPE_REQUEST;
strncpy(msg.mtext, argv[1], MSG_TEXT_SIZE - 1);
msg.mtext[MSG_TEXT_SIZE - 1] = '\0';
msg.sender_pid = getpid();
msg.request_id = getpid() % 10000; /* 簡易リクエストID */
printf("[Client PID=%d] Sending: %s\n", getpid(), msg.mtext);
if (msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0) == -1) {
perror("msgsnd");
exit(EXIT_FAILURE);
}
/* レスポンス受信 */
struct message response;
ssize_t n = msgrcv(msqid, &response, sizeof(response) - sizeof(long),
MSG_TYPE_RESPONSE, 0);
if (n == -1) {
perror("msgrcv");
exit(EXIT_FAILURE);
}
printf("[Client PID=%d] Response: %s\n", getpid(), response.mtext);
return 0;
}
4.3.3 メッセージキューのカーネルパラメータ
# メッセージキューの制限値を確認
cat /proc/sys/kernel/msgmax # 1メッセージの最大サイズ (バイト)
# 8192
cat /proc/sys/kernel/msgmnb # 1キューの最大バイト数
# 16384
cat /proc/sys/kernel/msgmni # システム全体の最大キュー数
# 32000
# 制限値の変更
sudo sysctl -w kernel.msgmax=65536
sudo sysctl -w kernel.msgmnb=65536
sudo sysctl -w kernel.msgmni=64000
# 永続化 (/etc/sysctl.conf)
echo "kernel.msgmax = 65536" | sudo tee -a /etc/sysctl.conf
echo "kernel.msgmnb = 65536" | sudo tee -a /etc/sysctl.conf
4.4 System V セマフォ
4.4.1 基本概念
System V セマフォはセマフォの集合 (セマフォセット) として実装される。1つのセマフォセットに複数のセマフォを含めることができ、複数のセマフォに対するアトミック操作が可能である。
セマフォセット (semid)
┌─────────────────────────────────────────────┐
│ sem[0]=3 sem[1]=1 sem[2]=0 sem[3]=5 │
│ │
│ semop() で複数セマフォをアトミックに操作: │
│ sem[0] -= 1 │
│ sem[1] += 1 ← 全て成功 or 全て失敗 │
│ sem[2] -= 0 (ゼロ待ち) │
└─────────────────────────────────────────────┘
4.4.2 実践例: プロセス間排他制御
/*
* sysv_semaphore.c
* System V セマフォによる排他制御
* コンパイル: gcc -o sysv_sem sysv_semaphore.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
/* semctl() の引数用共用体 (一部のシステムで必要) */
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
/* セマフォ操作のヘルパー関数 */
int sem_lock(int semid) {
struct sembuf sop = {
.sem_num = 0, /* セマフォ番号 */
.sem_op = -1, /* P操作 (デクリメント) */
.sem_flg = SEM_UNDO /* プロセス終了時に自動復元 */
};
return semop(semid, &sop, 1);
}
int sem_unlock(int semid) {
struct sembuf sop = {
.sem_num = 0,
.sem_op = 1, /* V操作 (インクリメント) */
.sem_flg = SEM_UNDO
};
return semop(semid, &sop, 1);
}
int sem_trywait(int semid) {
struct sembuf sop = {
.sem_num = 0,
.sem_op = -1,
.sem_flg = SEM_UNDO | IPC_NOWAIT /* ノンブロッキング */
};
return semop(semid, &sop, 1);
}
int main(void) {
key_t key = ftok("/tmp", 'S');
if (key == -1) {
perror("ftok");
exit(EXIT_FAILURE);
}
/* セマフォセットの作成 (セマフォ1個) */
int semid = semget(key, 1, IPC_CREAT | IPC_EXCL | 0666);
int is_creator = (semid != -1);
if (!is_creator) {
/* 既存のセマフォセットを取得 */
semid = semget(key, 1, 0666);
if (semid == -1) {
perror("semget");
exit(EXIT_FAILURE);
}
} else {
/* セマフォの初期値を 1 に設定 (バイナリセマフォ) */
union semun arg;
arg.val = 1;
if (semctl(semid, 0, SETVAL, arg) == -1) {
perror("semctl SETVAL");
exit(EXIT_FAILURE);
}
printf("Semaphore created and initialized (semid=%d)\n", semid);
}
/* クリティカルセクション */
printf("[PID=%d] Waiting for lock...\n", getpid());
if (sem_lock(semid) == -1) {
perror("sem_lock");
exit(EXIT_FAILURE);
}
printf("[PID=%d] Acquired lock! Entering critical section.\n", getpid());
/* クリティカルセクション内の処理 */
for (int i = 0; i < 5; i++) {
printf("[PID=%d] Working... (%d/5)\n", getpid(), i + 1);
sleep(1);
}
printf("[PID=%d] Releasing lock.\n", getpid());
sem_unlock(semid);
/* 作成者のみクリーンアップ */
if (is_creator) {
sleep(2); /* 他のプロセスの終了を待つ */
semctl(semid, 0, IPC_RMID);
printf("Semaphore removed.\n");
}
return 0;
}
4.4.3 SEM_UNDO と SEMMSL/SEMMNS
# セマフォの制限値
cat /proc/sys/kernel/sem
# 250 32000 32 128
# SEMMSL SEMMNS SEMOPM SEMMNI
#
# SEMMSL: 1セマフォセットあたりの最大セマフォ数
# SEMMNS: システム全体の最大セマフォ数
# SEMOPM: 1回の semop() での最大操作数
# SEMMNI: システム全体の最大セマフォセット数
# 制限値の変更
sudo sysctl -w kernel.sem="500 64000 64 256"
4.5 System V 共有メモリ
4.5.1 基本概念
共有メモリは最も高速な IPC メカニズムである。複数のプロセスが同一の物理メモリ領域を共有することで、データコピーなしに通信できる。ただし、同期メカニズムを別途用意する必要がある。
プロセスA のアドレス空間 物理メモリ プロセスB のアドレス空間
┌─────────────────┐ ┌──────────┐ ┌─────────────────┐
│ ... │ │ │ │ ... │
│ 0x7f0012340000 ─│───────>│ 共有領域 │<───────│─ 0x7f0056780000 │
│ ... │ │ │ │ ... │
└─────────────────┘ └──────────┘ └─────────────────┘
仮想アドレスは異なるが、同じ物理メモリを参照
4.5.2 実践例: 共有メモリによるリングバッファ
/*
* sysv_shm_common.h
* 共有メモリリングバッファの共通定義
*/
#ifndef SYSV_SHM_COMMON_H
#define SYSV_SHM_COMMON_H
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#define SHM_KEY_PATH "/tmp"
#define SHM_PROJ_ID 'R'
#define SEM_PROJ_ID 'S'
#define RING_SIZE 16
#define MSG_SIZE 256
/* 共有メモリ上のリングバッファ構造体 */
struct shared_ring_buffer {
int read_index; /* 読み取り位置 */
int write_index; /* 書き込み位置 */
int count; /* 要素数 */
char data[RING_SIZE][MSG_SIZE]; /* データバッファ */
};
/* セマフォインデックス */
#define SEM_MUTEX 0 /* 排他制御用 */
#define SEM_EMPTY 1 /* 空きスロット数 */
#define SEM_FULL 2 /* 使用中スロット数 */
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
static inline void sem_op(int semid, int sem_num, int op) {
struct sembuf sop = {
.sem_num = sem_num,
.sem_op = op,
.sem_flg = SEM_UNDO
};
if (semop(semid, &sop, 1) == -1) {
perror("semop");
exit(EXIT_FAILURE);
}
}
#define sem_wait(semid, num) sem_op(semid, num, -1)
#define sem_signal(semid, num) sem_op(semid, num, 1)
#endif /* SYSV_SHM_COMMON_H */
/*
* sysv_shm_writer.c
* 共有メモリリングバッファへの書き込み
* コンパイル: gcc -o shm_writer sysv_shm_writer.c
*/
#include "sysv_shm_common.h"
int main(void) {
key_t shm_key = ftok(SHM_KEY_PATH, SHM_PROJ_ID);
key_t sem_key = ftok(SHM_KEY_PATH, SEM_PROJ_ID);
/* 共有メモリの作成 */
int shmid = shmget(shm_key, sizeof(struct shared_ring_buffer),
IPC_CREAT | 0666);
if (shmid == -1) {
perror("shmget");
exit(EXIT_FAILURE);
}
/* 共有メモリをアタッチ */
struct shared_ring_buffer *ring = shmat(shmid, NULL, 0);
if (ring == (void *)-1) {
perror("shmat");
exit(EXIT_FAILURE);
}
/* セマフォセットの作成 (3個) */
int semid = semget(sem_key, 3, IPC_CREAT | 0666);
if (semid == -1) {
perror("semget");
exit(EXIT_FAILURE);
}
/* 初期化 */
union semun arg;
arg.val = 1;
semctl(semid, SEM_MUTEX, SETVAL, arg); /* mutex = 1 */
arg.val = RING_SIZE;
semctl(semid, SEM_EMPTY, SETVAL, arg); /* empty = RING_SIZE */
arg.val = 0;
semctl(semid, SEM_FULL, SETVAL, arg); /* full = 0 */
memset(ring, 0, sizeof(*ring));
printf("Writer started (shmid=%d, semid=%d)\n", shmid, semid);
/* データ書き込み */
for (int i = 1; i <= 20; i++) {
sem_wait(semid, SEM_EMPTY); /* 空きスロットを待つ */
sem_wait(semid, SEM_MUTEX); /* 排他制御 */
/* リングバッファに書き込み */
snprintf(ring->data[ring->write_index], MSG_SIZE,
"Message #%d from writer (PID=%d)", i, getpid());
printf("[Writer] Wrote to slot %d: %s\n",
ring->write_index, ring->data[ring->write_index]);
ring->write_index = (ring->write_index + 1) % RING_SIZE;
ring->count++;
sem_signal(semid, SEM_MUTEX); /* 排他解除 */
sem_signal(semid, SEM_FULL); /* データありを通知 */
usleep(200000); /* 200ms */
}
/* 終了マーカーの書き込み */
sem_wait(semid, SEM_EMPTY);
sem_wait(semid, SEM_MUTEX);
snprintf(ring->data[ring->write_index], MSG_SIZE, "EXIT");
ring->write_index = (ring->write_index + 1) % RING_SIZE;
ring->count++;
sem_signal(semid, SEM_MUTEX);
sem_signal(semid, SEM_FULL);
/* デタッチ */
shmdt(ring);
printf("[Writer] Done.\n");
return 0;
}
/*
* sysv_shm_reader.c
* 共有メモリリングバッファからの読み取り
* コンパイル: gcc -o shm_reader sysv_shm_reader.c
*/
#include "sysv_shm_common.h"
int main(void) {
key_t shm_key = ftok(SHM_KEY_PATH, SHM_PROJ_ID);
key_t sem_key = ftok(SHM_KEY_PATH, SEM_PROJ_ID);
/* 共有メモリの取得 */
int shmid = shmget(shm_key, sizeof(struct shared_ring_buffer), 0666);
if (shmid == -1) {
perror("shmget (run writer first)");
exit(EXIT_FAILURE);
}
struct shared_ring_buffer *ring = shmat(shmid, NULL, SHM_RDONLY);
if (ring == (void *)-1) {
perror("shmat");
exit(EXIT_FAILURE);
}
int semid = semget(sem_key, 3, 0666);
if (semid == -1) {
perror("semget");
exit(EXIT_FAILURE);
}
printf("Reader started.\n");
while (1) {
sem_wait(semid, SEM_FULL); /* データを待つ */
sem_wait(semid, SEM_MUTEX); /* 排他制御 */
/* リングバッファから読み取り */
/* 注意: SHM_RDONLY でアタッチしたが、セマフォ経由で
read_index を更新する必要があるため、実際には
書き込みも必要。本来は SHM_RDONLY を外すべき */
struct shared_ring_buffer *rw_ring =
(struct shared_ring_buffer *)ring;
char msg[MSG_SIZE];
strncpy(msg, rw_ring->data[rw_ring->read_index], MSG_SIZE);
printf("[Reader] Read from slot %d: %s\n",
rw_ring->read_index, msg);
rw_ring->read_index = (rw_ring->read_index + 1) % RING_SIZE;
rw_ring->count--;
sem_signal(semid, SEM_MUTEX);
sem_signal(semid, SEM_EMPTY); /* 空きスロットを通知 */
if (strcmp(msg, "EXIT") == 0) break;
}
/* クリーンアップ */
shmdt(ring);
shmctl(shmid, IPC_RMID, NULL);
semctl(semid, 0, IPC_RMID);
printf("[Reader] Cleaned up.\n");
return 0;
}
4.5.3 共有メモリのカーネルパラメータ
# 共有メモリの制限値
cat /proc/sys/kernel/shmmax # 1セグメントの最大サイズ (バイト)
# 18446744073692774399 (事実上無制限)
cat /proc/sys/kernel/shmall # システム全体の共有メモリページ数
# 18446744073692774399
cat /proc/sys/kernel/shmmni # セグメントの最大数
# 4096
# 制限値の変更 (例: Oracle Database 用)
sudo sysctl -w kernel.shmmax=68719476736 # 64GB
sudo sysctl -w kernel.shmall=16777216 # 64GB / 4KB
sudo sysctl -w kernel.shmmni=4096
# hugetlb を使った大規模共有メモリ
# Huge Pages の設定
echo 1024 | sudo tee /proc/sys/vm/nr_hugepages
shmget(key, size, IPC_CREAT | SHM_HUGETLB | 0666);
5. POSIX IPC
5.1 概要
POSIX IPC は System V IPC の後継として設計された、より近代的で使いやすい API である。
| 特徴 | System V IPC | POSIX IPC |
|---|---|---|
| API 命名規則 | 不統一 (msgget, semget等) | 一貫性 (_open, _close等) |
| 名前空間 | 数値キー (key_t) | 文字列名 (/name) |
| リファレンスカウント | なし | あり |
| スレッドセーフ | 一部 | 全て |
| ファイルシステム | /proc/sysvipc/* | /dev/mqueue, /dev/shm |
| 優先度付きキュー | なし | あり (mq_*) |
| 通知メカニズム | なし | あり (mq_notify) |
| close-on-exec | 不可 | O_CLOEXEC |
5.2 POSIX メッセージキュー (mq_*)
5.2.1 API 概要
#include <mqueue.h>
/* メッセージキューのオープン/作成 */
mqd_t mq_open(const char *name, int oflag, ...);
/* name: "/name" 形式 (先頭に / が必要、/ は1つだけ) */
/* メッセージの送受信 */
int mq_send(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned int *msg_prio);
/* タイムアウト付き送受信 */
int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio,
const struct timespec *abs_timeout);
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned int *msg_prio,
const struct timespec *abs_timeout);
/* 非同期通知 */
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
/* 属性の取得/設定 */
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr,
struct mq_attr *oldattr);
/* クローズとアンリンク */
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
5.2.2 実践例: 優先度付きメッセージキュー
/*
* posix_mq_demo.c
* POSIX メッセージキューの優先度機能デモ
* コンパイル: gcc -o posix_mq posix_mq_demo.c -lrt
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define QUEUE_NAME "/demo_priority_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10
/* 優先度定義 */
#define PRIO_LOW 0
#define PRIO_NORMAL 5
#define PRIO_HIGH 10
#define PRIO_URGENT 15
int main(void) {
mqd_t mq;
struct mq_attr attr;
/* キュー属性の設定 */
attr.mq_flags = 0;
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MAX_MSG_SIZE;
attr.mq_curmsgs = 0;
/* 既存のキューを削除 */
mq_unlink(QUEUE_NAME);
/* メッセージキューの作成 */
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
printf("=== Sending messages with different priorities ===\n");
/* 異なる優先度でメッセージを送信 */
struct {
const char *msg;
unsigned int prio;
const char *prio_name;
} messages[] = {
{"Background task completed", PRIO_LOW, "LOW"},
{"User request processed", PRIO_NORMAL, "NORMAL"},
{"Database connection lost", PRIO_HIGH, "HIGH"},
{"Regular log entry", PRIO_LOW, "LOW"},
{"System alert triggered", PRIO_URGENT, "URGENT"},
{"Config file updated", PRIO_NORMAL, "NORMAL"},
{"Critical: disk space low", PRIO_URGENT, "URGENT"},
{"Periodic health check passed", PRIO_LOW, "LOW"},
};
int num_msgs = sizeof(messages) / sizeof(messages[0]);
for (int i = 0; i < num_msgs; i++) {
if (mq_send(mq, messages[i].msg, strlen(messages[i].msg) + 1,
messages[i].prio) == -1) {
perror("mq_send");
continue;
}
printf(" Sent [%s]: %s\n", messages[i].prio_name, messages[i].msg);
}
printf("\n=== Receiving messages (highest priority first) ===\n");
/* メッセージの受信 (優先度の高い順に取り出される) */
char buf[MAX_MSG_SIZE + 1];
unsigned int prio;
for (int i = 0; i < num_msgs; i++) {
ssize_t n = mq_receive(mq, buf, MAX_MSG_SIZE + 1, &prio);
if (n == -1) {
perror("mq_receive");
break;
}
printf(" Received [prio=%2u]: %s\n", prio, buf);
}
/* キューの属性を表示 */
mq_getattr(mq, &attr);
printf("\nQueue attributes:\n");
printf(" Max messages: %ld\n", attr.mq_maxmsg);
printf(" Max message size: %ld\n", attr.mq_msgsize);
printf(" Current messages: %ld\n", attr.mq_curmsgs);
mq_close(mq);
mq_unlink(QUEUE_NAME);
return 0;
}
5.2.3 mq_notify による非同期通知
/*
* posix_mq_notify.c
* mq_notify() を使った非同期メッセージ通知
* コンパイル: gcc -o mq_notify posix_mq_notify.c -lrt -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
#define QUEUE_NAME "/notify_demo"
#define MAX_MSG_SIZE 256
static mqd_t mq;
/* スレッドで通知を処理 */
static void thread_handler(union sigval sv) {
char buf[MAX_MSG_SIZE + 1];
unsigned int prio;
ssize_t n;
/* 再度通知を登録 (1回限りのため) */
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = thread_handler;
sev.sigev_notify_attributes = NULL;
sev.sigev_value.sival_ptr = sv.sival_ptr;
mq_notify(mq, &sev);
/* 全てのメッセージを読み取り */
while ((n = mq_receive(mq, buf, MAX_MSG_SIZE + 1, &prio)) > 0) {
buf[n] = '\0';
printf("[Notification Thread] Received (prio=%u): %s\n", prio, buf);
}
}
int main(void) {
struct mq_attr attr = {
.mq_maxmsg = 10,
.mq_msgsize = MAX_MSG_SIZE
};
mq_unlink(QUEUE_NAME);
mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(EXIT_FAILURE);
}
/* 非同期通知の設定 */
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = thread_handler;
sev.sigev_notify_attributes = NULL;
sev.sigev_value.sival_ptr = NULL;
if (mq_notify(mq, &sev) == -1) {
perror("mq_notify");
exit(EXIT_FAILURE);
}
printf("Waiting for messages on %s...\n", QUEUE_NAME);
printf("Send with: echo 'hello' | mq_send %s\n\n", QUEUE_NAME);
/* メインスレッドはメッセージを送信 */
for (int i = 0; i < 5; i++) {
sleep(2);
char msg[64];
snprintf(msg, sizeof(msg), "Auto-message #%d", i + 1);
mq_send(mq, msg, strlen(msg) + 1, i);
}
sleep(1);
mq_close(mq);
mq_unlink(QUEUE_NAME);
return 0;
}
5.2.4 POSIX メッセージキューのカーネルパラメータ
# POSIX メッセージキューの制限値
cat /proc/sys/fs/mqueue/msg_max # キューあたりの最大メッセージ数
# 10
cat /proc/sys/fs/mqueue/msgsize_max # メッセージの最大サイズ
# 8192
cat /proc/sys/fs/mqueue/queues_max # システム全体の最大キュー数
# 256
# mqueue ファイルシステムのマウント
mount -t mqueue none /dev/mqueue
# マウントされたキューの確認
ls -la /dev/mqueue/
5.3 POSIX セマフォ (sem_*)
5.3.1 名前付きセマフォと無名セマフォ
/*
* posix_semaphore_demo.c
* POSIX セマフォの使用例
* コンパイル: gcc -o posix_sem posix_semaphore_demo.c -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
/* --- 名前付きセマフォ (プロセス間) --- */
void named_semaphore_demo(void) {
const char *sem_name = "/demo_semaphore";
sem_t *sem;
/* セマフォの作成 (初期値 = 1: バイナリセマフォ) */
sem = sem_open(sem_name, O_CREAT, 0644, 1);
if (sem == SEM_FAILED) {
perror("sem_open");
return;
}
/* 値の取得 */
int val;
sem_getvalue(sem, &val);
printf("Semaphore value: %d\n", val);
/* P操作 (待機) */
printf("Acquiring semaphore...\n");
sem_wait(sem);
sem_getvalue(sem, &val);
printf("After wait, value: %d\n", val);
/* タイムアウト付き待機 */
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 2; /* 2秒後にタイムアウト */
if (sem_timedwait(sem, &ts) == -1) {
if (errno == ETIMEDOUT) {
printf("sem_timedwait: timed out (expected)\n");
}
}
/* V操作 (解放) */
sem_post(sem);
/* クリーンアップ */
sem_close(sem);
sem_unlink(sem_name);
}
/* --- 無名セマフォ (スレッド間) --- */
#define NUM_THREADS 4
#define NUM_ITERATIONS 5
sem_t unnamed_sem;
int shared_counter = 0;
void *thread_func(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < NUM_ITERATIONS; i++) {
sem_wait(&unnamed_sem);
int local = shared_counter;
usleep(1000); /* 競合状態を明示的に作る */
shared_counter = local + 1;
printf("[Thread %d] counter = %d\n", id, shared_counter);
sem_post(&unnamed_sem);
}
return NULL;
}
void unnamed_semaphore_demo(void) {
/* 無名セマフォの初期化 */
/* 第2引数: 0=スレッド間, 非0=プロセス間 (共有メモリ上) */
sem_init(&unnamed_sem, 0, 1);
pthread_t threads[NUM_THREADS];
int ids[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, thread_func, &ids[i]);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf("Final counter: %d (expected: %d)\n",
shared_counter, NUM_THREADS * NUM_ITERATIONS);
sem_destroy(&unnamed_sem);
}
int main(void) {
printf("=== Named Semaphore Demo ===\n");
named_semaphore_demo();
printf("\n=== Unnamed Semaphore Demo ===\n");
unnamed_semaphore_demo();
return 0;
}
5.4 POSIX 共有メモリ (shm_*)
/*
* posix_shm_demo.c
* POSIX 共有メモリによるプロセス間データ共有
* コンパイル: gcc -o posix_shm posix_shm_demo.c -lrt -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/wait.h>
#include <time.h>
#define SHM_NAME "/demo_shared_mem"
#define SEM_NAME "/demo_shm_sem"
/* 共有メモリ上のデータ構造 */
struct shared_data {
int counter;
double values[100];
char last_message[256];
pid_t last_writer;
struct timespec last_update;
};
int main(void) {
int shm_fd;
struct shared_data *data;
sem_t *sem;
/* 既存のオブジェクトを削除 */
shm_unlink(SHM_NAME);
sem_unlink(SEM_NAME);
/* 共有メモリオブジェクトの作成 */
shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0644);
if (shm_fd == -1) {
perror("shm_open");
exit(EXIT_FAILURE);
}
/* サイズの設定 */
if (ftruncate(shm_fd, sizeof(struct shared_data)) == -1) {
perror("ftruncate");
exit(EXIT_FAILURE);
}
/* メモリマッピング */
data = mmap(NULL, sizeof(struct shared_data),
PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (data == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
/* fd は mmap 後にクローズ可能 */
close(shm_fd);
/* セマフォの作成 */
sem = sem_open(SEM_NAME, O_CREAT, 0644, 1);
if (sem == SEM_FAILED) {
perror("sem_open");
exit(EXIT_FAILURE);
}
/* 初期化 */
memset(data, 0, sizeof(struct shared_data));
pid_t pid = fork();
if (pid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (pid == 0) {
/* 子プロセス: 書き込み */
for (int i = 0; i < 10; i++) {
sem_wait(sem);
data->counter++;
data->values[i] = i * 3.14159;
snprintf(data->last_message, sizeof(data->last_message),
"Update #%d from child", i + 1);
data->last_writer = getpid();
clock_gettime(CLOCK_REALTIME, &data->last_update);
printf("[Child] counter=%d, msg='%s'\n",
data->counter, data->last_message);
sem_post(sem);
usleep(100000);
}
exit(EXIT_SUCCESS);
}
/* 親プロセス: 読み取り */
for (int i = 0; i < 10; i++) {
usleep(150000);
sem_wait(sem);
printf("[Parent] counter=%d, last_writer=%d, msg='%s'\n",
data->counter, data->last_writer, data->last_message);
sem_post(sem);
}
wait(NULL);
/* クリーンアップ */
munmap(data, sizeof(struct shared_data));
sem_close(sem);
sem_unlink(SEM_NAME);
shm_unlink(SHM_NAME);
printf("Final counter: %d\n", data->counter);
return 0;
}
# POSIX 共有メモリの確認
ls -la /dev/shm/
# 共有メモリオブジェクトの手動削除
rm /dev/shm/demo_shared_mem
# tmpfs のサイズ確認
df -h /dev/shm
# tmpfs 3.9G 100K 3.9G 1% /dev/shm
# tmpfs のサイズ変更
sudo mount -o remount,size=8G /dev/shm
6. Unix ドメインソケット
6.1 概要
Unix ドメインソケット (Unix Domain Socket, UDS) は、同一ホスト上のプロセス間通信に特化したソケットインターフェースである。TCP/UDP ソケットと同じ API を使用しながら、ネットワークスタックを経由しないため高速に動作する。
特徴:
- 双方向通信
- バイトストリーム (SOCK_STREAM) とデータグラム (SOCK_DGRAM) の両方をサポート
- ファイルディスクリプタの受け渡し (SCM_RIGHTS)
- 認証情報の送受信 (SCM_CREDENTIALS)
- 抽象名前空間 (Linux 固有)
- ソケットペア (socketpair)
6.2 ソケットタイプの比較
| 特徴 | SOCK_STREAM | SOCK_DGRAM | SOCK_SEQPACKET |
|---|---|---|---|
| 接続型 | はい | いいえ | はい |
| 信頼性 | 保証 | 保証 | 保証 |
| 順序保証 | はい | はい | はい |
| メッセージ境界 | なし | あり | あり |
| ユースケース | 汎用 | 軽量通知 | プロトコル設計 |
6.3 実践例: クライアント-サーバモデル
/*
* unix_socket_server.c
* Unix ドメインソケットサーバ (SOCK_STREAM)
* コンパイル: gcc -o uds_server unix_socket_server.c -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <pthread.h>
#include <signal.h>
#define SOCKET_PATH "/tmp/demo.sock"
#define BUF_SIZE 4096
#define MAX_CLIENTS 10
static volatile sig_atomic_t running = 1;
void handle_signal(int sig) {
(void)sig;
running = 0;
}
/* クライアント接続を処理するスレッド */
void *client_handler(void *arg) {
int client_fd = *(int *)arg;
free(arg);
char buf[BUF_SIZE];
ssize_t n;
/* クライアントの認証情報を取得 */
struct ucred cred;
socklen_t len = sizeof(cred);
if (getsockopt(client_fd, SOL_SOCKET, SO_PEERCRED, &cred, &len) == 0) {
printf("[Server] Client connected: PID=%d, UID=%d, GID=%d\n",
cred.pid, cred.uid, cred.gid);
}
while ((n = recv(client_fd, buf, sizeof(buf) - 1, 0)) > 0) {
buf[n] = '\0';
printf("[Server] Received from PID %d: %s\n", cred.pid, buf);
/* エコーレスポンス */
char response[BUF_SIZE];
int resp_len = snprintf(response, sizeof(response),
"Echo: %s (processed by server)", buf);
send(client_fd, response, resp_len, 0);
}
printf("[Server] Client PID %d disconnected.\n", cred.pid);
close(client_fd);
return NULL;
}
int main(void) {
int server_fd;
struct sockaddr_un addr;
signal(SIGINT, handle_signal);
/* 既存のソケットファイルを削除 */
unlink(SOCKET_PATH);
/* ソケットの作成 */
server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
/* アドレスの設定 */
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
/* バインド */
if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind");
close(server_fd);
exit(EXIT_FAILURE);
}
/* パーミッションの設定 */
chmod(SOCKET_PATH, 0666);
/* リッスン */
if (listen(server_fd, MAX_CLIENTS) == -1) {
perror("listen");
close(server_fd);
exit(EXIT_FAILURE);
}
printf("Server listening on %s\n", SOCKET_PATH);
while (running) {
struct sockaddr_un client_addr;
socklen_t client_len = sizeof(client_addr);
int *client_fd = malloc(sizeof(int));
*client_fd = accept(server_fd, (struct sockaddr *)&client_addr,
&client_len);
if (*client_fd == -1) {
free(client_fd);
if (!running) break;
perror("accept");
continue;
}
/* 新しいスレッドでクライアントを処理 */
pthread_t thread;
pthread_create(&thread, NULL, client_handler, client_fd);
pthread_detach(thread);
}
close(server_fd);
unlink(SOCKET_PATH);
printf("\nServer stopped.\n");
return 0;
}
/*
* unix_socket_client.c
* Unix ドメインソケットクライアント
* コンパイル: gcc -o uds_client unix_socket_client.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#define SOCKET_PATH "/tmp/demo.sock"
#define BUF_SIZE 4096
int main(int argc, char *argv[]) {
int sock_fd;
struct sockaddr_un addr;
sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock_fd == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
if (connect(sock_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("connect");
close(sock_fd);
exit(EXIT_FAILURE);
}
printf("Connected to server.\n");
/* メッセージの送受信 */
const char *messages[] = {
"Hello from client",
"Testing Unix domain socket",
"IPC is fun",
NULL
};
char buf[BUF_SIZE];
for (int i = 0; messages[i]; i++) {
send(sock_fd, messages[i], strlen(messages[i]), 0);
printf("[Client] Sent: %s\n", messages[i]);
ssize_t n = recv(sock_fd, buf, sizeof(buf) - 1, 0);
if (n > 0) {
buf[n] = '\0';
printf("[Client] Received: %s\n", buf);
}
usleep(500000);
}
close(sock_fd);
return 0;
}
6.4 ファイルディスクリプタの受け渡し (SCM_RIGHTS)
/*
* fd_passing.c
* Unix ドメインソケットを使ったファイルディスクリプタの受け渡し
* コンパイル: gcc -o fd_passing fd_passing.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <fcntl.h>
/* ファイルディスクリプタを送信 */
int send_fd(int socket, int fd_to_send) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
struct iovec iov;
char data = 'F'; /* ダミーデータ (少なくとも1バイト必要) */
iov.iov_base = &data;
iov.iov_len = 1;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
*((int *)CMSG_DATA(cmsg)) = fd_to_send;
return sendmsg(socket, &msg, 0);
}
/* ファイルディスクリプタを受信 */
int recv_fd(int socket) {
struct msghdr msg = {0};
struct cmsghdr *cmsg;
char buf[CMSG_SPACE(sizeof(int))];
struct iovec iov;
char data;
iov.iov_base = &data;
iov.iov_len = 1;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
if (recvmsg(socket, &msg, 0) == -1) {
return -1;
}
cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg && cmsg->cmsg_level == SOL_SOCKET &&
cmsg->cmsg_type == SCM_RIGHTS) {
return *((int *)CMSG_DATA(cmsg));
}
return -1;
}
int main(void) {
int sv[2]; /* ソケットペア */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
perror("socketpair");
exit(EXIT_FAILURE);
}
pid_t pid = fork();
if (pid == 0) {
/* 子プロセス: ファイルを開いて親に fd を送信 */
close(sv[0]);
int fd = open("/etc/hostname", O_RDONLY);
if (fd == -1) {
perror("open");
exit(EXIT_FAILURE);
}
printf("[Child] Opened /etc/hostname as fd=%d, sending to parent\n", fd);
send_fd(sv[1], fd);
close(fd);
close(sv[1]);
exit(EXIT_SUCCESS);
} else {
/* 親プロセス: 子から fd を受信 */
close(sv[1]);
int received_fd = recv_fd(sv[0]);
if (received_fd == -1) {
fprintf(stderr, "Failed to receive fd\n");
exit(EXIT_FAILURE);
}
printf("[Parent] Received fd=%d from child, reading content:\n",
received_fd);
char buf[256];
ssize_t n = read(received_fd, buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
printf("[Parent] Content: %s", buf);
}
close(received_fd);
close(sv[0]);
wait(NULL);
}
return 0;
}
6.5 抽象ソケット名前空間 (Linux 固有)
/*
* 抽象名前空間ソケット
* sun_path[0] = '\0' で始まるパスを使用
* ファイルシステム上にエントリを作成しない
*/
struct sockaddr_un addr;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
/* 抽象名前空間: 先頭にヌルバイト */
addr.sun_path[0] = '\0';
strncpy(addr.sun_path + 1, "my_abstract_socket", sizeof(addr.sun_path) - 2);
/* バインド時に正しいアドレスサイズを計算 */
socklen_t addr_len = offsetof(struct sockaddr_un, sun_path) +
1 + strlen("my_abstract_socket");
bind(sock_fd, (struct sockaddr *)&addr, addr_len);
/*
* 利点:
* - ファイルシステムに痕跡を残さない
* - unlink() が不要
* - プロセス終了時に自動的にクリーンアップ
*
* 欠点:
* - Linux 固有 (ポータビリティなし)
* - ファイルパーミッションによる制御ができない
*/
6.6 socketpair の活用
# socat を使った Unix ドメインソケットのデバッグ
socat - UNIX-CONNECT:/tmp/demo.sock
# ss/netstat による Unix ドメインソケットの確認
ss -x # 全ての Unix ドメインソケット
ss -xlp # リッスンソケットとプロセス情報
ss -x -e state established # 接続済みソケット
# /proc を通じた確認
cat /proc/net/unix
7. シグナル
7.1 シグナルの基本
シグナルは Unix の最も古い IPC メカニズムの一つであり、プロセスにイベントを非同期的に通知するために使用される。
7.1.1 シグナル一覧
標準シグナル (POSIX):
信号番号 名前 デフォルト動作 説明
1 SIGHUP 終了 ハングアップ (端末切断)
2 SIGINT 終了 キーボード割り込み (Ctrl+C)
3 SIGQUIT コアダンプ キーボード終了 (Ctrl+\)
6 SIGABRT コアダンプ abort() からの異常終了
9 SIGKILL 終了 強制終了 (捕捉不可)
10 SIGUSR1 終了 ユーザ定義1
11 SIGSEGV コアダンプ 無効なメモリ参照
12 SIGUSR2 終了 ユーザ定義2
13 SIGPIPE 終了 壊れたパイプへの書き込み
14 SIGALRM 終了 alarm() タイマー
15 SIGTERM 終了 終了要求
17 SIGCHLD 無視 子プロセスの終了
18 SIGCONT 続行 停止プロセスの続行
19 SIGSTOP 停止 プロセス停止 (捕捉不可)
20 SIGTSTP 停止 端末停止 (Ctrl+Z)
リアルタイムシグナル (Linux):
34-64 SIGRTMIN〜SIGRTMAX 終了 キュー可能、優先度付き
7.2 sigaction() による堅牢なシグナル処理
/*
* signal_handling.c
* sigaction() を使った堅牢なシグナル処理
* コンパイル: gcc -o sig_demo signal_handling.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
/* シグナル安全な書き込み関数 */
static void safe_write(const char *msg) {
/* 注意: printf() はシグナルハンドラ内で安全ではない
* write() はシグナル安全な関数 (async-signal-safe) */
write(STDOUT_FILENO, msg, strlen(msg));
}
/* SIGINT ハンドラ (SA_SIGINFO 版) */
void sigint_handler(int sig, siginfo_t *info, void *ucontext) {
(void)ucontext;
char buf[128];
int len = snprintf(buf, sizeof(buf),
"\n[Handler] SIGINT received (sig=%d, sender PID=%d)\n",
sig, info->si_pid);
write(STDOUT_FILENO, buf, len);
}
/* SIGUSR1 ハンドラ */
void sigusr1_handler(int sig, siginfo_t *info, void *ucontext) {
(void)sig;
(void)ucontext;
char buf[128];
int len = snprintf(buf, sizeof(buf),
"[Handler] SIGUSR1: value=%d from PID=%d\n",
info->si_value.sival_int, info->si_pid);
write(STDOUT_FILENO, buf, len);
}
/* SIGCHLD ハンドラ */
void sigchld_handler(int sig, siginfo_t *info, void *ucontext) {
(void)sig;
(void)ucontext;
char buf[256];
int len;
/* 子プロセスの終了状態を確認 */
if (info->si_code == CLD_EXITED) {
len = snprintf(buf, sizeof(buf),
"[Handler] Child PID=%d exited with status=%d\n",
info->si_pid, info->si_status);
} else if (info->si_code == CLD_KILLED) {
len = snprintf(buf, sizeof(buf),
"[Handler] Child PID=%d killed by signal=%d\n",
info->si_pid, info->si_status);
} else {
len = snprintf(buf, sizeof(buf),
"[Handler] Child PID=%d, code=%d\n",
info->si_pid, info->si_code);
}
write(STDOUT_FILENO, buf, len);
}
int main(void) {
struct sigaction sa;
/* SIGINT ハンドラの設定 */
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = sigint_handler;
sa.sa_flags = SA_SIGINFO | SA_RESTART; /* SA_RESTART: システムコール再開 */
sigemptyset(&sa.sa_mask);
sigaddset(&sa.sa_mask, SIGUSR1); /* SIGINT 処理中は SIGUSR1 をブロック */
sigaction(SIGINT, &sa, NULL);
/* SIGUSR1 ハンドラの設定 */
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = sigusr1_handler;
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
sigaction(SIGUSR1, &sa, NULL);
/* SIGCHLD ハンドラの設定 */
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = sigchld_handler;
sa.sa_flags = SA_SIGINFO | SA_NOCLDSTOP; /* 停止通知を受けない */
sigemptyset(&sa.sa_mask);
sigaction(SIGCHLD, &sa, NULL);
printf("PID=%d, waiting for signals...\n", getpid());
printf(" Send SIGINT: kill -INT %d or Ctrl+C\n", getpid());
printf(" Send SIGUSR1: kill -USR1 %d\n", getpid());
/* メインループ */
while (1) {
pause(); /* シグナルを待つ */
printf("[Main] Returned from pause()\n");
}
return 0;
}
7.3 リアルタイムシグナル
/*
* realtime_signals.c
* リアルタイムシグナルによるデータ送信
* コンパイル: gcc -o rt_signals realtime_signals.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <unistd.h>
#include <sys/wait.h>
#define RT_SIG_DATA (SIGRTMIN + 1)
void rt_handler(int sig, siginfo_t *info, void *ucontext) {
(void)ucontext;
char buf[128];
int len = snprintf(buf, sizeof(buf),
"[RT Handler] sig=%d, value=%d, from PID=%d\n",
sig, info->si_value.sival_int, info->si_pid);
write(STDOUT_FILENO, buf, len);
}
int main(void) {
pid_t pid = fork();
if (pid == 0) {
/* 子プロセス: リアルタイムシグナルの受信 */
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_sigaction = rt_handler;
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
sigaction(RT_SIG_DATA, &sa, NULL);
printf("[Child PID=%d] Waiting for RT signals...\n", getpid());
/* 10個のシグナルを待つ */
for (int i = 0; i < 10; i++) {
pause();
}
printf("[Child] Done.\n");
exit(EXIT_SUCCESS);
} else {
/* 親プロセス: リアルタイムシグナルの送信 */
sleep(1); /* 子プロセスの準備を待つ */
printf("[Parent] Sending 10 RT signals with data...\n");
for (int i = 0; i < 10; i++) {
union sigval val;
val.sival_int = (i + 1) * 100; /* データ値 */
if (sigqueue(pid, RT_SIG_DATA, val) == -1) {
perror("sigqueue");
}
printf("[Parent] Sent value=%d\n", val.sival_int);
}
wait(NULL);
}
printf("\nRT signal properties:\n");
printf(" SIGRTMIN = %d\n", SIGRTMIN);
printf(" SIGRTMAX = %d\n", SIGRTMAX);
printf(" Available RT signals = %d\n", SIGRTMAX - SIGRTMIN + 1);
return 0;
}
7.4 シグナルマスクとシグナルセット
/*
* シグナルマスク操作の例
*/
#include <signal.h>
void signal_mask_examples(void) {
sigset_t set, oldset;
/* シグナルセットの操作 */
sigemptyset(&set); /* 空集合 */
sigfillset(&set); /* 全シグナルを含む */
sigaddset(&set, SIGINT); /* SIGINT を追加 */
sigdelset(&set, SIGTERM); /* SIGTERM を削除 */
sigismember(&set, SIGINT); /* メンバーか確認 */
/* シグナルマスクの変更 */
/* SIG_BLOCK: 現在のマスクに追加 */
sigprocmask(SIG_BLOCK, &set, &oldset);
/* SIG_UNBLOCK: マスクから削除 */
sigprocmask(SIG_UNBLOCK, &set, NULL);
/* SIG_SETMASK: マスクを完全に置き換え */
sigprocmask(SIG_SETMASK, &oldset, NULL);
/* ペンディングシグナルの確認 */
sigset_t pending;
sigpending(&pending);
if (sigismember(&pending, SIGINT)) {
printf("SIGINT is pending\n");
}
/* スレッドでのシグナルマスク */
/* pthread_sigmask(SIG_BLOCK, &set, &oldset); */
}
8. eventfd, timerfd, signalfd
8.1 eventfd
eventfd は、プロセス間またはスレッド間のイベント通知に特化したファイルディスクリプタベースのメカニズムである。内部的には 64 ビットのカウンタを保持する。
/*
* eventfd_demo.c
* eventfd によるイベント通知
* コンパイル: gcc -o eventfd_demo eventfd_demo.c -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <pthread.h>
#include <stdint.h>
#include <poll.h>
int efd;
void *producer(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < 5; i++) {
uint64_t val = 1; /* カウンタをインクリメント */
write(efd, &val, sizeof(val));
printf("[Producer %d] Posted event (iteration %d)\n", id, i + 1);
usleep(200000);
}
return NULL;
}
void *consumer(void *arg) {
(void)arg;
struct pollfd pfd = {
.fd = efd,
.events = POLLIN
};
int total = 0;
while (total < 10) { /* 2 producers x 5 events = 10 */
int ret = poll(&pfd, 1, 5000); /* 5秒タイムアウト */
if (ret > 0) {
uint64_t val;
read(efd, &val, sizeof(val));
total += val;
printf("[Consumer] Received %lu event(s), total=%d\n",
(unsigned long)val, total);
} else if (ret == 0) {
printf("[Consumer] Timeout\n");
break;
}
}
return NULL;
}
int main(void) {
/* eventfd の作成 */
/* EFD_SEMAPHORE: read() で常に 1 を返す (セマフォモード) */
efd = eventfd(0, EFD_NONBLOCK);
if (efd == -1) {
perror("eventfd");
exit(EXIT_FAILURE);
}
printf("eventfd created (fd=%d)\n", efd);
pthread_t prod1, prod2, cons;
int id1 = 1, id2 = 2;
pthread_create(&cons, NULL, consumer, NULL);
pthread_create(&prod1, NULL, producer, &id1);
pthread_create(&prod2, NULL, producer, &id2);
pthread_join(prod1, NULL);
pthread_join(prod2, NULL);
pthread_join(cons, NULL);
close(efd);
return 0;
}
8.2 timerfd
/*
* timerfd_demo.c
* timerfd による定期タイマー
* コンパイル: gcc -o timerfd_demo timerfd_demo.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/timerfd.h>
#include <stdint.h>
#include <time.h>
#include <poll.h>
int main(void) {
/* timerfd の作成 */
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (tfd == -1) {
perror("timerfd_create");
exit(EXIT_FAILURE);
}
/* タイマーの設定: 1秒後に開始、500ms 間隔 */
struct itimerspec its = {
.it_value = { /* 初回発火時間 */
.tv_sec = 1,
.tv_nsec = 0
},
.it_interval = { /* 繰り返し間隔 */
.tv_sec = 0,
.tv_nsec = 500000000 /* 500ms */
}
};
if (timerfd_settime(tfd, 0, &its, NULL) == -1) {
perror("timerfd_settime");
exit(EXIT_FAILURE);
}
printf("Timer started (1s initial, 500ms interval)\n");
struct pollfd pfd = { .fd = tfd, .events = POLLIN };
uint64_t expirations;
int count = 0;
while (count < 10) {
int ret = poll(&pfd, 1, 3000);
if (ret > 0) {
/* read() で経過回数を取得 */
read(tfd, &expirations, sizeof(expirations));
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
printf("[%ld.%03ld] Timer expired (%lu times)\n",
now.tv_sec, now.tv_nsec / 1000000,
(unsigned long)expirations);
count++;
}
}
/* タイマーの停止 */
struct itimerspec zero = {{0, 0}, {0, 0}};
timerfd_settime(tfd, 0, &zero, NULL);
close(tfd);
return 0;
}
8.3 signalfd
/*
* signalfd_demo.c
* signalfd による同期的なシグナル処理
* コンパイル: gcc -o signalfd_demo signalfd_demo.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <poll.h>
int main(void) {
sigset_t mask;
/* 対象シグナルをブロック */
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
sigaddset(&mask, SIGUSR1);
sigprocmask(SIG_BLOCK, &mask, NULL);
/* signalfd の作成 */
int sfd = signalfd(-1, &mask, SFD_NONBLOCK);
if (sfd == -1) {
perror("signalfd");
exit(EXIT_FAILURE);
}
printf("PID=%d, waiting for signals via signalfd...\n", getpid());
printf(" kill -INT %d (SIGINT)\n", getpid());
printf(" kill -USR1 %d (SIGUSR1)\n", getpid());
printf(" kill -TERM %d (SIGTERM to exit)\n", getpid());
struct pollfd pfd = { .fd = sfd, .events = POLLIN };
while (1) {
int ret = poll(&pfd, 1, -1);
if (ret > 0) {
struct signalfd_siginfo si;
ssize_t n = read(sfd, &si, sizeof(si));
if (n != sizeof(si)) {
perror("read signalfd");
break;
}
printf("Received signal %d (from PID=%d, UID=%d)\n",
si.ssi_signo, si.ssi_pid, si.ssi_uid);
if (si.ssi_signo == SIGTERM) {
printf("SIGTERM received, exiting.\n");
break;
}
}
}
close(sfd);
return 0;
}
8.4 epoll との統合
/*
* unified_event_loop.c
* eventfd + timerfd + signalfd を epoll で統合監視
* コンパイル: gcc -o unified_events unified_event_loop.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <stdint.h>
#define MAX_EVENTS 10
enum fd_type { FD_EVENT, FD_TIMER, FD_SIGNAL };
struct fd_context {
int fd;
enum fd_type type;
};
int main(void) {
int epfd, efd, tfd, sfd;
/* epoll インスタンスの作成 */
epfd = epoll_create1(EPOLL_CLOEXEC);
/* eventfd */
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
struct fd_context ctx_event = { .fd = efd, .type = FD_EVENT };
/* timerfd */
tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
struct itimerspec its = { .it_value = {2, 0}, .it_interval = {2, 0} };
timerfd_settime(tfd, 0, &its, NULL);
struct fd_context ctx_timer = { .fd = tfd, .type = FD_TIMER };
/* signalfd */
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigprocmask(SIG_BLOCK, &mask, NULL);
sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
struct fd_context ctx_signal = { .fd = sfd, .type = FD_SIGNAL };
/* epoll に登録 */
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = &ctx_event;
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);
ev.data.ptr = &ctx_timer;
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
ev.data.ptr = &ctx_signal;
epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev);
printf("Unified event loop started (PID=%d)\n", getpid());
printf(" Timer fires every 2 seconds\n");
printf(" Ctrl+C to send SIGINT\n\n");
/* 自分自身にイベントを送信 (デモ用) */
uint64_t val = 42;
write(efd, &val, sizeof(val));
/* イベントループ */
int running = 1;
while (running) {
struct epoll_event events[MAX_EVENTS];
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (int i = 0; i < n; i++) {
struct fd_context *ctx = events[i].data.ptr;
switch (ctx->type) {
case FD_EVENT: {
uint64_t val;
read(ctx->fd, &val, sizeof(val));
printf("[eventfd] Value: %lu\n", (unsigned long)val);
break;
}
case FD_TIMER: {
uint64_t exp;
read(ctx->fd, &exp, sizeof(exp));
printf("[timerfd] Expired %lu time(s)\n",
(unsigned long)exp);
break;
}
case FD_SIGNAL: {
struct signalfd_siginfo si;
read(ctx->fd, &si, sizeof(si));
printf("[signalfd] Signal %d received\n", si.ssi_signo);
if (si.ssi_signo == SIGINT) {
printf("Exiting...\n");
running = 0;
}
break;
}
}
}
}
close(sfd);
close(tfd);
close(efd);
close(epfd);
return 0;
}
9. メモリマップドファイル (mmap による IPC)
9.1 mmap の基本
#include <sys/mman.h>
void *mmap(void *addr, /* マッピング先アドレス (通常 NULL) */
size_t length, /* マッピングサイズ */
int prot, /* 保護フラグ */
int flags, /* マッピングフラグ */
int fd, /* ファイルディスクリプタ */
off_t offset); /* ファイルオフセット */
int munmap(void *addr, size_t length);
int msync(void *addr, size_t length, int flags);
9.2 IPC 用の mmap パターン
パターン1: ファイルバック共有マッピング
┌──────────┐ MAP_SHARED ┌──────────┐
│ プロセスA │ ───────────────> │ ファイル │ <─────────────── │ プロセスB │
│ mmap() │ │ (ディスク) │ │ mmap() │
└──────────┘ └──────────┘ └──────────┘
パターン2: 匿名共有マッピング (fork 前)
┌──────────┐ MAP_SHARED ┌──────────┐
│ 親プロセス│ MAP_ANONYMOUS │ 物理メモリ │
│ mmap() │ ───────────────> │ (匿名) │ <── fork ── │ 子プロセス │
└──────────┘ └──────────┘ └──────────┘
パターン3: memfd_create + mmap
┌──────────┐ memfd_create ┌──────────┐ fd 送信 ┌──────────┐
│ プロセスA │ ───────────────── │ メモリFD │ ────────────> │ プロセスB │
│ mmap() │ │ (匿名) │ (SCM_RIGHTS) │ mmap() │
└──────────┘ └──────────┘ └──────────┘
9.3 実践例: mmap によるプロセス間共有メモリ
/*
* mmap_ipc.c
* mmap を使ったプロセス間通信
* コンパイル: gcc -o mmap_ipc mmap_ipc.c -lrt
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <stdatomic.h>
/* 共有データ構造 */
struct shared_state {
atomic_int counter;
atomic_int ready;
char message[256];
};
int main(void) {
const size_t size = sizeof(struct shared_state);
/* 匿名共有マッピング */
struct shared_state *state = mmap(NULL, size,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS,
-1, 0);
if (state == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
/* 初期化 */
atomic_init(&state->counter, 0);
atomic_init(&state->ready, 0);
memset(state->message, 0, sizeof(state->message));
pid_t pid = fork();
if (pid == 0) {
/* 子プロセス: データを書き込み */
for (int i = 0; i < 100; i++) {
atomic_fetch_add(&state->counter, 1);
}
snprintf(state->message, sizeof(state->message),
"Child (PID=%d) incremented counter 100 times", getpid());
atomic_store(&state->ready, 1);
exit(EXIT_SUCCESS);
}
/* 親プロセス: 子の完了を待って読み取り */
while (!atomic_load(&state->ready)) {
usleep(1000);
}
printf("Counter: %d\n", atomic_load(&state->counter));
printf("Message: %s\n", state->message);
wait(NULL);
munmap(state, size);
return 0;
}
9.4 memfd_create (Linux 3.17+)
/*
* memfd_create_demo.c
* memfd_create() による無名メモリファイル
* コンパイル: gcc -o memfd_demo memfd_create_demo.c
*/
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <linux/memfd.h>
#include <sys/syscall.h>
int main(void) {
/* memfd の作成 */
int fd = syscall(SYS_memfd_create, "ipc_buffer",
MFD_CLOEXEC | MFD_ALLOW_SEALING);
if (fd == -1) {
perror("memfd_create");
exit(EXIT_FAILURE);
}
/* サイズの設定 */
size_t size = 4096;
ftruncate(fd, size);
/* シーリング (変更不可に設定) */
/* F_SEAL_SHRINK: 縮小不可 */
/* F_SEAL_GROW: 拡大不可 */
/* F_SEAL_WRITE: 書き込み不可 */
/* F_SEAL_SEAL: 以後のシーリング変更不可 */
/* マッピング */
char *mem = mmap(NULL, size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (mem == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
/* データ書き込み */
strcpy(mem, "Hello from memfd!");
printf("Written: %s\n", mem);
printf("memfd: fd=%d, /proc/self/fd/%d\n", fd, fd);
/* この fd を Unix ドメインソケット経由で別プロセスに送信可能 */
munmap(mem, size);
close(fd);
return 0;
}
9.5 madvise と mlock
/* パフォーマンスチューニング */
#include <sys/mman.h>
void mmap_tuning(void *addr, size_t length) {
/* ページフォルトの削減 */
madvise(addr, length, MADV_SEQUENTIAL); /* 順次アクセス */
madvise(addr, length, MADV_RANDOM); /* ランダムアクセス */
madvise(addr, length, MADV_WILLNEED); /* すぐに必要 */
madvise(addr, length, MADV_DONTNEED); /* もう不要 */
/* メモリをロック (スワップアウト防止) */
mlock(addr, length); /* 指定範囲 */
mlockall(MCL_CURRENT | MCL_FUTURE); /* 全メモリ */
/* Huge Pages の使用 */
madvise(addr, length, MADV_HUGEPAGE);
}
10. D-Bus と kdbus の歴史
10.1 D-Bus の概要
D-Bus (Desktop Bus) は、Linux デスクトップ環境で広く使用されるメッセージバスシステムである。freedesktop.org プロジェクトの一部として開発された。
D-Bus アーキテクチャ:
┌────────────┐ ┌────────────┐ ┌────────────┐
│ アプリA │ │ アプリB │ │ アプリC │
│ │ │ │ │ │
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘
│ │ │
└──────────────────┼──────────────────┘
│
┌──────┴──────┐
│ D-Bus │
│ デーモン │
│ (dbus-daemon)│
└─────────────┘
│
┌─────────────┼─────────────┐
│ │ │
┌──────┴──────┐ ┌───┴────┐ ┌─────┴─────┐
│ System Bus │ │ Session│ │ Private │
│ (system) │ │ Bus │ │ Bus │
└─────────────┘ └────────┘ └───────────┘
10.1.1 D-Bus の構成要素
D-Bus の主要概念:
1. バス (Bus): メッセージ転送のための中央ハブ
- System Bus: システム全体で1つ、root 権限のサービス用
- Session Bus: ユーザセッションごとに1つ
2. オブジェクトパス (Object Path): /org/freedesktop/NetworkManager
3. インターフェース (Interface): org.freedesktop.NetworkManager
4. メソッド (Method): RPC 的な呼び出し
5. シグナル (Signal): ブロードキャスト通知
6. プロパティ (Property): 値の取得/設定
10.1.2 D-Bus コマンドラインツール
# システムバスのサービス一覧
dbus-send --system --dest=org.freedesktop.DBus \
--type=method_call --print-reply \
/org/freedesktop/DBus org.freedesktop.DBus.ListNames
# busctl (systemd 付属) による操作
busctl list # サービス一覧
busctl tree org.freedesktop.systemd1 # オブジェクトツリー
busctl introspect org.freedesktop.systemd1 /org/freedesktop/systemd1
# gdbus (GLib 付属) による操作
gdbus introspect --system \
--dest org.freedesktop.NetworkManager \
--object-path /org/freedesktop/NetworkManager
# D-Bus モニタ
dbus-monitor --system
dbus-monitor --session
# systemd のサービスユニットを D-Bus 経由で操作
busctl call org.freedesktop.systemd1 \
/org/freedesktop/systemd1 \
org.freedesktop.systemd1.Manager \
ListUnits
# ホスト名の取得
busctl get-property org.freedesktop.hostname1 \
/org/freedesktop/hostname1 \
org.freedesktop.hostname1 Hostname
10.2 kdbus の歴史と教訓
kdbus の年表:
2013年 - Greg Kroah-Hartman が kdbus を提案
目的: D-Bus をカーネル内に実装して高速化
2014年 - Linux 3.17 向けにパッチ提出
特徴:
- メッセージコピーの削減
- ポリシー実施をカーネルで実行
- ブルームフィルタによるシグナルマッチング
- メッセージの信頼性向上
2015年 - 激しい議論の末、Linus Torvalds が却下
理由:
- 複雑すぎるカーネルインターフェース
- セキュリティ懸念
- メンテナンスの負荷
- ユーザ空間で解決できる問題
2016年 - Bus1 プロジェクトとして再提案 (これも却下)
2017年 - dbus-broker が開発される (ユーザ空間での高速実装)
kdbus の一部アイデアを採用
教訓:
- カーネルへの機能追加は最小限であるべき
- ユーザ空間で解決できる問題はユーザ空間で解決する
- パフォーマンスだけが判断基準ではない
10.3 dbus-broker
# dbus-broker の状態確認 (systemd ベースのシステム)
systemctl status dbus-broker.service
# dbus-daemon との比較
# dbus-daemon: 従来の参照実装 (C言語)
# dbus-broker: 高性能な代替実装 (C言語、kdbus のアイデアを一部採用)
#
# パフォーマンス:
# - dbus-broker は dbus-daemon の約 3-10 倍高速
# - メモリ使用量も少ない
# - Fedora, Arch Linux 等がデフォルトで採用
11. Futex (高速ユーザ空間ミューテックス)
11.1 Futex の基本概念
Futex (Fast Userspace Mutex) は Linux 2.6 で導入されたカーネルメカニズムで、ユーザ空間の同期プリミティブの基盤となる。競合がない場合はカーネルを呼び出さずにユーザ空間のみで動作し、競合が発生した場合のみシステムコールが行われる。
Futex の動作原理:
非競合時 (Fast Path)
アトミック操作のみ ───────────────────────────> ロック獲得
(syscall なし)
競合時 (Slow Path)
アトミック操作失敗 ──> futex(FUTEX_WAIT) ──> カーネルで待機
│
ロック解放 ──> futex(FUTEX_WAKE) ──────────────> 起床
11.2 Futex システムコール
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
long futex(uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, uint32_t *uaddr2,
uint32_t val3);
/*
* 主な操作:
* FUTEX_WAIT: *uaddr == val なら待機
* FUTEX_WAKE: 最大 val 個のウェイターを起床
* FUTEX_FD: futex をファイルディスクリプタに関連付け (非推奨)
* FUTEX_REQUEUE: ウェイターを別の futex に移動
* FUTEX_CMP_REQUEUE: 条件付きリキュー
* FUTEX_WAIT_BITSET: ビットマスク付き待機
* FUTEX_WAKE_BITSET: ビットマスク付き起床
*
* Linux 5.16+:
* FUTEX_WAIT_V: 複数 futex の同時待機 (futex2)
*/
11.3 実践例: Futex ベースのミューテックス
/*
* futex_mutex.c
* Futex を使った軽量ミューテックスの実装
* コンパイル: gcc -o futex_mutex futex_mutex.c -pthread
*/
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <stdatomic.h>
#include <pthread.h>
#include <errno.h>
/* Futex ラッパー */
static long futex_wait(atomic_int *uaddr, int val) {
return syscall(SYS_futex, uaddr, FUTEX_WAIT, val, NULL, NULL, 0);
}
static long futex_wake(atomic_int *uaddr, int count) {
return syscall(SYS_futex, uaddr, FUTEX_WAKE, count, NULL, NULL, 0);
}
/* 3状態ミューテックス (Ulrich Drepper の設計) */
/* 0: ロック解放, 1: ロック獲得(競合なし), 2: ロック獲得(競合あり) */
struct futex_mutex {
atomic_int state;
};
void futex_mutex_init(struct futex_mutex *m) {
atomic_init(&m->state, 0);
}
void futex_mutex_lock(struct futex_mutex *m) {
int c;
/* Fast path: 0 -> 1 (アトミック操作のみ) */
c = 0;
if (atomic_compare_exchange_strong(&m->state, &c, 1)) {
return; /* ロック獲得成功 */
}
/* Slow path: 競合発生 */
if (c != 2) {
c = atomic_exchange(&m->state, 2);
}
while (c != 0) {
/* カーネルで待機 */
futex_wait(&m->state, 2);
c = atomic_exchange(&m->state, 2);
}
}
void futex_mutex_unlock(struct futex_mutex *m) {
/* 状態を 0 にしてアンロック */
if (atomic_fetch_sub(&m->state, 1) != 1) {
/* 競合があった (state was 2) */
atomic_store(&m->state, 0);
/* 1つのウェイターを起床 */
futex_wake(&m->state, 1);
}
/* state was 1 -> 0: 競合なし、カーネル呼び出し不要 */
}
/* テスト */
#define NUM_THREADS 8
#define ITERATIONS 100000
struct futex_mutex mutex;
long shared_counter = 0;
void *thread_func(void *arg) {
(void)arg;
for (int i = 0; i < ITERATIONS; i++) {
futex_mutex_lock(&mutex);
shared_counter++;
futex_mutex_unlock(&mutex);
}
return NULL;
}
int main(void) {
futex_mutex_init(&mutex);
pthread_t threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, thread_func, NULL);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf("Counter: %ld (expected: %d)\n",
shared_counter, NUM_THREADS * ITERATIONS);
return 0;
}
11.4 Futex と pthread の関係
pthread ライブラリの内部:
┌─────────────────────────────────────────────────────┐
│ pthread_mutex_lock() │
│ ├── スピンロック試行 (adaptive mutex の場合) │
│ ├── atomic_compare_exchange (futex fast path) │
│ └── futex(FUTEX_WAIT) (futex slow path) │
│ │
│ pthread_cond_wait() │
│ ├── mutex アンロック │
│ ├── futex(FUTEX_WAIT) で待機 │
│ └── mutex 再ロック │
│ │
│ pthread_rwlock_rdlock() │
│ ├── アトミック操作で読み取りカウンタ増加 │
│ └── futex(FUTEX_WAIT) (書き込みロック競合時) │
│ │
│ sem_wait() (POSIX 無名セマフォ) │
│ ├── atomic_fetch_sub │
│ └── futex(FUTEX_WAIT) (値が 0 の場合) │
└─────────────────────────────────────────────────────┘
11.5 futex2 (Linux 5.16+)
/*
* futex_waitv - 複数の futex を同時に待機
* Linux 5.16+ で利用可能
*/
#include <linux/futex.h>
#include <sys/syscall.h>
struct futex_waitv {
__u64 val; /* 期待値 */
__u64 uaddr; /* futex のアドレス */
__u32 flags; /* フラグ */
__u32 __reserved;
};
/* 複数の futex のいずれかが変化するまで待機 */
long futex_waitv(struct futex_waitv *waiters, unsigned int nr_futexes,
unsigned int flags, struct timespec *timeout,
clockid_t clockid) {
return syscall(SYS_futex_waitv, waiters, nr_futexes,
flags, timeout, clockid);
}
/*
* ユースケース:
* - Wine/Proton: Windows の WaitForMultipleObjects エミュレーション
* - ゲームエンジン: 複数のイベント源の監視
* - 低レイテンシアプリケーション
*/
12. io_uring による IPC
12.1 io_uring の概要
io_uring は Linux 5.1 で導入された高性能非同期 I/O フレームワークである。従来の aio と比較して、システムコールのオーバーヘッドを大幅に削減する。
io_uring のアーキテクチャ:
ユーザ空間 カーネル空間
┌──────────────┐ ┌──────────────┐
│ Submission │ │ │
アプリ ────────>│ Queue (SQ) │─────>│ カーネル │
│ (リングバッファ)│ │ ワーカー │
└──────────────┘ │ スレッド │
│ │
┌──────────────┐ │ │
アプリ <────────│ Completion │<─────│ │
│ Queue (CQ) │ │ │
│ (リングバッファ)│ └──────────────┘
└──────────────┘
特徴:
- SQ/CQ はカーネルとユーザ空間で共有されたメモリ
- 操作の投入と完了の確認にシステムコールが不要 (SQPOLL モード)
- バッチ処理による効率化
12.2 io_uring を IPC に使用する例
/*
* io_uring_ipc.c
* io_uring を使った非同期 Unix ドメインソケット通信
* コンパイル: gcc -o io_uring_ipc io_uring_ipc.c -luring
* 必要: liburing-dev パッケージ
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <liburing.h>
#define SOCKET_PATH "/tmp/iouring_demo.sock"
#define BUF_SIZE 1024
#define QUEUE_DEPTH 32
enum op_type { OP_ACCEPT, OP_READ, OP_WRITE };
struct io_data {
enum op_type type;
int fd;
char buf[BUF_SIZE];
struct iovec iov;
};
int setup_server(void) {
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un addr = { .sun_family = AF_UNIX };
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
unlink(SOCKET_PATH);
bind(fd, (struct sockaddr *)&addr, sizeof(addr));
listen(fd, 5);
return fd;
}
int main(void) {
struct io_uring ring;
int server_fd;
/* io_uring の初期化 */
if (io_uring_queue_init(QUEUE_DEPTH, &ring, 0) < 0) {
perror("io_uring_queue_init");
exit(EXIT_FAILURE);
}
server_fd = setup_server();
printf("io_uring IPC server started on %s\n", SOCKET_PATH);
/* accept を投入 */
struct io_uring_sqe *sqe;
struct io_data *data;
data = calloc(1, sizeof(*data));
data->type = OP_ACCEPT;
data->fd = server_fd;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_fd, NULL, NULL, 0);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(&ring);
/* イベントループ */
while (1) {
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
data = io_uring_cqe_get_data(cqe);
int res = cqe->res;
switch (data->type) {
case OP_ACCEPT: {
if (res >= 0) {
printf("Client connected (fd=%d)\n", res);
/* 新しい接続からの読み取りを投入 */
struct io_data *read_data = calloc(1, sizeof(*read_data));
read_data->type = OP_READ;
read_data->fd = res;
read_data->iov.iov_base = read_data->buf;
read_data->iov.iov_len = BUF_SIZE;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_readv(sqe, res, &read_data->iov, 1, 0);
io_uring_sqe_set_data(sqe, read_data);
/* 次の accept も投入 */
struct io_data *accept_data = calloc(1, sizeof(*accept_data));
accept_data->type = OP_ACCEPT;
accept_data->fd = server_fd;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_accept(sqe, server_fd, NULL, NULL, 0);
io_uring_sqe_set_data(sqe, accept_data);
io_uring_submit(&ring);
}
break;
}
case OP_READ: {
if (res > 0) {
data->buf[res] = '\0';
printf("Received: %s\n", data->buf);
/* エコーレスポンスを投入 */
struct io_data *write_data = calloc(1, sizeof(*write_data));
write_data->type = OP_WRITE;
write_data->fd = data->fd;
snprintf(write_data->buf, BUF_SIZE, "Echo: %s", data->buf);
write_data->iov.iov_base = write_data->buf;
write_data->iov.iov_len = strlen(write_data->buf);
sqe = io_uring_get_sqe(&ring);
io_uring_prep_writev(sqe, data->fd,
&write_data->iov, 1, 0);
io_uring_sqe_set_data(sqe, write_data);
io_uring_submit(&ring);
} else {
printf("Client disconnected (fd=%d)\n", data->fd);
close(data->fd);
}
free(data);
break;
}
case OP_WRITE: {
if (res > 0) {
/* 次の読み取りを投入 */
struct io_data *read_data = calloc(1, sizeof(*read_data));
read_data->type = OP_READ;
read_data->fd = data->fd;
read_data->iov.iov_base = read_data->buf;
read_data->iov.iov_len = BUF_SIZE;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_readv(sqe, data->fd,
&read_data->iov, 1, 0);
io_uring_sqe_set_data(sqe, read_data);
io_uring_submit(&ring);
}
free(data);
break;
}
}
io_uring_cqe_seen(&ring, cqe);
}
io_uring_queue_exit(&ring);
close(server_fd);
unlink(SOCKET_PATH);
return 0;
}
12.3 io_uring の IPC 関連機能
io_uring でサポートされる IPC 関連操作:
- IORING_OP_ACCEPT: ソケット接続受け入れ
- IORING_OP_CONNECT: ソケット接続
- IORING_OP_RECV: 受信
- IORING_OP_SEND: 送信
- IORING_OP_RECVMSG: メッセージ受信
- IORING_OP_SENDMSG: メッセージ送信
- IORING_OP_READ: パイプ/ソケット読み取り
- IORING_OP_WRITE: パイプ/ソケット書き込み
- IORING_OP_SPLICE: ゼロコピー転送
- IORING_OP_MSG_RING: io_uring 間メッセージング (Linux 5.18+)
- IORING_OP_SEND_ZC: ゼロコピー送信 (Linux 6.0+)
io_uring の IPC 利点:
1. バッチ投入によるシステムコール削減
2. SQPOLL モードでの完全なユーザ空間操作
3. 固定バッファ (registered buffers) によるメモリコピー削減
4. リンクされた操作 (chained operations) による依存関係表現
13. Binder (Android IPC)
13.1 Binder の概要
Binder は Android オペレーティングシステムで使用される IPC メカニズムであり、OpenBinder プロジェクトをベースにしている。Linux メインラインカーネル 3.19 以降で drivers/android/binder.c として統合されている。
Binder アーキテクチャ:
┌──────────────┐ ┌──────────────┐
│ Client │ │ Server │
│ Process │ │ Process │
│ │ │ │
│ ┌──────────┐ │ │ ┌──────────┐ │
│ │ Proxy │ │ │ │ Stub │ │
│ │ (BpXxx) │ │ │ │ (BnXxx) │ │
│ └────┬─────┘ │ │ └────┬─────┘ │
│ │ │ │ │ │
│ ┌────┴─────┐ │ │ ┌────┴─────┐ │
│ │ IPCThread│ │ │ │ IPCThread│ │
│ └────┬─────┘ │ │ └────┬─────┘ │
└──────┼───────┘ └──────┼───────┘
│ │
│ ioctl(BINDER_WRITE_READ) │
│ │
┌────┴─────────────────────────────┴────┐
│ /dev/binder │
│ Binder Driver (カーネル) │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ トランザク │ │ メモリ │ │
│ │ ション管理 │ │ マッピング │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────┘
13.2 Binder の主な特徴
Binder の特徴:
1. オブジェクト指向 RPC
- リモートオブジェクトの参照を直接渡せる
- 参照カウントによる自動ライフサイクル管理
2. セキュリティ
- カーネルが呼び出し元の UID/PID を保証
- SELinux との統合
- パーミッションチェック
3. 効率性
- 1回のデータコピーのみ (mmap 使用)
- 従来のソケット IPC は2回コピー (送信→カーネル→受信)
4. 死亡通知 (Death Notification)
- サーバプロセスが終了した場合の通知メカニズム
- 参照先オブジェクトの消失を検出可能
5. servicemanager
- Binder サービスの名前解決
- Android のサービスレジストリ
13.3 Binder の設定
# Binder デバイスの確認
ls -la /dev/binder*
# crw-rw-rw- 1 root root 10, 55 Apr 10 09:00 /dev/binder
# Binder 情報の確認 (Android / Linux with binder)
cat /sys/kernel/debug/binder/state
cat /sys/kernel/debug/binder/stats
cat /sys/kernel/debug/binder/transactions
cat /sys/kernel/debug/binder/transaction_log
# binderfs (Linux 5.0+)
mount -t binder binder /dev/binderfs
# Binder のカーネル設定
# CONFIG_ANDROID_BINDER_IPC=y
# CONFIG_ANDROID_BINDERFS=y
# CONFIG_ANDROID_BINDER_DEVICES="binder,hwbinder,vndbinder"
13.4 Binder のトランザクション
/*
* binder_transaction の概念的なフロー
* (実際のコードは Android Framework を使用)
*/
/*
* Binder トランザクションの流れ:
* 1. クライアントが Parcel にデータをマーシャリング
* 2. ioctl(BINDER_WRITE_READ) でカーネルドライバに送信
* 3. カーネルが宛先プロセスの mmap 領域にデータをコピー
* 4. サーバのスレッドが起床してトランザクションを処理
* 5. 結果を逆方向でクライアントに返す
*
* 1回コピーの仕組み:
* - サーバプロセスは binder ドライバの mmap でバッファを確保
* - カーネルがクライアントのデータを直接サーバの mmap 領域にコピー
* - サーバはコピーなしでデータにアクセス可能
*/
/* Android NDK での Service Manager 使用例 (概念的) */
/*
#include <binder/IServiceManager.h>
#include <binder/IPCThreadState.h>
sp<IServiceManager> sm = defaultServiceManager();
sp<IBinder> binder = sm->getService(String16("my.service"));
sp<IMyService> service = interface_cast<IMyService>(binder);
service->myMethod(arg1, arg2);
*/
14. Netlink ソケット
14.1 概要
Netlink はカーネルとユーザ空間プロセスの間の通信に使用されるソケットベースの IPC メカニズムである。主にネットワーク構成の管理に使用される。
Netlink の構造:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ User Process │ │ User Process │ │ User Process │
│ (iproute2) │ │ (NetworkMgr) │ │ (custom app) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
│ AF_NETLINK ソケット │ │
│ │ │
┌────┴────────────────────┴────────────────────┴────┐
│ カーネル │
│ ┌─────────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ NETLINK_ROUTE│ │ NETLINK_ │ │ NETLINK_ │ │
│ │ (ルーティング)│ │ FIREWALL │ │ GENERIC │ │
│ └─────────────┘ └──────────┘ └──────────────┘ │
└───────────────────────────────────────────────────┘
14.2 Netlink プロトコルファミリ
主要な Netlink プロトコル:
NETLINK_ROUTE ルーティング、リンク、アドレス管理
NETLINK_FIREWALL ファイアウォール (非推奨)
NETLINK_SOCK_DIAG ソケット診断
NETLINK_NFLOG Netfilter ロギング
NETLINK_XFRM IPsec SA/Policy
NETLINK_SELINUX SELinux イベント
NETLINK_AUDIT 監査
NETLINK_CONNECTOR コネクタ (プロセスイベント等)
NETLINK_GENERIC 汎用 Netlink (拡張可能)
NETLINK_KOBJECT_UEVENT udev イベント
14.3 実践例: ネットワークインターフェース監視
/*
* netlink_monitor.c
* Netlink によるネットワークインターフェース変更の監視
* コンパイル: gcc -o netlink_mon netlink_monitor.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <net/if.h>
#define BUF_SIZE 8192
void parse_rtattr(struct rtattr *tb[], int max,
struct rtattr *rta, int len) {
memset(tb, 0, sizeof(struct rtattr *) * (max + 1));
while (RTA_OK(rta, len)) {
if (rta->rta_type <= max) {
tb[rta->rta_type] = rta;
}
rta = RTA_NEXT(rta, len);
}
}
void handle_link_message(struct nlmsghdr *nlh) {
struct ifinfomsg *ifi = NLMSG_DATA(nlh);
struct rtattr *tb[IFLA_MAX + 1];
int len = nlh->nlmsg_len - NLMSG_LENGTH(sizeof(*ifi));
parse_rtattr(tb, IFLA_MAX, IFLA_RTA(ifi), len);
char *if_name = tb[IFLA_IFNAME] ?
(char *)RTA_DATA(tb[IFLA_IFNAME]) : "unknown";
const char *action;
switch (nlh->nlmsg_type) {
case RTM_NEWLINK: action = "NEW"; break;
case RTM_DELLINK: action = "DEL"; break;
default: action = "???"; break;
}
printf("[LINK %s] %s (index=%d, flags=0x%x",
action, if_name, ifi->ifi_index, ifi->ifi_flags);
if (ifi->ifi_flags & IFF_UP) printf(" UP");
if (ifi->ifi_flags & IFF_RUNNING) printf(" RUNNING");
if (ifi->ifi_flags & IFF_LOOPBACK) printf(" LOOPBACK");
printf(")\n");
}
void handle_addr_message(struct nlmsghdr *nlh) {
struct ifaddrmsg *ifa = NLMSG_DATA(nlh);
struct rtattr *tb[IFA_MAX + 1];
int len = nlh->nlmsg_len - NLMSG_LENGTH(sizeof(*ifa));
parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);
char if_name[IF_NAMESIZE];
if_indextoname(ifa->ifa_index, if_name);
const char *action = (nlh->nlmsg_type == RTM_NEWADDR) ? "ADD" : "DEL";
if (tb[IFA_ADDRESS]) {
char addr_str[64];
inet_ntop(ifa->ifa_family, RTA_DATA(tb[IFA_ADDRESS]),
addr_str, sizeof(addr_str));
printf("[ADDR %s] %s/%d on %s\n",
action, addr_str, ifa->ifa_prefixlen, if_name);
}
}
int main(void) {
int sock;
struct sockaddr_nl addr;
/* Netlink ソケットの作成 */
sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
if (sock == -1) {
perror("socket");
exit(EXIT_FAILURE);
}
/* マルチキャストグループにバインド */
memset(&addr, 0, sizeof(addr));
addr.nl_family = AF_NETLINK;
addr.nl_groups = RTMGRP_LINK | RTMGRP_IPV4_IFADDR | RTMGRP_IPV6_IFADDR;
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind");
close(sock);
exit(EXIT_FAILURE);
}
printf("Monitoring network changes (Ctrl+C to stop)...\n");
/* イベントループ */
char buf[BUF_SIZE];
while (1) {
ssize_t len = recv(sock, buf, sizeof(buf), 0);
if (len <= 0) break;
struct nlmsghdr *nlh = (struct nlmsghdr *)buf;
while (NLMSG_OK(nlh, len)) {
switch (nlh->nlmsg_type) {
case RTM_NEWLINK:
case RTM_DELLINK:
handle_link_message(nlh);
break;
case RTM_NEWADDR:
case RTM_DELADDR:
handle_addr_message(nlh);
break;
case NLMSG_ERROR: {
struct nlmsgerr *err = NLMSG_DATA(nlh);
fprintf(stderr, "Netlink error: %d\n", err->error);
break;
}
}
nlh = NLMSG_NEXT(nlh, len);
}
}
close(sock);
return 0;
}
14.4 Generic Netlink
# Generic Netlink ファミリの確認
genl-ctrl-list # iproute2 のツール
# 例: TaskStats (プロセス統計)
# NETLINK_GENERIC + TASKSTATS ファミリ
# ip コマンドの内部は Netlink を使用
strace ip addr show 2>&1 | grep netlink
# socket(AF_NETLINK, SOCK_RAW|SOCK_CLOEXEC, NETLINK_ROUTE) = 3
15. proc ファイルシステムと IPC 情報
15.1 /proc/sysvipc/
# System V メッセージキューの情報
cat /proc/sysvipc/msg
# key msqid perms cbytes qnum lspid lrpid uid gid cuid cgid stime rtime ctime
# System V セマフォの情報
cat /proc/sysvipc/sem
# key semid perms nsems uid gid cuid cgid otime ctime
# System V 共有メモリの情報
cat /proc/sysvipc/shm
# key shmid perms size cpid lpid nattch uid gid cuid cgid atime dtime ctime rss swap
# 各フィールドの説明:
# key: IPC キー
# xxid: IPC 識別子
# perms: パーミッション
# cpid: 作成プロセス PID
# lpid: 最後に操作したプロセス PID
# nattch: アタッチ数 (共有メモリ)
15.2 プロセス固有の IPC 情報
# プロセスのファイルディスクリプタ
ls -la /proc/<PID>/fd/
# パイプの確認
ls -la /proc/<PID>/fd/ | grep pipe
# l-wx------ 1 user user 64 Apr 10 09:00 1 -> 'pipe:[123456]'
# ソケットの確認
ls -la /proc/<PID>/fd/ | grep socket
# lrwx------ 1 user user 64 Apr 10 09:00 3 -> 'socket:[789012]'
# fdinfo から詳細情報
cat /proc/<PID>/fdinfo/3
# pos: 0
# flags: 02
# mnt_id: 12
# メモリマッピングの確認
cat /proc/<PID>/maps | grep -E 'shm|SYSV'
# プロセスのネットワークソケット
cat /proc/<PID>/net/unix # Unix ドメインソケット
cat /proc/<PID>/net/netlink # Netlink ソケット
# IPC 名前空間の確認
ls -la /proc/<PID>/ns/ipc
readlink /proc/<PID>/ns/ipc
15.3 /proc/sys/kernel/ の IPC パラメータ
# 全ての IPC 関連カーネルパラメータの一覧
# --- メッセージキュー ---
sysctl kernel.msgmax # 1メッセージの最大サイズ
sysctl kernel.msgmnb # 1キューの最大バイト数
sysctl kernel.msgmni # 最大キュー数
# --- セマフォ ---
sysctl kernel.sem
# SEMMSL SEMMNS SEMOPM SEMMNI
# --- 共有メモリ ---
sysctl kernel.shmmax # 1セグメントの最大サイズ
sysctl kernel.shmall # 総共有メモリページ数
sysctl kernel.shmmni # 最大セグメント数
# --- POSIX メッセージキュー ---
sysctl fs.mqueue.msg_max
sysctl fs.mqueue.msgsize_max
sysctl fs.mqueue.queues_max
# --- パイプ ---
sysctl fs.pipe-max-size
sysctl fs.pipe-user-pages-hard
sysctl fs.pipe-user-pages-soft
# --- 全設定の表示 ---
sysctl -a 2>/dev/null | grep -E '(kernel\.(msg|sem|shm)|fs\.(mqueue|pipe))'
16. ipcs / ipcrm ツール
16.1 ipcs - IPC リソースの表示
# 全ての IPC リソースを表示
ipcs -a
# メッセージキューのみ
ipcs -q
# セマフォのみ
ipcs -s
# 共有メモリのみ
ipcs -m
# 詳細表示
ipcs -a -p # PID 情報付き (作成者 PID、最終操作 PID)
ipcs -a -t # タイムスタンプ付き
ipcs -a -c # 作成者情報付き
ipcs -a -l # 制限値の表示
# 制限値の表示例
ipcs -l
# ------ Messages Limits --------
# max queues system wide = 32000
# max size of message (bytes) = 8192
# default max size of queue (bytes) = 16384
#
# ------ Shared Memory Limits --------
# max number of segments = 4096
# max seg size (kbytes) = 18014398509465599
# max total shared memory (kbytes) = 18014398509465599
# min seg size (bytes) = 1
#
# ------ Semaphore Limits --------
# max number of arrays = 128
# max semaphores per array = 250
# max semaphores system wide = 32000
# max ops per semop call = 32
# semaphore max value = 32767
# 特定ユーザの IPC リソース
ipcs -a | grep username
16.2 ipcrm - IPC リソースの削除
# メッセージキューの削除
ipcrm -q <msqid> # ID で削除
ipcrm -Q <key> # キーで削除
# セマフォの削除
ipcrm -s <semid>
ipcrm -S <key>
# 共有メモリの削除
ipcrm -m <shmid>
ipcrm -M <key>
# 全てのリソースを削除 (注意!)
ipcrm -a # 現在のユーザの全 IPC リソースを削除
# スクリプトでの一括削除例
# 特定ユーザの全共有メモリを削除
ipcs -m | grep username | awk '{print $2}' | xargs -I{} ipcrm -m {}
# 孤立した IPC リソースのクリーンアップ
# (nattch=0 の共有メモリを削除)
ipcs -m | awk '$6==0 && NR>3 {print $2}' | xargs -I{} ipcrm -m {}
16.3 lsipc コマンド (util-linux)
# より詳細な IPC 情報 (util-linux 2.27+)
lsipc
# 共有メモリの詳細
lsipc -m
lsipc --shmems
# セマフォの詳細
lsipc -s
lsipc --semaphores
# メッセージキューの詳細
lsipc -q
lsipc --queues
# JSON 出力
lsipc -J
# 特定の IPC リソース
lsipc -m -i <shmid>
# グローバル情報
lsipc -g
17. IPC メカニズムのパフォーマンス比較
17.1 ベンチマーク概要
以下は典型的な Linux システム (x86_64, Linux 5.15, 4コア) での各 IPC メカニズムのベンチマーク結果である。
17.1.1 レイテンシ比較 (往復時間)
IPC メカニズム 往復レイテンシ (マイクロ秒)
─────────────────────────────────────────────────────
共有メモリ + futex ~0.2 - 0.5 μs
eventfd ~1.0 - 2.0 μs
Unix ドメインソケット ~2.0 - 5.0 μs
パイプ ~3.0 - 6.0 μs
POSIX メッセージキュー ~4.0 - 8.0 μs
System V メッセージキュー ~5.0 - 10.0 μs
TCP ソケット (localhost) ~10.0 - 20.0 μs
シグナル (sigqueue) ~5.0 - 15.0 μs
* 実測値は環境により大幅に異なる
17.1.2 スループット比較
IPC メカニズム スループット (MB/s, 1KB メッセージ)
───────────────────────────────────────────────────────────
共有メモリ (memcpy) ~10,000 - 50,000 MB/s
mmap (共有) ~8,000 - 40,000 MB/s
Unix ドメインソケット ~3,000 - 8,000 MB/s
パイプ ~2,000 - 6,000 MB/s
パイプ (splice, ゼロコピー) ~5,000 - 15,000 MB/s
TCP ソケット (localhost) ~2,000 - 5,000 MB/s
POSIX メッセージキュー ~500 - 2,000 MB/s
System V メッセージキュー ~300 - 1,500 MB/s
17.2 ベンチマークスクリプト
/*
* ipc_benchmark.c
* 各 IPC メカニズムのレイテンシベンチマーク
* コンパイル: gcc -O2 -o ipc_bench ipc_benchmark.c -lrt -pthread
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/eventfd.h>
#include <sys/wait.h>
#include <stdint.h>
#define ITERATIONS 100000
#define MSG_SIZE 64
static double get_time_us(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1e6 + ts.tv_nsec / 1e3;
}
/* パイプのベンチマーク */
void bench_pipe(void) {
int p1[2], p2[2];
pipe(p1);
pipe(p2);
pid_t pid = fork();
if (pid == 0) {
close(p1[1]); close(p2[0]);
char buf[MSG_SIZE];
for (int i = 0; i < ITERATIONS; i++) {
read(p1[0], buf, MSG_SIZE);
write(p2[1], buf, MSG_SIZE);
}
exit(0);
}
close(p1[0]); close(p2[1]);
char buf[MSG_SIZE];
memset(buf, 'A', MSG_SIZE);
double start = get_time_us();
for (int i = 0; i < ITERATIONS; i++) {
write(p1[1], buf, MSG_SIZE);
read(p2[0], buf, MSG_SIZE);
}
double elapsed = get_time_us() - start;
printf("Pipe: %.2f μs/roundtrip (%.0f roundtrips/sec)\n",
elapsed / ITERATIONS, ITERATIONS / (elapsed / 1e6));
wait(NULL);
close(p1[1]); close(p2[0]);
}
/* Unix ドメインソケットのベンチマーク */
void bench_unix_socket(void) {
int sv[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
pid_t pid = fork();
if (pid == 0) {
close(sv[0]);
char buf[MSG_SIZE];
for (int i = 0; i < ITERATIONS; i++) {
recv(sv[1], buf, MSG_SIZE, 0);
send(sv[1], buf, MSG_SIZE, 0);
}
exit(0);
}
close(sv[1]);
char buf[MSG_SIZE];
memset(buf, 'A', MSG_SIZE);
double start = get_time_us();
for (int i = 0; i < ITERATIONS; i++) {
send(sv[0], buf, MSG_SIZE, 0);
recv(sv[0], buf, MSG_SIZE, 0);
}
double elapsed = get_time_us() - start;
printf("Unix Socket: %.2f μs/roundtrip (%.0f roundtrips/sec)\n",
elapsed / ITERATIONS, ITERATIONS / (elapsed / 1e6));
wait(NULL);
close(sv[0]);
}
/* eventfd のベンチマーク */
void bench_eventfd(void) {
int efd1 = eventfd(0, 0);
int efd2 = eventfd(0, 0);
pid_t pid = fork();
if (pid == 0) {
uint64_t val;
for (int i = 0; i < ITERATIONS; i++) {
read(efd1, &val, sizeof(val));
val = 1;
write(efd2, &val, sizeof(val));
}
exit(0);
}
uint64_t val;
double start = get_time_us();
for (int i = 0; i < ITERATIONS; i++) {
val = 1;
write(efd1, &val, sizeof(val));
read(efd2, &val, sizeof(val));
}
double elapsed = get_time_us() - start;
printf("eventfd: %.2f μs/roundtrip (%.0f roundtrips/sec)\n",
elapsed / ITERATIONS, ITERATIONS / (elapsed / 1e6));
wait(NULL);
close(efd1); close(efd2);
}
int main(void) {
printf("IPC Latency Benchmark (%d iterations, %d byte messages)\n",
ITERATIONS, MSG_SIZE);
printf("─────────────────────────────────────────────────────\n");
bench_pipe();
bench_unix_socket();
bench_eventfd();
return 0;
}
17.3 選択ガイドライン
ユースケース別推奨 IPC メカニズム:
─────────────────────────────────────────────────────────
親子プロセス間の単純なデータ転送
→ パイプ (最もシンプル)
無関係なプロセス間のリクエスト-レスポンス
→ Unix ドメインソケット (柔軟で高性能)
大量データの共有 (データベース、キャッシュ)
→ 共有メモリ (mmap / POSIX shm) + futex/セマフォ
イベント通知 (データなし)
→ eventfd (最軽量)
定期的なタイマーイベント
→ timerfd
プロセスの状態監視
→ シグナル / signalfd
カーネルとの通信 (ネットワーク設定等)
→ Netlink ソケット
高頻度・低レイテンシ通信
→ 共有メモリ + futex (または io_uring)
メッセージの優先度付き配信
→ POSIX メッセージキュー
レガシーアプリケーションとの互換性
→ System V IPC
Android アプリケーション
→ Binder
デスクトップサービス間通信
→ D-Bus
18. セキュリティ考慮事項
18.1 IPC とセキュリティ
セキュリティチェックリスト:
1. パーミッション設定
- IPC リソースには適切な所有者とパーミッションを設定
- 0666 (全ユーザアクセス可) は本番環境で避ける
- System V IPC: ipc_perm 構造体で制御
- POSIX IPC: ファイルパーミッションで制御
- Unix ソケット: ファイルパーミッション + SO_PEERCRED
2. 名前空間による隔離
- IPC 名前空間: unshare(CLONE_NEWIPC)
- ネットワーク名前空間: Unix ソケットの隔離
- コンテナ環境では自動的に隔離
3. SELinux / AppArmor
- IPC オブジェクトへのアクセス制御
- Binder トランザクションの制御
4. 共有メモリのセキュリティ
- 機密データの場合: mlock() でスワップ防止
- 使用後: memset_explicit() / explicit_bzero() でゼロ化
- memfd_create + シーリングで改ざん防止
5. リソースリーク防止
- SEM_UNDO フラグの使用
- O_CLOEXEC フラグの使用
- RAII パターンの適用
18.2 IPC リソースの監査
# auditd による IPC 操作の監査
sudo auditctl -a always,exit -F arch=b64 -S shmget -k ipc_shm
sudo auditctl -a always,exit -F arch=b64 -S msgget -k ipc_msg
sudo auditctl -a always,exit -F arch=b64 -S semget -k ipc_sem
# 監査ログの確認
sudo ausearch -k ipc_shm
sudo ausearch -k ipc_msg
# strace による IPC 操作のトレース
strace -e trace=ipc -f -p <PID>
strace -e trace=network -f -p <PID> # ソケットベース IPC
19. トラブルシューティング
19.1 よくある問題と対処法
# 問題1: "No space left on device" (IPC リソース枯渇)
# 原因: IPC リソースの制限値に達した
ipcs -l # 制限値の確認
ipcs -a # 使用中リソースの確認
sudo sysctl -w kernel.msgmni=64000 # 制限値の引き上げ
# 問題2: 孤立した IPC リソース
# プロセスが異常終了して IPC リソースが残っている場合
ipcs -m -p # 共有メモリの PID 確認
ipcs -q -p # メッセージキューの PID 確認
# PID が存在しない場合、リソースを削除
ipcrm -m <shmid>
# 問題3: パイプのデッドロック
# 原因: パイプバッファが満杯で write() がブロック
# 対処: O_NONBLOCK フラグの使用、バッファサイズの拡大
fcntl(fd, F_SETFL, O_NONBLOCK);
# 問題4: Unix ソケットのファイルが残る
# 対処: サーバ起動時に unlink() を呼ぶ
unlink("/tmp/my.sock");
bind(fd, ...);
# 問題5: POSIX メッセージキューの作成に失敗
# 原因: /dev/mqueue がマウントされていない
mount -t mqueue none /dev/mqueue
# 問題6: mmap ENOMEM
# 原因: vm.max_map_count の制限
sysctl vm.max_map_count # デフォルト: 65530
sudo sysctl -w vm.max_map_count=262144
19.2 デバッグツール
# strace による IPC システムコールの追跡
strace -e trace=ipc ./my_program
strace -e trace=%ipc,%network -f ./my_program
# ltrace によるライブラリ呼び出しの追跡
ltrace -e 'mq_*+sem_*+shm_*' ./my_program
# perf によるシステムコールプロファイリング
sudo perf stat -e 'syscalls:sys_enter_futex' -p <PID>
sudo perf trace -e 'futex,sendmsg,recvmsg' -p <PID>
# bpftrace による動的トレース
sudo bpftrace -e '
tracepoint:syscalls:sys_enter_futex {
@futex_ops[args->op & 0xf] = count();
}'
# SystemTap
sudo stap -e '
probe syscall.shmget {
printf("%s(%d) shmget key=%d size=%d\n",
execname(), pid(), key, size);
}'
20. まとめとベストプラクティス
20.1 IPC メカニズム選択のフローチャート
IPC メカニズムの選択
│
┌────────┴────────┐
│ 同一ホスト内か? │
└────────┬────────┘
┌───┴───┐
はい いいえ
│ │
┌────┴────┐ TCP/UDP
│データ量?│ ソケット
└────┬────┘
┌─────────┼─────────┐
小 中 大
│ │ │
┌──────┴───┐ ┌───┴────┐ ┌──┴──────┐
│通知のみ?│ │双方向?│ │共有メモリ│
└──────┬───┘ └───┬────┘ │+ futex │
┌────┴──┐ ┌───┴───┐ └─────────┘
はい いいえ はい いいえ
│ │ │ │
eventfd パイプ UDS FIFO/MQ
signal
20.2 ベストプラクティス
1. リソース管理
- 必ずクリーンアップを実装する (atexit, シグナルハンドラ)
- O_CLOEXEC / SOCK_CLOEXEC フラグを常に使用
- SEM_UNDO フラグでセマフォの自動復元を有効化
2. エラーハンドリング
- 全ての IPC 操作の戻り値をチェック
- EINTR (シグナル割り込み) の再試行
- タイムアウトの設定で無限ブロックを防止
3. パフォーマンス
- 大量データには共有メモリ + 同期プリミティブ
- 小さなメッセージには Unix ドメインソケット
- splice/vmsplice でゼロコピー転送
- io_uring で非同期 I/O
4. セキュリティ
- 最小権限の原則に従ったパーミッション設定
- 名前空間による隔離の活用
- 機密データのスワップ防止 (mlock)
5. ポータビリティ
- POSIX IPC > System V IPC (新規開発)
- Linux 固有機能は #ifdef で分離
- eventfd 等は Linux 専用であることに注意
6. デバッグ容易性
- ログ出力の充実
- strace / perf での追跡を考慮した設計
- /proc ファイルシステムの活用
20.3 カーネルバージョンと IPC 機能の対応
カーネルバージョン 追加された IPC 関連機能
─────────────────────────────────────────────
Linux 2.6.0 futex
Linux 2.6.22 eventfd, signalfd, timerfd
Linux 2.6.25 timerfd_create (新API)
Linux 2.6.27 signalfd4, eventfd2
Linux 2.6.30 inotify_init1 (O_CLOEXEC)
Linux 3.4 O_DIRECT パイプ
Linux 3.17 memfd_create
Linux 3.19 Binder (メインライン統合)
Linux 4.18 io_uring 準備作業
Linux 5.1 io_uring
Linux 5.4 pidfd_send_signal
Linux 5.6 io_uring: splice, sendmsg, recvmsg
Linux 5.16 futex_waitv (futex2)
Linux 5.18 io_uring: MSG_RING
Linux 6.0 io_uring: send_zc
Linux 6.1 io_uring: fixed worker
参考文献
- Michael Kerrisk, "The Linux Programming Interface", No Starch Press, 2010
- Robert Love, "Linux Kernel Development", 3rd Edition, Addison-Wesley, 2010
- W. Richard Stevens, "UNIX Network Programming, Volume 2: Interprocess Communications", Prentice Hall, 1999
- Linux man pages: pipe(2), socket(2), mmap(2), futex(2), io_uring(7)
- Linux Kernel Documentation: https://www.kernel.org/doc/html/latest/
- LWN.net articles on IPC mechanisms
- Ulrich Drepper, "Futexes Are Tricky", 2011
本記事は AI により生成されました。実際のシステムでの動作は環境により異なる場合があります。 記事生成日: 2026-04-10