fused-integrations

v1.0.0
localskills install hWW3EEDVmw
Created Jun 15, 2026
arav garg
# Fused Integrations

Once an integration is configured (via the Workbench → Integrations UI or `fused integrations <provider> connect`), use the helpers below inside any UDF. You do **not** need to manage credentials manually — connections and secrets are resolved by the runtime.

Available integrations: `snowflake`, `bigquery`, `gcs`, `s3`, `airtable`, `notion`, `gdrive`, `modal`, `huggingface`, `baseten`, `daytona`, `comfy`, `anthropic`, `openai`, `slack` (experimental).

---

## Testing integration UDFs

Integration UDFs have two failure modes that general UDFs don't: missing access grants, and execution-context differences between `fused run` and public shared URLs. See the `fused-udfs` skill for general testing guidance; the integration-specific additions are below.

**Start with a connectivity smoke test.** Before building any logic, verify the integration actually connects and returns data:

```python
@fused.udf
def udf():
    # Smoke test: verify connection before writing full logic
    nt = fused.api.notion_connect()
    client = nt.client()
    results = client.search(query="", filter={"value": "page", "property": "object"})
    return {"connected": True, "pages_visible": len(results.get("results", []))}
```

**Also test via the shared URL** if the UDF will be served publicly (e.g. `https://udf.ai/fc_TOKEN/udf_name`). Some integrations — notably Notion — behave differently in that unauthenticated execution context. See the Notion section below for details.

---

## External API keys — scope and fallback patterns

A secret stored as `fused.secrets["my_api_key"]` is just a string — the Fused runtime doesn't know which endpoints it covers. Two things to verify before writing logic that depends on a key:

1. **Test the specific endpoint, not just the key.** Many providers (Google, AWS, Stripe) issue keys with per-API or per-product scopes. A key that authenticates successfully against one API may return `403`, `REQUEST_DENIED`, or `PERMISSION_DENIED` on another, even from the same provider. Test each endpoint explicitly in isolation before assuming the key covers it.

2. **Build a primary + fallback chain for critical paths.** If a UDF must geocode, enrich, or classify data and the primary API is unavailable or uncredentialed, having a fallback keeps the rest of the pipeline alive:

```python
@fused.cache
def enrich(item, _v=1):
    # Try primary API
    result = call_primary_api(item)
    if result is not None:
        return result
    # Fall back to alternative (open API, cached copy, degraded mode)
    return call_fallback_api(item)
```
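
The pattern above treats `None` as the only failure signal, but an unavailable or out-of-scope API usually raises instead. The sketch below is one way to handle both cases. The provider callables `primary` and `fallback` are hypothetical stand-ins, not Fused APIs:

```python
def first_successful(item, providers):
    """Return (provider_name, result) from the first provider that yields a result.

    `providers` is an ordered list of (name, callable) pairs. The names and
    callables are placeholders for illustration only.
    """
    errors = {}
    for name, fn in providers:
        try:
            result = fn(item)
        except Exception as exc:  # unavailable, uncredentialed, key out of scope, ...
            errors[name] = repr(exc)
            continue
        if result is not None:
            return name, result
        errors[name] = "returned None"
    raise RuntimeError(f"all providers failed for {item!r}: {errors}")


def primary(item):
    # Hypothetical primary API whose key lacks the required scope (simulated)
    raise PermissionError("REQUEST_DENIED")


def fallback(item):
    # Hypothetical degraded-mode lookup
    return {"item": item, "source": "fallback"}


name, result = first_successful("Berlin", [("primary", primary), ("fallback", fallback)])
print(name, result)
```

Returning the provider name alongside the result lets downstream code record which rows came from the degraded path.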

**Rate-limited APIs:** Any external API that enforces per-second or per-minute limits needs an explicit sleep between uncached calls. Wrap each call in `@fused.cache` so the sleep only fires on the first call per unique input; subsequent calls return instantly from cache:

```python
@fused.cache
def call_rate_limited_api(key, _v=1):
    import time
    time.sleep(1.1)  # stay under 1 req/s
    return make_request(key)
```

Without the cache, every UDF run re-fires every API call, burning rate limit and adding latency proportional to the number of unique inputs.
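
Because cost scales with the number of unique inputs, it can also help to de-duplicate before calling the API at all. This is a minimal sketch. `fake_lookup` is a hypothetical stand-in for a cached, rate-limited function like the one above:

```python
def enrich_unique(values, lookup):
    """Call `lookup` once per distinct value, then map results back in the original order."""
    unique = list(dict.fromkeys(values))       # de-duplicate, keeping first-seen order
    results = {v: lookup(v) for v in unique}   # one call per unique input
    return [results[v] for v in values]


calls = []


def fake_lookup(key):
    # Stand-in for the rate-limited API call; records each invocation
    calls.append(key)
    return key.upper()


out = enrich_unique(["a", "b", "a", "c", "b"], fake_lookup)
print(out, "API calls made:", len(calls))
```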

---

## Google Drive

Google Drive access only works inside **cloud UDFs** — local Python raises "Google Drive is not connected" even if CLI list commands work. Connect via Workbench → Integrations → Google Drive (OAuth browser flow), then grant file/folder access via the Picker UI.

### gdrive:// path format

```
gdrive://<folder_id>/<folder_name>/   # list a folder
gdrive://<file_id>/<filename>         # read a specific file
gdrive://root/                        # all Picker-granted files at root
```

### List files

```python
@fused.udf
def udf():
    return fused.api.list("gdrive://root/")
```

### Read tabular files (CSV, Excel)

```python
@fused.udf
def udf():
    import pandas as pd
    return pd.read_csv("gdrive://<file_id>/<name>.csv")
```

```python
@fused.udf
def udf():
    import pandas as pd
    return pd.read_excel("gdrive://<file_id>/<name>.xlsx")
```

Google-native formats are auto-exported: Sheets → `.xlsx`, Docs → `.docx`, Slides → `.pptx`, Drawings → `.png`.

### Read binary files / return to frontend

Base64-encode the raw bytes and return a data URI in the DataFrame — no `fd://` write needed:

```python
@fused.udf
def udf(gdrive_path: str = "gdrive://<id>/<name>.jpg"):
    import base64, io, mimetypes
    import pandas as pd

    data = fused.api.get(gdrive_path)  # raw bytes of the file
    file_name = gdrive_path.split("/")[-1]

    # Tabular: return a DataFrame directly
    if file_name.endswith(".csv"):
        return pd.read_csv(io.BytesIO(data))
    if file_name.endswith((".xlsx", ".xls")):
        return pd.read_excel(io.BytesIO(data))

    # Binary: return as a data URI
    mime, _ = mimetypes.guess_type(file_name)
    data_uri = f"data:{mime or 'application/octet-stream'};base64,{base64.b64encode(data).decode()}"
    return pd.DataFrame([{"file": file_name, "size_kb": round(len(data) / 1024, 1), "data_uri": data_uri}])
```

Use `data_uri` in the widget as `<img src>`, `<video src>`, or `<embed src>`. For very large files fall back to `fd://` staging + `fused.api.sign_url()`.

### Upload a file to Google Drive from a URL

```python
@fused.udf
def udf():
    import urllib.request, requests

    url = "https://example.com/data.csv"
    folder_id = "root"

    file_name = url.split("?")[0].rstrip("/").split("/")[-1] or "upload.csv"
    with urllib.request.urlopen(url) as resp:
        data = resp.read()
    api = fused.api.api
    creds = api.AUTHORIZATION.credentials
    r = requests.put(
        f"{api.OPTIONS.base_url}/files/upload",
        params={"path": f"gdrive://{folder_id}/{file_name}"},
        data=data,
        headers={
            "Authorization": f"Bearer {creds.access_token}",
            "Content-Type": "application/octet-stream",
        },
    )
    r.raise_for_status()
```

A `401` response means the credentials belong to a different user. Make your own copy of the canvas and run the upload from there.

### Parse fused.api.list() results

```python
items = fused.api.list("gdrive://root/")
for item in items:
    stripped = item.replace("gdrive://", "").rstrip("/")
    item_id, item_name = stripped.split("/", 1)
    is_dir = item.endswith("/")
```
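
The same parsing can be packaged as a small helper that returns one record per entry. This is a sketch assuming listing entries follow the `gdrive://<id>/<name>` format described above. The sample IDs are made up, and `partition` is used so that an entry without a name segment does not raise:

```python
def parse_gdrive_item(item):
    """Split one `gdrive://<id>/<name>` listing entry into its parts."""
    stripped = item.replace("gdrive://", "", 1).rstrip("/")
    item_id, _, item_name = stripped.partition("/")  # name is "" if there is no second segment
    return {"id": item_id, "name": item_name, "is_dir": item.endswith("/"), "path": item}


# Hypothetical listing output, for illustration only
sample = ["gdrive://1AbC/reports/", "gdrive://2DeF/data.csv", "gdrive://root/"]
records = [parse_gdrive_item(i) for i in sample]
folders = [r["name"] for r in records if r["is_dir"] and r["name"]]
print(records)
```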

---

## Snowflake

Docs: https://docs.fused.io/workbench/integrations/snowflake

### Simple query

```python
@fused.udf
def udf():
    import fused.api
    return fused.api.snowflake_query(
        "SELECT * FROM my_db.my_schema.my_table LIMIT 10"
    )
```

### Reusable connection (multiple operations)

```python
@fused.udf
def udf():
    import fused.api
    conn = fused.api.snowflake_connect(
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        schema="PUBLIC",
        role="ANALYST",
    )
    print("Tables:", conn.list_tables("ANALYTICS", "PUBLIC"))
    return conn.query("""
        SELECT region, SUM(amount) AS total
        FROM orders
        WHERE order_date >= '2025-01-01'
        GROUP BY region
        ORDER BY total DESC
    """)
```

### Write a DataFrame back to Snowflake

```python
@fused.udf
def udf():
    import fused.api, pandas as pd
    conn = fused.api.snowflake_connect(
        warehouse="COMPUTE_WH", database="ANALYTICS", schema="PUBLIC"
    )
    df = pd.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.3, 30.1]})
    conn.write(df, "ANALYTICS.PUBLIC.METRICS", mode="overwrite")
    return df
```

### Read from a Snowflake Stage

```python
@fused.udf
def udf():
    import fused.api
    conn = fused.api.snowflake_connect(
        warehouse="COMPUTE_WH", database="RAW_DATA", schema="INGEST"
    )
    files = conn.list_stage_files("@csv_stage", pattern=".*[.]csv")
    if files:
        return conn.read_stage(f"@csv_stage/{files[0].split('/')[-1]}")
```

**Connection methods:** `.query()`, `.list_tables()`, `.write(df, table, mode=)`, `.list_stage_files()`, `.read_stage()`

---

## BigQuery

Docs: https://docs.fused.io/workbench/integrations/bigquery

Credentials are stored as a service-account JSON string in `fused.secrets["gcs_fused"]`.

```python
@fused.udf
def udf():
    import json
    from google.cloud import bigquery
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_info(
        json.loads(fused.secrets["gcs_fused"]),
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    query = """
        SELECT * FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
        LIMIT 10
    """
    return client.query(query).to_dataframe()
    # For geospatial results: .to_geodataframe(geography_column="geometry")
```

---

## Google Cloud Storage (GCS)

Docs: https://docs.fused.io/workbench/integrations/gcs

Credentials are stored as a service-account JSON string in `fused.secrets["gcs_fused"]`.

### List files in a bucket

```python
@fused.udf
def udf():
    import os
    from google.cloud import storage

    with open("/tmp/gcs_key.json", "w") as f:
        f.write(fused.secrets["gcs_fused"])
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/tmp/gcs_key.json"

    client = storage.Client()
    bucket = client.bucket("your_bucket_name")
    blobs = bucket.list_blobs(prefix="path/to/your/data")
    print({blob.name for blob in blobs})
```

---

## Amazon S3

Docs: https://docs.fused.io/workbench/integrations/s3

S3 access is granted via IAM role (configured once in the Integrations UI). No credentials are needed inside the UDF.

### List files

```python
@fused.udf
def udf():
    return fused.api.list("s3://<BUCKET_NAME>/")
```

Reading and writing S3 files from UDFs works with standard libraries (`boto3`, `s3fs`, `pandas`) once the role is attached — the runtime inherits the IAM permissions automatically.

---

## Airtable

Docs: https://docs.fused.io/workbench/integrations/airtable

All operations go through `fused.api.airtable_connect()`.

### List bases

```python
@fused.udf()
def udf():
    at = fused.api.airtable_connect()
    bases = at.list_bases()
    for base in bases:
        print(base["id"], base["name"])
```

### Read records

```python
@fused.udf()
def udf():
    import pandas as pd
    at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
    records = at.list_records(
        "Tasks",
        view="Grid view",
        filterByFormula="{Status} = 'Done'",
        maxRecords=100,
    )
    rows = [{"id": r["id"], **r["fields"]} for r in records]
    return pd.DataFrame(rows)
```

### Create records

```python
@fused.udf()
def udf():
    at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
    created = at.create_records("Tasks", [
        {"fields": {"Name": "Buy groceries", "Status": "Todo"}},
        {"fields": {"Name": "Write docs", "Status": "In Progress"}},
    ])
    for r in created:
        print(r["id"])
```

### Update records

```python
@fused.udf()
def udf():
    at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
    at.update_records("Tasks", [
        {"id": "recXXXXXXXXXXXXXX", "fields": {"Status": "Done"}},
    ])
```

### Delete records

```python
@fused.udf()
def udf():
    at = fused.api.airtable_connect(base_id="appXXXXXXXXXXXXXX")
    at.delete_records("Tasks", ["recAAAAAAAAAAAA", "recBBBBBBBBBBBB"])
```

**Connection methods:** `.list_bases()`, `.list_records(table, view=, filterByFormula=, maxRecords=)`, `.create_records(table, rows)`, `.update_records(table, rows)`, `.delete_records(table, ids)`

---

## Notion

Docs: https://docs.fused.io/workbench/integrations/notion

All operations go through `fused.api.notion_connect()`, which returns a thin wrapper. Call `.client()` to get a full `notion-client` SDK instance — `nt` itself does not expose search/create/etc. directly.

> **Access grant required.** If you get `APIResponseError: Could not find page`, the Notion integration hasn't been granted access to that page or database. Fix: Workbench → Integrations → Notion → reconnect and grant access to all relevant pages/databases. This is separate from the OAuth connect step.

> **Notion URL parsing.** URLs copied from Notion often include a query string and block anchor: `https://www.notion.so/workspace/Page-abc123?v=uuid&source=copy_link#blockanchor`. The `#blockanchor` is 32 hex chars and will match a page-ID regex if not stripped first. Always clean the URL before extracting the ID:
> ```python
> clean = url.split("#")[0].split("?")[0].rstrip("/")
> slug = clean.split("/")[-1]
> ```
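
The cleaning step can be extended into a full ID extractor. This is a minimal sketch, assuming the page ID is the trailing 32 hex characters of the slug, as in the URL shape above. The helper name `notion_page_id` and the sample URL are made up for illustration:

```python
import re
import uuid


def notion_page_id(url):
    """Extract the page ID from a Notion URL in dashed UUID form, or return None."""
    clean = url.split("#")[0].split("?")[0].rstrip("/")  # drop anchor and query string first
    slug = clean.split("/")[-1]
    # The ID sits at the end of the slug; hyphens are removed so dashed IDs also match
    match = re.search(r"[0-9a-fA-F]{32}$", slug.replace("-", ""))
    return str(uuid.UUID(match.group(0))) if match else None


url = (
    "https://www.notion.so/workspace/Page-0123456789abcdef0123456789abcdef"
    "?v=ffffffffffffffffffffffffffffffff&source=copy_link"
    "#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
)
page_id = notion_page_id(url)
print(page_id)
```

Without the cleaning step, a regex run over the whole URL could also pick up the `v=` value or the block anchor, both of which are 32 hex characters in this sample.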

### Search pages

```python
@fused.udf()
def udf():
    nt = fused.api.notion_connect()
    client = nt.client()
    results = client.search(query="Q4 Planning")
    pages = client.search(query="Meeting", filter={"value": "page", "property": "object"})
```

### Get / update a page

```python
@fused.udf()
def udf():
    nt = fused.api.notion_connect()
    client = nt.client()
    page = client.pages.retrieve(page_id="a1b2c3d4-...")
    client.pages.update(
        page_id="your-page-id",
        properties={"Status": {"status": {"name": "Done"}}},
    )
```

### Query a database

> **⚠ `client.databases.query()` is not available in the Fused Notion client.** Calling it raises `AttributeError: 'DatabasesEndpoint' object has no attribute 'query'`. Use `client.search()` with a page filter instead:

```python
@fused.udf()
def udf():
    nt = fused.api.notion_connect()
    client = nt.client()
    # Search within a specific database by filtering on the database ID via parent
    results = client.search(
        query="",
        filter={"property": "object", "value": "page"},
    )
    for r in results.get("results", []):
        # Check parent to scope to a specific database
        if r.get("parent", {}).get("database_id") == "your-database-id":
            print(r["id"], r["properties"])
```
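
A single `search()` call returns only one page of results, so scoping to a database this way can miss rows in larger workspaces. The sketch below follows the `has_more` / `next_cursor` fields that Notion's API uses for pagination. `FakeClient` is an offline stand-in so the example runs without credentials. In a UDF you would pass the real `nt.client()` instead:

```python
def search_all_pages(client, database_id=None, page_size=100):
    """Collect pages across all result pages, optionally scoped to one database."""
    pages, cursor = [], None
    while True:
        kwargs = {"filter": {"property": "object", "value": "page"}, "page_size": page_size}
        if cursor:
            kwargs["start_cursor"] = cursor
        resp = client.search(**kwargs)
        for r in resp.get("results", []):
            if database_id is None or r.get("parent", {}).get("database_id") == database_id:
                pages.append(r)
        cursor = resp.get("next_cursor")
        if not resp.get("has_more") or not cursor:
            return pages


class FakeClient:
    """Offline stand-in for the notion-client SDK, for illustration only."""

    def __init__(self, batches):
        self._batches = batches

    def search(self, **kwargs):
        index = int(kwargs.get("start_cursor") or 0)
        has_more = index + 1 < len(self._batches)
        return {
            "results": self._batches[index],
            "has_more": has_more,
            "next_cursor": str(index + 1) if has_more else None,
        }


fake = FakeClient([
    [{"id": "p1", "parent": {"database_id": "db"}}, {"id": "p2", "parent": {"page_id": "x"}}],
    [{"id": "p3", "parent": {"database_id": "db"}}],
])
in_db = [p["id"] for p in search_all_pages(fake, database_id="db")]
everything = search_all_pages(fake)
```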

### Create a page in a database

```python
@fused.udf()
def udf():
    nt = fused.api.notion_connect()
    client = nt.client()
    client.pages.create(
        parent={"database_id": "your-database-id"},
        properties={
            "Name": {"title": [{"text": {"content": "Weekly Report"}}]},
            "Status": {"select": {"name": "Draft"}},
        },
    )
```

### Create a page with formatted bullet-point content

Pass `children` as a list of block dicts. Use `bulleted_list_item` blocks (not a single `paragraph` with `\n`) so content renders as actual bullets. Use `annotations` for bold labels:

```python
@fused.udf()
def udf():
    nt = fused.api.notion_connect()
    client = nt.client()

    def bullet(label, text):
        return {
            "object": "block",
            "type": "bulleted_list_item",
            "bulleted_list_item": {
                "rich_text": [
                    {"type": "text", "text": {"content": label}, "annotations": {"bold": True}},
                    {"type": "text", "text": {"content": " " + text}},
                ]
            },
        }

    client.pages.create(
        parent={"database_id": "your-database-id"},
        properties={
            "Name": {"title": [{"text": {"content": "My ticket"}}]},
        },
        children=[
            bullet("What:", "The widget fails on empty datasets."),
            bullet("Fix:", "Add a null-check before rendering."),
        ],
    )
```

> **Note:** Putting formatted content in a single `paragraph` block with `\n` separators renders `**bold**` as literal asterisks. Always use separate block types.

### Pull all pages into a DataFrame

```python
@fused.udf()
def udf():
    import pandas as pd
    nt = fused.api.notion_connect()
    client = nt.client()
    response = client.search(filter={"value": "page", "property": "object"})  # returns one page of results (Notion caps page_size at 100); follow next_cursor for more
    rows = []
    for p in response["results"]:
        props = p.get("properties", {})
        title_parts = props.get("Name", {}).get("title", [])
        title = "".join(t["plain_text"] for t in title_parts)
        rows.append({"id": p["id"], "title": title, "url": p.get("url"), "last_edited": p.get("last_edited_time")})
    return pd.DataFrame(rows)
```

### Read a JSON code block from a Notion page

A useful pattern for lightweight data storage: write a JSON code block to a dedicated Notion page (e.g. from a daily pipeline), then read it back in a UDF. This avoids needing file storage for simple structured data.

```python
@fused.udf
def udf():
    import json
    nt = fused.api.notion_connect()
    client = nt.client()
    blocks = client.blocks.children.list(block_id="YOUR_PAGE_ID")
    for block in blocks["results"]:
        if block["type"] == "code":
            # Long code blocks come back as several rich_text items; join them all
            raw = "".join(t["plain_text"] for t in block["code"]["rich_text"])
            return json.dumps(json.loads(raw))  # validate + return
    return json.dumps({})
```

To write the code block from a pipeline (e.g. a Claude agent), use `notion-update-page` with `replace_content` and format the JSON inside a fenced ` ```json ``` ` block.

### ⚠ notion_connect() fails in shared URL execution environment

`fused.api.notion_connect()` works correctly when running via `fused run` (local or remote authenticated execution), but **can fail with HTTP 422** when the UDF is called via a public shared URL endpoint (e.g. `https://udf.ai/fc_TOKEN/udf_name`). The Notion OAuth token is not available in that unauthenticated execution context.

**Fix:** wrap in try/except and fall back to an alternative data source (e.g. Fused file storage):

```python
@fused.udf
def udf():
    import json

    # Try Notion first (works in authenticated context)
    try:
        nt = fused.api.notion_connect()
        client = nt.client()
        blocks = client.blocks.children.list(block_id="YOUR_PAGE_ID")
        for block in blocks["results"]:
            if block["type"] == "code":
                # Long code blocks come back as several rich_text items; join them all
                raw = "".join(t["plain_text"] for t in block["code"]["rich_text"])
                data = json.loads(raw)
                if data:
                    return json.dumps(data)
    except Exception:
        pass

    # Fall back to Fused file storage (works in all contexts)
    try:
        import fsspec
        with fsspec.open("s3://fused-users/fused/YOUR_USERNAME/data/latest.json", "r") as f:
            return json.dumps(json.load(f))
    except Exception:
        pass

    return json.dumps({})
```

---

## Modal

Docs: https://docs.fused.io/workbench/integrations/modal

Run Modal apps, functions, and serverless GPU workloads. Set `MODAL_TOKEN_ID` and `MODAL_TOKEN_SECRET` in **Settings > Integrations & Secrets**. `fused.api.modal_connect()` returns an authenticated `modal.Client` and configures the SDK for the current process — subsequent `modal.Function.from_name(...).remote(...)` calls authenticate automatically.

### Invoke a deployed function

```python
@fused.udf()
def udf(prompt: str = "A high-resolution satellite image of a coastline"):
    import modal
    fused.api.modal_connect()
    generate = modal.Function.from_name("image-gen", "generate")
    return generate.remote(prompt)
```

### Fan out with `.map()`

```python
@fused.udf()
def udf():
    import modal, pandas as pd
    fused.api.modal_connect()
    embed = modal.Function.from_name("embeddings", "embed")
    texts = ["hello", "world", "fused", "modal"]
    vectors = list(embed.map(texts))
    return pd.DataFrame({"text": texts, "embedding": vectors})
```

### Class-based service

```python
@fused.udf()
def udf():
    import modal
    fused.api.modal_connect()
    Model = modal.Cls.from_name("llm-app", "Llama")
    model = Model()
    return model.complete.remote("Explain GeoParquet in one paragraph.")
```

### Async (spawn + poll)

```python
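# `train` is a deployed function handle obtained with modal.Function.from_name(...)
call = train.spawn(epochs=10)  # returns immediately with a FunctionCall
# Persist call.object_id, then in a later UDF: modal.FunctionCall.from_id(call.object_id).get()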
```
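
For the polling half, a later UDF can check the call once without blocking. This sketch assumes `FunctionCall.get(timeout=0)` raises the built-in `TimeoutError` while the result is not ready, as Modal's documentation describes. `FakeCall` is an offline stand-in so the example runs without a Modal account:

```python
def poll_function_call(function_call):
    """Check a spawned call once without blocking."""
    try:
        return {"done": True, "result": function_call.get(timeout=0)}
    except TimeoutError:
        return {"done": False, "result": None}


class FakeCall:
    """Offline stand-in for modal.FunctionCall, for illustration only."""

    def __init__(self, result=None, ready=False):
        self._result, self._ready = result, ready

    def get(self, timeout=None):
        if not self._ready:
            raise TimeoutError("still running")
        return self._result


pending = poll_function_call(FakeCall())
finished = poll_function_call(FakeCall(result=0.93, ready=True))
print(pending, finished)
```

In a real UDF the argument would be `modal.FunctionCall.from_id(call_id)`, with `call_id` stored by the UDF that called `.spawn()`.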

---

## Hugging Face

Docs: https://docs.fused.io/workbench/integrations/huggingface

Token stored as `HUGGINGFACE_API_KEY`. Two helpers, both pre-authenticated:

- `fused.api.huggingface_connect()` → `HfApi` (repos, models, datasets, files, Spaces)
- `fused.api.huggingface_inference()` → `InferenceClient` (hosted chat, embeddings, text-to-image, ASR…)

### Inspect / list models

```python
@fused.udf()
def udf():
    import pandas as pd
    hf = fused.api.huggingface_connect()
    models = hf.list_models(author="meta-llama", limit=20)
    return pd.DataFrame([{"id": m.id, "downloads": m.downloads, "likes": m.likes} for m in models])
```

### Download a file from a repo

```python
from huggingface_hub import hf_hub_download
fused.api.huggingface_connect()  # configures token
path = hf_hub_download(repo_id="meta-llama/Llama-3.2-1B-Instruct", filename="config.json")
```

### Load a dataset

```python
from datasets import load_dataset
fused.api.huggingface_connect()
ds = load_dataset("squad", split="validation[:100]")
return ds.to_pandas()
```

### Chat completion (hosted inference)

```python
client = fused.api.huggingface_inference()
reply = client.chat_completion(
    model="meta-llama/Llama-3.2-3B-Instruct",
    messages=[{"role": "user", "content": question}],
    max_tokens=256,
)
return reply.choices[0].message.content
```

### Embeddings

```python
client = fused.api.huggingface_inference()
vec = client.feature_extraction("coastal erosion", model="sentence-transformers/all-MiniLM-L6-v2")
```

### Text-to-image

```python
client = fused.api.huggingface_inference()
image = client.text_to_image(prompt, model="black-forest-labs/FLUX.1-schnell")  # PIL.Image
```

### Push to a repo

```python
hf = fused.api.huggingface_connect()
hf.upload_file(
    path_or_fileobj="results.parquet",
    path_in_repo="results.parquet",
    repo_id="my-org/my-dataset",
    repo_type="dataset",
)
```

---

## Baseten

Docs: https://docs.fused.io/workbench/integrations/baseten

API key stored as `BASETEN_API_KEY`. No connect helper — Baseten exposes an **OpenAI-compatible** endpoint at `https://inference.baseten.co/v1`, so use the `openai` SDK (pre-installed in the runtime).

### Chat completion

```python
@fused.udf()
def udf(prompt: str = "Summarize what GeoParquet is in one sentence."):
    from openai import OpenAI
    client = OpenAI(
        api_key=fused.secrets["BASETEN_API_KEY"],
        base_url="https://inference.baseten.co/v1",
    )
    response = client.chat.completions.create(
        model="deepseek-ai/DeepSeek-V3.1",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=256,
    )
    return response.choices[0].message.content
```

### Streaming

```python
stream = client.chat.completions.create(
    model="deepseek-ai/DeepSeek-V3.1",
    messages=[{"role": "user", "content": "Write a haiku about S3."}],
    stream=True,
)
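# Skip chunks with an empty `choices` list (some servers send these, e.g. for usage stats)
return "".join(chunk.choices[0].delta.content or "" for chunk in stream if chunk.choices)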
```

### Dedicated model deployment

For models deployed to your own Baseten workspace, call the model-specific endpoint with an `Authorization: Api-Key <key>` header:

```python
import requests
api_key = fused.secrets["BASETEN_API_KEY"]
resp = requests.post(
    f"https://model-{model_id}.api.baseten.co/production/predict",
    headers={"Authorization": f"Api-Key {api_key}"},
    json={"prompt": "Hello, world!"},
    timeout=60,
)
resp.raise_for_status()
return resp.json()
```

---

## Daytona

Docs: https://docs.fused.io/workbench/integrations/daytona

Run untrusted code, manage files, and clone repositories in cloud sandboxes. API key stored as `DAYTONA_API_KEY`. `fused.api.daytona_connect()` returns an authenticated `Daytona` client.

> **Always clean up sandboxes.** Wrap usage in `try/finally` with `daytona.delete(sandbox)` — orphaned sandboxes keep running and accrue cost.

### Create a sandbox and run code

```python
@fused.udf()
def udf():
    daytona = fused.api.daytona_connect()
    sandbox = daytona.create()
    try:
        response = sandbox.process.code_run('import math; print(f"{math.pi:.10f}")')
        return response.result
    finally:
        daytona.delete(sandbox)
```

### Stateful execution (variables persist)

```python
sandbox.code_interpreter.run_code("data = [1, 2, 3, 4, 5]")
result = sandbox.code_interpreter.run_code("print(sum(data))")  # 15
```

### Upload / download files

```python
sandbox.fs.upload_file("/home/daytona/input.txt", b"Hello, world!")
content = sandbox.fs.download_file("/home/daytona/input.txt")
```

### Clone a Git repo

```python
sandbox.git.clone(url="https://github.com/fusedio/udfs", path="/home/daytona/udfs")
```

### Custom configuration

```python
from daytona import CreateSandboxFromSnapshotParams
params = CreateSandboxFromSnapshotParams(
    language="python",
    env_vars={"DEBUG": "true"},
    auto_stop_interval=0,
)
sandbox = daytona.create(params, timeout=40)
```

**Client methods:** `.create(params=, timeout=)`, `.get(sandbox_id)`, `.list()`, `.delete(sandbox)`

**Sandbox surfaces:** `.process.code_run(code)`, `.code_interpreter.run_code(code)`, `.fs.upload_file/.download_file`, `.git.clone(url=, path=)`

---

## ComfyOrg (Comfy Cloud)

Docs: https://docs.fused.io/workbench/integrations/comfy

Run [ComfyUI](https://www.comfy.org) workflows on Comfy Cloud. API key stored as `COMFY_API_KEY`. There is **no connect helper** — call the REST API at `https://cloud.comfy.org` directly.

Prerequisite: export your workflow as JSON via **Graph → Export (API)** in ComfyUI and upload it somewhere Fused can read (typically S3).

The flow is: load workflow → patch input nodes → `POST /api/prompt` → poll `/api/job/{id}/status` → read outputs from `/api/history_v2/{id}` → download from `/api/view`.

```python
@fused.udf()
def udf(prompt: str = "A high-resolution satellite image of a coastline"):
    import io, time, requests, numpy as np
    from PIL import Image

    BASE_URL = "https://cloud.comfy.org"
    api_key = fused.secrets["COMFY_API_KEY"]
    headers = {"X-API-Key": api_key}

    # 1. Load workflow JSON from S3
    workflow = requests.get(fused.api.sign_url("s3://your-bucket/workflow.json")).json()

    # 2. Patch input nodes — node IDs are top-level keys in the exported JSON
    workflow["104:90"]["inputs"]["text"] = prompt
    workflow["104:92"]["inputs"]["seed"] = 42

    # 3. Submit
    prompt_id = requests.post(
        f"{BASE_URL}/api/prompt",
        headers={**headers, "Content-Type": "application/json"},
        json={"prompt": workflow, "extra_data": {"api_key_comfy_org": api_key}},
    ).json()["prompt_id"]

    # 4. Poll every 5 s, up to 120 attempts (about 10 minutes)
    history = {}
    for _ in range(120):
        job = requests.get(f"{BASE_URL}/api/job/{prompt_id}/status", headers=headers).json()
        if job.get("status") in ("failed", "cancelled", "error"):
            raise RuntimeError(f"Comfy job failed: {job.get('error_message')}")
        if job.get("status") in ("success", "completed"):
            history = requests.get(f"{BASE_URL}/api/history_v2/{prompt_id}", headers=headers).json().get(prompt_id, {})
            break
        time.sleep(5)
    else:
        # Loop ended without a break: the job never reached a terminal state
        raise TimeoutError(f"Comfy job {prompt_id} did not finish in time")

    # 5. Download first output image
    for node_outputs in history.get("outputs", {}).values():
        for file_info in node_outputs.get("images", []):
            data = requests.get(
                f"{BASE_URL}/api/view",
                headers=headers,
                params={"filename": file_info["filename"], "type": file_info.get("type", "output")},
            ).content
            return np.array(Image.open(io.BytesIO(data)))
```
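
Rather than hard-coding node IDs, one option is to look nodes up by their `class_type`, which each node carries in ComfyUI's API-format export. The sketch below runs against a made-up three-node workflow. The helper name `find_nodes` and the node IDs are illustrative only:

```python
def find_nodes(workflow, class_type):
    """Return the IDs of nodes whose `class_type` matches, in file order."""
    return [
        node_id
        for node_id, node in workflow.items()
        if isinstance(node, dict) and node.get("class_type") == class_type
    ]


# Made-up workflow in the exported API format: top-level keys are node IDs
workflow = {
    "3": {"class_type": "KSampler", "inputs": {"seed": 1}},
    "6": {"class_type": "CLIPTextEncode", "inputs": {"text": "positive"}},
    "7": {"class_type": "CLIPTextEncode", "inputs": {"text": "negative"}},
}

text_nodes = find_nodes(workflow, "CLIPTextEncode")
sampler_id = find_nodes(workflow, "KSampler")[0]
workflow[sampler_id]["inputs"]["seed"] = 42
print(text_nodes, workflow[sampler_id])
```

A workflow often has several nodes of the same type (for example positive and negative prompts), so a lookup that returns more than one ID still needs a manual choice.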

> **Node IDs are workflow-specific.** Keys like `"104:90"` come from one particular exported workflow — open your own JSON and look up the actual top-level keys for the nodes you want to patch.

### Upload an input image

For image-to-video or style-transfer workflows, upload the input first and reference its returned filename:

```python
resp = requests.post(
    f"{BASE_URL}/api/upload/image",
    headers={"X-API-Key": fused.secrets["COMFY_API_KEY"]},
    files={"image": (fname, img_bytes, "image/png")},
)
uploaded_name = resp.json()["name"]
workflow["3"]["inputs"]["image"] = uploaded_name
```

### Cache the slow job, wrap with a thin UDF

Generation is expensive — split the work: a `@fused.cache` helper that submits/polls/uploads to S3, and a thin `@fused.udf(engine="small")` wrapper that returns a `fused.api.sign_url(...)`. `engine="small"` runs as a batch job so you escape the 120-second realtime UDF limit (often necessary for video).

---

## Anthropic

Docs: https://docs.anthropic.com/en/api/client-sdks/python

API key stored as `ANTHROPIC_API_KEY`. `fused.api.anthropic_connect()` returns an authenticated `anthropic.Anthropic` client — the full SDK surface is available (`client.messages`, `client.models`, `client.beta`).

### Chat completion

```python
@fused.udf()
def udf(question: str = "Explain map projections in three sentences."):
    client = fused.api.anthropic_connect()
    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": question}],
    )
    return message.content[0].text
```

### System prompt

```python
client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system="You are a geospatial data expert. Answer concisely.",
    messages=[{"role": "user", "content": question}],
)
```

### Streaming

```python
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a Haversine function."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Tool use

```python
tools = [{
    "name": "get_weather",
    "description": "Get the current weather for a location.",
    "input_schema": {
        "type": "object",
        "properties": {"latitude": {"type": "number"}, "longitude": {"type": "number"}},
        "required": ["latitude", "longitude"],
    },
}]
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in San Francisco?"}],
)
for block in message.content:
    if block.type == "tool_use":
        print(block.name, block.input)
```
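
The snippet above stops at printing the requested call. To finish the exchange, the tool output goes back to the model as a `tool_result` block in a user turn, keyed by the `tool_use` block's `id`. The sketch below builds those follow-up messages. The `handlers` mapping, the fake content blocks, and the weather values are all made up so the example runs offline:

```python
import json
from types import SimpleNamespace


def tool_result_turns(content_blocks, handlers):
    """Build the two follow-up messages that answer a tool_use response."""
    results = []
    for block in content_blocks:
        if block.type != "tool_use":
            continue
        output = handlers[block.name](**block.input)  # run the matching local function
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,  # must echo the id of the tool_use block
            "content": json.dumps(output),
        })
    return [
        {"role": "assistant", "content": content_blocks},  # the model's turn, unchanged
        {"role": "user", "content": results},              # tool results go in a user turn
    ]


# Offline stand-ins for `message.content`; the id and weather values are made up
blocks = [
    SimpleNamespace(type="text", text="Let me check."),
    SimpleNamespace(type="tool_use", id="toolu_example", name="get_weather",
                    input={"latitude": 37.77, "longitude": -122.42}),
]
handlers = {"get_weather": lambda latitude, longitude: {"temp_c": 18}}
turns = tool_result_turns(blocks, handlers)
```

With the real SDK, pass `message.content` as `content_blocks`, append `turns` to the original `messages` list, and call `client.messages.create(...)` again with the same `tools`.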

### Vision (base64 image)

```python
import base64
with open("satellite.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode()
client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_data}},
            {"type": "text", "text": "Describe the land use patterns."},
        ],
    }],
)
```

> **Anthropic image format differs from OpenAI.** Anthropic uses `{"type": "image", "source": {"type": "base64", "media_type": ..., "data": ...}}`. OpenAI uses `{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}`. Don't swap them.

Token usage is on `message.usage.input_tokens` / `message.usage.output_tokens`.

---

## OpenAI

Docs: https://platform.openai.com/docs/libraries/python

API key stored as `OPENAI_API_KEY`. `fused.api.openai_connect()` returns an authenticated `openai.OpenAI` client — the full SDK surface is available (`client.chat.completions`, `client.embeddings`, `client.images`, `client.models`).

### Chat completion

```python
@fused.udf()
def udf(question: str = "Summarize GeoParquet's key features."):
    client = fused.api.openai_connect()
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": question}],
    )
    return response.choices[0].message.content
```

### System prompt

```python
client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a geospatial data expert. Answer concisely."},
        {"role": "user", "content": question},
    ],
)
```

### Streaming

```python
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a Haversine function."}],
    stream=True,
)
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
```

### Function calling

```python
tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a location.",
        "parameters": {
            "type": "object",
            "properties": {"latitude": {"type": "number"}, "longitude": {"type": "number"}},
            "required": ["latitude", "longitude"],
        },
    },
}]
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in San Francisco?"}],
    tools=tools,
)
for tc in response.choices[0].message.tool_calls or []:
    print(tc.function.name, tc.function.arguments)  # arguments is a JSON string — json.loads it
```
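
To complete the round trip, each tool call is answered with a `role: "tool"` message carrying the matching `tool_call_id`. This is a minimal sketch. The `handlers` mapping and the fake tool call are made up so the example runs offline:

```python
import json
from types import SimpleNamespace


def tool_messages(tool_calls, handlers):
    """Run each requested function and build the matching `role: tool` messages."""
    messages = []
    for tc in tool_calls or []:
        args = json.loads(tc.function.arguments)  # arguments arrive as a JSON string
        output = handlers[tc.function.name](**args)
        messages.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": json.dumps(output),
        })
    return messages


# Offline stand-in for `response.choices[0].message.tool_calls`; values are made up
fake_calls = [SimpleNamespace(
    id="call_example",
    function=SimpleNamespace(
        name="get_weather",
        arguments='{"latitude": 37.77, "longitude": -122.42}',
    ),
)]
replies = tool_messages(fake_calls, {"get_weather": lambda latitude, longitude: {"temp_c": 18}})
```

In a real exchange, append the assistant message that contains the `tool_calls` to `messages` first, then these replies, and call `client.chat.completions.create(...)` again.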

### Vision (data URI)

```python
import base64
with open("satellite.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode()
client.chat.completions.create(
    model="gpt-4o",
    messages=[{
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}},
            {"type": "text", "text": "Describe the land use patterns."},
        ],
    }],
)
```

### Embeddings

```python
response = client.embeddings.create(
    model="text-embedding-3-small",
    input=["geospatial data processing", "satellite imagery analysis"],
)
vectors = [d.embedding for d in response.data]
```

Token usage is on `response.usage.prompt_tokens` / `response.usage.completion_tokens` (note: different names than Anthropic).
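
If a UDF logs usage from both providers, a small adapter can hide the naming difference. This sketch assumes only the attribute names mentioned in this document. The helper name `token_usage` and the fake responses are illustrative:

```python
from types import SimpleNamespace


def token_usage(response):
    """Return token counts under common keys for either SDK's response object."""
    usage = response.usage
    if hasattr(usage, "input_tokens"):  # Anthropic naming
        return {"input": usage.input_tokens, "output": usage.output_tokens}
    return {"input": usage.prompt_tokens, "output": usage.completion_tokens}  # OpenAI chat naming


# Offline stand-ins for SDK responses; the counts are made up
anthropic_like = SimpleNamespace(usage=SimpleNamespace(input_tokens=12, output_tokens=30))
openai_like = SimpleNamespace(usage=SimpleNamespace(prompt_tokens=9, completion_tokens=21))

a = token_usage(anthropic_like)
o = token_usage(openai_like)
print(a, o)
```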

---

## Slack (Experimental)

Docs: https://docs.fused.io/workbench/integrations/slack

Slack integration is a **team-level Slack bot for talking to a Canvas** — it is not a UDF-level helper. Enable it in **Preferences → Slack integration**, then check the **Integrations** panel on your Fused home page:

- **SYNCED** — your team is set up. Go straight to *Adding a new Canvas to Slack* in the docs.
- **STANDBY** — run the one-time team setup (Loom walkthrough in the docs).

There is no `fused.api.slack_connect()` and no `SLACK_*` secret in UDFs — wiring happens at the team/canvas level, not in code.

---

## Secrets (generic key/value)

Any secret stored via `fused secrets set KEY VALUE` (or the Workbench UI) is available as `fused.secrets["KEY"]` inside any UDF. Use this for API keys, tokens, or JSON credential blobs that don't have a first-class connect helper.

```python
@fused.udf
def udf():
    api_key = fused.secrets["my_api_key"]
```

> **`fused.secrets` raises on missing keys.** Accessing a key that doesn't exist raises `SecretKeyNotFound`, not `KeyError` and not `None`. Run `fused secrets list` to verify a key exists before relying on it in a UDF.