Python
Python 完全概要ガイド
Python の言語仕様・アーキテクチャ・主要エコシステムを設定例とともに包括的に解説する
1. はじめに — Python とは何か
1.1 Python の歴史と哲学
Python は 1991 年にオランダのプログラマー Guido van Rossum によって公開された汎用プログラミング言語である。名前はイギリスのコメディ番組「Monty Python's Flying Circus」に由来する。
Python の設計哲学は「The Zen of Python」(PEP 20)に凝縮されている:
import this
# 主要な原則:
# Beautiful is better than ugly.(美しいことは醜いことより良い)
# Explicit is better than implicit.(明示的であることは暗黙的であることより良い)
# Simple is better than complex.(単純であることは複雑であることより良い)
# Complex is better than complicated.(複雑であることは込み入っていることより良い)
# Readability counts.(読みやすさは重要である)
1.2 Python のバージョン変遷
| バージョン | リリース年 | 主な特徴 |
|---|---|---|
| Python 1.0 | 1994 | 初の正式リリース |
| Python 2.0 | 2000 | リスト内包表記、GC導入 |
| Python 2.7 | 2010 | Python 2系最終版(2020年EOL) |
| Python 3.0 | 2008 | Unicode標準化、print関数化 |
| Python 3.6 | 2016 | f-string、型ヒント強化 |
| Python 3.8 | 2019 | walrus演算子(:=) |
| Python 3.10 | 2021 | 構造的パターンマッチング |
| Python 3.11 | 2022 | 大幅な速度改善(10-60%高速化) |
| Python 3.12 | 2023 | Per-interpreter GIL、f-string改善 |
| Python 3.13 | 2024 | 実験的フリースレッドモード、JITコンパイラ |
1.3 Python の位置づけと用途
Python は以下の分野で広く利用されている:
- Web 開発: Django, Flask, FastAPI
- データサイエンス・機械学習: NumPy, Pandas, scikit-learn, TensorFlow, PyTorch
- 自動化・スクリプティング: システム管理、タスク自動化
- 科学技術計算: SciPy, Matplotlib
- DevOps / インフラ: Ansible, SaltStack
- 組み込み・IoT: MicroPython, CircuitPython
1.4 Python のインストールと環境構築
# macOS(Homebrew)
brew install python@3.12
# Ubuntu/Debian
sudo apt update
sudo apt install python3.12 python3.12-venv python3.12-dev
# pyenv によるバージョン管理(推奨)
curl https://pyenv.run | bash
# ~/.bashrc or ~/.zshrc に追加
export PYENV_ROOT="$HOME/.pyenv"
[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
# Python バージョンのインストールと切り替え
pyenv install 3.12.4
pyenv global 3.12.4
pyenv local 3.12.4 # プロジェクト単位で設定
# バージョン確認
python --version
# Python 3.12.4
2. 言語の基本構造
2.1 動的型付けとダックタイピング
Python は動的型付け言語であり、変数の型は実行時に決定される。
# 動的型付け — 同じ変数に異なる型の値を代入可能
x = 42 # int
x = "hello" # str
x = [1, 2, 3] # list
# ダックタイピング: "If it walks like a duck and quacks like a duck, it's a duck"
class Duck:
def quack(self):
return "Quack!"
class Person:
def quack(self):
return "I can quack too!"
def make_it_quack(thing):
"""thing の型は問わない — quack() メソッドがあれば良い"""
print(thing.quack())
make_it_quack(Duck()) # Quack!
make_it_quack(Person()) # I can quack too!
2.2 基本データ型
# ── 数値型 ──
integer_val = 42 # int(任意精度整数)
float_val = 3.14 # float(倍精度浮動小数点数)
complex_val = 3 + 4j # complex(複素数)
big_int = 10 ** 100 # Python の int はオーバーフローしない
# 数値の便利な記法
million = 1_000_000 # アンダースコアで桁区切り
binary = 0b1010 # 2進数 → 10
octal = 0o17 # 8進数 → 15
hexadecimal = 0xFF # 16進数 → 255
# ── 文字列型 ──
single = 'Hello'
double = "World"
multi_line = """
これは
複数行の
文字列です
"""
# f-string(フォーマット済み文字列リテラル)
name = "Python"
version = 3.12
print(f"{name} {version}") # Python 3.12
print(f"{version:.1f}") # 3.1
print(f"{'center':^20}") # " center "
print(f"{1_000_000:,}") # 1,000,000
print(f"{0.85:.0%}") # 85%
# raw 文字列
path = r"C:\Users\name\documents" # バックスラッシュをエスケープしない
# ── ブール型 ──
is_active = True
is_deleted = False
# 以下は Falsy(False として評価される)
# None, 0, 0.0, "", [], {}, set(), frozenset()
# ── None 型 ──
result = None
if result is None: # None の比較は is を使う(== ではなく)
print("No result")
2.3 コレクション型
# ── リスト(list) — 可変の順序付きコレクション ──
fruits = ["apple", "banana", "cherry"]
fruits.append("date") # 末尾に追加
fruits.insert(1, "avocado") # 位置指定で挿入
fruits.extend(["elderberry"]) # 複数要素を追加
removed = fruits.pop(2) # インデックス指定で取り出し
fruits.sort() # インプレースでソート
sorted_fruits = sorted(fruits) # 新しいリストを返す
# スライス
numbers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(numbers[2:5]) # [2, 3, 4]
print(numbers[::2]) # [0, 2, 4, 6, 8](ステップ2)
print(numbers[::-1]) # [9, 8, 7, ..., 0](逆順)
# リスト内包表記
squares = [x ** 2 for x in range(10)]
evens = [x for x in range(20) if x % 2 == 0]
matrix = [[i * j for j in range(5)] for i in range(5)]
# ── タプル(tuple) — 不変の順序付きコレクション ──
point = (3, 4)
x, y = point # アンパック
single_element = (42,) # 1要素タプルにはカンマが必要
# 名前付きタプル
from collections import namedtuple
Point = namedtuple('Point', ['x', 'y'])
p = Point(3, 4)
print(p.x, p.y) # 3 4
# ── 辞書(dict) — キーと値のマッピング ──
config = {
"host": "localhost",
"port": 8080,
"debug": True,
}
config["timeout"] = 30 # 追加/更新
port = config.get("port", 3000) # デフォルト値付き取得
config.setdefault("retries", 3) # キーがなければ設定
# 辞書内包表記
squared = {x: x ** 2 for x in range(6)}
# {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
# 辞書のマージ(Python 3.9+)
defaults = {"color": "blue", "size": "medium"}
overrides = {"size": "large", "weight": "heavy"}
merged = defaults | overrides
# {"color": "blue", "size": "large", "weight": "heavy"}
# ── 集合(set) — 一意な要素のコレクション ──
unique_nums = {1, 2, 3, 4, 5}
unique_nums.add(6)
unique_nums.discard(3) # 存在しなくてもエラーにならない
a = {1, 2, 3, 4}
b = {3, 4, 5, 6}
print(a | b) # 和集合: {1, 2, 3, 4, 5, 6}
print(a & b) # 積集合: {3, 4}
print(a - b) # 差集合: {1, 2}
print(a ^ b) # 対称差: {1, 2, 5, 6}
# ── frozenset — 不変の集合(辞書のキーに使える) ──
immutable_set = frozenset([1, 2, 3])
2.4 制御フロー
# ── 条件分岐 ──
score = 85
if score >= 90:
grade = "A"
elif score >= 80:
grade = "B"
elif score >= 70:
grade = "C"
else:
grade = "F"
# 三項演算子(条件式)
status = "adult" if age >= 18 else "minor"
# ── 構造的パターンマッチング(Python 3.10+) ──
command = {"action": "move", "direction": "north", "speed": 5}
match command:
case {"action": "quit"}:
print("Quitting")
case {"action": "move", "direction": direction, "speed": speed} if speed > 0:
print(f"Moving {direction} at speed {speed}")
case {"action": "move", "direction": direction}:
print(f"Moving {direction} at default speed")
case _:
print("Unknown command")
# パターンマッチングの高度な例
def process_response(response):
match response:
case {"status": 200, "body": body}:
return f"Success: {body}"
case {"status": 404}:
return "Not found"
case {"status": status} if 500 <= status < 600:
return f"Server error: {status}"
case {"status": status}:
return f"Other status: {status}"
# ── ループ ──
# for ループ
for i in range(5):
print(i)
for key, value in config.items():
print(f"{key} = {value}")
for index, item in enumerate(fruits, start=1):
print(f"{index}. {item}")
# zip で複数のイテラブルを同時に走査
names = ["Alice", "Bob", "Charlie"]
scores = [85, 92, 78]
for name, score in zip(names, scores):
print(f"{name}: {score}")
# while ループ
count = 0
while count < 10:
if count == 5:
break # ループを中断
if count % 2 == 0:
count += 1
continue # 次のイテレーションへ
print(count)
count += 1
else:
# break されなかった場合に実行される
print("Loop completed normally")
# ── walrus 演算子(Python 3.8+) ──
# 代入と評価を同時に行う
import re
if m := re.match(r'(\d+)-(\d+)', '123-456'):
start, end = m.groups()
print(f"Range: {start} to {end}")
# リスト内包表記との組み合わせ
data = [1, 5, 3, 8, 2, 9, 4]
filtered = [y for x in data if (y := x * 2) > 6]
# [8, 16, 18]
3. 関数とデコレータ
3.1 関数定義の基本
# 基本的な関数定義
def greet(name: str, greeting: str = "Hello") -> str:
"""挨拶メッセージを生成する。
Args:
name: 挨拶する相手の名前
greeting: 挨拶の言葉(デフォルト: "Hello")
Returns:
フォーマットされた挨拶文字列
"""
return f"{greeting}, {name}!"
# 呼び出し
print(greet("Alice")) # Hello, Alice!
print(greet("Bob", "Hi")) # Hi, Bob!
print(greet(greeting="Hey", name="Charlie")) # キーワード引数
3.2 引数の種類
# ── 可変長引数 ──
def sum_all(*args: int) -> int:
"""任意の数の整数を受け取って合計を返す"""
return sum(args)
print(sum_all(1, 2, 3, 4, 5)) # 15
# ── 可変長キーワード引数 ──
def build_profile(**kwargs: str) -> dict:
"""キーワード引数から辞書を構築する"""
return {k: v for k, v in kwargs.items()}
profile = build_profile(name="Alice", role="Engineer", team="Backend")
# ── 位置専用引数とキーワード専用引数(Python 3.8+) ──
def search(query: str, /, *, max_results: int = 10, sort: str = "relevance") -> list:
"""
query: 位置専用引数(/ の前)— キーワード指定不可
max_results, sort: キーワード専用引数(* の後)— 位置指定不可
"""
print(f"Searching '{query}' (max={max_results}, sort={sort})")
return []
search("python") # OK
search("python", max_results=5, sort="date") # OK
# search(query="python") # TypeError: 位置専用引数
# search("python", 5) # TypeError: キーワード専用引数
# ── 引数のアンパック ──
def point_distance(x1, y1, x2, y2):
return ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
coords = [0, 0, 3, 4]
print(point_distance(*coords)) # 5.0(リストをアンパック)
params = {"x1": 0, "y1": 0, "x2": 3, "y2": 4}
print(point_distance(**params)) # 5.0(辞書をアンパック)
3.3 ラムダ式と高階関数
# ラムダ式(無名関数)
square = lambda x: x ** 2
add = lambda x, y: x + y
# 高階関数
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# map — 各要素に関数を適用
squared = list(map(lambda x: x ** 2, numbers))
# filter — 条件に合う要素を抽出
evens = list(filter(lambda x: x % 2 == 0, numbers))
# reduce — 要素を累積的に結合
from functools import reduce
total = reduce(lambda acc, x: acc + x, numbers, 0)
# sorted のカスタムキー
students = [
{"name": "Alice", "grade": 85},
{"name": "Bob", "grade": 92},
{"name": "Charlie", "grade": 78},
]
sorted_students = sorted(students, key=lambda s: s["grade"], reverse=True)
3.4 デコレータ
import functools
import time
from typing import Callable, TypeVar, ParamSpec
P = ParamSpec('P')
T = TypeVar('T')
# ── 基本的なデコレータ ──
def timer(func: Callable[P, T]) -> Callable[P, T]:
"""関数の実行時間を計測するデコレータ"""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} took {elapsed:.4f}s")
return result
return wrapper
@timer
def slow_function():
time.sleep(1)
return "done"
# ── 引数付きデコレータ ──
def retry(max_attempts: int = 3, delay: float = 1.0):
"""リトライ機能を提供するデコレータ"""
def decorator(func: Callable[P, T]) -> Callable[P, T]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
print(f"Attempt {attempt}/{max_attempts} failed: {e}")
if attempt < max_attempts:
time.sleep(delay)
raise last_exception
return wrapper
return decorator
@retry(max_attempts=3, delay=0.5)
def fetch_data(url: str) -> dict:
"""外部APIからデータを取得する(リトライ付き)"""
import urllib.request
with urllib.request.urlopen(url) as response:
return response.read()
# ── キャッシュデコレータ(標準ライブラリ) ──
from functools import lru_cache, cache
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
"""フィボナッチ数列(メモ化で高速化)"""
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
@cache # Python 3.9+: maxsize=None の lru_cache と同等
def factorial(n: int) -> int:
if n <= 1:
return 1
return n * factorial(n - 1)
# ── クラスベースのデコレータ ──
class CountCalls:
"""関数の呼び出し回数を記録するデコレータ"""
def __init__(self, func):
functools.update_wrapper(self, func)
self.func = func
self.count = 0
def __call__(self, *args, **kwargs):
self.count += 1
print(f"{self.func.__name__} has been called {self.count} time(s)")
return self.func(*args, **kwargs)
@CountCalls
def say_hello(name: str):
print(f"Hello, {name}!")
say_hello("Alice") # say_hello has been called 1 time(s) → Hello, Alice!
say_hello("Bob") # say_hello has been called 2 time(s) → Hello, Bob!
# ── デコレータの積み重ね ──
@timer
@retry(max_attempts=2)
def process_data(data):
"""複数のデコレータを適用(下から上に適用される)"""
return data
3.5 ジェネレータ
# ── ジェネレータ関数 ──
def fibonacci_gen(limit: int):
"""フィボナッチ数列をジェネレータとして生成"""
a, b = 0, 1
while a < limit:
yield a
a, b = b, a + b
for num in fibonacci_gen(100):
print(num, end=" ")
# 0 1 1 2 3 5 8 13 21 34 55 89
# ── ジェネレータ式 ──
# リスト内包表記のメモリ効率版
sum_of_squares = sum(x ** 2 for x in range(1_000_000))
# ── yield from によるサブジェネレータ委譲 ──
def chain(*iterables):
for iterable in iterables:
yield from iterable
list(chain([1, 2], [3, 4], [5, 6]))
# [1, 2, 3, 4, 5, 6]
# ── ジェネレータを使った大容量ファイル処理 ──
def read_large_file(file_path: str, chunk_size: int = 8192):
"""大容量ファイルをチャンク単位で読み込むジェネレータ"""
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
yield chunk
# メモリ効率的にファイルを処理
# for chunk in read_large_file("/path/to/large/file"):
# process(chunk)
4. オブジェクト指向プログラミング
4.1 クラスの基本
class Animal:
"""動物を表す基底クラス"""
# クラス変数(全インスタンスで共有)
kingdom = "Animalia"
_count = 0
def __init__(self, name: str, species: str, age: int = 0):
"""コンストラクタ"""
self.name = name # パブリック属性
self.species = species
self._age = age # 慣習的にプライベート(アクセスは可能)
self.__id = id(self) # 名前マングリング(_Animal__id)
Animal._count += 1
def speak(self) -> str:
"""サブクラスでオーバーライドされることを意図"""
return f"{self.name} makes a sound"
@property
def age(self) -> int:
"""age のゲッター(プロパティ)"""
return self._age
@age.setter
def age(self, value: int):
"""age のセッター(バリデーション付き)"""
if value < 0:
raise ValueError("Age cannot be negative")
self._age = value
@classmethod
def get_count(cls) -> int:
"""クラスメソッド: クラス自体にアクセス"""
return cls._count
@staticmethod
def is_valid_name(name: str) -> bool:
"""静的メソッド: クラスやインスタンスに依存しないユーティリティ"""
return bool(name) and name[0].isupper()
def __repr__(self) -> str:
"""開発者向けの文字列表現"""
return f"Animal(name={self.name!r}, species={self.species!r}, age={self._age})"
def __str__(self) -> str:
"""ユーザー向けの文字列表現"""
return f"{self.name} ({self.species})"
def __eq__(self, other) -> bool:
if not isinstance(other, Animal):
return NotImplemented
return self.name == other.name and self.species == other.species
def __hash__(self) -> int:
return hash((self.name, self.species))
4.2 継承とポリモーフィズム
class Dog(Animal):
"""Animal を継承した Dog クラス"""
def __init__(self, name: str, breed: str, age: int = 0):
super().__init__(name, species="Canis lupus familiaris", age=age)
self.breed = breed
def speak(self) -> str:
return f"{self.name} says Woof!"
def fetch(self, item: str) -> str:
return f"{self.name} fetches the {item}"
class Cat(Animal):
def __init__(self, name: str, indoor: bool = True, age: int = 0):
super().__init__(name, species="Felis catus", age=age)
self.indoor = indoor
def speak(self) -> str:
return f"{self.name} says Meow!"
# ポリモーフィズム
animals: list[Animal] = [
Dog("Rex", "German Shepherd", 5),
Cat("Whiskers", indoor=True, age=3),
Dog("Buddy", "Golden Retriever", 2),
]
for animal in animals:
print(animal.speak()) # 各サブクラスの speak() が呼ばれる
# 多重継承と MRO(Method Resolution Order)
class FlyingAnimal:
def move(self) -> str:
return "Flying"
class SwimmingAnimal:
def move(self) -> str:
return "Swimming"
class Duck(FlyingAnimal, SwimmingAnimal, Animal):
def __init__(self):
super().__init__("Donald", "Anas platyrhynchos")
def speak(self) -> str:
return f"{self.name} says Quack!"
# MRO の確認
print(Duck.__mro__)
# (Duck, FlyingAnimal, SwimmingAnimal, Animal, object)
duck = Duck()
print(duck.move()) # "Flying"(MRO で FlyingAnimal が先)
4.3 抽象基底クラス
from abc import ABC, abstractmethod
class Shape(ABC):
"""図形の抽象基底クラス"""
@abstractmethod
def area(self) -> float:
"""面積を計算する(サブクラスで実装必須)"""
...
@abstractmethod
def perimeter(self) -> float:
"""周長を計算する"""
...
def describe(self) -> str:
"""具象メソッド(共通の実装)"""
return f"{self.__class__.__name__}: area={self.area():.2f}, perimeter={self.perimeter():.2f}"
class Circle(Shape):
def __init__(self, radius: float):
self.radius = radius
def area(self) -> float:
import math
return math.pi * self.radius ** 2
def perimeter(self) -> float:
import math
return 2 * math.pi * self.radius
class Rectangle(Shape):
def __init__(self, width: float, height: float):
self.width = width
self.height = height
def area(self) -> float:
return self.width * self.height
def perimeter(self) -> float:
return 2 * (self.width + self.height)
# shape = Shape() # TypeError: 抽象クラスはインスタンス化できない
circle = Circle(5)
print(circle.describe()) # Circle: area=78.54, perimeter=31.42
4.4 データクラスとスロット
from dataclasses import dataclass, field, asdict, astuple
from typing import ClassVar
# ── データクラス(Python 3.7+) ──
@dataclass
class User:
"""ユーザー情報を保持するデータクラス"""
name: str
email: str
age: int
tags: list[str] = field(default_factory=list) # ミュータブルなデフォルト値
_id: int = field(init=False, repr=False) # 初期化引数に含めない
MAX_AGE: ClassVar[int] = 150 # クラス変数
def __post_init__(self):
"""__init__ の後に呼ばれるフック"""
self._id = hash((self.name, self.email))
if self.age < 0 or self.age > self.MAX_AGE:
raise ValueError(f"Invalid age: {self.age}")
user = User("Alice", "alice@example.com", 30, ["admin", "developer"])
print(user) # User(name='Alice', email='alice@example.com', age=30, tags=['admin', 'developer'])
# 辞書/タプルへの変換
print(asdict(user))
print(astuple(user))
# ── イミュータブルなデータクラス ──
@dataclass(frozen=True)
class Point:
x: float
y: float
p = Point(1.0, 2.0)
# p.x = 3.0 # FrozenInstanceError
# ── __slots__ による省メモリ化 ──
@dataclass(slots=True) # Python 3.10+
class Coordinate:
x: float
y: float
z: float
# slots=True で __dict__ を持たなくなり、メモリ使用量が削減される
# 数百万のインスタンスを生成する場合に有効
# ── Python 3.10+ kw_only ──
@dataclass(kw_only=True)
class Config:
host: str
port: int
debug: bool = False
# Config("localhost", 8080) # TypeError: キーワード引数必須
config = Config(host="localhost", port=8080)
4.5 プロトコルと構造的サブタイピング
from typing import Protocol, runtime_checkable
@runtime_checkable
class Drawable(Protocol):
"""描画可能なオブジェクトのプロトコル"""
def draw(self) -> None: ...
def get_bounds(self) -> tuple[float, float, float, float]: ...
class Button:
"""Drawable を明示的に継承していないが、プロトコルに適合する"""
def draw(self) -> None:
print("Drawing button")
def get_bounds(self) -> tuple[float, float, float, float]:
return (0, 0, 100, 50)
class TextLabel:
def draw(self) -> None:
print("Drawing text label")
def get_bounds(self) -> tuple[float, float, float, float]:
return (0, 0, 200, 30)
def render(widget: Drawable) -> None:
"""Drawable プロトコルに適合する任意のオブジェクトを受け入れる"""
bounds = widget.get_bounds()
print(f"Rendering at bounds {bounds}")
widget.draw()
render(Button()) # OK — 構造的にプロトコルに適合
render(TextLabel()) # OK
# ランタイムチェック
print(isinstance(Button(), Drawable)) # True
4.6 コンテキストマネージャ
# ── __enter__ / __exit__ による実装 ──
class DatabaseConnection:
def __init__(self, connection_string: str):
self.connection_string = connection_string
self.connection = None
def __enter__(self):
print(f"Connecting to {self.connection_string}")
self.connection = f"Connection({self.connection_string})"
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"Closing {self.connection}")
self.connection = None
# False を返すと例外が伝播する(デフォルト)
# True を返すと例外を握りつぶす
return False
def query(self, sql: str):
return f"Result of: {sql}"
with DatabaseConnection("postgresql://localhost/mydb") as db:
result = db.query("SELECT * FROM users")
# ── contextlib を使った簡易実装 ──
from contextlib import contextmanager
@contextmanager
def temporary_directory():
"""一時ディレクトリを作成し、使用後に削除する"""
import tempfile
import shutil
tmpdir = tempfile.mkdtemp()
try:
yield tmpdir
finally:
shutil.rmtree(tmpdir)
with temporary_directory() as tmpdir:
print(f"Working in {tmpdir}")
# ── 非同期コンテキストマネージャ ──
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_db_connection(url: str):
conn = await connect(url) # 仮想的な非同期接続
try:
yield conn
finally:
await conn.close()
5. モジュールとパッケージ
5.1 モジュールシステム
# ── モジュールの作成とインポート ──
# mymodule.py
"""ユーティリティ関数を提供するモジュール"""
__all__ = ["public_function", "PublicClass"] # from mymodule import * で公開する名前
def public_function():
return "I'm public"
def _private_function():
return "I'm conventionally private"
class PublicClass:
pass
# インポートの方法
import mymodule
from mymodule import public_function
from mymodule import public_function as pub_func
import mymodule as mm
# 条件付きインポート
try:
import ujson as json
except ImportError:
import json
# 遅延インポート
def process():
import heavy_module # 必要な時だけインポート
return heavy_module.compute()
5.2 パッケージ構造
myproject/
├── pyproject.toml # プロジェクト設定(PEP 621)
├── src/
│ └── mypackage/
│ ├── __init__.py # パッケージ初期化
│ ├── __main__.py # python -m mypackage で実行
│ ├── core/
│ │ ├── __init__.py
│ │ ├── engine.py
│ │ └── config.py
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── helpers.py
│ │ └── validators.py
│ └── api/
│ ├── __init__.py
│ ├── routes.py
│ └── middleware.py
├── tests/
│ ├── __init__.py
│ ├── conftest.py # pytest のフィクスチャ
│ ├── test_core/
│ │ └── test_engine.py
│ └── test_api/
│ └── test_routes.py
├── docs/
│ └── index.md
└── README.md
# __init__.py — パッケージの公開 API を定義
# src/mypackage/__init__.py
"""MyPackage — サンプルパッケージ"""
__version__ = "1.0.0"
from mypackage.core.engine import Engine
from mypackage.core.config import Config
__all__ = ["Engine", "Config"]
# __main__.py — python -m mypackage で実行される
# src/mypackage/__main__.py
import sys
from mypackage.core.engine import Engine
def main():
engine = Engine()
engine.run()
return 0
if __name__ == "__main__":
sys.exit(main())
5.3 pyproject.toml(PEP 621 — 現代的なプロジェクト設定)
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mypackage"
version = "1.0.0"
description = "A sample Python package"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
{name = "Alice", email = "alice@example.com"},
]
keywords = ["sample", "package"]
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"requests>=2.28",
"pydantic>=2.0",
"click>=8.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"ruff>=0.1.0",
"mypy>=1.0",
]
docs = [
"sphinx>=6.0",
"sphinx-rtd-theme>=1.0",
]
[project.scripts]
mypackage-cli = "mypackage.__main__:main"
[project.urls]
Homepage = "https://github.com/example/mypackage"
Documentation = "https://mypackage.readthedocs.io"
# ── ツール設定 ──
[tool.ruff]
target-version = "py310"
line-length = 100
select = ["E", "F", "I", "N", "W", "UP", "B", "A", "SIM"]
[tool.ruff.lint.isort]
known-first-party = ["mypackage"]
[tool.mypy]
python_version = "3.10"
strict = true
warn_return_any = true
warn_unused_configs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=mypackage --cov-report=term-missing"
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
[tool.coverage.run]
source = ["mypackage"]
branch = true
[tool.coverage.report]
fail_under = 80
show_missing = true
exclude_lines = [
"pragma: no cover",
"if TYPE_CHECKING:",
"if __name__ == .__main__.",
]
5.4 仮想環境とパッケージ管理
# ── venv(標準ライブラリ) ──
python -m venv .venv
source .venv/bin/activate # macOS/Linux
# .venv\Scripts\activate # Windows
pip install -r requirements.txt
deactivate
# ── pip の基本操作 ──
pip install requests # パッケージのインストール
pip install "requests>=2.28,<3" # バージョン制約
pip install -e ".[dev]" # 開発モードでインストール
pip freeze > requirements.txt # 依存関係の出力
pip install -r requirements.txt # 依存関係の一括インストール
# ── uv(高速なパッケージ管理ツール) ──
# Rust で書かれた pip/venv の代替(10-100倍高速)
pip install uv
uv venv # 仮想環境の作成
uv pip install requests # パッケージのインストール
uv pip compile requirements.in -o requirements.txt # ロックファイル生成
uv pip sync requirements.txt # 環境を同期
# ── Poetry ──
pip install poetry
poetry new myproject # 新規プロジェクト作成
poetry init # 既存プロジェクトの初期化
poetry add requests # 依存関係の追加
poetry add --group dev pytest # 開発依存の追加
poetry install # 依存関係のインストール
poetry lock # ロックファイルの更新
poetry build # パッケージのビルド
poetry publish # PyPI への公開
# ── pipx(CLI ツールのグローバルインストール) ──
pip install pipx
pipx install ruff
pipx install mypy
pipx install black
6. エラーハンドリング
6.1 例外の基本
# ── try / except / else / finally ──
def divide(a: float, b: float) -> float:
try:
result = a / b
except ZeroDivisionError:
print("Error: Division by zero")
return float('inf')
except TypeError as e:
print(f"Type error: {e}")
raise # 例外を再送出
else:
# 例外が発生しなかった場合に実行
print(f"Result: {result}")
return result
finally:
# 常に実行される(クリーンアップ処理に使用)
print("Division operation completed")
# ── 例外の階層 ──
# BaseException
# ├── SystemExit
# ├── KeyboardInterrupt
# ├── GeneratorExit
# └── Exception
# ├── StopIteration
# ├── ArithmeticError
# │ ├── ZeroDivisionError
# │ └── OverflowError
# ├── LookupError
# │ ├── IndexError
# │ └── KeyError
# ├── OSError
# │ ├── FileNotFoundError
# │ ├── PermissionError
# │ └── ConnectionError
# ├── ValueError
# ├── TypeError
# ├── AttributeError
# └── RuntimeError
# ── 複数の例外をまとめてキャッチ ──
try:
value = int("not_a_number")
except (ValueError, TypeError) as e:
print(f"Conversion error: {e}")
6.2 カスタム例外
class AppError(Exception):
"""アプリケーションの基底例外"""
def __init__(self, message: str, code: int = 500):
super().__init__(message)
self.code = code
class ValidationError(AppError):
"""入力バリデーションエラー"""
def __init__(self, field: str, message: str):
super().__init__(f"Validation error on '{field}': {message}", code=400)
self.field = field
class NotFoundError(AppError):
"""リソースが見つからないエラー"""
def __init__(self, resource: str, identifier: str):
super().__init__(f"{resource} '{identifier}' not found", code=404)
self.resource = resource
self.identifier = identifier
class AuthenticationError(AppError):
"""認証エラー"""
def __init__(self, message: str = "Authentication failed"):
super().__init__(message, code=401)
# 使用例
def get_user(user_id: str) -> dict:
if not user_id:
raise ValidationError("user_id", "Must not be empty")
users = {"1": {"name": "Alice"}}
if user_id not in users:
raise NotFoundError("User", user_id)
return users[user_id]
try:
user = get_user("99")
except NotFoundError as e:
print(f"[{e.code}] {e}")
# [404] User '99' not found
except AppError as e:
print(f"[{e.code}] {e}")
6.3 例外グループ(Python 3.11+)
# ── ExceptionGroup: 複数の例外をまとめて扱う ──
def process_batch(items: list) -> list:
"""バッチ処理で複数のエラーをまとめて報告"""
results = []
errors = []
for i, item in enumerate(items):
try:
results.append(int(item))
except ValueError as e:
errors.append(ValueError(f"Item {i}: {e}"))
if errors:
raise ExceptionGroup("Batch processing errors", errors)
return results
# except* で ExceptionGroup をフィルタリング
try:
process_batch(["1", "abc", "3", "def"])
except* ValueError as eg:
print(f"Got {len(eg.exceptions)} ValueError(s):")
for e in eg.exceptions:
print(f" - {e}")
# ── add_note(Python 3.11+) ──
try:
raise ValueError("Something went wrong")
except ValueError as e:
e.add_note("This happened during batch processing")
e.add_note(f"Processing item #42")
raise # ノート付きで再送出
7. ファイル I/O と標準ライブラリ
7.1 ファイル操作
from pathlib import Path
# ── pathlib(推奨) ──
base = Path("/var/log/myapp")
log_file = base / "app.log" # パス結合
# パス操作
print(log_file.parent) # /var/log/myapp
print(log_file.name) # app.log
print(log_file.stem) # app
print(log_file.suffix) # .log
print(log_file.exists()) # True/False
print(log_file.is_file()) # True/False
print(log_file.is_dir()) # False
# ディレクトリ操作
Path("output/reports").mkdir(parents=True, exist_ok=True)
# ファイルの列挙
for py_file in Path("src").rglob("*.py"):
print(py_file)
# テキストファイルの読み書き
config_path = Path("config.txt")
config_path.write_text("key=value\n", encoding="utf-8")
content = config_path.read_text(encoding="utf-8")
# バイナリファイル
data_path = Path("data.bin")
data_path.write_bytes(b"\x00\x01\x02")
raw = data_path.read_bytes()
# ── ファイルの読み書き(open) ──
# テキストファイル
with open("data.txt", "r", encoding="utf-8") as f:
# 一括読み込み
content = f.read()
# 行ごとの読み込み(メモリ効率的)
f.seek(0)
for line in f:
print(line.strip())
# 書き込み
with open("output.txt", "w", encoding="utf-8") as f:
f.write("Hello, World!\n")
f.writelines(["line1\n", "line2\n", "line3\n"])
# 追記
with open("output.txt", "a", encoding="utf-8") as f:
f.write("Appended line\n")
7.2 JSON / YAML / TOML
import json
from pathlib import Path
# ── JSON ──
data = {
"name": "MyApp",
"version": "1.0.0",
"settings": {
"debug": True,
"max_connections": 100,
"allowed_hosts": ["localhost", "0.0.0.0"],
},
}
# 書き出し
Path("config.json").write_text(
json.dumps(data, indent=2, ensure_ascii=False),
encoding="utf-8"
)
# 読み込み
config = json.loads(Path("config.json").read_text(encoding="utf-8"))
# カスタムエンコーダ/デコーダ
from datetime import datetime, date
from decimal import Decimal
class CustomEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, Decimal):
return str(obj)
return super().default(obj)
json.dumps({"today": date.today(), "price": Decimal("19.99")}, cls=CustomEncoder)
# ── TOML(Python 3.11+ で標準ライブラリ) ──
import tomllib # 読み込み専用
with open("pyproject.toml", "rb") as f:
toml_data = tomllib.load(f)
# 書き込みには tomli-w が必要
# pip install tomli-w
# import tomli_w
# with open("output.toml", "wb") as f:
# tomli_w.dump(data, f)
# ── CSV ──
import csv
# 書き込み
with open("users.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=["name", "age", "email"])
writer.writeheader()
writer.writerows([
{"name": "Alice", "age": 30, "email": "alice@example.com"},
{"name": "Bob", "age": 25, "email": "bob@example.com"},
])
# 読み込み
with open("users.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
print(f"{row['name']} ({row['age']})")
7.3 日時処理
from datetime import datetime, date, time, timedelta, timezone
from zoneinfo import ZoneInfo # Python 3.9+
# 現在日時
now = datetime.now() # ローカル時間(naiveは非推奨)
now_utc = datetime.now(timezone.utc) # UTC
now_jst = datetime.now(ZoneInfo("Asia/Tokyo")) # JST
# タイムゾーン変換
utc_time = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
jst_time = utc_time.astimezone(ZoneInfo("Asia/Tokyo"))
pst_time = utc_time.astimezone(ZoneInfo("America/Los_Angeles"))
print(utc_time) # 2024-01-15 12:00:00+00:00
print(jst_time) # 2024-01-15 21:00:00+09:00
print(pst_time) # 2024-01-15 04:00:00-08:00
# 日時の計算
tomorrow = date.today() + timedelta(days=1)
next_week = datetime.now() + timedelta(weeks=1)
two_hours_later = datetime.now() + timedelta(hours=2)
# 日時のフォーマット
formatted = now.strftime("%Y-%m-%d %H:%M:%S")
iso_format = now.isoformat()
# 文字列からの解析
parsed = datetime.strptime("2024-01-15 10:30:00", "%Y-%m-%d %H:%M:%S")
from_iso = datetime.fromisoformat("2024-01-15T10:30:00+09:00")
7.4 正規表現
import re
text = "Contact us at support@example.com or sales@company.org"
# 基本的なマッチング
pattern = r'[\w.+-]+@[\w-]+\.[\w.]+'
emails = re.findall(pattern, text)
# ['support@example.com', 'sales@company.org']
# コンパイル済みパターン(繰り返し使用する場合)
email_pattern = re.compile(r'''
(?P<user>[\w.+-]+) # ユーザー名
@ # @
(?P<domain>[\w-]+\.[\w.]+) # ドメイン名
''', re.VERBOSE)
for match in email_pattern.finditer(text):
print(f"User: {match.group('user')}, Domain: {match.group('domain')}")
# 置換
sanitized = re.sub(r'\b\d{3}-\d{4}\b', 'XXX-XXXX', "Call 123-4567 or 987-6543")
# "Call XXX-XXXX or XXX-XXXX"
# 分割
parts = re.split(r'[,;]\s*', "apple, banana; cherry, date")
# ['apple', 'banana', 'cherry', 'date']
8. 型ヒントと静的解析
8.1 型ヒントの基本(PEP 484+)
from typing import (
Any, Union, Optional, Literal,
TypeVar, Generic, TypeAlias,
Callable, Iterator, Generator,
ClassVar, Final, Annotated,
TypeGuard, Never, Self,
overload
)
from collections.abc import Sequence, Mapping, Iterable
# ── 基本的な型ヒント ──
name: str = "Alice"
age: int = 30
ratio: float = 0.85
is_active: bool = True
# コレクション型(Python 3.9+ は組み込み型で直接記述可能)
names: list[str] = ["Alice", "Bob"]
scores: dict[str, int] = {"Alice": 95, "Bob": 87}
coordinates: tuple[float, float] = (3.14, 2.72)
unique_ids: set[int] = {1, 2, 3}
immutable_ids: frozenset[int] = frozenset({1, 2, 3})
# ── Optional と Union ──
# Python 3.10+ では X | Y 構文が使える
def find_user(user_id: int) -> dict[str, Any] | None: # Optional[dict[str, Any]] と同等
...
def process(value: int | str) -> str: # Union[int, str] と同等
return str(value)
# ── Literal 型 ──
def set_color(color: Literal["red", "green", "blue"]) -> None:
...
# ── TypeAlias ──
UserId: TypeAlias = int
UserRecord: TypeAlias = dict[str, Any]
Callback: TypeAlias = Callable[[str, int], bool]
# Python 3.12+ では type 文
type Vector = list[float]
type Matrix = list[Vector]
type JSONValue = str | int | float | bool | None | list["JSONValue"] | dict[str, "JSONValue"]
# ── 関数の型ヒント ──
def greet(name: str, times: int = 1) -> str:
return (f"Hello, {name}! " * times).strip()
# コールバックの型
def apply_operation(
values: list[int],
operation: Callable[[int], int]
) -> list[int]:
return [operation(v) for v in values]
# ジェネレータの型
def count_up(start: int, end: int) -> Generator[int, None, None]:
while start < end:
yield start
start += 1
8.2 ジェネリクス
from typing import TypeVar, Generic
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
# ── ジェネリッククラス ──
class Stack(Generic[T]):
def __init__(self) -> None:
self._items: list[T] = []
def push(self, item: T) -> None:
self._items.append(item)
def pop(self) -> T:
if not self._items:
raise IndexError("Stack is empty")
return self._items.pop()
def peek(self) -> T:
if not self._items:
raise IndexError("Stack is empty")
return self._items[-1]
def __len__(self) -> int:
return len(self._items)
int_stack: Stack[int] = Stack()
int_stack.push(1)
int_stack.push(2)
value: int = int_stack.pop()
# Python 3.12+ の新しいジェネリクス構文
class Stack[T]:
def __init__(self) -> None:
self._items: list[T] = []
def push(self, item: T) -> None:
self._items.append(item)
def pop(self) -> T:
return self._items.pop()
# ジェネリック関数(Python 3.12+)
def first[T](items: Sequence[T]) -> T:
return items[0]
# 境界付きTypeVar
from typing import TypeVar
class Comparable:
def __lt__(self, other: "Comparable") -> bool: ...
CT = TypeVar('CT', bound=Comparable)
def min_value(a: CT, b: CT) -> CT:
return a if a < b else b
8.3 Pydantic によるデータバリデーション
from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import EmailStr, HttpUrl # pip install pydantic[email]
from datetime import datetime
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class Address(BaseModel):
street: str
city: str
country: str = "Japan"
postal_code: str = Field(pattern=r'^\d{3}-\d{4}$')
class UserCreate(BaseModel):
"""ユーザー作成リクエストのバリデーションモデル"""
model_config = {
"str_strip_whitespace": True,
"str_min_length": 1,
}
name: str = Field(min_length=2, max_length=50)
email: str = Field(description="ユーザーのメールアドレス")
age: int = Field(ge=0, le=150)
role: UserRole = UserRole.USER
tags: list[str] = Field(default_factory=list, max_length=10)
address: Address | None = None
created_at: datetime = Field(default_factory=datetime.now)
@field_validator('email')
@classmethod
def validate_email(cls, v: str) -> str:
if '@' not in v:
raise ValueError('Invalid email format')
return v.lower()
@field_validator('tags')
@classmethod
def validate_tags(cls, v: list[str]) -> list[str]:
return [tag.lower().strip() for tag in v]
@model_validator(mode='after')
def validate_model(self) -> 'UserCreate':
if self.role == UserRole.ADMIN and self.age < 18:
raise ValueError('Admin users must be at least 18 years old')
return self
# 使用例
user = UserCreate(
name="Alice",
email="ALICE@Example.com",
age=30,
tags=["Developer", "Python"],
address={"street": "123 Main St", "city": "Tokyo", "postal_code": "100-0001"}
)
print(user.model_dump())
# email は自動的に小文字化される
print(user.model_dump_json(indent=2)) # JSON シリアライズ
# バリデーションエラー
from pydantic import ValidationError
try:
invalid_user = UserCreate(name="A", email="invalid", age=-1)
except ValidationError as e:
print(e.errors())
# ── Settings 管理(pydantic-settings) ──
# pip install pydantic-settings
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
"""環境変数から設定を読み込む"""
model_config = {
"env_prefix": "APP_",
"env_file": ".env",
}
debug: bool = False
database_url: str = "sqlite:///db.sqlite3"
secret_key: str
allowed_origins: list[str] = ["http://localhost:3000"]
max_connections: int = 10
# APP_SECRET_KEY=mysecret APP_DEBUG=true python app.py
# settings = AppSettings()
8.4 mypy による静的型チェック
# インストール
pip install mypy
# 基本的な実行
mypy src/
mypy --strict src/ # 厳格モード
mypy --python-version 3.12 src/
# mypy.ini または pyproject.toml で設定
# pyproject.toml
[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
check_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_defs = false
[[tool.mypy.overrides]]
module = "third_party_module.*"
ignore_missing_imports = true
# mypy が検出するエラーの例
def process(items: list[str]) -> int:
total = 0
for item in items:
total += len(item)
return total
result: str = process(["hello"]) # error: Incompatible types in assignment
# (expression has type "int", variable has type "str")
# TypeGuard を使った型の絞り込み
from typing import TypeGuard
def is_string_list(val: list[object]) -> TypeGuard[list[str]]:
return all(isinstance(x, str) for x in val)
def process_items(items: list[object]) -> None:
if is_string_list(items):
# ここで items は list[str] として扱われる
for item in items:
print(item.upper())
9. 並行処理と非同期プログラミング
9.1 スレッディング
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# ── 基本的なスレッド ──
def worker(name: str, delay: float) -> str:
print(f"[{name}] Starting")
time.sleep(delay)
print(f"[{name}] Done")
return f"{name} completed"
# Thread クラスの直接使用
thread = threading.Thread(target=worker, args=("Thread-1", 2))
thread.start()
thread.join() # スレッドの完了を待機
# ── ThreadPoolExecutor(推奨) ──
urls = [
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
]
def fetch_url(url: str) -> dict:
"""URL からデータを取得(シミュレーション)"""
import urllib.request
time.sleep(1) # ネットワークI/Oのシミュレーション
return {"url": url, "status": 200}
with ThreadPoolExecutor(max_workers=5) as executor:
# submit で個別に投入
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
result = future.result(timeout=10)
print(f"{url}: {result['status']}")
except Exception as e:
print(f"{url}: Error - {e}")
# map でまとめて投入
results = list(executor.map(fetch_url, urls))
# ── スレッド同期プリミティブ ──
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock: # ロックの取得と解放を自動化
temp = counter
time.sleep(0.001)
counter = temp + 1
threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Counter: {counter}") # 100(ロックなしだと100未満になりうる)
# ── threading.Event ──
stop_event = threading.Event()
def background_task():
while not stop_event.is_set():
print("Working...")
stop_event.wait(timeout=1.0)
print("Stopped")
t = threading.Thread(target=background_task, daemon=True)
t.start()
time.sleep(3)
stop_event.set() # タスクに停止を通知
t.join()
9.2 マルチプロセシング
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
# ── CPU バウンドなタスクはマルチプロセスが効果的 ──
# (GIL の制約を受けない)
def cpu_heavy_task(n: int) -> int:
"""CPU 集約的な計算"""
return sum(i * i for i in range(n))
# ProcessPoolExecutor(推奨)
with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
tasks = [1_000_000, 2_000_000, 3_000_000, 4_000_000]
results = list(executor.map(cpu_heavy_task, tasks))
print(results)
# ── multiprocessing.Pool ──
def process_chunk(chunk: list[int]) -> list[int]:
return [x ** 2 for x in chunk]
if __name__ == "__main__":
data = list(range(1000))
chunk_size = len(data) // mp.cpu_count()
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
with mp.Pool(processes=mp.cpu_count()) as pool:
results = pool.map(process_chunk, chunks)
flat_results = [item for sublist in results for item in sublist]
# ── プロセス間通信 ──
def producer(queue: mp.Queue):
for i in range(10):
queue.put(f"item-{i}")
queue.put(None) # 終了シグナル
def consumer(queue: mp.Queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consumed: {item}")
if __name__ == "__main__":
queue = mp.Queue()
p = mp.Process(target=producer, args=(queue,))
c = mp.Process(target=consumer, args=(queue,))
p.start()
c.start()
p.join()
c.join()
# ── 共有メモリ(Python 3.8+) ──
from multiprocessing import shared_memory
import numpy as np
if __name__ == "__main__":
# 共有メモリの作成
a = np.array([1, 2, 3, 4, 5], dtype=np.float64)
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
shared_array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
shared_array[:] = a[:]
print(f"Shared memory name: {shm.name}")
# 別プロセスからアクセス
existing_shm = shared_memory.SharedMemory(name=shm.name)
b = np.ndarray(a.shape, dtype=a.dtype, buffer=existing_shm.buf)
print(b) # [1. 2. 3. 4. 5.]
existing_shm.close()
shm.close()
shm.unlink()
9.3 asyncio — 非同期プログラミング
import asyncio
import aiohttp # pip install aiohttp
# ── 基本的な非同期関数 ──
async def fetch_data(url: str) -> dict:
"""非同期 HTTP リクエスト"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def main():
# 並行実行(gather)
urls = [
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
]
results = await asyncio.gather(
*[fetch_data(url) for url in urls],
return_exceptions=True # エラーを例外として返す
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: Error - {result}")
else:
print(f"{url}: {result}")
asyncio.run(main())
# ── TaskGroup(Python 3.11+: 構造化並行性) ──
async def main_v2():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_data("https://api.example.com/users"))
task2 = tg.create_task(fetch_data("https://api.example.com/products"))
task3 = tg.create_task(fetch_data("https://api.example.com/orders"))
# ここに到達した時点で全タスクが完了している
# いずれかが例外を送出した場合、ExceptionGroup が発生
print(task1.result(), task2.result(), task3.result())
# ── 非同期ジェネレータ ──
async def async_range(start: int, end: int, delay: float = 0.1):
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def consume():
async for value in async_range(0, 10):
print(value)
# ── 非同期コンテキストマネージャ ──
class AsyncDatabase:
async def __aenter__(self):
print("Connecting...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Disconnecting...")
await asyncio.sleep(0.1)
async def query(self, sql: str) -> list:
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Alice"}]
async def db_example():
async with AsyncDatabase() as db:
results = await db.query("SELECT * FROM users")
# ── セマフォによる並行数の制限 ──
async def rate_limited_fetch(urls: list[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(url: str) -> dict:
async with semaphore:
return await fetch_data(url)
return await asyncio.gather(*[fetch_with_limit(url) for url in urls])
# ── タイムアウト ──
async def with_timeout():
try:
async with asyncio.timeout(5.0): # Python 3.11+
result = await fetch_data("https://slow-api.example.com/data")
except TimeoutError:
print("Request timed out")
# ── キュー ──
async def producer(queue: asyncio.Queue):
for i in range(10):
await queue.put(f"item-{i}")
await asyncio.sleep(0.1)
await queue.put(None)
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"Processing: {item}")
queue.task_done()
async def queue_example():
queue: asyncio.Queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue),
consumer(queue),
)
9.4 GIL(Global Interpreter Lock)と並行処理の選択
Python の並行処理の選択ガイド:
┌─────────────────────┬───────────────────────┬─────────────────┐
│ タスクの種類 │ 推奨手法 │ 理由 │
├─────────────────────┼───────────────────────┼─────────────────┤
│ I/O バウンド │ asyncio │ 最も効率的 │
│ (HTTP, DB, ファイル) │ または ThreadPool │ GIL の影響なし │
├─────────────────────┼───────────────────────┼─────────────────┤
│ CPU バウンド │ multiprocessing │ GIL を回避 │
│ (計算、データ処理) │ または ProcessPool │ 真の並列実行 │
├─────────────────────┼───────────────────────┼─────────────────┤
│ 混合 │ asyncio + │ I/OはasyncIO、 │
│ │ ProcessPoolExecutor │ CPUはプロセス │
└─────────────────────┴───────────────────────┴─────────────────┘
Python 3.13+ の実験的フリースレッドモード:
- GIL を無効化可能(--disable-gil フラグ)
- マルチスレッドで真の並列実行が可能に
- まだ実験段階で、一部のC拡張との互換性に注意
# ── 混合パターン: asyncio + ProcessPoolExecutor ──
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_work(data: list[int]) -> int:
"""CPU 集約的な処理(別プロセスで実行)"""
return sum(x ** 2 for x in data)
async def mixed_workload():
loop = asyncio.get_event_loop()
# CPU バウンドなタスクを別プロセスに委譲
with ProcessPoolExecutor() as pool:
data_chunks = [list(range(1_000_000)) for _ in range(4)]
futures = [
loop.run_in_executor(pool, cpu_work, chunk)
for chunk in data_chunks
]
results = await asyncio.gather(*futures)
print(f"Total: {sum(results)}")
asyncio.run(mixed_workload())
10. テスト
10.1 pytest — テストフレームワーク
# tests/test_calculator.py
import pytest
from mypackage.calculator import Calculator, DivisionByZeroError
# ── 基本的なテスト ──
def test_addition():
calc = Calculator()
assert calc.add(2, 3) == 5
def test_subtraction():
calc = Calculator()
assert calc.subtract(5, 3) == 2
# ── パラメタライズ ──
@pytest.mark.parametrize("a, b, expected", [
(2, 3, 5),
(-1, 1, 0),
(0, 0, 0),
(100, -50, 50),
(1.5, 2.5, 4.0),
])
def test_add_parametrized(a, b, expected):
calc = Calculator()
assert calc.add(a, b) == expected
# ── 例外のテスト ──
def test_division_by_zero():
calc = Calculator()
with pytest.raises(DivisionByZeroError, match="Cannot divide by zero"):
calc.divide(10, 0)
# ── 近似値の比較 ──
def test_float_comparison():
assert 0.1 + 0.2 == pytest.approx(0.3)
assert [0.1, 0.2] == pytest.approx([0.1, 0.2], abs=1e-6)
10.2 フィクスチャ
# tests/conftest.py
import pytest
import tempfile
from pathlib import Path
@pytest.fixture
def calculator():
"""Calculator インスタンスを提供するフィクスチャ"""
return Calculator()
@pytest.fixture
def sample_data():
"""テスト用サンプルデータ"""
return {
"users": [
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
]
}
@pytest.fixture
def temp_dir():
"""一時ディレクトリを提供し、テスト後に自動クリーンアップ"""
with tempfile.TemporaryDirectory() as tmpdir:
yield Path(tmpdir)
@pytest.fixture(scope="session")
def database():
"""セッション全体で共有されるデータベース接続"""
db = Database.connect("test_db")
db.create_tables()
yield db
db.drop_tables()
db.disconnect()
@pytest.fixture(autouse=True)
def reset_state():
"""全テストの前後で自動実行されるフィクスチャ"""
yield
# テスト後のクリーンアップ
# フィクスチャを使用するテスト
def test_with_calculator(calculator):
assert calculator.add(1, 1) == 2
def test_with_temp_dir(temp_dir):
test_file = temp_dir / "test.txt"
test_file.write_text("hello")
assert test_file.read_text() == "hello"
10.3 モックとパッチ
from unittest.mock import Mock, patch, MagicMock, AsyncMock
import pytest
# ── Mock オブジェクト ──
def test_with_mock():
# 基本的な Mock
mock_db = Mock()
mock_db.query.return_value = [{"id": 1, "name": "Alice"}]
result = mock_db.query("SELECT * FROM users")
assert result == [{"id": 1, "name": "Alice"}]
mock_db.query.assert_called_once_with("SELECT * FROM users")
# side_effect で例外を発生
mock_db.query.side_effect = ConnectionError("Database unavailable")
with pytest.raises(ConnectionError):
mock_db.query("SELECT 1")
# ── patch デコレータ ──
class UserService:
def __init__(self, api_client):
self.api_client = api_client
def get_user(self, user_id: int) -> dict:
response = self.api_client.get(f"/users/{user_id}")
if response.status_code == 200:
return response.json()
raise ValueError("User not found")
@patch("mypackage.services.requests.get")
def test_get_user(mock_get):
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"id": 1, "name": "Alice"}
mock_get.return_value = mock_response
service = UserService(Mock())
# ... テスト
# ── 非同期モック ──
@pytest.mark.asyncio
async def test_async_function():
mock_client = AsyncMock()
mock_client.fetch.return_value = {"data": "result"}
result = await mock_client.fetch("https://api.example.com")
assert result == {"data": "result"}
10.4 pytest の設定とプラグイン
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_functions = ["test_*"]
python_classes = ["Test*"]
addopts = [
"-v", # 詳細出力
"--tb=short", # トレースバックの短縮表示
"--strict-markers", # 未登録マーカーをエラーに
"--cov=src/mypackage", # カバレッジ対象
"--cov-report=term-missing", # カバレッジレポート
"--cov-report=html:htmlcov", # HTML レポート出力
"-x", # 最初の失敗で停止
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
]
filterwarnings = [
"error",
"ignore::DeprecationWarning",
]
asyncio_mode = "auto" # pytest-asyncio の設定
# よく使う pytest コマンド
pytest # 全テスト実行
pytest tests/test_core.py # 特定ファイル
pytest -k "test_add" # 名前でフィルタ
pytest -m "not slow" # マーカーでフィルタ
pytest --lf # 前回失敗したテストのみ
pytest --ff # 前回失敗したテストを優先実行
pytest -x # 最初の失敗で停止
pytest --pdb # 失敗時に pdb を起動
pytest -n auto # 並列実行(pytest-xdist)
11. ログとデバッグ
11.1 logging モジュール
import logging
import logging.config
import json
from pathlib import Path
# ── 基本設定 ──
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("Low memory")
logger.error("Connection failed", exc_info=True)
# ── 辞書ベースの設定(推奨) ──
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
},
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"format": "%(asctime)s %(levelname)s %(name)s %(message)s",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "standard",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "json",
"filename": "app.log",
"maxBytes": 10_485_760, # 10MB
"backupCount": 5,
},
},
"loggers": {
"mypackage": {
"level": "DEBUG",
"handlers": ["console", "file"],
"propagate": False,
},
},
"root": {
"level": "WARNING",
"handlers": ["console"],
},
}
logging.config.dictConfig(LOGGING_CONFIG)
# ── 構造化ロギング(structlog) ──
# pip install structlog
import structlog
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer(), # 開発用
# structlog.processors.JSONRenderer(), # 本番用
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
)
log = structlog.get_logger()
log.info("user.login", user_id="12345", ip="192.168.1.1")
# 2024-01-15T10:30:00+00:00 [info] user.login user_id=12345 ip=192.168.1.1
11.2 デバッグ手法
# ── breakpoint()(Python 3.7+) ──
def buggy_function(data):
result = []
for item in data:
breakpoint() # ここでデバッガが起動(PYTHONBREAKPOINT=0 で無効化)
result.append(item * 2)
return result
# ── pdb コマンド ──
# n(ext) : 次の行に進む
# s(tep) : 関数の中に入る
# c(ontinue) : 次のブレークポイントまで実行
# p expr : 式を評価して表示
# l(ist) : ソースコードを表示
# w(here) : コールスタックを表示
# q(uit) : デバッガを終了
# ── 便利なデバッグテクニック ──
import sys
import traceback
# トレースバックの取得
try:
1 / 0
except:
tb = traceback.format_exc()
print(tb)
# オブジェクトの詳細情報
import inspect
print(inspect.getsource(some_function)) # ソースコードを表示
print(inspect.getmembers(obj)) # メンバーの一覧
# メモリプロファイリング
# pip install memory-profiler
# @profile
# def memory_heavy():
# data = [i for i in range(1_000_000)]
# return data
# python -m memory_profiler script.py
12. Web フレームワーク
12.1 FastAPI — 現代的な非同期 Web フレームワーク
# pip install "fastapi[standard]"
# uvicorn main:app --reload
from fastapi import FastAPI, HTTPException, Depends, Query, Path, Body
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Annotated
from datetime import datetime
app = FastAPI(
title="My API",
description="Sample REST API with FastAPI",
version="1.0.0",
)
# ── CORS ミドルウェア ──
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ── モデル定義 ──
class UserCreate(BaseModel):
name: str = Field(min_length=2, max_length=50)
email: str
age: int = Field(ge=0, le=150)
class UserResponse(BaseModel):
id: int
name: str
email: str
age: int
created_at: datetime
model_config = {"from_attributes": True} # ORM モードを有効化
# ── 依存性注入 ──
async def get_db():
"""データベースセッションを提供する依存関数"""
db = DatabaseSession()
try:
yield db
finally:
await db.close()
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""認証済みユーザーを返す依存関数"""
user = await verify_token(token)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
return user
# ── エンドポイント ──
@app.get("/")
async def root():
return {"message": "Hello, World!"}
@app.get("/users", response_model=list[UserResponse])
async def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
db = Depends(get_db),
):
"""ユーザー一覧を取得"""
return await db.get_users(skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: Annotated[int, Path(gt=0)],
db = Depends(get_db),
):
"""特定のユーザーを取得"""
user = await db.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(
user: UserCreate,
db = Depends(get_db),
):
"""新規ユーザーを作成"""
return await db.create_user(user)
@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user: UserCreate,
db = Depends(get_db),
current_user = Depends(get_current_user),
):
"""ユーザー情報を更新(認証必須)"""
return await db.update_user(user_id, user)
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int, db = Depends(get_db)):
"""ユーザーを削除"""
await db.delete_user(user_id)
# ── バックグラウンドタスク ──
from fastapi import BackgroundTasks
@app.post("/notifications")
async def send_notification(
email: str,
background_tasks: BackgroundTasks,
):
background_tasks.add_task(send_email, email, "Welcome!")
return {"message": "Notification queued"}
# ── WebSocket ──
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print(f"Client {client_id} disconnected")
# ── ライフサイクルイベント ──
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 起動時の処理
print("Starting up...")
await init_database()
yield
# シャットダウン時の処理
print("Shutting down...")
await cleanup_resources()
app = FastAPI(lifespan=lifespan)
12.2 Django — フルスタック Web フレームワーク
# ── プロジェクト作成 ──
# pip install django
# django-admin startproject mysite
# cd mysite
# python manage.py startapp blog
# ── settings.py(主要設定) ──
# mysite/settings.py
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY', 'change-me-in-production')
DEBUG = os.environ.get('DJANGO_DEBUG', 'True').lower() == 'true'
ALLOWED_HOSTS = os.environ.get('DJANGO_ALLOWED_HOSTS', 'localhost').split(',')
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework', # Django REST Framework
'blog.apps.BlogConfig', # アプリケーション
]
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.environ.get('DB_NAME', 'mysite'),
'USER': os.environ.get('DB_USER', 'postgres'),
'PASSWORD': os.environ.get('DB_PASSWORD', ''),
'HOST': os.environ.get('DB_HOST', 'localhost'),
'PORT': os.environ.get('DB_PORT', '5432'),
}
}
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20,
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.TokenAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
}
# ── モデル定義 ──
# blog/models.py
from django.db import models
from django.contrib.auth.models import User
class Post(models.Model):
title = models.CharField(max_length=200)
slug = models.SlugField(unique=True)
author = models.ForeignKey(User, on_delete=models.CASCADE, related_name='posts')
content = models.TextField()
published = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
tags = models.ManyToManyField('Tag', blank=True)
class Meta:
ordering = ['-created_at']
indexes = [
models.Index(fields=['-created_at']),
models.Index(fields=['slug']),
]
def __str__(self):
return self.title
class Tag(models.Model):
name = models.CharField(max_length=50, unique=True)
def __str__(self):
return self.name
# ── マイグレーション ──
# python manage.py makemigrations
# python manage.py migrate
# ── ビュー(Django REST Framework) ──
# blog/serializers.py
from rest_framework import serializers
class PostSerializer(serializers.ModelSerializer):
author_name = serializers.CharField(source='author.username', read_only=True)
class Meta:
model = Post
fields = ['id', 'title', 'slug', 'author', 'author_name',
'content', 'published', 'created_at', 'updated_at']
read_only_fields = ['author']
# blog/views.py
from rest_framework import viewsets, permissions
class PostViewSet(viewsets.ModelViewSet):
queryset = Post.objects.select_related('author').prefetch_related('tags')
serializer_class = PostSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
lookup_field = 'slug'
def perform_create(self, serializer):
serializer.save(author=self.request.user)
# blog/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
router = DefaultRouter()
router.register(r'posts', PostViewSet)
urlpatterns = [
path('api/', include(router.urls)),
]
12.3 Flask — 軽量 Web フレームワーク
# pip install flask
from flask import Flask, request, jsonify, abort
from functools import wraps
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='dev',
DATABASE='sqlite:///app.db',
)
# ── ルーティング ──
@app.route('/')
def index():
return jsonify({"message": "Hello, World!"})
@app.route('/users', methods=['GET'])
def list_users():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 10, type=int)
# ... データベースクエリ
return jsonify({"users": [], "page": page})
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
user = find_user(user_id)
if not user:
abort(404)
return jsonify(user)
@app.route('/users', methods=['POST'])
def create_user():
data = request.get_json()
if not data or 'name' not in data:
abort(400, description="Name is required")
# ... ユーザー作成
return jsonify({"id": 1, **data}), 201
# ── エラーハンドラ ──
@app.errorhandler(404)
def not_found(error):
return jsonify({"error": "Not found"}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({"error": "Internal server error"}), 500
# ── Blueprint によるモジュール化 ──
from flask import Blueprint
api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
@api_bp.route('/health')
def health_check():
return jsonify({"status": "healthy"})
app.register_blueprint(api_bp)
if __name__ == '__main__':
app.run(debug=True, port=5000)
13. データベースアクセス
13.1 SQLAlchemy — ORM
# pip install "sqlalchemy[asyncio]" asyncpg psycopg2-binary
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, DateTime, Boolean, Text
from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship,
Session, sessionmaker, selectinload
)
from datetime import datetime
# ── エンジンとセッション設定 ──
# 同期
engine = create_engine(
"postgresql+psycopg2://user:password@localhost:5432/mydb",
echo=True, # SQL ログを出力
pool_size=5, # コネクションプールサイズ
max_overflow=10, # 最大追加接続数
pool_timeout=30, # 接続待ちタイムアウト
pool_recycle=3600, # コネクション再生成間隔(秒)
)
SessionLocal = sessionmaker(bind=engine)
# ── モデル定義(SQLAlchemy 2.0 スタイル) ──
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), nullable=False)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# リレーション
posts: Mapped[list["Post"]] = relationship(back_populates="author", cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"User(id={self.id}, name={self.name!r})"
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
published: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
author: Mapped["User"] = relationship(back_populates="posts")
# テーブル作成
Base.metadata.create_all(engine)
# ── CRUD 操作 ──
# 作成
with Session(engine) as session:
user = User(name="Alice", email="alice@example.com")
session.add(user)
session.commit()
session.refresh(user)
print(f"Created user: {user.id}")
# 読み取り
with Session(engine) as session:
# 単一取得
user = session.get(User, 1)
# クエリ
stmt = select(User).where(User.is_active == True).order_by(User.name)
users = session.scalars(stmt).all()
# JOIN と Eager Loading
stmt = (
select(User)
.options(selectinload(User.posts))
.where(User.name.like("A%"))
)
users_with_posts = session.scalars(stmt).all()
# 集計
stmt = select(func.count(User.id)).where(User.is_active == True)
count = session.scalar(stmt)
# 更新
with Session(engine) as session:
user = session.get(User, 1)
user.name = "Alice Updated"
session.commit()
# 削除
with Session(engine) as session:
user = session.get(User, 1)
session.delete(user)
session.commit()
# ── 非同期版 ──
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
async_engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost:5432/mydb",
echo=True,
)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession)
async def get_user(user_id: int) -> User | None:
async with AsyncSessionLocal() as session:
return await session.get(User, user_id)
async def list_users() -> list[User]:
async with AsyncSessionLocal() as session:
stmt = select(User).where(User.is_active == True)
result = await session.scalars(stmt)
return result.all()
13.2 Alembic — データベースマイグレーション
# 初期設定
pip install alembic
alembic init migrations
# alembic.ini の設定
# sqlalchemy.url = postgresql://user:password@localhost/mydb
# migrations/env.py(主要設定)
from mypackage.models import Base
target_metadata = Base.metadata
# マイグレーションの作成と実行
# alembic revision --autogenerate -m "Create users table"
# alembic upgrade head
# alembic downgrade -1
# alembic history
13.3 Redis
# pip install redis
import redis
from redis import asyncio as aioredis
import json
# ── 同期クライアント ──
r = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True, # bytes ではなく str を返す
)
# 基本操作
r.set("key", "value", ex=3600) # 1時間のTTL
value = r.get("key")
# ハッシュ
r.hset("user:1", mapping={"name": "Alice", "age": "30", "email": "alice@example.com"})
user = r.hgetall("user:1")
# リスト(キュー)
r.rpush("queue:tasks", json.dumps({"task": "process", "data": "..."}))
task = r.lpop("queue:tasks")
# セット
r.sadd("online_users", "user1", "user2", "user3")
is_online = r.sismember("online_users", "user1")
# ソート済みセット(ランキング)
r.zadd("leaderboard", {"Alice": 100, "Bob": 85, "Charlie": 92})
top3 = r.zrevrange("leaderboard", 0, 2, withscores=True)
# パイプライン(バッチ操作)
with r.pipeline() as pipe:
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.get("key1")
results = pipe.execute()
# ── 非同期クライアント ──
async def redis_example():
r = aioredis.from_url("redis://localhost", decode_responses=True)
await r.set("key", "value")
value = await r.get("key")
await r.aclose()
14. データサイエンスとML エコシステム
14.1 主要ライブラリ概要
# ── NumPy: 数値計算の基盤 ──
import numpy as np
# 配列の作成
arr = np.array([1, 2, 3, 4, 5])
zeros = np.zeros((3, 4)) # 3x4 のゼロ行列
ones = np.ones((2, 3)) # 2x3 の1行列
rand = np.random.randn(3, 3) # 3x3 の正規分布乱数
# ベクトル演算(ループ不要で高速)
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
print(a + b) # [5, 7, 9]
print(a * b) # [4, 10, 18]
print(np.dot(a, b)) # 32(内積)
# 行列演算
matrix = np.array([[1, 2], [3, 4]])
print(np.linalg.inv(matrix)) # 逆行列
print(np.linalg.det(matrix)) # 行列式
eigenvalues, eigenvectors = np.linalg.eig(matrix)
# ブロードキャスティング
data = np.random.randn(1000, 3)
mean = data.mean(axis=0) # 列ごとの平均
std = data.std(axis=0) # 列ごとの標準偏差
normalized = (data - mean) / std # 正規化
# ── Pandas: データ分析 ──
import pandas as pd
# DataFrame の作成
df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie', 'Diana'],
'age': [30, 25, 35, 28],
'city': ['Tokyo', 'Osaka', 'Tokyo', 'Nagoya'],
'salary': [500000, 450000, 600000, 480000],
})
# データの読み込み
# df = pd.read_csv('data.csv')
# df = pd.read_json('data.json')
# df = pd.read_excel('data.xlsx')
# df = pd.read_sql('SELECT * FROM users', engine)
# 基本操作
print(df.head())
print(df.describe())
print(df.info())
# フィルタリング
tokyo_people = df[df['city'] == 'Tokyo']
high_salary = df[df['salary'] > 500000]
combined = df[(df['city'] == 'Tokyo') & (df['age'] > 25)]
# グルーピング
city_stats = df.groupby('city').agg({
'salary': ['mean', 'median', 'std'],
'age': 'mean',
'name': 'count',
})
# メソッドチェーン
result = (
df
.query('age >= 25')
.assign(tax=lambda x: x['salary'] * 0.2)
.sort_values('salary', ascending=False)
.head(10)
)
# ── Matplotlib / Seaborn: 可視化 ──
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
axes[0].bar(df['name'], df['salary'])
axes[0].set_title('Salary by Person')
axes[0].set_ylabel('Salary (JPY)')
axes[1].scatter(df['age'], df['salary'])
axes[1].set_title('Age vs Salary')
axes[1].set_xlabel('Age')
axes[1].set_ylabel('Salary')
plt.tight_layout()
plt.savefig('report.png', dpi=150)
15. Python のアーキテクチャと内部構造
15.1 CPython のアーキテクチャ
Python プログラムの実行フロー:
┌─────────────┐ ┌──────────────┐ ┌────────────────┐
│ ソースコード │ ──→ │ コンパイラ │ ──→ │ バイトコード │
│ (.py) │ │ (Parser + │ │ (.pyc) │
│ │ │ Compiler) │ │ │
└─────────────┘ └──────────────┘ └───────┬────────┘
│
▼
┌────────────────┐
│ Python VM │
│ (インタープリタ) │
│ - スタックマシン │
│ - GIL │
│ - GC │
└────────────────┘
# ── バイトコードの確認 ──
import dis
def example(x, y):
return x + y * 2
dis.dis(example)
# 出力:
# 0 LOAD_FAST 0 (x)
# 2 LOAD_FAST 1 (y)
# 4 LOAD_CONST 1 (2)
# 6 BINARY_MULTIPLY
# 8 BINARY_ADD
# 10 RETURN_VALUE
# コンパイル済みバイトコードの確認
import py_compile
py_compile.compile('example.py') # __pycache__/example.cpython-312.pyc が生成される
15.2 メモリ管理とガベージコレクション
import sys
import gc
# ── 参照カウント ──
a = [1, 2, 3]
print(sys.getrefcount(a)) # 2(変数 a + getrefcount の引数)
b = a # 参照カウント +1
print(sys.getrefcount(a)) # 3
del b # 参照カウント -1
print(sys.getrefcount(a)) # 2
# ── ガベージコレクション ──
# 循環参照を検出・回収
gc.get_threshold() # (700, 10, 10) — GC のしきい値
gc.get_count() # 現在の世代ごとのオブジェクト数
gc.collect() # 手動で GC を実行
# GC の統計情報
gc.set_debug(gc.DEBUG_STATS)
# ── メモリ使用量の確認 ──
print(sys.getsizeof([])) # 56 bytes(空リスト)
print(sys.getsizeof([1, 2, 3])) # 88 bytes
print(sys.getsizeof({})) # 64 bytes(空辞書)
print(sys.getsizeof("hello")) # 54 bytes
# __slots__ によるメモリ最適化
class WithDict:
def __init__(self, x, y):
self.x = x
self.y = y
class WithSlots:
__slots__ = ('x', 'y')
def __init__(self, x, y):
self.x = x
self.y = y
print(sys.getsizeof(WithDict(1, 2))) # 48 bytes + __dict__
print(sys.getsizeof(WithSlots(1, 2))) # 48 bytes(__dict__ なし)
# 大量のインスタンスではメモリ差が顕著
# WithDict: 1M instances ≈ 200MB
# WithSlots: 1M instances ≈ 64MB
15.3 GIL(Global Interpreter Lock)
"""
GIL の仕組み:
- CPython のインタープリタは一度に1つのスレッドしか Python バイトコードを実行しない
- I/O 操作時には GIL を解放するため、I/O バウンドな処理ではスレッドが有効
- CPU バウンドな処理ではマルチスレッドでも並列化されない
Python 3.13+ のフリースレッドモード:
- --disable-gil フラグまたは PYTHON_GIL=0 環境変数で GIL を無効化
- 真のマルチスレッド並列実行が可能に
- まだ実験段階
"""
import time
import threading
def cpu_bound(n):
"""CPU バウンドなタスク"""
total = 0
for i in range(n):
total += i * i
return total
# シングルスレッド
start = time.perf_counter()
cpu_bound(10_000_000)
cpu_bound(10_000_000)
single_time = time.perf_counter() - start
# マルチスレッド(GIL のため高速化されない)
start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t2 = threading.Thread(target=cpu_bound, args=(10_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
multi_time = time.perf_counter() - start
print(f"Single thread: {single_time:.3f}s")
print(f"Multi thread: {multi_time:.3f}s")
# → ほぼ同じ時間(GIL の制約)
15.4 メタクラスとディスクリプタ
# ── メタクラス ──
class SingletonMeta(type):
"""シングルトンパターンを実装するメタクラス"""
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=SingletonMeta):
def __init__(self, url: str = "default"):
self.url = url
db1 = Database("postgresql://localhost/db1")
db2 = Database("postgresql://localhost/db2")
assert db1 is db2 # True — 同じインスタンス
print(db1.url) # postgresql://localhost/db1
# ── ディスクリプタ ──
class Validated:
"""バリデーション付き属性のディスクリプタ"""
def __init__(self, validator=None):
self.validator = validator
def __set_name__(self, owner, name):
self.name = name
self.private_name = f"_{name}"
def __get__(self, obj, objtype=None):
if obj is None:
return self
return getattr(obj, self.private_name, None)
def __set__(self, obj, value):
if self.validator:
self.validator(value)
setattr(obj, self.private_name, value)
def positive_number(value):
if not isinstance(value, (int, float)):
raise TypeError(f"Expected number, got {type(value).__name__}")
if value <= 0:
raise ValueError(f"Expected positive number, got {value}")
class Product:
name = Validated(lambda v: None if isinstance(v, str) and v else (_ for _ in ()).throw(ValueError("Invalid name")))
price = Validated(positive_number)
quantity = Validated(positive_number)
def __init__(self, name: str, price: float, quantity: int):
self.name = name
self.price = price
self.quantity = quantity
product = Product("Widget", 19.99, 100)
# product.price = -5 # ValueError: Expected positive number, got -5
16. パフォーマンス最適化
16.1 プロファイリング
# ── cProfile: 関数レベルのプロファイリング ──
import cProfile
import pstats
def slow_function():
result = []
for i in range(100000):
result.append(i ** 2)
return sum(result)
# プロファイリング実行
cProfile.run('slow_function()', 'profile_output')
# 結果の分析
stats = pstats.Stats('profile_output')
stats.strip_dirs()
stats.sort_stats('cumulative')
stats.print_stats(10)
# コマンドラインから実行
# python -m cProfile -o output.prof script.py
# python -m pstats output.prof
# ── line_profiler: 行レベルのプロファイリング ──
# pip install line-profiler
# @profile デコレータを付けて:
# kernprof -l -v script.py
# ── timeit: マイクロベンチマーク ──
import timeit
# リスト内包表記 vs ループ
list_comp = timeit.timeit('[x**2 for x in range(1000)]', number=10000)
loop_style = timeit.timeit('''
result = []
for x in range(1000):
result.append(x**2)
''', number=10000)
print(f"List comp: {list_comp:.4f}s")
print(f"Loop: {loop_style:.4f}s")
# リスト内包表記の方が約30%高速
16.2 最適化テクニック
# ── 1. 適切なデータ構造の選択 ──
# ルックアップ: list O(n) vs set/dict O(1)
import time
large_list = list(range(1_000_000))
large_set = set(large_list)
# list での検索
start = time.perf_counter()
999_999 in large_list
print(f"List lookup: {time.perf_counter() - start:.6f}s")
# set での検索
start = time.perf_counter()
999_999 in large_set
print(f"Set lookup: {time.perf_counter() - start:.6f}s")
# Set は数千倍高速
# ── 2. ジェネレータによるメモリ節約 ──
# Bad: 全データをメモリに載せる
# all_data = [process(x) for x in range(10_000_000)]
# Good: ジェネレータで逐次処理
# def process_stream(data):
# for x in data:
# yield process(x)
# ── 3. 文字列の結合 ──
# Bad: 文字列の連結(O(n^2))
# result = ""
# for word in words:
# result += word + " "
# Good: join を使用(O(n))
result = " ".join(words)
# ── 4. collections モジュール ──
from collections import defaultdict, Counter, deque
# Counter: 要素の出現回数
words = ["apple", "banana", "apple", "cherry", "banana", "apple"]
counts = Counter(words)
print(counts.most_common(2)) # [('apple', 3), ('banana', 2)]
# defaultdict: キーが存在しない場合にデフォルト値
word_positions = defaultdict(list)
for i, word in enumerate(words):
word_positions[word].append(i)
# deque: 両端キュー(先頭の追加/削除が O(1))
queue = deque(maxlen=1000)
queue.append("item")
queue.appendleft("priority_item")
oldest = queue.popleft()
# ── 5. itertools によるメモリ効率的な処理 ──
from itertools import islice, chain, groupby, product, combinations
# 大量のデータから最初の N 件だけ取得
first_10 = list(islice(range(1_000_000), 10))
# 複数のイテラブルを連結
combined = chain(range(5), range(10, 15))
17. セキュリティ
17.1 セキュリティのベストプラクティス
# ── 入力バリデーション ──
import re
from html import escape
def sanitize_input(user_input: str) -> str:
"""ユーザー入力のサニタイズ"""
# HTML エスケープ
sanitized = escape(user_input)
# 制御文字の除去
sanitized = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', sanitized)
return sanitized
# ── パスワードのハッシュ化 ──
import hashlib
import secrets
def hash_password(password: str) -> str:
"""bcrypt/scrypt/argon2 を推奨。最低限 PBKDF2 を使用"""
salt = secrets.token_hex(32)
hashed = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
iterations=600_000 # NIST 推奨
)
return f"{salt}:{hashed.hex()}"
def verify_password(password: str, stored_hash: str) -> bool:
salt, hash_hex = stored_hash.split(':')
hashed = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
iterations=600_000
)
return secrets.compare_digest(hashed.hex(), hash_hex)
# ── 推奨: bcrypt または argon2 ──
# pip install bcrypt
import bcrypt
def hash_password_bcrypt(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password_bcrypt(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
# ── シークレット管理 ──
import os
# 環境変数から取得(ハードコーディング禁止)
DATABASE_URL = os.environ["DATABASE_URL"]
API_KEY = os.environ["API_KEY"]
# セキュアなトークン生成
token = secrets.token_urlsafe(32) # URL セーフなランダムトークン
otp = secrets.randbelow(1_000_000) # 6桁の OTP
# ── SQL インジェクション対策 ──
# Bad: 文字列フォーマットでの SQL 構築
# cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# Good: パラメータ化クエリ
# cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))
# SQLAlchemy の場合は自動的にパラメータ化される
# ── 安全な一時ファイル ──
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=True) as f:
f.write("sensitive data")
# ファイルは with ブロック終了時に自動削除
# ── 依存関係のセキュリティチェック ──
# pip install pip-audit
# pip-audit # 脆弱性スキャン
# pip install safety
# safety check # 既知の脆弱性をチェック
17.2 暗号化
# pip install cryptography
from cryptography.fernet import Fernet
# ── 対称鍵暗号化 ──
# 鍵の生成
key = Fernet.generate_key()
cipher = Fernet(key)
# 暗号化と復号化
message = b"Secret message"
encrypted = cipher.encrypt(message)
decrypted = cipher.decrypt(encrypted)
assert decrypted == message
# ── JWT トークン ──
# pip install PyJWT
import jwt
from datetime import datetime, timedelta, timezone
SECRET_KEY = "your-secret-key"
def create_token(user_id: int) -> str:
payload = {
"user_id": user_id,
"exp": datetime.now(timezone.utc) + timedelta(hours=24),
"iat": datetime.now(timezone.utc),
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_token(token: str) -> dict:
try:
return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
18. デプロイメントとDevOps
18.1 Docker
# ── マルチステージビルド ──
# Dockerfile
# ── ステージ 1: ビルド ──
FROM python:3.12-slim AS builder
WORKDIR /app
# 依存関係のインストール(キャッシュ効率化のため先にコピー)
COPY pyproject.toml .
RUN pip install --no-cache-dir --prefix=/install .
# ── ステージ 2: ランタイム ──
FROM python:3.12-slim AS runtime
# セキュリティ: root 以外のユーザーで実行
RUN groupadd -r appuser && useradd -r -g appuser appuser
WORKDIR /app
# ビルドステージからパッケージをコピー
COPY --from=builder /install /usr/local
# アプリケーションコードをコピー
COPY src/ ./src/
# 環境変数の設定
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONPATH=/app/src
# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
USER appuser
EXPOSE 8000
CMD ["python", "-m", "uvicorn", "mypackage.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
services:
app:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://postgres:password@db:5432/mydb
- REDIS_URL=redis://redis:6379/0
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
restart: unless-stopped
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: mydb
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:
18.2 CI/CD(GitHub Actions)
# .github/workflows/ci.yml
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
services:
postgres:
image: postgres:16
env:
POSTGRES_DB: test_db
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v3
- name: Install dependencies
run: |
uv venv
uv pip install -e ".[dev]"
- name: Lint with ruff
run: |
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/
- name: Type check with mypy
run: uv run mypy src/
- name: Run tests
env:
DATABASE_URL: postgresql://postgres:password@localhost:5432/test_db
run: uv run pytest --cov --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: ./coverage.xml
18.3 コードフォーマットとリンティング
# pyproject.toml — Ruff(高速な Python リンター兼フォーマッター)
[tool.ruff]
target-version = "py310"
line-length = 100
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # Pyflakes
"I", # isort
"N", # pep8-naming
"UP", # pyupgrade
"B", # flake8-bugbear
"A", # flake8-builtins
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"RUF", # Ruff-specific rules
]
ignore = [
"E501", # line too long (format で対応)
]
[tool.ruff.lint.isort]
known-first-party = ["mypackage"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
# Ruff の使用
ruff check src/ # リンティング
ruff check --fix src/ # 自動修正
ruff format src/ # フォーマット
# pre-commit の設定
# pip install pre-commit
# .pre-commit-config.yaml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.0
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.0
hooks:
- id: mypy
additional_dependencies: [pydantic>=2.0]
# 初期設定と実行
pre-commit install # Git フックにインストール
pre-commit run --all-files # 全ファイルに対して実行
19. 高度なトピック
19.1 デザインパターン
# ── Strategy パターン ──
from abc import ABC, abstractmethod
from dataclasses import dataclass
class CompressionStrategy(ABC):
@abstractmethod
def compress(self, data: bytes) -> bytes: ...
@abstractmethod
def decompress(self, data: bytes) -> bytes: ...
class GzipStrategy(CompressionStrategy):
def compress(self, data: bytes) -> bytes:
import gzip
return gzip.compress(data)
def decompress(self, data: bytes) -> bytes:
import gzip
return gzip.decompress(data)
class LZ4Strategy(CompressionStrategy):
def compress(self, data: bytes) -> bytes:
import lz4.frame
return lz4.frame.compress(data)
def decompress(self, data: bytes) -> bytes:
import lz4.frame
return lz4.frame.decompress(data)
@dataclass
class FileProcessor:
strategy: CompressionStrategy
def save(self, data: bytes, path: str) -> None:
compressed = self.strategy.compress(data)
with open(path, 'wb') as f:
f.write(compressed)
def load(self, path: str) -> bytes:
with open(path, 'rb') as f:
compressed = f.read()
return self.strategy.decompress(compressed)
# 使用例
processor = FileProcessor(strategy=GzipStrategy())
processor.save(b"Hello, World!", "data.gz")
# ── Observer パターン ──
from typing import Callable
class EventEmitter:
def __init__(self):
self._listeners: dict[str, list[Callable]] = {}
def on(self, event: str, callback: Callable) -> None:
self._listeners.setdefault(event, []).append(callback)
def emit(self, event: str, *args, **kwargs) -> None:
for callback in self._listeners.get(event, []):
callback(*args, **kwargs)
def off(self, event: str, callback: Callable) -> None:
if event in self._listeners:
self._listeners[event].remove(callback)
# 使用例
emitter = EventEmitter()
emitter.on("user.created", lambda user: print(f"New user: {user}"))
emitter.on("user.created", lambda user: print(f"Sending welcome email to {user}"))
emitter.emit("user.created", "Alice")
# ── Repository パターン ──
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
T = TypeVar('T')
class Repository(ABC, Generic[T]):
@abstractmethod
async def get(self, id: int) -> T | None: ...
@abstractmethod
async def list(self, offset: int = 0, limit: int = 100) -> list[T]: ...
@abstractmethod
async def create(self, entity: T) -> T: ...
@abstractmethod
async def update(self, entity: T) -> T: ...
@abstractmethod
async def delete(self, id: int) -> bool: ...
class SQLAlchemyUserRepository(Repository):
def __init__(self, session):
self.session = session
async def get(self, id: int):
return await self.session.get(User, id)
async def list(self, offset: int = 0, limit: int = 100):
stmt = select(User).offset(offset).limit(limit)
result = await self.session.scalars(stmt)
return result.all()
async def create(self, entity):
self.session.add(entity)
await self.session.commit()
return entity
async def update(self, entity):
await self.session.merge(entity)
await self.session.commit()
return entity
async def delete(self, id: int):
user = await self.get(id)
if user:
await self.session.delete(user)
await self.session.commit()
return True
return False
19.2 CLI ツール開発
# ── Click(推奨) ──
# pip install click
import click
@click.group()
@click.version_option(version="1.0.0")
def cli():
"""My CLI Application"""
pass
@cli.command()
@click.argument('name')
@click.option('--greeting', '-g', default='Hello', help='Greeting word')
@click.option('--count', '-c', default=1, help='Number of times to greet')
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose mode')
def greet(name: str, greeting: str, count: int, verbose: bool):
"""Greet someone by name"""
for _ in range(count):
message = f"{greeting}, {name}!"
click.echo(click.style(message, fg='green'))
if verbose:
click.echo(f"Greeted {name} {count} time(s)")
@cli.command()
@click.option('--format', 'fmt', type=click.Choice(['json', 'yaml', 'table']), default='table')
@click.option('--output', '-o', type=click.Path(), help='Output file path')
def export(fmt: str, output: str | None):
"""Export data in various formats"""
data = {"users": [{"name": "Alice"}, {"name": "Bob"}]}
if fmt == 'json':
import json
result = json.dumps(data, indent=2)
elif fmt == 'yaml':
import yaml
result = yaml.dump(data)
else:
result = "Name\n----\nAlice\nBob"
if output:
with open(output, 'w') as f:
f.write(result)
click.echo(f"Exported to {output}")
else:
click.echo(result)
@cli.command()
@click.confirmation_option(prompt='Are you sure you want to reset?')
def reset():
"""Reset all data (with confirmation)"""
click.echo("Data has been reset")
if __name__ == '__main__':
cli()
# 使用例:
# python cli.py greet Alice --greeting Hi --count 3
# python cli.py export --format json -o output.json
# python cli.py reset
# ── argparse(標準ライブラリ) ──
import argparse
parser = argparse.ArgumentParser(description="My Tool")
subparsers = parser.add_subparsers(dest="command")
# サブコマンド: init
init_parser = subparsers.add_parser("init", help="Initialize project")
init_parser.add_argument("name", help="Project name")
init_parser.add_argument("--template", "-t", default="default", choices=["default", "minimal", "full"])
# サブコマンド: run
run_parser = subparsers.add_parser("run", help="Run the project")
run_parser.add_argument("--port", "-p", type=int, default=8000)
run_parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
19.3 Python 3.12+ の新機能
# ── Python 3.12: 新しいジェネリクス構文 ──
# 従来
from typing import TypeVar, Generic
T = TypeVar('T')
class OldBox(Generic[T]):
def __init__(self, content: T) -> None:
self.content = content
# Python 3.12+
class Box[T]:
def __init__(self, content: T) -> None:
self.content = content
def first[T](items: list[T]) -> T:
return items[0]
type Point = tuple[float, float]
type Matrix[T] = list[list[T]]
# ── Python 3.12: f-string の制限緩和 ──
# ネストされた f-string が可能に
names = ["Alice", "Bob", "Charlie"]
print(f"Users: {", ".join(f"{name!r}" for name in names)}")
# 複数行の式が使えるように
result = f"Total: {
sum(
x ** 2
for x in range(10)
)
}"
# ── Python 3.13: 実験的フリースレッドモード ──
# python3.13t script.py # フリースレッドビルド
# PYTHON_GIL=0 python3.13t script.py # GIL を無効化
# ── Python 3.13: 実験的 JIT コンパイラ ──
# PYTHON_JIT=1 python3.13 script.py # JIT を有効化
20. まとめとベストプラクティス
20.1 Python プロジェクトのベストプラクティス一覧
| カテゴリ | ベストプラクティス |
|---|---|
| プロジェクト構造 | src/ レイアウト + pyproject.toml |
| パッケージ管理 | uv または poetry で依存関係をロック |
| コードスタイル | ruff でリンティング&フォーマット |
| 型チェック | mypy --strict で静的解析 |
| テスト | pytest + カバレッジ 80% 以上 |
| ログ | structlog で構造化ロギング |
| セキュリティ | pip-audit で脆弱性チェック |
| CI/CD | GitHub Actions + pre-commit |
| Docker | マルチステージビルド + 非 root ユーザー |
| データバリデーション | Pydantic でスキーマを定義 |
20.2 パフォーマンスの考え方
1. まず正しく動くコードを書く
2. プロファイリングでボトルネックを特定する
3. 適切なデータ構造を選ぶ(set/dict のルックアップは O(1))
4. 組み込み関数を活用する(map, filter, sorted など)
5. ジェネレータでメモリ使用量を抑える
6. I/O バウンド → asyncio / threading
7. CPU バウンド → multiprocessing
8. それでも足りなければ → Cython, Rust 拡張, NumPy
20.3 推奨ツールチェーン(2024–2025)
パッケージ管理: uv(高速)/ poetry(安定)
リンター: ruff
フォーマッター: ruff format
型チェッカー: mypy / pyright
テスト: pytest + pytest-cov
セキュリティ: pip-audit + bandit
ドキュメント: mkdocs-material / Sphinx
タスクランナー: Makefile / just / nox
コンテナ: Docker + docker-compose
CI: GitHub Actions
本記事は Python 3.10 〜 3.13 を対象として執筆されています。Python は活発に開発が続けられており、最新の変更については公式ドキュメント(https://docs.python.org)を参照してください。