Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pprint_run_response doesn't print if it is ResponseModel in case of stream #1642

Open
yogin16 opened this issue Dec 26, 2024 · 8 comments
Open

Comments

@yogin16
Copy link

yogin16 commented Dec 26, 2024

pprint_run_response print blank response if it is Iterable and from structured response. Possibility in case of a Workflow.

/phi/utils/pprint.py

else:
        streaming_response_content: str = ""
        with Live() as live_log:
            status = Status("Working...", spinner="dots")
            live_log.update(status)
            response_timer = Timer()
            response_timer.start()
            for resp in run_response:
                if isinstance(resp, RunResponse) and isinstance(resp.content, str):
                    streaming_response_content += resp.content

                formatted_response = Markdown(streaming_response_content) if markdown else streaming_response_content  # type: ignore
                table = Table(box=ROUNDED, border_style="blue", show_header=False)
                if show_time:
                    table.add_row(f"Response\n({response_timer.elapsed:.1f}s)", formatted_response)  # type: ignore
                else:
                    table.add_row(formatted_response)  # type: ignore
                live_log.update(table)
            response_timer.stop()

when resp.conent is not str and something of BaseModel, it prints blank

@yogin16 yogin16 changed the title pprint_run_response doesn't support print if it is ResponseModel in case of stream pprint_run_response doesn't print if it is ResponseModel in case of stream Dec 26, 2024
@manthanguptaa
Copy link
Contributor

Hey @yogin16, can you help me with your code so that I can reproduce it on my end?

@yogin16
Copy link
Author

yogin16 commented Dec 27, 2024

import json
from typing import Optional, Iterator

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


class NewsArticle(BaseModel):
    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    summary: Optional[str] = Field(..., description="Summary of the article if available.")


class SearchResults(BaseModel):
    articles: list[NewsArticle]

class BlogDraft(BaseModel):
    title: str = Field(..., description="Title of the drafted blog)
    content: str = File(..., description="Blog content in the markdown format)

class BlogPostGenerator(Workflow):
    # Define an Agent that will search the web for a topic
    searcher: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[DuckDuckGo()],
        instructions=["Given a topic, search for the top 5 articles."],
        response_model=SearchResults,
        structured_outputs=True,
    )

    # Define an Agent that will write the blog post
    writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],
        response_model=BlogDraft,
    )

    def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
        """This is where the main logic of the workflow is implemented."""

        logger.info(f"Generating a blog post on: {topic}")

        # Step 1: Use the cached blog post if use_cache is True
        if use_cache:
            cached_blog_post = self.get_cached_blog_post(topic)
            if cached_blog_post:
                yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
                return

        # Step 2: Search the web for articles on the topic
        search_results: Optional[SearchResults] = self.get_search_results(topic)
        # If no search_results are found for the topic, end the workflow
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not find any articles on the topic: {topic}",
            )
            return

        # Step 3: Write a blog post
        yield from self.write_blog_post(topic, search_results)

    def get_cached_blog_post(self, topic: str) -> Optional[str]:
        """Get the cached blog post for a topic."""

        logger.info("Checking if cached blog post exists")
        return self.session_state.get("blog_posts", {}).get(topic)

    def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
        """Add a blog post to the cache."""

        logger.info(f"Saving blog post for topic: {topic}")
        self.session_state.setdefault("blog_posts", {})
        self.session_state["blog_posts"][topic] = blog_post

    def get_search_results(self, topic: str) -> Optional[SearchResults]:
        """Get the search results for a topic."""

        MAX_ATTEMPTS = 3

        for attempt in range(MAX_ATTEMPTS):
            try:
                searcher_response: RunResponse = self.searcher.run(topic)

                # Check if we got a valid response
                if not searcher_response or not searcher_response.content:
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
                    continue
                # Check if the response is of the expected SearchResults type
                if not isinstance(searcher_response.content, SearchResults):
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
                    continue

                article_count = len(searcher_response.content.articles)
                logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
                return searcher_response.content

            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")

        logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
        return None

    def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
        """Write a blog post on a topic."""

        logger.info("Writing blog post")
        # Prepare the input for the writer
        writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
        # Run the writer and yield the response
        yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
        # Save the blog post in the cache
        self.add_blog_post_to_cache(topic, self.writer.run_response.content)


