Custom Connectors
Build your own connector when MemorySync does not ship a managed integration for your data source. A custom connector is just code you write that calls the MemorySync API to push content as memories — it can run anywhere: as a cron job, a serverless function, or a service in your infrastructure.
When to build a custom connector
- Internal data sources — your source has no public API or is behind a VPN that MemorySync cannot reach.
- Custom field mapping — you need to transform proprietary fields into specific metadata shapes before storing them.
- Higher sync frequency — you need tighter control over when and how often data is pushed.
- Unsupported provider — the service you use (e.g., Confluence, Asana, your own CMS) does not have a managed integration yet.
- Pre-processing — you want to clean, filter, or enrich content before it becomes a memory.
Architecture
A custom connector is a consumer of the MemorySync API. It follows this pattern:
1. Your connector authenticates with MemorySync using an API key (set via the X-API-Key header).
2. It fetches data from your external source using whatever client library or API that source provides.
3. For each item, it calls POST /memory/add (or client.add() via the SDK) to store the content as a memory.
4. It persists a cursor so the next run only fetches new or updated items.
Cursor management
To avoid re-importing the same data on every run, maintain a cursor that tracks your position in the external source. This is the same pattern the managed integrations use internally:
- Persist the cursor in your own store — a database row, a file, or an environment variable. Advance it only after the batch commits successfully.
- Use timestamps or IDs — most APIs support since, updated_after, or cursor tokens. Use whatever the source provides.
- Handle failures gracefully — if a batch fails mid-way, do not advance the cursor. The next run will retry from the same position.
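The cursor rules above can be sketched with a small file-backed store. This is only an illustration under stated assumptions: the cursor is a JSON-serializable value (for example an ISO timestamp), the CURSOR_PATH location is hypothetical, and run_batch is a stand-in wrapper for your own sync loop, not part of MemorySync or its SDK.

```python
import json
import os
import tempfile
from pathlib import Path

# Hypothetical location for the cursor file; use whatever store suits you.
CURSOR_PATH = Path(tempfile.gettempdir()) / "my_tool_cursor.json"


def load_cursor(path=CURSOR_PATH):
    """Return the saved cursor, or None on the first run."""
    try:
        return json.loads(Path(path).read_text())["cursor"]
    except FileNotFoundError:
        return None


def save_cursor(cursor, path=CURSOR_PATH):
    """Write the cursor via a temp file so a crash never leaves a partial file."""
    path = Path(path)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps({"cursor": cursor}))
    os.replace(tmp, path)  # atomic rename on the same filesystem


def run_batch(items, push, path=CURSOR_PATH):
    """Push every item, then advance the cursor only if all pushes succeeded.

    `items` is an iterable of (cursor_value, payload) pairs and `push` is any
    callable that raises on failure; both are stand-ins for your own code.
    """
    latest = load_cursor(path)
    for cursor_value, payload in items:
        push(payload)  # an exception here skips save_cursor below
        latest = cursor_value
    save_cursor(latest, path)
    return latest
```

Because save_cursor runs only after the loop finishes, a failure part-way through leaves the stored cursor untouched and the next run retries from the same position.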
Idempotency
To make replays and retries safe, pass a stable Idempotency-Key header with each request. Key it on the native object ID from your source:
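    import requests

    # api_key, user_id, and item come from your connector's own configuration and fetch loop.
    response = requests.post(
        "https://api.memorysync.io/memory/add",
        headers={
            "X-API-Key": api_key,
            "Idempotency-Key": f"my-source-{item.id}",
        },
        json={
            "user_id": user_id,
            "text": item.content,
            "source": "my_tool",
            "metadata": {"native_id": item.id},
        },
        timeout=30,  # avoid hanging the connector on a stalled request
    )
    response.raise_for_status()  # surface 4xx/5xx errors instead of ignoring them

If you re-send the same idempotency key, MemorySync returns the previously created memory instead of creating a duplicate.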
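Replay safety depends on the key being identical on every run for the same source object. One way to guarantee that is a small helper that derives the key from the source name and the native ID alone. The helper name, the hashing step, and the 32-character truncation are illustrative assumptions; MemorySync's accepted key format and length are not specified here.

```python
import hashlib


def idempotency_key(source: str, native_id) -> str:
    """Build a deterministic key from the source name and native object ID.

    Hashing keeps the key short and header-safe even when native IDs contain
    spaces or non-ASCII characters. The format is an assumption; a plain
    f"my-source-{item.id}" string works just as well for simple IDs.
    """
    raw = f"{source}:{native_id}".encode("utf-8")
    return f"{source}-{hashlib.sha256(raw).hexdigest()[:32]}"
```

Keep timestamps, random values, and run IDs out of the key: anything that changes between runs turns a retry into a new write.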
Code skeleton
Here is a minimal connector using the Python SDK. It fetches changes from your source, stores each item as a memory, and advances the cursor:
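    from memorysync import MemorySyncClient

    client = MemorySyncClient(api_key="ms_live_...")
    cursor = load_cursor()  # your persistence layer
    latest = cursor

    for item in fetch_changes(cursor):  # your source client; yields new or updated items
        client.add(
            user_id=1,
            text=item.text,
            source="my_tool",
            metadata={"native_id": item.id, "source": "my_tool"},
            deduplicate=True,
        )
        # Assumption: items arrive oldest-first and expose updated_at;
        # use whatever position marker your source provides.
        latest = item.updated_at

    save_cursor(latest)  # advance only after the whole batch succeeds

Source tagging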
Tag every memory with a source field and include a metadata.source key so you can filter by origin later:
    # Store with source tag
    client.add(
        user_id=1,
        text="Meeting notes from Q4 planning",
        source="confluence",
        metadata={"source": "confluence", "space": "ENGINEERING"},
    )

    # Later, query only memories from this source
    results = client.query(
        user_id=1,
        query="Q4 planning",
        filters={"sources": ["confluence"]},
    )

Monitoring your connector
- Subscribe to memory.created webhooks — confirm that your connector's writes are being received by MemorySync in real time.
- Track error rates — the SDK raises RateLimitError (429) and ValidationError (400/422). Log these and alert on spikes.
- Monitor cursor lag — if the cursor timestamp falls too far behind the current time, your connector is not keeping up.
- Use the health endpoint — GET /health (or client.health()) returns the API status. Call it from your connector's startup to fail fast if MemorySync is unreachable.
- Handle rate limits gracefully — the SDK automatically retries on 429 if the Retry-After header is ≤ 120 seconds. For longer waits, it raises RateLimitError with the retry_after value.
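Cursor lag is straightforward to measure when the cursor is a timestamp. The sketch below assumes an ISO 8601 cursor string and treats naive timestamps as UTC; the function names and the threshold are hypothetical and should be tuned to how often your connector runs.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def cursor_lag(cursor_iso: str, now: Optional[datetime] = None) -> timedelta:
    """Return how far an ISO 8601 cursor timestamp trails the current time."""
    cursor_time = datetime.fromisoformat(cursor_iso)
    if cursor_time.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        cursor_time = cursor_time.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return now - cursor_time


def is_lagging(cursor_iso: str, max_lag: timedelta,
               now: Optional[datetime] = None) -> bool:
    """True when the cursor is further behind than the allowed threshold."""
    return cursor_lag(cursor_iso, now) > max_lag
```

A connector could call is_lagging(load_cursor(), timedelta(hours=1)) at the end of each run and raise an alert when it returns True.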