【FastAPI入門】MySQL v8.0系と連携したCRUD処理の実装

Streamlitはデータ可視化等のデータサイエンス向けに特化したフレームワークであるに対し、FastAPIPython 3.6+ 向けに設計されたWeb API フレームワークです。

FastAPIはその名の通り非常に高速に動作する APIであり、非同期処理にも対応しています。フロントエンドをStreamlitで開発し、バックエンドはFastAPI + DBで開発すれば、HTML5やCSS、JavaScriptに詳しくない方でも、Python言語だけで、フロントエンドとバックエンドが連携した本格的なWebアプリが開発できます。これは学ぶ価値が有るかと思い学習してみました。DBにはFastAPIからMySQLにアクセス(同期接続)しています。この時の注意点もありますので忘れないようにメモしておきます。

またここでは、HTTPリクエスト・レスポンスの動作確認に、Pythonのrequestsモジュールを使っています。

開発環境

  • OS: Ubuntu 22.04 (Debian Linux 12でも動作確認済)
  • Python 3.10.12の仮想環境(virtualenv 20.13.0)
  • FastAPI: v0.116.1 (バックエンド)
  • MySQL v8.3.0 (Dockerコンテナ: Docker Compose version v2.18.1)
  • requests (Pythonモジュール)

テスト環境構築

今回データベースにmysql v8.3.0を使いますがこれはDockerコンテナで作成します。それ以外はPythonの仮想環境に入って各種モジュールをインストールします。

MySQL v8.3.0(Docker環境)

docker-compose.yml

services:
  db:
    image: mysql:8.3.0 
    ports:
      - 3316:3306
    volumes:
      - './db:/var/lib/mysql'  
    environment:
      MYSQL_ROOT_PASSWORD: root_pass
      MYSQL_DATABASE: st_db 
      MYSQL_USER: st_user
      MYSQL_PASSWORD: st_pass
  phpmyadmin:
    image: phpmyadmin/phpmyadmin:latest
    depends_on:
      - db
    environment:
      - PMA_ARBITRARY=1
      - PMA_HOSTS=db
      - PMA_USER=root
      - PMA_PASSWORD=root_pass
    ports:
      - 8092:80

これをDockerコマンドで起動します。

sudo docker compose up -d

またホスト側のmysql-clientから接続する方法は次の通りです。

mysql -u root -p -h localhost -P 3316 --protocol=tcp

デバッグ用としてphpMyAdminも入ってますので、この設定ではlocalhost:8092 にアクセスすればphpMyAdmin画面が表示されます。これはなくても動作には関係ありません。

FastAPI(Python仮想環境)

Uvicornは、非常に高速なASGI(Asynchronous Server Gateway Interface)サーバーでFastAPIを起動するときに必要なので一緒にインストールします。

同期接続のDBドライバーにmysql-connectorとpymysqlをインストールします。本当はどちらか一つで良いのですが、注意すべきことが分かったので比較評価のため、ここでは両方インストールしておきます。

インストール

pip install fastapi
pip install uvicorn[standard]
pip install mysql-connector 
pip install pymysql

起動方法

プロジェクトのルートフォルダ直下に"server.py"を作成し、FastAPIのappインスタンスをport=8000番で起動する場合。--reloadはコードの変更を検知してサーバーを自動的に再起動するオプションです。付けた方が開発には便利です。

uvicorn  server:app --reload --port 8000
requests(Python仮想環境)

requestsはHTTPライブラリでWebデータの操作が簡単にできて便利です。requestsライブラリは標準ライブラリではないため、pipを使ってインストールする必要があります。これをフロントエンドとして使用します。

pip install requests

requestsライブラリはWeb APIとの通信やWebスクレイピングによく使われます。実行方法は、

python client.py で実行します。

基本的な使い方(GET)

HTTPリクエストの基本であるGETメソッドについて解説します。requestsからFastAPIで作成したWebAPIのURLにアクセスします。

client.pyとserver.py

client.py(requests)

①パスパラメータ
URLパスの一部に特定の情報を入れる。
投稿IDなど。
URLにf文字列(f-strings)を使うのが一般的。 f"http://localhost:8000/hello/{name}"
ですが、文字の連結
"http://localhost:8000/hello/"+ name
でも認識しました。

import requests

# パスパラメータ:
name = "太陽の神 ニカ"
res = requests.get(f"http://localhost:8000/hello/{name}")
# res = requests.get("http://localhost:8000/hello/"+ name)
print(res.status_code)
print(res.text)
200
"Hello 太陽の神 ニカさん!!"

②クエリパラメータ
第2引数のparams引数に辞書形式で渡すことで(, params={"q": "japan"})、自動的にURLにエンコードされます。
ここではパラメータ名は"q"です。

# クエリーパラメータ
res = requests.get("http://localhost:8000/search"
                         , params={"q": "japan"})
print(res.status_code)
print(res.text)

実行結果

server.js(FastAPI)

