SQLAlchemy
In this chapter we will talk about one of the most important features of a web application: Databases! We are assuming that you are already familiar with SQLAlchemy V2 and only explain how to work with our built-in sqlalchemy extension.
Your first model
Like Flask-SQLAlchemy our extension does provide its own Model class. So you do not need
to set up your own DeclarativeBase. But of course you are not forced to stick with ours.
We will show you a quick example of using your own db instance in the end of
this chapter.
For now let's just get started by creating your first simple model:
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column
class MyModel(db.Model):
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
Integrating SQLAlchemy
Now you can start using your model inside your application. Our extension provides sync and async utility. So when you configure your database uris, do not add the driver package to the uri string. The framework does that for you. So your config should look like this:
[general]
SECRET_KEY = supersecret
[data]
DATABASE_URI = sqlite:///app.db
[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 1
Our SQLAlchemy extension will then derive the sync and async engine with
their correct driver from your database uri. We provide this functionality for
the most common dialects: sqlite, postgresql and mysql.
Alright, now that we've everything in place, we can start working with your model.
Our extension provides an executor that simplifies your work with sqlalchemy:
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel
@register_config(10)
class MyConfig:
SESSION_COOKIE_SECURE = True
MYEXT_BAR = "crazy"
# Usually you would define the SQLALCHEMY_DATABASE_URI, but the DATABASE_URI value
# from the runtimes' environment is already the default:
# SQLALCHEMY_DATABASE_URI = os.getenv("DATABASE_URI", "sqlite:///app.db")
fluid = Fluid(__name__)
my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))
@fluid.get("/", response_class=HTMLResponse)
async def home():
return await fluid.render(
"index.html",
title="Hello World!",
name="my friend"
)
@fluid.get("/health")
async def health():
return {"status": "ok"}
@fluid.get("/my-model")
async def get_model(request: Request):
model_id = request.query_params.get("id")
if not model_id:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor() as e:
# Here you should note one of WebFluids opinions:
# The following call returns a ScalarResult per default.
# You need to set scalars=False if you do not want that.
results = await e.exec(
select(MyModel).where(MyModel.id == model_id),
# scalars=False
)
result = results.first()
if not result:
raise HTTPException(status_code=404, detail="Model not found")
return {
"model_id": model_id,
"value": result.value
}
@fluid.post("/my-model")
async def add_model(request: Request):
value = request.query_params.get("value")
if not value:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor() as e:
model = await e.insert(
MyModel(value),
flush=True
# We want to flush so that we can return the id.
# This is False per default.
)
return {
"model_id": model.id,
"value": model.value
}
# The extension handles the lifecycle of your session for you.
# All changes and updates you make inside the executor context
# are committed as soon as the interpreter leaves its scope.
# If an exception occurs inside that scope, the session will
# be rolled back automatically for you and will always be closed.
if __name__ == "__main__":
# For now, you do not need to understand the following lines.
# We will explain them in the next paragraph.
bind = db.get_bind_for_model(MyModel)
db.Model.metadata.create_all(bind.sync_engine)
fluid.mix()
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>{{ title }}</title>
<script>
function healthCheck() {
const result = document.getElementById('result')
fetch('/health').then(res => {
if (res.ok) result.style.color = 'green'
else result.style.color = 'red'
return res.json()
}).then(data => {
result.innerText = JSON.stringify(data)
setTimeout(() => result.innerText = '', 2000)
})
}
function addModel() {
const value = document.getElementById('value')
const newModel = document.getElementById('newModel')
const clear = () => setTimeout(() => {
value.value = null
newModel.innerText = ''
}, 2000)
if (!value.value) {
newModel.style.color = 'red'
newModel.innerText = 'Enter a value first!'
clear()
return
}
fetch('/my-model', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ value: value.value })
}).then(res => {
if (res.ok) newModel.style.color = 'green'
else newModel.style.color = 'red'
return res.json()
}).then(data => {
newModel.innerText = JSON.stringify(data)
clear()
})
}
function getModel() {
const id = document.getElementById('id')
const modelValue = document.getElementById('modelValue')
const clear = () => setTimeout(() => {
id.value = null
modelValue.innerText = ''
}, 2000)
if (!id.value) {
modelValue.style.color = 'red'
modelValue.innerText = 'Provide an id first!'
clear()
return
}
fetch(`/my-model?id=${id.value}`).then(res => {
if (res.ok) modelValue.style.color = 'green'
else modelValue.style.color = 'red'
return res.json()
}).then(data => {
modelValue.innerText = JSON.stringify(data)
clear()
})
}
</script>
</head>
<body>
<p>Hello <b>{{ name }}</b>!</p>
<button onclick="healthCheck()">Health Check</button>
<p id="result"></p>
<p>{{ my_ext() }}</p>
<input id="value" type="text" placeholder="Your value">
<p id="newModel"></p>
<button onclick="addModel()">Save</button>
<br><br>
<input id="id" type="number" placeholder="Model id">
<p id="modelValue"></p>
<button onclick="getModel()">Get Model</button>
</body>
</html>
Now you have successfully integrated SQLAlchemy into your application.
But you may want to find out what the hell that db.get_bind_for_model(MyModel)
is...
Working with binds
Like Flask-SQLAlchemy our extension is able to handle multiple binds using bind keys.
You can define binds by setting MyModel.__bind_key__ and adding the SQLALCHEMY_BINDS
dictionary to your config.
Here we have a quick example of wiring it all up:
[general]
SECRET_KEY = supersecret
[data]
DATABASE_URI = sqlite:///app.db
TEST_DB_URI = sqlite:///test.db
[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 1
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column
class MyModel(db.Model):
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
class SecondModel(db.Model):
__bind_key__ = "test"
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
import os
from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel, SecondModel
@register_config(10)
class MyConfig:
SESSION_COOKIE_SECURE = True
MYEXT_BAR = "crazy"
SQLALCHEMY_BINDS = {
"test": os.getenv("TEST_DB_URI", "sqlite:///test.db")
}
fluid = Fluid(__name__)
my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))
@fluid.get("/", response_class=HTMLResponse)
async def home():
return await fluid.render(
"index.html",
title="Hello World!",
name="my friend"
)
@fluid.get("/health")
async def health():
return {"status": "ok"}
@fluid.get("/my-model")
async def get_model(request: Request):
model_id = request.query_params.get("id")
if not model_id:
raise HTTPException(status_code=400, detail="Bad Request")
# The context manager resolves the correct bind for you:
async with db.async_executor(model=SecondModel) as e:
results = await e.exec(
select(SecondModel).where(SecondModel.id == model_id)
)
result = results.first()
if not result:
raise HTTPException(status_code=404, detail="Model not found")
return {
"model_id": model_id,
"value": result.value
}
@fluid.post("/my-model")
async def add_model(request: Request):
data = await request.json()
value = data.get("value")
if not value:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor(model=SecondModel) as e:
model = await e.insert(
SecondModel(value),
flush=True
)
return {
"model_id": model.id,
"value": model.value
}
if __name__ == "__main__":
# bind = db.get_bind_for_model(MyModel)
# db.Model.metadata.create_all(bind.sync_engine)
bind = db.get_bind_for_model(SecondModel)
db.Model.metadata.create_all(bind.sync_engine)
fluid.mix()
Dynamically setting binds
In some cases you may want to define the bind of your model dynamically.
For example, by using a config value. Then you would use MyModel.set_bind
and do something like this:
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler, db
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
import os
from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel, SecondModel
@register_config(10)
class MyConfig:
SESSION_COOKIE_SECURE = True
MYEXT_BAR = "crazy"
SQLALCHEMY_BINDS = {
"test": os.getenv("TEST_DB_URI", "sqlite:///test.db"),
"dynamic": os.getenv("DYNAMIC_URI", "sqlite:///dynamic.db")
}
MODEL_BIND = "dynamic"
fluid = Fluid(__name__)
my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))
# Now you can set your models bind dynamically.
# The "default" string is the key that resolves to the
# SQLALCHEMY_DATABASE_URI bind:
MyModel.set_bind(fluid.config.get("MODEL_BIND", "default"))
@fluid.get("/", response_class=HTMLResponse)
async def home():
return await fluid.render(
"index.html",
title="Hello World!",
name="my friend"
)
@fluid.get("/health")
async def health():
return {"status": "ok"}
@fluid.get("/my-model")
async def get_model(request: Request):
model_id = request.query_params.get("id")
if not model_id:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor(model=MyModel) as e:
results = await e.exec(
select(MyModel).where(MyModel.id == model_id)
)
result = results.first()
if not result:
raise HTTPException(status_code=404, detail="Model not found")
return {
"model_id": model_id,
"value": result.value
}
@fluid.post("/my-model")
async def add_model(request: Request):
data = await request.json()
value = data.get("value")
if not value:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor(model=MyModel) as e:
model = await e.insert(
MyModel(value),
flush=True
)
return {
"model_id": model.id,
"value": model.value
}
if __name__ == "__main__":
bind = db.get_bind_for_model(MyModel)
db.Model.metadata.create_all(bind.sync_engine)
fluid.mix()
Changing table names
As you may have already thought, SQLAlchemy resolves table names by
converting your models' name to snake case like the Flask extension does.
And so you can change the models' table name by setting __tablename__
as well:
from webfluid.core.ext import db
from sqlalchemy.orm import Mapped, mapped_column
class MyModel(db.Model):
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
class SecondModel(db.Model):
__tablename__ = "my_table"
__bind_key__ = "test"
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
Leave the framework defaults
Like we've mentioned earlier, you are not forced to stick with our defaults.
You can wire up extensions yourself if you do not want to rely on WebFluids conventions.
Keep in mind that this may break the frameworks' functionality if you do not know what you are doing.
Anyway, we will show you how to escape from our conventions with the SQLAlchemy extension. The
same counts for every other built-in extension as well. So in the following chapters we will only
tell you what parameters you can pass to our extensions if you want to leave their defaults.
Okay, but now let's set up your own conventions. The SQLAlchemy extension only supports passing
your own DeclarativeBase. So this will be exactly what we are doing:
[general]
SECRET_KEY = supersecret
[data]
DATABASE_URI = sqlite:///app.db
[extensions]
EXT_SCHEDULING = 0
EXT_SQLALCHEMY = 0
from webfluid.extensions import SQLAlchemy
from sqlalchemy.orm import DeclarativeBase
class MyBase(DeclarativeBase):
__tablename__: str
__bind_key__: str
# Setting up your base like this will make it necessary
# to define the __tablename__ for each of your models.
# This is just an example, and we are assuming that you
# know what you are doing if you are playing around with
# this kind of stuff here.
db = SQLAlchemy(base=MyBase)
from sqlalchemy.orm import Mapped, mapped_column
from fluid.ext import db
class MyModel(db.Model):
__tablename__ = "model"
id: Mapped[int] = mapped_column(primary_key=True)
value: Mapped[str]
def __init__(self, value: str):
self.value = value
from webfluid import Fluid
from webfluid.core.config import register_config
from webfluid.core.ext import scheduler
from fastapi import Request
from fastapi.responses import HTMLResponse
from fastapi.exceptions import HTTPException
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select
import os
from extension.main import MyExtension
from fluid.jobs import heart
from fluid.models import MyModel
from fluid.ext import db
@register_config(10)
class MyConfig:
SESSION_COOKIE_SECURE = True
MYEXT_BAR = "crazy"
fluid = Fluid(__name__)
my_ext = MyExtension(fluid)
scheduler.add_job(heart, IntervalTrigger(seconds=5))
@fluid.get("/", response_class=HTMLResponse)
async def home():
return await fluid.render(
"index.html",
title="Hello World!",
name="my friend"
)
@fluid.get("/health")
async def health():
return {"status": "ok"}
@fluid.get("/my-model")
async def get_model(request: Request):
model_id = request.query_params.get("id")
if not model_id:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor(model=MyModel) as e:
results = await e.exec(
select(MyModel).where(MyModel.id == model_id)
)
result = results.first()
if not result:
raise HTTPException(status_code=404, detail="Model not found")
return {
"model_id": model_id,
"value": result.value
}
@fluid.post("/my-model")
async def add_model(request: Request):
data = await request.json()
value = data.get("value")
if not value:
raise HTTPException(status_code=400, detail="Bad Request")
async with db.async_executor(model=MyModel) as e:
model = await e.insert(
MyModel(value),
flush=True
)
return {
"model_id": model.id,
"value": model.value
}
if __name__ == "__main__":
# Now you have to expand the fluid yourself:
db.expand_fluid(fluid)
bind = db.get_bind_for_model(MyModel)
db.Model.metadata.create_all(bind.sync_engine)
fluid.mix()
Continue reading
From here you can continue straight with Migrate.