Skip to content
Algorand Developer Portal

Batch Handling & Data Mappers

← Back to Examples

This example demonstrates mapper transforms with onBatch and on handler patterns.

  • Define a mapper to transform SubscribedTransaction[] to custom types
  • Compare onBatch (fires once per poll) vs on (fires per transaction)
  • Verify type safety with mapped data

Prerequisites:

  • LocalNet running (via `algokit localnet start`)

Run the example from the repository’s examples/subscriber directory:

Terminal window
cd examples/subscriber
npx tsx 10-batch-and-mappers.ts

View source on GitHub

10-batch-and-mappers.ts
/**
* Example: Batch Handling & Data Mappers
*
* This example demonstrates mapper transforms with onBatch and on handler patterns.
* - Define a mapper to transform SubscribedTransaction[] to custom types
* - Compare onBatch (fires once per poll) vs on (fires per transaction)
* - Verify type safety with mapped data
*
* Prerequisites:
* - LocalNet running (via `algokit localnet start`)
*/
import { algo, AlgorandClient } from '@algorandfoundation/algokit-utils';
import { AlgorandSubscriber } from '@algorandfoundation/algokit-subscriber';
import type { SubscribedTransaction } from '@algorandfoundation/algokit-subscriber/types/subscription';
import {
printHeader,
printStep,
printInfo,
printSuccess,
printError,
shortenAddress,
formatAlgo,
} from './shared/utils.js';
/**
 * Flattened view of a payment that the example's mapper produces from each
 * subscribed transaction; this is the type both handlers receive.
 */
interface PaymentSummary {
  /** Transaction ID, copied from the subscribed transaction. */
  id: string;

  /** Address the payment was sent from. */
  sender: string;

  /** Address the payment was sent to. */
  receiver: string;

  /** Payment amount in whole ALGO (the mapper divides the raw amount by 1_000_000). */
  amountInAlgos: number;

  /** Transaction note decoded as UTF-8, or an empty string when no note is present. */
  note: string;
}
/**
 * Runs the batch-handling walkthrough end to end against LocalNet.
 *
 * Sends five payments from a funded sender to a receiver, subscribes to them
 * with a filter whose mapper converts each SubscribedTransaction into a
 * PaymentSummary, polls once, and then checks that `onBatch` fired a single
 * time with all five items while `on` fired once per item.
 *
 * @throws Error if no payment round was recorded, if `onBatch` did not fire
 *   exactly once with five items, or if `on` did not fire five times.
 */