①パスパラメータ
app = FastAPI()でインスタンスを生成。
@app.get("/...")はデコレーターと呼ばれるもので、その下の関数がリクエストメソッド(get)とパス("/hello/{name}")に対応することをFastAPIに伝えます。
def say_hello(name):の引数名とパスパラメータの変数名は合わせてください。

from fastapi import FastAPI

app = FastAPI()

# パスパラメータ
@app.get("/hello/{name}")
def say_hello(name):
    return f"Hello {name}さん!!"

※)このサンプルコードは
クライアントから送られたnameパラメータが入った文字列を返すだけのAPIです。

②クエリパラメータ
関数の引数として宣言するだけで、FastAPIがURLのクエリパラメータを自動的に取得します。パラメータ名はqは送受信間で合わせてください。

app = FastAPI()
# クエリパラメータ
@app.get("/search/")
def search_word(q: str):
    return {"query": q}
200
{"query":"'Japan"}

FastAPIでは、POST、PUT、DELETEといった他のHTTPメソッドも同様に使うことができます。MySQLとの連携のときに詳しく見ていきます。

MySQLとの連携

usersテーブルの作成

Dockerコンテナ内に入って事前に準備してます。SQL文は以下の通りです。

DBの準備

mysql>  drop table if exists users;
mysql>  create table users (
    id int auto_increment PRIMARY KEY,
    nickname VARCHAR(50) NOT NULL UNIQUE,
    name VARCHAR(50) NOT NULL,
    password VARCHAR(100) not NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
)DEFAULT CHARSET=utf8mb4;
mysql> desc users;
+-------------+------------------+------+-----+-----------------------------+-------------------------------+
| Field           | Type                  | Null | Key | Default                              | Extra                                     |
+-------------+------------------+------+-----+------------------------------+------------------------------+
| id                  | int                      | NO   | PRI | NULL                                   | auto_increment              |
| nickname  | varchar(50)    | NO   | UNI | NULL                                  |                                                |
| name           | varchar(50)   | NO   |         | NULL                                    |                                                |
| password   | varchar(100) | NO  |         | NULL                                    |                                                |
| created_at | timestamp    | NO   |        | CURRENT_TIMESTAMP | DEFAULT_GENERATED |
+---------------+----------------+------+-----+-------------------------------+------------------------------+
5 rows in set (0.00 sec)
FastAPI v0.116.1とMySQL v8.3.0との接続

データベースst_dbに同期接続してusersテーブルを操作します。

まだユーザーは未登録状態(usersテーブル空の状態)ですが接続してみます。

client.py⇨server.py 接続(Read)

client.py

import requests

res = requests.get("http://localhost:8000/users")
print(res.status_code)
print(res.text)
# 実行結果

200
null

server.py

from fastapi import FastAPI
import mysql.connector as mysqlcon

app = FastAPI()

# 全ユーザー抽出
@app.get("/users")
def get_allusers():
    try:
        conn = mysqlcon.connect(
            host="localhost",
            port = 3316,
            user="root", # ← Dockerコンテナではroot権限でないと接続できなかった!!
            password="root_pass",   # ← rootユーザーのpassword、テストなので直書きしてます。
            database = "st_db",
        )

        cur = conn.cursor()
        cur.execute("select * from users")
        rows = cur.fetchall()  # ← 全件抽出
        conn.close()
        print("成功")
        # FastAPIのレスポンスは通常、JSON形式で返されます。
        return rows

    except Exception as e:
        print(f"エラー発生しました: {e}")
# 実行結果

エラー発生しました: Authentication plugin 'caching_sha2_password' is not supported
Authentication plugin 'caching_sha2_password' is not supportedとは?

MySQL 8系でデフォルトの認証方式caching_sha2_password になっていることが原因です。mysql-connector-pythonではこの認証方式をサポートしていないため、接続時にエラーとなりました。

解決策
  1. MySQLユーザーの認証方式を mysql_native_password に変更する。
  2. Python側のコネクタを mysql-connectorからPyMySQL に変更する。
    PyMySQL caching_sha2_password をサポートしていますが、mysql-connectorは非対応です。

安全性はcaching_sha2_passwordの方が高いので、セキュリティの観点からこの認証方式を使った方が良いので、PyMySQLに切り替えます。

server.pyの変更点

from fastapi import FastAPI
#import mysql.connector as mysqlcon  # ← 使用せず
import pymysql  # ← 新規追加