# Run the workflow if the script is executed directly
if __name__ == "__main__":
    from rich.prompt import Prompt

    # Get topic from user
    topic = Prompt.ask(
        "[bold]Enter a blog post topic[/bold]\n✨",
        default="Why Cats Secretly Run the Internet",
    )

    # Convert the topic to a URL-safe string for use in session_id
    url_safe_topic = topic.lower().replace(" ", "-")

    # Initialize the blog post generator workflow
    # - Creates a unique session ID based on the topic
    # - Sets up SQLite storage for caching results
    generate_blog_post = BlogPostGenerator(
        session_id=f"generate-blog-post-on-{url_safe_topic}",
        storage=SqlWorkflowStorage(
            table_name="generate_blog_post_workflows",
            db_file="tmp/workflows.db",
        ),
    )

    # Execute the workflow with caching enabled
    # Returns an iterator of RunResponse objects containing the generated content
    blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)

    # Print the response
    pprint_run_response(blog_post)

@manthanguptaa
Copy link
Contributor

@yogin16 the problem is that your run function is returning a tuple instead of RunResponse

@yogin16
Copy link
Author

yogin16 commented Dec 29, 2024

I think that is part of the point. tuple is coming from agent only when using ResponseModel

let me explain more in the detail.

pprint works in following case

import json
from typing import Optional, Iterator

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


class NewsArticle(BaseModel):
    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    summary: Optional[str] = Field(..., description="Summary of the article if available.")


class SearchResults(BaseModel):
    articles: list[NewsArticle]

class BlogDraft(BaseModel):
    title: str = Field(..., description="Title of the drafted blog")
    content: str = Field(..., description="Blog content in the markdown format")

class BlogPostGenerator(Workflow):
    # Define an Agent that will search the web for a topic
    searcher: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[DuckDuckGo()],
        instructions=["Given a topic, search for the top 5 articles."],
        response_model=SearchResults
    )

    # Define an Agent that will write the blog post
    writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],
    )

    def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
        """This is where the main logic of the workflow is implemented."""

        logger.info(f"Generating a blog post on: {topic}")

        # Step 1: Use the cached blog post if use_cache is True
        if use_cache:
            cached_blog_post = self.get_cached_blog_post(topic)
            if cached_blog_post:
                yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
                return

        # Step 2: Search the web for articles on the topic
        search_results: Optional[SearchResults] = self.get_search_results(topic)
        # If no search_results are found for the topic, end the workflow
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not find any articles on the topic: {topic}",
            )
            return

        # Step 3: Write a blog post
        yield from self.write_blog_post(topic, search_results)

    def get_cached_blog_post(self, topic: str) -> Optional[str]:
        """Get the cached blog post for a topic."""

        logger.info("Checking if cached blog post exists")
        return self.session_state.get("blog_posts", {}).get(topic)

    def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
        """Add a blog post to the cache."""

        logger.info(f"Saving blog post for topic: {topic}")
        self.session_state.setdefault("blog_posts", {})
        self.session_state["blog_posts"][topic] = blog_post

    def get_search_results(self, topic: str) -> Optional[SearchResults]:
        """Get the search results for a topic."""

        MAX_ATTEMPTS = 3

        for attempt in range(MAX_ATTEMPTS):
            try:
                searcher_response: RunResponse = self.searcher.run(topic)

                # Check if we got a valid response
                if not searcher_response or not searcher_response.content:
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
                    continue
                # Check if the response is of the expected SearchResults type
                if not isinstance(searcher_response.content, SearchResults):
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
                    continue

                article_count = len(searcher_response.content.articles)
                logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
                return searcher_response.content

            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")

        logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
        return None

    def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
        """Write a blog post on a topic."""

        logger.info("Writing blog post")
        # Prepare the input for the writer
        writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
        # Run the writer and yield the response
        writer_response = self.writer.run(json.dumps(writer_input, indent=4), stream=True)
        print("***********")
        print(type(writer_response))
        print("***********")
        yield from writer_response
        # Save the blog post in the cache
        self.add_blog_post_to_cache(topic, self.writer.run_response.content)


# Run the workflow if the script is executed directly
if __name__ == "__main__":
    from rich.prompt import Prompt

    # Get topic from user
    topic = Prompt.ask(
        "[bold]Enter a blog post topic[/bold]\n✨",
        default="Why Cats Secretly Run the Internet",
    )

    # Convert the topic to a URL-safe string for use in session_id
    url_safe_topic = topic.lower().replace(" ", "-")

    # Initialize the blog post generator workflow
    # - Creates a unique session ID based on the topic
    # - Sets up SQLite storage for caching results
    generate_blog_post = BlogPostGenerator(
        session_id=f"generate-blog-post-on-{url_safe_topic}",
        storage=SqlWorkflowStorage(
            table_name="generate_blog_post_workflows",
            db_file="tmp/workflows.db",
        ),
    )

    # Execute the workflow with caching enabled
    # Returns an iterator of RunResponse objects containing the generated content
    blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)

    print(type(blog_post))
    # Print the response
    pprint_run_response(blog_post)

    # for resp in blog_post:
    #    print(type(resp))
    #    print(resp)
    #    print(resp.content)

