Data Sync
Synchronize data between MongoDB collections, transform data with ETL workflows, and maintain data consistency across environments
Data sync enables automated synchronization of data between MongoDB collections, databases, and clusters, with support for transformations, filtering, and conflict resolution.
Overview
Use data sync for:
- Environment synchronization - Keep development, staging, and production data in sync
- Data warehousing - Replicate operational data to analytics databases
- Multi-region deployment - Synchronize data across geographic regions
- Legacy migration - Gradually migrate data from old to new schemas
- Backup and redundancy - Maintain real-time copies of critical data
Data sync is available on Business and Enterprise plans. It complements MongoDB's built-in replication with application-level sync capabilities and transformations.
Creating a Sync Job
Navigate to Automation > Data Sync in the workspace.
Click New Sync Job to start the configuration wizard.
Configure the Source:
- Select source connection
- Choose source database and collection
- Set optional filter to limit which documents sync
Configure the Destination:
- Select destination connection (can be the same cluster)
- Choose destination database and collection
- Set conflict resolution strategy
Set the Sync Mode:
- One-time sync (immediate execution)
- Scheduled sync (recurring on a schedule)
- Real-time sync (change stream based)
Optionally add Transformations to modify data during sync.
Click Create Sync Job and activate the sync.

Sync Modes
One-Time Sync
Execute a single bulk copy of data:
{
  "name": "Initial Migration to New Schema",
  "mode": "one_time",
  "source": {
    "connection": "prod_cluster",
    "database": "legacy_db",
    "collection": "users_old"
  },
  "destination": {
    "connection": "prod_cluster",
    "database": "app_db",
    "collection": "users"
  },
  "batchSize": 1000,
  "parallel": true,
  "maxConcurrency": 4
}
Use cases:
- Initial data migration
- One-time data exports
- Database seeding
- Historical data archival
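The `batchSize` setting above controls how documents are grouped into bulk writes. The grouping step can be pictured like this (a minimal sketch; the real sync engine streams from a cursor rather than holding every document in memory, and `toBatches` is an illustrative name, not an actual API):

```javascript
// Split an array of documents into batches of at most batchSize.
function toBatches(documents, batchSize) {
  const batches = [];
  for (let i = 0; i < documents.length; i += batchSize) {
    batches.push(documents.slice(i, i + batchSize));
  }
  return batches;
}
```

With `parallel: true` and `maxConcurrency: 4`, up to four such batches would be written concurrently.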
Scheduled Sync
Sync data on a recurring schedule:
{
  "name": "Hourly Analytics Sync",
  "mode": "scheduled",
  "schedule": {
    "frequency": "hourly",
    "minute": 15,
    "timezone": "UTC"
  },
  "source": {
    "connection": "prod_cluster",
    "database": "app_db",
    "collection": "orders",
    "filter": {
      "updatedAt": {
        "$gte": "{{last_sync_time}}"
      }
    }
  },
  "destination": {
    "connection": "analytics_cluster",
    "database": "analytics",
    "collection": "orders_replica"
  },
  "incrementalSync": true
}
Use cases:
- Regular data warehouse updates
- Periodic backup creation
- Scheduled data aggregation
- Cross-region synchronization
Real-Time Sync (Enterprise)
Continuously sync using MongoDB change streams:
{
  "name": "Real-Time User Sync",
  "mode": "realtime",
  "source": {
    "connection": "prod_cluster",
    "database": "app_db",
    "collection": "users",
    "changeStream": {
      "fullDocument": "updateLookup",
      "pipeline": [
        {
          "$match": {
            "operationType": { "$in": ["insert", "update", "replace"] }
          }
        }
      ]
    }
  },
  "destination": {
    "connection": "cache_cluster",
    "database": "cache_db",
    "collection": "users_cache"
  },
  "latencyTarget": "1s",
  "bufferSize": 100
}
Use cases:
- Cache invalidation
- Real-time search index updates
- Live data replication
- Event-driven synchronization
Real-time sync requires MongoDB 4.0 or later deployed as a replica set. Change streams are not available on standalone MongoDB instances.
Data Filtering
Source Filters
Limit which documents are synced:
// Sync only active users
{
  "status": "active",
  "deletedAt": { "$exists": false }
}

// Sync recent orders only
{
  "createdAt": {
    "$gte": new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
  }
}

// Sync specific user segments
{
  "subscription": { "$in": ["premium", "enterprise"] },
  "country": "US"
}
Incremental Sync
Sync only changed documents since last run:
{
  "incrementalSync": true,
  "incrementalField": "updatedAt",
  "trackingCollection": "sync_metadata",
  "filter": {
    "updatedAt": {
      "$gte": "{{last_sync_time}}"
    }
  }
}
MongoDash automatically tracks the last sync time and uses it for subsequent runs.
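Conceptually, each run substitutes the stored watermark into the filter before querying the source. A sketch of that substitution (the function name and the "sync everything on first run" behavior are assumptions for illustration, not the engine's actual code):

```javascript
// Build the incremental filter for the next run from the last
// recorded sync time, mirroring the {{last_sync_time}} placeholder.
function buildIncrementalFilter(incrementalField, lastSyncTime) {
  // First run: no watermark recorded yet, so match all documents.
  if (!lastSyncTime) return {};
  return { [incrementalField]: { $gte: lastSyncTime } };
}
```

After each successful run, the new high-water mark is written back to the tracking collection for the next execution.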

Data Transformations
Field Mapping
Rename or restructure fields during sync:
// Transformation script
function transform(document) {
  return {
    id: document._id.toString(),
    email: document.email,
    profile: {
      firstName: document.first_name,
      lastName: document.last_name,
      fullName: `${document.first_name} ${document.last_name}`
    },
    metadata: {
      createdAt: document.created_at,
      updatedAt: document.updated_at
    },
    // Computed fields
    isActive: document.status === 'active',
    accountAge: Math.floor((Date.now() - document.created_at) / (1000 * 60 * 60 * 24))
  };
}
Data Enrichment
Add computed fields or lookup related data:
function transform(document) {
  // Add computed fields
  document.totalSpent = document.orders
    ? document.orders.reduce((sum, order) => sum + order.amount, 0)
    : 0;

  // Add derived flags
  document.isVIP = document.totalSpent > 10000;

  // Normalize data
  document.email = document.email.toLowerCase().trim();

  // Add timestamps
  document.syncedAt = new Date();

  return document;
}
Data Sanitization
Remove sensitive information:
function transform(document) {
  // Remove sensitive fields
  delete document.password;
  delete document.ssn;
  delete document.creditCard;

  // Anonymize PII for non-production environments
  if (process.env.ENVIRONMENT !== 'production') {
    document.email = `user${document._id}@example.com`;
    document.phone = '555-0000';
  }

  // Redact partial data
  if (document.apiKey) {
    document.apiKey = document.apiKey.substring(0, 8) + '...';
  }

  return document;
}
Schema Migration
Transform between different schemas:
// Old schema to new schema
function transform(oldDoc) {
  return {
    // New structure
    _id: oldDoc._id,
    user: {
      email: oldDoc.email,
      name: {
        first: oldDoc.firstName,
        last: oldDoc.lastName
      }
    },
    settings: {
      // ?? preserves an explicit `false`, which || would overwrite
      notifications: oldDoc.notifications ?? true,
      theme: oldDoc.theme || 'light',
      language: oldDoc.lang || 'en'
    },
    // Migrate nested arrays
    addresses: (oldDoc.address ? [oldDoc.address] : []).map(addr => ({
      street: addr.street,
      city: addr.city,
      state: addr.state,
      zip: addr.zipCode,
      type: 'primary'
    })),
    // Add version tracking
    schemaVersion: 2,
    migratedAt: new Date()
  };
}
Test transformations on a small dataset before running full sync jobs. Use the "Preview Transformation" feature to see output samples.
Conflict Resolution
Resolution Strategies
Handle conflicts when destination documents already exist:
1. Overwrite (Default)
{
  "conflictResolution": "overwrite",
  "description": "Always replace destination document with source"
}
2. Skip
{
  "conflictResolution": "skip",
  "description": "Keep destination document, don't update if it exists"
}
3. Merge
{
  "conflictResolution": "merge",
  "mergeStrategy": "shallow",
  "description": "Merge source fields into destination, keeping non-conflicting fields"
}
4. Custom Resolution
function resolveConflict(sourceDoc, destDoc) {
  // Use the newest document based on timestamp
  if (sourceDoc.updatedAt > destDoc.updatedAt) {
    return sourceDoc;
  }
  // Or merge specific fields
  return {
    ...destDoc,
    // Update specific fields from source
    status: sourceDoc.status,
    lastSyncedAt: new Date(),
    // Keep destination fields
    localMetadata: destDoc.localMetadata
  };
}
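For comparison, the shallow merge strategy (option 3) can be understood as a spread-style merge in which source fields win on conflict and destination-only fields survive (a sketch of the semantics, not the engine's implementation; note that nested objects are replaced whole, not merged recursively):

```javascript
// Shallow merge: source fields overwrite destination fields of the
// same name; destination-only fields are preserved.
function shallowMerge(sourceDoc, destDoc) {
  return { ...destDoc, ...sourceDoc };
}
```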
Duplicate Handling
Configure how to identify duplicates:
{
  "duplicateDetection": {
    "method": "custom_field",
    "field": "email",
    "caseInsensitive": true
  }
}
{
  "duplicateDetection": {
    "method": "composite_key",
    "fields": ["email", "accountId"]
  }
}
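A sketch of how a duplicate-detection key could be derived from these settings (the joining scheme and null handling here are assumptions for illustration, not the engine's exact behavior):

```javascript
// Derive a duplicate-detection key from the configured fields.
// Two documents with the same key are treated as duplicates.
function duplicateKey(doc, fields, caseInsensitive = false) {
  return fields
    .map((f) => {
      const v = doc[f] == null ? '' : String(doc[f]);
      return caseInsensitive ? v.toLowerCase() : v;
    })
    .join('|');
}
```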

ETL Workflows
Multi-Stage Pipelines
Chain multiple sync jobs for complex ETL:
{
  "pipeline": "User Analytics ETL",
  "stages": [
    {
      "name": "Extract Users",
      "type": "sync",
      "source": {
        "connection": "prod",
        "collection": "users"
      },
      "destination": {
        "connection": "staging",
        "collection": "users_raw"
      }
    },
    {
      "name": "Transform and Enrich",
      "type": "aggregation",
      "input": "staging.users_raw",
      "pipeline": [
        {
          "$lookup": {
            "from": "orders",
            "localField": "_id",
            "foreignField": "userId",
            "as": "orders"
          }
        },
        {
          "$addFields": {
            "totalOrders": { "$size": "$orders" },
            "totalSpent": { "$sum": "$orders.amount" }
          }
        }
      ],
      "output": "staging.users_enriched"
    },
    {
      "name": "Load to Warehouse",
      "type": "sync",
      "source": {
        "connection": "staging",
        "collection": "users_enriched"
      },
      "destination": {
        "connection": "warehouse",
        "collection": "dim_users"
      }
    }
  ],
  "schedule": "0 2 * * *"
}
Data Aggregation
Sync aggregated data for reporting:
// Aggregate before sync
{
  "source": {
    "connection": "prod",
    "database": "app",
    "collection": "events",
    "aggregation": [
      {
        "$match": {
          "timestamp": {
            "$gte": new Date(Date.now() - 24 * 60 * 60 * 1000)
          }
        }
      },
      {
        "$group": {
          "_id": {
            "userId": "$userId",
            "eventType": "$type"
          },
          "count": { "$sum": 1 },
          "firstEvent": { "$min": "$timestamp" },
          "lastEvent": { "$max": "$timestamp" }
        }
      },
      {
        "$project": {
          "_id": 0,
          "userId": "$_id.userId",
          "eventType": "$_id.eventType",
          "count": 1,
          "firstEvent": 1,
          "lastEvent": 1,
          "reportDate": new Date()
        }
      }
    ]
  },
  "destination": {
    "connection": "analytics",
    "collection": "daily_user_events"
  }
}
Monitoring and Debugging
Sync Status Dashboard
Track sync job performance:
- Sync progress - Documents synced vs total
- Sync rate - Documents per second
- Error rate - Failed documents and reasons
- Latency - Time lag for real-time syncs
- Data volume - Bytes transferred

Execution Logs
View detailed logs for troubleshooting:
{
  "syncJobId": "sync_abc123",
  "executionId": "exec_xyz789",
  "startTime": "2024-02-24T10:00:00Z",
  "endTime": "2024-02-24T10:05:23Z",
  "status": "completed_with_warnings",
  "statistics": {
    "documentsRead": 10000,
    "documentsSynced": 9987,
    "documentsSkipped": 13,
    "documentsErrored": 0,
    "bytesTransferred": "52.3 MB",
    "avgThroughput": "31 docs/sec"
  },
  "errors": [],
  "warnings": [
    {
      "type": "conflict_resolution",
      "count": 13,
      "message": "13 documents skipped due to conflict resolution policy",
      "sample": {
        "_id": "507f1f77bcf86cd799439011",
        "reason": "destination_newer"
      }
    }
  ]
}
Error Handling
Configure retry and failure behavior:
{
  "errorHandling": {
    "onDocumentError": "skip_and_log",
    "maxErrorsBeforeAbort": 100,
    "retryFailedDocuments": true,
    "retryAttempts": 3,
    "retryBackoff": "exponential",
    "deadLetterQueue": {
      "enabled": true,
      "connection": "prod",
      "collection": "sync_failed_documents"
    }
  }
}
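With `"retryBackoff": "exponential"`, the wait before each retry typically doubles per attempt. A sketch of that schedule (the base delay and cap are illustrative assumptions, not documented defaults):

```javascript
// Delay before the given retry attempt (1-based): doubles each
// attempt, capped so waits do not grow without bound.
function retryDelayMs(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}
```

With `retryAttempts: 3`, a failed document would be retried after roughly 1s, 2s, and 4s before landing in the dead letter queue.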
Performance Optimization
Batch Configuration
Optimize throughput with batching:
{
  "performance": {
    "batchSize": 1000,
    "maxConcurrency": 4,
    "bulkWriteOrdered": false,
    "useCompression": true
  }
}
Network Optimization
Reduce network overhead:
{
  "network": {
    "compression": "snappy",
    "connectionPoolSize": 10,
    "socketTimeout": "60s",
    "useStreamingReads": true
  }
}
Resource Limits
Prevent sync jobs from overwhelming systems:
{
  "limits": {
    "maxDocumentsPerRun": 1000000,
    "maxExecutionTime": "3600s",
    "maxMemoryUsage": "2GB",
    "throttle": {
      "enabled": true,
      "maxDocsPerSecond": 500
    }
  }
}
Large sync jobs can impact database performance. Schedule resource-intensive syncs during off-peak hours.
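The `throttle.maxDocsPerSecond` limit can be pictured as pacing: given a batch size and a rate cap, each batch must occupy a minimum amount of wall-clock time. A sketch of that calculation (not the scheduler's actual algorithm):

```javascript
// Minimum milliseconds a batch must take to respect the rate cap;
// if the write finishes faster, the engine would sleep the remainder.
function minBatchDurationMs(batchSize, maxDocsPerSecond) {
  return Math.ceil((batchSize / maxDocsPerSecond) * 1000);
}
```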
Cross-Environment Sync
Development to Staging
Sync sanitized production data to lower environments:
{
  "name": "Prod to Staging (Sanitized)",
  "source": {
    "connection": "production",
    "database": "app",
    "collection": "users",
    "filter": {
      "createdAt": {
        "$gte": new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
      }
    }
  },
  "destination": {
    "connection": "staging",
    "database": "app",
    "collection": "users"
  },
  "transformation": "sanitize_pii",
  "schedule": "0 2 * * 0"
}
Multi-Region Sync
Keep data synchronized across geographic regions:
{
  "name": "US to EU Region Sync",
  "mode": "realtime",
  "source": {
    "connection": "us_east_cluster",
    "database": "app",
    "collection": "global_data"
  },
  "destination": {
    "connection": "eu_west_cluster",
    "database": "app",
    "collection": "global_data"
  },
  "latencyTarget": "5s",
  "bidirectional": false
}
Best Practices
Planning Sync Jobs
- Start with one-time syncs - Test configurations before setting up schedules
- Use incremental sync - Avoid full re-syncs when possible
- Filter at source - Reduce data transfer by filtering early
- Test transformations - Validate output on small datasets first
Performance
- Optimize batch sizes - Larger batches (1000-5000) are typically more efficient
- Use parallel execution - Enable concurrency for large collections
- Schedule wisely - Run during low-traffic periods
- Monitor resource usage - Watch for memory and CPU spikes
Data Integrity
- Validate results - Check document counts and sample records after sync
- Use transactions - Enable for critical data consistency
- Implement checksums - Verify data integrity across sync
- Maintain audit trails - Log all sync operations
Security
- Encrypt in transit - Use TLS for all connections
- Sanitize sensitive data - Remove PII for non-production environments
- Limit access - Use read-only credentials for source connections
- Audit sync operations - Track who created and modified sync jobs
What's Next?
- Scheduled Queries - Automate recurring queries
- Webhooks - Trigger external systems on data changes
- API Integration - Build custom automation workflows