Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.cocobase.cc/llms.txt

Use this file to discover all available pages before exploring further.

Database — Advanced Methods

Additional db methods not covered in the introduction.

update_document_fields()

Merge specific fields into a document without overwriting the rest. Safer than update_document() when you only need to change a few keys.
db.update_document_fields(
    collection_name: str,
    document_id: str,
    fields: dict
)
Returns: Updated document dict, or None if not found.
def main():
    """Publish a post by merging only the fields that change.

    Reads ``doc_id`` from the request. Returns ``{"post": ...}`` with the
    updated document, or a 404 error tuple when ``update_document_fields``
    returns a falsy value (document not found).
    """
    # Local import keeps the example self-contained.
    from datetime import datetime, timezone

    doc_id = req.get("doc_id")

    # Only updates 'status' and 'published_at' — other fields untouched.
    # datetime.utcnow() is deprecated and yields a naive value; an aware UTC
    # timestamp serializes with an explicit "+00:00" offset instead.
    updated = db.update_document_fields("posts", doc_id, {
        "status": "published",
        "published_at": datetime.now(timezone.utc).isoformat(),
    })

    if not updated:
        return {"error": "Not found"}, 404

    return {"post": updated}

count_documents()

Count documents matching optional filters without fetching the data.
db.count_documents(
    collection_name: str,
    **filters
)
Returns: Integer count.
def main():
    """Report post counts: overall, published, and published tech posts."""
    published_only = {"status": "published"}

    # Every post in the collection, no filters applied
    everything = db.count_documents("posts")

    # Restricted to published posts
    live = db.count_documents("posts", **published_only)

    # Published posts inside one category
    live_tech = db.count_documents(
        "posts", **published_only, category_id="cat-tech"
    )

    return {"total": everything, "published": live, "tech_published": live_tech}

search_documents()

Full-text search across a collection. Searches all text fields by default, or specific fields if you supply them.
db.search_documents(
    collection_name: str,
    search_term: str,
    search_fields: list[str] = None,   # None = search all text fields
    limit: int = None
)
Returns: List of matching document dicts (not a paginated result — just a plain list).
def main():
    """Search posts for the ``q`` request parameter.

    Always returns ``{"results": [...], "count": N}``; an empty query
    short-circuits with zero results in that same shape so callers never
    have to special-case a missing ``count`` key.
    """
    term = req.get("q", "")
    if not term:
        return {"results": [], "count": 0}

    # Search specific fields only. To search all text fields instead, omit
    # search_fields: db.search_documents("posts", term, limit=20)
    # (Run one or the other — issuing both just discards the first result.)
    results = db.search_documents("posts", term,
        search_fields=["title", "body"],
        limit=20
    )

    return {"results": results, "count": len(results)}

fetch_all()

Run multiple independent queries in one call. Returns a list of results in the same order as the queries.
db.fetch_all(queries: list[dict])
Each query dict must have a type key:
| type | Equivalent call |
| --- | --- |
| "query" | db.query(collection, ...) |
| "find_one" | db.find_one(collection, ...) |
| "query_users" | db.query_users(...) |
| "find_user" | db.find_user(...) |
| "count" | db.count_documents(collection) |
For "query", "find_one", and "count", also include "collection": "<name>". All other kwargs are passed as filters.
Returns: List where each element is the result of the corresponding query.
def main():
    """Fetch published posts, one user, and the post count in a single call."""
    # Index 0 — published posts
    published_posts = {
        "type": "query",
        "collection": "posts",
        "status": "published",
        "limit": 10,
        "sort": "created_at",
        "order": "desc",
    }
    # Index 1 — current user
    current_user = {"type": "find_user", "id": req.get("user_id")}
    # Index 2 — total post count
    post_count = {"type": "count", "collection": "posts"}

    # Results come back in the same order the queries were submitted.
    batch = db.fetch_all([published_posts, current_user, post_count])

    page = batch[0]         # query result (has "data", "total", "has_more")
    user = batch[1]         # single user dict or None
    total_posts = batch[2]  # integer

    return {"posts": page["data"], "user": user, "total_posts": total_posts}

get_document()

Get a single document by ID.
db.get_document(
    collection_name: str,
    document_id: str,
    populate: list[str] = None
)
Returns: Document dict, or None if not found.
def main():
    """Return one post with author and category populated, or a 404."""
    related = ["author", "category"]
    found = db.get_document("posts", req.get("post_id"), populate=related)

    if found:
        return {"post": found}

    return {"error": "Not found"}, 404

App User CRUD

get_app_user()

Fetch a single app user by their ID.
db.get_app_user(user_id: str)
Returns: User dict or None.
def main():
    """Look up an app user by the ``user_id`` request parameter."""
    match = db.get_app_user(req.get("user_id"))
    if match:
        return {"user": match}
    return {"error": "User not found"}, 404

get_app_user_by_email()

Fetch a single app user by their email address.
db.get_app_user_by_email(email: str)
Returns: User dict or None.
def main():
    """Look up an app user by the ``email`` value in the request payload."""
    match = db.get_app_user_by_email(req.payload.get("email"))
    if match:
        return {"user": match}
    return {"error": "User not found"}, 404

create_app_user()

Create a new app user programmatically (without going through the signup endpoint).
db.create_app_user(
    email: str,
    password: str,
    data: dict = None,     # custom fields
    roles: list[str] = None
)
Returns: Created user dict.
def main():
    """Create a free-plan app user from the request payload.

    Returns ``{"user": ...}`` with the created user, or a 400 error tuple
    when ``email`` or ``password`` is missing, so ``None`` credentials are
    never forwarded to ``create_app_user``.
    """
    payload = req.payload
    email = payload.get("email")
    password = payload.get("password")

    # Reject incomplete signups up front rather than creating a broken user.
    if not email or not password:
        return {"error": "email and password are required"}, 400

    user = db.create_app_user(
        email=email,
        password=password,
        data={
            "name": payload.get("name"),
            "plan": "free",
        },
        roles=["user"]
    )

    return {"user": user}

update_app_user()

Update an app user’s fields. Pass only the fields you want to change.
db.update_app_user(
    user_id: str,
    email: str = None,          # update email
    password: str = None,       # update password (auto-hashed)
    roles: list[str] = None,    # replace roles array
    data: dict = None,          # merged into existing data
    **extra_fields
)
Returns: Updated user dict.
data is merged into the existing user data — you don’t need to include unchanged fields.
def main():
    """Update the signed-in user's profile fields from the request payload.

    Returns a 401 error tuple for unauthenticated requests, otherwise
    ``{"user": ...}`` with the updated user.
    """
    user_id = req.user["id"] if req.user else None
    if not user_id:
        return {"error": "Unauthorized"}, 401

    payload = req.payload

    # data is merged into the stored user data, so forward only the keys the
    # client actually supplied; sending None for an omitted key would
    # overwrite the stored value with None.
    changes = {}
    for key in ("name", "avatar_url"):
        value = payload.get(key)
        if value is not None:
            changes[key] = value

    updated = db.update_app_user(user_id, data=changes)

    return {"user": updated}
# Ban a user (from an admin function)
def main():
    """Flag a user as banned; only callers with the admin role may do so."""
    caller = req.user
    is_admin = bool(caller) and "admin" in caller.get("roles", [])
    if not is_admin:
        return {"error": "Forbidden"}, 403

    body = req.payload
    user_id = body.get("user_id")
    reason = body.get("reason", "Policy violation")

    # The ban is stored as custom data merged into the user record.
    ban_fields = {"is_banned": True, "ban_reason": reason}
    db.update_app_user(user_id, data=ban_fields)

    return {"banned": True}

delete_app_user()