Notice writer agent doesn't use ResponseModel in this case

writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],

if the agent was supposed to use the response model, pprint breaks. and here is the example for broken case

i.e., adding this param for the writer agent

writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],
        response_model=BlogDraft,

Here is the complete case with this one line changed:

import json
from typing import Optional, Iterator

from pydantic import BaseModel, Field

from phi.agent import Agent
from phi.model.openai import OpenAIChat
from phi.workflow import Workflow, RunResponse, RunEvent
from phi.storage.workflow.sqlite import SqlWorkflowStorage
from phi.tools.duckduckgo import DuckDuckGo
from phi.utils.pprint import pprint_run_response
from phi.utils.log import logger


class NewsArticle(BaseModel):
    title: str = Field(..., description="Title of the article.")
    url: str = Field(..., description="Link to the article.")
    summary: Optional[str] = Field(..., description="Summary of the article if available.")


class SearchResults(BaseModel):
    articles: list[NewsArticle]

class BlogDraft(BaseModel):
    title: str = Field(..., description="Title of the drafted blog")
    content: str = Field(..., description="Blog content in the markdown format")

class BlogPostGenerator(Workflow):
    # Define an Agent that will search the web for a topic
    searcher: Agent = Agent(
        model=OpenAIChat(id="gpt-4o-mini"),
        tools=[DuckDuckGo()],
        instructions=["Given a topic, search for the top 5 articles."],
        response_model=SearchResults
    )

    # Define an Agent that will write the blog post
    writer: Agent = Agent(
        model=OpenAIChat(id="gpt-4o"),
        instructions=[
            "You will be provided with a topic and a list of top articles on that topic.",
            "Carefully read each article and generate a New York Times worthy blog post on that topic.",
            "Break the blog post into sections and provide key takeaways at the end.",
            "Make sure the title is catchy and engaging.",
            "Always provide sources, do not make up information or sources.",
        ],
        response_model=BlogDraft,   # ADDED THIS LINE TO GET STRUCTURED OUTPUT
    )

    def run(self, topic: str, use_cache: bool = True) -> Iterator[RunResponse]:
        """This is where the main logic of the workflow is implemented."""

        logger.info(f"Generating a blog post on: {topic}")

        # Step 1: Use the cached blog post if use_cache is True
        if use_cache:
            cached_blog_post = self.get_cached_blog_post(topic)
            if cached_blog_post:
                yield RunResponse(content=cached_blog_post, event=RunEvent.workflow_completed)
                return

        # Step 2: Search the web for articles on the topic
        search_results: Optional[SearchResults] = self.get_search_results(topic)
        # If no search_results are found for the topic, end the workflow
        if search_results is None or len(search_results.articles) == 0:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content=f"Sorry, could not find any articles on the topic: {topic}",
            )
            return

        # Step 3: Write a blog post
        yield from self.write_blog_post(topic, search_results)

    def get_cached_blog_post(self, topic: str) -> Optional[str]:
        """Get the cached blog post for a topic."""

        logger.info("Checking if cached blog post exists")
        return self.session_state.get("blog_posts", {}).get(topic)

    def add_blog_post_to_cache(self, topic: str, blog_post: Optional[str]):
        """Add a blog post to the cache."""

        logger.info(f"Saving blog post for topic: {topic}")
        self.session_state.setdefault("blog_posts", {})
        self.session_state["blog_posts"][topic] = blog_post

    def get_search_results(self, topic: str) -> Optional[SearchResults]:
        """Get the search results for a topic."""

        MAX_ATTEMPTS = 3

        for attempt in range(MAX_ATTEMPTS):
            try:
                searcher_response: RunResponse = self.searcher.run(topic)

                # Check if we got a valid response
                if not searcher_response or not searcher_response.content:
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Empty searcher response")
                    continue
                # Check if the response is of the expected SearchResults type
                if not isinstance(searcher_response.content, SearchResults):
                    logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS}: Invalid response type")
                    continue

                article_count = len(searcher_response.content.articles)
                logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
                return searcher_response.content

            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{MAX_ATTEMPTS} failed: {str(e)}")

        logger.error(f"Failed to get search results after {MAX_ATTEMPTS} attempts")
        return None

    def write_blog_post(self, topic: str, search_results: SearchResults) -> Iterator[RunResponse]:
        """Write a blog post on a topic."""

        logger.info("Writing blog post")
        # Prepare the input for the writer
        writer_input = {"topic": topic, "articles": [v.model_dump() for v in search_results.articles]}
        # Run the writer and yield the response
        writer_response = self.writer.run(json.dumps(writer_input, indent=4), stream=True)
        print("***********")
        print(type(writer_response))
        print("***********")
        yield from writer_response
        # Save the blog post in the cache
        self.add_blog_post_to_cache(topic, self.writer.run_response.content)


