Documentation Index
Fetch the complete documentation index at: https://docs.cocobase.cc/llms.txt
Use this file to discover all available pages before exploring further.
Database — Advanced Methods
Additional db methods not covered in the introduction.
update_document_fields()
Merge specific fields into a document without overwriting the rest. Safer than update_document() when you only need to change a few keys.
db.update_document_fields(
collection_name: str,
document_id: str,
fields: dict
)
Returns: Updated document dict, or None if not found.
def main():
    """Publish a post by merging only its status fields.

    Reads ``doc_id`` from the request. Returns ``{"post": ...}`` with the
    updated document, or a 404 response when ``update_document_fields``
    returns nothing for that ID.
    """
    # Local import keeps the example self-contained.
    from datetime import datetime, timezone

    doc_id = req.get("doc_id")

    # Only updates 'status' and 'published_at' — other fields untouched.
    # datetime.utcnow() is deprecated (Python 3.12+) and yields a naive value;
    # an aware UTC timestamp serialises with an explicit "+00:00" offset.
    updated = db.update_document_fields("posts", doc_id, {
        "status": "published",
        "published_at": datetime.now(timezone.utc).isoformat(),
    })
    if not updated:
        return {"error": "Not found"}, 404
    return {"post": updated}
count_documents()
Count documents matching optional filters without fetching the data.
db.count_documents(
collection_name: str,
**filters
)
Returns: Integer count.
def main():
    """Return three post counts: all, published, and published in tech."""
    published_only = {"status": "published"}

    # Total posts
    all_posts = db.count_documents("posts")
    # Only published
    published_posts = db.count_documents("posts", **published_only)
    # Published in a category
    tech_posts = db.count_documents(
        "posts", **published_only, category_id="cat-tech"
    )

    return {
        "total": all_posts,
        "published": published_posts,
        "tech_published": tech_posts,
    }
search_documents()
Full-text search across a collection. Searches all text fields by default, or specific fields if you supply them.
db.search_documents(
collection_name: str,
search_term: str,
search_fields: list[str] = None, # None = search all text fields
limit: int = None
)
Returns: List of matching document dicts (not a paginated result — just a plain list).
def main():
    """Search posts for the ``q`` request parameter.

    A missing or empty term short-circuits with an empty result list.
    """
    query_text = req.get("q", "")
    if not query_text:
        return {"results": []}

    # Search all text fields
    # (shown for illustration; this result is replaced by the search below)
    matches = db.search_documents("posts", query_text, limit=20)

    # Search specific fields only
    matches = db.search_documents(
        "posts",
        query_text,
        search_fields=["title", "body"],
        limit=20,
    )
    return {"results": matches, "count": len(matches)}
fetch_all()
Run multiple independent queries in one call. Returns a list of results in the same order as the queries.
db.fetch_all(queries: list[dict])
Each query dict must have a type key:
| type | Equivalent call |
|---|---|
| "query" | db.query(collection, ...) |
| "find_one" | db.find_one(collection, ...) |
| "query_users" | db.query_users(...) |
| "find_user" | db.find_user(...) |
| "count" | db.count_documents(collection) |
For "query", "find_one", and "count", also include "collection": "<name>". All other keys in the query dict are passed to the equivalent call as filters.
Returns: List where each element is the result of the corresponding query.
def main():
    """Load published posts, one user, and the post count in a single call."""
    requested_user = req.get("user_id")

    # Index 0 — published posts
    posts_query = {
        "type": "query",
        "collection": "posts",
        "status": "published",
        "limit": 10,
        "sort": "created_at",
        "order": "desc",
    }
    # Index 1 — current user
    user_query = {"type": "find_user", "id": requested_user}
    # Index 2 — total post count
    count_query = {"type": "count", "collection": "posts"}

    # Results come back in the same order as the queries.
    batch = db.fetch_all([posts_query, user_query, count_query])

    post_page = batch[0]  # query result (has "data", "total", "has_more")
    return {
        "posts": post_page["data"],
        "user": batch[1],  # single user dict or None
        "total_posts": batch[2],  # integer
    }
