Extensions

SQLAlchemy

In this chapter we will talk about one of the most important features of a web application: databases! We assume that you are already familiar with SQLAlchemy 2.0 and only explain how to work with our built-in SQLAlchemy extension.

Your first model

Like Flask-SQLAlchemy, our extension provides its own Model class, so you do not need to set up your own DeclarativeBase. But of course you are not forced to stick with ours. We will show you a quick example of using your own db instance at the end of this chapter.

For now let's just get started by creating your first simple model:

fluid/models.py python
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column


class MyModel(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value

Integrating SQLAlchemy

Now you can start using your model inside your application. Our extension provides both sync and async utilities. So when you configure your database URIs, do not add the driver package to the URI string: write only the dialect, as in sqlite:///app.db. The framework adds the driver for you. Your config should look like this:

app_configs/app.ini ini
[general]
SECRET_KEY = supersecret

[data]
DATABASE_URI = sqlite:///app.db

[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 1

Our SQLAlchemy extension will then derive the sync and async engines, each with its correct driver, from your database URI. We provide this functionality for the most common dialects: sqlite, postgresql and mysql.
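
To make this derivation more tangible, here is a minimal sketch of how a driverless URI could be expanded into a sync and an async variant. This is not WebFluid's actual implementation: the derive_uris helper is hypothetical, and the driver choices (pysqlite, psycopg2 and pymysql for sync; aiosqlite, asyncpg and aiomysql for async) are assumptions picked for illustration. The framework may well use different drivers.

```python
# Assumed driver mapping, for illustration only. These are real
# SQLAlchemy dialect+driver names, but WebFluid may choose others.
SYNC_DRIVERS = {"sqlite": "pysqlite", "postgresql": "psycopg2", "mysql": "pymysql"}
ASYNC_DRIVERS = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}


def derive_uris(database_uri: str) -> tuple[str, str]:
    """Return (sync_uri, async_uri) for a URI that names only the dialect."""
    dialect, separator, rest = database_uri.partition("://")
    if not separator:
        raise ValueError(f"Not a database URI: {database_uri!r}")
    if "+" in dialect:
        # The driver is supposed to be added by the framework, not by the user.
        raise ValueError("Do not include the driver in the database URI")
    if dialect not in SYNC_DRIVERS:
        raise ValueError(f"Unsupported dialect: {dialect!r}")
    sync_uri = f"{dialect}+{SYNC_DRIVERS[dialect]}://{rest}"
    async_uri = f"{dialect}+{ASYNC_DRIVERS[dialect]}://{rest}"
    return sync_uri, async_uri


sync_uri, async_uri = derive_uris("sqlite:///app.db")
print(sync_uri)   # sqlite+pysqlite:///app.db
print(async_uri)  # sqlite+aiosqlite:///app.db
```

Rejecting a URI that already carries a driver is a design choice of this sketch; it keeps the sync and async engines from silently pointing at mismatched drivers.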

Alright, now that we have everything in place, we can start working with your model. Our extension provides an executor that simplifies your work with SQLAlchemy:

main.py python
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select

from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel


@register_config(10)
class MyConfig:
    SESSION_COOKIE_SECURE = True

    MYEXT_BAR = "crazy"

    # Usually you would define the SQLALCHEMY_DATABASE_URI, but the DATABASE_URI value
    # from the runtime's environment is already the default:

    # SQLALCHEMY_DATABASE_URI = os.getenv("DATABASE_URI", "sqlite:///app.db")

fluid = Fluid(__name__)

my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))


@fluid.get("/", response_class=HTMLResponse)
async def home():
    return await fluid.render(
        "index.html",
        title="Hello World!",
        name="my friend"
    )


@fluid.get("/health")
async def health():
    return {"status": "ok"}


@fluid.get("/my-model")
async def get_model(request: Request):
    model_id = request.query_params.get("id")
    if not model_id:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor() as e:
        # Here you should note one of WebFluid's opinions:
        # The following call returns a ScalarResult by default.
        # You need to set scalars=False if you do not want that.
        results = await e.exec(
            select(MyModel).where(MyModel.id == model_id),
            # scalars=False
        )
        result = results.first()
        if not result:
            raise HTTPException(status_code=404, detail="Model not found")

        return {
            "model_id": model_id,
            "value": result.value
        }


@fluid.post("/my-model")
async def add_model(request: Request):
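    # The page in index.html posts a JSON body, so read the value
    # from the body instead of from the query string.
    data = await request.json()
    value = data.get("value")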
    if not value:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor() as e:
        model = await e.insert(
            MyModel(value),
            flush=True
            # We want to flush so that we can return the id.
            # This is False by default.
        )
        return {
            "model_id": model.id,
            "value": model.value
        }

    # The extension handles the lifecycle of your session for you.

    # All changes and updates you make inside the executor context
    # are committed as soon as the interpreter leaves its scope.

    # If an exception occurs inside that scope, the session will
    # be rolled back automatically for you and will always be closed.


if __name__ == "__main__":
    # For now, you do not need to understand the following lines.
    # We will explain them in the next section.
    bind = db.get_bind_for_model(MyModel)
    db.Model.metadata.create_all(bind.sync_engine)

    fluid.mix()

fluid/templates/index.html html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>{{ title }}</title>

    <script>
        function healthCheck() {
            const result = document.getElementById('result')
            fetch('/health').then(res => {
                if (res.ok) result.style.color = 'green'
                else result.style.color = 'red'
                return res.json()
            }).then(data => {
                result.innerText = JSON.stringify(data)
                setTimeout(() => result.innerText = '', 2000)
            })
        }

        function addModel() {
            const value = document.getElementById('value')
            const newModel = document.getElementById('newModel')

            const clear = () => setTimeout(() => {
                value.value = null
                newModel.innerText = ''
            }, 2000)

            if (!value.value) {
                newModel.style.color = 'red'
                newModel.innerText = 'Enter a value first!'
                clear()
                return
            }

            fetch('/my-model', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ value: value.value })
            }).then(res => {
                if (res.ok) newModel.style.color = 'green'
                else newModel.style.color = 'red'
                return res.json()
            }).then(data => {
                newModel.innerText = JSON.stringify(data)
                clear()
            })
        }

        function getModel() {
            const id = document.getElementById('id')
            const modelValue = document.getElementById('modelValue')

            const clear = () => setTimeout(() => {
                id.value = null
                modelValue.innerText = ''
            }, 2000)

            if (!id.value) {
                modelValue.style.color = 'red'
                modelValue.innerText = 'Provide an id first!'
                clear()
                return
            }

            fetch(`/my-model?id=${id.value}`).then(res => {
                if (res.ok) modelValue.style.color = 'green'
                else modelValue.style.color = 'red'
                return res.json()
            }).then(data => {
                modelValue.innerText = JSON.stringify(data)
                clear()
            })
        }
    </script>
</head>
<body>
    <p>Hello <b>{{ name }}</b>!</p>

    <button onclick="healthCheck()">Health Check</button>
    <p id="result"></p>

    <p>{{ my_ext() }}</p>

    <input id="value" type="text" placeholder="Your value">
    <p id="newModel"></p>
    <button onclick="addModel()">Save</button>

    <br><br>

    <input id="id" type="number" placeholder="Model id">
    <p id="modelValue"></p>
    <button onclick="getModel()">Get Model</button>
</body>
</html>
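
The session lifecycle described in the comments of main.py (commit when the scope is left, roll back on an exception, always close) is a common context manager pattern. The following sketch reproduces it with Python's built-in sqlite3 module instead of SQLAlchemy, so that it runs without any dependencies. The executor function here is a hypothetical stand-in and not WebFluid's db.async_executor; the table name and values are made up for the demonstration.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager


@contextmanager
def executor(path: str):
    """Hypothetical stand-in: commit on success, roll back on error, always close."""
    conn = sqlite3.connect(path)
    try:
        yield conn
        conn.commit()      # reached only if the with-block raised nothing
    except Exception:
        conn.rollback()    # discard everything done inside the failed block
        raise
    finally:
        conn.close()       # runs in both cases


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")

    # First scope succeeds, so the inserted row is committed.
    with executor(path) as conn:
        conn.execute("CREATE TABLE my_model (id INTEGER PRIMARY KEY, value TEXT NOT NULL)")
        conn.execute("INSERT INTO my_model (value) VALUES (?)", ("kept",))

    # Second scope fails, so its insert is rolled back.
    try:
        with executor(path) as conn:
            conn.execute("INSERT INTO my_model (value) VALUES (?)", ("lost",))
            raise RuntimeError("something went wrong")
    except RuntimeError:
        pass

    with executor(path) as conn:
        rows_after = conn.execute("SELECT COUNT(*) FROM my_model").fetchone()[0]

print(rows_after)  # 1
```

Only the row from the successful scope survives, which is the behavior the route handlers above rely on when they raise an HTTPException inside the executor context.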

Now you have successfully integrated SQLAlchemy into your application. But you may be wondering what that db.get_bind_for_model(MyModel) call is all about...

Working with binds

Like Flask-SQLAlchemy, our extension can handle multiple binds using bind keys. You define a bind by setting __bind_key__ on your model and adding that key to the SQLALCHEMY_BINDS dictionary in your config.

Here is a quick example of wiring it all up:

app_configs/app.ini ini
[general]
SECRET_KEY = supersecret

[data]
DATABASE_URI = sqlite:///app.db
TEST_DB_URI = sqlite:///test.db

[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 1

fluid/models.py python
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column


class MyModel(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value


class SecondModel(db.Model):
    __bind_key__ = "test"

    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value

main.py python
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
import os

from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel, SecondModel


@register_config(10)
class MyConfig:
    SESSION_COOKIE_SECURE = True

    MYEXT_BAR = "crazy"

    SQLALCHEMY_BINDS = {
        "test": os.getenv("TEST_DB_URI", "sqlite:///test.db")
    }

fluid = Fluid(__name__)

my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))


@fluid.get("/", response_class=HTMLResponse)
async def home():
    return await fluid.render(
        "index.html",
        title="Hello World!",
        name="my friend"
    )


@fluid.get("/health")
async def health():
    return {"status": "ok"}


@fluid.get("/my-model")
async def get_model(request: Request):
    model_id = request.query_params.get("id")
    if not model_id:
        raise HTTPException(status_code=400, detail="Bad Request")

    # The context manager resolves the correct bind for you:
    async with db.async_executor(model=SecondModel) as e:
        results = await e.exec(
            select(SecondModel).where(SecondModel.id == model_id)
        )
        result = results.first()
        if not result:
            raise HTTPException(status_code=404, detail="Model not found")

        return {
            "model_id": model_id,
            "value": result.value
        }


@fluid.post("/my-model")
async def add_model(request: Request):
    data = await request.json()
    value = data.get("value")
    if not value:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor(model=SecondModel) as e:
        model = await e.insert(
            SecondModel(value),
            flush=True
        )
        return {
            "model_id": model.id,
            "value": model.value
        }


if __name__ == "__main__":
    # bind = db.get_bind_for_model(MyModel)
    # db.Model.metadata.create_all(bind.sync_engine)

    bind = db.get_bind_for_model(SecondModel)
    db.Model.metadata.create_all(bind.sync_engine)

    fluid.mix()
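
The lookup behind bind keys can be pictured as a small dictionary resolution. The sketch below is an assumption about the general idea, not WebFluid's code: resolve_bind_uri is a hypothetical helper, and the two classes are plain Python stand-ins for the models above. It treats "default" as the key for the SQLALCHEMY_DATABASE_URI bind.

```python
from typing import Mapping

DEFAULT_BIND_KEY = "default"


def resolve_bind_uri(model: type, default_uri: str, binds: Mapping[str, str]) -> str:
    """Hypothetical helper: map a model's __bind_key__ to a database URI."""
    key = getattr(model, "__bind_key__", None) or DEFAULT_BIND_KEY
    if key == DEFAULT_BIND_KEY:
        return default_uri
    if key not in binds:
        raise LookupError(f"No bind configured for key {key!r}")
    return binds[key]


# Plain stand-ins for the models above (no database involved).
class MyModel:
    pass


class SecondModel:
    __bind_key__ = "test"


binds = {"test": "sqlite:///test.db"}

print(resolve_bind_uri(MyModel, "sqlite:///app.db", binds))      # sqlite:///app.db
print(resolve_bind_uri(SecondModel, "sqlite:///app.db", binds))  # sqlite:///test.db
```

Failing loudly on an unknown key is a choice of this sketch; it surfaces a typo in __bind_key__ immediately instead of quietly writing to the default database.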

Dynamically setting binds

In some cases you may want to define the bind of your model dynamically, for example from a config value. In that case you can use MyModel.set_bind and do something like this:

main.py python
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
import os

from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel, SecondModel


@register_config(10)
class MyConfig:
    SESSION_COOKIE_SECURE = True

    MYEXT_BAR = "crazy"

    SQLALCHEMY_BINDS = {
        "test": os.getenv("TEST_DB_URI", "sqlite:///test.db"),
        "dynamic": os.getenv("DYNAMIC_URI", "sqlite:///dynamic.db")
    }
    MODEL_BIND = "dynamic"

fluid = Fluid(__name__)

my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))

# Now you can set your model's bind dynamically.
# The "default" string is the key that resolves to the
# SQLALCHEMY_DATABASE_URI bind:
MyModel.set_bind(fluid.config.get("MODEL_BIND", "default"))


@fluid.get("/", response_class=HTMLResponse)
async def home():
    return await fluid.render(
        "index.html",
        title="Hello World!",
        name="my friend"
    )


@fluid.get("/health")
async def health():
    return {"status": "ok"}


@fluid.get("/my-model")
async def get_model(request: Request):
    model_id = request.query_params.get("id")
    if not model_id:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor(model=MyModel) as e:
        results = await e.exec(
            select(MyModel).where(MyModel.id == model_id)
        )
        result = results.first()
        if not result:
            raise HTTPException(status_code=404, detail="Model not found")

        return {
            "model_id": model_id,
            "value": result.value
        }


@fluid.post("/my-model")
async def add_model(request: Request):
    data = await request.json()
    value = data.get("value")
    if not value:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor(model=MyModel) as e:
        model = await e.insert(
            MyModel(value),
            flush=True
        )
        return {
            "model_id": model.id,
            "value": model.value
        }


if __name__ == "__main__":
    bind = db.get_bind_for_model(MyModel)
    db.Model.metadata.create_all(bind.sync_engine)

    fluid.mix()
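
One plausible way to think about a per-model bind setter is a classmethod that overrides the bind key on that one class only. The sketch below is a guess at the general shape, not WebFluid's implementation: BindableModel is a hypothetical stand-in for db.Model, and the plain config dictionary stands in for fluid.config.

```python
class BindableModel:
    """Hypothetical stand-in for db.Model with a class-level bind key."""

    __bind_key__ = "default"

    @classmethod
    def set_bind(cls, bind_key: str) -> None:
        # Assigning on cls creates or overrides the attribute on that
        # subclass only; the base class and sibling models are untouched.
        cls.__bind_key__ = bind_key


class MyModel(BindableModel):
    pass


class OtherModel(BindableModel):
    pass


# Stand-in for fluid.config in the example above.
config = {"MODEL_BIND": "dynamic"}
MyModel.set_bind(config.get("MODEL_BIND", "default"))

print(MyModel.__bind_key__)     # dynamic
print(OtherModel.__bind_key__)  # default
```

Because the key lives on the class, it seems sensible to set it once at startup, before any request handler opens an executor for that model.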

Changing table names

As you may have guessed, our SQLAlchemy extension derives table names by converting your model's class name to snake case, just like Flask-SQLAlchemy does. And just like there, you can change a model's table name by setting __tablename__:

fluid/models.py python
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column


class MyModel(db.Model):
    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value


class SecondModel(db.Model):
    __tablename__ = "my_table"
    __bind_key__ = "test"

    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value
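
For reference, a class-name-to-snake-case conversion can be sketched with two regular expressions. The camel_to_snake function below is a hypothetical illustration; WebFluid's exact rules may differ, especially for edge cases such as acronyms or digits.

```python
import re


def camel_to_snake(name: str) -> str:
    """Convert a CamelCase class name to snake_case (illustrative rules)."""
    # Split an acronym from a following capitalized word: HTTPResponse -> HTTP_Response
    name = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", name)
    # Split a lowercase letter or digit from a following capital: MyModel -> My_Model
    name = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    return name.lower()


print(camel_to_snake("MyModel"))       # my_model
print(camel_to_snake("SecondModel"))   # second_model
print(camel_to_snake("HTTPResponse"))  # http_response
```

Under these rules SecondModel would map to second_model, which is why the example above sets __tablename__ = "my_table" to get a different name.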

Leaving the framework defaults

As we mentioned earlier, you are not forced to stick with our defaults. You can wire up extensions yourself if you do not want to rely on WebFluid's conventions. Keep in mind that this may break the framework's functionality if you do not know what you are doing.

Anyway, we will show you how to escape from our conventions with the SQLAlchemy extension. The same applies to every other built-in extension, so in the following chapters we will only tell you which parameters you can pass to our extensions if you want to leave their defaults.

Okay, now let's set up your own conventions. The only customization the SQLAlchemy extension supports is passing your own DeclarativeBase, so that is exactly what we will do:

app_configs/app.ini ini
[general]
SECRET_KEY = supersecret

[data]
DATABASE_URI = sqlite:///app.db

[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 0

fluid/ext.py python
from webfluid.extensions import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase


class MyBase(DeclarativeBase):
    __tablename__: str
    __bind_key__: str

    # Setting up your base like this will make it necessary
    # to define the __tablename__ for each of your models.

    # This is just an example, and we are assuming that you
    # know what you are doing if you are playing around with
    # this kind of stuff here.

db = SQLAlchemy(base=MyBase)

fluid/models.py python
from sqlalchemy.orm import Mapped, mapped_column

from fluid.ext import db


class MyModel(db.Model):
    __tablename__ = "model"

    id: Mapped[int] = mapped_column(primary_key=True)
    value: Mapped[str]

    def __init__(self, value: str):
        self.value = value

main.py python
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select

from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel
from fluid.ext import db


@register_config(10)
class MyConfig:
    SESSION_COOKIE_SECURE = True

    MYEXT_BAR = "crazy"

fluid = Fluid(__name__)

my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))


@fluid.get("/", response_class=HTMLResponse)
async def home():
    return await fluid.render(
        "index.html",
        title="Hello World!",
        name="my friend"
    )


@fluid.get("/health")
async def health():
    return {"status": "ok"}


@fluid.get("/my-model")
async def get_model(request: Request):
    model_id = request.query_params.get("id")
    if not model_id:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor(model=MyModel) as e:
        results = await e.exec(
            select(MyModel).where(MyModel.id == model_id)
        )
        result = results.first()
        if not result:
            raise HTTPException(status_code=404, detail="Model not found")

        return {
            "model_id": model_id,
            "value": result.value
        }


@fluid.post("/my-model")
async def add_model(request: Request):
    data = await request.json()
    value = data.get("value")
    if not value:
        raise HTTPException(status_code=400, detail="Bad Request")

    async with db.async_executor(model=MyModel) as e:
        model = await e.insert(
            MyModel(value),
            flush=True
        )
        return {
            "model_id": model.id,
            "value": model.value
        }


if __name__ == "__main__":
    # Now you have to expand the fluid yourself:
    db.expand_fluid(fluid)

    bind = db.get_bind_for_model(MyModel)
    db.Model.metadata.create_all(bind.sync_engine)

    fluid.mix()

Continue reading

From here you can continue straight with Migrate.