# Run the workflow if the script is executed directly
if __name__ == "__main__":
    from rich.prompt import Prompt

    # Get topic from user
    topic = Prompt.ask(
        "[bold]Enter a blog post topic[/bold]\n✨",
        default="Why Cats Secretly Run the Internet",
    )

    # Convert the topic to a URL-safe string for use in session_id
    url_safe_topic = topic.lower().replace(" ", "-")

    # Initialize the blog post generator workflow
    # - Creates a unique session ID based on the topic
    # - Sets up SQLite storage for caching results
    generate_blog_post = BlogPostGenerator(
        session_id=f"generate-blog-post-on-{url_safe_topic}",
        storage=SqlWorkflowStorage(
            table_name="generate_blog_post_workflows",
            db_file="tmp/workflows.db",
        ),
    )

    # Execute the workflow with caching enabled
    # Returns an iterator of RunResponse objects containing the generated content
    blog_post: Iterator[RunResponse] = generate_blog_post.run(topic=topic, use_cache=True)

    print(type(blog_post))
    # Print the response
    # pprint_run_response(blog_post) <- THIS DOESN'T WORK

    for resp in blog_post:
       print(type(resp))
       print(resp)
       print(resp.content)

One would assume that all agent run responses are composable for a workflow as well as pprint_run_response to handle structured response content (like it does in the non streaming case?)

@manthanguptaa
Copy link
Contributor

@yogin16 I think you are using this cookbook to write your version of it https://github.com/phidatahq/phidata/blob/main/cookbook/workflows/blog_post_generator.py

There might be a very small thing that we might be missing here but I suggest taking a look at the cookbook.

@yogin16
Copy link
Author

yogin16 commented Dec 30, 2024

Of course there are workaround available i can work with, as we are talking about just pprint util here.

let me make the point more clear still so it is actionable.

if you look the the pprint implementation here

https://github.com/phidatahq/phidata/blob/main/phi/utils/pprint.py

if isinstance(run_response.content, str):
            single_response_content = (
                Markdown(run_response.content) if markdown else run_response.get_content_as_string(indent=4)
            )
        elif isinstance(run_response.content, BaseModel):
            try:
                single_response_content = JSON(run_response.content.model_dump_json(exclude_none=True), indent=2)
            except Exception as e:
                logger.warning(f"Failed to convert response to Markdown: {e}")
        else:
            try:
                single_response_content = JSON(json.dumps(run_response.content), indent=4)
            except Exception as e:
                logger.warning(f"Failed to convert response to string: {e}")

it handles the case of BaseModel as well -- but only when run_response is a single RunResponse

for streaming no handling for BaseModel case. is that expected?

streaming_response_content: str = ""
       with Live(console=console) as live_log:
           status = Status("Working...", spinner="dots")
           live_log.update(status)
           response_timer = Timer()
           response_timer.start()
           for resp in run_response:
               if isinstance(resp, RunResponse) and isinstance(resp.content, str):
                   streaming_response_content += resp.content

               formatted_response = Markdown(streaming_response_content) if markdown else streaming_response_content  # type: ignore
               table = Table(box=ROUNDED, border_style="blue", show_header=False)
               if show_time:
                   table.add_row(f"Response\n({response_timer.elapsed:.1f}s)", formatted_response)  # type: ignore
               else:
                   table.add_row(formatted_response)  # type: ignore
               live_log.update(table)
           response_timer.stop()

do we have a cookbook example where the last agent is using a response_model?

thanks

@yogin16
Copy link
Author

yogin16 commented Dec 30, 2024

another bigger discussion point maybe is

if we have following code anywhere

agent_response = self.agent.run(json.dumps(writer_input, indent=4), stream=True)

where agent is the phidata Agent

do we always expect agent_response to be of type RunResponse?
in theory? (what is the expectation)

because it seems that it can be a tuple as well!

@manthanguptaa
Copy link
Contributor

@yogin16 confirming if you are still facing this issue?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants