classFastA2A(Starlette):"""The main class for the FastA2A library."""def__init__(self,*,storage:Storage,broker:Broker,# Agent cardname:str|None=None,url:str='http://localhost:8000',version:str='1.0.0',description:str|None=None,provider:Provider|None=None,skills:list[Skill]|None=None,# Starlettedebug:bool=False,routes:Sequence[Route]|None=None,middleware:Sequence[Middleware]|None=None,exception_handlers:dict[Any,ExceptionHandler]|None=None,lifespan:Lifespan[FastA2A]|None=None,):iflifespanisNone:lifespan=_default_lifespansuper().__init__(debug=debug,routes=routes,middleware=middleware,exception_handlers=exception_handlers,lifespan=lifespan,)self.name=nameor'Agent'self.url=urlself.version=versionself.description=descriptionself.provider=providerself.skills=skillsor[]# NOTE: For now, I don't think there's any reason to support any other input/output modes.self.default_input_modes=['application/json']self.default_output_modes=['application/json']self.task_manager=TaskManager(broker=broker,storage=storage)# Setupself._agent_card_json_schema:bytes|None=Noneself.router.add_route('/.well-known/agent.json',self._agent_card_endpoint,methods=['HEAD','GET','OPTIONS'])self.router.add_route('/',self._agent_run_endpoint,methods=['POST'])asyncdef__call__(self,scope:Scope,receive:Receive,send:Send)->None:ifscope['type']=='http'andnotself.task_manager.is_running:raiseRuntimeError('TaskManager was not properly initialized.')awaitsuper().__call__(scope,receive,send)asyncdef_agent_card_endpoint(self,request:Request)->Response:ifself._agent_card_json_schemaisNone:agent_card=AgentCard(name=self.name,url=self.url,version=self.version,skills=self.skills,default_input_modes=self.default_input_modes,default_output_modes=self.default_output_modes,capabilities=Capabilities(streaming=False,push_notifications=False,state_transition_history=False),authentication=Authentication(schemes=[]),)ifself.descriptionisnotNone:agent_card['description']=self.descriptionifself.providerisnotNone:agent_card['provider']=self.providerself._agent_card_json_schema=agent_card_ta.dump_json(agent_card,by_alias=True)returnResponse(content=self._agent_card_json_schema,media_type='application/json')asyncdef_agent_run_endpoint(self,request:Request)->Response:"""This is the main endpoint for the A2A server. Although the specification allows freedom of choice and implementation, I'm pretty sure about some decisions. 1. The server will always either send a "submitted" or a "failed" on `tasks/send`. Never a "completed" on the first message. 2. There are three possible ends for the task: 2.1. The task was "completed" successfully. 2.2. The task was "canceled". 2.3. The task "failed". 3. The server will send a "working" on the first chunk on `tasks/pushNotification/get`. """data=awaitrequest.body()a2a_request=a2a_request_ta.validate_json(data)ifa2a_request['method']=='tasks/send':jsonrpc_response=awaitself.task_manager.send_task(a2a_request)elifa2a_request['method']=='tasks/get':jsonrpc_response=awaitself.task_manager.get_task(a2a_request)elifa2a_request['method']=='tasks/cancel':jsonrpc_response=awaitself.task_manager.cancel_task(a2a_request)else:raiseNotImplementedError(f'Method {a2a_request["method"]} not implemented.')returnResponse(content=a2a_response_ta.dump_json(jsonrpc_response,by_alias=True),media_type='application/json')
The broker class is in charge of scheduling the tasks.
The HTTP server uses the broker to schedule tasks.
The simple implementation is the InMemoryBroker, which is the broker that
runs the tasks in the same process as the HTTP server. That said, this class can be
extended to support remote workers.
@dataclassclassBroker(ABC):"""The broker class is in charge of scheduling the tasks. The HTTP server uses the broker to schedule tasks. The simple implementation is the `InMemoryBroker`, which is the broker that runs the tasks in the same process as the HTTP server. That said, this class can be extended to support remote workers. """@abstractmethodasyncdefrun_task(self,params:TaskSendParams)->None:"""Send a task to be executed by the worker."""raiseNotImplementedError('send_run_task is not implemented yet.')@abstractmethodasyncdefcancel_task(self,params:TaskIdParams)->None:"""Cancel a task."""raiseNotImplementedError('send_cancel_task is not implemented yet.')@abstractmethodasyncdef__aenter__(self)->Self:...@abstractmethodasyncdef__aexit__(self,exc_type:Any,exc_value:Any,traceback:Any):...@abstractmethoddefreceive_task_operations(self)->AsyncIterator[TaskOperation]:"""Receive task operations from the broker. On a multi-worker setup, the broker will need to round-robin the task operations between the workers. """
@abstractmethodasyncdefrun_task(self,params:TaskSendParams)->None:"""Send a task to be executed by the worker."""raiseNotImplementedError('send_run_task is not implemented yet.')
@abstractmethodasyncdefcancel_task(self,params:TaskIdParams)->None:"""Cancel a task."""raiseNotImplementedError('send_cancel_task is not implemented yet.')
On a multi-worker setup, the broker will need to round-robin the task operations
between the workers.
Source code in fasta2a/fasta2a/broker.py
46474849505152
@abstractmethoddefreceive_task_operations(self)->AsyncIterator[TaskOperation]:"""Receive task operations from the broker. On a multi-worker setup, the broker will need to round-robin the task operations between the workers. """
@pydantic.with_config(config={'alias_generator':to_camel})classSkill(TypedDict):"""Skills are a unit of capability that an agent can perform."""id:str"""A unique identifier for the skill."""name:str"""Human readable name of the skill."""description:str"""A human-readable description of the skill. It will be used by the client or a human as a hint to understand the skill. """tags:list[str]"""Set of tag-words describing classes of capabilities for this specific skill. Examples: "cooking", "customer support", "billing". """examples:NotRequired[list[str]]"""The set of example scenarios that the skill can perform. Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread") """input_modes:list[str]"""Supported mime types for input data."""output_modes:list[str]"""Supported mime types for output data."""
classStorage(ABC):"""A storage to retrieve and save tasks. The storage is used to update the status of a task and to save the result of a task. """@abstractmethodasyncdefload_task(self,task_id:str,history_length:int|None=None)->Task|None:"""Load a task from storage. If the task is not found, return None. """@abstractmethodasyncdefsubmit_task(self,task_id:str,session_id:str,message:Message)->Task:"""Submit a task to storage."""@abstractmethodasyncdefupdate_task(self,task_id:str,state:TaskState,message:Message|None=None,artifacts:list[Artifact]|None=None,)->Task:"""Update the state of a task."""
@abstractmethodasyncdefload_task(self,task_id:str,history_length:int|None=None)->Task|None:"""Load a task from storage. If the task is not found, return None. """
@abstractmethodasyncdefupdate_task(self,task_id:str,state:TaskState,message:Message|None=None,artifacts:list[Artifact]|None=None,)->Task:"""Update the state of a task."""
@dataclassclassWorker(ABC):"""A worker is responsible for executing tasks."""broker:Brokerstorage:Storage@asynccontextmanagerasyncdefrun(self)->AsyncIterator[None]:"""Run the worker. It connects to the broker, and it makes itself available to receive commands. """asyncwithanyio.create_task_group()astg:tg.start_soon(self._loop)yieldtg.cancel_scope.cancel()asyncdef_loop(self)->None:asyncfortask_operationinself.broker.receive_task_operations():awaitself._handle_task_operation(task_operation)asyncdef_handle_task_operation(self,task_operation:TaskOperation)->None:try:withuse_span(task_operation['_current_span']):withtracer.start_as_current_span(f'{task_operation["operation"]} task',attributes={'logfire.tags':['fasta2a']}):iftask_operation['operation']=='run':awaitself.run_task(task_operation['params'])eliftask_operation['operation']=='cancel':awaitself.cancel_task(task_operation['params'])else:assert_never(task_operation)exceptException:awaitself.storage.update_task(task_operation['params']['id'],state='failed')@abstractmethodasyncdefrun_task(self,params:TaskSendParams)->None:...@abstractmethodasyncdefcancel_task(self,params:TaskIdParams)->None:...@abstractmethoddefbuild_message_history(self,task_history:list[Message])->list[Any]:...@abstractmethoddefbuild_artifacts(self,result:Any)->list[Artifact]:...
It connects to the broker, and it makes itself available to receive commands.
Source code in fasta2a/fasta2a/worker.py
28293031323334353637
@asynccontextmanagerasyncdefrun(self)->AsyncIterator[None]:"""Run the worker. It connects to the broker, and it makes itself available to receive commands. """asyncwithanyio.create_task_group()astg:tg.start_soon(self._loop)yieldtg.cancel_scope.cancel()
This module contains the schema for the agent card.
@pydantic.with_config(config={'alias_generator':to_camel})classAgentCard(TypedDict):"""The card that describes an agent."""name:str"""Human readable name of the agent e.g. "Recipe Agent"."""description:NotRequired[str]"""A human-readable description of the agent. Used to assist users and other agents in understanding what the agent can do. (e.g. "Agent that helps users with recipes and cooking.") """# TODO(Marcelo): The spec makes url required.url:NotRequired[str]"""A URL to the address the agent is hosted at."""provider:NotRequired[Provider]"""The service provider of the agent."""# TODO(Marcelo): The spec makes version required.version:NotRequired[str]"""The version of the agent - format is up to the provider. (e.g. "1.0.0")"""documentation_url:NotRequired[str]"""A URL to documentation for the agent."""capabilities:Capabilities"""The capabilities of the agent."""authentication:Authentication"""The authentication schemes supported by the agent. Intended to match OpenAPI authentication structure. """default_input_modes:list[str]"""Supported mime types for input data."""default_output_modes:list[str]"""Supported mime types for output data."""skills:list[Skill]
@pydantic.with_config(config={'alias_generator':to_camel})classCapabilities(TypedDict):"""The capabilities of the agent."""streaming:NotRequired[bool]"""Whether the agent supports streaming."""push_notifications:NotRequired[bool]"""Whether the agent can notify updates to client."""state_transition_history:NotRequired[bool]"""Whether the agent exposes status change history for tasks."""
The authentication schemes supported by the agent.
Source code in fasta2a/fasta2a/schema.py
838485868788899091
@pydantic.with_config(config={'alias_generator':to_camel})classAuthentication(TypedDict):"""The authentication schemes supported by the agent."""schemes:list[str]"""The authentication schemes supported by the agent. (e.g. "Basic", "Bearer")"""credentials:NotRequired[str]"""The credentials a client should use for private cards."""
@pydantic.with_config(config={'alias_generator':to_camel})classSkill(TypedDict):"""Skills are a unit of capability that an agent can perform."""id:str"""A unique identifier for the skill."""name:str"""Human readable name of the skill."""description:str"""A human-readable description of the skill. It will be used by the client or a human as a hint to understand the skill. """tags:list[str]"""Set of tag-words describing classes of capabilities for this specific skill. Examples: "cooking", "customer support", "billing". """examples:NotRequired[list[str]]"""The set of example scenarios that the skill can perform. Will be used by the client as a hint to understand how the skill can be used. (e.g. "I need a recipe for bread") """input_modes:list[str]"""Supported mime types for input data."""output_modes:list[str]"""Supported mime types for output data."""
@pydantic.with_config(config={'alias_generator':to_camel})classArtifact(TypedDict):"""Agents generate Artifacts as an end result of a Task. Artifacts are immutable, can be named, and can have multiple parts. A streaming response can append parts to existing Artifacts. A single Task can generate many Artifacts. For example, "create a webpage" could create separate HTML and image Artifacts. """name:NotRequired[str]"""The name of the artifact."""description:NotRequired[str]"""A description of the artifact."""parts:list[Part]"""The parts that make up the artifact."""metadata:NotRequired[dict[str,Any]]"""Metadata about the artifact."""index:int"""The index of the artifact."""append:NotRequired[bool]"""Whether to append this artifact to an existing one."""last_chunk:NotRequired[bool]"""Whether this is the last chunk of the artifact."""
A2A supports a secure notification mechanism whereby an agent can notify a client of an update
outside of a connected session via a PushNotificationService. Within and across enterprises,
it is critical that the agent verifies the identity of the notification service, authenticates
itself with the service, and presents an identifier that ties the notification to the executing
Task.
The target server of the PushNotificationService should be considered a separate service, and
is not guaranteed (or even expected) to be the client directly. This PushNotificationService is
responsible for authenticating and authorizing the agent and for proxying the verified notification
to the appropriate endpoint (which could be anything from a pub/sub queue, to an email inbox or
other service, etc).
For contrived scenarios with isolated client-agent pairs (e.g. local service mesh in a contained
VPC, etc.) or isolated environments without enterprise security concerns, the client may choose to
simply open a port and act as its own PushNotificationService. Any enterprise implementation will
likely have a centralized service that authenticates the remote agents with trusted notification
credentials and can handle online/offline scenarios. (This should be thought of similarly to a
mobile Push Notification Service).
@pydantic.with_config(config={'alias_generator':to_camel})classPushNotificationConfig(TypedDict):"""Configuration for push notifications. A2A supports a secure notification mechanism whereby an agent can notify a client of an update outside of a connected session via a PushNotificationService. Within and across enterprises, it is critical that the agent verifies the identity of the notification service, authenticates itself with the service, and presents an identifier that ties the notification to the executing Task. The target server of the PushNotificationService should be considered a separate service, and is not guaranteed (or even expected) to be the client directly. This PushNotificationService is responsible for authenticating and authorizing the agent and for proxying the verified notification to the appropriate endpoint (which could be anything from a pub/sub queue, to an email inbox or other service, etc). For contrived scenarios with isolated client-agent pairs (e.g. local service mesh in a contained VPC, etc.) or isolated environments without enterprise security concerns, the client may choose to simply open a port and act as its own PushNotificationService. Any enterprise implementation will likely have a centralized service that authenticates the remote agents with trusted notification credentials and can handle online/offline scenarios. (This should be thought of similarly to a mobile Push Notification Service). """url:str"""The URL to send push notifications to."""token:NotRequired[str]"""Token unique to this task/session."""authentication:NotRequired[Authentication]"""Authentication details for push notifications."""
A Message contains any content that is not an Artifact.
This can include things like agent thoughts, user context, instructions, errors, status, or metadata.
All content from a client comes in the form of a Message. Agents send Messages to communicate status or to provide
instructions (whereas generated results are sent as Artifacts).
A Message can have multiple parts to denote different pieces of content. For example, a user request could include
a textual description from a user and then multiple files used as context from the client.
classMessage(TypedDict):"""A Message contains any content that is not an Artifact. This can include things like agent thoughts, user context, instructions, errors, status, or metadata. All content from a client comes in the form of a Message. Agents send Messages to communicate status or to provide instructions (whereas generated results are sent as Artifacts). A Message can have multiple parts to denote different pieces of content. For example, a user request could include a textual description from a user and then multiple files used as context from the client. """role:Literal['user','agent']"""The role of the message."""parts:list[Part]"""The parts of the message."""metadata:NotRequired[dict[str,Any]]"""Metadata about the message."""
@pydantic.with_config(config={'alias_generator':to_camel})classFilePart(_BasePart):"""A part that contains a file."""type:Literal['file']"""The type of the part."""file:File"""The file of the part."""
@pydantic.with_config(config={'alias_generator':to_camel})classDataPart(_BasePart):"""A part that contains data."""type:Literal['data']"""The type of the part."""data:dict[str,Any]"""The data of the part."""
@pydantic.with_config(config={'alias_generator':to_camel})classTaskStatus(TypedDict):"""Status and accompanying message for a task."""state:TaskState"""The current state of the task."""message:NotRequired[Message]"""Additional status updates for client."""timestamp:NotRequired[str]"""ISO datetime value of when the status was updated."""
A Task is a stateful entity that allows Clients and Remote Agents to achieve a specific outcome.
Clients and Remote Agents exchange Messages within a Task. Remote Agents generate results as Artifacts.
A Task is always created by a Client and the status is always determined by the Remote Agent.
@pydantic.with_config(config={'alias_generator':to_camel})classTask(TypedDict):"""A Task is a stateful entity that allows Clients and Remote Agents to achieve a specific outcome. Clients and Remote Agents exchange Messages within a Task. Remote Agents generate results as Artifacts. A Task is always created by a Client and the status is always determined by the Remote Agent. """id:str"""Unique identifier for the task."""session_id:NotRequired[str]"""Client-generated id for the session holding the task."""status:TaskStatus"""Current status of the task."""history:NotRequired[list[Message]]"""Optional history of messages."""artifacts:NotRequired[list[Artifact]]"""Collection of artifacts created by the agent."""metadata:NotRequired[dict[str,Any]]"""Extension metadata."""
Sent by server during sendSubscribe or subscribe requests.
Source code in fasta2a/fasta2a/schema.py
349350351352353354355356357358359360361362363
@pydantic.with_config(config={'alias_generator':to_camel})classTaskStatusUpdateEvent(TypedDict):"""Sent by server during sendSubscribe or subscribe requests."""id:str"""The id of the task."""status:TaskStatus"""The status of the task."""final:bool"""Indicates the end of the event stream."""metadata:NotRequired[dict[str,Any]]"""Extension metadata."""
Sent by server during sendSubscribe or subscribe requests.
Source code in fasta2a/fasta2a/schema.py
366367368369370371372373374375376377
@pydantic.with_config(config={'alias_generator':to_camel})classTaskArtifactUpdateEvent(TypedDict):"""Sent by server during sendSubscribe or subscribe requests."""id:str"""The id of the task."""artifact:Artifact"""The artifact that was updated."""metadata:NotRequired[dict[str,Any]]"""Extension metadata."""
@pydantic.with_config(config={'alias_generator':to_camel})classTaskIdParams(TypedDict):"""Parameters for a task id."""id:strmetadata:NotRequired[dict[str,Any]]
@pydantic.with_config(config={'alias_generator':to_camel})classTaskQueryParams(TaskIdParams):"""Query parameters for a task."""history_length:NotRequired[int]"""Number of recent messages to be retrieved."""
@pydantic.with_config(config={'alias_generator':to_camel})classTaskSendParams(TypedDict):"""Sent by the client to the agent to create, continue, or restart a task."""id:str"""The id of the task."""session_id:NotRequired[str]"""The server creates a new sessionId for new tasks if not set."""message:Message"""The message to send to the agent."""history_length:NotRequired[int]"""Number of recent messages to be retrieved."""push_notification:NotRequired[PushNotificationConfig]"""Where the server should send notifications when disconnected."""metadata:NotRequired[dict[str,Any]]"""Extension metadata."""
classJSONRPCRequest(JSONRPCMessage,Generic[Method,Params]):"""A JSON RPC request."""method:Method"""The method to call."""params:Params"""The parameters to pass to the method."""
classA2AClient:"""A client for the A2A protocol."""def__init__(self,base_url:str='http://localhost:8000',http_client:httpx.AsyncClient|None=None)->None:ifhttp_clientisNone:self.http_client=httpx.AsyncClient(base_url=base_url)else:self.http_client=http_clientself.http_client.base_url=base_urlasyncdefsend_task(self,message:Message,history_length:int|None=None,push_notification:PushNotificationConfig|None=None,metadata:dict[str,Any]|None=None,)->SendTaskResponse:task=TaskSendParams(message=message,id=str(uuid.uuid4()))ifhistory_lengthisnotNone:task['history_length']=history_lengthifpush_notificationisnotNone:task['push_notification']=push_notificationifmetadataisnotNone:task['metadata']=metadatapayload=SendTaskRequest(jsonrpc='2.0',id=None,method='tasks/send',params=task)content=a2a_request_ta.dump_json(payload,by_alias=True)response=awaitself.http_client.post('/',content=content,headers={'Content-Type':'application/json'})self._raise_for_status(response)returnsend_task_response_ta.validate_json(response.content)asyncdefget_task(self,task_id:str)->GetTaskResponse:payload=GetTaskRequest(jsonrpc='2.0',id=None,method='tasks/get',params={'id':task_id})content=a2a_request_ta.dump_json(payload,by_alias=True)response=awaitself.http_client.post('/',content=content,headers={'Content-Type':'application/json'})self._raise_for_status(response)returnget_task_response_ta.validate_json(response.content)def_raise_for_status(self,response:httpx.Response)->None:ifresponse.status_code>=400:raiseUnexpectedResponseError(response.status_code,response.text)
An error raised when an unexpected response is received from the server.
Source code in fasta2a/fasta2a/client.py
737475767778
classUnexpectedResponseError(Exception):"""An error raised when an unexpected response is received from the server."""def__init__(self,status_code:int,content:str)->None:self.status_code=status_codeself.content=content