feat(durable): Add durable session persistence layer for long-horizon agents #4351
Conversation
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request.
Response from ADK Triaging Agent

Hello @caohy1988, thank you for creating this PR! Before we can proceed with the review, could you please address the outstanding items from our contribution guidelines? This information will help us to review your PR more efficiently. Thanks!
Summary of Changes

Hello @caohy1988, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances ADK's capabilities by introducing a robust durable session persistence layer. This new feature allows long-running agent tasks to maintain their state across process boundaries and system failures, ensuring continuity and reliability for complex, time-consuming operations. By leveraging BigQuery and Google Cloud Storage, it provides an auditable and scalable solution for managing agent progress, effectively overcoming limitations previously faced in cloud environments.
Code Review
This pull request introduces a comprehensive and well-designed durable session persistence layer, which is a significant feature for enabling long-horizon agents. The use of BigQuery for metadata and GCS for blobs is a robust pattern, and the implementation correctly includes key features like two-phase commits and lease-based concurrency. The accompanying demo is excellent for showcasing the functionality. My review identifies a few important issues to address, primarily concerning security (a hardcoded API key and a potential path traversal vulnerability), a race condition in session creation, and several opportunities for code refinement and improved maintainability. Overall, this is a strong feature addition, and addressing these points will make it even more robust.
```python
GOOGLE_CLOUD_API_KEY = os.environ.get(
    "GOOGLE_CLOUD_API_KEY",
    "AQ.Ab8RN6L12XpDo1x7Gf2w87EfspguWGrjZPW6XocNy2og_-z_jg",
)
```
A default API key is hardcoded as a fallback value. This is a significant security risk, as it could be accidentally committed and exposed. Even for a demo, it's best practice to avoid hardcoding secrets. The application should fail explicitly if the key is not provided in the environment, rather than falling back to a hardcoded value.
```python
GOOGLE_CLOUD_API_KEY = os.environ.get("GOOGLE_CLOUD_API_KEY")
if not GOOGLE_CLOUD_API_KEY:
    raise ValueError("GOOGLE_CLOUD_API_KEY environment variable not set.")
```
reverted
```python
existing = await self.get_session(session_id=session_id)
if existing:
    raise ValueError(f"Session {session_id} already exists")
```
There is a race condition here. Two concurrent requests could both check for an existing session, find none, and then both attempt to create it. Since BigQuery PRIMARY KEY constraints are not enforced, this could lead to duplicate session entries. The session creation logic should be made idempotent. One approach is to use a unique ID for the BigQuery insert job, which makes the insertion retryable and idempotent within a certain window.
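One way to get the idempotency suggested above (a sketch, assuming session creation goes through BigQuery's streaming insert path, not necessarily how this PR implements it) is to derive a deterministic `insertId` from the session ID. `insert_rows_json` accepts these via its `row_ids` parameter, and BigQuery de-duplicates rows that share an `insertId` within a short, best-effort window:

```python
import hashlib


def session_insert_id(session_id: str) -> str:
    # The same session_id always yields the same insertId, so a retried
    # or concurrent create collapses to one logical row (within BigQuery's
    # best-effort streaming de-duplication window).
    return hashlib.sha256(f"session-create:{session_id}".encode()).hexdigest()


# Hypothetical usage with google-cloud-bigquery (not executed here;
# `client`, `table`, and `row` are assumed to exist):
# errors = client.insert_rows_json(table, [row], row_ids=[session_insert_id(sid)])
```

Note that streaming de-duplication is best-effort; for a hard guarantee, a `MERGE` statement or a read-after-write reconciliation pass would still be needed.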
```python
safe_members = [
    m for m in tar.getmembers() if not m.name.startswith(("/", ".."))
]
```
The check to prevent path traversal attacks (tar-slip) is insufficient. An attacker could craft a filename like `a/../../etc/passwd` that bypasses the current prefix check. A more robust approach is to resolve the real path of each member and ensure it is within the intended destination directory before extraction. Using `tar.extractall` with a filtered list is risky; it's safer to iterate through members and extract them individually with proper path validation (on Python 3.12+, `tar.extractall(filter="data")` performs comparable checks):

```python
workspace_real = os.path.realpath(self._workspace_dir)
for member in tar.getmembers():
    member_path = os.path.realpath(os.path.join(workspace_real, member.name))
    # Compare resolved paths component-wise so a sibling directory such as
    # "<workspace>-evil" cannot pass a raw startswith() prefix test.
    if os.path.commonpath([workspace_real, member_path]) == workspace_real:
        tar.extract(member, workspace_real)
    else:
        logger.warning("Skipping potentially unsafe path in tarball: %s", member.name)
```

```python
async def list_sessions():
    """List all sessions from BigQuery."""
    try:
        client = checkpoint_store._get_bq_client()
```
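To see why a resolved-path comparison matters for the tar-slip issue discussed above, here is a self-contained illustration (the helper `is_safe_member` is hypothetical, not code from the PR). The reviewer's example name starts with neither `/` nor `..`, so a plain prefix test lets it through, while the realpath check rejects it:

```python
import os


def is_safe_member(dest: str, name: str) -> bool:
    # A member is safe only if its fully resolved path stays inside dest.
    dest_real = os.path.realpath(dest)
    target = os.path.realpath(os.path.join(dest_real, name))
    return os.path.commonpath([dest_real, target]) == dest_real


# "a/../../etc/passwd" defeats startswith(("/", "..")) but not this check.
print(is_safe_member("/srv/workspace", "data/results.json"))   # True
print(is_safe_member("/srv/workspace", "a/../../etc/passwd"))  # False
```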
```python
except Exception as e:
    return {"sessions": [], "error": str(e)}
```
Catching a broad Exception can hide bugs and make debugging difficult. It's better to catch more specific exceptions that you expect to handle (e.g., exceptions from the BigQuery client). Additionally, returning a 200 OK status with an error message in the body for a failed API call is not standard practice. Consider raising an HTTPException with a 5xx status code to provide a more accurate API response.
```diff
-    except Exception as e:
-        return {"sessions": [], "error": str(e)}
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to list sessions: {e}")
```
```python
async def run_task_with_checkpoints(session_id: str, duration: int, resume: bool = False):
    """Run a long-running task with periodic checkpoints."""
    import random
```
```python
with open("/tmp/lifecycle.json", "w") as f:
    f.write(lifecycle_config)
```
Using a hardcoded path like /tmp/lifecycle.json can be problematic in environments where /tmp is not writable or has specific restrictions (e.g., some serverless environments). It's more robust to use Python's tempfile module to create temporary files in a secure and platform-independent manner.
```python
import tempfile
# ...
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as tmp_file:
    tmp_file.write(lifecycle_config)
    lifecycle_path = tmp_file.name
run_command(
    [
        "gsutil",
        "lifecycle",
        "set",
        lifecycle_path,
        f"gs://{GCS_BUCKET}",
    ],
    check=False,
)
os.remove(lifecycle_path)
```

```python
active_lease_id=row.active_lease_id,
lease_expiry=row.lease_expiry,
ttl_expiry=row.ttl_expiry,
metadata=row.metadata if isinstance(row.metadata, dict) else (json.loads(row.metadata) if row.metadata else None),
```
… agents

This PR implements a durable session persistence layer for ADK, enabling cross-process checkpoint-based recovery for long-running agent tasks.

## Key Features

- **DurableSessionConfig**: Configuration for durable cross-process checkpointing
- **BigQueryCheckpointStore**: Two-phase commit checkpoint storage (BQ metadata + GCS blobs)
- **CheckpointableAgentState**: Abstract interface for agents supporting durability
- **WorkspaceSnapshotter**: GCS-based workspace directory snapshotting

## Implementation Details

- Two-phase commit: GCS blob upload → BigQuery metadata insert
- SHA-256 checkpoint integrity verification
- Lease-based concurrency control for safe resume
- Async-first API design for non-blocking I/O

## Demo

A fully functional demo is deployed on Cloud Run showcasing:

- Real-time checkpoint visualization
- Task failure simulation and recovery
- BigQuery metadata queries
- Final task output display

Demo URL: https://durable-demo-201486563047.us-central1.run.app

## Files Added

- src/google/adk/durable/ - Core durable module
- contributing/samples/long_running_task/ - Demo agent and UI
- tests/unittests/durable/ - Unit tests

Co-Authored-By: Claude Opus 4.5 <[email protected]>
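The SHA-256 integrity verification listed above could, for instance, hash a canonical JSON serialization of the checkpoint state so that writer and reader compute identical digests. This is a sketch of the idea, not necessarily the PR's exact scheme:

```python
import hashlib
import json


def checkpoint_digest(state: dict) -> str:
    # sort_keys plus fixed separators give a canonical byte string, so the
    # same logical state hashes identically regardless of dict key order
    # or which process produced it.
    payload = json.dumps(state, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


a = checkpoint_digest({"step": 3, "done": ["t1", "t2"]})
b = checkpoint_digest({"done": ["t1", "t2"], "step": 3})
print(a == b)  # True: key order does not affect the digest
```

The store would persist the digest in BigQuery alongside the GCS blob reference, then recompute and compare it on resume before trusting the checkpoint.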
Force-pushed from 99e7726 to 7d946ed.
Addresses the comment: "Session service is the durable session persistence"

- Clarifies distinction between SessionService (conversation history) and CheckpointStore (execution state)
- Provides three potential approaches (separate store, extend service, event type)
- Recommends Option A (separate CheckpointStore) for v1
- Suggests specific updates to design doc sections
- Lists action items and open questions for ADK team

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Addresses ADK team comment: "ArtifactService is designed for large blobs. Have you checked GcsArtifactService?"

Response:
- Reviewed GcsArtifactService capabilities and interface
- Identified key gaps: two-phase commit, SHA-256 verification, key structure
- Compared three approaches: adapt ArtifactService, direct GCS, extend interface
- Recommends direct GCS client for v1 due to simpler implementation
- Suggests design doc updates for Section 5.3 and Section 15

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Addresses ADK team comment on Section 7.3: "This is not only applicable to resume. Runner.run_async also requires this. Leasing is a general requirement for app developers."

Response:
- Acknowledges leasing is a general ADK requirement, not durable-specific
- Identifies scenarios: run_async, resume, Pub/Sub redelivery, horizontal scaling
- Reviews current state: no built-in lease in Runner
- Proposes three options: durable-only, Runner-level, SessionService-level
- Recommends keeping durable-only for v1, consider SessionService for v2
- Suggests design doc updates for Section 7.3 and Section 18

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Addresses ADK team comment: "Could you elaborate? Agent state is persisted in events which are persisted in session service."

Response:
- Clarifies what IS preserved: conversation history, tool call records
- Clarifies what is NOT preserved: job ledgers, aggregated results, execution plans
- Provides concrete example: 50-table PII scan recovery comparison
- Distinguishes Session Events (LLM context) vs Checkpoint State (execution recovery)
- Identifies when session events alone are sufficient vs when checkpoints add value
- Suggests design doc revision to Section 1.2 with clarification table

Co-Authored-By: Claude Opus 4.5 <[email protected]>

Adds comprehensive "Enterprise PII Compliance Audit" example showing:

- 100-table scan across 5 datasets (~8 hour operation)
- Process dies at hour 3:15 with 35 tables done, 2 jobs running
- Path A (Events Only): LLM re-deduction, duplicate jobs, ~30 min recovery
- Path B (Checkpoint): 5-second deterministic recovery, job reconciliation
- Side-by-side comparison table
- Cost impact analysis ($75.50 extra cost with events-only)
- Five specific capabilities checkpoints enable that events cannot

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Summary

This PR implements a durable session persistence layer for ADK, enabling cross-process checkpoint-based recovery for long-running agent tasks. This addresses the "12-minute barrier" problem where agents lose state during long BigQuery jobs or other async operations.

Implementation Highlights

- Public APIs are marked @experimental

Files Added

Core Module (src/google/adk/durable/)
- config.py - DurableSessionConfig
- checkpointable_state.py - CheckpointableAgentState ABC
- stores/base_checkpoint_store.py - DurableSessionStore ABC
- stores/bigquery_checkpoint_store.py - BigQuery + GCS implementation
- workspace_snapshotter.py - GCS workspace snapshots

Demo (contributing/samples/long_running_task/)
- agent.py - Demo agent with durable config
- demo_server.py - FastAPI server with checkpoint APIs
- demo_ui.html - Real-time visualization UI
- long_running_task_design.md - Detailed design document

Tests (tests/unittests/durable/)

Live Demo

A fully functional demo is deployed on Cloud Run:

URL: https://durable-demo-201486563047.us-central1.run.app

Infrastructure:
- BigQuery dataset: test-project-0728-467323.adk_metadata
- GCS bucket: gs://test-project-0728-467323-adk-checkpoints

Design Document

See contributing/samples/long_running_task/long_running_task_design.md for the full design.

🤖 Generated with Claude Code