Python

Python 完全概要ガイド

Python の言語仕様・アーキテクチャ・主要エコシステムを設定例とともに包括的に解説する


1. はじめに — Python とは何か

1.1 Python の歴史と哲学

Python は 1991 年にオランダのプログラマー Guido van Rossum によって公開された汎用プログラミング言語である。名前はイギリスのコメディ番組「Monty Python's Flying Circus」に由来する。

Python の設計哲学は「The Zen of Python」(PEP 20)に凝縮されている:

import this
# 主要な原則:
# Beautiful is better than ugly.(美しいことは醜いことより良い)
# Explicit is better than implicit.(明示的であることは暗黙的であることより良い)
# Simple is better than complex.(単純であることは複雑であることより良い)
# Complex is better than complicated.(複雑であることは込み入っていることより良い)
# Readability counts.(読みやすさは重要である)

1.2 Python のバージョン変遷

バージョンリリース年主な特徴
Python 1.01994初の正式リリース
Python 2.02000リスト内包表記、GC導入
Python 2.72010Python 2系最終版(2020年EOL)
Python 3.02008Unicode標準化、print関数化
Python 3.62016f-string、型ヒント強化
Python 3.82019walrus演算子(:=)
Python 3.102021構造的パターンマッチング
Python 3.112022大幅な速度改善(10-60%高速化)
Python 3.122023Per-interpreter GIL、f-string改善
Python 3.132024実験的フリースレッドモード、JITコンパイラ

1.3 Python の位置づけと用途

Python は以下の分野で広く利用されている:

  • Web 開発: Django, Flask, FastAPI
  • データサイエンス・機械学習: NumPy, Pandas, scikit-learn, TensorFlow, PyTorch
  • 自動化・スクリプティング: システム管理、タスク自動化
  • 科学技術計算: SciPy, Matplotlib
  • DevOps / インフラ: Ansible, SaltStack
  • 組み込み・IoT: MicroPython, CircuitPython

1.4 Python のインストールと環境構築

# macOS(Homebrew)
brew install python@3.12

# Ubuntu/Debian
sudo apt update
sudo apt install python3.12 python3.12-venv python3.12-dev

# pyenv によるバージョン管理(推奨)
curl https://pyenv.run | bash