@app.get("/users")
def get_allusers():
    try:
        #conn = mysqlcon.connect(   # ← 使用せず
        conn = pymysql.connect(       # ← こちらを採用
    ...省略...
# server.pyの実行結果

成功
INFO:     127.0.0.1:50404 - "GET /users HTTP/1.1" 200 OK

client.py

# client.pyの実行結果

200
[]

成功しました。

新規登録(Create)

ボディ・データ受信(POST)

ボディー・リクエストをJSONデータで受け取るには、PydanticというライブラリのBaseModelを使用します。BaseModelを継承したクラスで定義されたデータに基づいて整合性をチェックします。

さらに、FastAPIがPostリクエストで受け取ったデータはBaseModelのインスタンスですから、MySQLにアクセスできるようにタプルに変換します

class User(BaseModel):
    nickname : str
    name : str
    password : str

# 新規登録
@app.post("/user/create")
def save_user(new_user :User):
    # 辞書に変換
    user_dict = dict(new_user)
    # タプルに変換
    user_tupple = (
        user_dict["nickname"],
        user_dict["name"],
        user_dict["password"]
    )
...省略...
SQLのプレースホルダーについて

usersテーブルに新規追加する場合のコードは、以下のようになります。

cur.execute(
    "INSERT INTO users (nickname, name, password) VALUES (%s, %s, %s)", user_data)

プレースホルダー記法で「%s」を使っていますが、Sqlite3なら「?」でした。DBによって書き方が変わります。因みにPHP言語のPDOクラスは「?」でした。「?」に慣れていると「%s」に少し驚きますが。また「%s」は必ずしも文字列を表しません。整数値だから「%d」とするとエラーになります。整数値でも「%s」です。C言語風の型指定は行いませんので要注意です。

実装例

client.py⇨server.py 新規作成(Create)

client.py

res = requests.post('http://localhost:8000/user/create',
            json={"nickname":"robinchan","name":"ニコ ロビン", "password": "pass123"}  )
print(res.status_code)
print(res.text)

json引数に辞書を渡すと、自動的にJSON形式に変換されて送信されます。

# 実行結果
200
true

server.py

from pydantic import BaseModel 

class User(BaseModel):
    nickname : str
    name : str
    password : str

@app.post("/user/create")
def save_user(new_user :User):
    print(new_user)  # nickname='robin' name='ニコ ロビン' password='robin123'
    # 辞書に変換
    user_dict = dict(new_user)
    # タプルに変換
    user_tupple = (
        user_dict["nickname"],
        user_dict["name"],
        user_dict["password"]
    )
    print(user_tupple)  # ('robin', 'ニコ ロビン', 'robin123')
    try:
        conn = pymysql.connect(
            host="localhost",
            port = 3316,
            user="root",
            password="root_pass",
            database = "st_db",
        )

        cur = conn.cursor()
        cur.execute("insert into users (nickname, name, password) values(%s, %s, %s) ", user_tupple)
        conn.commit()
        conn.close()
        print("登録成功")
        return True

    except Exception as e:
        print(f"エラー発生しました: {e}")
        return False

削除(Delete)

プレースホルダーの注意点

削除する場合、where句で条件指定しますが、匿名プレースホルダーが一つでもタプルと分かるように(id,)のように「,」を付けてください。知らないと少し嵌ります!

cur.execute("delete from users where id = %s",(id,))
実装例

client.py⇨server.py (削除)

client.py

user_id =3
res = requests.delete(f'http://localhost:8000/user/{user_id}/destroy')
print(res.status_code)
print(res.text)

server.py

# 削除
@app.delete("/user/{user_id}/destroy")
def delete_user_byid(user_id):
    id= int(user_id)
    try:
        conn = pymysql.connect(
            host="localhost",
            port = 3316,
            user="root",
            password="root_pass",
            database = "st_db",
        )

        cur = conn.cursor()
        cur.execute("select count(*) from users where id=%s",(id,))
        result = cur.fetchone()
        cnt = result[0]
        print(f"id={id} は {cnt} 件です")
        if cnt > 0:
            #削除
            print("削除します")
            cur.execute("delete from users where id = %s",(id,))
            conn.commit()
            conn.close()
            return True
        else :
            print("該当データがありません")
            conn.close()
            return False
        
    except Exception as e:
        print(f"エラー発生しました: {e}")
        return False

削除したいidがデータベースにないといけないので、その有無を最初にみて、有れば削除するようにしています。

更新(Update)

usersテーブル

実装例

client.py⇨server.py (更新)

client.py

user_id=5
res = requests.put(f'http://localhost:8000/user/{user_id}/update',
            json={"nickname":"zoro","name":"ロロノア ゾロ", "password": "zoro123"}  )
print(res.status_code)
print(res.text)

パスクエリが含まれますので、URLはf文字列(f-strings)で記載することを忘れないでください。

server.py

@app.put("/user/{user_id}/update")
def update_user(user_id:int, newuser: User):
    id= int(user_id)
    newuser_dic = dict(newuser)
    newuser_tupple = (
        newuser_dic["nickname"],
        newuser_dic["name"],
        newuser_dic["password"],
        id
    )
    try:
        conn = pymysql.connect(
            host="localhost",
            port = 3316,
            user="root",
            password="root_pass",
            database = "st_db",
        )

        cur = conn.cursor()
        cur.execute("select count(*) from users where id=%s",(id,))
        result = cur.fetchone()
        cnt = result[0]
        print(f"id={id} は {cnt} 件です")
        if cnt > 0:
            #更新処理
            cur.execute("update users set nickname = %s, name = %s, password = %s
                where id = %s" , newuser_tupple)
            conn.commit()
            conn.close()
            return True
        else:
            #何もしない
            return False
        
    except Exception as e:
        print(f"エラー発生しました: {e}")
        return False

以上です。😀

Follow me!