Permanently delete an app user.
db.delete_app_user(user_id: str)
Returns: True if deleted, False if not found.
def main():
    """Delete the signed-in user's own account and report the outcome."""
    current = req.user
    if not current:
        return {"error": "Unauthorized"}, 401

    return {"deleted": db.delete_app_user(current["id"])}

count_app_users()

Return the total number of app users in the project.
db.count_app_users()
def main():
    """Return the number of app users in the project."""
    return {"total_users": db.count_app_users()}

User Relationships

get_user_relationships()

Get users related to a user — followers, following, friends, teammates, etc.
db.get_user_relationships(
    user_id: str,
    relationship_type: str,       # e.g. "followers", "following", "friends"
    filters: dict = None,
    limit: int = None,
    offset: int = None,
    populate: list[str] = None,
    select: list[str] = None,
)
Returns: {"data": [...], "total": N, "has_more": bool, ...}
How it works:
  • If the user has a {type}_ids array in their data (e.g. following_ids), it fetches those users directly.
  • If not, it does a reverse lookup — finds users who have this user in their corresponding array.
def main():
    """Return a user's followers and following lists plus the follower total."""
    user_id = req.get("user_id")

    # One lookup per relationship type, in followers-then-following order.
    pages = {
        kind: db.get_user_relationships(user_id, kind, limit=50)
        for kind in ("followers", "following")
    }

    return {
        "followers": pages["followers"]["data"],
        "following": pages["following"]["data"],
        "follower_count": pages["followers"]["total"],
    }

add_user_relationship()

Add a relationship between two users. Updates the {type}_ids array in user data.
db.add_user_relationship(
    user_id: str,
    related_user_id: str,
    relationship_type: str,       # e.g. "following", "friends"
    bidirectional: bool = False,  # also add the reverse relationship
)
Returns: {"success": True, "message": "...", "bidirectional": bool}
def main():
    """Make the signed-in user follow, then mutually befriend, a target user.

    Both relationships are written; the response is the result of the
    second (bidirectional "friends") call.
    """
    me = req.user
    if not me:
        return {"error": "Unauthorized"}, 401

    target_id = req.payload.get("user_id")

    # User follows target (one-directional)
    follow_outcome = db.add_user_relationship(me["id"], target_id, "following")

    # Mutual friend (both sides updated)
    friend_outcome = db.add_user_relationship(
        req.user["id"], target_id, "friends", bidirectional=True
    )

    return friend_outcome

remove_user_relationship()

Remove a relationship between two users.
db.remove_user_relationship(
    user_id: str,
    related_user_id: str,
    relationship_type: str,
    bidirectional: bool = False,
)
def main():
    """Make the signed-in user stop following the target user."""
    me = req.user
    if not me:
        return {"error": "Unauthorized"}, 401

    target_id = req.payload.get("user_id")
    return db.remove_user_relationship(me["id"], target_id, "following")

get_user_collections()

Get documents from a collection that belong to a specific user. Automatically matches on author_id, user_id, creator_id, or owner_id.
db.get_user_collections(
    user_id: str,
    collection_name: str,
    filters: dict = None,
    limit: int = None,
    offset: int = None,
    populate: list[str] = None,
    select: list[str] = None,
    sort: str = None,
    order: str = "desc",
)
Returns: {"data": [...], "total": N, "has_more": bool, ...}
def main():
    """List a user's newest published posts with their category populated."""
    # Prefer the authenticated user; otherwise use the user_id parameter.
    if req.user:
        user_id = req.user["id"]
    else:
        user_id = req.get("user_id")

    options = {
        "filters": {"status": "published"},
        "populate": ["category"],
        "sort": "created_at",
        "order": "desc",
        "limit": 20,
    }
    page = db.get_user_collections(user_id, "posts", **options)

    return {"posts": page["data"], "total": page["total"]}

transaction()