# ~/.bashrc or ~/.zshrc に追加
export PYENV_ROOT="$HOME/.pyenv"
[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"

# Python バージョンのインストールと切り替え
pyenv install 3.12.4
pyenv global 3.12.4
pyenv local 3.12.4   # プロジェクト単位で設定

# バージョン確認
python --version
# Python 3.12.4

2. 言語の基本構造

2.1 動的型付けとダックタイピング

Python は動的型付け言語であり、変数の型は実行時に決定される。

# 動的型付け — 同じ変数に異なる型の値を代入可能
x = 42          # int
x = "hello"     # str
x = [1, 2, 3]   # list

# ダックタイピング: "If it walks like a duck and quacks like a duck, it's a duck"
class Duck:
    def quack(self):
        return "Quack!"

class Person:
    def quack(self):
        return "I can quack too!"

def make_it_quack(thing):
    """thing の型は問わない — quack() メソッドがあれば良い"""
    print(thing.quack())

make_it_quack(Duck())    # Quack!
make_it_quack(Person())  # I can quack too!

2.2 基本データ型

# ── 数値型 ──
integer_val = 42                  # int(任意精度整数)
float_val = 3.14                  # float(倍精度浮動小数点数)
complex_val = 3 + 4j              # complex(複素数)
big_int = 10 ** 100               # Python の int はオーバーフローしない

# 数値の便利な記法
million = 1_000_000               # アンダースコアで桁区切り
binary = 0b1010                   # 2進数 → 10
octal = 0o17                      # 8進数 → 15
hexadecimal = 0xFF                # 16進数 → 255

# ── 文字列型 ──
single = 'Hello'
double = "World"
multi_line = """
これは
複数行の
文字列です
"""

# f-string(フォーマット済み文字列リテラル)
name = "Python"
version = 3.12
print(f"{name} {version}")                  # Python 3.12
print(f"{version:.1f}")                      # 3.1
print(f"{'center':^20}")                     # "      center       "
print(f"{1_000_000:,}")                      # 1,000,000
print(f"{0.85:.0%}")                         # 85%

# raw 文字列
path = r"C:\Users\name\documents"           # バックスラッシュをエスケープしない

# ── ブール型 ──
is_active = True
is_deleted = False
# 以下は Falsy(False として評価される)
# None, 0, 0.0, "", [], {}, set(), frozenset()

# ── None 型 ──
result = None
if result is None:     # None の比較は is を使う(== ではなく)
    print("No result")

2.3 コレクション型

# ── リスト(list) — 可変の順序付きコレクション ──
fruits = ["apple", "banana", "cherry"]
fruits.append("date")              # 末尾に追加
fruits.insert(1, "avocado")        # 位置指定で挿入
fruits.extend(["elderberry"])      # 複数要素を追加
removed = fruits.pop(2)            # インデックス指定で取り出し
fruits.sort()                      # インプレースでソート
sorted_fruits = sorted(fruits)     # 新しいリストを返す

# スライス
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(numbers[2:5])     # [2, 3, 4]
print(numbers[::2])     # [0, 2, 4, 6, 8](ステップ2)
print(numbers[::-1])    # [9, 8, 7, ..., 0](逆順)

# リスト内包表記
squares = [x ** 2 for x in range(10)]
evens = [x for x in range(20) if x % 2 == 0]
matrix = [[i * j for j in range(5)] for i in range(5)]

# ── タプル(tuple) — 不変の順序付きコレクション ──
point = (3, 4)
x, y = point                      # アンパック
single_element = (42,)             # 1要素タプルにはカンマが必要

# 名前付きタプル
from collections import namedtuple
Point = namedtuple('Point', ['x', 'y'])
p = Point(3, 4)
print(p.x, p.y)                   # 3 4

# ── 辞書(dict) — キーと値のマッピング ──
config = {
    "host": "localhost",
    "port": 8080,
    "debug": True,
}
config["timeout"] = 30             # 追加/更新
port = config.get("port", 3000)    # デフォルト値付き取得
config.setdefault("retries", 3)    # キーがなければ設定

# 辞書内包表記
squared = {x: x ** 2 for x in range(6)}
# {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

# 辞書のマージ(Python 3.9+)
defaults = {"color": "blue", "size": "medium"}
overrides = {"size": "large", "weight": "heavy"}
merged = defaults | overrides
# {"color": "blue", "size": "large", "weight": "heavy"}

# ── 集合(set) — 一意な要素のコレクション ──
unique_nums = {1, 2, 3, 4, 5}
unique_nums.add(6)
unique_nums.discard(3)             # 存在しなくてもエラーにならない

a = {1, 2, 3, 4}
b = {3, 4, 5, 6}
print(a | b)    # 和集合: {1, 2, 3, 4, 5, 6}
print(a & b)    # 積集合: {3, 4}
print(a - b)    # 差集合: {1, 2}
print(a ^ b)    # 対称差: {1, 2, 5, 6}

# ── frozenset — 不変の集合(辞書のキーに使える) ──
immutable_set = frozenset([1, 2, 3])

2.4 制御フロー

# ── 条件分岐 ──
score = 85
if score >= 90:
    grade = "A"
elif score >= 80:
    grade = "B"
elif score >= 70:
    grade = "C"
else:
    grade = "F"

# 三項演算子(条件式)
status = "adult" if age >= 18 else "minor"

# ── 構造的パターンマッチング(Python 3.10+) ──
command = {"action": "move", "direction": "north", "speed": 5}

match command:
    case {"action": "quit"}:
        print("Quitting")
    case {"action": "move", "direction": direction, "speed": speed} if speed > 0:
        print(f"Moving {direction} at speed {speed}")
    case {"action": "move", "direction": direction}:
        print(f"Moving {direction} at default speed")
    case _:
        print("Unknown command")

# パターンマッチングの高度な例
def process_response(response):
    match response:
        case {"status": 200, "body": body}:
            return f"Success: {body}"
        case {"status": 404}:
            return "Not found"
        case {"status": status} if 500 <= status < 600:
            return f"Server error: {status}"
        case {"status": status}:
            return f"Other status: {status}"

# ── ループ ──
# for ループ
for i in range(5):
    print(i)

for key, value in config.items():
    print(f"{key} = {value}")

for index, item in enumerate(fruits, start=1):
    print(f"{index}. {item}")

# zip で複数のイテラブルを同時に走査
names = ["Alice", "Bob", "Charlie"]
scores = [85, 92, 78]
for name, score in zip(names, scores):
    print(f"{name}: {score}")

# while ループ
count = 0
while count < 10:
    if count == 5:
        break           # ループを中断
    if count % 2 == 0:
        count += 1
        continue        # 次のイテレーションへ
    print(count)
    count += 1
else:
    # break されなかった場合に実行される
    print("Loop completed normally")

# ── walrus 演算子(Python 3.8+) ──
# 代入と評価を同時に行う
import re
if m := re.match(r'(\d+)-(\d+)', '123-456'):
    start, end = m.groups()
    print(f"Range: {start} to {end}")

# リスト内包表記との組み合わせ
data = [1, 5, 3, 8, 2, 9, 4]
filtered = [y for x in data if (y := x * 2) > 6]
# [8, 16, 18]

3. 関数とデコレータ

3.1 関数定義の基本

# 基本的な関数定義
def greet(name: str, greeting: str = "Hello") -> str:
    """挨拶メッセージを生成する。

    Args:
        name: 挨拶する相手の名前
        greeting: 挨拶の言葉(デフォルト: "Hello")

    Returns:
        フォーマットされた挨拶文字列
    """
    return f"{greeting}, {name}!"

# 呼び出し
print(greet("Alice"))               # Hello, Alice!
print(greet("Bob", "Hi"))           # Hi, Bob!
print(greet(greeting="Hey", name="Charlie"))  # キーワード引数

3.2 引数の種類

# ── 可変長引数 ──
def sum_all(*args: int) -> int:
    """任意の数の整数を受け取って合計を返す"""
    return sum(args)

print(sum_all(1, 2, 3, 4, 5))  # 15

# ── 可変長キーワード引数 ──
def build_profile(**kwargs: str) -> dict:
    """キーワード引数から辞書を構築する"""
    return {k: v for k, v in kwargs.items()}

profile = build_profile(name="Alice", role="Engineer", team="Backend")

# ── 位置専用引数とキーワード専用引数(Python 3.8+) ──
def search(query: str, /, *, max_results: int = 10, sort: str = "relevance") -> list:
    """
    query: 位置専用引数(/ の前)— キーワード指定不可
    max_results, sort: キーワード専用引数(* の後)— 位置指定不可
    """
    print(f"Searching '{query}' (max={max_results}, sort={sort})")
    return []

search("python")                                    # OK
search("python", max_results=5, sort="date")        # OK
# search(query="python")                            # TypeError: 位置専用引数
# search("python", 5)                               # TypeError: キーワード専用引数

# ── 引数のアンパック ──
def point_distance(x1, y1, x2, y2):
    return ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5

coords = [0, 0, 3, 4]
print(point_distance(*coords))  # 5.0(リストをアンパック)

params = {"x1": 0, "y1": 0, "x2": 3, "y2": 4}
print(point_distance(**params))  # 5.0(辞書をアンパック)

3.3 ラムダ式と高階関数

# ラムダ式(無名関数)
square = lambda x: x ** 2
add = lambda x, y: x + y

# 高階関数
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# map — 各要素に関数を適用
squared = list(map(lambda x: x ** 2, numbers))

# filter — 条件に合う要素を抽出
evens = list(filter(lambda x: x % 2 == 0, numbers))

# reduce — 要素を累積的に結合
from functools import reduce
total = reduce(lambda acc, x: acc + x, numbers, 0)

# sorted のカスタムキー
students = [
    {"name": "Alice", "grade": 85},
    {"name": "Bob", "grade": 92},
    {"name": "Charlie", "grade": 78},
]
sorted_students = sorted(students, key=lambda s: s["grade"], reverse=True)

3.4 デコレータ

import functools
import time
from typing import Callable, TypeVar, ParamSpec

P = ParamSpec('P')
T = TypeVar('T')

# ── 基本的なデコレータ ──
def timer(func: Callable[P, T]) -> Callable[P, T]:
    """関数の実行時間を計測するデコレータ"""
    @functools.wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

@timer
def slow_function():
    time.sleep(1)
    return "done"

# ── 引数付きデコレータ ──
def retry(max_attempts: int = 3, delay: float = 1.0):
    """リトライ機能を提供するデコレータ"""
    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            last_exception = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    print(f"Attempt {attempt}/{max_attempts} failed: {e}")
                    if attempt < max_attempts:
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

@retry(max_attempts=3, delay=0.5)
def fetch_data(url: str) -> dict:
    """外部APIからデータを取得する(リトライ付き)"""
    import urllib.request
    with urllib.request.urlopen(url) as response:
        return response.read()

# ── キャッシュデコレータ(標準ライブラリ) ──
from functools import lru_cache, cache

@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    """フィボナッチ数列(メモ化で高速化)"""
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

@cache  # Python 3.9+: maxsize=None の lru_cache と同等
def factorial(n: int) -> int:
    if n <= 1:
        return 1
    return n * factorial(n - 1)

# ── クラスベースのデコレータ ──
class CountCalls:
    """関数の呼び出し回数を記録するデコレータ"""
    def __init__(self, func):
        functools.update_wrapper(self, func)
        self.func = func
        self.count = 0

    def __call__(self, *args, **kwargs):
        self.count += 1
        print(f"{self.func.__name__} has been called {self.count} time(s)")
        return self.func(*args, **kwargs)

@CountCalls
def say_hello(name: str):
    print(f"Hello, {name}!")

say_hello("Alice")  # say_hello has been called 1 time(s) → Hello, Alice!
say_hello("Bob")    # say_hello has been called 2 time(s) → Hello, Bob!

# ── デコレータの積み重ね ──
@timer
@retry(max_attempts=2)
def process_data(data):
    """複数のデコレータを適用(下から上に適用される)"""
    return data

3.5 ジェネレータ

# ── ジェネレータ関数 ──
def fibonacci_gen(limit: int):
    """フィボナッチ数列をジェネレータとして生成"""
    a, b = 0, 1
    while a < limit:
        yield a
        a, b = b, a + b

for num in fibonacci_gen(100):
    print(num, end=" ")
# 0 1 1 2 3 5 8 13 21 34 55 89

# ── ジェネレータ式 ──
# リスト内包表記のメモリ効率版
sum_of_squares = sum(x ** 2 for x in range(1_000_000))

# ── yield from によるサブジェネレータ委譲 ──
def chain(*iterables):
    for iterable in iterables:
        yield from iterable

list(chain([1, 2], [3, 4], [5, 6]))
# [1, 2, 3, 4, 5, 6]

# ── ジェネレータを使った大容量ファイル処理 ──
def read_large_file(file_path: str, chunk_size: int = 8192):
    """大容量ファイルをチャンク単位で読み込むジェネレータ"""
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            yield chunk

# メモリ効率的にファイルを処理
# for chunk in read_large_file("/path/to/large/file"):
#     process(chunk)

4. オブジェクト指向プログラミング

4.1 クラスの基本

class Animal:
    """動物を表す基底クラス"""

    # クラス変数(全インスタンスで共有)
    kingdom = "Animalia"
    _count = 0

    def __init__(self, name: str, species: str, age: int = 0):
        """コンストラクタ"""
        self.name = name            # パブリック属性
        self.species = species
        self._age = age             # 慣習的にプライベート(アクセスは可能)
        self.__id = id(self)        # 名前マングリング(_Animal__id)
        Animal._count += 1

    def speak(self) -> str:
        """サブクラスでオーバーライドされることを意図"""
        return f"{self.name} makes a sound"

    @property
    def age(self) -> int:
        """age のゲッター(プロパティ)"""
        return self._age

    @age.setter
    def age(self, value: int):
        """age のセッター(バリデーション付き)"""
        if value < 0:
            raise ValueError("Age cannot be negative")
        self._age = value

    @classmethod
    def get_count(cls) -> int:
        """クラスメソッド: クラス自体にアクセス"""
        return cls._count

    @staticmethod
    def is_valid_name(name: str) -> bool:
        """静的メソッド: クラスやインスタンスに依存しないユーティリティ"""
        return bool(name) and name[0].isupper()

    def __repr__(self) -> str:
        """開発者向けの文字列表現"""
        return f"Animal(name={self.name!r}, species={self.species!r}, age={self._age})"

    def __str__(self) -> str:
        """ユーザー向けの文字列表現"""
        return f"{self.name} ({self.species})"

    def __eq__(self, other) -> bool:
        if not isinstance(other, Animal):
            return NotImplemented
        return self.name == other.name and self.species == other.species

    def __hash__(self) -> int:
        return hash((self.name, self.species))

4.2 継承とポリモーフィズム

class Dog(Animal):
    """Animal を継承した Dog クラス"""

    def __init__(self, name: str, breed: str, age: int = 0):
        super().__init__(name, species="Canis lupus familiaris", age=age)
        self.breed = breed

    def speak(self) -> str:
        return f"{self.name} says Woof!"

    def fetch(self, item: str) -> str:
        return f"{self.name} fetches the {item}"


class Cat(Animal):
    def __init__(self, name: str, indoor: bool = True, age: int = 0):
        super().__init__(name, species="Felis catus", age=age)
        self.indoor = indoor

    def speak(self) -> str:
        return f"{self.name} says Meow!"


# ポリモーフィズム
animals: list[Animal] = [
    Dog("Rex", "German Shepherd", 5),
    Cat("Whiskers", indoor=True, age=3),
    Dog("Buddy", "Golden Retriever", 2),
]

for animal in animals:
    print(animal.speak())  # 各サブクラスの speak() が呼ばれる

# 多重継承と MRO(Method Resolution Order)
class FlyingAnimal:
    def move(self) -> str:
        return "Flying"

class SwimmingAnimal:
    def move(self) -> str:
        return "Swimming"

class Duck(FlyingAnimal, SwimmingAnimal, Animal):
    def __init__(self):
        super().__init__("Donald", "Anas platyrhynchos")

    def speak(self) -> str:
        return f"{self.name} says Quack!"

# MRO の確認
print(Duck.__mro__)
# (Duck, FlyingAnimal, SwimmingAnimal, Animal, object)
duck = Duck()
print(duck.move())  # "Flying"(MRO で FlyingAnimal が先)

4.3 抽象基底クラス

from abc import ABC, abstractmethod

class Shape(ABC):
    """図形の抽象基底クラス"""

    @abstractmethod
    def area(self) -> float:
        """面積を計算する(サブクラスで実装必須)"""
        ...

    @abstractmethod
    def perimeter(self) -> float:
        """周長を計算する"""
        ...

    def describe(self) -> str:
        """具象メソッド(共通の実装)"""
        return f"{self.__class__.__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"


class Circle(Shape):
    def __init__(self, radius: float):
        self.radius = radius

    def area(self) -> float:
        import math
        return math.pi * self.radius ** 2

    def perimeter(self) -> float:
        import math
        return 2 * math.pi * self.radius


class Rectangle(Shape):
    def __init__(self, width: float, height: float):
        self.width = width
        self.height = height

    def area(self) -> float:
        return self.width * self.height

    def perimeter(self) -> float:
        return 2 * (self.width + self.height)


# shape = Shape()  # TypeError: 抽象クラスはインスタンス化できない
circle = Circle(5)
print(circle.describe())  # Circle: area=78.54, perimeter=31.42

4.4 データクラスとスロット

from dataclasses import dataclass, field, asdict, astuple
from typing import ClassVar

# ── データクラス(Python 3.7+) ──
@dataclass
class User:
    """ユーザー情報を保持するデータクラス"""
    name: str
    email: str
    age: int
    tags: list[str] = field(default_factory=list)  # ミュータブルなデフォルト値
    _id: int = field(init=False, repr=False)        # 初期化引数に含めない
    MAX_AGE: ClassVar[int] = 150                     # クラス変数

    def __post_init__(self):
        """__init__ の後に呼ばれるフック"""
        self._id = hash((self.name, self.email))
        if self.age < 0 or self.age > self.MAX_AGE:
            raise ValueError(f"Invalid age: {self.age}")

user = User("Alice", "alice@example.com", 30, ["admin", "developer"])
print(user)  # User(name='Alice', email='alice@example.com', age=30, tags=['admin', 'developer'])

# 辞書/タプルへの変換
print(asdict(user))
print(astuple(user))

# ── イミュータブルなデータクラス ──
@dataclass(frozen=True)
class Point:
    x: float
    y: float

p = Point(1.0, 2.0)
# p.x = 3.0  # FrozenInstanceError

# ── __slots__ による省メモリ化 ──
@dataclass(slots=True)  # Python 3.10+
class Coordinate:
    x: float
    y: float
    z: float

# slots=True で __dict__ を持たなくなり、メモリ使用量が削減される
# 数百万のインスタンスを生成する場合に有効

# ── Python 3.10+ kw_only ──
@dataclass(kw_only=True)
class Config:
    host: str
    port: int
    debug: bool = False

# Config("localhost", 8080)  # TypeError: キーワード引数必須
config = Config(host="localhost", port=8080)

4.5 プロトコルと構造的サブタイピング

from typing import Protocol, runtime_checkable

@runtime_checkable
class Drawable(Protocol):
    """描画可能なオブジェクトのプロトコル"""
    def draw(self) -> None: ...
    def get_bounds(self) -> tuple[float, float, float, float]: ...

class Button:
    """Drawable を明示的に継承していないが、プロトコルに適合する"""
    def draw(self) -> None:
        print("Drawing button")

    def get_bounds(self) -> tuple[float, float, float, float]:
        return (0, 0, 100, 50)

class TextLabel:
    def draw(self) -> None:
        print("Drawing text label")

    def get_bounds(self) -> tuple[float, float, float, float]:
        return (0, 0, 200, 30)

def render(widget: Drawable) -> None:
    """Drawable プロトコルに適合する任意のオブジェクトを受け入れる"""
    bounds = widget.get_bounds()
    print(f"Rendering at bounds {bounds}")
    widget.draw()

render(Button())     # OK — 構造的にプロトコルに適合
render(TextLabel())  # OK

# ランタイムチェック
print(isinstance(Button(), Drawable))  # True

4.6 コンテキストマネージャ

# ── __enter__ / __exit__ による実装 ──
class DatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    def __enter__(self):
        print(f"Connecting to {self.connection_string}")
        self.connection = f"Connection({self.connection_string})"
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f"Closing {self.connection}")
        self.connection = None
        # False を返すと例外が伝播する(デフォルト)
        # True を返すと例外を握りつぶす
        return False

    def query(self, sql: str):
        return f"Result of: {sql}"

with DatabaseConnection("postgresql://localhost/mydb") as db:
    result = db.query("SELECT * FROM users")

# ── contextlib を使った簡易実装 ──
from contextlib import contextmanager

@contextmanager
def temporary_directory():
    """一時ディレクトリを作成し、使用後に削除する"""
    import tempfile
    import shutil

    tmpdir = tempfile.mkdtemp()
    try:
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir)

with temporary_directory() as tmpdir:
    print(f"Working in {tmpdir}")

# ── 非同期コンテキストマネージャ ──
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_db_connection(url: str):
    conn = await connect(url)  # 仮想的な非同期接続
    try:
        yield conn
    finally:
        await conn.close()

