In Part 1, we showed how we can leverage HTMX to add interactivity to our HTML pages. In other words: JavaScript without JavaScript. To illustrate that, we started building a simple chat that returned a simulated LLM response. In this article, we'll extend the capabilities of our chatbot and add several features, among which streaming, a major improvement in terms of user experience compared to the synchronous chat we built previously.
- ✅ Real-time streaming with SSE
- ✅ Session-based architecture for multiple users
- ✅ Async coordination with asyncio.Queue
- ✅ Clean HTMX patterns with dedicated SSE handling
- ✅ A Google Search agent to answer queries with fresh data
- ✅ Almost zero JavaScript
Here is what we'll build today:

From sync communication to async
What we built previously relied on very basic web functionality, using forms. Our communication was synchronous, meaning we get nothing until the server is done. We issue a request, we wait for the full response, and we display it. In between, we just… wait.
But modern chatbots work differently, by providing asynchronous communication. This is done with streaming: we get updates and partial responses instead of waiting for the full answer. This is particularly useful when producing the response takes time, which is often the case for LLMs when the answer is long.
SSE vs WebSockets
SSE (Server-Sent Events) and WebSockets are two protocols for real-time data exchange between a client and a server.
WebSockets allow full-duplex connections: the browser and the server can both send and receive data simultaneously. This is typically used in online gaming, chat applications, and collaborative tools (Google Sheets).
SSE is unidirectional and only allows a one-way conversation, from server to client. This means the client cannot send anything to the server over this protocol. If WebSockets are a two-way phone conversation where people can speak and listen at the same time, SSE is like listening to the radio. SSE is typically used to push notifications, update charts in finance applications, or power newsfeeds.
So why do we choose SSE? Because in our use case we don't need full duplex, and plain HTTP (which is not how WebSockets work) is enough: we send data, we receive data. SSE simply means we will receive the data as a stream, nothing more is required.
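To make the "it's just HTTP" point concrete, here is a minimal client-side sketch (using the httpx library and a placeholder stream URL, both assumptions on my part) showing that reading an SSE endpoint is nothing more than reading an HTTP response line by line:

# Minimal SSE reader sketch: a long-lived HTTP GET, consumed line by line.
# The URL is a placeholder; any streaming-capable HTTP client would work the same way.
import httpx

with httpx.stream("GET", "http://localhost:8000/stream/demo-session", timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data:"):
            print("chunk received:", line[len("data:"):].strip())
        # an empty line marks the end of one SSE event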
What we want to do
- The user inputs a query
- The server receives the query and sends it to the LLM
- The LLM starts generating content
- For each piece of content, the server returns it immediately
- The browser adds this piece of data to the DOM
We will split the work into backend and frontend sections.
Backend
The backend will proceed in 2 steps:
- A POST endpoint that receives the message and returns nothing
- A GET endpoint that reads a queue and produces an output stream
For our demo, to begin with, we'll create a fake LLM response by echoing the user input, meaning the words of the stream will be exactly the same as the user's input.
To keep things clean, we need to separate the message streams (the queues) by user session, otherwise we'd end up mixing up conversations. We will therefore create a sessions dictionary to hold our queues.
Next, we need to tell the backend to wait until the queue is filled before streaming the response. If we don't, we'll run into concurrency and timing issues: SSE starts on the client side, the queue is empty, SSE closes, the user sends a message but… it's too late!
The solution: async queues! Using asynchronous queues has several advantages:
- If the queue has data: it returns immediately
- If the queue is empty: it suspends execution until queue.put() is called
- Multiple clients: each gets their own data
- No race conditions: coordination happens safely inside the event loop
I know you're burning to know more, so here is the code:
from fastapi import FastAPI, Request, Form
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, StreamingResponse
import asyncio
import uuid

app = FastAPI()
templates = Jinja2Templates("templates")

# This object will store session ids and their corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})

@app.post("/chat")
async def chat(request: Request, query: str = Form(...), session_id: str = Form(...)):
    """Send the message to the session-based queue"""
    # Create the session if it doesn't exist
    if session_id not in sessions:
        sessions[session_id] = asyncio.Queue()
    # Put the message in the queue
    await sessions[session_id].put(query)
    return {"status": "queued", "session_id": session_id}

@app.get("/stream/{session_id}")
async def stream(session_id: str):
    async def response_stream():
        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return
        queue = sessions[session_id]
        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        data = await queue.get()
        print(f"Received message: {data}")
        message = ""
        await asyncio.sleep(1)
        for token in data.replace("\n", " ").split(" "):
            message += token + " "
            # Wrap the growing message in an HTML list item inside the SSE "data:" field
            data = f"data: <li><b>AI</b><br>{message}</li>\n\n"
            yield data
            await asyncio.sleep(0.03)
        queue.task_done()
    return StreamingResponse(response_stream(), media_type="text/event-stream")
Let's explain a few key concepts here.
Session isolation
It is important that each user gets their own message queue, so as not to mix up conversations. The way to do that is the sessions dictionary. In a real production app, we'd probably use Redis for this. In the code below, a new session id is created on page load and stored in the sessions dictionary. Reloading the page starts a new session; we're not persisting the message queues, but we could, via a database for example. This topic is covered in Part 3.
# This object will store session ids and their corresponding value, an async queue.
sessions = dict()

@app.get("/")
async def root(request: Request):
    session_id = str(uuid.uuid4())
    sessions[session_id] = asyncio.Queue()
    return templates.TemplateResponse(request, "index.html", context={"session_id": session_id})
Blocking coordination
We need to control the order in which SSE messages are sent and the user query is received. On the backend side, the order is:
- Receive the user message
- Create a message queue and populate it
- Send the messages from the queue in a StreamingResponse
Failing to do so may lead to unwanted behavior, i.e. first reading the (empty) message queue, then populating it with the user's query.
The solution for controlling the order is asyncio.Queue. This object is used twice:
- When we insert new messages into the queue. Inserting a message "wakes up" the waiting SSE endpoint:
await sessions[session_id].put(query)
- When we pull messages from the queue. On this line, the code blocks until the queue signals "hey, I have new data!":
data = await queue.get()
This pattern offers several advantages:
- Each user has their own queue
- There is no risk of race conditions
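To see this coordination in isolation, here is a tiny self-contained sketch (separate from the chat code) where the consumer starts first, blocks on queue.get(), and only resumes once the producer calls queue.put():

import asyncio

async def consumer(queue: asyncio.Queue):
    print("consumer: waiting for data...")
    data = await queue.get()        # suspends here until put() is called
    print(f"consumer: woke up with {data!r}")
    queue.task_done()

async def producer(queue: asyncio.Queue):
    await asyncio.sleep(1)          # simulate the user taking time to type
    await queue.put("Hello World")  # this wakes the consumer up

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(consumer(queue), producer(queue))

asyncio.run(main())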
Streaming simulation
In this article, we'll simulate an LLM response by splitting the user's query into words and returning those words one by one. In Part 3, we'll actually plug in a real LLM.
The streaming is handled via FastAPI's StreamingResponse object. It expects an asynchronous generator that yields data until the generator is exhausted. We have to use the yield keyword instead of return, otherwise our generator would stop after the first iteration.
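As a minimal illustration (a toy /numbers endpoint that is not part of the chatbot), the generator below keeps the connection open and hands each chunk to the client as soon as it is yielded:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/numbers")
async def numbers():
    async def generate():
        for i in range(5):
            # each yield sends a chunk immediately; the response stays open until the loop ends
            yield f"chunk {i}\n"
            await asyncio.sleep(0.5)
    return StreamingResponse(generate(), media_type="text/plain")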
Let's decompose our streaming function.
First, we make sure we have a queue for the current session from which we'll pull messages:
if session_id not in sessions:
    print(f"Session {session_id} not found!")
    return
queue = sessions[session_id]
Next, once we have the queue, we pull messages from it if it contains any; otherwise the code pauses and waits for messages to arrive. This is the most important part of our function:
# This BLOCKS until data arrives
print(f"Waiting for message in session {session_id}")
data = await queue.get()
print(f"Received message: {data}")
To simulate a stream, we now chunk the message into words (called tokens here) and add some sleeps to mimic the text-generation process of an LLM (the asyncio.sleep parts). Notice how the data we yield is actually an HTML string, wrapped in a string starting with "data:". This is how SSE messages are sent. You can also choose to flag your messages with the "event:" metadata. An example would be:
event: my_custom_event
data: Content to swap into your HTML page.
Let's see how we implement it in Python (for the purists: use Jinja templates to render the HTML instead of a string :) ):
message = ""
# First pause, to let the browser display "Thinking..." after the message is sent
await asyncio.sleep(1)
# Simulate streaming by splitting the message into words
for token in data.replace("\n", " ").split(" "):
    # We append tokens to the message
    message += token + " "
    # We wrap the message in HTML tags inside the SSE "data:" field
    data = f"data: <li><b>AI</b><br>{message}</li>\n\n"
    yield data
    # Pause to simulate the LLM generation process
    await asyncio.sleep(0.03)
queue.task_done()
Frontend
Our frontend has 2 jobs: send user queries to the backend, and listen for SSE messages on a specific channel (the session_id). To do that, we apply a concept called "separation of concerns", meaning each HTMX element is responsible for a single job only:
- the form sends the user input
- the SSE listener handles the streaming
- the ul chat list displays the messages
To send messages, we use a standard textarea input inside a form. The HTMX magic is just below:
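(The markup below is a sketch: the ids, styling, and the exact JavaScript inside hx-on::before-request are assumptions of mine; the HTMX attributes are the ones discussed next.)

<form hx-post="/chat"
      hx-swap="none"
      hx-trigger="submit"
      hx-on::before-request="document.querySelector('#chat').insertAdjacentHTML('beforeend',
        '<li><b>Me:</b> ' + this.query.value + '</li><li><b>AI</b><br>Thinking...</li>')">
  <textarea name="query" placeholder="Type your message"></textarea>
  <!-- the session id generated on page load is sent along with the query -->
  <input type="hidden" name="session_id" value="{{ session_id }}">
  <button type="submit">Send</button>
</form>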
If you remember the article from Part 1, several HTMX attributes deserve explanation:
- hx-post: the endpoint the form data will be submitted to.
- hx-swap: set to none, because in our case the endpoint doesn't return any data.
- hx-trigger: specifies which event triggers the request.
- hx-on::before-request: a very light touch of JavaScript to add some snappiness to the app. We append the user's request to the chat list and display a "Thinking..." message while we wait for the SSE messages to stream. This is nicer than staring at a blank page.
It is worth noting that we actually send 2 parameters to the backend: the user's input and the session id. This way, the message is inserted into the right queue on the backend side.
Then, we define another component dedicated solely to listening for SSE messages:
<!-- the #chat id is an assumption; adapt it to your own markup -->
<div hx-ext="sse" sse-connect="/stream/{{ session_id }}" sse-swap="message"
     hx-target="#chat li:last-child" hx-swap="outerHTML"
     style="display: none;">
</div>
This component uses the HTMX SSE extension to listen to the /stream endpoint, passing its session id so it only receives messages for this session. The hx-target attribute tells the browser to put the data into the last li element of the chat. The hx-swap attribute specifies that the data should replace the entire current li element. This is how our streaming effect works: each update replaces the existing message with the latest, longer one.
Note: other methods could have been used to replace specific parts of the DOM, such as out-of-band (OOB) swaps. They work a little differently since they require a specific id to look up in the DOM. In our case, we deliberately chose not to assign ids to each list element we write.
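For comparison, an OOB-based version would have the server tag the fragment itself and let the browser match it by id, roughly like this (the id and markup are hypothetical):

<li id="current-answer" hx-swap-oob="true"><b>AI</b><br>This is the answer so far...</li>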
A Real Chatbot Using the Google Agent Development Kit
Now it's time to replace our dummy streaming endpoint with a real LLM. To achieve that, we'll build an agent using Google ADK, equipped with tools and memory to fetch information and remember conversation details.
A very short introduction to agents
You probably already know what an LLM is, at least I assume you do. The main drawback of LLMs as of today is that LLMs alone cannot access real-time information: their knowledge is frozen at the moment they were trained. The other drawback is their inability to access information outside their training scope (e.g., your company's internal data).
Agents are a type of AI application that can reason, act, and observe. The reasoning part is handled by the LLM, the "brain". The "hands" of the agent are what we call "tools", and they can take several forms:
- a Python function, for example one that calls an API (see the small sketch right after this list)
- an MCP server, a standard that lets agents connect to APIs through a standardized interface (e.g., accessing all the GSuite tools without having to write the API connectors yourself)
- other agents (this pattern is called agent delegation, where a router or master agent controls different sub-agents)
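As a tiny illustration of the first kind of tool, a plain Python function with type hints and a docstring can be handed to an agent; the get_weather function below is made up for the example:

# Hypothetical function tool: the agent framework reads the name, type hints
# and docstring to let the LLM decide when and how to call it.
def get_weather(city: str) -> dict:
    """Return the current weather for a given city."""
    # A real tool would call a weather API here.
    return {"city": city, "forecast": "sunny", "temperature_c": 22}

# It would then be registered like: Agent(..., tools=[get_weather])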
In our demo, to keep things simple, we'll use a very simple agent with a single tool: Google Search. This allows us to get fresh information and ensure it's reliable (at least we hope the Google Search results are…).
In the Google ADK world, agents need some basic information:
- a name and a description, mostly for documentation purposes
- instructions: the prompt that defines the agent's behavior (tool use, output format, steps to follow, etc.)
- tools: the functions / MCP servers / agents the agent can use to fulfill its goal
There are also other concepts around memory and session management, but those are out of scope here.
Without further ado, let's define our agent!
A Streaming Google Search Agent
from google.adk.agents import Agent
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.adk.tools import google_search

# Define constants for the agent
APP_NAME = "default"  # Application
USER_ID = "default"   # User
SESSION = "default"   # Session
MODEL_NAME = "gemini-2.5-flash-lite"

# Step 1: Create the LLM Agent
root_agent = Agent(
    model=MODEL_NAME,
    name="text_chat_bot",
    description="A text chatbot",
    instruction="You are a helpful assistant. Your goal is to answer questions based on your knowledge. Use your Google Search tool to provide the latest and most accurate information",
    tools=[google_search]
)

# Step 2: Set up Session Management
# InMemorySessionService stores conversations in RAM (temporary)
session_service = InMemorySessionService()

# Step 3: Create the Runner
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
The `Runner` object acts as the orchestrator between you and the agent.
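Before wiring the agent into FastAPI, here is a small standalone sketch (a hypothetical test script reusing the runner and session_service defined above) showing how the Runner, the session service, and the streaming mode fit together:

import asyncio
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.genai import types

async def smoke_test():
    # Create an ADK session, then stream one answer to the console
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id="test-session"
    )
    content = types.Content(role="user", parts=[types.Part(text="Who won the last World Cup?")])
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session.id,
        new_message=content,
        run_config=RunConfig(streaming_mode=StreamingMode.SSE),
    ):
        if event.partial and event.content and event.content.parts:
            print(event.content.parts[0].text, end="", flush=True)

asyncio.run(smoke_test())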
Next, we (re)define our `/stream` endpoint. We first make sure an ADK session exists for this session id, creating it if needed:
# Attempt to create a new session, or retrieve an existing one
try:
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=session_id
    )
except Exception:
    session = await session_service.get_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=session_id
    )
Then, we take the user query and pass it to the agent asynchronously to get a stream back:
# Convert the query string to the ADK Content format
query = types.Content(role="user", parts=[types.Part(text=query)])
# Stream the agent's response asynchronously
async for event in runner.run_async(
    user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
):
There is a subtlety here. When generating a response, the agent might output a double linebreak "\n\n". This is problematic because SSE events are terminated by exactly this sequence. Having a double linebreak inside your string therefore means:
- your current message will be truncated
- your next message will be incorrectly formatted and the SSE stream will stop
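For instance, if the model answers with a Markdown list, the raw frame would look like this; the blank line in the middle terminates the event, so the browser only receives the first line and ignores the rest (which no longer starts with "data:"):

data: <li><b>AI</b><br>Here are three options:

1. The first option…</li>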
You can try it yourself. To fix this, we'll use a little hack, along with another little hack to format list elements (I use Tailwind CSS, which resets certain CSS rules). The hack is:
if event.partial:
    message += event.content.parts[0].text
    # Hack 1: render the Markdown to HTML, then replace newlines with <br>
    # so that no "\n\n" can terminate the SSE event early
    html_content = markdown.markdown(message, extensions=['fenced_code']).replace("\n", "<br>")
    # Hack 2: re-add list styling that Tailwind's reset removes
    # (the exact classes are an assumption, adapt them to your setup)
    html_content = html_content.replace("<ul>", '<ul class="list-disc ml-6">')
    full_html = f"data: <li><b>AI</b><br>{html_content}</li>\n\n"
    yield full_html
This way, we make sure no double linebreak can break our SSE stream.
The full code for the route is below:
import markdown  # renders the agent's Markdown output as HTML

@app.get("/stream/{session_id}")
async def stream(session_id: str):
    async def response_stream():
        if session_id not in sessions:
            print(f"Session {session_id} not found!")
            return
        # Attempt to create a new session, or retrieve an existing one
        try:
            session = await session_service.create_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        except Exception:
            session = await session_service.get_session(
                app_name=APP_NAME, user_id=USER_ID, session_id=session_id
            )
        queue = sessions[session_id]
        # This BLOCKS until data arrives
        print(f"Waiting for message in session {session_id}")
        query = await queue.get()
        print(f"Received message: {query}")
        message = ""
        # Convert the query string to the ADK Content format
        query = types.Content(role="user", parts=[types.Part(text=query)])
        # Stream the agent's response asynchronously
        async for event in runner.run_async(
            user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE)
        ):
            if event.partial:
                message += event.content.parts[0].text
                # Render Markdown and strip newlines so they cannot end the SSE event early
                html_content = markdown.markdown(message, extensions=['fenced_code']).replace("\n", "<br>")
                # Re-add list styling removed by Tailwind's reset (classes are an assumption)
                html_content = html_content.replace("<ul>", '<ul class="list-disc ml-6">')
                full_html = f"data: <li><b>AI</b><br>{html_content}</li>\n\n"
                yield full_html
        queue.task_done()
    return StreamingResponse(response_stream(), media_type="text/event-stream")
And that's it! You can now converse with your chatbot!
I'm adding below a little CSS snippet to format code blocks. Indeed, if you ask your chat to produce code snippets, you want them properly formatted. Here is the CSS:
pre, code {
    background-color: black;
    color: lightgrey;
    padding: 1%;
    border-radius: 10px;
    white-space: pre-wrap;
    font-size: 0.8rem;
    letter-spacing: -1px;
}
You can now also generate code snippets:

Mind = blown
Workflow recap
With less than 200 LoC, we were able to write a chat with the following workflow, streaming a response from the server and displaying it very neatly by playing with SSE and HTMX.
User types "Hello World" → Submit
├── 1. Add "Me: Hello World" to the chat
├── 2. Add "AI: Thinking..." to the chat
├── 3. POST /chat with the message
├── 4. Server queues the message
├── 5. SSE stream produces an LLM response based on the query
├── 6. Stream "AI: This" (replaces "Thinking...")
├── 7. Stream "AI: This is the answer ..."
└── 8. Complete
Conclusion
In this series of articles, we showed how easy it can be to develop a chatbot app with almost no JavaScript and no heavy JS framework, just by using Python and HTML. We covered topics such as server-side rendering, Server-Sent Events (SSE), async streaming, and agents, with the help of a magical library, HTMX.
The main purpose of these articles was to show that web applications are not inaccessible to non-JavaScript developers. There are actually strong, valid reasons not to reach for JavaScript every time in web development, and although JavaScript is a powerful language, my feeling today is that it is sometimes overused in place of simpler yet robust approaches. The server-side vs client-side debate is long-standing and far from over, but I hope these articles were an eye-opener for some of you, and that you learned something along the way.
Stay tuned!