get_document()
Get a single document by ID.
db.get_document(
collection_name: str,
document_id: str,
populate: list[str] = None
)
Returns: Document dict, or None if not found.
def main():
    """Return one post with its author and category populated, or a 404."""
    related_fields = ["author", "category"]
    found = db.get_document(
        "posts", req.get("post_id"), populate=related_fields
    )
    if found:
        return {"post": found}
    return {"error": "Not found"}, 404
App User CRUD
get_app_user()
Fetch a single app user by their ID.
db.get_app_user(user_id: str)
Returns: User dict or None.
def main():
    """Look up an app user by the ``user_id`` request value, or return a 404."""
    account = db.get_app_user(req.get("user_id"))
    if account:
        return {"user": account}
    return {"error": "User not found"}, 404
get_app_user_by_email()
Fetch a single app user by their email address.
db.get_app_user_by_email(email: str)
Returns: User dict or None.
def main():
    """Look up an app user by the ``email`` payload value, or return a 404."""
    account = db.get_app_user_by_email(req.payload.get("email"))
    if account:
        return {"user": account}
    return {"error": "User not found"}, 404
create_app_user()
Create a new app user programmatically (without going through the signup endpoint).
db.create_app_user(
email: str,
password: str,
data: dict = None, # custom fields
roles: list[str] = None
)
Returns: Created user dict.
def main():
    """Create an app user from the request payload.

    Expects ``email`` and ``password`` in the payload (``name`` is optional).
    Returns ``{"user": ...}`` with the created user, or a 400 response when
    a required credential is missing.
    """
    payload = req.payload
    email = payload.get("email")
    password = payload.get("password")

    # Reject incomplete requests here instead of handing None credentials
    # to create_app_user().
    if not email or not password:
        return {"error": "email and password are required"}, 400

    user = db.create_app_user(
        email=email,
        password=password,
        data={
            "name": payload.get("name"),
            "plan": "free",
        },
        roles=["user"],
    )
    return {"user": user}
update_app_user()
Update an app user’s fields. Pass only the fields you want to change.
db.update_app_user(
user_id: str,
email: str = None, # update email
password: str = None, # update password (auto-hashed)
roles: list[str] = None, # replace roles array
data: dict = None, # merged into existing data
**extra_fields
)
Returns: Updated user dict.
data is merged into the existing user data — you don’t need to include unchanged fields.
def main():
    """Update the signed-in user's profile fields.

    Only ``name`` and ``avatar_url`` values actually present in the payload
    are sent, so an omitted field is never merged into the user data as
    ``None``. Returns ``{"user": ...}`` with the updated user, a 401 response
    when no user is signed in, or a 400 response when nothing was supplied.
    """
    user_id = req.user["id"] if req.user else None
    if not user_id:
        return {"error": "Unauthorized"}, 401

    payload = req.payload
    editable_fields = ("name", "avatar_url")

    # data is merged into the existing user data, so send only the keys the
    # caller provided; a None value would presumably overwrite the stored one.
    changes = {
        field: payload.get(field)
        for field in editable_fields
        if payload.get(field) is not None
    }
    if not changes:
        return {"error": "No fields to update"}, 400

    updated = db.update_app_user(user_id, data=changes)
    return {"user": updated}
# Ban a user (from an admin function)
def main():
    """Mark a user as banned; only callers with the "admin" role may do so.

    Reads ``user_id`` and an optional ``reason`` from the payload. Returns
    ``{"banned": True}``, a 403 response for non-admin callers, or a 400
    response when ``user_id`` is missing.
    """
    # "or []" also covers a roles value that is present but None, which
    # would otherwise make the membership test raise TypeError.
    caller_roles = (req.user.get("roles") or []) if req.user else []
    if "admin" not in caller_roles:
        return {"error": "Forbidden"}, 403

    user_id = req.payload.get("user_id")
    if not user_id:
        return {"error": "user_id is required"}, 400

    reason = req.payload.get("reason", "Policy violation")
    db.update_app_user(user_id, data={
        "is_banned": True,
        "ban_reason": reason,
    })
    return {"banned": True}
