Open Deep Research (#317)

* Create an open Deep Research

Co-authored-by: Albert Villanova del Moral <8515462+albertvillanova@users.noreply.github.com>

---------

Co-authored-by: Albert Villanova del Moral <8515462+albertvillanova@users.noreply.github.com>
Aymeric Roucher 2025-02-04 17:13:21 +01:00 committed by GitHub
parent a8918f7a60
commit 8b02821ac2
18 changed files with 14221 additions and 58 deletions

File diff suppressed because it is too large

@@ -0,0 +1,39 @@
anthropic>=0.37.1
beautifulsoup4>=4.12.3
datasets>=2.21.0
google_search_results>=2.4.2
huggingface_hub>=0.23.4
mammoth>=1.8.0
markdownify>=0.13.1
numexpr>=2.10.1
numpy>=2.1.2
openai>=1.52.2
openpyxl
pandas>=2.2.3
pathvalidate>=3.2.1
pdfminer>=20191125
pdfminer.six>=20240706
Pillow>=11.0.0
puremagic>=1.28
pypdf>=5.1.0
python-dotenv>=1.0.1
python_pptx>=1.0.2
Requests>=2.32.3
serpapi>=0.1.5
tqdm>=4.66.4
torch>=2.2.2
torchvision>=0.17.2
transformers>=4.46.0
youtube_transcript_api>=0.6.2
chess
sympy
pubchempy
Bio
scikit-learn
scipy
pydub
PyPDF2
python-pptx
torch
xlrd
SpeechRecognition

@@ -0,0 +1,300 @@
import argparse
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import List
import datasets
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import login
from scripts.reformulator import prepare_response
from scripts.run_agents import (
get_single_file_description,
get_zip_description,
)
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SearchInformationTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from tqdm import tqdm
from smolagents import (
MANAGED_AGENT_PROMPT,
CodeAgent,
# HfApiModel,
LiteLLMModel,
Model,
ToolCallingAgent,
)
AUTHORIZED_IMPORTS = [
"requests",
"zipfile",
"os",
"pandas",
"numpy",
"sympy",
"json",
"bs4",
"pubchempy",
"xml",
"yahoo_finance",
"Bio",
"sklearn",
"scipy",
"pydub",
"io",
"PIL",
"chess",
"PyPDF2",
"pptx",
"torch",
"datetime",
"fractions",
"csv",
]
load_dotenv(override=True)
login(os.getenv("HF_TOKEN"))
append_answer_lock = threading.Lock()
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--concurrency", type=int, default=8)
parser.add_argument("--model-id", type=str, default="o1")
parser.add_argument("--api-base", type=str, default=None)
parser.add_argument("--run-name", type=str, required=True)
return parser.parse_args()
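# Example invocation (sketch; the script filename and run name are illustrative, and the required
# API keys, e.g. SERPAPI_API_KEY and HF_TOKEN, are expected in .env):
#   python run_gaia.py --run-name o1-validation --model-id o1 --concurrency 8
# Answers are appended to output/validation/<run-name>.jsonl, so an interrupted run can be resumed.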
### IMPORTANT: EVALUATION SWITCHES
print("Make sure you deactivated Tailscale VPN, else some URLs will be blocked!")
USE_OPEN_MODELS = False
SET = "validation"
custom_role_conversions = {"tool-call": "assistant", "tool-response": "user"}
### LOAD EVALUATION DATASET
eval_ds = datasets.load_dataset("gaia-benchmark/GAIA", "2023_all")[SET]
eval_ds = eval_ds.rename_columns({"Question": "question", "Final answer": "true_answer", "Level": "task"})
def preprocess_file_paths(row):
if len(row["file_name"]) > 0:
row["file_name"] = f"data/gaia/{SET}/" + row["file_name"]
return row
eval_ds = eval_ds.map(preprocess_file_paths)
eval_df = pd.DataFrame(eval_ds)
print("Loaded evaluation dataset:")
print(eval_df["task"].value_counts())
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
BROWSER_CONFIG = {
"viewport_size": 1024 * 5,
"downloads_folder": "downloads_folder",
"request_kwargs": {
"headers": {"User-Agent": user_agent},
"timeout": 300,
},
"serpapi_key": os.getenv("SERPAPI_API_KEY"),
}
os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)
def create_agent_hierarchy(model: Model):
text_limit = 100000
ti_tool = TextInspectorTool(model, text_limit)
browser = SimpleTextBrowser(**BROWSER_CONFIG)
WEB_TOOLS = [
SearchInformationTool(browser),
VisitTool(browser),
PageUpTool(browser),
PageDownTool(browser),
FinderTool(browser),
FindNextTool(browser),
ArchiveSearchTool(browser),
TextInspectorTool(model, text_limit),
]
text_webbrowser_agent = ToolCallingAgent(
model=model,
tools=WEB_TOOLS,
max_steps=20,
verbosity_level=2,
planning_interval=4,
name="search_agent",
description="""A team member that will search the internet to answer your question.
Ask him for all your questions that require browsing the web.
Provide him as much context as possible, in particular if you need to search on a specific timeframe!
And don't hesitate to provide him with a complex search task, like finding a difference between two webpages.
Your request must be a real sentence, not a google search! Like "Find me this information (...)" rather than a few keywords.
""",
provide_run_summary=True,
managed_agent_prompt=MANAGED_AGENT_PROMPT
+ """You can navigate to .txt online files.
If a non-html page is in another format, especially .pdf or a Youtube video, use tool 'inspect_file_as_text' to inspect it.
Additionally, if after some searching you find out that you need more information to answer the question, you can use `final_answer` with your request for clarification as argument to request for more information.""",
)
manager_agent = CodeAgent(
model=model,
tools=[visualizer, ti_tool],
max_steps=12,
verbosity_level=2,
additional_authorized_imports=AUTHORIZED_IMPORTS,
planning_interval=4,
managed_agents=[text_webbrowser_agent],
)
return manager_agent
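# Sketch of how the hierarchy is meant to be used (the task string is illustrative):
#   model = LiteLLMModel("o1", custom_role_conversions=custom_role_conversions)
#   agent = create_agent_hierarchy(model)
#   answer = agent.run("In which year was the GAIA benchmark released?")
# The manager CodeAgent plans and writes code, delegating all web browsing to the managed
# `search_agent` defined above.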
def append_answer(entry: dict, jsonl_file: str) -> None:
jsonl_file = Path(jsonl_file)
jsonl_file.parent.mkdir(parents=True, exist_ok=True)
with append_answer_lock, open(jsonl_file, "a", encoding="utf-8") as fp:
fp.write(json.dumps(entry) + "\n")
assert os.path.exists(jsonl_file), "File not found!"
print("Answer exported to file:", jsonl_file.resolve())
def answer_single_question(example, model_id, answers_file, visual_inspection_tool):
model = LiteLLMModel(
model_id,
custom_role_conversions=custom_role_conversions,
max_completion_tokens=8192,
reasoning_effort="high",
)
# model = HfApiModel("Qwen/Qwen2.5-72B-Instruct", provider="together")
# "https://lnxyuvj02bpe6mam.us-east-1.aws.endpoints.huggingface.cloud",
# custom_role_conversions=custom_role_conversions,
# # provider="sambanova",
# max_tokens=8096,
# )
document_inspection_tool = TextInspectorTool(model, 100000)
agent = create_agent_hierarchy(model)
augmented_question = """You have one question to answer. It is paramount that you provide a correct answer.
Give it all you can: I know for a fact that you have access to all the relevant tools to solve it and find the correct answer (the answer does exist). Failure or 'I cannot answer' or 'None found' will not be tolerated, success will be rewarded.
Run verification steps if that's needed, you must make sure you find the correct answer!
Here is the task:
""" + example["question"]
if example["file_name"]:
if ".zip" in example["file_name"]:
prompt_use_files = "\n\nTo solve the task above, you will have to use these attached files:\n"
prompt_use_files += get_zip_description(
example["file_name"], example["question"], visual_inspection_tool, document_inspection_tool
)
else:
prompt_use_files = "\n\nTo solve the task above, you will have to use this attached file:"
prompt_use_files += get_single_file_description(
example["file_name"], example["question"], visual_inspection_tool, document_inspection_tool
)
augmented_question += prompt_use_files
start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
# Run agent 🚀
final_result = agent.run(augmented_question)
agent_memory = agent.write_memory_to_messages(summary_mode=True)
final_result = prepare_response(augmented_question, agent_memory, reformulation_model=model)
output = str(final_result)
for memory_step in agent.memory.steps:
memory_step.model_input_messages = None
intermediate_steps = [str(step) for step in agent.memory.steps]
# Check for parsing errors which indicate the LLM failed to follow the required format
parsing_error = True if any(["AgentParsingError" in step for step in intermediate_steps]) else False
# check if iteration limit exceeded
iteration_limit_exceeded = True if "Agent stopped due to iteration limit or time limit." in output else False
raised_exception = False
except Exception as e:
print("Error on ", augmented_question, e)
output = None
intermediate_steps = []
parsing_error = False
iteration_limit_exceeded = False
exception = e
raised_exception = True
end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
annotated_example = {
"agent_name": model.model_id,
"question": example["question"],
"augmented_question": augmented_question,
"prediction": output,
"intermediate_steps": intermediate_steps,
"parsing_error": parsing_error,
"iteration_limit_exceeded": iteration_limit_exceeded,
"agent_error": str(exception) if raised_exception else None,
"start_time": start_time,
"end_time": end_time,
"task": example["task"],
"task_id": example["task_id"],
"true_answer": example["true_answer"],
}
append_answer(annotated_example, answers_file)
def get_examples_to_answer(answers_file, eval_ds) -> List[dict]:
print(f"Loading answers from {answers_file}...")
try:
done_questions = pd.read_json(answers_file, lines=True)["question"].tolist()
print(f"Found {len(done_questions)} previous results!")
except Exception as e:
print("Error when loading records: ", e)
print("No usable records! ▶️ Starting new.")
done_questions = []
return [line for line in eval_ds.to_list() if line["question"] not in done_questions]
def main():
args = parse_args()
print(f"Starting run with arguments: {args}")
answers_file = f"output/{SET}/{args.run_name}.jsonl"
tasks_to_run = get_examples_to_answer(answers_file, eval_ds)
with ThreadPoolExecutor(max_workers=args.concurrency) as exe:
futures = [
exe.submit(answer_single_question, example, args.model_id, answers_file, visualizer)
for example in tasks_to_run
]
for f in tqdm(as_completed(futures), total=len(tasks_to_run), desc="Processing tasks"):
f.result()
# for example in tasks_to_run:
# answer_single_question(example, args.model_id, answers_file, visualizer)
print("All tasks processed.")
if __name__ == "__main__":
main()

@@ -0,0 +1,715 @@
from requests.cookies import RequestsCookieJar
COOKIES_LIST = [
{
"domain": ".youtube.com",
"expirationDate": 1718884961,
"hostOnly": False,
"httpOnly": False,
"name": "ST-xuwub9",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "session_logininfo=AFmmF2swRAIgf4gadACOuWOcipI1anW-dakEjtidNLkufnOC8uml7EECIDh2YisqWELDBJPTGUysCucJ3I0wjXxYjVHro1LHrdW0%3AQUQ3MjNmd2Jiajl3OWZYRnpFNnZlWWV5ZGJWZ0hpcmp4LVVPU280bk4zOS03Z0ozZG9fOFhWZ0dXaVo3NG1wTEg1b3hGaG10TFBlaFBnTlJfbER5bEp0aFhoNS1OLVhYNFRZT2F6ajgzOFpDbGhlUjZpMWRETlFFRjFfTTRiM0RnNTROSkdmMTFMVjFic1VuZ2trbGp4aktDa0JJUC1BWDh3",
},
{
"domain": ".youtube.com",
"expirationDate": 1753004444.745411,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-YEC",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "CgtRVnI5LW1zRHlQVSjbtNCzBjIhCgJGUhIbEhcSFRMLFBUWFwwYGRobHB0eHw4PIBAREiAk",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050824,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-3PSID",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "g.a000kwibeLUu8Ea9Y-vLun7u3kU5VNJVuMAZl_jdfJaNm50JyDBB4ezJ_bdWu46a7YwObVn44wACgYKAakSARQSFQHGX2MicJcTzecTKH6bHzqU6TMbTxoVAUF8yKqQYK-MoI6Ql3vI2oYTB3E-0076",
},
{
"domain": ".youtube.com",
"expirationDate": 1750420959.974642,
"hostOnly": False,
"httpOnly": False,
"name": "SIDCC",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "AKEyXzWQZauHKOo8t87zoEcjaVNIYUX54ohoWXT-tX4aAhEuZzIIptxZAcNkHuG2oDXYL6t-lw",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050652,
"hostOnly": False,
"httpOnly": False,
"name": "SID",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "g.a000kwibeLUu8Ea9Y-vLun7u3kU5VNJVuMAZl_jdfJaNm50JyDBB6VHrZcC3gBAsFPbCQ0gF5AACgYKAYkSARQSFQHGX2Mi9kt0gHg5CxCYSkLQGHWaeBoVAUF8yKre_V6r3jZVak6JV4o2Q0FL0076",
},
{
"domain": ".youtube.com",
"expirationDate": 1750420958.397534,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-1PSIDTS",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "sidts-CjIB3EgAEkYL2L-GfrEzW5Dfy62S9oefGNLgst78S_986htCnGcfkxECch_9oz-qytSsZBAA",
},
{
"domain": ".youtube.com",
"expirationDate": 1753433494.44729,
"hostOnly": False,
"httpOnly": False,
"name": "_ga_M0180HEFCY",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GS1.1.1718871908.1.0.1718873494.0.0.0",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050933,
"hostOnly": False,
"httpOnly": False,
"name": "SAPISID",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "mfeuiC-HraNJ-A03/ASXvCPNJSw7yTFgd6",
},
{
"domain": ".youtube.com",
"expirationDate": 1750420959.974764,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-1PSIDCC",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "AKEyXzWHDSoXGCZpZhPxRrnC7B1s8zGIUjeMVyvgtQfsm1fs92lXPtFEI_td9LBUyqVUe0xK",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050881,
"hostOnly": False,
"httpOnly": True,
"name": "SSID",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "AmlwXHnQvOQ10LVd-",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050959,
"hostOnly": False,
"httpOnly": False,
"name": "__Secure-1PAPISID",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "mfeuiC-HraNJ-A03/ASXvCPNJSw7yTFgd6",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050795,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-1PSID",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "g.a000kwibeLUu8Ea9Y-vLun7u3kU5VNJVuMAZl_jdfJaNm50JyDBBrlk7lRpKQGywAHEon7WGQAACgYKAQsSARQSFQHGX2MirAmnSRdZl6GPG6KLd4hOihoVAUF8yKoV17Tcj1a_OenIOkf2wBjO0076",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050993,
"hostOnly": False,
"httpOnly": False,
"name": "__Secure-3PAPISID",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "mfeuiC-HraNJ-A03/ASXvCPNJSw7yTFgd6",
},
{
"domain": ".youtube.com",
"expirationDate": 1750420959.974815,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-3PSIDCC",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "AKEyXzXM5UjKUEXwSHVmRAIo6hGHA4G63adj3EE1VdNriD0f38jZQbsUKiD4LQbA3BValmTFDg",
},
{
"domain": ".youtube.com",
"expirationDate": 1750420958.397647,
"hostOnly": False,
"httpOnly": True,
"name": "__Secure-3PSIDTS",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "sidts-CjIB3EgAEkYL2L-GfrEzW5Dfy62S9oefGNLgst78S_986htCnGcfkxECch_9oz-qytSsZBAA",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050908,
"hostOnly": False,
"httpOnly": False,
"name": "APISID",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "IlQWLPjdNqziwCrV/ANG7Z4x5FF-IBxbZk",
},
{
"domain": ".youtube.com",
"expirationDate": 1753434620.050855,
"hostOnly": False,
"httpOnly": True,
"name": "HSID",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "AasA7hmRuTFv7vjoq",
},
{
"domain": ".youtube.com",
"expirationDate": 1753435873.577793,
"hostOnly": False,
"httpOnly": True,
"name": "LOGIN_INFO",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "AFmmF2swRAIgf4gadACOuWOcipI1anW-dakEjtidNLkufnOC8uml7EECIDh2YisqWELDBJPTGUysCucJ3I0wjXxYjVHro1LHrdW0:QUQ3MjNmd2Jiajl3OWZYRnpFNnZlWWV5ZGJWZ0hpcmp4LVVPU280bk4zOS03Z0ozZG9fOFhWZ0dXaVo3NG1wTEg1b3hGaG10TFBlaFBnTlJfbER5bEp0aFhoNS1OLVhYNFRZT2F6ajgzOFpDbGhlUjZpMWRETlFFRjFfTTRiM0RnNTROSkdmMTFMVjFic1VuZ2trbGp4aktDa0JJUC1BWDh3",
},
{
"domain": ".youtube.com",
"expirationDate": 1753444956.555608,
"hostOnly": False,
"httpOnly": False,
"name": "PREF",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "f4=4000000&f6=40000000&tz=Europe.Paris&f5=30000&f7=100",
},
]
COOKIES_LIST += [
{
"domain": ".www.researchgate.net",
"hostOnly": False,
"httpOnly": True,
"name": "isInstIp",
"path": "/",
"sameSite": None,
"secure": True,
"session": True,
"storeId": None,
"value": "False",
},
{
"domain": ".researchgate.net",
"expirationDate": 1734423981,
"hostOnly": False,
"httpOnly": False,
"name": "__eoi",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "ID=c26f752377373146:T=1718871981:RT=1718884914:S=AA-AfjZw-T_OOX2kW2LLaFzXImgc",
},
{
"domain": ".www.researchgate.net",
"expirationDate": 1753444909.646103,
"hostOnly": False,
"httpOnly": True,
"name": "ptc",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "RG1.8947708639250500550.1718872043",
},
{
"domain": ".researchgate.net",
"expirationDate": 1750507578,
"hostOnly": False,
"httpOnly": False,
"name": "euconsent-v2-didomi",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "CQAgmoAQAgmoAAHABBENA5EsAP_gAEPgAAYgJ2pB5G5UTWlBIG53YMskIAUFhFBoQEAgAACAAwIBSBIAIIwEAGAAIAgAICACAAIAIBIAIABAGAAAAAAAYIAAIAAIAAAQIAAKIAAAAAAAAgBQAAgIAgggEAAAgEBEABAAgAAAEIIAQNgACgAAACCAAAAAAAABAAAAAAAAQAAAAAAAYCQAAAJIAAAAACAIABAIAAAAAAAAAAAAAAAABBAAIJ2wPIAFAAXABQAFQALgAcAA8ACAAEgALwAZAA0ACIAEcAJgAUgAqgBcADEAGgAPQAfgBEACOAE4AMMAZYA0QBsgDkAHOAO4AfsBBwEIAItARwBHQC6gHUAO2Ae0A_4CHQEXgJ2AUOAo8BT4CpQFqALYAXmAwQBkgDLAGXANjAhCBG8CbAE3gJ1gTtAA.f_wACHwAAAAA",
},
{
"domain": ".researchgate.net",
"expirationDate": 1718885236,
"hostOnly": False,
"httpOnly": False,
"name": "_gat",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "1",
},
{
"domain": "www.researchgate.net",
"expirationDate": 1721477183,
"hostOnly": True,
"httpOnly": False,
"name": "_pbjs_userid_consent_data",
"path": "/",
"sameSite": "lax",
"secure": False,
"session": False,
"storeId": None,
"value": "3524755945110770",
},
{
"domain": ".researchgate.net",
"expirationDate": 1752567981,
"hostOnly": False,
"httpOnly": False,
"name": "__gads",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "ID=eca2adb88969c830:T=1718871981:RT=1718884914:S=ALNI_MY2qZchynrhWX6hWMlaI87Pcj9riQ",
},
{
"domain": ".researchgate.net",
"expirationDate": 1718886709.646173,
"hostOnly": False,
"httpOnly": True,
"name": "__cf_bm",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": False,
"storeId": None,
"value": "IkQ_J4ciBzKQduRvjqsfSmQu8UygDWbHeROO5JVccfo-1718884909-1.0.1.1-qvNGEdbfI0HfhFP6kwe7R7mkTqODNhFuKhs72lLly6K2BOPMG3kbahpQFGvPK0U8FUfkznkq65gngd1sWj7sDA",
},
{
"domain": ".researchgate.net",
"expirationDate": 1752567981,
"hostOnly": False,
"httpOnly": False,
"name": "__gpi",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "UID=00000e4e9aa2e6f2:T=1718871981:RT=1718884914:S=ALNI_MYFNrgzkKn7K6Bd2y8hC6GJCvDiSg",
},
{
"domain": ".researchgate.net",
"hostOnly": False,
"httpOnly": True,
"name": "_cfuvid",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": True,
"storeId": None,
"value": "_GPmGZkBymiH3UiqTqzakEpi98br3nfFUWC2_u_wqkc-1718884909785-0.0.1.1-604800000",
},
{
"domain": ".researchgate.net",
"expirationDate": 1753445177.271667,
"hostOnly": False,
"httpOnly": False,
"name": "_ga",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GA1.1.1525244793.1718885177",
},
{
"domain": ".researchgate.net",
"expirationDate": 1753445177.271482,
"hostOnly": False,
"httpOnly": False,
"name": "_ga_4P31SJ70EJ",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GS1.1.1718885177.1.0.1718885177.0.0.0",
},
{
"domain": ".researchgate.net",
"expirationDate": 1718971576,
"hostOnly": False,
"httpOnly": False,
"name": "_gid",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GA1.2.854907463.1718885177",
},
{
"domain": ".www.researchgate.net",
"expirationDate": 1750407982.506505,
"hostOnly": False,
"httpOnly": True,
"name": "did",
"path": "/",
"sameSite": None,
"secure": True,
"session": False,
"storeId": None,
"value": "1dWLO3C6am8l667Q4VUlBo0O1LI49Qi2Vw21SJEXHavBDYT56DI9007W5rYGVFVH",
},
{
"domain": ".researchgate.net",
"expirationDate": 1750507578,
"hostOnly": False,
"httpOnly": False,
"name": "didomi_token",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "eyJ1c2VyX2lkIjoiMTkwMzU4YTUtNWU2My02Y2UzLWJlNzAtZGFjNzVmYjdiY2ExIiwiY3JlYXRlZCI6IjIwMjQtMDYtMjBUMTI6MDY6MTYuODA2WiIsInVwZGF0ZWQiOiIyMDI0LTA2LTIwVDEyOjA2OjE4Ljc4MVoiLCJ2ZW5kb3JzIjp7ImVuYWJsZWQiOlsidHdpdHRlciIsImdvb2dsZSIsImM6bGlua2VkaW4tbWFya2V0aW5nLXNvbHV0aW9ucyIsImM6b3duZXJpcSIsImM6b21uaXR1cmUtYWRvYmUtYW5hbHl0aWNzIiwiYzp0ZWNobm9yYXRpLW1lZGlhIiwiYzppbnRlcmNvbSIsImM6aW50ZW50LWlxIiwiYzppcHJvbSIsImM6bGlua2VkaW4iLCJjOmFtYXpvbmFkdi16Y1hGTEI2WCIsImM6bWVkaWFuZXQtY1V3YUtFNnoiLCJjOmluZGV4ZXhjaC1OWkNRTTY4UCIsImM6emVvdGFwZ21iLWQ3YndtdGp3IiwiYzp0cmlwbGVsaWYtZGRKSDM0clkiLCJjOnJ0YmhvdXNlLWI4Y2RIOHRNIiwiYzptZHByaW1pcy1lYU4yOVdjUCIsImM6bG9vcG1lbGktVGRhWXRCUHEiLCJjOm1hZ25pdGVpbi05d1RZTHFSRCIsImM6Ymlkc3dpdGNoLWQ2N0V3N1c5IiwiYzpvcmFjbGVhZHYtcUhlREptQUwiLCJjOmdvb2dsZWFuYS00VFhuSmlnUiIsImM6bG90YW1lc29sLURIaTdMUmpNIiwiYzpuZXh0bWlsbGUtR0pyZlg4VWMiLCJjOm5yaWNodGVjLXFVVlEyUlFxIiwiYzpicml0ZXBvb2wtQldWeVdHeVUiLCJjOnRhcGFkaW5jLXFxY2tVN1BXIiwiYzppZDV0ZWNobi16Tk1KNGR3ZiIsImM6bWljcm9zb2Z0IiwiYzpwZXJtdXRpdmUtSjdpaHJlTWsiLCJjOm9wZXJhc29mdC1CY1hjRFZKTSIsImM6cG9zdGhvZy1Cakp4RmRGOSJdfSwicHVycG9zZXMiOnsiZW5hYmxlZCI6WyJnZW9sb2NhdGlvbl9kYXRhIiwiZGV2aWNlX2NoYXJhY3RlcmlzdGljcyJdfSwidmVuZG9yc19saSI6eyJlbmFibGVkIjpbImdvb2dsZSIsImM6b3BlcmFzb2Z0LUJjWGNEVkpNIl19LCJ2ZXJzaW9uIjoyLCJhYyI6IkRIU0FvQUZrQWNnQTVnSHFnUUhBeGdCNndEMTRJR0FRTkFqMEJJd0NTY0VyQUtCd1YtZ3MxQmgwREc0R09nQUEuREhTQW9BRmtBY2dBNWdIcWdRSEF4Z0I2d0QxNElHQVFOQWowQkl3Q1NjRXJBS0J3Vi1nczFCaDBERzRHT2dBQSJ9",
},
{
"domain": ".www.researchgate.net",
"hostOnly": False,
"httpOnly": True,
"name": "hasPdpNext",
"path": "/",
"sameSite": None,
"secure": True,
"session": True,
"storeId": None,
"value": "False",
},
{
"domain": ".researchgate.net",
"expirationDate": 1750421183,
"hostOnly": False,
"httpOnly": False,
"name": "ph_phc_ma1XTQyee96N1GML6qUTgLQRiDifnRcE9STiHTZ0CfZ_posthog",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "%7B%22distinct_id%22%3A%220190358a-56a1-7313-83b0-d13dddeac787%22%2C%22%24sesid%22%3A%5B1718885183223%2C%220190358a-56a1-7313-83b0-d13b2b87778d%22%2C1718885176993%5D%2C%22%24session_is_sampled%22%3Atrue%7D",
},
{
"domain": ".www.researchgate.net",
"hostOnly": False,
"httpOnly": True,
"name": "sid",
"path": "/",
"sameSite": None,
"secure": True,
"session": True,
"storeId": None,
"value": "qmH5Lc4f0CUJ3zeaxORcV0S8I8V1MuCFZtcIQqPYtv1XPejrbSLAQRbT50PL40TqeKQ1XsQDWt9gtYVzuL80bRmPjw6jn3cQ0ikNqW40maHcQ3JL2Vfa8ZZf0j7p35eJ",
},
]
COOKIES_LIST += [
{
"domain": "github.com",
"hostOnly": True,
"httpOnly": True,
"name": "_gh_sess",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": True,
"storeId": None,
"value": "P%2Fmof1avuqwHaUQUIJR%2FZYn7jqbT7lgGuTGjp1BGAFIG5UpNDusEE3b8dRjz0eATE5xPdPjLYFqMs%2FI9AOalKX4YuYfSEEnxCMawU01099b4o9Xzzcv%2BmecrmO0Q8q%2Bdq1h8SIv6nvPP7HzlFesl8ysafb9b%2F0q6dTArKdSOurasza8UgLSYD08ofA50Pcm0IG7CTzF8ZCizrGgGTMi%2F%2B7L3E17jav5PM1Sf2vQKg15Gbg1QIOppJJHzlufgQoZigqFv%2BWznaws0Tt7Y2lSFCw%3D%3D--CJRhqMXJnwOaJgk4--DhUErlL4GdROikEjKD4O9g%3D%3D",
},
{
"domain": ".github.com",
"expirationDate": 1750408875.763785,
"hostOnly": False,
"httpOnly": False,
"name": "_octo",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "GH1.1.728652011.1718872875",
},
{
"domain": ".github.com",
"expirationDate": 1750408875.763926,
"hostOnly": False,
"httpOnly": True,
"name": "logged_in",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": False,
"storeId": None,
"value": "no",
},
{
"domain": ".github.com",
"hostOnly": False,
"httpOnly": False,
"name": "preferred_color_mode",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": True,
"storeId": None,
"value": "dark",
},
{
"domain": ".github.com",
"hostOnly": False,
"httpOnly": False,
"name": "tz",
"path": "/",
"sameSite": "lax",
"secure": True,
"session": True,
"storeId": None,
"value": "Europe%2FParis",
},
]
COOKIES_LIST += [
{
"domain": ".web.archive.org",
"expirationDate": 1718886430,
"hostOnly": False,
"httpOnly": False,
"name": "_gat",
"path": "/web/20201123221659/http://orcid.org/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "1",
},
{
"domain": ".web.archive.org",
"expirationDate": 1718972770,
"hostOnly": False,
"httpOnly": False,
"name": "_gid",
"path": "/web/20201123221659/http://orcid.org/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GA1.2.402246368.1606169825",
},
{
"domain": ".web.archive.org",
"expirationDate": 1753446370.315621,
"hostOnly": False,
"httpOnly": False,
"name": "_ga",
"path": "/web/20201123221659/http://orcid.org/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GA1.2.1301409987.1606169825",
},
{
"domain": ".web.archive.org",
"expirationDate": 1750422367,
"hostOnly": False,
"httpOnly": False,
"name": "_hjid",
"path": "/web/20201123221659/http://orcid.org/",
"sameSite": "lax",
"secure": False,
"session": False,
"storeId": None,
"value": "07f80263-a631-4bf4-8ffd-8fc8912085e2",
},
{
"domain": ".web.archive.org",
"expirationDate": 1718888167,
"hostOnly": False,
"httpOnly": False,
"name": "_hjFirstSeen",
"path": "/web/20201123221659/http://orcid.org/",
"sameSite": "lax",
"secure": False,
"session": False,
"storeId": None,
"value": "1",
},
]
COOKIES_LIST += [
{
"domain": "orcid.org",
"hostOnly": True,
"httpOnly": False,
"name": "AWSELBCORS",
"path": "/",
"sameSite": "no_restriction",
"secure": True,
"session": True,
"storeId": None,
"value": "CBD1D7FF1216388FA48838CBCA4774FD22800B8FB548A40EF92BB0994D5B77A8410307CDEAA69C52236663F2BF89B252C17BC0FCDF790FD59771BDDF6EA8CA4CFD29D8733F",
},
{
"domain": ".orcid.org",
"expirationDate": 1753452454.637671,
"hostOnly": False,
"httpOnly": False,
"name": "_ga_9R61FWK9H5",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GS1.1.1718892454.1.0.1718892454.0.0.0",
},
{
"domain": ".orcid.org",
"expirationDate": 1753452454.63421,
"hostOnly": False,
"httpOnly": False,
"name": "_ga",
"path": "/",
"sameSite": None,
"secure": False,
"session": False,
"storeId": None,
"value": "GA1.1.2021310691.1718892455",
},
{
"domain": "orcid.org",
"hostOnly": True,
"httpOnly": False,
"name": "AWSELB",
"path": "/",
"sameSite": None,
"secure": False,
"session": True,
"storeId": None,
"value": "CBD1D7FF1216388FA48838CBCA4774FD22800B8FB548A40EF92BB0994D5B77A8410307CDEAA69C52236663F2BF89B252C17BC0FCDF790FD59771BDDF6EA8CA4CFD29D8733F",
},
{
"domain": ".orcid.org",
"expirationDate": 1750428454,
"hostOnly": False,
"httpOnly": False,
"name": "OptanonAlertBoxClosed",
"path": "/",
"sameSite": "lax",
"secure": False,
"session": False,
"storeId": None,
"value": "2024-06-20T14:07:34.583Z",
},
{
"domain": ".orcid.org",
"expirationDate": 1750428454,
"hostOnly": False,
"httpOnly": False,
"name": "OptanonConsent",
"path": "/",
"sameSite": "lax",
"secure": False,
"session": False,
"storeId": None,
"value": "isGpcEnabled=0&datestamp=Thu+Jun+20+2024+16%3A07%3A34+GMT%2B0200+(heure+d%E2%80%99%C3%A9t%C3%A9+d%E2%80%99Europe+centrale)&version=202310.2.0&browserGpcFlag=0&isIABGlobal=False&hosts=&landingPath=NotLandingPage&groups=C0001%3A1%2CC0003%3A1%2CC0002%3A1%2CC0004%3A1",
},
{
"domain": "orcid.org",
"hostOnly": True,
"httpOnly": False,
"name": "XSRF-TOKEN",
"path": "/",
"sameSite": None,
"secure": True,
"session": True,
"storeId": None,
"value": "6957be7a-bcb4-4d59-a522-ea9b6b210ed9",
},
]
# Create a RequestsCookieJar instance
COOKIES = RequestsCookieJar()
# Add cookies to the jar
for cookie in COOKIES_LIST:
COOKIES.set(cookie["name"], cookie["value"], domain=cookie["domain"], path=cookie["path"])
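# Example use (sketch): attach this jar to a requests session so that later requests to the
# domains above (youtube.com, researchgate.net, github.com, web.archive.org, orcid.org) reuse
# these cookies.
#   import requests
#   session = requests.Session()
#   session.cookies = COOKIES
#   page = session.get("https://github.com")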

@@ -0,0 +1,124 @@
import re
import string
import warnings
def normalize_number_str(number_str: str) -> float:
# we replace these common units and commas to allow
# conversion to float
for char in ["$", "%", ","]:
number_str = number_str.replace(char, "")
try:
return float(number_str)
except ValueError:
print(f"String {number_str} cannot be normalized to number str.")
return float("inf")
def split_string(
s: str,
char_list: list[str] = [",", ";"],
) -> list[str]:
pattern = f"[{''.join(char_list)}]"
return re.split(pattern, s)
def is_float(element: any) -> bool:
try:
float(element)
return True
except ValueError:
return False
def question_scorer(
model_answer: str,
ground_truth: str,
) -> bool:
# if gt is a number
if is_float(ground_truth):
normalized_answer = normalize_number_str(str(model_answer))
return normalized_answer == float(ground_truth)
# if gt is a list
elif any(char in ground_truth for char in [",", ";"]):
# question with the fish: normalization removes punct
gt_elems = split_string(ground_truth)
ma_elems = split_string(model_answer)
# check length is the same
if len(gt_elems) != len(ma_elems):
warnings.warn("Answer lists have different lengths, returning False.", UserWarning)
return False
# compare each element as float or str
comparisons = []
for ma_elem, gt_elem in zip(ma_elems, gt_elems):
if is_float(gt_elem):
normalized_ma_elem = normalize_number_str(ma_elem)
comparisons.append(normalized_ma_elem == float(gt_elem))
else:
# we do not remove punct since comparisons can include punct
comparisons.append(
normalize_str(ma_elem, remove_punct=False) == normalize_str(gt_elem, remove_punct=False)
)
return all(comparisons)
# if gt is a str
else:
return normalize_str(model_answer) == normalize_str(ground_truth)
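# Example behaviour (sketch of the three branches above):
#   question_scorer("1,234", "1234")        -> True  (numeric: "$", "%" and "," are stripped)
#   question_scorer("3, 4.5", "3;4.5")      -> True  (list: split on "," / ";" and compared element-wise)
#   question_scorer("Sea gull!", "seagull") -> True  (string: spaces, punctuation and case are ignored)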
def check_prediction_contains_answer_letters_in_order(prediction, true_answer):
prediction = prediction.lower()
true_answer = true_answer.lower()
if len(prediction) > len(true_answer) * 3:
return False
i = 0
for letter in true_answer:
if letter in prediction[i:]:
i += prediction[i:].index(letter)
else:
return False
return True
def check_close_call(prediction, true_answer, is_correct):
if is_correct:
return True
else:
if is_float(true_answer):
return is_correct
else:
if (
check_prediction_contains_answer_letters_in_order(str(prediction), str(true_answer))
and len(str(true_answer)) * 0.5 <= len(str(prediction)) <= len(str(true_answer)) * 2
):
print(f"Close call: {prediction} vs {true_answer}")
return True
else:
return False
def normalize_str(input_str, remove_punct=True) -> str:
"""
Normalize a string by:
- Removing all white spaces
- Optionally removing punctuation (if remove_punct is True)
- Converting to lowercase
Parameters:
- input_str: str, the string to normalize
- remove_punct: bool, whether to remove punctuation (default: True)
Returns:
- str, the normalized string
"""
# Remove all white spaces. Required e.g for seagull vs. sea gull
no_spaces = re.sub(r"\s", "", input_str)
# Remove punctuation, if specified.
if remove_punct:
translator = str.maketrans("", "", string.punctuation)
return no_spaces.lower().translate(translator)
else:
return no_spaces.lower()
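# e.g. normalize_str("Sea gull") -> "seagull"; normalize_str("St. Petersburg", remove_punct=False) -> "st.petersburg"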

@@ -0,0 +1,949 @@
# This is copied from Magentic-one's great repo: https://github.com/microsoft/autogen/blob/v0.4.4/python/packages/autogen-magentic-one/src/autogen_magentic_one/markdown_browser/mdconvert.py
# Thanks to Microsoft researchers for open-sourcing this!
# type: ignore
import base64
import copy
import html
import json
import mimetypes
import os
import re
import shutil
import subprocess
import sys
import tempfile
import traceback
from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
import mammoth
import markdownify
import pandas as pd
import pdfminer
import pdfminer.high_level
import pptx
# File-format detection
import puremagic
import pydub
import requests
import speech_recognition as sr
from bs4 import BeautifulSoup
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter
class _CustomMarkdownify(markdownify.MarkdownConverter):
"""
A custom version of markdownify's MarkdownConverter. Changes include:
- Altering the default heading style to use '#', '##', etc.
- Removing javascript hyperlinks.
- Truncating images with large data:uri sources.
- Ensuring URIs are properly escaped, and do not conflict with Markdown syntax
"""
def __init__(self, **options: Any):
options["heading_style"] = options.get("heading_style", markdownify.ATX)
# Explicitly cast options to the expected type if necessary
super().__init__(**options)
def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str:
"""Same as usual, but be sure to start with a new line"""
if not convert_as_inline:
if not re.search(r"^\n", text):
return "\n" + super().convert_hn(n, el, text, convert_as_inline) # type: ignore
return super().convert_hn(n, el, text, convert_as_inline) # type: ignore
def convert_a(self, el: Any, text: str, convert_as_inline: bool):
"""Same as usual converter, but removes Javascript links and escapes URIs."""
prefix, suffix, text = markdownify.chomp(text) # type: ignore
if not text:
return ""
href = el.get("href")
title = el.get("title")
# Escape URIs and skip non-http or file schemes
if href:
try:
parsed_url = urlparse(href) # type: ignore
if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]: # type: ignore
return "%s%s%s" % (prefix, text, suffix)
href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path)))) # type: ignore
except ValueError: # It's not clear if this ever gets thrown
return "%s%s%s" % (prefix, text, suffix)
# For the replacement see #29: text nodes underscores are escaped
if (
self.options["autolinks"]
and text.replace(r"\_", "_") == href
and not title
and not self.options["default_title"]
):
# Shortcut syntax
return "<%s>" % href
if self.options["default_title"] and not title:
title = href
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
return "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix) if href else text
def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str:
"""Same as usual converter, but removes data URIs"""
alt = el.attrs.get("alt", None) or ""
src = el.attrs.get("src", None) or ""
title = el.attrs.get("title", None) or ""
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
if convert_as_inline and el.parent.name not in self.options["keep_inline_images_in"]:
return alt
# Remove dataURIs
if src.startswith("data:"):
src = src.split(",")[0] + "..."
return "![%s](%s%s)" % (alt, src, title_part)
def convert_soup(self, soup: Any) -> str:
return super().convert_soup(soup) # type: ignore
class DocumentConverterResult:
"""The result of converting a document to text."""
def __init__(self, title: Union[str, None] = None, text_content: str = ""):
self.title: Union[str, None] = title
self.text_content: str = text_content
class DocumentConverter:
"""Abstract superclass of all DocumentConverters."""
def convert(self, local_path: str, **kwargs: Any) -> Union[None, DocumentConverterResult]:
raise NotImplementedError()
class PlainTextConverter(DocumentConverter):
"""Anything with content type text/plain"""
def convert(self, local_path: str, **kwargs: Any) -> Union[None, DocumentConverterResult]:
# Guess the content type from any file extension that might be around
content_type, _ = mimetypes.guess_type("__placeholder" + kwargs.get("file_extension", ""))
# Only accept text files
if content_type is None:
return None
# elif "text/" not in content_type.lower():
# return None
text_content = ""
with open(local_path, "rt", encoding="utf-8") as fh:
text_content = fh.read()
return DocumentConverterResult(
title=None,
text_content=text_content,
)
class HtmlConverter(DocumentConverter):
"""Anything with content type text/html"""
def convert(self, local_path: str, **kwargs: Any) -> Union[None, DocumentConverterResult]:
# Bail if not html
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
result = None
with open(local_path, "rt", encoding="utf-8") as fh:
result = self._convert(fh.read())
return result
def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]:
"""Helper function that converts and HTML string."""
# Parse the string
soup = BeautifulSoup(html_content, "html.parser")
# Remove javascript and style blocks
for script in soup(["script", "style"]):
script.extract()
# Print only the main content
body_elm = soup.find("body")
webpage_text = ""
if body_elm:
webpage_text = _CustomMarkdownify().convert_soup(body_elm)
else:
webpage_text = _CustomMarkdownify().convert_soup(soup)
assert isinstance(webpage_text, str)
return DocumentConverterResult(
title=None if soup.title is None else soup.title.string, text_content=webpage_text
)
class WikipediaConverter(DocumentConverter):
"""Handle Wikipedia pages separately, focusing only on the main document content."""
def convert(self, local_path: str, **kwargs: Any) -> Union[None, DocumentConverterResult]:
# Bail if not Wikipedia
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
return None
# Parse the file
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# Remove javascript and style blocks
for script in soup(["script", "style"]):
script.extract()
# Print only the main content
body_elm = soup.find("div", {"id": "mw-content-text"})
title_elm = soup.find("span", {"class": "mw-page-title-main"})
webpage_text = ""
main_title = None if soup.title is None else soup.title.string
if body_elm:
# What's the title
if title_elm and len(title_elm) > 0:
main_title = title_elm.string # type: ignore
assert isinstance(main_title, str)
# Convert the page
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(body_elm)
else:
webpage_text = _CustomMarkdownify().convert_soup(soup)
return DocumentConverterResult(
title=main_title,
text_content=webpage_text,
)
class YouTubeConverter(DocumentConverter):
"""Handle YouTube specially, focusing on the video title, description, and transcript."""
def convert(self, local_path: str, **kwargs: Any) -> Union[None, DocumentConverterResult]:
# Bail if not YouTube
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
if not url.startswith("https://www.youtube.com/watch?"):
return None
# Parse the file
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# Read the meta tags
assert soup.title is not None and soup.title.string is not None
metadata: Dict[str, str] = {"title": soup.title.string}
for meta in soup(["meta"]):
for a in meta.attrs:
if a in ["itemprop", "property", "name"]:
metadata[meta[a]] = meta.get("content", "")
break
# We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation
try:
for script in soup(["script"]):
content = script.text
if "ytInitialData" in content:
lines = re.split(r"\r?\n", content)
obj_start = lines[0].find("{")
obj_end = lines[0].rfind("}")
if obj_start >= 0 and obj_end >= 0:
data = json.loads(lines[0][obj_start : obj_end + 1])
attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore
if attrdesc:
metadata["description"] = str(attrdesc["content"])
break
except Exception:
pass
# Start preparing the page
webpage_text = "# YouTube\n"
title = self._get(metadata, ["title", "og:title", "name"]) # type: ignore
assert isinstance(title, str)
if title:
webpage_text += f"\n## {title}\n"
stats = ""
views = self._get(metadata, ["interactionCount"]) # type: ignore
if views:
stats += f"- **Views:** {views}\n"
keywords = self._get(metadata, ["keywords"]) # type: ignore
if keywords:
stats += f"- **Keywords:** {keywords}\n"
runtime = self._get(metadata, ["duration"]) # type: ignore
if runtime:
stats += f"- **Runtime:** {runtime}\n"
if len(stats) > 0:
webpage_text += f"\n### Video Metadata\n{stats}\n"
description = self._get(metadata, ["description", "og:description"]) # type: ignore
if description:
webpage_text += f"\n### Description\n{description}\n"
transcript_text = ""
parsed_url = urlparse(url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
if "v" in params:
assert isinstance(params["v"][0], str)
video_id = str(params["v"][0])
try:
# Must be a single transcript.
transcript = YouTubeTranscriptApi.get_transcript(video_id) # type: ignore
# transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore
# Alternative formatting:
transcript_text = SRTFormatter().format_transcript(transcript)
except Exception:
pass
if transcript_text:
webpage_text += f"\n### Transcript\n{transcript_text}\n"
title = title if title else soup.title.string
assert isinstance(title, str)
return DocumentConverterResult(
title=title,
text_content=webpage_text,
)
def _get(self, metadata: Dict[str, str], keys: List[str], default: Union[str, None] = None) -> Union[str, None]:
for k in keys:
if k in metadata:
return metadata[k]
return default
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
if isinstance(json, list):
for elm in json:
ret = self._findKey(elm, key)
if ret is not None:
return ret
elif isinstance(json, dict):
for k in json:
if k == key:
return json[k]
else:
ret = self._findKey(json[k], key)
if ret is not None:
return ret
return None
class PdfConverter(DocumentConverter):
"""
Converts PDFs to Markdown. Most style information is ignored, so the results are essentially plain-text.
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a PDF
extension = kwargs.get("file_extension", "")
if extension.lower() != ".pdf":
return None
return DocumentConverterResult(
title=None,
text_content=pdfminer.high_level.extract_text(local_path),
)
class DocxConverter(HtmlConverter):
"""
Converts DOCX files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a DOCX
extension = kwargs.get("file_extension", "")
if extension.lower() != ".docx":
return None
result = None
with open(local_path, "rb") as docx_file:
result = mammoth.convert_to_html(docx_file)
html_content = result.value
result = self._convert(html_content)
return result
class XlsxConverter(HtmlConverter):
"""
Converts XLSX files to Markdown, with each sheet presented as a separate Markdown table.
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a XLSX
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".xlsx", ".xls"]:
return None
sheets = pd.read_excel(local_path, sheet_name=None)
md_content = ""
for s in sheets:
md_content += f"## {s}\n"
html_content = sheets[s].to_html(index=False)
md_content += self._convert(html_content).text_content.strip() + "\n\n"
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
class PptxConverter(HtmlConverter):
"""
Converts PPTX files to Markdown. Supports headings, tables and images with alt text.
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a PPTX
extension = kwargs.get("file_extension", "")
if extension.lower() != ".pptx":
return None
md_content = ""
presentation = pptx.Presentation(local_path)
slide_num = 0
for slide in presentation.slides:
slide_num += 1
md_content += f"\n\n<!-- Slide number: {slide_num} -->\n"
title = slide.shapes.title
for shape in slide.shapes:
# Pictures
if self._is_picture(shape):
# https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
alt_text = ""
try:
alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
except Exception:
pass
# A placeholder name
filename = re.sub(r"\W", "", shape.name) + ".jpg"
md_content += "\n![" + (alt_text if alt_text else shape.name) + "](" + filename + ")\n"
# Tables
if self._is_table(shape):
html_table = "<html><body><table>"
first_row = True
for row in shape.table.rows:
html_table += "<tr>"
for cell in row.cells:
if first_row:
html_table += "<th>" + html.escape(cell.text) + "</th>"
else:
html_table += "<td>" + html.escape(cell.text) + "</td>"
html_table += "</tr>"
first_row = False
html_table += "</table></body></html>"
md_content += "\n" + self._convert(html_table).text_content.strip() + "\n"
# Text areas
elif shape.has_text_frame:
if shape == title:
md_content += "# " + shape.text.lstrip() + "\n"
else:
md_content += shape.text + "\n"
md_content = md_content.strip()
if slide.has_notes_slide:
md_content += "\n\n### Notes:\n"
notes_frame = slide.notes_slide.notes_text_frame
if notes_frame is not None:
md_content += notes_frame.text
md_content = md_content.strip()
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _is_picture(self, shape):
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE:
return True
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER:
if hasattr(shape, "image"):
return True
return False
def _is_table(self, shape):
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE:
return True
return False
class MediaConverter(DocumentConverter):
"""
Abstract class for multi-modal media (e.g., images and audio)
"""
def _get_metadata(self, local_path):
exiftool = shutil.which("exiftool")
if not exiftool:
return None
else:
try:
result = subprocess.run([exiftool, "-json", local_path], capture_output=True, text=True).stdout
return json.loads(result)[0]
except Exception:
return None
class WavConverter(MediaConverter):
"""
Converts WAV files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` is installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a WAV
extension = kwargs.get("file_extension", "")
if extension.lower() != ".wav":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
try:
transcript = self._transcribe_audio(local_path)
md_content += "\n\n### Audio Transcript:\n" + ("[No speech detected]" if transcript == "" else transcript)
except Exception:
md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _transcribe_audio(self, local_path) -> str:
recognizer = sr.Recognizer()
with sr.AudioFile(local_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio).strip()
class Mp3Converter(WavConverter):
"""
Converts MP3 files to markdown via extraction of metadata (if `exiftool` is installed), and speech transcription (if `speech_recognition` AND `pydub` are installed).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a MP3
extension = kwargs.get("file_extension", "")
if extension.lower() != ".mp3":
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Transcribe
handle, temp_path = tempfile.mkstemp(suffix=".wav")
os.close(handle)
try:
sound = pydub.AudioSegment.from_mp3(local_path)
sound.export(temp_path, format="wav")
_args = dict()
_args.update(kwargs)
_args["file_extension"] = ".wav"
try:
transcript = super()._transcribe_audio(temp_path).strip()
md_content += "\n\n### Audio Transcript:\n" + (
"[No speech detected]" if transcript == "" else transcript
)
except Exception:
md_content += "\n\n### Audio Transcript:\nError. Could not transcribe this audio."
finally:
os.unlink(temp_path)
# Return the result
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
class ImageConverter(MediaConverter):
"""
Converts images to markdown via extraction of metadata (if `exiftool` is installed), OCR (if `easyocr` is installed), and description via a multimodal LLM (if an mlm_client is configured).
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bail if not a supported image type
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".jpg", ".jpeg", ".png"]:
return None
md_content = ""
# Add metadata
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"ImageSize",
"Title",
"Caption",
"Description",
"Keywords",
"Artist",
"Author",
"DateTimeOriginal",
"CreateDate",
"GPSPosition",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# Try describing the image with GPTV
mlm_client = kwargs.get("mlm_client")
mlm_model = kwargs.get("mlm_model")
if mlm_client is not None and mlm_model is not None:
md_content += (
"\n# Description:\n"
+ self._get_mlm_description(
local_path, extension, mlm_client, mlm_model, prompt=kwargs.get("mlm_prompt")
).strip()
+ "\n"
)
return DocumentConverterResult(
title=None,
text_content=md_content,
)
def _get_mlm_description(self, local_path, extension, client, model, prompt=None):
if prompt is None or prompt.strip() == "":
prompt = "Write a detailed caption for this image."
sys.stderr.write(f"MLM Prompt:\n{prompt}\n")
data_uri = ""
with open(local_path, "rb") as image_file:
content_type, encoding = mimetypes.guess_type("_dummy" + extension)
if content_type is None:
content_type = "image/jpeg"
image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
data_uri = f"data:{content_type};base64,{image_base64}"
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
],
}
]
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
class FileConversionException(BaseException):
pass
class UnsupportedFormatException(BaseException):
pass
class MarkdownConverter:
"""(In preview) An extremely simple text-based document reader, suitable for LLM use.
This reader will convert common file-types or webpages to Markdown."""
def __init__(
self,
requests_session: Optional[requests.Session] = None,
mlm_client: Optional[Any] = None,
mlm_model: Optional[Any] = None,
):
if requests_session is None:
self._requests_session = requests.Session()
else:
self._requests_session = requests_session
self._mlm_client = mlm_client
self._mlm_model = mlm_model
self._page_converters: List[DocumentConverter] = []
# Register converters for successful browsing operations
# Later registrations are tried first / take higher priority than earlier registrations
# To this end, the most specific converters should appear below the most generic converters
self.register_page_converter(PlainTextConverter())
self.register_page_converter(HtmlConverter())
self.register_page_converter(WikipediaConverter())
self.register_page_converter(YouTubeConverter())
self.register_page_converter(DocxConverter())
self.register_page_converter(XlsxConverter())
self.register_page_converter(PptxConverter())
self.register_page_converter(WavConverter())
self.register_page_converter(Mp3Converter())
self.register_page_converter(ImageConverter())
self.register_page_converter(PdfConverter())
def convert(
self, source: Union[str, requests.Response], **kwargs: Any
) -> DocumentConverterResult: # TODO: deal with kwargs
"""
Args:
- source: can be a string representing a path or url, or a requests.response object
- extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.)
"""
# Local path or url
if isinstance(source, str):
if source.startswith("http://") or source.startswith("https://") or source.startswith("file://"):
return self.convert_url(source, **kwargs)
else:
return self.convert_local(source, **kwargs)
# Request response
elif isinstance(source, requests.Response):
return self.convert_response(source, **kwargs)
def convert_local(self, path: str, **kwargs: Any) -> DocumentConverterResult: # TODO: deal with kwargs
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Get extension alternatives from the path and puremagic
base, ext = os.path.splitext(path)
self._append_ext(extensions, ext)
self._append_ext(extensions, self._guess_ext_magic(path))
# Convert
return self._convert(path, extensions, **kwargs)
# TODO what should stream's type be?
def convert_stream(self, stream: Any, **kwargs: Any) -> DocumentConverterResult: # TODO: deal with kwargs
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Save the file locally to a temporary file. It will be deleted before this method exits
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# Write to the temporary file
content = stream.read()
if isinstance(content, str):
fh.write(content.encode("utf-8"))
else:
fh.write(content)
fh.close()
# Use puremagic to check for more extension options
self._append_ext(extensions, self._guess_ext_magic(temp_path))
# Convert
result = self._convert(temp_path, extensions, **kwargs)
# Clean up
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
return result
def convert_url(self, url: str, **kwargs: Any) -> DocumentConverterResult: # TODO: fix kwargs type
# Send a HTTP request to the URL
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
response = self._requests_session.get(url, stream=True, headers={"User-Agent": user_agent})
response.raise_for_status()
return self.convert_response(response, **kwargs)
def convert_response(
self, response: requests.Response, **kwargs: Any
) -> DocumentConverterResult: # TODO fix kwargs type
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Guess from the mimetype
content_type = response.headers.get("content-type", "").split(";")[0]
self._append_ext(extensions, mimetypes.guess_extension(content_type))
# Read the content disposition if there is one
content_disposition = response.headers.get("content-disposition", "")
m = re.search(r"filename=([^;]+)", content_disposition)
if m:
base, ext = os.path.splitext(m.group(1).strip("\"'"))
self._append_ext(extensions, ext)
# Read from the extension from the path
base, ext = os.path.splitext(urlparse(response.url).path)
self._append_ext(extensions, ext)
# Save the file locally to a temporary file. It will be deleted before this method exits
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# Download the file
for chunk in response.iter_content(chunk_size=512):
fh.write(chunk)
fh.close()
# Use puremagic to check for more extension options
self._append_ext(extensions, self._guess_ext_magic(temp_path))
# Convert
result = self._convert(temp_path, extensions, url=response.url)
except Exception as e:
print(f"Error in converting: {e}")
# Clean up
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
return result
def _convert(self, local_path: str, extensions: List[Union[str, None]], **kwargs) -> DocumentConverterResult:
error_trace = ""
for ext in extensions + [None]: # Try last with no extension
for converter in self._page_converters:
_kwargs = copy.deepcopy(kwargs)
# Overwrite file_extension appropriately
if ext is None:
if "file_extension" in _kwargs:
del _kwargs["file_extension"]
else:
_kwargs.update({"file_extension": ext})
# Copy any additional global options
if "mlm_client" not in _kwargs and self._mlm_client is not None:
_kwargs["mlm_client"] = self._mlm_client
if "mlm_model" not in _kwargs and self._mlm_model is not None:
_kwargs["mlm_model"] = self._mlm_model
# If we hit an error log it and keep trying
res = None
try:
res = converter.convert(local_path, **_kwargs)
except Exception:
error_trace = ("\n\n" + traceback.format_exc()).strip()
if res is not None:
# Normalize the content
res.text_content = "\n".join([line.rstrip() for line in re.split(r"\r?\n", res.text_content)])
res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
# Todo
return res
# If we got this far without success, report any exceptions
if len(error_trace) > 0:
raise FileConversionException(
f"Could not convert '{local_path}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}"
)
# Nothing can handle it!
raise UnsupportedFormatException(
f"Could not convert '{local_path}' to Markdown. The formats {extensions} are not supported."
)
def _append_ext(self, extensions, ext):
"""Append a unique non-None, non-empty extension to a list of extensions."""
if ext is None:
return
ext = ext.strip()
if ext == "":
return
# if ext not in extensions:
if True:
extensions.append(ext)
def _guess_ext_magic(self, path):
"""Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes."""
# Use puremagic to guess
try:
guesses = puremagic.magic_file(path)
if len(guesses) > 0:
ext = guesses[0].extension.strip()
if len(ext) > 0:
return ext
except FileNotFoundError:
pass
except IsADirectoryError:
pass
except PermissionError:
pass
return None
def register_page_converter(self, converter: DocumentConverter) -> None:
"""Register a page text converter."""
self._page_converters.insert(0, converter)
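# Hedged usage sketch: how this converter might be driven end to end. The URL and the
# printed slices below are illustrative assumptions, not part of the converter itself.
if __name__ == "__main__":
    converter = MarkdownConverter()
    # Fetch a page and convert it to Markdown; extension and mimetype guessing happen internally.
    page = converter.convert_url("https://example.com/")
    print(page.title)
    print(page.text_content[:500])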

View File

@ -0,0 +1,86 @@
# Shamelessly stolen from Microsoft Autogen team: thanks to them for this great resource!
# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/browser_utils.py
import copy
from smolagents.models import MessageRole, Model
def prepare_response(original_task: str, inner_messages, reformulation_model: Model) -> str:
messages = [
{
"role": MessageRole.SYSTEM,
"content": [
{
"type": "text",
"text": f"""Earlier you were asked the following:
{original_task}
Your team then worked diligently to address that request. Read below a transcript of that conversation:""",
}
],
}
]
# The first message just repeats the question, so remove it
# if len(inner_messages) > 1:
# del inner_messages[0]
# copy them to this context
try:
for message in inner_messages:
if not message.get("content"):
continue
message = copy.deepcopy(message)
message["role"] = MessageRole.USER
messages.append(message)
except Exception:
messages += [{"role": MessageRole.ASSISTANT, "content": str(inner_messages)}]
# ask for the final answer
messages.append(
{
"role": MessageRole.USER,
"content": [
{
"type": "text",
"text": f"""
Read the above conversation and output a FINAL ANSWER to the question. The question is repeated here for convenience:
{original_task}
To output the final answer, use the following template: FINAL ANSWER: [YOUR FINAL ANSWER]
Your FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
ADDITIONALLY, your FINAL ANSWER MUST adhere to any formatting instructions specified in the original question (e.g., alphabetization, sequencing, units, rounding, decimal places, etc.)
If you are asked for a number, express it numerically (i.e., with digits rather than words), don't use commas, and DO NOT INCLUDE UNITS such as $ or USD or percent signs unless specified otherwise.
If you are asked for a string, don't use articles or abbreviations (e.g. for cities), unless specified otherwise. Don't output any final sentence punctuation such as '.', '!', or '?'.
If you are asked for a comma separated list, apply the above rules depending on whether the elements are numbers or strings.
If you are unable to determine the final answer, output 'FINAL ANSWER: Unable to determine'
""",
}
],
}
)
response = reformulation_model(messages).content
final_answer = response.split("FINAL ANSWER: ")[-1].strip()
print("> Reformulated answer: ", final_answer)
# if "unable to determine" in final_answer.lower():
# messages.append({"role": MessageRole.ASSISTANT, "content": response })
# messages.append({"role": MessageRole.USER, "content": [{"type": "text", "text": """
# I understand that a definitive answer could not be determined. Please make a well-informed EDUCATED GUESS based on the conversation.
# To output the educated guess, use the following template: EDUCATED GUESS: [YOUR EDUCATED GUESS]
# Your EDUCATED GUESS should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. DO NOT OUTPUT 'I don't know', 'Unable to determine', etc.
# ADDITIONALLY, your EDUCATED GUESS MUST adhere to any formatting instructions specified in the original question (e.g., alphabetization, sequencing, units, rounding, decimal places, etc.)
# If you are asked for a number, express it numerically (i.e., with digits rather than words), don't use commas, and don't include units such as $ or percent signs unless specified otherwise.
# If you are asked for a string, don't use articles or abbreviations (e.g. cit for cities), unless specified otherwise. Don't output any final sentence punctuation such as '.', '!', or '?'.
# If you are asked for a comma separated list, apply the above rules depending on whether the elements are numbers or strings.
# """.strip()}]})
# response = model(messages).content
# print("\n>>>Making an educated guess.\n", response)
# final_answer = response.split("EDUCATED GUESS: ")[-1].strip()
return final_answer
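# Hedged usage sketch: calling prepare_response on a toy transcript. The model choice
# (LiteLLMModel("gpt-4o")) and the transcript contents are assumptions for illustration only.
if __name__ == "__main__":
    from smolagents import LiteLLMModel

    toy_transcript = [
        {
            "role": "assistant",
            "content": [{"type": "text", "text": "I searched the web and found that the tower is 330 m tall."}],
        }
    ]
    answer = prepare_response(
        "How tall is the Eiffel Tower, in meters?",
        toy_transcript,
        reformulation_model=LiteLLMModel("gpt-4o"),
    )
    print(answer)  # expected to be whatever follows 'FINAL ANSWER:' in the model's reply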

View File

@ -0,0 +1,87 @@
import json
import os
import shutil
import textwrap
from pathlib import Path
# import tqdm.asyncio
from smolagents.utils import AgentError
def serialize_agent_error(obj):
if isinstance(obj, AgentError):
return {"error_type": obj.__class__.__name__, "message": obj.message}
else:
return str(obj)
def get_image_description(file_name: str, question: str, visual_inspection_tool) -> str:
prompt = f"""Write a caption of 5 sentences for this image. Pay special attention to any details that might be useful for someone answering the following question:
{question}. But do not try to answer the question directly!
Do not add any information that is not present in the image."""
return visual_inspection_tool(image_path=file_name, question=prompt)
def get_document_description(file_path: str, question: str, document_inspection_tool) -> str:
prompt = f"""Write a caption of 5 sentences for this document. Pay special attention to any details that might be useful for someone answering the following question:
{question}. But do not try to answer the question directly!
Do not add any information that is not present in the document."""
return document_inspection_tool.forward_initial_exam_mode(file_path=file_path, question=prompt)
def get_single_file_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool):
file_extension = file_path.split(".")[-1]
if file_extension in ["png", "jpg", "jpeg"]:
file_description = f" - Attached image: {file_path}"
file_description += (
f"\n -> Image description: {get_image_description(file_path, question, visual_inspection_tool)}"
)
return file_description
elif file_extension in ["pdf", "xls", "xlsx", "docx", "doc", "xml"]:
file_description = f" - Attached document: {file_path}"
image_path = file_path.split(".")[0] + ".png"
if os.path.exists(image_path):
description = get_image_description(image_path, question, visual_inspection_tool)
else:
description = get_document_description(file_path, question, document_inspection_tool)
file_description += f"\n -> File description: {description}"
return file_description
elif file_extension in ["mp3", "m4a", "wav"]:
return f" - Attached audio: {file_path}"
else:
return f" - Attached file: {file_path}"
def get_zip_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool):
folder_path = file_path.replace(".zip", "")
os.makedirs(folder_path, exist_ok=True)
shutil.unpack_archive(file_path, folder_path)
prompt_use_files = ""
for root, dirs, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
prompt_use_files += "\n" + textwrap.indent(
get_single_file_description(file_path, question, visual_inspection_tool, document_inspection_tool),
prefix=" ",
)
return prompt_use_files
def get_tasks_to_run(data, total: int, base_filename: Path, tasks_ids: list[int]):
f = base_filename.parent / f"{base_filename.stem}_answers.jsonl"
done = set()
if f.exists():
with open(f, encoding="utf-8") as fh:
done = {json.loads(line)["task_id"] for line in fh if line.strip()}
tasks = []
for i in range(total):
task_id = int(data[i]["task_id"])
if task_id not in done:
if tasks_ids is not None:
if task_id in tasks_ids:
tasks.append(data[i])
else:
tasks.append(data[i])
return tasks
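# Hedged usage sketch: resuming an evaluation run with get_tasks_to_run. The integer task
# ids and the output path are assumptions; real GAIA rows carry their own task_id field.
if __name__ == "__main__":
    eval_rows = [
        {"task_id": 1, "question": "First question"},
        {"task_id": 2, "question": "Second question"},
    ]
    remaining = get_tasks_to_run(
        eval_rows,
        total=len(eval_rows),
        base_filename=Path("output/run_demo.jsonl"),
        tasks_ids=None,
    )
    print(f"{len(remaining)} tasks left to run")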

View File

@ -0,0 +1,122 @@
from typing import Optional
from smolagents import Tool
from smolagents.models import MessageRole, Model
from .mdconvert import MarkdownConverter
class TextInspectorTool(Tool):
name = "inspect_file_as_text"
description = """
You cannot load files yourself: instead call this tool to read a file as markdown text and ask questions about it.
This tool handles the following file extensions: [".html", ".htm", ".xlsx", ".pptx", ".wav", ".mp3", ".flac", ".pdf", ".docx"], and all other types of text files. IT DOES NOT HANDLE IMAGES."""
inputs = {
"file_path": {
"description": "The path to the file you want to read as text. Must be a '.something' file, like '.pdf'. If it is an image, use the visualizer tool instead! DO NOT use this tool for an HTML webpage: use the web_search tool instead!",
"type": "string",
},
"question": {
"description": "[Optional]: Your question, as a natural language sentence. Provide as much context as possible. Do not pass this parameter if you just want to directly return the content of the file.",
"type": "string",
"nullable": True,
},
}
output_type = "string"
md_converter = MarkdownConverter()
def __init__(self, model: Model, text_limit: int):
super().__init__()
self.model = model
self.text_limit = text_limit
def forward_initial_exam_mode(self, file_path, question):
if file_path[-4:] in [".png", ".jpg"]:
raise Exception("Cannot use inspect_file_as_text tool with images: use visualizer instead!")
result = self.md_converter.convert(file_path)
if ".zip" in file_path:
return result.text_content
if not question:
return result.text_content
if len(result.text_content) < 4000:
return "Document content: " + result.text_content
messages = [
{
"role": MessageRole.SYSTEM,
"content": [
{
"type": "text",
"text": "Here is a file:\n### "
+ str(result.title)
+ "\n\n"
+ result.text_content[: self.text_limit],
}
],
},
{
"role": MessageRole.USER,
"content": [
{
"type": "text",
"text": "Now please write a short, 5 sentence caption for this document, that could help someone asking this question: "
+ question
+ "\n\nDon't answer the question yourself! Just provide useful notes on the document",
}
],
},
]
return self.model(messages).content
def forward(self, file_path, question: Optional[str] = None) -> str:
if file_path[-4:] in [".png", ".jpg"]:
raise Exception("Cannot use inspect_file_as_text tool with images: use visualizer instead!")
result = self.md_converter.convert(file_path)
if ".zip" in file_path:
return result.text_content
if not question:
return result.text_content
messages = [
{
"role": MessageRole.SYSTEM,
"content": [
{
"type": "text",
"text": "You will have to write a short caption for this file, then answer this question:"
+ question,
}
],
},
{
"role": MessageRole.USER,
"content": [
{
"type": "text",
"text": "Here is the complete file:\n### "
+ str(result.title)
+ "\n\n"
+ result.text_content[: self.text_limit],
}
],
},
{
"role": MessageRole.USER,
"content": [
{
"type": "text",
"text": "Now answer the question below. Use these three headings: '1. Short answer', '2. Extremely detailed answer', '3. Additional Context on the document and question asked'."
+ question,
}
],
},
]
return self.model(messages).content
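# Hedged usage sketch: wiring the tool to a model and inspecting a local file. The model
# choice, text limit, file path, and question are illustrative assumptions only.
if __name__ == "__main__":
    from smolagents import LiteLLMModel

    inspector = TextInspectorTool(model=LiteLLMModel("gpt-4o"), text_limit=20_000)
    print(inspector.forward("downloads/report.pdf", question="What is the main conclusion of this report?"))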

View File

@ -0,0 +1,563 @@
# Shamelessly stolen from Microsoft Autogen team: thanks to them for this great resource!
# https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/browser_utils.py
import mimetypes
import os
import pathlib
import re
import time
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import unquote, urljoin, urlparse
import pathvalidate
import requests
from serpapi import GoogleSearch
from smolagents import Tool
from .cookies import COOKIES
from .mdconvert import FileConversionException, MarkdownConverter, UnsupportedFormatException
class SimpleTextBrowser:
"""(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use."""
def __init__(
self,
start_page: Optional[str] = None,
viewport_size: Optional[int] = 1024 * 8,
downloads_folder: Optional[Union[str, None]] = None,
serpapi_key: Optional[Union[str, None]] = None,
request_kwargs: Optional[Union[Dict[str, Any], None]] = None,
):
self.start_page: str = start_page if start_page else "about:blank"
self.viewport_size = viewport_size # Applies only to the standard uri types
self.downloads_folder = downloads_folder
self.history: List[Tuple[str, float]] = list()
self.page_title: Optional[str] = None
self.viewport_current_page = 0
self.viewport_pages: List[Tuple[int, int]] = list()
self.set_address(self.start_page)
self.serpapi_key = serpapi_key
self.request_kwargs = request_kwargs if request_kwargs is not None else {}
self.request_kwargs["cookies"] = COOKIES
self._mdconvert = MarkdownConverter()
self._page_content: str = ""
self._find_on_page_query: Union[str, None] = None
self._find_on_page_last_result: Union[int, None] = None # Location of the last result
@property
def address(self) -> str:
"""Return the address of the current page."""
return self.history[-1][0]
def set_address(self, uri_or_path: str, filter_year: Optional[int] = None) -> None:
# TODO: Handle anchors
self.history.append((uri_or_path, time.time()))
# Handle special URIs
if uri_or_path == "about:blank":
self._set_page_content("")
elif uri_or_path.startswith("google:"):
self._serpapi_search(uri_or_path[len("google:") :].strip(), filter_year=filter_year)
else:
if (
not uri_or_path.startswith("http:")
and not uri_or_path.startswith("https:")
and not uri_or_path.startswith("file:")
):
if len(self.history) > 1:
prior_address = self.history[-2][0]
uri_or_path = urljoin(prior_address, uri_or_path)
# Update the address with the fully-qualified path
self.history[-1] = (uri_or_path, self.history[-1][1])
self._fetch_page(uri_or_path)
self.viewport_current_page = 0
self._find_on_page_query = None
self._find_on_page_last_result = None
@property
def viewport(self) -> str:
"""Return the content of the current viewport."""
bounds = self.viewport_pages[self.viewport_current_page]
return self.page_content[bounds[0] : bounds[1]]
@property
def page_content(self) -> str:
"""Return the full contents of the current page."""
return self._page_content
def _set_page_content(self, content: str) -> None:
"""Sets the text content of the current page."""
self._page_content = content
self._split_pages()
if self.viewport_current_page >= len(self.viewport_pages):
self.viewport_current_page = len(self.viewport_pages) - 1
def page_down(self) -> None:
self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1)
def page_up(self) -> None:
self.viewport_current_page = max(self.viewport_current_page - 1, 0)
def find_on_page(self, query: str) -> Union[str, None]:
"""Searches for the query from the current viewport forward, looping back to the start if necessary."""
# Did we get here via a previous find_on_page search with the same query?
# If so, map to find_next
if query == self._find_on_page_query and self.viewport_current_page == self._find_on_page_last_result:
return self.find_next()
# Ok it's a new search start from the current viewport
self._find_on_page_query = query
viewport_match = self._find_next_viewport(query, self.viewport_current_page)
if viewport_match is None:
self._find_on_page_last_result = None
return None
else:
self.viewport_current_page = viewport_match
self._find_on_page_last_result = viewport_match
return self.viewport
def find_next(self) -> Union[str, None]:
"""Scroll to the next viewport that matches the query"""
if self._find_on_page_query is None:
return None
starting_viewport = self._find_on_page_last_result
if starting_viewport is None:
starting_viewport = 0
else:
starting_viewport += 1
if starting_viewport >= len(self.viewport_pages):
starting_viewport = 0
viewport_match = self._find_next_viewport(self._find_on_page_query, starting_viewport)
if viewport_match is None:
self._find_on_page_last_result = None
return None
else:
self.viewport_current_page = viewport_match
self._find_on_page_last_result = viewport_match
return self.viewport
def _find_next_viewport(self, query: str, starting_viewport: int) -> Union[int, None]:
"""Search for matches between the starting viewport looping when reaching the end."""
if query is None:
return None
# Normalize the query, and convert to a regular expression
nquery = re.sub(r"\*", "__STAR__", query)
nquery = " " + (" ".join(re.split(r"\W+", nquery))).strip() + " "
nquery = nquery.replace(" __STAR__ ", "__STAR__ ") # Merge isolated stars with prior word
nquery = nquery.replace("__STAR__", ".*").lower()
if nquery.strip() == "":
return None
idxs = list()
idxs.extend(range(starting_viewport, len(self.viewport_pages)))
idxs.extend(range(0, starting_viewport))
for i in idxs:
bounds = self.viewport_pages[i]
content = self.page_content[bounds[0] : bounds[1]]
# TODO: Remove markdown links and images
ncontent = " " + (" ".join(re.split(r"\W+", content))).strip().lower() + " "
if re.search(nquery, ncontent):
return i
return None
def visit_page(self, path_or_uri: str, filter_year: Optional[int] = None) -> str:
"""Update the address, visit the page, and return the content of the viewport."""
self.set_address(path_or_uri, filter_year=filter_year)
return self.viewport
def _split_pages(self) -> None:
# Do not split search results
if self.address.startswith("google:"):
self.viewport_pages = [(0, len(self._page_content))]
return
# Handle empty pages
if len(self._page_content) == 0:
self.viewport_pages = [(0, 0)]
return
# Break the viewport into pages
self.viewport_pages = []
start_idx = 0
while start_idx < len(self._page_content):
end_idx = min(start_idx + self.viewport_size, len(self._page_content)) # type: ignore[operator]
# Adjust to end on a space
while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]:
end_idx += 1
self.viewport_pages.append((start_idx, end_idx))
start_idx = end_idx
def _serpapi_search(self, query: str, filter_year: Optional[int] = None) -> None:
if self.serpapi_key is None:
raise ValueError("Missing SerpAPI key.")
params = {
"engine": "google",
"q": query,
"api_key": self.serpapi_key,
}
if filter_year is not None:
params["tbs"] = f"cdr:1,cd_min:01/01/{filter_year},cd_max:12/31/{filter_year}"
search = GoogleSearch(params)
results = search.get_dict()
self.page_title = f"{query} - Search"
if "organic_results" not in results.keys():
raise Exception(f"No results found for query: '{query}'. Use a less specific query.")
if len(results["organic_results"]) == 0:
year_filter_message = f" with filter year={filter_year}" if filter_year is not None else ""
self._set_page_content(
f"No results found for '{query}'{year_filter_message}. Try with a more general query, or remove the year filter."
)
return
def _prev_visit(url):
for i in range(len(self.history) - 1, -1, -1):
if self.history[i][0] == url:
return f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n"
return ""
web_snippets: List[str] = list()
idx = 0
if "organic_results" in results:
for page in results["organic_results"]:
idx += 1
date_published = ""
if "date" in page:
date_published = "\nDate published: " + page["date"]
source = ""
if "source" in page:
source = "\nSource: " + page["source"]
snippet = ""
if "snippet" in page:
snippet = "\n" + page["snippet"]
redacted_version = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{_prev_visit(page['link'])}{snippet}"
redacted_version = redacted_version.replace("Your browser can't play this video.", "")
web_snippets.append(redacted_version)
content = (
f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n"
+ "\n\n".join(web_snippets)
)
self._set_page_content(content)
def _fetch_page(self, url: str) -> None:
download_path = ""
try:
if url.startswith("file://"):
download_path = os.path.normcase(os.path.normpath(unquote(url[7:])))
res = self._mdconvert.convert_local(download_path)
self.page_title = res.title
self._set_page_content(res.text_content)
else:
# Prepare the request parameters
request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {}
request_kwargs["stream"] = True
# Send a HTTP request to the URL
response = requests.get(url, **request_kwargs)
response.raise_for_status()
# If the HTTP request was successful
content_type = response.headers.get("content-type", "")
# Text or HTML
if "text/" in content_type.lower():
res = self._mdconvert.convert_response(response)
self.page_title = res.title
self._set_page_content(res.text_content)
# A download
else:
# Try producing a safe filename
fname = None
download_path = None
try:
fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip()
download_path = os.path.abspath(os.path.join(self.downloads_folder, fname))
suffix = 0
while os.path.exists(download_path) and suffix < 1000:
suffix += 1
base, ext = os.path.splitext(fname)
new_fname = f"{base}__{suffix}{ext}"
download_path = os.path.abspath(os.path.join(self.downloads_folder, new_fname))
except NameError:
pass
# No suitable name, so make one
if fname is None:
extension = mimetypes.guess_extension(content_type)
if extension is None:
extension = ".download"
fname = str(uuid.uuid4()) + extension
download_path = os.path.abspath(os.path.join(self.downloads_folder, fname))
# Open a file for writing
with open(download_path, "wb") as fh:
for chunk in response.iter_content(chunk_size=512):
fh.write(chunk)
# Render it
local_uri = pathlib.Path(download_path).as_uri()
self.set_address(local_uri)
except UnsupportedFormatException as e:
print(e)
self.page_title = ("Download complete.",)
self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'")
except FileConversionException as e:
print(e)
self.page_title = ("Download complete.",)
self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'")
except FileNotFoundError:
self.page_title = "Error 404"
self._set_page_content(f"## Error 404\n\nFile not found: {download_path}")
except requests.exceptions.RequestException as request_exception:
try:
self.page_title = f"Error {response.status_code}"
# If the error was rendered in HTML we might as well render it
content_type = response.headers.get("content-type", "")
if content_type is not None and "text/html" in content_type.lower():
res = self._mdconvert.convert_response(response)
self.page_title = f"Error {response.status_code}"
self._set_page_content(f"## Error {response.status_code}\n\n{res.text_content}")
else:
text = ""
for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
text += chunk
self.page_title = f"Error {response.status_code}"
self._set_page_content(f"## Error {response.status_code}\n\n{text}")
except NameError:
self.page_title = "Error"
self._set_page_content(f"## Error\n\n{str(request_exception)}")
def _state(self) -> Tuple[str, str]:
header = f"Address: {self.address}\n"
if self.page_title is not None:
header += f"Title: {self.page_title}\n"
current_page = self.viewport_current_page
total_pages = len(self.viewport_pages)
address = self.address
for i in range(len(self.history) - 2, -1, -1): # Start from the second last
if self.history[i][0] == address:
header += f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n"
break
header += f"Viewport position: Showing page {current_page + 1} of {total_pages}.\n"
return (header, self.viewport)
class SearchInformationTool(Tool):
name = "web_search"
description = "Perform a web search query (think a google search) and returns the search results."
inputs = {"query": {"type": "string", "description": "The web search query to perform."}}
inputs["filter_year"] = {
"type": "string",
"description": "[Optional parameter]: filter the search results to only include pages from a specific year. For example, '2020' will only include pages from 2020. Make sure to use this parameter if you're trying to search for articles from a specific date!",
"nullable": True,
}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self, query: str, filter_year: Optional[int] = None) -> str:
self.browser.visit_page(f"google: {query}", filter_year=filter_year)
header, content = self.browser._state()
return header.strip() + "\n=======================\n" + content
class VisitTool(Tool):
name = "visit_page"
description = "Visit a webpage at a given URL and return its text. Given a url to a YouTube video, this returns the transcript."
inputs = {"url": {"type": "string", "description": "The relative or absolute url of the webapge to visit."}}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self, url: str) -> str:
self.browser.visit_page(url)
header, content = self.browser._state()
return header.strip() + "\n=======================\n" + content
class DownloadTool(Tool):
name = "download_file"
description = """
Download a file at a given URL. The file should be in one of these formats: [".xlsx", ".pptx", ".wav", ".mp3", ".png", ".docx"]
After using this tool, for further inspection of this page you should return the download path to your manager via final_answer, and they will be able to inspect it.
DO NOT use this tool for .pdf or .txt or .htm files: for these types of files use visit_page with the file url instead."""
inputs = {"url": {"type": "string", "description": "The relative or absolute url of the file to be downloaded."}}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self, url: str) -> str:
if "arxiv" in url:
url = url.replace("abs", "pdf")
response = requests.get(url)
content_type = response.headers.get("content-type", "")
extension = mimetypes.guess_extension(content_type)
if extension and isinstance(extension, str):
new_path = f"./downloads/file{extension}"
else:
new_path = "./downloads/file.object"
with open(new_path, "wb") as f:
f.write(response.content)
if "pdf" in extension or "txt" in extension or "htm" in extension:
raise Exception("Do not use this tool for pdf or txt or html files: use visit_page instead.")
return f"File was downloaded and saved under path {new_path}."
class ArchiveSearchTool(Tool):
name = "find_archived_url"
description = "Given a url, searches the Wayback Machine and returns the archived version of the url that's closest in time to the desired date."
inputs = {
"url": {"type": "string", "description": "The url you need the archive for."},
"date": {
"type": "string",
"description": "The date that you want to find the archive for. Give this date in the format 'YYYYMMDD', for instance '27 June 2008' is written as '20080627'.",
},
}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self, url, date) -> str:
no_timestamp_url = f"https://archive.org/wayback/available?url={url}"
archive_url = no_timestamp_url + f"&timestamp={date}"
response = requests.get(archive_url).json()
response_notimestamp = requests.get(no_timestamp_url).json()
if "archived_snapshots" in response and "closest" in response["archived_snapshots"]:
closest = response["archived_snapshots"]["closest"]
print("Archive found!", closest)
elif "archived_snapshots" in response_notimestamp and "closest" in response_notimestamp["archived_snapshots"]:
closest = response_notimestamp["archived_snapshots"]["closest"]
print("Archive found!", closest)
else:
raise Exception(f"Your {url=} was not archived on Wayback Machine, try a different url.")
target_url = closest["url"]
self.browser.visit_page(target_url)
header, content = self.browser._state()
return (
f"Web archive for url {url}, snapshot taken at date {closest['timestamp'][:8]}:\n"
+ header.strip()
+ "\n=======================\n"
+ content
)
class PageUpTool(Tool):
name = "page_up"
description = "Scroll the viewport UP one page-length in the current webpage and return the new viewport content."
inputs = {}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self) -> str:
self.browser.page_up()
header, content = self.browser._state()
return header.strip() + "\n=======================\n" + content
class PageDownTool(Tool):
name = "page_down"
description = (
"Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content."
)
inputs = {}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self) -> str:
self.browser.page_down()
header, content = self.browser._state()
return header.strip() + "\n=======================\n" + content
class FinderTool(Tool):
name = "find_on_page_ctrl_f"
description = "Scroll the viewport to the first occurrence of the search string. This is equivalent to Ctrl+F."
inputs = {
"search_string": {
"type": "string",
"description": "The string to search for on the page. This search string supports wildcards like '*'",
}
}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self, search_string: str) -> str:
find_result = self.browser.find_on_page(search_string)
header, content = self.browser._state()
if find_result is None:
return (
header.strip()
+ f"\n=======================\nThe search string '{search_string}' was not found on this page."
)
else:
return header.strip() + "\n=======================\n" + content
class FindNextTool(Tool):
name = "find_next"
description = "Scroll the viewport to next occurrence of the search string. This is equivalent to finding the next match in a Ctrl+F search."
inputs = {}
output_type = "string"
def __init__(self, browser):
super().__init__()
self.browser = browser
def forward(self) -> str:
find_result = self.browser.find_next()
header, content = self.browser._state()
if find_result is None:
return header.strip() + "\n=======================\nThe search string was not found on this page."
else:
return header.strip() + "\n=======================\n" + content
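# Hedged usage sketch: wiring the text browser to a couple of tools. The viewport size,
# downloads folder, and request headers are assumptions; SERPAPI_API_KEY must be set for search.
if __name__ == "__main__":
    browser = SimpleTextBrowser(
        viewport_size=1024 * 8,
        downloads_folder="downloads",
        serpapi_key=os.getenv("SERPAPI_API_KEY"),
        request_kwargs={"headers": {"User-Agent": "Mozilla/5.0"}, "timeout": 300},
    )
    search_tool = SearchInformationTool(browser)
    visit_tool = VisitTool(browser)
    print(search_tool.forward("open deep research smolagents"))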

View File

@ -0,0 +1,187 @@
import base64
import json
import mimetypes
import os
import uuid
from io import BytesIO
from typing import Optional
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from PIL import Image
from transformers import AutoProcessor
from smolagents import Tool, tool
load_dotenv(override=True)
idefics_processor = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-8b-chatty")
def process_images_and_text(image_path, query, client):
messages = [
{
"role": "user",
"content": [
{"type": "image"},
{"type": "text", "text": query},
],
},
]
prompt_with_template = idefics_processor.apply_chat_template(messages, add_generation_prompt=True)
# load images from local directory
# encode images to strings which can be sent to the endpoint
def encode_local_image(image_path):
# load image
image = Image.open(image_path).convert("RGB")
# Convert the image to a base64 string
buffer = BytesIO()
image.save(buffer, format="JPEG") # Use the appropriate format (e.g., JPEG, PNG)
base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")
# add string formatting required by the endpoint
image_string = f"data:image/jpeg;base64,{base64_image}"
return image_string
image_string = encode_local_image(image_path)
prompt_with_images = prompt_with_template.replace("<image>", "![]({}) ").format(image_string)
payload = {
"inputs": prompt_with_images,
"parameters": {
"return_full_text": False,
"max_new_tokens": 200,
},
}
return json.loads(client.post(json=payload).decode())[0]
# Function to encode the image
def encode_image(image_path):
if image_path.startswith("http"):
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
request_kwargs = {
"headers": {"User-Agent": user_agent},
"stream": True,
}
# Send a HTTP request to the URL
response = requests.get(image_path, **request_kwargs)
response.raise_for_status()
content_type = response.headers.get("content-type", "")
extension = mimetypes.guess_extension(content_type)
if extension is None:
extension = ".download"
fname = str(uuid.uuid4()) + extension
download_path = os.path.abspath(os.path.join("downloads", fname))
with open(download_path, "wb") as fh:
for chunk in response.iter_content(chunk_size=512):
fh.write(chunk)
image_path = download_path
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
def resize_image(image_path):
img = Image.open(image_path)
width, height = img.size
img = img.resize((int(width / 2), int(height / 2)))
new_image_path = f"resized_{image_path}"
img.save(new_image_path)
return new_image_path
class VisualQATool(Tool):
name = "visualizer"
description = "A tool that can answer questions about attached images."
inputs = {
"image_path": {
"description": "The path to the image on which to answer the question",
"type": "string",
},
"question": {"description": "the question to answer", "type": "string", "nullable": True},
}
output_type = "string"
client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty")
def forward(self, image_path: str, question: Optional[str] = None) -> str:
output = ""
add_note = False
if not question:
add_note = True
question = "Please write a detailed caption for this image."
try:
output = process_images_and_text(image_path, question, self.client)
except Exception as e:
print(e)
if "Payload Too Large" in str(e):
new_image_path = resize_image(image_path)
output = process_images_and_text(new_image_path, question, self.client)
if add_note:
output = (
f"You did not provide a particular question, so here is a detailed caption for the image: {output}"
)
return output
@tool
def visualizer(image_path: str, question: Optional[str] = None) -> str:
"""A tool that can answer questions about attached images.
Args:
image_path: The path to the image on which to answer the question. This should be a local path to a downloaded image.
question: The question to answer.
"""
add_note = False
if not question:
add_note = True
question = "Please write a detailed caption for this image."
if not isinstance(image_path, str):
raise Exception("You should provide at least `image_path` string argument to this tool!")
mime_type, _ = mimetypes.guess_type(image_path)
base64_image = encode_image(image_path)
payload = {
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": question},
{"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
],
}
],
"max_tokens": 1000,
}
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
try:
output = response.json()["choices"][0]["message"]["content"]
except Exception:
raise Exception(f"Response format unexpected: {response.json()}")
if add_note:
output = f"You did not provide a particular question, so here is a detailed caption for the image: {output}"
return output
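# Hedged usage sketch: querying the GPT-4o-backed visualizer on a local image. The image
# path is an assumption, and OPENAI_API_KEY must be set for the underlying API call to succeed.
if __name__ == "__main__":
    print(visualizer("downloads/chart.png"))  # no question: returns a detailed caption
    print(visualizer("downloads/chart.png", question="What is the largest value shown?"))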

View File

@ -0,0 +1,350 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install \"smolagents[litellm]\" -q"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import datasets\n",
"\n",
"\n",
"eval_ds = datasets.load_dataset(\"gaia-benchmark/GAIA\", \"2023_all\")[\"validation\"]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"to_keep = [\n",
" \"What's the last line of the rhyme under the flavor\",\n",
" 'Of the authors (First M. Last) that worked on the paper \"Pie Menus or Linear Menus',\n",
" \"In Series 9, Episode 11 of Doctor Who, the Doctor is trapped inside an ever-shifting maze. What is this location called in the official script for the episode? Give the setting exactly as it appears in the first scene heading.\",\n",
" \"Which contributor to the version of OpenCV where support was added for the Mask-RCNN model has the same name as a former Chinese head of government when the names are transliterated to the Latin alphabet?\",\n",
" \"The photograph in the Whitney Museum of American Art's collection with accession number 2022.128 shows a person holding a book. Which military unit did the author of this book join in 1813? Answer without using articles.\",\n",
" \"I went to Virtue restaurant & bar in Chicago for my birthday on March 22, 2021 and the main course I had was delicious! Unfortunately, when I went back about a month later on April 21, it was no longer on the dinner menu.\",\n",
" \"In Emily Midkiff's June 2014 article in a journal named for the one of Hreidmar's \",\n",
" \"Under DDC 633 on Bielefeld University Library's BASE, as of 2020\",\n",
" \"In the 2018 VSCode blog post on replit.com, what was the command they clicked on in the last video to remove extra lines?\",\n",
" \"The Metropolitan Museum of Art has a portrait in its collection with an accession number of 29.100.5. Of the consecrators and co-consecrators\",\n",
" \"In Nature journal's Scientific Reports conference proceedings from 2012, in the article that did not mention plasmons or plasmonics, what nano-compound is studied?\",\n",
" 'In the year 2022, and before December, what does \"R\" stand for in the three core policies of the type of content',\n",
" \"Who nominated the only Featured Article on English Wikipedia about a dinosaur that was promoted in November 2016?\",\n",
"]\n",
"eval_ds = eval_ds.filter(lambda row: any([el in row[\"Question\"] for el in to_keep]))\n",
"eval_ds = eval_ds.rename_columns({\"Question\": \"question\", \"Final answer\": \"true_answer\", \"Level\": \"task\"})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from dotenv import load_dotenv\n",
"from huggingface_hub import login\n",
"\n",
"\n",
"load_dotenv(override=True)\n",
"\n",
"login(os.getenv(\"HF_TOKEN\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Text browser"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from scripts.run_agents import answer_questions\n",
"from scripts.text_inspector_tool import TextInspectorTool\n",
"from scripts.text_web_browser import (\n",
" ArchiveSearchTool,\n",
" FinderTool,\n",
" FindNextTool,\n",
" NavigationalSearchTool,\n",
" PageDownTool,\n",
" PageUpTool,\n",
" SearchInformationTool,\n",
" VisitTool,\n",
")\n",
"from scripts.visual_qa import VisualQAGPT4Tool\n",
"\n",
"from smolagents import CodeAgent, LiteLLMModel\n",
"\n",
"\n",
"proprietary_model = LiteLLMModel(\"gpt-4o\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"### BUILD AGENTS & TOOLS\n",
"\n",
"WEB_TOOLS = [\n",
" SearchInformationTool(),\n",
" NavigationalSearchTool(),\n",
" VisitTool(),\n",
" PageUpTool(),\n",
" PageDownTool(),\n",
" FinderTool(),\n",
" FindNextTool(),\n",
" ArchiveSearchTool(),\n",
"]\n",
"\n",
"\n",
"surfer_agent = CodeAgent(\n",
" model=proprietary_model,\n",
" tools=WEB_TOOLS,\n",
" max_steps=20,\n",
" verbosity_level=2,\n",
")\n",
"\n",
"results_text = answer_questions(\n",
" eval_ds,\n",
" surfer_agent,\n",
" \"code_gpt4o_27-01_text\",\n",
" reformulation_model=proprietary_model,\n",
" output_folder=\"output_browsers\",\n",
" visual_inspection_tool=VisualQAGPT4Tool(),\n",
" text_inspector_tool=TextInspectorTool(proprietary_model, 40000),\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Vision browser"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install helium -q"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from scripts.visual_qa import VisualQAGPT4Tool\n",
"\n",
"from smolagents import CodeAgent, DuckDuckGoSearchTool, LiteLLMModel\n",
"from smolagents.vision_web_browser import (\n",
" close_popups,\n",
" go_back,\n",
" helium_instructions,\n",
" initialize_agent,\n",
" save_screenshot,\n",
" search_item_ctrl_f,\n",
")\n",
"\n",
"\n",
"proprietary_model = LiteLLMModel(\"gpt-4o\")\n",
"vision_browser_agent = initialize_agent(proprietary_model)\n",
"### BUILD AGENTS & TOOLS\n",
"\n",
"CodeAgent(\n",
" tools=[DuckDuckGoSearchTool(), go_back, close_popups, search_item_ctrl_f],\n",
" model=proprietary_model,\n",
" additional_authorized_imports=[\"helium\"],\n",
" step_callbacks=[save_screenshot],\n",
" max_steps=20,\n",
" verbosity_level=2,\n",
")\n",
"\n",
"results_vision = answer_questions(\n",
" eval_ds,\n",
" vision_browser_agent,\n",
" \"code_gpt4o_27-01_vision\",\n",
" reformulation_model=proprietary_model,\n",
" output_folder=\"output_browsers\",\n",
" visual_inspection_tool=VisualQAGPT4Tool(),\n",
" text_inspector_tool=TextInspectorTool(proprietary_model, 40000),\n",
" postprompt=helium_instructions\n",
" + \"Any web browser controls won't work on .pdf urls, rather use the tool 'inspect_file_as_text' to read them\",\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Browser-use browser"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install browser-use lxml_html_clean -q\n",
"!playwright install"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"import nest_asyncio\n",
"\n",
"\n",
"nest_asyncio.apply()\n",
"\n",
"from browser_use import Agent\n",
"from dotenv import load_dotenv\n",
"from langchain_openai import ChatOpenAI\n",
"\n",
"\n",
"load_dotenv()\n",
"\n",
"\n",
"class BrowserUseAgent:\n",
" logs = []\n",
"\n",
" def write_inner_memory_from_logs(self, summary_mode):\n",
" return self.results\n",
"\n",
" def run(self, task, **kwargs):\n",
" agent = Agent(\n",
" task=task,\n",
" llm=ChatOpenAI(model=\"gpt-4o\"),\n",
" )\n",
" self.results = asyncio.get_event_loop().run_until_complete(agent.run())\n",
" return self.results.history[-1].result[0].extracted_content\n",
"\n",
"\n",
"browser_use_agent = BrowserUseAgent()\n",
"\n",
"results_browseruse = answer_questions(\n",
" eval_ds,\n",
" browser_use_agent,\n",
" \"gpt-4o_27-01_browseruse\",\n",
" reformulation_model=proprietary_model,\n",
" output_folder=\"output_browsers\",\n",
" visual_inspection_tool=VisualQAGPT4Tool(),\n",
" text_inspector_tool=TextInspectorTool(proprietary_model, 40000),\n",
" postprompt=\"\",\n",
" run_simple=True,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from scripts.gaia_scorer import question_scorer\n",
"\n",
"\n",
"results_vision, results_text, results_browseruse = (\n",
" pd.DataFrame(results_vision),\n",
" pd.DataFrame(results_text),\n",
" pd.DataFrame(results_browseruse),\n",
")\n",
"\n",
"results_vision[\"is_correct\"] = results_vision.apply(\n",
" lambda x: question_scorer(x[\"prediction\"], x[\"true_answer\"]), axis=1\n",
")\n",
"results_text[\"is_correct\"] = results_text.apply(lambda x: question_scorer(x[\"prediction\"], x[\"true_answer\"]), axis=1)\n",
"results_browseruse[\"is_correct\"] = results_browseruse.apply(\n",
" lambda x: question_scorer(x[\"prediction\"], x[\"true_answer\"]), axis=1\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results = pd.concat([results_vision, results_text, results_browseruse])\n",
"results.groupby(\"agent_name\")[\"is_correct\"].mean()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"correct_vision_results = results_vision.loc[results_vision[\"is_correct\"]]\n",
"correct_vision_results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"false_text_results = results_text.loc[~results_text[\"is_correct\"]]\n",
"false_text_results"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "gaia",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -143,6 +143,7 @@ class MultiStepAgent:
name (`str`, *optional*): Necessary for a managed agent only - the name by which this agent can be called.
description (`str`, *optional*): Necessary for a managed agent only - the description of this agent.
managed_agent_prompt (`str`, *optional*): Custom prompt for the managed agent. Defaults to None.
provide_run_summary (`bool`, *optional*): Whether to provide a run summary when called as a managed agent.
"""
def __init__(
@ -162,6 +163,7 @@ class MultiStepAgent:
name: Optional[str] = None,
description: Optional[str] = None,
managed_agent_prompt: Optional[str] = None,
provide_run_summary: bool = False,
):
if system_prompt is None:
system_prompt = CODE_SYSTEM_PROMPT
@ -181,6 +183,7 @@ class MultiStepAgent:
self.name = name
self.description = description
self.managed_agent_prompt = managed_agent_prompt if managed_agent_prompt else MANAGED_AGENT_PROMPT
self.provide_run_summary = provide_run_summary
self.managed_agents = {}
if managed_agents is not None:
@ -356,7 +359,7 @@ class MultiStepAgent:
if tool_name in self.tools:
tool_description = get_tool_description_with_args(available_tools[tool_name])
error_msg = (
f"Error in tool call execution: {e}\nYou should only use this tool with a correct input.\n"
f"Error in tool call execution: {type(e).__name__}: {e}\nYou should only use this tool with a correct input.\n"
f"As a reminder, this tool's description is the following:\n{tool_description}"
)
raise AgentExecutionError(error_msg, self.logger)
@ -453,10 +456,10 @@ You have been provided with these additional arguments, that you can access usin
observations_images=images,
)
try:
if self.planning_interval is not None and self.step_number % self.planning_interval == 0:
if self.planning_interval is not None and self.step_number % self.planning_interval == 1:
self.planning_step(
task,
is_first_step=(self.step_number == 0),
is_first_step=(self.step_number == 1),
step=self.step_number,
)
self.logger.log_rule(f"Step {self.step_number}", level=LogLevel.INFO)
@ -651,21 +654,21 @@ Now begin!""",
"""
self.memory.replay(self.logger, detailed=detailed)
def __call__(self, request, provide_run_summary=False, **kwargs):
"""Adds additional prompting for the managed agent, and runs it."""
def __call__(self, request: str, **kwargs):
"""
This method is called only by a manager agent.
Adds additional prompting for the managed agent, runs it, and wraps the output.
"""
full_task = self.managed_agent_prompt.format(name=self.name, task=request).strip()
output = self.run(full_task, **kwargs)
if provide_run_summary:
answer = f"Here is the final answer from your managed agent '{self.name}':\n"
answer += str(output)
answer = f"Here is the final answer from your managed agent '{self.name}':\n{str(output)}"
if self.provide_run_summary:
answer += f"\n\nFor more detail, find below a summary of this agent's work:\nSUMMARY OF WORK FROM AGENT '{self.name}':\n"
for message in self.write_memory_to_messages(summary_mode=True):
content = message["content"]
answer += "\n" + truncate_content(str(content)) + "\n---"
answer += f"\nEND OF SUMMARY OF WORK FROM AGENT '{self.name}'."
return answer
else:
return output
return answer
class ToolCallingAgent(MultiStepAgent):
@ -925,8 +928,8 @@ class CodeAgent(MultiStepAgent):
]
observation = "Execution logs:\n" + execution_logs
except Exception as e:
if hasattr(self.python_executor, "state") and "print_outputs" in self.python_executor.state:
execution_logs = self.python_executor.state["print_outputs"]
if hasattr(self.python_executor, "state") and "_print_outputs" in self.python_executor.state:
execution_logs = str(self.python_executor.state["_print_outputs"])
if len(execution_logs) > 0:
execution_outputs_console = [
Text("Execution logs:", style="bold"),

View File

@ -76,7 +76,7 @@ class PythonInterpreterTool(Tool):
authorized_imports=self.authorized_imports,
)[0] # The second element is boolean is_final_answer
)
return f"Stdout:\n{state['print_outputs']}\nOutput: {output}"
return f"Stdout:\n{str(state['_print_outputs'])}\nOutput: {output}"
class FinalAnswerTool(Tool):
@ -169,10 +169,10 @@ class GoogleSearchTool(Tool):
if "organic_results" not in results.keys():
if filter_year is not None:
raise Exception(
f"'organic_results' key not found for query: '{query}' with filtering on year={filter_year}. Use a less restrictive query or do not filter on year."
f"No results found for query: '{query}' with filtering on year={filter_year}. Use a less restrictive query or do not filter on year."
)
else:
raise Exception(f"'organic_results' key not found for query: '{query}'. Use a less restrictive query.")
raise Exception(f"No results found for query: '{query}'. Use a less restrictive query.")
if len(results["organic_results"]) == 0:
year_filter_message = f" with filter year={filter_year}" if filter_year is not None else ""
return f"No results found for '{query}'{year_filter_message}. Try with a more general query, or remove the year filter."

View File

@ -50,8 +50,8 @@ ERRORS = {
if isinstance(getattr(builtins, name), type) and issubclass(getattr(builtins, name), BaseException)
}
PRINT_OUTPUTS, DEFAULT_MAX_LEN_OUTPUT = "", 50000
OPERATIONS_COUNT, MAX_OPERATIONS = 0, 10000000
DEFAULT_MAX_LEN_OUTPUT = 50000
MAX_OPERATIONS = 10000000
def custom_print(*args):
@ -114,6 +114,32 @@ BASE_PYTHON_TOOLS = {
}
class PrintContainer:
def __init__(self):
self.value = ""
def append(self, text):
self.value += text
return self
def __iadd__(self, other):
"""Implements the += operator"""
self.value += str(other)
return self
def __str__(self):
"""String representation"""
return self.value
def __repr__(self):
"""Representation for debugging"""
return f"PrintContainer({self.value})"
def __len__(self):
"""Implements len() function support"""
return len(self.value)
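# Illustrative sketch (an assumption, added for clarity) of the intended PrintContainer behavior:
#   container = PrintContainer()
#   container += "hello "
#   container += 42
#   str(container) == "hello 42" and len(container) == 8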
class BreakException(Exception):
pass
@ -215,7 +241,7 @@ def evaluate_while(
custom_tools: Dict[str, Callable],
authorized_imports: List[str],
) -> None:
max_iterations = 1000
max_iterations = 1000000
iterations = 0
while evaluate_ast(while_loop.test, state, static_tools, custom_tools, authorized_imports):
for node in while_loop.body:
@ -603,10 +629,7 @@ def evaluate_call(
raise InterpreterError("super() takes at most 2 arguments")
else:
if func_name == "print":
output = " ".join(map(str, args))
global PRINT_OUTPUTS
PRINT_OUTPUTS += output + "\n"
# cap the number of lines
state["_print_outputs"] += " ".join(map(str, args)) + "\n"
return None
else: # Assume it's a callable object
if (
@ -1090,6 +1113,42 @@ def evaluate_dictcomp(
return result
def evaluate_delete(
delete_node: ast.Delete,
state: Dict[str, Any],
static_tools: Dict[str, Callable],
custom_tools: Dict[str, Callable],
authorized_imports: List[str],
) -> None:
"""
Evaluate a delete statement (del x, del x[y]).
Args:
delete_node: The AST Delete node to evaluate
state: The current state dictionary
static_tools: Dictionary of static tools
custom_tools: Dictionary of custom tools
authorized_imports: List of authorized imports
"""
for target in delete_node.targets:
if isinstance(target, ast.Name):
# Handle simple variable deletion (del x)
if target.id in state:
del state[target.id]
else:
raise InterpreterError(f"Cannot delete name '{target.id}': name is not defined")
elif isinstance(target, ast.Subscript):
# Handle index/key deletion (del x[y])
obj = evaluate_ast(target.value, state, static_tools, custom_tools, authorized_imports)
index = evaluate_ast(target.slice, state, static_tools, custom_tools, authorized_imports)
try:
del obj[index]
except (TypeError, KeyError, IndexError) as e:
raise InterpreterError(f"Cannot delete index/key: {str(e)}")
else:
raise InterpreterError(f"Deletion of {type(target).__name__} targets is not supported")
def evaluate_ast(
expression: ast.AST,
state: Dict[str, Any],
@ -1117,12 +1176,11 @@ def evaluate_ast(
The list of modules that can be imported by the code. By default, only a few safe modules are allowed.
If it contains "*", it will authorize any import. Use this at your own risk!
"""
global OPERATIONS_COUNT
if OPERATIONS_COUNT >= MAX_OPERATIONS:
if state["_operations_count"] >= MAX_OPERATIONS:
raise InterpreterError(
f"Reached the max number of operations of {MAX_OPERATIONS}. Maybe there is an infinite loop somewhere in the code, or you're just asking too many calculations."
)
OPERATIONS_COUNT += 1
state["_operations_count"] += 1
if isinstance(expression, ast.Assign):
# Assignment -> we evaluate the assignment which should update the state
# We return the variable assigned as it may be used to determine the final result.
@ -1241,6 +1299,8 @@ def evaluate_ast(
)
elif isinstance(expression, ast.Pass):
return None
elif isinstance(expression, ast.Delete):
return evaluate_delete(expression, state, static_tools, custom_tools, authorized_imports)
else:
# For now we refuse anything else. Let's add things as we need them.
raise InterpreterError(f"{expression.__class__.__name__} is not supported.")
@ -1277,7 +1337,7 @@ def evaluate_python_code(
state (`Dict[str, Any]`):
A dictionary mapping variable names to values. The `state` should contain the initial inputs but will be
updated by this function to contain all variables as they are evaluated.
The print outputs will be stored in the state under the key 'print_outputs'.
The print outputs will be stored in the state under the key "_print_outputs".
"""
try:
expression = ast.parse(code)
@ -1294,10 +1354,8 @@ def evaluate_python_code(
static_tools = static_tools.copy() if static_tools is not None else {}
custom_tools = custom_tools if custom_tools is not None else {}
result = None
global PRINT_OUTPUTS
PRINT_OUTPUTS = ""
global OPERATIONS_COUNT
OPERATIONS_COUNT = 0
state["_print_outputs"] = PrintContainer()
state["_operations_count"] = 0
def final_answer(value):
raise FinalAnswerException(value)
@ -1307,16 +1365,22 @@ def evaluate_python_code(
try:
for node in expression.body:
result = evaluate_ast(node, state, static_tools, custom_tools, authorized_imports)
state["print_outputs"] = truncate_content(PRINT_OUTPUTS, max_length=max_print_outputs_length)
state["_print_outputs"].value = truncate_content(
str(state["_print_outputs"]), max_length=max_print_outputs_length
)
is_final_answer = False
return result, is_final_answer
except FinalAnswerException as e:
state["print_outputs"] = truncate_content(PRINT_OUTPUTS, max_length=max_print_outputs_length)
state["_print_outputs"].value = truncate_content(
str(state["_print_outputs"]), max_length=max_print_outputs_length
)
is_final_answer = True
return e.value, is_final_answer
except Exception as e:
exception_type = type(e).__name__
state["print_outputs"] = truncate_content(PRINT_OUTPUTS, max_length=max_print_outputs_length)
state["_print_outputs"].value = truncate_content(
str(state["_print_outputs"]), max_length=max_print_outputs_length
)
raise InterpreterError(
f"Code execution failed at line '{ast.get_source_segment(code, node)}' due to: {exception_type}:{str(e)}"
)
@ -1353,7 +1417,7 @@ class LocalPythonInterpreter:
authorized_imports=self.authorized_imports,
max_print_outputs_length=self.max_print_outputs_length,
)
logs = self.state["print_outputs"]
logs = str(self.state["_print_outputs"])
return output, logs, is_final_answer

View File

@ -347,6 +347,9 @@ class HfApiModel(Model):
If not provided, the class will try to use environment variable 'HF_TOKEN', else use the token stored in the Hugging Face CLI configuration.
timeout (`int`, *optional*, defaults to 120):
Timeout for the API request, in seconds.
custom_role_conversions (`dict[str, str]`, *optional*):
Custom role conversion mapping to convert message roles into others.
Useful for specific models that do not support specific message roles like "system".
**kwargs:
Additional keyword arguments to pass to the Hugging Face API.
@ -374,6 +377,7 @@ class HfApiModel(Model):
provider: Optional[str] = None,
token: Optional[str] = None,
timeout: Optional[int] = 120,
custom_role_conversions: Optional[Dict[str, str]] = None,
**kwargs,
):
super().__init__(**kwargs)
@ -382,6 +386,7 @@ class HfApiModel(Model):
if token is None:
token = os.getenv("HF_TOKEN")
self.client = InferenceClient(self.model_id, provider=provider, token=token, timeout=timeout)
self.custom_role_conversions = custom_role_conversions
def __call__(
self,
@ -397,9 +402,9 @@ class HfApiModel(Model):
grammar=grammar,
tools_to_call_from=tools_to_call_from,
convert_images_to_image_urls=True,
custom_role_conversions=self.custom_role_conversions,
**kwargs,
)
response = self.client.chat_completion(**completion_kwargs)
self.last_input_token_count = response.usage.prompt_tokens
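The new custom_role_conversions argument is stored on the instance and forwarded when the completion kwargs are built, so message roles can be remapped before the chat-completion call. A hedged usage sketch (both the model id and the mapping values are illustrative):

# Hedged usage sketch: remap roles that the target endpoint does not accept.
model = HfApiModel(
    model_id="meta-llama/Llama-3.3-70B-Instruct",  # illustrative model id
    custom_role_conversions={"tool-call": "assistant", "tool-response": "user"},
)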

View File

@ -351,7 +351,8 @@ pope_age_search = web_search(query="current pope age")
print("Pope age as per google search:", pope_age_search)
```<end_code>
Observation:
Pope age: "The pope Francis is currently 88 years old."
Pope age as per wikipedia: "The pope Francis is currently 88 years old."
Pope age as per google search: "The current pope, Francis, just turned 88."
Thought: I know that the pope is 88 years old. Let's compute the result using python code.
Code:
@ -501,7 +502,7 @@ You have been submitted this task by your manager.
Task:
{task}
---
You're helping your manager solve a wider task: so make sure to not provide a one-line answer, but give as much information as possible to give them a clear understanding of the answer.
You're helping your manager solve a wider task, so do not just provide a one-line answer; instead, give as much information as possible so they have a clear understanding of the answer.
Your final_answer WILL HAVE to contain these parts:
### 1. Task outcome (short version):

View File

@ -35,19 +35,25 @@ def add_two(x):
class PythonInterpreterTester(unittest.TestCase):
def assertDictEqualNoPrint(self, dict1, dict2):
return self.assertDictEqual(
{k: v for k, v in dict1.items() if k != "_print_outputs"},
{k: v for k, v in dict2.items() if k != "_print_outputs"},
)
def test_evaluate_assign(self):
code = "x = 3"
state = {}
result, _ = evaluate_python_code(code, {}, state=state)
assert result == 3
self.assertDictEqual(state, {"x": 3, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "_operations_count": 2})
code = "x = y"
state = {"y": 5}
result, _ = evaluate_python_code(code, {}, state=state)
# evaluate returns the value of the last assignment.
assert result == 5
self.assertDictEqual(state, {"x": 5, "y": 5, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 5, "y": 5, "_operations_count": 2})
code = "a=1;b=None"
result, _ = evaluate_python_code(code, {}, state={})
@ -73,7 +79,7 @@ class PythonInterpreterTester(unittest.TestCase):
state = {"x": 3}
result, _ = evaluate_python_code(code, {"add_two": add_two}, state=state)
assert result == 5
self.assertDictEqual(state, {"x": 3, "y": 5, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "y": 5, "_operations_count": 3})
# Should not work without the tool
with pytest.raises(InterpreterError) as e:
@ -85,14 +91,14 @@ class PythonInterpreterTester(unittest.TestCase):
state = {}
result, _ = evaluate_python_code(code, {}, state=state)
assert result == 3
self.assertDictEqual(state, {"x": 3, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "_operations_count": 2})
def test_evaluate_dict(self):
code = "test_dict = {'x': x, 'y': add_two(x)}"
state = {"x": 3}
result, _ = evaluate_python_code(code, {"add_two": add_two}, state=state)
self.assertDictEqual(result, {"x": 3, "y": 5})
self.assertDictEqual(state, {"x": 3, "test_dict": {"x": 3, "y": 5}, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "test_dict": {"x": 3, "y": 5}, "_operations_count": 7})
def test_evaluate_expression(self):
code = "x = 3\ny = 5"
@ -100,7 +106,7 @@ class PythonInterpreterTester(unittest.TestCase):
result, _ = evaluate_python_code(code, {}, state=state)
# evaluate returns the value of the last assignment.
assert result == 5
self.assertDictEqual(state, {"x": 3, "y": 5, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "y": 5, "_operations_count": 4})
def test_evaluate_f_string(self):
code = "text = f'This is x: {x}.'"
@ -108,7 +114,7 @@ class PythonInterpreterTester(unittest.TestCase):
result, _ = evaluate_python_code(code, {}, state=state)
# evaluate returns the value of the last assignment.
assert result == "This is x: 3."
self.assertDictEqual(state, {"x": 3, "text": "This is x: 3.", "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "text": "This is x: 3.", "_operations_count": 6})
def test_evaluate_if(self):
code = "if x <= 3:\n y = 2\nelse:\n y = 5"
@ -116,40 +122,40 @@ class PythonInterpreterTester(unittest.TestCase):
result, _ = evaluate_python_code(code, {}, state=state)
# evaluate returns the value of the last assignment.
assert result == 2
self.assertDictEqual(state, {"x": 3, "y": 2, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "y": 2, "_operations_count": 6})
state = {"x": 8}
result, _ = evaluate_python_code(code, {}, state=state)
# evaluate returns the value of the last assignment.
assert result == 5
self.assertDictEqual(state, {"x": 8, "y": 5, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 8, "y": 5, "_operations_count": 6})
def test_evaluate_list(self):
code = "test_list = [x, add_two(x)]"
state = {"x": 3}
result, _ = evaluate_python_code(code, {"add_two": add_two}, state=state)
self.assertListEqual(result, [3, 5])
self.assertDictEqual(state, {"x": 3, "test_list": [3, 5], "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "test_list": [3, 5], "_operations_count": 5})
def test_evaluate_name(self):
code = "y = x"
state = {"x": 3}
result, _ = evaluate_python_code(code, {}, state=state)
assert result == 3
self.assertDictEqual(state, {"x": 3, "y": 3, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "y": 3, "_operations_count": 2})
def test_evaluate_subscript(self):
code = "test_list = [x, add_two(x)]\ntest_list[1]"
state = {"x": 3}
result, _ = evaluate_python_code(code, {"add_two": add_two}, state=state)
assert result == 5
self.assertDictEqual(state, {"x": 3, "test_list": [3, 5], "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "test_list": [3, 5], "_operations_count": 9})
code = "test_dict = {'x': x, 'y': add_two(x)}\ntest_dict['y']"
state = {"x": 3}
result, _ = evaluate_python_code(code, {"add_two": add_two}, state=state)
assert result == 5
self.assertDictEqual(state, {"x": 3, "test_dict": {"x": 3, "y": 5}, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "test_dict": {"x": 3, "y": 5}, "_operations_count": 11})
code = "vendor = {'revenue': 31000, 'rent': 50312}; vendor['ratio'] = round(vendor['revenue'] / vendor['rent'], 2)"
state = {}
@ -173,14 +179,14 @@ for result in search_results:
state = {}
result, _ = evaluate_python_code(code, {"range": range}, state=state)
assert result == 2
self.assertDictEqual(state, {"x": 2, "i": 2, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 2, "i": 2, "_operations_count": 11})
def test_evaluate_binop(self):
code = "y + x"
state = {"x": 3, "y": 6}
result, _ = evaluate_python_code(code, {}, state=state)
assert result == 9
self.assertDictEqual(state, {"x": 3, "y": 6, "print_outputs": ""})
self.assertDictEqualNoPrint(state, {"x": 3, "y": 6, "_operations_count": 4})
def test_recursive_function(self):
code = """
@ -377,7 +383,7 @@ if char.isalpha():
print('2')"""
state = {}
evaluate_python_code(code, BASE_PYTHON_TOOLS, state=state)
assert state["print_outputs"] == "2\n"
assert state["_print_outputs"].value == "2\n"
def test_imports(self):
code = "import math\nmath.sqrt(4)"
@ -456,9 +462,9 @@ if char.isalpha():
state = {}
result, _ = evaluate_python_code(code, BASE_PYTHON_TOOLS, state=state)
assert result is None
assert state["print_outputs"] == "Hello world!\nOk no one cares\n"
assert state["_print_outputs"].value == "Hello world!\nOk no one cares\n"
# test print in function
# Test print in function (state copy)
code = """
print("1")
def function():
@ -466,7 +472,17 @@ def function():
function()"""
state = {}
evaluate_python_code(code, {"print": print}, state=state)
assert state["print_outputs"] == "1\n2\n"
assert state["_print_outputs"].value == "1\n2\n"
# Test print in list comprehension (state copy)
code = """
print("1")
def function():
print("2")
[function() for i in range(10)]"""
state = {}
evaluate_python_code(code, {"print": print, "range": range}, state=state)
assert state["_print_outputs"].value == "1\n2\n2\n2\n2\n2\n2\n2\n2\n2\n2\n"
def test_tuple_target_in_iterator(self):
code = "for a, b in [('Ralf Weikert', 'Austria'), ('Samuel Seungwon Lee', 'South Korea')]:res = a.split()[0]"
@ -588,7 +604,7 @@ except ValueError as e:
code = "print(min([1, 2, 3]))"
state = {}
evaluate_python_code(code, {"min": min, "print": print}, state=state)
assert state["print_outputs"] == "1\n"
assert state["_print_outputs"].value == "1\n"
def test_types_as_objects(self):
code = "type_a = float(2); type_b = str; type_c = int"