Building an Interactive UI for LlamaIndex Workflows | by Lingzhen Chen | Sep, 2024

When starting the workflow from the terminal, it is easy to see which step it is executing and the logging we put in those steps.

Terminal log for the workflow execution (screenshot by author)
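
Running the workflow from the terminal can be as simple as awaiting the top-level workflow in a small script. A minimal sketch (the constructor arguments are simplified here; the full set appears in the FastAPI endpoint later in this article):

import asyncio

async def main():
    # Assumes the SummaryAndSlideGenerationWorkflow class introduced below
    wf = SummaryAndSlideGenerationWorkflow(timeout=2000, verbose=True)
    result = await wf.run(user_query="state of the art of LLM agents")
    print(result)

if __name__ == "__main__":
    asyncio.run(main())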

We can also enable human-in-the-loop interaction by simply using user_feedback = input() in the workflow. This will pause the workflow and wait for the user input (see the human-in-the-loop example in this official LlamaIndex notebook). However, to achieve the same functionality in a user-friendly interface, we need additional modifications to the original workflow.
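
For illustration, a terminal-only human-in-the-loop step might look like the sketch below. The ApprovalEvent and RetryEvent classes are hypothetical stand-ins, not part of the article's workflow:

@step()
async def gather_feedback(self, ev: OutlineEvent) -> ApprovalEvent | RetryEvent:
    # input() blocks the whole process until the user answers in the terminal
    user_feedback = input("Do you approve this outline? ")
    if user_feedback.strip().lower().startswith("y"):
        return ApprovalEvent(outline=ev.outline)
    return RetryEvent(feedback=user_feedback)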

A workflow can take a long time to execute, so for a better user experience, LlamaIndex provides a way to send streaming events to indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a WorkflowStreamingEvent class to include useful information about the event message, such as the type of the event and which step it is sent from:

from typing import Any, Dict, Literal

from pydantic import BaseModel, Field

class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")

To enable sending streaming events, the workflow step needs to have access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Then in the step definition, we can send event messages about the progress through the context. For example, in the tavily_query() step:

@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )

In this example, we set the event_type to "server_message". It means that it is an update message and no user action is required. We have another event type, "request_user_input", which indicates that a user input is needed. For example, in the gather_feedback_outline() step in the workflow, after generating the slide text outline from the original paper summary, a message is sent to prompt the user to provide approval and feedback on the outline text:

@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outline generated, gather feedback from user"""
    ...

    # Send a special event indicating that user input is needed
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )

    ...

These events are handled differently in the backend API and the frontend logic, which I will describe in detail in the later sections of this article.

Workflow steps that require user feedback (image by author)

When sending a "request_user_input" event to the user, we only want to proceed to the next step after we have received the user input. As shown in the workflow diagram above, the workflow either proceeds to the outlines_with_layout() step if the user approves the outline, or back to the summary2outline() step if the user does not.

This is achieved using the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be awaited in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:

@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...

    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )
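
For reference, the future itself can be created when the workflow is instantiated. A minimal sketch (the exact placement is an assumption; the article only states that the attribute exists):

class SlideGenerationWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Awaited in gather_feedback_outline() and resolved by the backend
        # once the user submits feedback
        self.user_input_future = asyncio.Future()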

We set up the backend using FastAPI, expose a POST endpoint to handle requests, and initiate the workflow run. The asynchronous function run_workflow_endpoint() takes ResearchTopic as input. In the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams the events to the client as the workflow progresses. When the workflow finishes, it will also stream the final file results to the client.


class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")

@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )
    # Register the workflow so submit_user_input() can look it up by ID
    # (implied by the workflows.get()/workflows.pop() calls below)
    workflows[workflow_id] = wf

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }

            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")

In addition to this endpoint, there are endpoints for receiving user input from the client and handling file download requests. Since each workflow is assigned a unique workflow ID, we can map the user input received from the client to the correct workflow. By calling set_result() on the awaiting Future, the pending workflow can resume execution.

@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )

The download endpoint also identifies where the final file is located based on the workflow ID.

@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
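
The /download_pdf endpoint referenced by download_pdf_url is not shown in the article, but it can follow the same pattern. A sketch (the final.pdf artifact name is an assumption):

@app.get("/download_pdf/{workflow_id}")
async def download_pdf(workflow_id: str):
    # Assumes the PDF is written next to the PPTX under the same workflow ID
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pdf"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/pdf",
            filename="final.pdf",
        )
    raise HTTPException(status_code=404, detail="File not found")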

On the frontend page, after the user submits the research topic via st.text_input(), a long-running task is started in a background thread with a new event loop to receive the streamed events from the backend, without interfering with the rest of the page:

def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():

    ...

    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")

                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")

The event data streamed from the backend is fetched by httpx.AsyncClient and put into a message queue for further processing. Different information is extracted depending on the event type. For the event type "request_user_input", the thread is also paused until the user input is provided.

async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line

async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to the main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to the main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the final result

We store the messages in st.session_state and use an st.expander() to display and update the streamed data.

if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
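
The queue-draining side is not shown explicitly in the article; conceptually, on each rerun the main thread pops pending messages and routes them into st.session_state. A sketch (the message kinds match the ones put by get_stream_data(), and the session-state keys match those used elsewhere on the page):

while not st.session_state.message_queue.empty():
    kind, payload = st.session_state.message_queue.get_nowait()
    if kind == "workflow_id":
        st.session_state.workflow_id = payload
    elif kind == "user_input_required":
        st.session_state.user_input_required = True
        st.session_state.user_input_prompt = payload
    elif kind == "final_result":
        # Expose the download URLs to the rendering code shown later
        st.session_state.download_url_pdf = payload.get("download_pdf_url")
        st.session_state.download_url_pptx = payload.get("download_pptx_url")
        st.session_state.workflow_complete = True
    else:  # "message" / "error"
        st.session_state.received_lines.append(payload)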

To ensure the UI stays responsive and displays the event messages while they are being processed in a background thread, we use a custom autorefresh component to refresh the page at a set interval:

if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")

When the streamed event is of type "request_user_input", we display the related information in a separate container and gather user feedback. As there can be multiple events that require user input from one workflow run, we put them in a message queue and make sure to assign a unique key to the st.feedback(), st.text_area() and st.button() widgets that are linked to each event, so the widgets do not interfere with each other:

def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # Display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current approval state is: {approval}")
                logging.info(f"Current approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of the user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return

                        ...

In the end, when the workflow run finally finishes, the frontend client gets a response that contains the paths to the final generated files (the same slide deck in PDF format for rendering in the UI and in PPTX format for downloading as the final result). We display the PDF file and create a button for downloading the PPTX file:

  if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
download_url_pdf = st.session_state.download_url_pdf
strive:
# Fetch the PDF content material
pdf_response = requests.get(download_url_pdf)
pdf_response.raise_for_status()
st.session_state.pdf_data = pdf_response.content material

st.markdown("### Generated Slide Deck:")
# Show the PDF utilizing an iframe
st.markdown(
f'',
unsafe_allow_html=True,
)
besides Exception as e:
st.error(f"Did not load the PDF file: {str(e)}")

# Present the obtain button for PPTX if obtainable
if (
"download_url_pptx" in st.session_state
and st.session_state.download_url_pptx
):
download_url_pptx = st.session_state.download_url_pptx
strive:
# Fetch the PPTX content material
pptx_response = requests.get(download_url_pptx)
pptx_response.raise_for_status()
pptx_data = pptx_response.content material

st.download_button(
label="Obtain Generated PPTX",
information=pptx_data,
file_name="generated_slides.pptx",
mime="software/vnd.openxmlformats-officedocument.presentationml.presentation",
)
besides Exception as e:
st.error(f"Did not load the PPTX file: {str(e)}")

We create a multi-service Docker application with docker-compose to run the frontend and backend apps:

version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:

That's it! Just run docker-compose up, and we have an app that can run a research workflow based on the user's input query, prompt the user for feedback during the execution, and display the final result to the user.
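
If you want to sanity-check the backend stream without going through the frontend, a quick script along these lines works (a sketch; the query value is just an example, and the backend is exposed on localhost:8000 per the compose file above):

import requests

with requests.post(
    "http://localhost:8000/run-slide-gen",
    json={"query": "state space models"},
    stream=True,
) as resp:
    # Print each streamed event line as it arrives
    for line in resp.iter_lines():
        if line:
            print(line.decode())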
