summaryrefslogtreecommitdiff
path: root/examples/web/asgi_starlette.py
blob: edc070c246dc9cd647875515248d614a1bca2045 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""
Example demonstrating use with the Starlette web framework.

Requires the "postgresql" service to be running.
To install prerequisites: pip install sqlalchemy asyncpg starlette uvicorn
To run: uvicorn asgi_starlette:app

It should print a line on the console on a one-second interval while running a
basic web app at http://localhost:8000.
"""

from __future__ import annotations

from datetime import datetime

from sqlalchemy.ext.asyncio import create_async_engine
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response
from starlette.routing import Route
from starlette.types import ASGIApp, Receive, Scope, Send

from apscheduler.datastores.async_sqlalchemy import AsyncSQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.schedulers.async_ import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger


def tick():
    """Print a greeting followed by the current local date and time."""
    now = datetime.now()
    print("Hello, the time is", now)


class SchedulerMiddleware:
    """ASGI middleware that keeps a scheduler open during the lifespan scope.

    For a ``lifespan`` scope the scheduler's async context is entered, the
    ``tick`` schedule is added, and only then is the wrapped application
    invoked. Every other scope type is passed straight to the wrapped
    application.
    """

    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        """Store the wrapped ASGI application and the scheduler to manage."""
        self.scheduler = scheduler
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Dispatch one ASGI call, wrapping lifespan handling in the scheduler."""
        # Anything that is not the lifespan protocol is forwarded untouched.
        if scope["type"] != "lifespan":
            await self.app(scope, receive, send)
            return

        # The wrapped app is awaited inside the scheduler's context, so the
        # context stays entered until the app's lifespan handling returns.
        async with self.scheduler:
            every_second = IntervalTrigger(seconds=1)
            await self.scheduler.add_schedule(tick, every_second, id="tick")
            await self.app(scope, receive, send)


async def root(request: Request) -> Response:
    """Answer any request with a fixed plain-text greeting."""
    greeting = PlainTextResponse("Hello, world!")
    return greeting


# A single async engine is shared by the data store and the event broker.
engine = create_async_engine("postgresql+asyncpg://postgres:secret@localhost/testdb")
data_store = AsyncSQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
# Pass the PostgreSQL-backed store and broker to the scheduler explicitly;
# previously they were constructed but never used by it.
scheduler = AsyncScheduler(data_store=data_store, event_broker=event_broker)
routes = [Route("/", root)]
# The middleware starts the scheduler and registers the schedule during the
# ASGI lifespan scope.
middleware = [Middleware(SchedulerMiddleware, scheduler=scheduler)]
app = Starlette(routes=routes, middleware=middleware)