delete_app_user()
Permanently delete an app user.
db.delete_app_user(user_id: str)
Returns: True if deleted, False if not found.
def main():
    """Delete the signed-in user's own account and report the outcome."""
    current_user = req.user
    if not current_user:
        return {"error": "Unauthorized"}, 401
    return {"deleted": db.delete_app_user(current_user["id"])}
count_app_users()
Return the total number of app users in the project.
db.count_app_users()
Returns: Integer count.
def main():
    """Report how many app users the project has."""
    return {"total_users": db.count_app_users()}
User Relationships
get_user_relationships()
Get users related to a user — followers, following, friends, teammates, etc.
db.get_user_relationships(
user_id: str,
relationship_type: str, # e.g. "followers", "following", "friends"
filters: dict = None,
limit: int = None,
offset: int = None,
populate: list[str] = None,
select: list[str] = None,
)
Returns: {"data": [...], "total": N, "has_more": bool, ...}
How it works:
- If the user has a {type}_ids array in their data (e.g. following_ids), it fetches those users directly.
- If not, it does a reverse lookup — finds users who have this user in their corresponding array.
def main():
    """Return a user's followers and following lists plus the follower total."""
    subject_id = req.get("user_id")
    page_size = 50

    follower_page = db.get_user_relationships(
        subject_id, "followers", limit=page_size
    )
    following_page = db.get_user_relationships(
        subject_id, "following", limit=page_size
    )

    return {
        "followers": follower_page["data"],
        "following": following_page["data"],
        "follower_count": follower_page["total"],
    }
add_user_relationship()
Add a relationship between two users. Updates the {type}_ids array in user data.
db.add_user_relationship(
user_id: str,
related_user_id: str,
relationship_type: str, # e.g. "following", "friends"
bidirectional: bool = False, # also add the reverse relationship
)
Returns: {"success": True, "message": "...", "bidirectional": bool}
def main():
    """Follow the target user, then add a two-way "friends" relationship.

    Returns the result of the second (friends) call, or a 401 response when
    no user is signed in.
    """
    if not req.user:
        return {"error": "Unauthorized"}, 401

    target_id = req.payload.get("user_id")
    own_id = req.user["id"]

    # User follows target
    db.add_user_relationship(own_id, target_id, "following")

    # Mutual friend (both sides updated)
    return db.add_user_relationship(
        own_id, target_id, "friends", bidirectional=True
    )
remove_user_relationship()
Remove a relationship between two users.
db.remove_user_relationship(
user_id: str,
related_user_id: str,
relationship_type: str,
bidirectional: bool = False,
)
def main():
    """Unfollow the target user on behalf of the signed-in user."""
    current_user = req.user
    if not current_user:
        return {"error": "Unauthorized"}, 401

    unfollow_target = req.payload.get("user_id")
    return db.remove_user_relationship(
        current_user["id"], unfollow_target, "following"
    )
get_user_collections()
Get documents from a collection that belong to a specific user. Automatically matches on author_id, user_id, creator_id, or owner_id.
db.get_user_collections(
user_id: str,
collection_name: str,
filters: dict = None,
limit: int = None,
offset: int = None,
populate: list[str] = None,
select: list[str] = None,
sort: str = None,
order: str = "desc",
)
Returns: {"data": [...], "total": N, "has_more": bool, ...}
def main():
    """List a user's published posts, newest first, with categories populated.

    Uses the signed-in user when there is one, otherwise the ``user_id``
    request value.
    """
    if req.user:
        owner_id = req.user["id"]
    else:
        owner_id = req.get("user_id")

    page = db.get_user_collections(
        owner_id,
        "posts",
        filters={"status": "published"},
        populate=["category"],
        sort="created_at",
        order="desc",
        limit=20,
    )
    return {"posts": page["data"], "total": page["total"]}