5. モジュールとパッケージ

5.1 モジュールシステム

# ── モジュールの作成とインポート ──
# mymodule.py
"""ユーティリティ関数を提供するモジュール"""

__all__ = ["public_function", "PublicClass"]  # from mymodule import * で公開する名前

def public_function():
    return "I'm public"

def _private_function():
    return "I'm conventionally private"

class PublicClass:
    pass

# インポートの方法
import mymodule
from mymodule import public_function
from mymodule import public_function as pub_func
import mymodule as mm

# 条件付きインポート
try:
    import ujson as json
except ImportError:
    import json

# 遅延インポート
def process():
    import heavy_module  # 必要な時だけインポート
    return heavy_module.compute()

5.2 パッケージ構造

myproject/
├── pyproject.toml          # プロジェクト設定(PEP 621)
├── src/
│   └── mypackage/
│       ├── __init__.py     # パッケージ初期化
│       ├── __main__.py     # python -m mypackage で実行
│       ├── core/
│       │   ├── __init__.py
│       │   ├── engine.py
│       │   └── config.py
│       ├── utils/
│       │   ├── __init__.py
│       │   ├── helpers.py
│       │   └── validators.py
│       └── api/
│           ├── __init__.py
│           ├── routes.py
│           └── middleware.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py         # pytest のフィクスチャ
│   ├── test_core/
│   │   └── test_engine.py
│   └── test_api/
│       └── test_routes.py
├── docs/
│   └── index.md
└── README.md
# __init__.py — パッケージの公開 API を定義
# src/mypackage/__init__.py
"""MyPackage — サンプルパッケージ"""

__version__ = "1.0.0"

from mypackage.core.engine import Engine
from mypackage.core.config import Config

__all__ = ["Engine", "Config"]

# __main__.py — python -m mypackage で実行される
# src/mypackage/__main__.py
import sys
from mypackage.core.engine import Engine

def main():
    engine = Engine()
    engine.run()
    return 0

if __name__ == "__main__":
    sys.exit(main())

5.3 pyproject.toml(PEP 621 — 現代的なプロジェクト設定)

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "mypackage"
version = "1.0.0"
description = "A sample Python package"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
    {name = "Alice", email = "alice@example.com"},
]
keywords = ["sample", "package"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
]
dependencies = [
    "requests>=2.28",
    "pydantic>=2.0",
    "click>=8.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "pytest-cov>=4.0",
    "ruff>=0.1.0",
    "mypy>=1.0",
]
docs = [
    "sphinx>=6.0",
    "sphinx-rtd-theme>=1.0",
]

[project.scripts]
mypackage-cli = "mypackage.__main__:main"

[project.urls]
Homepage = "https://github.com/example/mypackage"
Documentation = "https://mypackage.readthedocs.io"

# ── ツール設定 ──
[tool.ruff]
target-version = "py310"
line-length = 100
select = ["E", "F", "I", "N", "W", "UP", "B", "A", "SIM"]

[tool.ruff.lint.isort]
known-first-party = ["mypackage"]

[tool.mypy]
python_version = "3.10"
strict = true
warn_return_any = true
warn_unused_configs = true

[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=mypackage --cov-report=term-missing"
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]

[tool.coverage.run]
source = ["mypackage"]
branch = true

[tool.coverage.report]
fail_under = 80
show_missing = true
exclude_lines = [
    "pragma: no cover",
    "if TYPE_CHECKING:",
    "if __name__ == .__main__.",
]

5.4 仮想環境とパッケージ管理

# ── venv(標準ライブラリ) ──
python -m venv .venv
source .venv/bin/activate          # macOS/Linux
# .venv\Scripts\activate           # Windows
pip install -r requirements.txt
deactivate

# ── pip の基本操作 ──
pip install requests               # パッケージのインストール
pip install "requests>=2.28,<3"    # バージョン制約
pip install -e ".[dev]"            # 開発モードでインストール
pip freeze > requirements.txt      # 依存関係の出力
pip install -r requirements.txt    # 依存関係の一括インストール

# ── uv(高速なパッケージ管理ツール) ──
# Rust で書かれた pip/venv の代替(10-100倍高速)
pip install uv

uv venv                            # 仮想環境の作成
uv pip install requests            # パッケージのインストール
uv pip compile requirements.in -o requirements.txt  # ロックファイル生成
uv pip sync requirements.txt       # 環境を同期

# ── Poetry ──
pip install poetry

poetry new myproject               # 新規プロジェクト作成
poetry init                        # 既存プロジェクトの初期化
poetry add requests                # 依存関係の追加
poetry add --group dev pytest      # 開発依存の追加
poetry install                     # 依存関係のインストール
poetry lock                        # ロックファイルの更新
poetry build                       # パッケージのビルド
poetry publish                     # PyPI への公開

# ── pipx(CLI ツールのグローバルインストール) ──
pip install pipx
pipx install ruff
pipx install mypy
pipx install black

6. エラーハンドリング

6.1 例外の基本

# ── try / except / else / finally ──
def divide(a: float, b: float) -> float:
    try:
        result = a / b
    except ZeroDivisionError:
        print("Error: Division by zero")
        return float('inf')
    except TypeError as e:
        print(f"Type error: {e}")
        raise  # 例外を再送出
    else:
        # 例外が発生しなかった場合に実行
        print(f"Result: {result}")
        return result
    finally:
        # 常に実行される(クリーンアップ処理に使用)
        print("Division operation completed")

# ── 例外の階層 ──
# BaseException
#  ├── SystemExit
#  ├── KeyboardInterrupt
#  ├── GeneratorExit
#  └── Exception
#       ├── StopIteration
#       ├── ArithmeticError
#       │    ├── ZeroDivisionError
#       │    └── OverflowError
#       ├── LookupError
#       │    ├── IndexError
#       │    └── KeyError
#       ├── OSError
#       │    ├── FileNotFoundError
#       │    ├── PermissionError
#       │    └── ConnectionError
#       ├── ValueError
#       ├── TypeError
#       ├── AttributeError
#       └── RuntimeError

# ── 複数の例外をまとめてキャッチ ──
try:
    value = int("not_a_number")
except (ValueError, TypeError) as e:
    print(f"Conversion error: {e}")

6.2 カスタム例外

class AppError(Exception):
    """アプリケーションの基底例外"""
    def __init__(self, message: str, code: int = 500):
        super().__init__(message)
        self.code = code

class ValidationError(AppError):
    """入力バリデーションエラー"""
    def __init__(self, field: str, message: str):
        super().__init__(f"Validation error on '{field}': {message}", code=400)
        self.field = field

class NotFoundError(AppError):
    """リソースが見つからないエラー"""
    def __init__(self, resource: str, identifier: str):
        super().__init__(f"{resource} '{identifier}' not found", code=404)
        self.resource = resource
        self.identifier = identifier

class AuthenticationError(AppError):
    """認証エラー"""
    def __init__(self, message: str = "Authentication failed"):
        super().__init__(message, code=401)

# 使用例
def get_user(user_id: str) -> dict:
    if not user_id:
        raise ValidationError("user_id", "Must not be empty")
    users = {"1": {"name": "Alice"}}
    if user_id not in users:
        raise NotFoundError("User", user_id)
    return users[user_id]

try:
    user = get_user("99")
except NotFoundError as e:
    print(f"[{e.code}] {e}")
    # [404] User '99' not found
except AppError as e:
    print(f"[{e.code}] {e}")

6.3 例外グループ(Python 3.11+)

# ── ExceptionGroup: 複数の例外をまとめて扱う ──
def process_batch(items: list) -> list:
    """バッチ処理で複数のエラーをまとめて報告"""
    results = []
    errors = []

    for i, item in enumerate(items):
        try:
            results.append(int(item))
        except ValueError as e:
            errors.append(ValueError(f"Item {i}: {e}"))

    if errors:
        raise ExceptionGroup("Batch processing errors", errors)
    return results

# except* で ExceptionGroup をフィルタリング
try:
    process_batch(["1", "abc", "3", "def"])
except* ValueError as eg:
    print(f"Got {len(eg.exceptions)} ValueError(s):")
    for e in eg.exceptions:
        print(f"  - {e}")

# ── add_note(Python 3.11+) ──
try:
    raise ValueError("Something went wrong")
except ValueError as e:
    e.add_note("This happened during batch processing")
    e.add_note(f"Processing item #42")
    raise  # ノート付きで再送出

7. ファイル I/O と標準ライブラリ

7.1 ファイル操作

from pathlib import Path

# ── pathlib(推奨) ──
base = Path("/var/log/myapp")
log_file = base / "app.log"            # パス結合

# パス操作
print(log_file.parent)                 # /var/log/myapp
print(log_file.name)                   # app.log
print(log_file.stem)                   # app
print(log_file.suffix)                 # .log
print(log_file.exists())               # True/False
print(log_file.is_file())              # True/False
print(log_file.is_dir())               # False

# ディレクトリ操作
Path("output/reports").mkdir(parents=True, exist_ok=True)

# ファイルの列挙
for py_file in Path("src").rglob("*.py"):
    print(py_file)

# テキストファイルの読み書き
config_path = Path("config.txt")
config_path.write_text("key=value\n", encoding="utf-8")
content = config_path.read_text(encoding="utf-8")

# バイナリファイル
data_path = Path("data.bin")
data_path.write_bytes(b"\x00\x01\x02")
raw = data_path.read_bytes()

# ── ファイルの読み書き(open) ──
# テキストファイル
with open("data.txt", "r", encoding="utf-8") as f:
    # 一括読み込み
    content = f.read()

    # 行ごとの読み込み(メモリ効率的)
    f.seek(0)
    for line in f:
        print(line.strip())

# 書き込み
with open("output.txt", "w", encoding="utf-8") as f:
    f.write("Hello, World!\n")
    f.writelines(["line1\n", "line2\n", "line3\n"])

# 追記
with open("output.txt", "a", encoding="utf-8") as f:
    f.write("Appended line\n")

7.2 JSON / YAML / TOML

import json
from pathlib import Path

# ── JSON ──
data = {
    "name": "MyApp",
    "version": "1.0.0",
    "settings": {
        "debug": True,
        "max_connections": 100,
        "allowed_hosts": ["localhost", "0.0.0.0"],
    },
}

# 書き出し
Path("config.json").write_text(
    json.dumps(data, indent=2, ensure_ascii=False),
    encoding="utf-8"
)

# 読み込み
config = json.loads(Path("config.json").read_text(encoding="utf-8"))

# カスタムエンコーダ/デコーダ
from datetime import datetime, date
from decimal import Decimal

class CustomEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)

json.dumps({"today": date.today(), "price": Decimal("19.99")}, cls=CustomEncoder)

# ── TOML(Python 3.11+ で標準ライブラリ) ──
import tomllib  # 読み込み専用

with open("pyproject.toml", "rb") as f:
    toml_data = tomllib.load(f)

# 書き込みには tomli-w が必要
# pip install tomli-w
# import tomli_w
# with open("output.toml", "wb") as f:
#     tomli_w.dump(data, f)

# ── CSV ──
import csv