async function main(): Promise<void> {
  printHeader('10 — Batch Handling & Data Mappers');

  // Step 1: Connect to LocalNet
  printStep(1, 'Connect to LocalNet');
  const algorand = AlgorandClient.defaultLocalNet();
  const status = await algorand.client.algod.status();
  printInfo(`Current round: ${status.lastRound.toString()}`);
  printSuccess('Connected to LocalNet');

  // Step 2: Create and fund accounts
  printStep(2, 'Create and fund accounts');
  const sender = await algorand.account.fromEnvironment('BATCH_SENDER', algo(100));
  const receiver = await algorand.account.fromEnvironment('BATCH_RECEIVER', algo(10));
  const senderAddr = sender.addr.toString();
  const receiverAddr = receiver.addr.toString();
  printInfo(`Sender: ${shortenAddress(senderAddr)}`);
  printInfo(`Receiver: ${shortenAddress(receiverAddr)}`);
  printSuccess('Accounts created and funded');

  // Step 3: Send 5 payment transactions with varying amounts and notes
  printStep(3, 'Send 5 payment transactions');
  const payments = [
    { amount: algo(1), note: 'payment-1' },
    { amount: algo(2), note: 'payment-2' },
    { amount: algo(3), note: 'payment-3' },
    { amount: algo(5), note: 'payment-4' },
    { amount: algo(8), note: 'payment-5' },
  ];
  let firstRound: bigint | undefined;
  // Payments are sent sequentially so the first confirmed round is the earliest one.
  for (const p of payments) {
    const sendResult = await algorand.send.payment({
      sender: sender.addr,
      receiver: receiver.addr,
      amount: p.amount,
      note: p.note,
    });
    const round = sendResult.confirmation.confirmedRound!;
    // Compare against undefined explicitly: a truthiness test would treat 0n as "unset".
    if (firstRound === undefined) firstRound = round;
    printInfo(`Sent ${p.note}: ${formatAlgo(p.amount.microAlgo)} in round ${round}`);
  }
  printSuccess(`Sent ${payments.length} payments`);

  // Step 4: Set up subscriber with a mapper that transforms SubscribedTransaction[] -> PaymentSummary[]
  printStep(4, 'Configure subscriber with mapper');
  if (firstRound === undefined) {
    throw new Error('No payment was confirmed, so the starting round is unknown');
  }
  // Start the watermark one round before the first payment so that round is included in the sync.
  const watermarkBefore = firstRound - 1n;
  let watermark = watermarkBefore;
  const mapper = async (txns: SubscribedTransaction[]): Promise<PaymentSummary[]> => {
    return txns.map(txn => ({
      id: txn.id,
      // Fall back to the known addresses when the transaction carries no payment details.
      sender: txn.paymentTransaction?.receiver ? (txn.sender ?? senderAddr) : senderAddr,
      receiver: txn.paymentTransaction?.receiver ?? receiverAddr,
      amountInAlgos: Number(txn.paymentTransaction?.amount ?? 0n) / 1_000_000,
      note: txn.note ? Buffer.from(txn.note).toString('utf-8') : '',
    }));
  };
  printInfo(
    `Mapper: SubscribedTransaction[] -> PaymentSummary[] { id, sender, receiver, amountInAlgos, note }`,
  );
  printSuccess('Mapper defined');

  // Step 5: Create subscriber with mapper on the filter
  printStep(5, 'Create subscriber with onBatch and on handlers');
  const batchResults: PaymentSummary[][] = [];
  const individualResults: PaymentSummary[] = [];
  const subscriber = new AlgorandSubscriber(
    {
      filters: [
        {
          name: 'payments',
          filter: {
            sender: senderAddr,
            receiver: receiverAddr,
          },
          mapper,
        },
      ],
      syncBehaviour: 'sync-oldest',
      maxRoundsToSync: 100,
      // In-memory watermark: the position is kept in a local variable for this run only.
      watermarkPersistence: {
        get: async () => watermark,
        set: async (w: bigint) => {
          watermark = w;
        },
      },
    },
    algorand.client.algod,
  );

  // Register onBatch handler — receives the full array of mapped items per poll
  subscriber.onBatch<PaymentSummary>('payments', batch => {
    batchResults.push(batch);
    console.log(`\n [onBatch] Received batch of ${batch.length} items`);
  });

  // Register on handler — receives individual mapped items one at a time
  subscriber.on<PaymentSummary>('payments', item => {
    individualResults.push(item);
    // Separate note and amount so "payment-1" + 1 does not read as "payment-11".
    console.log(` [on] Received item: ${item.note}: ${item.amountInAlgos} ALGO`);
  });
  printInfo(`onBatch<PaymentSummary>: registered — fires once per poll with full array`);
  printInfo(`on<PaymentSummary>: registered — fires once per transaction with individual item`);

  // Step 6: Poll once to trigger handlers
  printStep(6, 'Poll once — observe onBatch vs on firing');
  const pollResult = await subscriber.pollOnce();
  printInfo(`Raw matched count: ${pollResult.subscribedTransactions.length.toString()}`);

  // Step 7: Verify onBatch behavior
  printStep(7, 'Verify onBatch behavior');
  const firstBatch = batchResults[0];
  printInfo(`onBatch fired: ${batchResults.length} time(s)`);
  printInfo(`Batch size: ${firstBatch?.length.toString() ?? '0'}`);
  // The undefined check only narrows the type; with exactly one batch it is always defined.
  if (batchResults.length !== 1 || firstBatch === undefined) {
    throw new Error(`Expected onBatch to fire exactly 1 time, got ${batchResults.length}`);
  }
  if (firstBatch.length !== 5) {
    throw new Error(`Expected batch size of 5, got ${firstBatch.length}`);
  }
  printSuccess('onBatch fired once with all 5 items');

  // Step 8: Verify on behavior
  printStep(8, 'Verify on behavior');
  printInfo(`on fired: ${individualResults.length} time(s)`);
  if (individualResults.length !== 5) {
    throw new Error(`Expected on to fire 5 times, got ${individualResults.length}`);
  }
  printSuccess('on fired once per transaction (5 times)');

  // Step 9: Show type safety — mapped items are PaymentSummary, not SubscribedTransaction
  printStep(9, 'Demonstrate type safety and mapped data');
  console.log();
  console.log(' Batch items (from onBatch):');
  for (const item of firstBatch) {
    printInfo(
      ` ${item.note}: ${item.amountInAlgos} ALGO | ${shortenAddress(item.sender)} -> ${shortenAddress(item.receiver)}`,
    );
  }
  console.log();
  console.log(' Individual items (from on):');
  for (const item of individualResults) {
    printInfo(` ${item.note}: ${item.amountInAlgos} ALGO | id: ${item.id.slice(0, 12)}...`);
  }

  // Step 10: Show the difference between onBatch and on
  printStep(10, 'Summary: onBatch vs on');
  console.log();
  console.log(' ┌─────────────────────────────────────────────────────────┐');
  console.log(' │ onBatch<T>(filterName, listener) │');
  console.log(' │ - Fires: once per poll │');
  console.log(
    ` │ - Receives: T[] (array of ${firstBatch.length} PaymentSummary items) │`,
  );
  console.log(' │ - Use for: bulk inserts, batch processing │');
  console.log(' ├─────────────────────────────────────────────────────────┤');
  console.log(' │ on<T>(filterName, listener) │');
  console.log(
    ` │ - Fires: once per transaction (${individualResults.length} times) │`,
  );
  console.log(' │ - Receives: T (single PaymentSummary item) │');
  console.log(' │ - Use for: per-item processing, logging │');
  console.log(' ├─────────────────────────────────────────────────────────┤');
  console.log(' │ mapper on filter config │');
  console.log(' │ - Transforms: SubscribedTransaction[] -> T[] │');
  console.log(' │ - Applied BEFORE both on and onBatch handlers │');
  console.log(' │ - Type parameter <T> ensures type safety │');
  console.log(' └─────────────────────────────────────────────────────────┘');
  console.log();
  printHeader('Example complete');
}
// Entry point: run the example; on failure report the error and exit with a non-zero status.
main().catch((err: unknown) => {
  // A rejection is not guaranteed to be an Error instance, so fall back to its string form
  // instead of printing `undefined` from a missing `.message`.
  printError(err instanceof Error ? err.message : String(err));
  process.exit(1);
});