Batch Handling & Data Mappers
Description
Section titled “Description”This example demonstrates mapper transforms with onBatch and on handler patterns.
- Define a mapper to transform SubscribedTransaction[] to custom types
- Compare onBatch (fires once per poll) vs on (fires per transaction)
- Verify type safety with mapped data
Prerequisites
Section titled “Prerequisites”- LocalNet running (via
algokit localnet start)
Run This Example
Section titled “Run This Example”From the repository’s examples/subscriber directory:
cd examples/subscribernpx tsx 10-batch-and-mappers.ts/** * Example: Batch Handling & Data Mappers * * This example demonstrates mapper transforms with onBatch and on handler patterns. * - Define a mapper to transform SubscribedTransaction[] to custom types * - Compare onBatch (fires once per poll) vs on (fires per transaction) * - Verify type safety with mapped data * * Prerequisites: * - LocalNet running (via `algokit localnet start`) */import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils';import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber';import type { SubscribedTransaction } from '@algorandfoundation/algokit-subscriber/types/subscription';import { printHeader, printStep, printInfo, printSuccess, printError, shortenAddress, formatAlgo,} from './shared/utils.js';
/** Custom mapped type for payment summary */interface PaymentSummary { id: string; sender: string; receiver: string; amountInAlgos: number; note: string;}
async function main() { printHeader('10 — Batch Handling & Data Mappers');
// Step 1: Connect to LocalNet printStep(1, 'Connect to LocalNet'); const algorand = AlgorandClient.defaultLocalNet(); const status = await algorand.client.algod.status(); printInfo(`Current round: ${status.lastRound.toString()}`); printSuccess('Connected to LocalNet');
// Step 2: Create and fund accounts printStep(2, 'Create and fund accounts'); const sender = await algorand.account.fromEnvironment('BATCH_SENDER', algo(100)); const receiver = await algorand.account.fromEnvironment('BATCH_RECEIVER', algo(10)); const senderAddr = sender.addr.toString(); const receiverAddr = receiver.addr.toString(); printInfo(`Sender: ${shortenAddress(senderAddr)}`); printInfo(`Receiver: ${shortenAddress(receiverAddr)}`); printSuccess('Accounts created and funded');
// Step 3: Send 5 payment transactions with varying amounts and notes printStep(3, 'Send 5 payment transactions'); const payments = [ { amount: algo(1), note: 'payment-1' }, { amount: algo(2), note: 'payment-2' }, { amount: algo(3), note: 'payment-3' }, { amount: algo(5), note: 'payment-4' }, { amount: algo(8), note: 'payment-5' }, ];
let firstRound: bigint | undefined; for (const p of payments) { const result = await algorand.send.payment({ sender: sender.addr, receiver: receiver.addr, amount: p.amount, note: p.note, }); const round = result.confirmation.confirmedRound!; if (!firstRound) firstRound = round; printInfo(`Sent ${p.note}: ${formatAlgo(p.amount.microAlgo)} in round ${round}`); } printSuccess(`Sent ${payments.length} payments`);
// Step 4: Set up subscriber with a mapper that transforms SubscribedTransaction[] -> PaymentSummary[] printStep(4, 'Configure subscriber with mapper'); const watermarkBefore = firstRound! - 1n; let watermark = watermarkBefore;
const mapper = async (txns: SubscribedTransaction[]): Promise<PaymentSummary[]> => { return txns.map(txn => ({ id: txn.id, sender: txn.paymentTransaction?.receiver ? (txn.sender ?? senderAddr) : senderAddr, receiver: txn.paymentTransaction?.receiver ?? receiverAddr, amountInAlgos: Number(txn.paymentTransaction?.amount ?? 0n) / 1_000_000, note: txn.note ? Buffer.from(txn.note).toString('utf-8') : '', })); };
printInfo( `Mapper: SubscribedTransaction[] -> PaymentSummary[] { id, sender, receiver, amountInAlgos, note }`, ); printSuccess('Mapper defined');
// Step 5: Create subscriber with mapper on the filter printStep(5, 'Create subscriber with onBatch and on handlers');
const batchResults: PaymentSummary[][] = []; const individualResults: PaymentSummary[] = [];
const subscriber = new AlgorandSubscriber( { filters: [ { name: 'payments', filter: { sender: senderAddr, receiver: receiverAddr, }, mapper, }, ], syncBehaviour: 'sync-oldest', maxRoundsToSync: 100, watermarkPersistence: { get: async () => watermark, set: async (w: bigint) => { watermark = w; }, }, }, algorand.client.algod, );
// Register onBatch handler — receives the full array of mapped items per poll subscriber.onBatch<PaymentSummary>('payments', batch => { batchResults.push(batch); console.log(`\n [onBatch] Received batch of ${batch.length} items`); });
// Register on handler — receives individual mapped items one at a time subscriber.on<PaymentSummary>('payments', item => { individualResults.push(item); console.log(` [on] Received item: ${item.note} — ${item.amountInAlgos} ALGO`); });
printInfo(`onBatch<PaymentSummary>: registered — fires once per poll with full array`); printInfo(`on<PaymentSummary>: registered — fires once per transaction with individual item`);
// Step 6: Poll once to trigger handlers printStep(6, 'Poll once — observe onBatch vs on firing'); const result = await subscriber.pollOnce(); printInfo(`Raw matched count: ${result.subscribedTransactions.length.toString()}`);
// Step 7: Verify onBatch behavior printStep(7, 'Verify onBatch behavior'); printInfo(`onBatch fired: ${batchResults.length} time(s)`); printInfo(`Batch size: ${batchResults[0]?.length.toString() ?? '0'}`);
if (batchResults.length !== 1) { throw new Error(`Expected onBatch to fire exactly 1 time, got ${batchResults.length}`); } if (batchResults[0].length !== 5) { throw new Error(`Expected batch size of 5, got ${batchResults[0].length}`); } printSuccess('onBatch fired once with all 5 items');
// Step 8: Verify on behavior printStep(8, 'Verify on behavior'); printInfo(`on fired: ${individualResults.length} time(s)`);
if (individualResults.length !== 5) { throw new Error(`Expected on to fire 5 times, got ${individualResults.length}`); } printSuccess('on fired once per transaction (5 times)');
// Step 9: Show type safety — mapped items are PaymentSummary, not SubscribedTransaction printStep(9, 'Demonstrate type safety and mapped data'); console.log(); console.log(' Batch items (from onBatch):'); for (const item of batchResults[0]) { printInfo( ` ${item.note}: ${item.amountInAlgos} ALGO | ${shortenAddress(item.sender)} -> ${shortenAddress(item.receiver)}`, ); }
console.log(); console.log(' Individual items (from on):'); for (const item of individualResults) { printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | id: ${item.id.slice(0, 12)}...`); }
// Step 10: Show the difference between onBatch and on printStep(10, 'Summary: onBatch vs on'); console.log(); console.log(' ┌─────────────────────────────────────────────────────────┐'); console.log(' │ onBatch<T>(filterName, listener) │'); console.log(' │ - Fires: once per poll │'); console.log( ` │ - Receives: T[] (array of ${batchResults[0].length} PaymentSummary items) │`, ); console.log(' │ - Use for: bulk inserts, batch processing │'); console.log(' ├─────────────────────────────────────────────────────────┤'); console.log(' │ on<T>(filterName, listener) │'); console.log( ` │ - Fires: once per transaction (${individualResults.length} times) │`, ); console.log(' │ - Receives: T (single PaymentSummary item) │'); console.log(' │ - Use for: per-item processing, logging │'); console.log(' ├─────────────────────────────────────────────────────────┤'); console.log(' │ mapper on filter config │'); console.log(' │ - Transforms: SubscribedTransaction[] -> T[] │'); console.log(' │ - Applied BEFORE both on and onBatch handlers │'); console.log(' │ - Type parameter <T> ensures type safety │'); console.log(' └─────────────────────────────────────────────────────────┘'); console.log();
printHeader('Example complete');}
main().catch(err => { printError(err.message); process.exit(1);});Other examples
Section titled “Other examples”- Basic Poll Once
- Continuous Subscriber
- Payment Filters
- Asset Transfer Subscription
- App Call Subscription
- Multiple Named Filters
- Balance Change Tracking
- ARC-28 Event Subscription
- Inner Transaction Subscription
- Batch Handling & Data Mappers
- Watermark Persistence
- Sync Behaviours
- Custom Filters
- Stateless Subscriptions
- Lifecycle Hooks & Error Handling