Execute a function exactly once with atomic locking and idempotency. Protects against duplicate runs, race conditions, and double-charges.
db.transaction(
    idempotency_key: str,      # unique key for this operation
    fn,                        # zero-arg callable with your logic
    lock_resource: str = None, # optional mutex key (e.g. "wallet:user-123")
    lock_ttl_seconds: int = 30,
    result_ttl_seconds: int = 86400,  # how long to cache the result (default 24h)
)
Returns dict:
| Key | Type | Meaning |
| --- | --- | --- |
| ok | bool | True if fn ran successfully or was replayed |
| result | dict | fn() return value, or {} on failure |
| idempotent | bool | True if this was a cached replay (fn did not run again) |
| locked | bool | True if resource was busy — respond with 429 |
| error | str \| None | Error message, or None on success |
| retry_in | int \| None | Seconds until lock expires (only when locked=True) |
Never raises — always returns a plain dict.
def main():
    """Charge an order exactly once for the signed-in user.

    Returns:
        401 if the request is unauthenticated, 400 if ``order_id`` is
        missing, 429 while the user's wallet lock is held, 500 with the
        error message when the charge fails, otherwise the charge result
        (with an "Already processed" message on an idempotent replay).
    """
    # Without these guards the keys below would collapse to
    # "wallet:None" / "charge-order-None", making unrelated requests share
    # one lock and replay each other's cached result.
    if not req.user:
        return {"error": "Unauthorized"}, 401
    user_id = req.user["id"]

    order_id = req.payload.get("order_id")
    if not order_id:
        return {"error": "order_id is required"}, 400

    outcome = db.transaction(
        idempotency_key=f"charge-order-{order_id}",
        lock_resource=f"wallet:{user_id}",
        fn=lambda: _do_charge(order_id, user_id),
    )

    if outcome["locked"]:
        return {"error": "Another operation is in progress. Try again shortly."}, 429

    if not outcome["ok"]:
        return {"error": outcome["error"]}, 500

    if outcome["idempotent"]:
        return {"message": "Already processed", **outcome["result"]}

    return outcome["result"]


def _do_charge(order_id, user_id):
    """Charge an order and mark it paid.

    Raises:
        ValueError: if no order with ``order_id`` exists.
    """
    # This only runs once per idempotency_key within 24h
    if not db.find_one("orders", id=order_id):
        raise ValueError("Order not found")

    # ... charge logic ...

    paid_marker = {"status": "paid"}
    db.update_document_fields("orders", order_id, paid_marker)
    return {"charged": True, "order_id": order_id}
When to use it:
  • Payment processing — prevent double charges on retried requests
  • Inventory deduction — prevent overselling under concurrent requests
  • Credit/token spending — protect balances from race conditions
  • Any operation where running twice would cause data corruption

Collection Management

You can also create, update, and delete collections from cloud functions (rarely needed, but useful for dynamic schemas):
# Collection management calls available on db from a cloud function.

# List all collections
collections = db.get_collections()

# Get a single collection by name
col = db.get_collection("posts")

# Create a collection, supplying a webhook_url setting at creation time
new_col = db.create_collection("events", webhook_url="https://...")

# Update collection settings (here: point the webhook at a new URL)
db.update_collection("events", webhook_url="https://new-url.com")

# Delete a collection and all its documents
db.delete_collection("temp_data")

asyncdb

asyncdb is the async version of db — identical methods, but awaitable. Use it inside async def main() for concurrent queries.
async def main():
    """Run three independent queries concurrently and merge the results."""
    import asyncio

    # Awaiting the three queries together lets them overlap instead of
    # running one after another.
    pending = (
        asyncdb.query("posts", status="published", limit=10),
        asyncdb.query_users(role="admin"),
        asyncdb.count_documents("posts"),
    )
    page, admins, total = await asyncio.gather(*pending)

    return {"posts": page["data"], "admins": admins["data"], "total": total}
All the same methods are available — asyncdb.find_one(), asyncdb.create_document(), asyncdb.update_app_user(), etc.