transaction()
Execute a function exactly once with atomic locking and idempotency. Protects against duplicate runs, race conditions, and double-charges.
db.transaction(
idempotency_key: str, # unique key for this operation
fn, # zero-arg callable with your logic
lock_resource: str = None, # optional mutex key (e.g. "wallet:user-123")
lock_ttl_seconds: int = 30,
result_ttl_seconds: int = 86400, # how long to cache the result (default 24h)
)
Returns dict:
| Key | Type | Meaning |
|---|---|---|
| ok | bool | True if fn ran successfully or was replayed |
| result | dict | fn() return value, or {} on failure |
| idempotent | bool | True if this was a cached replay (fn did not run again) |
| locked | bool | True if resource was busy — respond with 429 |
| error | str or None | Error message, or None on success |
| retry_in | int or None | Seconds until lock expires (only when locked=True) |
Never raises — always returns a plain dict.
def main():
    """Charge an order exactly once for the signed-in user.

    Runs ``_do_charge`` through ``db.transaction`` with a per-order
    idempotency key and a per-user wallet lock. Responses:
    401 when no user is signed in, 400 when ``order_id`` is missing,
    429 when the wallet lock is busy, 500 when the charge failed,
    otherwise the charge result (with a message on a cached replay).
    """
    # Without these guards the keys below would become "wallet:None" and
    # "charge-order-None", and the charge would still be attempted.
    if not req.user:
        return {"error": "Unauthorized"}, 401
    user_id = req.user["id"]

    order_id = req.payload.get("order_id")
    if not order_id:
        return {"error": "order_id is required"}, 400

    outcome = db.transaction(
        idempotency_key=f"charge-order-{order_id}",
        lock_resource=f"wallet:{user_id}",
        fn=lambda: _do_charge(order_id, user_id),
    )

    if outcome["locked"]:
        return {"error": "Another operation is in progress. Try again shortly."}, 429
    if not outcome["ok"]:
        return {"error": outcome["error"]}, 500
    if outcome["idempotent"]:
        # Cached replay: fn did not run again for this idempotency key.
        return {"message": "Already processed", **outcome["result"]}
    return outcome["result"]
def _do_charge(order_id, user_id):
    """Mark the order as paid and return a charge receipt.

    Raises:
        ValueError: if no order with ``order_id`` exists.
    """
    # This only runs once per idempotency_key within 24h
    order_doc = db.find_one("orders", id=order_id)
    if not order_doc:
        raise ValueError("Order not found")

    # ... charge logic ...

    paid_fields = {"status": "paid"}
    db.update_document_fields("orders", order_id, paid_fields)
    return {"charged": True, "order_id": order_id}
When to use it:
- Payment processing — prevent double charges on retried requests
- Inventory deduction — prevent overselling under concurrent requests
- Credit/token spending — protect balances from race conditions
- Any operation where running twice would cause data corruption
Collection Management
You can also create, update, and delete collections from cloud functions (rarely needed, but useful for dynamic schemas):
# List all collections in the project
collections = db.get_collections()
# Get a single collection by name
col = db.get_collection("posts")
# Create a collection ("https://..." is a placeholder webhook URL)
new_col = db.create_collection("events", webhook_url="https://...")
# Update settings on the collection created above
db.update_collection("events", webhook_url="https://new-url.com")
# Delete a collection and all its documents
db.delete_collection("temp_data")
asyncdb
asyncdb is the async version of db — identical methods, but awaitable. Use it inside async def main() for concurrent queries.
async def main():
    """Fetch published posts, admin users, and the post count concurrently."""
    import asyncio

    # Run three queries concurrently instead of sequentially
    pending = (
        asyncdb.query("posts", status="published", limit=10),
        asyncdb.query_users(role="admin"),
        asyncdb.count_documents("posts"),
    )
    post_page, admin_page, post_total = await asyncio.gather(*pending)

    return {
        "posts": post_page["data"],
        "admins": admin_page["data"],
        "total": post_total,
    }
All the same methods are available — asyncdb.find_one(), asyncdb.create_document(), asyncdb.update_app_user(), etc.