# 書き込み
with open("users.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["name", "age", "email"])
    writer.writeheader()
    writer.writerows([
        {"name": "Alice", "age": 30, "email": "alice@example.com"},
        {"name": "Bob", "age": 25, "email": "bob@example.com"},
    ])

# 読み込み
with open("users.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['name']} ({row['age']})")

7.3 日時処理

from datetime import datetime, date, time, timedelta, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# 現在日時
now = datetime.now()                              # ローカル時間(naiveは非推奨)
now_utc = datetime.now(timezone.utc)              # UTC
now_jst = datetime.now(ZoneInfo("Asia/Tokyo"))    # JST

# タイムゾーン変換
utc_time = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
jst_time = utc_time.astimezone(ZoneInfo("Asia/Tokyo"))
pst_time = utc_time.astimezone(ZoneInfo("America/Los_Angeles"))

print(utc_time)   # 2024-01-15 12:00:00+00:00
print(jst_time)   # 2024-01-15 21:00:00+09:00
print(pst_time)   # 2024-01-15 04:00:00-08:00

# 日時の計算
tomorrow = date.today() + timedelta(days=1)
next_week = datetime.now() + timedelta(weeks=1)
two_hours_later = datetime.now() + timedelta(hours=2)

# 日時のフォーマット
formatted = now.strftime("%Y-%m-%d %H:%M:%S")
iso_format = now.isoformat()

# 文字列からの解析
parsed = datetime.strptime("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
from_iso = datetime.fromisoformat("2024-01-15T10:30:00+09:00")

7.4 正規表現

import re

text = "Contact us at support@example.com or sales@company.org"

# 基本的なマッチング
pattern = r'[\w.+-]+@[\w-]+\.[\w.]+'
emails = re.findall(pattern, text)
# ['support@example.com', 'sales@company.org']

# コンパイル済みパターン(繰り返し使用する場合)
email_pattern = re.compile(r'''
    (?P<user>[\w.+-]+)       # ユーザー名
    @                         # @
    (?P<domain>[\w-]+\.[\w.]+)  # ドメイン名
''', re.VERBOSE)

for match in email_pattern.finditer(text):
    print(f"User: {match.group('user')}, Domain: {match.group('domain')}")

# 置換
sanitized = re.sub(r'\b\d{3}-\d{4}\b', 'XXX-XXXX', "Call 123-4567 or 987-6543")
# "Call XXX-XXXX or XXX-XXXX"

# 分割
parts = re.split(r'[,;]\s*', "apple, banana; cherry, date")
# ['apple', 'banana', 'cherry', 'date']

8. 型ヒントと静的解析

8.1 型ヒントの基本(PEP 484+)

from typing import (
    Any, Union, Optional, Literal,
    TypeVar, Generic, TypeAlias,
    Callable, Iterator, Generator,
    ClassVar, Final, Annotated,
    TypeGuard, Never, Self,
    overload
)
from collections.abc import Sequence, Mapping, Iterable

# ── 基本的な型ヒント ──
name: str = "Alice"
age: int = 30
ratio: float = 0.85
is_active: bool = True

# コレクション型(Python 3.9+ は組み込み型で直接記述可能)
names: list[str] = ["Alice", "Bob"]
scores: dict[str, int] = {"Alice": 95, "Bob": 87}
coordinates: tuple[float, float] = (3.14, 2.72)
unique_ids: set[int] = {1, 2, 3}
immutable_ids: frozenset[int] = frozenset({1, 2, 3})

# ── Optional と Union ──
# Python 3.10+ では X | Y 構文が使える
def find_user(user_id: int) -> dict[str, Any] | None:  # Optional[dict[str, Any]] と同等
    ...

def process(value: int | str) -> str:  # Union[int, str] と同等
    return str(value)

# ── Literal 型 ──
def set_color(color: Literal["red", "green", "blue"]) -> None:
    ...

# ── TypeAlias ──
UserId: TypeAlias = int
UserRecord: TypeAlias = dict[str, Any]
Callback: TypeAlias = Callable[[str, int], bool]

# Python 3.12+ では type 文
type Vector = list[float]
type Matrix = list[Vector]
type JSONValue = str | int | float | bool | None | list["JSONValue"] | dict[str, "JSONValue"]

# ── 関数の型ヒント ──
def greet(name: str, times: int = 1) -> str:
    return (f"Hello, {name}! " * times).strip()

# コールバックの型
def apply_operation(
    values: list[int],
    operation: Callable[[int], int]
) -> list[int]:
    return [operation(v) for v in values]

# ジェネレータの型
def count_up(start: int, end: int) -> Generator[int, None, None]:
    while start < end:
        yield start
        start += 1

8.2 ジェネリクス

from typing import TypeVar, Generic

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

# ── ジェネリッククラス ──
class Stack(Generic[T]):
    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        if not self._items:
            raise IndexError("Stack is empty")
        return self._items.pop()

    def peek(self) -> T:
        if not self._items:
            raise IndexError("Stack is empty")
        return self._items[-1]

    def __len__(self) -> int:
        return len(self._items)

int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
value: int = int_stack.pop()

# Python 3.12+ の新しいジェネリクス構文
class Stack[T]:
    def __init__(self) -> None:
        self._items: list[T] = []

    def push(self, item: T) -> None:
        self._items.append(item)

    def pop(self) -> T:
        return self._items.pop()

# ジェネリック関数(Python 3.12+)
def first[T](items: Sequence[T]) -> T:
    return items[0]

# 境界付きTypeVar
from typing import TypeVar

class Comparable:
    def __lt__(self, other: "Comparable") -> bool: ...

CT = TypeVar('CT', bound=Comparable)

def min_value(a: CT, b: CT) -> CT:
    return a if a < b else b

8.3 Pydantic によるデータバリデーション

from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import EmailStr, HttpUrl  # pip install pydantic[email]
from datetime import datetime
from enum import Enum

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

class Address(BaseModel):
    street: str
    city: str
    country: str = "Japan"
    postal_code: str = Field(pattern=r'^\d{3}-\d{4}$')

class UserCreate(BaseModel):
    """ユーザー作成リクエストのバリデーションモデル"""
    model_config = {
        "str_strip_whitespace": True,
        "str_min_length": 1,
    }

    name: str = Field(min_length=2, max_length=50)
    email: str = Field(description="ユーザーのメールアドレス")
    age: int = Field(ge=0, le=150)
    role: UserRole = UserRole.USER
    tags: list[str] = Field(default_factory=list, max_length=10)
    address: Address | None = None
    created_at: datetime = Field(default_factory=datetime.now)

    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError('Invalid email format')
        return v.lower()

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v: list[str]) -> list[str]:
        return [tag.lower().strip() for tag in v]

    @model_validator(mode='after')
    def validate_model(self) -> 'UserCreate':
        if self.role == UserRole.ADMIN and self.age < 18:
            raise ValueError('Admin users must be at least 18 years old')
        return self

# 使用例
user = UserCreate(
    name="Alice",
    email="ALICE@Example.com",
    age=30,
    tags=["Developer", "Python"],
    address={"street": "123 Main St", "city": "Tokyo", "postal_code": "100-0001"}
)
print(user.model_dump())
# email は自動的に小文字化される
print(user.model_dump_json(indent=2))  # JSON シリアライズ

# バリデーションエラー
from pydantic import ValidationError
try:
    invalid_user = UserCreate(name="A", email="invalid", age=-1)
except ValidationError as e:
    print(e.errors())

# ── Settings 管理(pydantic-settings) ──
# pip install pydantic-settings
from pydantic_settings import BaseSettings

class AppSettings(BaseSettings):
    """環境変数から設定を読み込む"""
    model_config = {
        "env_prefix": "APP_",
        "env_file": ".env",
    }

    debug: bool = False
    database_url: str = "sqlite:///db.sqlite3"
    secret_key: str
    allowed_origins: list[str] = ["http://localhost:3000"]
    max_connections: int = 10

# APP_SECRET_KEY=mysecret APP_DEBUG=true python app.py
# settings = AppSettings()

8.4 mypy による静的型チェック

# インストール
pip install mypy

# 基本的な実行
mypy src/
mypy --strict src/           # 厳格モード
mypy --python-version 3.12 src/

# mypy.ini または pyproject.toml で設定
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
check_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true

[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false

[[tool.mypy.overrides]]
module = "third_party_module.*"
ignore_missing_imports = true
# mypy が検出するエラーの例
def process(items: list[str]) -> int:
    total = 0
    for item in items:
        total += len(item)
    return total

result: str = process(["hello"])  # error: Incompatible types in assignment
#                                    (expression has type "int", variable has type "str")

# TypeGuard を使った型の絞り込み
from typing import TypeGuard

def is_string_list(val: list[object]) -> TypeGuard[list[str]]:
    return all(isinstance(x, str) for x in val)

def process_items(items: list[object]) -> None:
    if is_string_list(items):
        # ここで items は list[str] として扱われる
        for item in items:
            print(item.upper())

9. 並行処理と非同期プログラミング

9.1 スレッディング

import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# ── 基本的なスレッド ──
def worker(name: str, delay: float) -> str:
    print(f"[{name}] Starting")
    time.sleep(delay)
    print(f"[{name}] Done")
    return f"{name} completed"

# Thread クラスの直接使用
thread = threading.Thread(target=worker, args=("Thread-1", 2))
thread.start()
thread.join()  # スレッドの完了を待機

# ── ThreadPoolExecutor(推奨) ──
urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
    "https://api.example.com/orders",
]

def fetch_url(url: str) -> dict:
    """URL からデータを取得(シミュレーション)"""
    import urllib.request
    time.sleep(1)  # ネットワークI/Oのシミュレーション
    return {"url": url, "status": 200}

with ThreadPoolExecutor(max_workers=5) as executor:
    # submit で個別に投入
    futures = {executor.submit(fetch_url, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            result = future.result(timeout=10)
            print(f"{url}: {result['status']}")
        except Exception as e:
            print(f"{url}: Error - {e}")

    # map でまとめて投入
    results = list(executor.map(fetch_url, urls))

# ── スレッド同期プリミティブ ──
lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # ロックの取得と解放を自動化
        temp = counter
        time.sleep(0.001)
        counter = temp + 1

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Counter: {counter}")  # 100(ロックなしだと100未満になりうる)

# ── threading.Event ──
stop_event = threading.Event()

def background_task():
    while not stop_event.is_set():
        print("Working...")
        stop_event.wait(timeout=1.0)
    print("Stopped")

t = threading.Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
stop_event.set()  # タスクに停止を通知
t.join()

9.2 マルチプロセシング

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# ── CPU バウンドなタスクはマルチプロセスが効果的 ──
# (GIL の制約を受けない)

def cpu_heavy_task(n: int) -> int:
    """CPU 集約的な計算"""
    return sum(i * i for i in range(n))

# ProcessPoolExecutor(推奨)
with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
    tasks = [1_000_000, 2_000_000, 3_000_000, 4_000_000]
    results = list(executor.map(cpu_heavy_task, tasks))
    print(results)

# ── multiprocessing.Pool ──
def process_chunk(chunk: list[int]) -> list[int]:
    return [x ** 2 for x in chunk]

if __name__ == "__main__":
    data = list(range(1000))
    chunk_size = len(data) // mp.cpu_count()
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with mp.Pool(processes=mp.cpu_count()) as pool:
        results = pool.map(process_chunk, chunks)
        flat_results = [item for sublist in results for item in sublist]

# ── プロセス間通信 ──
def producer(queue: mp.Queue):
    for i in range(10):
        queue.put(f"item-{i}")
    queue.put(None)  # 終了シグナル

def consumer(queue: mp.Queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")

if __name__ == "__main__":
    queue = mp.Queue()
    p = mp.Process(target=producer, args=(queue,))
    c = mp.Process(target=consumer, args=(queue,))
    p.start()
    c.start()
    p.join()
    c.join()

# ── 共有メモリ(Python 3.8+) ──
from multiprocessing import shared_memory
import numpy as np

if __name__ == "__main__":
    # 共有メモリの作成
    a = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    shared_array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    shared_array[:] = a[:]

    print(f"Shared memory name: {shm.name}")

    # 別プロセスからアクセス
    existing_shm = shared_memory.SharedMemory(name=shm.name)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=existing_shm.buf)
    print(b)  # [1. 2. 3. 4. 5.]

    existing_shm.close()
    shm.close()
    shm.unlink()

9.3 asyncio — 非同期プログラミング

import asyncio
import aiohttp  # pip install aiohttp

# ── 基本的な非同期関数 ──
async def fetch_data(url: str) -> dict:
    """非同期 HTTP リクエスト"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    # 並行実行(gather)
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/products",
        "https://api.example.com/orders",
    ]
    results = await asyncio.gather(
        *[fetch_data(url) for url in urls],
        return_exceptions=True  # エラーを例外として返す
    )
    for url, result in zip(urls, results):
        if isinstance(result, Exception):
            print(f"{url}: Error - {result}")
        else:
            print(f"{url}: {result}")

asyncio.run(main())

# ── TaskGroup(Python 3.11+: 構造化並行性) ──
async def main_v2():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data("https://api.example.com/users"))
        task2 = tg.create_task(fetch_data("https://api.example.com/products"))
        task3 = tg.create_task(fetch_data("https://api.example.com/orders"))
    # ここに到達した時点で全タスクが完了している
    # いずれかが例外を送出した場合、ExceptionGroup が発生
    print(task1.result(), task2.result(), task3.result())

# ── 非同期ジェネレータ ──
async def async_range(start: int, end: int, delay: float = 0.1):
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

async def consume():
    async for value in async_range(0, 10):
        print(value)

# ── 非同期コンテキストマネージャ ──
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Disconnecting...")
        await asyncio.sleep(0.1)

    async def query(self, sql: str) -> list:
        await asyncio.sleep(0.1)
        return [{"id": 1, "name": "Alice"}]

async def db_example():
    async with AsyncDatabase() as db:
        results = await db.query("SELECT * FROM users")

# ── セマフォによる並行数の制限 ──
async def rate_limited_fetch(urls: list[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url: str) -> dict:
        async with semaphore:
            return await fetch_data(url)

    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])

# ── タイムアウト ──
async def with_timeout():
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await fetch_data("https://slow-api.example.com/data")
    except TimeoutError:
        print("Request timed out")

# ── キュー ──
async def producer(queue: asyncio.Queue):
    for i in range(10):
        await queue.put(f"item-{i}")
        await asyncio.sleep(0.1)
    await queue.put(None)

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Processing: {item}")
        queue.task_done()

async def queue_example():
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

9.4 GIL(Global Interpreter Lock)と並行処理の選択

Python の並行処理の選択ガイド:

┌─────────────────────┬───────────────────────┬─────────────────┐
│ タスクの種類         │ 推奨手法               │ 理由            │
├─────────────────────┼───────────────────────┼─────────────────┤
│ I/O バウンド         │ asyncio               │ 最も効率的       │
│ (HTTP, DB, ファイル)  │ または ThreadPool     │ GIL の影響なし   │
├─────────────────────┼───────────────────────┼─────────────────┤
│ CPU バウンド         │ multiprocessing       │ GIL を回避       │
│ (計算、データ処理)    │ または ProcessPool    │ 真の並列実行     │
├─────────────────────┼───────────────────────┼─────────────────┤
│ 混合                │ asyncio +             │ I/OはasyncIO、   │
│                     │ ProcessPoolExecutor   │ CPUはプロセス    │
└─────────────────────┴───────────────────────┴─────────────────┘

Python 3.13+ の実験的フリースレッドモード:
  - GIL を無効化可能(--disable-gil フラグ)
  - マルチスレッドで真の並列実行が可能に
  - まだ実験段階で、一部のC拡張との互換性に注意
# ── 混合パターン: asyncio + ProcessPoolExecutor ──
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_work(data: list[int]) -> int:
    """CPU 集約的な処理(別プロセスで実行)"""
    return sum(x ** 2 for x in data)

async def mixed_workload():
    loop = asyncio.get_event_loop()

    # CPU バウンドなタスクを別プロセスに委譲
    with ProcessPoolExecutor() as pool:
        data_chunks = [list(range(1_000_000)) for _ in range(4)]
        futures = [
            loop.run_in_executor(pool, cpu_work, chunk)
            for chunk in data_chunks
        ]
        results = await asyncio.gather(*futures)
        print(f"Total: {sum(results)}")

asyncio.run(mixed_workload())

10. テスト

10.1 pytest — テストフレームワーク

# tests/test_calculator.py
import pytest
from mypackage.calculator import Calculator, DivisionByZeroError

# ── 基本的なテスト ──
def test_addition():
    calc = Calculator()
    assert calc.add(2, 3) == 5

def test_subtraction():
    calc = Calculator()
    assert calc.subtract(5, 3) == 2

# ── パラメタライズ ──
@pytest.mark.parametrize("a, b, expected", [
    (2, 3, 5),
    (-1, 1, 0),
    (0, 0, 0),
    (100, -50, 50),
    (1.5, 2.5, 4.0),
])
def test_add_parametrized(a, b, expected):
    calc = Calculator()
    assert calc.add(a, b) == expected

# ── 例外のテスト ──
def test_division_by_zero():
    calc = Calculator()
    with pytest.raises(DivisionByZeroError, match="Cannot divide by zero"):
        calc.divide(10, 0)

# ── 近似値の比較 ──
def test_float_comparison():
    assert 0.1 + 0.2 == pytest.approx(0.3)
    assert [0.1, 0.2] == pytest.approx([0.1, 0.2], abs=1e-6)

10.2 フィクスチャ

# tests/conftest.py
import pytest
import tempfile
from pathlib import Path

@pytest.fixture
def calculator():
    """Calculator インスタンスを提供するフィクスチャ"""
    return Calculator()

@pytest.fixture
def sample_data():
    """テスト用サンプルデータ"""
    return {
        "users": [
            {"name": "Alice", "age": 30},
            {"name": "Bob", "age": 25},
        ]
    }

@pytest.fixture
def temp_dir():
    """一時ディレクトリを提供し、テスト後に自動クリーンアップ"""
    with tempfile.TemporaryDirectory() as tmpdir:
        yield Path(tmpdir)

@pytest.fixture(scope="session")
def database():
    """セッション全体で共有されるデータベース接続"""
    db = Database.connect("test_db")
    db.create_tables()
    yield db
    db.drop_tables()
    db.disconnect()

@pytest.fixture(autouse=True)
def reset_state():
    """全テストの前後で自動実行されるフィクスチャ"""
    yield
    # テスト後のクリーンアップ

# フィクスチャを使用するテスト
def test_with_calculator(calculator):
    assert calculator.add(1, 1) == 2

def test_with_temp_dir(temp_dir):
    test_file = temp_dir / "test.txt"
    test_file.write_text("hello")
    assert test_file.read_text() == "hello"

10.3 モックとパッチ

from unittest.mock import Mock, patch, MagicMock, AsyncMock
import pytest

# ── Mock オブジェクト ──
def test_with_mock():
    # 基本的な Mock
    mock_db = Mock()
    mock_db.query.return_value = [{"id": 1, "name": "Alice"}]

    result = mock_db.query("SELECT * FROM users")
    assert result == [{"id": 1, "name": "Alice"}]
    mock_db.query.assert_called_once_with("SELECT * FROM users")

    # side_effect で例外を発生
    mock_db.query.side_effect = ConnectionError("Database unavailable")
    with pytest.raises(ConnectionError):
        mock_db.query("SELECT 1")

# ── patch デコレータ ──
class UserService:
    def __init__(self, api_client):
        self.api_client = api_client

    def get_user(self, user_id: int) -> dict:
        response = self.api_client.get(f"/users/{user_id}")
        if response.status_code == 200:
            return response.json()
        raise ValueError("User not found")

@patch("mypackage.services.requests.get")
def test_get_user(mock_get):
    mock_response = Mock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"id": 1, "name": "Alice"}
    mock_get.return_value = mock_response

    service = UserService(Mock())
    # ... テスト

# ── 非同期モック ──
@pytest.mark.asyncio
async def test_async_function():
    mock_client = AsyncMock()
    mock_client.fetch.return_value = {"data": "result"}

    result = await mock_client.fetch("https://api.example.com")
    assert result == {"data": "result"}

10.4 pytest の設定とプラグイン

# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_functions = ["test_*"]
python_classes = ["Test*"]
addopts = [
    "-v",                          # 詳細出力
    "--tb=short",                  # トレースバックの短縮表示
    "--strict-markers",            # 未登録マーカーをエラーに
    "--cov=src/mypackage",         # カバレッジ対象
    "--cov-report=term-missing",   # カバレッジレポート
    "--cov-report=html:htmlcov",   # HTML レポート出力
    "-x",                          # 最初の失敗で停止
]
markers = [
    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
    "integration: marks tests as integration tests",
]
filterwarnings = [
    "error",
    "ignore::DeprecationWarning",
]
asyncio_mode = "auto"  # pytest-asyncio の設定
# よく使う pytest コマンド
pytest                              # 全テスト実行
pytest tests/test_core.py           # 特定ファイル
pytest -k "test_add"                # 名前でフィルタ
pytest -m "not slow"                # マーカーでフィルタ
pytest --lf                         # 前回失敗したテストのみ
pytest --ff                         # 前回失敗したテストを優先実行
pytest -x                           # 最初の失敗で停止
pytest --pdb                        # 失敗時に pdb を起動
pytest -n auto                      # 並列実行(pytest-xdist)

11. ログとデバッグ

11.1 logging モジュール

import logging
import logging.config
import json
from pathlib import Path

# ── 基本設定 ──
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("Low memory")
logger.error("Connection failed", exc_info=True)

# ── 辞書ベースの設定(推奨) ──
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        },
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(levelname)s %(name)s %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "standard",
            "stream": "ext://sys.stdout",
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "level": "DEBUG",
            "formatter": "json",
            "filename": "app.log",
            "maxBytes": 10_485_760,  # 10MB
            "backupCount": 5,
        },
    },
    "loggers": {
        "mypackage": {
            "level": "DEBUG",
            "handlers": ["console", "file"],
            "propagate": False,
        },
    },
    "root": {
        "level": "WARNING",
        "handlers": ["console"],
    },
}

logging.config.dictConfig(LOGGING_CONFIG)

# ── 構造化ロギング(structlog) ──
# pip install structlog
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer(),  # 開発用
        # structlog.processors.JSONRenderer(),  # 本番用
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
)

log = structlog.get_logger()
log.info("user.login", user_id="12345", ip="192.168.1.1")
# 2024-01-15T10:30:00+00:00 [info] user.login user_id=12345 ip=192.168.1.1

11.2 デバッグ手法

# ── breakpoint()(Python 3.7+) ──
def buggy_function(data):
    result = []
    for item in data:
        breakpoint()  # ここでデバッガが起動(PYTHONBREAKPOINT=0 で無効化)
        result.append(item * 2)
    return result

# ── pdb コマンド ──
# n(ext)     : 次の行に進む
# s(tep)     : 関数の中に入る
# c(ontinue) : 次のブレークポイントまで実行
# p expr     : 式を評価して表示
# l(ist)     : ソースコードを表示
# w(here)    : コールスタックを表示
# q(uit)     : デバッガを終了

# ── 便利なデバッグテクニック ──
import sys
import traceback

# トレースバックの取得
try:
    1 / 0
except:
    tb = traceback.format_exc()
    print(tb)

# オブジェクトの詳細情報
import inspect
print(inspect.getsource(some_function))  # ソースコードを表示
print(inspect.getmembers(obj))           # メンバーの一覧

# メモリプロファイリング
# pip install memory-profiler
# @profile
# def memory_heavy():
#     data = [i for i in range(1_000_000)]
#     return data
# python -m memory_profiler script.py

12. Web フレームワーク

12.1 FastAPI — 現代的な非同期 Web フレームワーク

# pip install "fastapi[standard]"
# uvicorn main:app --reload

from fastapi import FastAPI, HTTPException, Depends, Query, Path, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Annotated
from datetime import datetime

app = FastAPI(
    title="My API",
    description="Sample REST API with FastAPI",
    version="1.0.0",
)

# ── CORS ミドルウェア ──
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── モデル定義 ──
class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str
    age: int = Field(ge=0, le=150)

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    age: int
    created_at: datetime

    model_config = {"from_attributes": True}  # ORM モードを有効化

# ── 依存性注入 ──
async def get_db():
    """データベースセッションを提供する依存関数"""
    db = DatabaseSession()
    try:
        yield db
    finally:
        await db.close()

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """認証済みユーザーを返す依存関数"""
    user = await verify_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user

# ── エンドポイント ──
@app.get("/")
async def root():
    return {"message": "Hello, World!"}

@app.get("/users", response_model=list[UserResponse])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    db = Depends(get_db),
):
    """ユーザー一覧を取得"""
    return await db.get_users(skip=skip, limit=limit)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: Annotated[int, Path(gt=0)],
    db = Depends(get_db),
):
    """特定のユーザーを取得"""
    user = await db.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(
    user: UserCreate,
    db = Depends(get_db),
):
    """新規ユーザーを作成"""
    return await db.create_user(user)

@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user: UserCreate,
    db = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """ユーザー情報を更新(認証必須)"""
    return await db.update_user(user_id, user)

@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int, db = Depends(get_db)):
    """ユーザーを削除"""
    await db.delete_user(user_id)

# ── バックグラウンドタスク ──
from fastapi import BackgroundTasks

@app.post("/notifications")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks,
):
    background_tasks.add_task(send_email, email, "Welcome!")
    return {"message": "Notification queued"}

# ── WebSocket ──
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        print(f"Client {client_id} disconnected")

# ── ライフサイクルイベント ──
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 起動時の処理
    print("Starting up...")
    await init_database()
    yield
    # シャットダウン時の処理
    print("Shutting down...")
    await cleanup_resources()

app = FastAPI(lifespan=lifespan)

12.2 Django — フルスタック Web フレームワーク

# ── プロジェクト作成 ──
# pip install django
# django-admin startproject mysite
# cd mysite
# python manage.py startapp blog

# ── settings.py(主要設定) ──
# mysite/settings.py

import os
from pathlib import Path

BASE_DIR = Path(__file__).resolve().parent.parent

SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'change-me-in-production')
DEBUG = os.environ.get('DJANGO_DEBUG', 'True').lower() == 'true'
ALLOWED_HOSTS = os.environ.get('DJANGO_ALLOWED_HOSTS', 'localhost').split(',')

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',        # Django REST Framework
    'blog.apps.BlogConfig',  # アプリケーション
]

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME', 'mysite'),
        'USER': os.environ.get('DB_USER', 'postgres'),
        'PASSWORD': os.environ.get('DB_PASSWORD', ''),
        'HOST': os.environ.get('DB_HOST', 'localhost'),
        'PORT': os.environ.get('DB_PORT', '5432'),
    }
}

REST_FRAMEWORK = {
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 20,
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework.authentication.SessionAuthentication',
        'rest_framework.authentication.TokenAuthentication',
    ],
    'DEFAULT_PERMISSION_CLASSES': [
        'rest_framework.permissions.IsAuthenticated',
    ],
}

# ── モデル定義 ──
# blog/models.py
from django.db import models
from django.contrib.auth.models import User

class Post(models.Model):
    title = models.CharField(max_length=200)
    slug = models.SlugField(unique=True)
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='posts')
    content = models.TextField()
    published = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    tags = models.ManyToManyField('Tag', blank=True)

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['-created_at']),
            models.Index(fields=['slug']),
        ]

    def __str__(self):
        return self.title

class Tag(models.Model):
    name = models.CharField(max_length=50, unique=True)

    def __str__(self):
        return self.name

# ── マイグレーション ──
# python manage.py makemigrations
# python manage.py migrate

# ── ビュー(Django REST Framework) ──
# blog/serializers.py
from rest_framework import serializers

class PostSerializer(serializers.ModelSerializer):
    author_name = serializers.CharField(source='author.username', read_only=True)

    class Meta:
        model = Post
        fields = ['id', 'title', 'slug', 'author', 'author_name',
                  'content', 'published', 'created_at', 'updated_at']
        read_only_fields = ['author']

# blog/views.py
from rest_framework import viewsets, permissions

class PostViewSet(viewsets.ModelViewSet):
    queryset = Post.objects.select_related('author').prefetch_related('tags')
    serializer_class = PostSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    lookup_field = 'slug'

    def perform_create(self, serializer):
        serializer.save(author=self.request.user)

# blog/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter

router = DefaultRouter()
router.register(r'posts', PostViewSet)

urlpatterns = [
    path('api/', include(router.urls)),
]

12.3 Flask — 軽量 Web フレームワーク

# pip install flask

from flask import Flask, request, jsonify, abort
from functools import wraps

app = Flask(__name__)
app.config.from_mapping(
    SECRET_KEY='dev',
    DATABASE='sqlite:///app.db',
)

# ── ルーティング ──
@app.route('/')
def index():
    return jsonify({"message": "Hello, World!"})

@app.route('/users', methods=['GET'])
def list_users():
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    # ... データベースクエリ
    return jsonify({"users": [], "page": page})

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = find_user(user_id)
    if not user:
        abort(404)
    return jsonify(user)

@app.route('/users', methods=['POST'])
def create_user():
    data = request.get_json()
    if not data or 'name' not in data:
        abort(400, description="Name is required")
    # ... ユーザー作成
    return jsonify({"id": 1, **data}), 201

# ── エラーハンドラ ──
@app.errorhandler(404)
def not_found(error):
    return jsonify({"error": "Not found"}), 404

@app.errorhandler(500)
def internal_error(error):
    return jsonify({"error": "Internal server error"}), 500

# ── Blueprint によるモジュール化 ──
from flask import Blueprint

api_bp = Blueprint('api', __name__, url_prefix='/api/v1')

@api_bp.route('/health')
def health_check():
    return jsonify({"status": "healthy"})

app.register_blueprint(api_bp)

if __name__ == '__main__':
    app.run(debug=True, port=5000)

13. データベースアクセス

13.1 SQLAlchemy — ORM

# pip install "sqlalchemy[asyncio]" asyncpg psycopg2-binary

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, DateTime, Boolean, Text
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import (
    DeclarativeBase, Mapped, mapped_column, relationship,
    Session, sessionmaker, selectinload
)
from datetime import datetime

# ── エンジンとセッション設定 ──
# 同期
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/mydb",
    echo=True,          # SQL ログを出力
    pool_size=5,         # コネクションプールサイズ
    max_overflow=10,     # 最大追加接続数
    pool_timeout=30,     # 接続待ちタイムアウト
    pool_recycle=3600,   # コネクション再生成間隔(秒)
)

SessionLocal = sessionmaker(bind=engine)

# ── モデル定義(SQLAlchemy 2.0 スタイル) ──
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())

    # リレーション
    posts: Mapped[list["Post"]] = relationship(back_populates="author", cascade="all, delete-orphan")

    def __repr__(self) -> str:
        return f"User(id={self.id}, name={self.name!r})"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())

    author: Mapped["User"] = relationship(back_populates="posts")

# テーブル作成
Base.metadata.create_all(engine)

# ── CRUD 操作 ──
# 作成
with Session(engine) as session:
    user = User(name="Alice", email="alice@example.com")
    session.add(user)
    session.commit()
    session.refresh(user)
    print(f"Created user: {user.id}")

# 読み取り
with Session(engine) as session:
    # 単一取得
    user = session.get(User, 1)

    # クエリ
    stmt = select(User).where(User.is_active == True).order_by(User.name)
    users = session.scalars(stmt).all()

    # JOIN と Eager Loading
    stmt = (
        select(User)
        .options(selectinload(User.posts))
        .where(User.name.like("A%"))
    )
    users_with_posts = session.scalars(stmt).all()

    # 集計
    stmt = select(func.count(User.id)).where(User.is_active == True)
    count = session.scalar(stmt)

# 更新
with Session(engine) as session:
    user = session.get(User, 1)
    user.name = "Alice Updated"
    session.commit()

# 削除
with Session(engine) as session:
    user = session.get(User, 1)
    session.delete(user)
    session.commit()

# ── 非同期版 ──
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

async_engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/mydb",
    echo=True,
)

AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession)

async def get_user(user_id: int) -> User | None:
    async with AsyncSessionLocal() as session:
        return await session.get(User, user_id)

async def list_users() -> list[User]:
    async with AsyncSessionLocal() as session:
        stmt = select(User).where(User.is_active == True)
        result = await session.scalars(stmt)
        return result.all()

13.2 Alembic — データベースマイグレーション

# 初期設定
pip install alembic
alembic init migrations

# alembic.ini の設定
# sqlalchemy.url = postgresql://user:password@localhost/mydb
# migrations/env.py(主要設定)
from mypackage.models import Base
target_metadata = Base.metadata

# マイグレーションの作成と実行
# alembic revision --autogenerate -m "Create users table"
# alembic upgrade head
# alembic downgrade -1
# alembic history

13.3 Redis

# pip install redis

import redis
from redis import asyncio as aioredis
import json

# ── 同期クライアント ──
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,  # bytes ではなく str を返す
)

# 基本操作
r.set("key", "value", ex=3600)  # 1時間のTTL
value = r.get("key")

# ハッシュ
r.hset("user:1", mapping={"name": "Alice", "age": "30", "email": "alice@example.com"})
user = r.hgetall("user:1")

# リスト(キュー)
r.rpush("queue:tasks", json.dumps({"task": "process", "data": "..."}))
task = r.lpop("queue:tasks")

# セット
r.sadd("online_users", "user1", "user2", "user3")
is_online = r.sismember("online_users", "user1")

# ソート済みセット(ランキング)
r.zadd("leaderboard", {"Alice": 100, "Bob": 85, "Charlie": 92})
top3 = r.zrevrange("leaderboard", 0, 2, withscores=True)

# パイプライン(バッチ操作)
with r.pipeline() as pipe:
    pipe.set("key1", "value1")
    pipe.set("key2", "value2")
    pipe.get("key1")
    results = pipe.execute()

# ── 非同期クライアント ──
async def redis_example():
    r = aioredis.from_url("redis://localhost", decode_responses=True)
    await r.set("key", "value")
    value = await r.get("key")
    await r.aclose()

14. データサイエンスとML エコシステム

14.1 主要ライブラリ概要

# ── NumPy: 数値計算の基盤 ──
import numpy as np

# 配列の作成
arr = np.array([1, 2, 3, 4, 5])
zeros = np.zeros((3, 4))           # 3x4 のゼロ行列
ones = np.ones((2, 3))             # 2x3 の1行列
rand = np.random.randn(3, 3)      # 3x3 の正規分布乱数

# ベクトル演算(ループ不要で高速)
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
print(a + b)           # [5, 7, 9]
print(a * b)           # [4, 10, 18]
print(np.dot(a, b))    # 32(内積)

# 行列演算
matrix = np.array([[1, 2], [3, 4]])
print(np.linalg.inv(matrix))     # 逆行列
print(np.linalg.det(matrix))     # 行列式
eigenvalues, eigenvectors = np.linalg.eig(matrix)

# ブロードキャスティング
data = np.random.randn(1000, 3)
mean = data.mean(axis=0)          # 列ごとの平均
std = data.std(axis=0)            # 列ごとの標準偏差
normalized = (data - mean) / std   # 正規化

# ── Pandas: データ分析 ──
import pandas as pd

# DataFrame の作成
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
    'age': [30, 25, 35, 28],
    'city': ['Tokyo', 'Osaka', 'Tokyo', 'Nagoya'],
    'salary': [500000, 450000, 600000, 480000],
})

# データの読み込み
# df = pd.read_csv('data.csv')
# df = pd.read_json('data.json')
# df = pd.read_excel('data.xlsx')
# df = pd.read_sql('SELECT * FROM users', engine)

# 基本操作
print(df.head())
print(df.describe())
print(df.info())

# フィルタリング
tokyo_people = df[df['city'] == 'Tokyo']
high_salary = df[df['salary'] > 500000]
combined = df[(df['city'] == 'Tokyo') & (df['age'] > 25)]

# グルーピング
city_stats = df.groupby('city').agg({
    'salary': ['mean', 'median', 'std'],
    'age': 'mean',
    'name': 'count',
})

# メソッドチェーン
result = (
    df
    .query('age >= 25')
    .assign(tax=lambda x: x['salary'] * 0.2)
    .sort_values('salary', ascending=False)
    .head(10)
)

# ── Matplotlib / Seaborn: 可視化 ──
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

axes[0].bar(df['name'], df['salary'])
axes[0].set_title('Salary by Person')
axes[0].set_ylabel('Salary (JPY)')

axes[1].scatter(df['age'], df['salary'])
axes[1].set_title('Age vs Salary')
axes[1].set_xlabel('Age')
axes[1].set_ylabel('Salary')

plt.tight_layout()
plt.savefig('report.png', dpi=150)

15. Python のアーキテクチャと内部構造

15.1 CPython のアーキテクチャ

Python プログラムの実行フロー:

┌─────────────┐     ┌──────────────┐     ┌────────────────┐
│ ソースコード  │ ──→ │  コンパイラ    │ ──→ │ バイトコード     │
│  (.py)       │     │ (Parser +    │     │ (.pyc)         │
│              │     │  Compiler)   │     │                │
└─────────────┘     └──────────────┘     └───────┬────────┘
                                                  │
                                                  ▼
                                         ┌────────────────┐
                                         │ Python VM      │
                                         │ (インタープリタ)  │
                                         │ - スタックマシン  │
                                         │ - GIL          │
                                         │ - GC           │
                                         └────────────────┘
# ── バイトコードの確認 ──
import dis

def example(x, y):
    return x + y * 2

dis.dis(example)
# 出力:
#   0 LOAD_FAST                0 (x)
#   2 LOAD_FAST                1 (y)
#   4 LOAD_CONST               1 (2)
#   6 BINARY_MULTIPLY
#   8 BINARY_ADD
#  10 RETURN_VALUE

# コンパイル済みバイトコードの確認
import py_compile
py_compile.compile('example.py')  # __pycache__/example.cpython-312.pyc が生成される

15.2 メモリ管理とガベージコレクション

import sys
import gc

# ── 参照カウント ──
a = [1, 2, 3]
print(sys.getrefcount(a))  # 2(変数 a + getrefcount の引数)

b = a                      # 参照カウント +1
print(sys.getrefcount(a))  # 3

del b                      # 参照カウント -1
print(sys.getrefcount(a))  # 2

# ── ガベージコレクション ──
# 循環参照を検出・回収
gc.get_threshold()       # (700, 10, 10) — GC のしきい値
gc.get_count()           # 現在の世代ごとのオブジェクト数
gc.collect()             # 手動で GC を実行

# GC の統計情報
gc.set_debug(gc.DEBUG_STATS)

# ── メモリ使用量の確認 ──
print(sys.getsizeof([]))         # 56 bytes(空リスト)
print(sys.getsizeof([1, 2, 3]))  # 88 bytes
print(sys.getsizeof({}))         # 64 bytes(空辞書)
print(sys.getsizeof("hello"))    # 54 bytes

# __slots__ によるメモリ最適化
class WithDict:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class WithSlots:
    __slots__ = ('x', 'y')
    def __init__(self, x, y):
        self.x = x
        self.y = y

print(sys.getsizeof(WithDict(1, 2)))    # 48 bytes + __dict__
print(sys.getsizeof(WithSlots(1, 2)))   # 48 bytes(__dict__ なし)

# 大量のインスタンスではメモリ差が顕著
# WithDict: 1M instances ≈ 200MB
# WithSlots: 1M instances ≈ 64MB

15.3 GIL(Global Interpreter Lock)

"""
GIL の仕組み:
- CPython のインタープリタは一度に1つのスレッドしか Python バイトコードを実行しない
- I/O 操作時には GIL を解放するため、I/O バウンドな処理ではスレッドが有効
- CPU バウンドな処理ではマルチスレッドでも並列化されない

Python 3.13+ のフリースレッドモード:
- --disable-gil フラグまたは PYTHON_GIL=0 環境変数で GIL を無効化
- 真のマルチスレッド並列実行が可能に
- まだ実験段階
"""

import time
import threading

def cpu_bound(n):
    """CPU バウンドなタスク"""
    total = 0
    for i in range(n):
        total += i * i
    return total

# シングルスレッド
start = time.perf_counter()
cpu_bound(10_000_000)
cpu_bound(10_000_000)
single_time = time.perf_counter() - start

# マルチスレッド(GIL のため高速化されない)
start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t2 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
multi_time = time.perf_counter() - start

print(f"Single thread: {single_time:.3f}s")
print(f"Multi thread:  {multi_time:.3f}s")
# → ほぼ同じ時間(GIL の制約)

15.4 メタクラスとディスクリプタ

# ── メタクラス ──
class SingletonMeta(type):
    """シングルトンパターンを実装するメタクラス"""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Database(metaclass=SingletonMeta):
    def __init__(self, url: str = "default"):
        self.url = url

db1 = Database("postgresql://localhost/db1")
db2 = Database("postgresql://localhost/db2")
assert db1 is db2  # True — 同じインスタンス
print(db1.url)     # postgresql://localhost/db1

# ── ディスクリプタ ──
class Validated:
    """バリデーション付き属性のディスクリプタ"""
    def __init__(self, validator=None):
        self.validator = validator

    def __set_name__(self, owner, name):
        self.name = name
        self.private_name = f"_{name}"

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.private_name, None)

    def __set__(self, obj, value):
        if self.validator:
            self.validator(value)
        setattr(obj, self.private_name, value)

def positive_number(value):
    if not isinstance(value, (int, float)):
        raise TypeError(f"Expected number, got {type(value).__name__}")
    if value <= 0:
        raise ValueError(f"Expected positive number, got {value}")

class Product:
    name = Validated(lambda v: None if isinstance(v, str) and v else (_ for _ in ()).throw(ValueError("Invalid name")))
    price = Validated(positive_number)
    quantity = Validated(positive_number)

    def __init__(self, name: str, price: float, quantity: int):
        self.name = name
        self.price = price
        self.quantity = quantity

product = Product("Widget", 19.99, 100)
# product.price = -5  # ValueError: Expected positive number, got -5

16. パフォーマンス最適化

16.1 プロファイリング

# ── cProfile: 関数レベルのプロファイリング ──
import cProfile
import pstats

def slow_function():
    result = []
    for i in range(100000):
        result.append(i ** 2)
    return sum(result)

# プロファイリング実行
cProfile.run('slow_function()', 'profile_output')

# 結果の分析
stats = pstats.Stats('profile_output')
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats(10)

# コマンドラインから実行
# python -m cProfile -o output.prof script.py
# python -m pstats output.prof

# ── line_profiler: 行レベルのプロファイリング ──
# pip install line-profiler
# @profile デコレータを付けて:
# kernprof -l -v script.py

# ── timeit: マイクロベンチマーク ──
import timeit

# リスト内包表記 vs ループ
list_comp = timeit.timeit('[x**2 for x in range(1000)]', number=10000)
loop_style = timeit.timeit('''
result = []
for x in range(1000):
    result.append(x**2)
''', number=10000)

print(f"List comp: {list_comp:.4f}s")
print(f"Loop:      {loop_style:.4f}s")
# リスト内包表記の方が約30%高速

16.2 最適化テクニック

# ── 1. 適切なデータ構造の選択 ──
# ルックアップ: list O(n) vs set/dict O(1)
import time

large_list = list(range(1_000_000))
large_set = set(large_list)

# list での検索
start = time.perf_counter()
999_999 in large_list
print(f"List lookup: {time.perf_counter() - start:.6f}s")

# set での検索
start = time.perf_counter()
999_999 in large_set
print(f"Set lookup: {time.perf_counter() - start:.6f}s")
# Set は数千倍高速

# ── 2. ジェネレータによるメモリ節約 ──
# Bad: 全データをメモリに載せる
# all_data = [process(x) for x in range(10_000_000)]

# Good: ジェネレータで逐次処理
# def process_stream(data):
#     for x in data:
#         yield process(x)

# ── 3. 文字列の結合 ──
# Bad: 文字列の連結(O(n^2))
# result = ""
# for word in words:
#     result += word + " "

# Good: join を使用(O(n))
result = " ".join(words)

# ── 4. collections モジュール ──
from collections import defaultdict, Counter, deque

# Counter: 要素の出現回数
words = ["apple", "banana", "apple", "cherry", "banana", "apple"]
counts = Counter(words)
print(counts.most_common(2))  # [('apple', 3), ('banana', 2)]

# defaultdict: キーが存在しない場合にデフォルト値
word_positions = defaultdict(list)
for i, word in enumerate(words):
    word_positions[word].append(i)

# deque: 両端キュー(先頭の追加/削除が O(1))
queue = deque(maxlen=1000)
queue.append("item")
queue.appendleft("priority_item")
oldest = queue.popleft()

# ── 5. itertools によるメモリ効率的な処理 ──
from itertools import islice, chain, groupby, product, combinations

# 大量のデータから最初の N 件だけ取得
first_10 = list(islice(range(1_000_000), 10))

# 複数のイテラブルを連結
combined = chain(range(5), range(10, 15))

17. セキュリティ

17.1 セキュリティのベストプラクティス

# ── 入力バリデーション ──
import re
from html import escape

def sanitize_input(user_input: str) -> str:
    """ユーザー入力のサニタイズ"""
    # HTML エスケープ
    sanitized = escape(user_input)
    # 制御文字の除去
    sanitized = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', sanitized)
    return sanitized

# ── パスワードのハッシュ化 ──
import hashlib
import secrets

def hash_password(password: str) -> str:
    """bcrypt/scrypt/argon2 を推奨。最低限 PBKDF2 を使用"""
    salt = secrets.token_hex(32)
    hashed = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        iterations=600_000  # NIST 推奨
    )
    return f"{salt}:{hashed.hex()}"

def verify_password(password: str, stored_hash: str) -> bool:
    salt, hash_hex = stored_hash.split(':')
    hashed = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        iterations=600_000
    )
    return secrets.compare_digest(hashed.hex(), hash_hex)

# ── 推奨: bcrypt または argon2 ──
# pip install bcrypt
import bcrypt

def hash_password_bcrypt(password: str) -> str:
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()

def verify_password_bcrypt(password: str, hashed: str) -> bool:
    return bcrypt.checkpw(password.encode(), hashed.encode())

# ── シークレット管理 ──
import os

# 環境変数から取得(ハードコーディング禁止)
DATABASE_URL = os.environ["DATABASE_URL"]
API_KEY = os.environ["API_KEY"]

# セキュアなトークン生成
token = secrets.token_urlsafe(32)   # URL セーフなランダムトークン
otp = secrets.randbelow(1_000_000)  # 6桁の OTP

# ── SQL インジェクション対策 ──
# Bad: 文字列フォーマットでの SQL 構築
# cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")

# Good: パラメータ化クエリ
# cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
# SQLAlchemy の場合は自動的にパラメータ化される

# ── 安全な一時ファイル ──
import tempfile

with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=True) as f:
    f.write("sensitive data")
    # ファイルは with ブロック終了時に自動削除

# ── 依存関係のセキュリティチェック ──
# pip install pip-audit
# pip-audit                    # 脆弱性スキャン
# pip install safety
# safety check                 # 既知の脆弱性をチェック

17.2 暗号化

# pip install cryptography

from cryptography.fernet import Fernet

# ── 対称鍵暗号化 ──
# 鍵の生成
key = Fernet.generate_key()
cipher = Fernet(key)

# 暗号化と復号化
message = b"Secret message"
encrypted = cipher.encrypt(message)
decrypted = cipher.decrypt(encrypted)

assert decrypted == message

# ── JWT トークン ──
# pip install PyJWT
import jwt
from datetime import datetime, timedelta, timezone

SECRET_KEY = "your-secret-key"

def create_token(user_id: int) -> str:
    payload = {
        "user_id": user_id,
        "exp": datetime.now(timezone.utc) + timedelta(hours=24),
        "iat": datetime.now(timezone.utc),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise ValueError("Token has expired")
    except jwt.InvalidTokenError:
        raise ValueError("Invalid token")

18. デプロイメントとDevOps

18.1 Docker

# ── マルチステージビルド ──
# Dockerfile

# ── ステージ 1: ビルド ──
FROM python:3.12-slim AS builder

WORKDIR /app

# 依存関係のインストール(キャッシュ効率化のため先にコピー)
COPY pyproject.toml .
RUN pip install --no-cache-dir --prefix=/install .

# ── ステージ 2: ランタイム ──
FROM python:3.12-slim AS runtime

# セキュリティ: root 以外のユーザーで実行
RUN groupadd -r appuser && useradd -r -g appuser appuser

WORKDIR /app

# ビルドステージからパッケージをコピー
COPY --from=builder /install /usr/local

# アプリケーションコードをコピー
COPY src/ ./src/

# 環境変数の設定
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PYTHONPATH=/app/src

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

USER appuser

EXPOSE 8000

CMD ["python", "-m", "uvicorn", "mypackage.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    restart: unless-stopped

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: mydb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

18.2 CI/CD(GitHub Actions)

# .github/workflows/ci.yml
name: CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]

    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: test_db
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: password
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        uses: astral-sh/setup-uv@v3

      - name: Install dependencies
        run: |
          uv venv
          uv pip install -e ".[dev]"

      - name: Lint with ruff
        run: |
          uv run ruff check src/ tests/
          uv run ruff format --check src/ tests/

      - name: Type check with mypy
        run: uv run mypy src/

      - name: Run tests
        env:
          DATABASE_URL: postgresql://postgres:password@localhost:5432/test_db
        run: uv run pytest --cov --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml

18.3 コードフォーマットとリンティング

# pyproject.toml — Ruff(高速な Python リンター兼フォーマッター)
[tool.ruff]
target-version = "py310"
line-length = 100

[tool.ruff.lint]
select = [
    "E",    # pycodestyle errors
    "W",    # pycodestyle warnings
    "F",    # Pyflakes
    "I",    # isort
    "N",    # pep8-naming
    "UP",   # pyupgrade
    "B",    # flake8-bugbear
    "A",    # flake8-builtins
    "SIM",  # flake8-simplify
    "TCH",  # flake8-type-checking
    "RUF",  # Ruff-specific rules
]
ignore = [
    "E501",  # line too long (format で対応)
]

[tool.ruff.lint.isort]
known-first-party = ["mypackage"]

[tool.ruff.format]
quote-style = "double"
indent-style = "space"
# Ruff の使用
ruff check src/                # リンティング
ruff check --fix src/          # 自動修正
ruff format src/               # フォーマット

# pre-commit の設定
# pip install pre-commit
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.0
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.11.0
    hooks:
      - id: mypy
        additional_dependencies: [pydantic>=2.0]
# 初期設定と実行
pre-commit install               # Git フックにインストール
pre-commit run --all-files       # 全ファイルに対して実行

19. 高度なトピック

19.1 デザインパターン

# ── Strategy パターン ──
from abc import ABC, abstractmethod
from dataclasses import dataclass

class CompressionStrategy(ABC):
    @abstractmethod
    def compress(self, data: bytes) -> bytes: ...

    @abstractmethod
    def decompress(self, data: bytes) -> bytes: ...

class GzipStrategy(CompressionStrategy):
    def compress(self, data: bytes) -> bytes:
        import gzip
        return gzip.compress(data)

    def decompress(self, data: bytes) -> bytes:
        import gzip
        return gzip.decompress(data)

class LZ4Strategy(CompressionStrategy):
    def compress(self, data: bytes) -> bytes:
        import lz4.frame
        return lz4.frame.compress(data)

    def decompress(self, data: bytes) -> bytes:
        import lz4.frame
        return lz4.frame.decompress(data)

@dataclass
class FileProcessor:
    strategy: CompressionStrategy

    def save(self, data: bytes, path: str) -> None:
        compressed = self.strategy.compress(data)
        with open(path, 'wb') as f:
            f.write(compressed)

    def load(self, path: str) -> bytes:
        with open(path, 'rb') as f:
            compressed = f.read()
        return self.strategy.decompress(compressed)

# 使用例
processor = FileProcessor(strategy=GzipStrategy())
processor.save(b"Hello, World!", "data.gz")

# ── Observer パターン ──
from typing import Callable

class EventEmitter:
    def __init__(self):
        self._listeners: dict[str, list[Callable]] = {}

    def on(self, event: str, callback: Callable) -> None:
        self._listeners.setdefault(event, []).append(callback)

    def emit(self, event: str, *args, **kwargs) -> None:
        for callback in self._listeners.get(event, []):
            callback(*args, **kwargs)

    def off(self, event: str, callback: Callable) -> None:
        if event in self._listeners:
            self._listeners[event].remove(callback)

# 使用例
emitter = EventEmitter()
emitter.on("user.created", lambda user: print(f"New user: {user}"))
emitter.on("user.created", lambda user: print(f"Sending welcome email to {user}"))
emitter.emit("user.created", "Alice")

# ── Repository パターン ──
from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar('T')

class Repository(ABC, Generic[T]):
    @abstractmethod
    async def get(self, id: int) -> T | None: ...

    @abstractmethod
    async def list(self, offset: int = 0, limit: int = 100) -> list[T]: ...

    @abstractmethod
    async def create(self, entity: T) -> T: ...

    @abstractmethod
    async def update(self, entity: T) -> T: ...

    @abstractmethod
    async def delete(self, id: int) -> bool: ...

class SQLAlchemyUserRepository(Repository):
    def __init__(self, session):
        self.session = session

    async def get(self, id: int):
        return await self.session.get(User, id)

    async def list(self, offset: int = 0, limit: int = 100):
        stmt = select(User).offset(offset).limit(limit)
        result = await self.session.scalars(stmt)
        return result.all()

    async def create(self, entity):
        self.session.add(entity)
        await self.session.commit()
        return entity

    async def update(self, entity):
        await self.session.merge(entity)
        await self.session.commit()
        return entity

    async def delete(self, id: int):
        user = await self.get(id)
        if user:
            await self.session.delete(user)
            await self.session.commit()
            return True
        return False

19.2 CLI ツール開発

# ── Click(推奨) ──
# pip install click

import click

@click.group()
@click.version_option(version="1.0.0")
def cli():
    """My CLI Application"""
    pass

@cli.command()
@click.argument('name')
@click.option('--greeting', '-g', default='Hello', help='Greeting word')
@click.option('--count', '-c', default=1, help='Number of times to greet')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose mode')
def greet(name: str, greeting: str, count: int, verbose: bool):
    """Greet someone by name"""
    for _ in range(count):
        message = f"{greeting}, {name}!"
        click.echo(click.style(message, fg='green'))
    if verbose:
        click.echo(f"Greeted {name} {count} time(s)")

@cli.command()
@click.option('--format', 'fmt', type=click.Choice(['json', 'yaml', 'table']), default='table')
@click.option('--output', '-o', type=click.Path(), help='Output file path')
def export(fmt: str, output: str | None):
    """Export data in various formats"""
    data = {"users": [{"name": "Alice"}, {"name": "Bob"}]}

    if fmt == 'json':
        import json
        result = json.dumps(data, indent=2)
    elif fmt == 'yaml':
        import yaml
        result = yaml.dump(data)
    else:
        result = "Name\n----\nAlice\nBob"

    if output:
        with open(output, 'w') as f:
            f.write(result)
        click.echo(f"Exported to {output}")
    else:
        click.echo(result)

@cli.command()
@click.confirmation_option(prompt='Are you sure you want to reset?')
def reset():
    """Reset all data (with confirmation)"""
    click.echo("Data has been reset")

if __name__ == '__main__':
    cli()

# 使用例:
# python cli.py greet Alice --greeting Hi --count 3
# python cli.py export --format json -o output.json
# python cli.py reset

# ── argparse(標準ライブラリ) ──
import argparse

parser = argparse.ArgumentParser(description="My Tool")
subparsers = parser.add_subparsers(dest="command")

# サブコマンド: init
init_parser = subparsers.add_parser("init", help="Initialize project")
init_parser.add_argument("name", help="Project name")
init_parser.add_argument("--template", "-t", default="default", choices=["default", "minimal", "full"])

# サブコマンド: run
run_parser = subparsers.add_parser("run", help="Run the project")
run_parser.add_argument("--port", "-p", type=int, default=8000)
run_parser.add_argument("--debug", action="store_true")

args = parser.parse_args()

19.3 Python 3.12+ の新機能

# ── Python 3.12: 新しいジェネリクス構文 ──
# 従来
from typing import TypeVar, Generic
T = TypeVar('T')

class OldBox(Generic[T]):
    def __init__(self, content: T) -> None:
        self.content = content

# Python 3.12+
class Box[T]:
    def __init__(self, content: T) -> None:
        self.content = content

def first[T](items: list[T]) -> T:
    return items[0]

type Point = tuple[float, float]
type Matrix[T] = list[list[T]]

# ── Python 3.12: f-string の制限緩和 ──
# ネストされた f-string が可能に
names = ["Alice", "Bob", "Charlie"]
print(f"Users: {", ".join(f"{name!r}" for name in names)}")

# 複数行の式が使えるように
result = f"Total: {
    sum(
        x ** 2
        for x in range(10)
    )
}"

# ── Python 3.13: 実験的フリースレッドモード ──
# python3.13t script.py  # フリースレッドビルド
# PYTHON_GIL=0 python3.13t script.py  # GIL を無効化

# ── Python 3.13: 実験的 JIT コンパイラ ──
# PYTHON_JIT=1 python3.13 script.py  # JIT を有効化

20. まとめとベストプラクティス

20.1 Python プロジェクトのベストプラクティス一覧

カテゴリベストプラクティス
プロジェクト構造src/ レイアウト + pyproject.toml
パッケージ管理uv または poetry で依存関係をロック
コードスタイルruff でリンティング&フォーマット
型チェックmypy --strict で静的解析
テストpytest + カバレッジ 80% 以上
ログstructlog で構造化ロギング
セキュリティpip-audit で脆弱性チェック
CI/CDGitHub Actions + pre-commit
Dockerマルチステージビルド + 非 root ユーザー
データバリデーションPydantic でスキーマを定義

20.2 パフォーマンスの考え方

1. まず正しく動くコードを書く
2. プロファイリングでボトルネックを特定する
3. 適切なデータ構造を選ぶ(set/dict のルックアップは O(1))
4. 組み込み関数を活用する(map, filter, sorted など)
5. ジェネレータでメモリ使用量を抑える
6. I/O バウンド → asyncio / threading
7. CPU バウンド → multiprocessing
8. それでも足りなければ → Cython, Rust 拡張, NumPy

20.3 推奨ツールチェーン(2024–2025)

パッケージ管理:     uv(高速)/ poetry(安定)
リンター:          ruff
フォーマッター:     ruff format
型チェッカー:       mypy / pyright
テスト:            pytest + pytest-cov
セキュリティ:       pip-audit + bandit
ドキュメント:       mkdocs-material / Sphinx
タスクランナー:     Makefile / just / nox
コンテナ:          Docker + docker-compose
CI:               GitHub Actions

本記事は Python 3.10 〜 3.13 を対象として執筆されています。Python は活発に開発が続けられており、最新の変更については公式ドキュメント(https://docs.python.org)を参照してください。