Customization and automation
This chapter elaborates on the various methods to configure, operate, and monitor IBM InfoSphere Change Data Capture (InfoSphere CDC).
This chapter first provides a brief introduction to the different components that can be used to manage the solution and how they are related, including the Management Console and Access Server. You should be aware of how InfoSphere CDC can be controlled by each of the components and where these components are located.
This chapter goes into a higher level of detail about the configuration and monitoring tasks using the command line and Java API. Sample code is provided to make it easier for you to create your own customized scripts or code to fit your environment.
Finally, this chapter describes how to implement user exits as part of the replication process for the various platforms supported by InfoSphere CDC. Again, various examples are provided to help you with your own development and deployment activities.
9.1 Options for managing InfoSphere CDC
InfoSphere CDC offers various options to configure and monitor the replication environment, each having their own use and applicability, depending on
your requirements.
The following sections describe the various interfaces that are delivered as part of the InfoSphere CDC product, providing a high-level overview of the common uses of each interface and a more comprehensive explanation of the automation options that are provided.
9.2 Management Console GUI
The InfoSphere CDC Management Console GUI is the most commonly used interface for configuration, operation, and monitoring of InfoSphere CDC. It provides various powerful functions to create subscriptions, map tables, start and stop subscriptions, and monitor replication throughput and latency.
All clients in the InfoSphere CDC landscape use the GUI, at least for developing subscriptions and table mappings. For most clients, it is the sole interface by which InfoSphere CDC is configured, operated, and monitored.
This section does not elaborate on all the features of the Management Console GUI, but describes some common practices in larger scale environments.
For monitoring throughput and replication latency, the Management Console is the preferred interface in most environments. In larger scale environments with many source and target servers, many subscriptions or substantial numbers of mapped tables, other methods for monitoring subscriptions, such as the InfoSphere CDC Server Command-Line Interface (CLI) or the API, might
be preferred.
In many environments, clients implement a predefined path for the deployment of their business applications. The deployment starts with changes in a development environment, promoting those changes through acceptance testing, and moving those changes into production. The changes that are made to the business application logic or underlying database might also affect the InfoSphere CDC configuration. Promoting applications and database changes through a succession of development stages must also be done for
InfoSphere CDC.
Before doing any promotion to the successive environments, the subscription definitions and table mappings are changed in a development environment until they satisfy the requirements of the design. The InfoSphere CDC Management Console GUI is the preferred method for performing any of the development changes, using an interface that gives you both high-level and detailed insight into the replication configuration. When business application changes are made in the development environment, the InfoSphere CDC configuration in the associated stage is typically reviewed and adjusted as necessary.
When the business application or database changes and the changes to the InfoSphere CDC configuration have been tested and approved, they are promoted to the successive development environment by, for example, a versioning solution. The Management Console offers the ability to promote subscriptions and even individual table mappings to the next environment. A wizard leads you through a number of choices for the new source data store, the new target data store, and even different schemas to be used in the source and target tables. Another method for promoting subscriptions is to export the definitions to an XML file from within the Management Console GUI and import the XML file in the successive environment, again using the GUI.
In larger scale environments, the promotion wizard or export / import function cannot be used to deploy changes into testing and production environments because of restrictions in company policies or because of timing constraints when implementing these changes. This situation is especially true if there is a requirement for lights-out operations and deployments.
9.3 Management Console commands
The Management Console and Access Server provide a CLI for scripted control of subscriptions and the InfoSphere CDC configuration without needing physical or command-line access to the servers that have the source and target InfoSphere CDC engines. The Management Console CLI is included in Access Server installations, which are available on Linux, UNIX, and Windows platforms, and also on Management Console installations, which are available only on Windows platforms.
This section describes the common uses for the Management Console commands and provides an example of how a subscription can be started using this interface. More information about the available commands and scripting possibilities can be found in the InfoSphere CDC API and Commands Reference.
9.3.1 Common uses for the Management Console commands
The Management Console CLI is typically used when the InfoSphere CDC operators do not have access to the source (production) server, but are responsible for starting and stopping InfoSphere CDC at given times and for monitoring its status. By using this interface, InfoSphere CDC can be operated using scripts that are suitable for unattended execution without access to the production servers.
The Management Console CLI offers limited abilities for configuring subscriptions; it allows you to add tables to the catalog, select tables for subscriptions, describe subscriptions, and assign source tables to target tables as long as table structures are alike. It is difficult to create scripts with conditional processing based on the outcome of individual Management Console commands. Therefore, you should use the Java API to control the InfoSphere CDC configuration in an automated manner.
9.3.2 Compiling Management Console command scripts
If you decide to use Management Console command scripts to control InfoSphere CDC, consider first stepping through the consecutive commands using the online.bat (or online for Linux and UNIX) interface. This procedure validates the steps and provides feedback on every step.
For example, you might want to control the start of subscriptions from your client environment by completing the following steps:
1. Connect to the Access Server (similar to starting the Management Console and entering the user and password for the Access Server).
2. Connect to the Source data store.
3. Start the subscription.
4. Show the subscription status.
5. Disconnect from the Source data store.
6. Disconnect from the Access Server.
The script that accomplishes these steps is shown in Example 9-1.
Example 9-1 Script for starting subscriptions
C:\IBM\InfoSphere CDC_MC> type MCScripts\startREDSCRIPT.txt
// Connect to the Access Server
connectServer(admin,passw0rd,172.16.74.130,10101);
// Connect to the Source data store
connectAgent(CDC_Oracle_Redo);
// Start the subscription
startMirror(CDC_Oracle_Redo,REDSCRIPT,continuous);
// Get the subscription status
getSubscriptionStatus(CDC_Oracle_Redo,REDSCRIPT);
// Disconnect from the Source data store
disconnectAgent(CDC_Oracle_Redo);
// Disconnect from the Access Server
disconnectServer();
// Exit the script
exit();
To run the script, you can run the online command (online.bat for Windows) from the Management Console or Access Server directory and redirect the input to the command as follows:
C:\IBM\InfoSphere CDC_MC> online.bat < MCScripts\startREDSCRIPT.txt
Copyright (c) 1999-2010 IBM Corporation
Welcome to the IBM InfoSphere Change Data Capture Console (Version 6.5.1505.0)
>
REDSCRIPT Starting
As you can see, the command script contains readable user names and passwords, which is a security exposure. To avoid this exposure, a compilation command is provided that translates the base command script into an executable program: a batch file for Windows or a shell file for Linux or UNIX. Store only the compiled executable programs on the server that has the Management Console or Access Server, and keep the readable scripts in a secure place.
Perform the compilation by running the following command:
C:\InfoSphere CDC_MC>online.bat -c MCScripts\startREDSCRIPT.txt MCScripts\compiled\startREDSCRIPT.bat
The startREDSCRIPT.bat script has the same commands as the source, but the user names and passwords are encrypted. The compiled executable program is shown in Example 9-2.
Example 9-2 Compiled batch executable for starting subscriptions
C:\InfoSphere CDC_MC>type MCScripts\compiled\startREDSCRIPT.bat
// Connect to the Access Server
connectServer(admin,b2lxbhYQ9RgcFlwEElpKCw==,172.16.74.130,10101);
// Connect to the Source data store
connectAgent(CDC_Oracle_Redo);
// Start the subscription
startMirror(CDC_Oracle_Redo,REDSCRIPT,continuous);
// Get the subscription status
getSubscriptionStatus(CDC_Oracle_Redo,REDSCRIPT);
// Disconnect from the Source data store
disconnectAgent(CDC_Oracle_Redo);
// Disconnect from the Access Server
disconnectServer();
// Exit the script
exit();
Now that you have a compiled script, you can run it by running online -b (online.bat for Windows) as follows:
C:\InfoSphere CDC_MC>online -b MCScripts\compiled\startREDSCRIPT.bat
REDSCRIPT Starting
As you can see, in both cases the REDSCRIPT subscription has the status Starting, meaning that it is still in its startup phase. The Management Console command language cannot express delays or loops. If you want to develop a more sophisticated procedure, a controlling shell script is needed to call the individual Management Console scripts and analyze their standard output. For advanced operation of your InfoSphere CDC environment, use the engine commands or the Java API.
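Such a controlling script can be sketched as follows. This is a minimal outline, assuming bash; the "Mirror Continuous" status text that getSubscriptionStatus prints for an active subscription, the subscription name, and the retry interval are assumptions to verify against your release before relying on them.

```shell
#!/bin/bash
# Sketch of a controlling script around a compiled Management Console
# script: capture its standard output and retry until the subscription
# reports an active state. The "Mirror Continuous" status text and the
# retry interval are assumptions; verify them in your environment.

SUBSCRIPTION=REDSCRIPT

# Return success when the captured output shows the subscription active.
subscription_active() {
    echo "$1" | grep -q "${SUBSCRIPTION}[[:space:]].*Mirror Continuous"
}

# Poll the status command until the subscription is active (about one minute).
wait_until_active() {
    # $1 = command that prints the subscription status, for example
    #      "online -b MCScripts/compiled/startREDSCRIPT.bat"
    local attempt output
    for attempt in {1..12}
    do
        output=$($1)
        if subscription_active "${output}"
        then
            echo "${SUBSCRIPTION} is mirroring."
            return 0
        fi
        sleep 5
    done
    echo "${SUBSCRIPTION} did not reach an active state." >&2
    return 1
}
```

A scheduler job could call wait_until_active with the compiled start script and raise an alert on a non-zero exit code.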
9.4 InfoSphere CDC engine commands (CLI)
In many situations, you want to integrate InfoSphere CDC with the business applications and other processes that run on the servers where InfoSphere CDC is installed. Typical system management operations that must be aligned with InfoSphere CDC are the shutdown and restart of a server or of the database on which InfoSphere CDC operates. For example, if the database for which an InfoSphere CDC instance has been started must be shut down, first stop all subscriptions and then shut down the InfoSphere CDC instance to avoid abnormal termination of active replication processes.
This section describes a few prerequisites for running InfoSphere CDC engine commands and provides some examples about how InfoSphere CDC commands can be scripted and integrated into existing
operational environments.
Commands for starting and stopping subscriptions must always be run against the InfoSphere CDC source engine. Other commands, such as showing the bookmark of a subscription, can be run only against the InfoSphere CDC target engine, because that is where bookmarks are kept.
9.4.1 Running commands for the Linux, UNIX, and Windows engine
Any InfoSphere CDC Linux, UNIX, and Windows engine command that is run from the command line must be run from the cdc_home/bin directory or be preceded by the cdc_home/bin/ prefix. Having the bin directory in the PATH is not sufficient; commands invoked that way do not run.
As an example, assume that you want to list the active subscriptions for an InfoSphere CDC instance. Change to the cdc_home/bin directory and then run dmgetsubscriptionstatus -I cdcdemo -A. Figure 9-1 shows an example of running a command for the Linux, UNIX, and Windows engine.
Figure 9-1 dmgetsubscriptionstatus
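The directory requirement can be handled with a small wrapper; the following is a sketch, assuming bash and a conventional cdc_home layout (the /opt/cdcdb2 path is a placeholder for your installation directory).

```shell
#!/bin/bash
# Helper that runs an InfoSphere CDC engine command from the required
# cdc_home/bin directory. CDC_HOME is an environment-specific
# assumption; /opt/cdcdb2 is only a placeholder default.

CDC_HOME=${CDC_HOME:-/opt/cdcdb2}

run_cdc_cmd() {
    # Run the named command in a subshell so the caller's working
    # directory is left unchanged.
    ( cd "${CDC_HOME}/bin" && ./"$@" )
}

# Example: run_cdc_cmd dmgetsubscriptionstatus -I cdcdemo -A
```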
9.4.2 Running CL commands for System i
When running InfoSphere CDC commands for the InfoSphere CDC for DB2 on System i engine, the job that runs the commands must have the InfoSphere CDC product (instance) library as its current library. Merely adding the InfoSphere CDC product library to the library list, without changing the job's current library, causes the command to fail.
If you intend to write CL programs to perform periodic activities, such as stopping the replication before a system IPL and restarting it afterward, the current library must be set in the CL program before any InfoSphere CDC command is run.
Figure 9-2 shows an example of running a command on System i to start the subscription in continuous mirror mode.
Figure 9-2 CDC_command_System_i
9.4.3 Running console commands for IBM System z
InfoSphere CDC engine commands on z/OS are submitted from the operator console and appropriate messages are returned by InfoSphere CDC. The console commands that are provided as part of the InfoSphere CDC installation can be run by running MODIFY (or F) for the InfoSphere CDC started task:
MODIFY CHCPROC,STRTSMIR,SUBSCR=ALL
The IBM Time Sharing Option (TSO) user must have authority to issue console commands. This authority is controlled by a product such as the IBM Resource Access Control Facility (RACF®). Note the difference between TSO commands and console commands: InfoSphere CDC for IBM System z® accepts console commands only.
9.4.4 Sample scripts
When deploying InfoSphere CDC, a number of implementation topics need to be addressed, such as starting and stopping of the replication, the management of the database archive logs, and monitoring the activity of the
replication processes.
This section provides sample scripts that perform recurring InfoSphere CDC tasks and that you can adapt to align with the other processes running in your environment.
Starting and stopping the engines and subscriptions
Before a subscription can be started, InfoSphere CDC requires that both the source and target InfoSphere CDC data stores be active. Depending on the platform and type of engine, starting and stopping InfoSphere CDC engines can require different commands. This section focuses on platforms and operating systems that require scripting to start processes. Windows is excluded here because an InfoSphere CDC instance is started and stopped with Windows services, and these services are controlled at the operating system configuration level.
Linux, UNIX, and Windows engine examples for starting and stopping
Example 9-3 shows a script to start an InfoSphere CDC Java instance. All Java-based InfoSphere CDC engines have the same command interface and the same starting and stopping techniques. Although the command is started as a daemon process (by running the nohup command), it writes any messages to nohup.out in the current directory. The script monitors for messages arriving in nohup.out and sends them to stdout before returning to the command prompt.
The script only starts the first InfoSphere CDC instance that has been defined. If you must start multiple or all InfoSphere CDC instances, the script must be modified.
Example 9-3 Script to start an InfoSphere CDC Java instance
#!/bin/bash
 
# set -x
 
# CDC instance variables
CDC_AGENT=DB2
CDC_HOME=/opt/cdcdb2
CDC_USER=cdcdb2
CDC_INSTANCE=`ls ${CDC_HOME}/instance | head -1`
 
# Start CDC instance
echo "Starting CDC ${CDC_AGENT} instance ${CDC_INSTANCE} for user ${CDC_USER} ..."
rm ${CDC_HOME}/bin/nohup.out 2> /dev/null
su - ${CDC_USER} -c "cd ${CDC_HOME}/bin;nohup ./dmts64 -I ${CDC_INSTANCE} &" > ${CDC_HOME}/bin/nohup.out
 
# Wait for CDC instance to be started or error to be issued (max 60 seconds)
for i in {1..60}
do
  if [ -e ${CDC_HOME}/bin/nohup.out ]
  then
    nLines=$(wc -l ${CDC_HOME}/bin/nohup.out | awk '{print $1}')
    if (( ${nLines} != 0 ))
    then
      break
    fi
  fi
  sleep 1
done
 
# Determine if instance was started correctly
nLines=$(grep -c "IBM InfoSphere Change Data Capture is running." ${CDC_HOME}/bin/nohup.out)
 
if (( ${nLines} != 0 ))
then
  # Do post-instance action such as deselecting the staging store or setting system parameters
  echo post-start > /dev/null
fi
 
cat ${CDC_HOME}/bin/nohup.out
 
echo
echo "Press Enter to continue ..."
read -t 5
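If all configured instances must be started rather than only the first, the instance loop can be sketched as follows. The paths and user are placeholders, and the dmts64 invocation (taken from Example 9-3) is left commented out here.

```shell
#!/bin/bash
# Sketch that extends Example 9-3 to start every instance found in the
# instance directory instead of only the first one. CDC_HOME and
# CDC_USER are placeholders for your environment.

CDC_HOME=${CDC_HOME:-/opt/cdcdb2}
CDC_USER=${CDC_USER:-cdcdb2}

start_all_instances() {
    local instance
    for instance in $(ls "${CDC_HOME}/instance" 2>/dev/null)
    do
        echo "Starting CDC instance ${instance} ..."
        # Start the instance as in Example 9-3, one dmts64 process each:
        # su - ${CDC_USER} -c "cd ${CDC_HOME}/bin; nohup ./dmts64 -I ${instance} &"
    done
}
```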
 
Using a similar method, the CDC engine can be stopped.
 
#!/bin/bash
 
# CDC instance variables
CDC_AGENT=DB2
CDC_HOME=/opt/cdcdb2
CDC_USER=cdcdb2
CDC_INSTANCE=`ls ${CDC_HOME}/instance | head -1`
 
# Stop CDC instance
echo "Stopping CDC ${CDC_AGENT} instance ${CDC_INSTANCE} ..."
su - ${CDC_USER} -c "${CDC_HOME}/bin/dmshutdown -I ${CDC_INSTANCE}"
echo
echo "Press Enter to continue ..."
read -t 5
System i examples for starting and stopping
On most System i servers, InfoSphere CDC is started in the system's startup program, which is identified by the system value QSTRUPPGM.
Example 9-4 shows a sample CL program that could be called from the server's startup program. The program starts the InfoSphere CDC instance (IBMCDC subsystem), waits a few seconds to allow the InfoSphere CDC TCP/IP listener job to start, and then starts all the subscriptions replicating from the
current system.
Example 9-4 CL program to start InfoSphere CDC
*************** Beginning of data *************************************
PGM
/* Start the InfoSphere CDC subsystem */
STRSBS SBSD(IBMCDC/IBMCDC)
MONMSG MSGID(CPF1010)
/* Wait for the TCP listener to come up */
DLYJOB DLY(20)
/* Start subscriptions */
CHGCURLIB CURLIB(IBMCDC)
STRDTAMIR TARGET(*ALL)
ENDPGM
****************** End of data **************************************
If you choose to include the startup of InfoSphere CDC in the server startup program, ensure that the system's TCP/IP service has been started before you start the InfoSphere CDC subsystem. The listener fails to start if TCP/IP is not yet active, and you will not be able to connect to the InfoSphere CDC instance using the Management Console or start any replication processes that target the system. A common practice is to place the startup of InfoSphere CDC at the end of the startup program, or to build in a delay or a check for TCP/IP activity using the QtocRtvTCPA API.
Similarly, a CL program could shut down InfoSphere CDC before the system must be powered down (Example 9-5).
Example 9-5 CL program to shut down InfoSphere CDC
*************** Beginning of data *************************************
PGM
/* End all subscription target processes */
CHGCURLIB CURLIB(IBMCDC)
DMENDPROC PUBID(*ALL)
/* End all subscriptions (*CNTRLD at first) */
CHGCURLIB CURLIB(IBMCDC)
ENDDTAMIR TARGET(*ALL)
DLYJOB DLY(120)
/* If still subscriptions active, end with *IMMED */
ENDDTAMIR TARGET(*ALL) ENDTYP(*IMMED)
DLYJOB DLY(10)
/* End the subsystem immediately */
ENDSBS SBS(IBMCDC) OPTION(*IMMED)
MONMSG MSGID(CPF1054)
ENDPGM
****************** End of data ***************************************
The program first ends all subscription target processes by running DMENDPROC and then issues a controlled end of the subscriptions. A controlled end allows the subscription to process the remaining journal entries. If there is a large backlog of transactions to be replicated, the controlled ending of the subscriptions could take a long time and delay the shutdown of the system. Therefore, the program waits for 2 minutes and then ends the replication immediately.
You could make the program a bit more sophisticated by, for example, monitoring the activity of the subscriptions to shorten the fixed delay.
System z example for starting
The JCL to start the InfoSphere CDC instance (the default name is CHCPROC) is distributed with the product in the SHCCNTL library (see Example 9-6).
Example 9-6 JCL to start a InfoSphere CDC instance
//CHCPROC PROC CONFIG=<ConfigSuffix>,
// COMM=COMM,
// DBMS=DBMS
//*
//* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
//* *
//* LICENSED MATERIALS - PROPERTY OF IBM *
//* 5655-U96 *
//* COPYRIGHT IBM CORP 1998,2008 ALL RIGHTS RESERVED. *
//* *
//* US GOVERNMENT USERS RESTRICTED RIGHTS - *
//* USE, DUPLICATION OR DISCLOSURE RESTRICTED *
//* BY GSA ADP SCHEDULE CONTRACT WITH IBM CORP. *
//* *
//* Sample JCL to run InfoSphere Change Data Capture for z/OS *
//* *
//* Instructions: *
//* *
//* 1. If the Language Environment product was not installed into *
//* data sets with the default name prefix, change the value of *
//* the _ICONV_UCS2_PREFIX keyword in the PARM field of the EXEC *
//* statement to specify the prefix chosen for the Language *
//* Environment data sets. InfoSphere Change Data Capture for *
//* z/OS will use this item when requesting code page conversion *
//* using Language Environment provided services. *
//* *
//* 2. Change <ConfigSuffix> to the 2 character configuration suffix *
//* appended to the "CHCCFGxx" member of the CHCCNTRL data set. *
//* *
//* 3. Change <CHC.HCHC620> to the high-level data set name *
//* qualifier to be used for the InfoSphere Change Data Capture *
//* for z/OS execution data sets. *
//* *
//* 4. Change 'DSNxxx.SDSNEXIT' to the name of the DB2 APF *
//* Authorized exit library. *
//* *
//* 5. Change 'DSNxxx.SDSNLOAD' to the name of the DB2 APF *
//* Authorized execution library. *
//* *
//* 6. Change <MetaDatacluster> to the name of the InfoSphere Change *
//* Data Capture for z/OS Meta-Data VSAM Cluster. *
//* *
//* 7. Change <PALcluster> to the name of the InfoSphere Change Data *
//* Capture for z/OS Product Administration Log (PAL) VSAM *
//* Cluster. *
//* *
//* 8. Change <CACHE.QUALIFIER> to the high-level data set name *
//* qualifier to be used for the DB2 Log Cache data sets. *
//* Remove these lines if you do not want to use a DB2 Log Cache. *
//* *
//* 9. Change <UserExitLoadLib> to the name of the execution data set *
//* for user exits. *
//* *
//* 10. Change 'TCPIP.SEZAINST(TCPDATA)' to the name of the z/OS *
//* TCP/IP Component's TCPIP.DATA data set. If the installation *
//* is using a TCP/IP Resolver address space, this DD statement *
//* can be removed. *
//* *
//* Note: *
//* The first step, named DELETE, removes the prior SYSMDUMP data *
//* set, if it exists. The SYSMDUMP data set is then allocated *
//* anew when InfoSphere Change Data Capture for z/OS starts *
//* execution. This newly allocated SYSMDUMP data set must have a *
//* disposition of MOD, so that, should multiple memory dumps occur, they *
//* will be appended in sequence, and not successively overlay the *
//* prior memory dumps in the data set. *
//* *
//* Note: *
//* The REGION parameter on the EXEC statement of the IEFPROC step *
//* is set to 0M. This value implies a default value for the *
//* MEMLIMIT parameter (which is not coded) of "NOLIMIT". This *
//* permits the use of storage "above the bar". If the REGION *
//* parameter is changed to a non zero value or removed, you must *
//* add a MEMLIMIT keyword to the EXEC statement specifying enough *
//* storage to serve InfoSphere Change Data Capture for z/OS's *
//* requirements. *
//* *
//* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
//*
//DELETE EXEC PGM=IEFBR14
//SYSMDUMP DD DSNAME=<CHC.HCHC620>.SYSMDUMP,DISP=(MOD,DELETE),
// UNIT=SYSALLDA,
// SPACE=(CYL,(1))
//*
//IEFPROC EXEC PGM=CHCMIT,
// PARM=('/ENVAR(_ICONV_UCS2_PREFIX=CEE)',
// '/&COMM,&DBMS,CONFIG=&CONFIG'),
// REGION=0M <== SEE THE NOTE ABOVE
//STEPLIB DD DSNAME=<CHC.HCHC620>.LOAD,DISP=SHR
// DD DSNAME=DSNxxx.SDSNEXIT,DISP=SHR
// DD DSNAME=DSNxxx.SDSNLOAD,DISP=SHR
//CHCCNTRL DD DDNAME=IEFRDER
//IEFRDER DD DSNAME=<CHC.HCHC620>.DATA,DISP=SHR
//CHCPRINT DD SYSOUT=*
//CHCAUDIT DD SYSOUT=*
//CHCREPRT DD SYSOUT=*
//CHCMTDTA DD DSNAME=<MetaDatacluster>,DISP=SHR
//CHCPALOG DD DSNAME=<PALcluster>,DISP=SHR
//CHCCACHE DD DSNAME=<CACHE.QUALIFIER>.CHCCACHE,DISP=SHR,
// AMP=('BUFND=256,ACCBIAS=SW')
//CHCCHCTL DD DSNAME=<CACHE.QUALIFIER>.CHCCHCTL,DISP=SHR
//CHCUXLIB DD DSNAME=<UserExitLoadLib>,DISP=SHR
//SYSTCPD DD DSNAME=TCPIP.SEZAINST(TCPDATA),DISP=SHR
//UTPRINT DD DUMMY
//* SEE THE NOTE ABOVE ABOUT THE DISPOSITION CODED FOR THIS DATA SET *
//SYSMDUMP DD DSNAME=<CHC.HCHC620>.SYSMDUMP,DISP=(MOD,CATLG), *NOTE*
// UNIT=SYSALLDA,
// SPACE=(CYL,(150,50))
//ABNLIGNR DD DUMMY
//*
The following is an example of how to issue InfoSphere CDC console commands as part of a JCL job stream.
//WCA008CM JOB (3WCA000),'BIN 376 - CXA XM1',MSGCLASS=H,
// REGION=2M,CLASS=A,NOTIFY=&SYSUID
//ISPFPROC EXEC PGM=IEFBR14
//SYSPRINT DD SYSOUT=*
//F CHCPROC,STRTSMIR,SUBSCR=TESTSUB1
//F CHCPROC,ENDRPLCT,SUBSCR=TESTSUB2,NORM
9.4.5 Checking an InfoSphere CDC engine and subscriptions activity
When InfoSphere CDC has been deployed on a production system, it is typically treated as a business critical application and it is therefore included in standard checks for activity. This section lists the methods for determining the activity of the InfoSphere CDC instance and its subscriptions, using operating
system commands.
If you are looking for a more centralized solution to monitor activity of InfoSphere CDC instances, subscriptions, and more details, such as latency, the Java API provides better abilities to retrieve operational information. See 9.5, “InfoSphere CDC API” on page 262 for more information about customizing the configuration and monitoring of InfoSphere CDC.
Linux, UNIX, and Windows engine activity checking
There is no CLI command for checking the activity of an InfoSphere CDC instance. Most system administrators use standard operating system techniques to verify whether a process has been started, and the same techniques can be used to check the activity of the InfoSphere CDC engine.
There are at least three methods by which the activity of an InfoSphere CDC instance can be checked:
1. The dmts64 (or dmts32) process is running for the named instance.
2. There is a listener active for the port that has been configured for
the instance.
3. An engine command, such as dmgetsubscriptionstatus, returns a result when run and exits with code 0.
Most commonly, customers use the UNIX ps command to determine if processes are running on the system. Example 9-7 shows a script that checks whether there is a dmts process active for the first (and only) InfoSphere CDC instance configured for an engine.
Example 9-7 Script to check if a dmts process is active
#!/bin/bash
 
# set -x
 
# CDC instance variables
CDC_AGENT=DB2
CDC_HOME=/opt/cdcdb2
CDC_USER=cdcdb2
CDC_INSTANCE=`ls ${CDC_HOME}/instance | head -1`
 
# Find the process in the system using the ps command
nLines=$(ps aux | grep dmts | grep $CDC_HOME | grep $CDC_INSTANCE | wc -l)
 
if (( ${nLines} != 0 ))
then
  echo "$CDC_AGENT CDC instance $CDC_INSTANCE is running."
  exit 0
else
  echo "$CDC_AGENT CDC instance $CDC_INSTANCE is not active."
  exit 1
fi
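The second method, checking for an active listener on the instance's configured port, can be sketched by filtering the output of a socket listing command such as ss or netstat. The port number is an assumption to take from your instance configuration.

```shell
#!/bin/bash
# Sketch for the listener check: test whether a TCP listener is active
# on the instance's configured port by inspecting the output of a
# socket listing command (for example `ss -ltn`). The port number is
# an environment-specific assumption.

port_listening() {
    # $1 = listener output (e.g. captured from `ss -ltn`), $2 = port number
    echo "$1" | grep -Eq "[:.]$2[[:space:]]"
}

# Example:
# if port_listening "$(ss -ltn)" 10901; then
#     echo "CDC instance listener is active."
# fi
```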
Even though it might be interesting to know whether an InfoSphere CDC instance is active, most clients prefer to know whether subscriptions are up and running (if subscriptions are active, the instance is up too). For the Linux, UNIX, and Windows engine, the full status can be retrieved by running dmgetsubscriptionstatus from a script or the command line. An example of this command is:
[cdcora@cdc-redscript bin]$ dmgetsubscriptionstatus -I cdcdemo -A -p
Subscription : REDSCRIPT
Status : Idle
[cdcora@cdc-redscript bin]$
If the instance is running, the active (Active) and inactive (Idle) subscriptions are shown. The command returns an error when trying to run it against an instance that is not running.
[cdcdb2@cdc-redscript bin]$ dmgetsubscriptionstatus -A -p
IBM InfoSphere Change Data Capture instance "cdcdemo" is not started.
Start the instance to resolve this error.
[cdcdb2@cdc-redscript bin]$ echo $?
253
A shell script could be written to analyze the output of the dmgetsubscriptionstatus command and take relevant actions. The status check could also be integrated with a monitoring solution such as IBM Tivoli® Monitoring.
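Such an analysis script might look like the following sketch, which parses the "Subscription :" / "Status :" line layout shown in the sample output above; that layout may differ between releases, so treat the parsing as an assumption to validate.

```shell
#!/bin/bash
# Sketch of a parser for captured dmgetsubscriptionstatus -p output.
# Returns a non-zero exit code when any subscription is not Active.
# The "Subscription :"/"Status :" layout follows the sample output in
# the text and is an assumption; verify it for your release.

all_subscriptions_active() {
    # $1 = captured dmgetsubscriptionstatus output
    local idle
    idle=$(echo "$1" | awk -F': *' '/^Status/ && $2 != "Active" {count++} END {print count+0}')
    [ "${idle}" -eq 0 ]
}

# Example:
# output=$(dmgetsubscriptionstatus -I cdcdemo -A -p)
# all_subscriptions_active "$output" || echo "Alert: inactive subscriptions"
```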
System i activity checking
On System i, a subscription’s status can be checked through the InfoSphere CDC subsystem. If the InfoSphere CDC subsystem is active and there is a TCPLISTEN job in the subsystem, the instance is running.
Every subscription that is active for continuous mirroring has at least three source jobs in the InfoSphere CDC subsystem (more if multiple journals are processed by a subscription). When targeting a System i server, there are two or more jobs active per subscription (there is one DMTAPPLY job per journal processed on the source). Figure 9-3 shows the REDSCRIPTI subscription, which replicates intra-system, activating a total of five subscription jobs in
the subsystem.
Figure 9-3 Replicating intra-system
Most monitoring solutions can base actions on the activity of certain jobs in a subsystem. It should be fairly simple to configure activity checking by monitoring for subsystem jobs that have the same name as the subscription. You could also consider writing a CL program that employs the QUSLJOB API to list the jobs in the InfoSphere CDC subsystem. Providing a sample program to list subsystem jobs goes beyond the scope of this book.
System z activity checking
The DSPACT console command can be used to show activity in the InfoSphere CDC/z instance (Example 9-8).
Example 9-8 DSPACT console command
CHC9600I (CHCPROC) "MODIFY" command accepted, Diagnostic information ("DSPACT")
CHC9733I (CHCPROC) V5 Communications: Connections = 5
CHC9800I (CHCPROC) Target Data name = CUST10K, Sends = 6, Send Bytes = 310, Recvs = 2, Recv Bytes = 78, CPU Used = 0.011415
CHC9801I (CHCPROC) Source Data name = CUST10K, Sends = 2, Send Bytes = 78, Recvs = 6, Recv Bytes = 310, CPU Used = 0.010111
CHC9802I (CHCPROC) Monitor Agent name = DMCMVSA:37504, Sends = 18, Send Bytes = 2,031, Recvs = 20, Recv Bytes = 1,625, CP ...
CHC9802I (CHCPROC) ... U Used = 0.012863
CHC9803I (CHCPROC) Source Control name = DMCMVSA.TOROLAB.IBM.COM, Sends = 20, Send Bytes = 1,625, Recvs = 18, Recv Bytes ...
CHC9803I (CHCPROC) ... = 2,031, CPU Used = 0.011747
CHC9804I (CHCPROC) Target Control name = MNOEST2.TOROLAB.IBM.COM, Sends = 18, Send Bytes = 1,384, Recvs = 14, Recv Bytes ...
CHC9804I (CHCPROC) ... = 1,410, CPU Used = 0.013526
CHC9778I (CHCPROC) Agent Communications: Connections = 1, Admin(Act/Pnd) = 1/0, State = Processing, Shutdown = *N/A*
CHC9818I (CHCPROC) Medium = TCP/IP, State = Active, Shutdown = *N/A*
CHC9788I (CHCPROC) Datastore name = DMC0038, Medium = TCP/IP, Paths = 1, Sends = 21, Recvs = 38
CHC9832I (CHCPROC) DBMS: Repositories = 1, State = Processing, Shutdown = *N/A*
CHC9742I (CHCPROC) DBMS: Repository Type = DB2 , ID = AD91:DMC0038, Log(Monitor/Scraper) = 1/1,
Replication(admin/source/ ...
CHC9742I (CHCPROC) ... apply) = 1/1/1, HoL RBA = X'00000A9FC229FB91'
CHC0290I (CHCPROC) The DB2 Log Cache is processing with a range of X'0000000000000000' through
X'0000000000000000'
CHC9751I (CHCPROC) Replication: Log(Monitor/Scraper) = 1/1, Replication(source/apply) = 1/1
CHC9753I (CHCPROC) Replication: 1 active Target Subscription(s)
CHC9844I (CHCPROC) Subscription name = CUST10K, Repository type = DB2, Activity = Start_Mirror, State = Processing, Shutd ...
CHC9844I (CHCPROC) ... own = *N/A*, Current RBA = X'00000A9FC229FB91', Written at = 2011-04-13-13.29.22.179712, Scrape to ...
CHC9844I (CHCPROC) ... = *N/A*
CHC9753I (CHCPROC) Replication: 1 active Source Subscription(s)
CHC9857I (CHCPROC) Source name = CUST10K, Activity = , State = Processing, Shutdown = *N/A*
Example 9-8 on page 249 shows various bits of information under several major headings, as follows:
V5 Communications shows the number of communications connections currently active. In this case, there is a loopback subscription called CUST10K and a single Management Console user.
Agent Communications shows the single Management Console user.
DBMS shows that you are connected to a DB2 subsystem called AD91 with user ID DMC0038.
You are running one log monitor (it should always be one) and one log scraper (it should be one per source subscription).
You are also running one source / apply because you are running one loopback subscription.
HoL RBA shows the current DB2 head of log position (either RBA or LRSN).
The log cache is not active and shows no position.
Replication shows details about the source and target subscriptions currently active. Of special interest here is the Current RBA, which shows where the scraper for subscription CUST10K is currently reading the log.
Monitoring the event logs
Besides monitoring the activity of subscriptions, you typically want to automate the monitoring of event logs. Most monitoring solutions offer the ability to run system commands and analyze the output of those commands. The next section describes CLI commands that can be used in scripts or started from a monitoring solution to automate monitoring of InfoSphere CDC event logs.
Linux, UNIX, and Windows engine event log monitoring
If you plan to monitor the event logs from the InfoSphere CDC server using a command line, the Linux, UNIX, and Windows engine provides a command to list the contents of the event log. By using basic commands, you can separate the error messages from the informational messages that the replication engine issues (Figure 9-4 on page 251).
Figure 9-4 Event log
The output can also be directed to a file that is monitored by your external monitoring solution so that replication errors are also captured. Some monitoring solutions expect only new errors to be logged and do not filter out errors that have already been processed; others track a single file that holds the error messages and inspect it only when it has been extended. To avoid having errors reported more than once, you might need to build some scripting logic to process the output of the dmshowevents command (Example 9-9).
Example 9-9 Script to process the output of the dmshowevents command
#!/bin/bash

# Environment setup
CDC_HOME="/opt/iscdcdb2"                   # CDC Home directory
CDCInstance="cdcdemo"                      # CDC Instance configured
LogFile="/monitor/cdc_reported_errors.log" # Log file that is monitored
TmpLogFile="/tmp/cdc_errors_$$.tmp"        # Temporary file holding all
                                           # dmshowevents log entries

# List the error events to a temporary output file
"$CDC_HOME/bin/dmshowevents" -I "$CDCInstance" -a | grep -i error > "$TmpLogFile"

# Find differences between the temporary and permanent output file
touch "$LogFile" # Create log file if it does not exist
# Append only the lines that are new since the last run, stripping the diff markers
diff "$TmpLogFile" "$LogFile" | grep '^< ' | sed 's/^< //' >> "$LogFile"
rm "$TmpLogFile"
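Where your monitoring tool expects clean message text without diff's own markers, a variant based on comm can emit only the not-yet-reported error lines. The following is a minimal sketch with hypothetical file names and sample data standing in for the dmshowevents output; comm requires both inputs to be sorted:

```shell
#!/bin/bash
# Hypothetical file names; in practice the "all errors" file would come from:
#   dmshowevents -I <instance> -a | grep -i error | sort
AllErrors="/tmp/cdc_all_errors.txt"
Reported="/tmp/cdc_reported_errors.txt"

# Sample data standing in for two monitoring cycles
printf 'ERROR A occurred\nERROR B occurred\n' | sort > "$AllErrors"
printf 'ERROR A occurred\n' | sort > "$Reported"

# comm -13 prints only lines present in AllErrors but not yet in Reported
NewErrors=$(comm -13 "$Reported" "$AllErrors")
echo "$NewErrors" >> "$Reported"
echo "$NewErrors"
```

Because comm suppresses lines already present in the reported file, each error reaches the monitoring solution only once.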
In addition to the dmshowevents messages, the Linux, UNIX, and Windows engine also logs more detailed messages in the <cdc_home>/instance/<instance>/log directory (Figure 9-5).
Figure 9-5 InfoSphere CDC events trace
Also, if you have switched on debugging for the engines or if you are using the com.datamirror.ts.util.Trace class from your Java user exit program, these messages are also logged in the trace_dmts* files in this directory (Figure 9-6).
Figure 9-6 InfoSphere CDC events trace - details
System i event log monitoring
InfoSphere CDC for DB2 on System i logs its engine and subscription events in message queue objects (*MSGQ) in the InfoSphere CDC product library. The following message queue objects can be distinguished:
COMMEVENT: InfoSphere CDC data store events (source and target).
<subscription>: Subscription source events, for example, REDSCRIPTI for a subscription that is called REDSCRIPTI.
<publisher ID>_T: Subscription target events, for example, REDSCRIPTI_T for a subscription that is called REDSCRIPTI and whose publisher ID
is REDSCRIPTI.
Figure 9-7 shows the output of the DSPMSG MSGQ(REDSCRIPTI) command to provide an example of source subscription messages.
Figure 9-7 InfoSphere CDC subscription source events
Figure 9-8 shows the target output of the
DSPMSG MSGQ(REDSCRIPTI_T) command.
Figure 9-8 InfoSphere CDC subscription target events
You can find more detailed messages in the job logs of the InfoSphere CDC jobs. For example, Figure 9-9 shows a target event for a subscription (CPF4128).
Figure 9-9 Subscription event
The error entry in the event log indicates the InfoSphere CDC job on the system in which the error occurred. You can find the job log of the associated job by running the WRKJOB JOB(394643/D_MIRROR/REDSCRIPTI) command. In this case, the job had already ended, so it left a spooled file job log (QPJOBLOG) on the system (Figure 9-10).
Figure 9-10 InfoSphere CDC event error
System z event log monitoring
As with most System z applications, job logs are accessed through the System Display and Search Facility (SDSF). Standard data sets are JESMSGLG, JESJCL, and JESYSMSG.
There are also a few InfoSphere CDC for System z specific data sets, CHCPRINT, CHCAUDIT, and CHCREPRT:
CHCPRINT contains a log of all event log messages issued since the instance was started.
CHCAUDIT contains information about z/OS, DB2, and InfoSphere CDC for System z, including versions, maintenance levels, and configuration parameters, and significant events, such as users connecting to the data store from the Management Console, subscriptions starting and ending, and tables being parked.
CHCREPRT contains reports written by InfoSphere CDC for System z during its processing, either by request or to report errors. An example of a report produced by request is the Staging Space Report. A target subscription also produces a report when a target error is encountered. For example, if an SQL error is encountered while applying changes, this report contains details about what was being applied and to which table.
9.4.6 Removing obsolete database logs
InfoSphere CDC depends on the availability of database transaction logs; these logs must not be removed from the system before all subscriptions have finished processing them.
Most clients have an automated procedure for cleaning up archived transaction logs to avoid filling up disk space. When implementing these purging procedures, you must take the dependencies of InfoSphere CDC into account.
Linux, UNIX, and Windows engine log maintenance
Example 9-10 shows a sample script to demonstrate how an Oracle archive log cleanup procedure could be aligned with the log dependencies of InfoSphere CDC. Although multiple methods can be used, this one compares the archive logs registered in the Oracle catalog (v$archived_log view) with the log dependencies reported by InfoSphere CDC.
Example 9-10 Archive log clean-up procedure for Oracle
#!/usr/bin/ksh
# set -x # Uncomment to debug
# ******************************************************************** #
# Description : Remove Oracle archive logs already processed           #
#               and applied by InfoSphere CDC (Transformation          #
#               Server)                                                #
#                                                                      #
#               Designed for InfoSphere CDC 6.3 and higher             #
#               (metadata stored outside the Oracle database)          #
#                                                                      #
# Language    : ksh                                                    #
# ******************************************************************** #
 
prg=${0##*(*/|-)} # Get script name
 
# Initialization of variables (to be customized for implementation)
CDC_HOME="/opt/iscdcora" # CDC Home directory
SrcOraSID="cdcdemo" # SID (TNS name) of source database
SrcOraUser="iscdcora" # User to log on to Oracle
SrcOraPwd="passw0rd" # Password of user
SrcCDCInstance="cdcdemo" # CDC Instance configured
 
TmpDepLogFile="/tmp/CDCDepLog$$.tmp" # Temporary file holding the archive
# files CDC is still dependent on
TmpArcLogFile="/tmp/CDCArcLog$$.tmp" # Temporary file holding archive files
# to be deleted
LogFile="${HOME}/CDCDltArcLog.log" # Logging of all runs, change to
# "/dev/null" if no logging desired
 
MaxLogDays=9999 # Archive list threshold in days (to avoid
# unnecessary processing of archives)
 
# Function to throw an error and exit with a return code
function throwError
{
exitStatus=$?
(( exitStatus == 0 )) && exitStatus=1
 
if (( $# > 0 )); then
print "${prg}: $*" >&2
fi
 
exit ${exitStatus}
}
 
# Function to write an entry in the log
function writeLog
{
echo $1
print $1 >> ${LogFile}
}
 
# ---------------------------------------------------------------------------- #
# Main line #
# ---------------------------------------------------------------------------- #
 
# Define variables
typeset -i i=0
typeset -i nDlt=0
typeset -i nRtn=0
typeset -i nDltErr=0
 
# Check if log file exists or can be created and accessed
cat /dev/null >> ${LogFile}
(( $? == 0 )) || throwError "Cannot create or access log file: ${LogFile}"
# Check if temporary files can be created
cat /dev/null >> ${TmpDepLogFile}
(( $? == 0 )) ||
throwError "Cannot create temporary log dependency file: ${TmpDepLogFile}"
cat /dev/null >> ${TmpArcLogFile}
(( $? == 0 )) ||
throwError "Cannot create temporary archive log file: ${TmpArcLogFile}"
 
writeLog "## Archive log deletion for database ${SrcOraSID} started at `date`."
 
# First, obtain which log files are still needed by CDC
$CDC_HOME/bin/dmshowlogdependency -I ${SrcCDCInstance} -i -A > ${TmpDepLogFile}
 
# If there was an error retrieving the log dependencies, throw it
(( $? == 0 )) ||
throwError "Error running dmshowlogdependency command for CDC instance ${SrcCDCInstance}"
 
# Report back the archive logs still required by the product
writeLog "Archive logs still required by CDC instance ${SrcCDCInstance}:"
cat ${TmpDepLogFile} | while read DepLog
do
writeLog "${DepLog}"
done
nRtn=$(wc -l ${TmpDepLogFile} | awk '{print $1}')
 
# Now, list all the archive logs
sqlplus -S ${SrcOraUser}/${SrcOraPwd}@${SrcOraSID} << EOF > /dev/null
set lines 1000
set termout off
set echo off
set term off
set pages 0
set verify off
set feedback off
set trimspool on
spool ${TmpArcLogFile}
select name from v\$archived_log
where next_time > (sysdate-${MaxLogDays})
order by dest_id,thread#,sequence#;
spool off
exit;
EOF
 
# If there was an error running the SQL script, throw it
(( $? == 0 )) ||
throwError "Error running SQL script to list archive logs for database ${SrcOraSID}"
 
# Process list of archives which may be deleted
grep -v -f ${TmpDepLogFile} ${TmpArcLogFile} | while read ArcLine
do
# Skip archive file if it doesn't exist anymore
if [[ -e ${ArcLine} ]]; then
rm ${ArcLine} 2>> ${LogFile}
if (( $? != 0 )); then
writeLog "Error deleting archive file ${ArcLine}"
((nDltErr += 1))
else
writeLog "Archive log ${ArcLine} deleted"
((nDlt += 1))
fi
fi
done
 
# Report how many archives deleted and retained
writeLog "Number of archives deleted: ${nDlt}"
if (( ${nDltErr} != 0 )) ; then
writeLog "Number of errors when deleting archives: ${nDltErr}"
fi
writeLog "Number of archives retained: ${nRtn}"
 
# Remove temporary files
rm ${TmpDepLogFile}
rm ${TmpArcLogFile}
 
writeLog "## Archive log deletion for database ${SrcOraSID} finished at `date`."
writeLog " "
When run, the script automatically removes from the file system all archive logs that are no longer needed by any of the subscriptions. The output of the script is shown in Example 9-11.
Example 9-11 Deleting the archive files
[iscdcora@cdc-redscript bin]$ ./CDCDltArcLog.sh
## Archive log deletion for database cdcdemo started at Fri Apr 8 15:17:45 CEST 2011.
Archive logs still required by CDC instance cdcdemo:
IBM InfoSphere Change Data Capture Show Log Dependency Utility
Archive log /oradata/cdcdemo/archive/1_19_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_20_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_21_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_22_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_23_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_24_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_25_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_26_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_27_698772245.dbf deleted
Archive log /oradata/cdcdemo/archive/1_28_698772245.dbf deleted
Number of archives deleted: 10
Number of archives retained: 1
## Archive log deletion for database cdcdemo finished at Fri Apr 8 15:17:50 CEST 2011.
If the archive logs are kept in an Oracle ASM file system, they are not accessible through operating system commands. In most Oracle implementations, clients probably already have a log management process in place that uses RMAN. The script in Example 9-10 must then be adjusted to retrieve the log sequence numbers from the v$archived_log view and generate the RMAN commands (RMAN> delete noprompt archivelog logseq=nn) that remove the archive logs.
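The generation step can be sketched as follows. The sequence numbers below are hypothetical placeholders; in practice they would be selected from v$archived_log after excluding the sequences still reported by dmshowlogdependency:

```shell
#!/bin/bash
# Hypothetical list of log sequence numbers eligible for deletion
DeletableSeqs="19 20 21"

RmanCmdFile="/tmp/cdc_rman_delete.rcv"
: > "$RmanCmdFile"
for seq in $DeletableSeqs; do
    # One RMAN deletion command per archived log sequence
    echo "delete noprompt archivelog logseq=$seq;" >> "$RmanCmdFile"
done
cat "$RmanCmdFile"
```

The generated command file could then be passed to RMAN, for example with rman target / cmdfile=/tmp/cdc_rman_delete.rcv.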
System i journal receiver maintenance
On System i, the journal entries processed by InfoSphere CDC subscriptions are stored in journal receiver (*JRNRCV) objects. After all subscriptions reading a certain journal have finished processing a journal receiver, that journal receiver can, from an InfoSphere CDC perspective, be removed from the system.
InfoSphere CDC for System i provides the CHGJRNDM command to clean up journal receivers from the system after the subscriptions no longer need them.
If other applications use the same journals as InfoSphere CDC, management of the journal receivers should be coordinated between these applications. The oldest journal receiver that is still in use by any of the applications, and the subsequent receivers in the chain, must be retained on the system.
For these situations, the RTVDMJENT command can be used to retrieve the oldest journal receiver needed by InfoSphere CDC. If you already have a journal management solution, check whether it can take the InfoSphere CDC journal receiver dependencies into account.
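The coordination rule can be sketched generically: each application reports the oldest receiver it still needs, and the lowest of those positions marks the first receiver that must be kept. The sequence numbers below are hypothetical; the CDC value would come from the RTVDMJENT command:

```shell
#!/bin/bash
# Hypothetical oldest-required receiver sequence numbers per application
cdc_oldest=1042        # oldest receiver still needed by InfoSphere CDC
other_app_oldest=1038  # oldest receiver still needed by another application

# The receiver chain must be retained from the lowest required sequence onward
if (( other_app_oldest < cdc_oldest )); then
    keep_from=$other_app_oldest
else
    keep_from=$cdc_oldest
fi
echo "Retain journal receivers from sequence $keep_from onward"
```

Only receivers older than the lowest required sequence are safe to delete.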
System z database log maintenance
InfoSphere CDC for System z contains no log management features. Active DB2 log data sets are automatically archived as they fill. When the DB2 IFI must read log entries from archived DB2 log data sets, processing of the logs might be slow.
9.5 InfoSphere CDC API
The API exposes InfoSphere CDC Management Console functions to Java applications, enabling you to create specialized software that satisfies needs and requirements unique to your InfoSphere CDC implementation. In particular, your organization can develop customized Java interfaces that include necessary enhancements and access some or all of the InfoSphere CDC Management Console functions.
This section provides a background on how the API classes and methods are organized and how that relates to the functionality of the Management Console. You learn how to use the data stores, configure subscriptions and table mappings, and operate and monitor the environment.
9.5.1 Development environment setup
Development using the InfoSphere CDC Java APIs requires that you have a specific compilation and building environment configured. Although most Java Development Kits at release 1.5 or higher can be used to compile classes that use the APIs, use the IBM JDK for your development environment to be in line with the required running environment.
Running any custom programs that use the InfoSphere CDC Java API classes requires that you use IBM Java Runtime Engine V1.5 or later. If you attempt to instantiate an object from the InfoSphere CDC API using a Java Runtime Environment (JRE) from a different provider, exceptions are thrown.
The class path for compiling your self-developed classes must include the api.jar file, which is found in the lib directory of the Access Server or Management Console installation. Do not use the api.jar file that ships with the Java CDC replication engines; it is for engine-only use. If neither the Access Server nor the Management Console is installed on your development server, consider copying the api.jar file to your development environment. The api.jar file is tied to the version of Access Server that is running in your environment; if a new version of the Access Server is installed, you might need to replace the api.jar file with the latest version and review your code.
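As a sketch, with a hypothetical installation path and class name, the class path setup might look like the following. The compile and run commands are shown as comments because they require the actual api.jar:

```shell
#!/bin/bash
# Hypothetical Management Console installation directory
MC_HOME="/opt/IBM/InfoSphereCDC/ManagementConsole"
API_CP="$MC_HOME/lib/api.jar"

# Compile and run a custom class against the Access Server api.jar:
#   javac -cp "$API_CP" MyCdcTool.java
#   java  -cp "$API_CP:." MyCdcTool
echo "Class path: $API_CP"
```

On Windows, the path separator in the java command would be ; instead of :.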
9.5.2 Contents of the api.jar file
The API shipped with InfoSphere CDC Management Console and Access Server contains the following three main packages:
com.datamirror.ea.api
com.datamirror.ea.api.publisher
com.datamirror.ea.api.subscriber
The com.datamirror.common.util.* package is also useful, especially when automatic management of the data store is required. This package contains the methods for encryption and decryption of passwords.
Interfaces and classes that are contained in the API and expose InfoSphere CDC functions are in the API Javadocs. The index for the API Javadocs is found in the following directory:
<MC_install_directory>/api/index.html
Furthermore, a high-level description of the API and its architecture can be found in the IBM Information Center.
Mapping the api.jar components to the Management Console
There are three main areas (packages) in the api.jar file, each encompassing a different part of the entire InfoSphere CDC configuration:
com.datamirror.ea.api: Classes related to Access Server and its configured users and data stores.
com.datamirror.ea.api.publisher: Classes for controlling the source metadata, including the catalog tables and subscriptions (source side).
com.datamirror.ea.api.subscriber: Classes for controlling the target metadata, mainly the publications (target component of subscriptions).
Figure 9-11 shows the Access Manager perspective of the InfoSphere CDC Management Console with the most important API classes mapped to screen items. All classes in this perspective are contained in the
com.datamirror.ea.api package.
Figure 9-11 MC configuration mapping of API classes
Table 9-1 provides a list of the most commonly used classes contained in the com.datamirror.ea.api package.
Table 9-1 Commonly used classes in the com.datamirror.ea.api package
Class
Description
DataSource
Used to establish and keep the connection to the Access Server. Equivalent to connecting the Management Console to the Access Server and specifying the user and password.
AccessServerAgent
Represents an Access Server data store. After establishing a connection to the Access Server, you can obtain a list of data stores that have been registered.
AccessServerUser
Represents a user that is registered for the Access Server; the user can connect to the Access Server.
AccessServerAgentAccessParameters
Holds the link between an Access Server user and a data store. After establishing a connection to the Access Server, the user can only connect to the data stores for which a connection exists. Also, this object stores the database user and password that are used to connect to the database represented by the data store.
ReplicationRole
The interface that is implemented by the Publisher and Subscriber classes. An AccessServerAgent can be connected through one of the classes that implement the ReplicationRole interface.
Figure 9-12 shows the Configuration perspective of the Management Console and a map of the most commonly used API classes mapped to screen items. All classes in this perspective are contained in the com.datamirror.ea.api.publisher (source) and com.datamirror.ea.api.subscriber (target) packages.
Figure 9-12 Management Console configuration of commonly used API classes
Table 9-2 provides a list of the most commonly used classes for the configuration of subscriptions, which are contained in the com.datamirror.ea.api.publisher (source) and com.datamirror.ea.api.subscriber (target) packages.
Table 9-2 Commonly used subscription configuration packages
Class
Location
Description
Publisher
Source
The object representing a data store that has been connected to be used as a source. When connecting to a data store using the Publisher class, the source metadata is revealed to the Java API methods. Most InfoSphere CDC engines have a dual role and can be connected either as a source or as a target. If the engine must be used in both roles, you must instantiate a Publisher object and a Subscriber object.
Subscriber
Target
An object representing a data store that has been connected to be used as a target. When connecting to a data store using the Subscriber class, the target metadata is revealed to the Java API methods. Most InfoSphere CDC engines have a dual role and can be connected either as a source or as a target. If the engine must be used in both roles, you must instantiate a Publisher object and a Subscriber object.
Catalog
Source
A repository of tables that may be used to replicate from. Every database table from which you want to replicate data or changes must first be registered in the catalog. The catalog keeps a record of table names and table structure, together with some technical information, such as supplemental logging identification (journal and log group).
PublishableTable
Source
This class describes the source database table, including table level information, such as logging and the columns that make up the table, regardless whether they have been selected for replication or not. Among others, InfoSphere CDC keeps the table description to detect discrepancies between the real database table structure and the algorithm that InfoSphere CDC uses to process the database transaction log entries.
Subscription
Source
The connection between the source and the target data store that references the source side of the table mappings. From the InfoSphere CDC metadata point of view, a subscription is the container of the source side of the table mapping definitions. A subscription is identified by a Subscription name, which is an uppercase string made up of a maximum of 30 characters. A subscription has a Publisher ID attribute (a string that has a maximum of eight uppercase characters) that is the unique identification of a subscription on the target side (Publication).
Publication
Target
The target side of a Subscription object. A Publication references the source tables that are selected for replication (SubscribedTable) as PublishedTable objects. A publication is identified by the Publisher ID, an uppercase string of at most eight characters that is registered for the subscription.
SubscribedTable
Source
These tables are the Catalog tables (PublishableTable) that have been selected for replication by a subscription. When mapping tables in a subscription, the catalog tables definitions are copied to SubscribedTable objects under a Subscription. A SubscribedTable object can also hold derived columns (virtual columns) as DerivedColumn objects.
PublishedTable
Target
The target side of a SubscribedTable object. It holds the table and column definitions of tables that are replicated by a subscription. The schema name and table name are converted to uppercase when the table is described on the target side; if your source schema and table names are in lowercase, they show in uppercase in the PublishedTable object. A PublishedTable object is owned by a Publication (the target side of a Subscription).
TableAssignment
Target
Defines the link between PublishedTable (table coming from the source) and destination database table (or application object). TableAssignment is referenced by PublishedTable (one published table can be assigned to one destination object). A TableAssignment holds all information about how the source table is mapped to the target table, including operations and user exits.
ColumnAssignment
Target
Defines the mapping of source columns (PublishedDBColumn) to target columns, including derived expressions and journal control columns.
Figure 9-13 shows the relationship between the source and target metadata and how the various classes fit into this relationship. To keep the picture simple, we only included the main objects that make up subscriptions and table mappings. The relationship between source and target metadata is important to understand the table mapping process, which is covered in 9.5.7, “Procedure for mapping tables” on page 287.
Figure 9-13 InfoSphere CDC source to target relationship
9.5.3 Connecting to and managing the Access Server
The first task in controlling InfoSphere CDC using the Java API is to connect to the Access Server. This action is the equivalent of starting the InfoSphere CDC Management Console and typing the user, password, host, and port of the Access Server. When connecting to the Access Server using the API, the exact same arguments must be passed to the method that connects to the DataSource object. The DataSource object is the object representing the Access Server.
The code first prepares the context of connecting to the Access Server and then tries to create the DataSource object and connect to it using that context. As with all API methods, the connect method throws an ApiException if it encounters a failure; running the connectAccessServer() method with invalid credentials, for example, produces such an error.
Example 9-12 shows the sample code.
Example 9-12 Sample code to connect to Access Server
/**
* Connect to an Access Server
*
* @param hostName
* - Access Server host name
* @param port
* - Access Server port
* @param userName
* - Access Server user
* @param password
* - Access Server password
* @return The connected Access Server
* @throws ApiException
*/
public DataSource connectAccessServer(String hostName, Integer port,
String userName, String password) throws ApiException {
DataSource accessServer = null;
DefaultContext asCtx = new DefaultContext();
asCtx.setString(DataSource.Hostname, hostName);
asCtx.setInt(DataSource.Port, port);
asCtx.setString(DataSource.User, userName);
asCtx.setString(DataSource.Password, password);
accessServer = Toolkit.getDefaultToolkit().createDataSource();
accessServer.connect(asCtx);
System.out.println("Connected to Access Server on host " + hostName
+ " and port " + port + ".");
return accessServer;
}
After you are finished with the custom processing, disconnect from the Access Server to release the connections. You could build in the disconnection in the general exception handling code of your custom program, as shown in Example 9-13.
Example 9-13 Sample code to disconnect from Access Server
/**
* Close the existing connection to the Access Server
*
* @param accessServer
* - The Access Server from which you will be disconnected
*/
public void disconnectAccessServer(DataSource accessServer) {
if (accessServer != null) {
accessServer.close();
}
System.out.println("Disconnected from Access Server.");
}
Two attempts are made to connect to the Access Server. The first attempt fails with an invalid password and the second one is successful. The sample code is shown in Example 9-14.
Example 9-14 Example of connecting to Access Server
// Failed attempt
DataSource failedAccessServer = null;
System.out.println("Failed attempt to connect to Access Server:");
try {
failedAccessServer = connectAccessServer("172.16.74.191", 10101,
"admin", "invalid_passw0rd");
} catch (ApiException e) {
System.err.println(e.getMessage());
}
// Successful attempt
DataSource accessServer = null;
System.out.println("Successful attempt to connect to Access Server:");
try {
accessServer = connectAccessServer("172.16.74.191", 10101, "admin",
"passw0rd");
} catch (ApiException e) {
System.err.println(e.getMessage());
}
The output of Example 9-14 on page 271 is shown in Example 9-15.
Example 9-15 Successful connection to Access Server
Failed attempt to connect to Access Server:
Invalid user name or password.
Successful attempt to connect to Access Server:
Connected to Access Server on host 172.16.74.191 and port 10101.
After you have connected to the Access Server, the API does not automatically connect all data stores that the user is entitled to use. If you have kept the default Management Console settings, the Management Console attempts to connect to all data stores when it is started. The API allows you to selectively connect to and disconnect from data stores as needed.
However, now that the connection has been established, you can list all data stores (agents) that have been defined in the Access Manager and attempt to connect to them. Creating connections is described in 9.5.4, “Connecting to the data stores” on page 277. Before elaborating on this subject, here are a few examples for retrieving information from the data store definitions. The first example is shown in Example 9-16.
Example 9-16 Retrieving information from data store definitions
/**
* List the accessible data stores for the user connected to the Access
* Server
*
* @param accessServer
* - The Access Server to which you're connected
* @throws ApiException
*/
public void listDataStores(DataSource accessServer) throws ApiException {
SortedMap<String, ReplicationRole> agentMap = new TreeMap<String, ReplicationRole>();
ReplicationRole[] dataStores = accessServer.getAgents();
// First put all data stores in a map to remove duplicates. A data store
// that has a dual role is listed twice when using the getAgents()
// method
for (ReplicationRole dataStore : dataStores) {
agentMap.put(dataStore.getName(), dataStore);
}
// Now iterate through the map entries to list
Iterator<String> dataStoreList = agentMap.keySet().iterator();
while (dataStoreList.hasNext()) {
ReplicationRole dataStore = agentMap.get(dataStoreList.next());
System.out.println("Datastore name : " + dataStore.getName());
System.out.println("Description : "
+ dataStore.getDescription());
System.out.print("Type : ");
if (accessServer.getAgentProperties(dataStore.getName())
.getSourceOrTarget() == AccessServerAgent.SOURCE_OR_TARGET_DUAL)
System.out.println("Dual");
else if (accessServer.getAgentProperties(dataStore.getName())
.getSourceOrTarget() == AccessServerAgent.SOURCE_OR_TARGET_TARGET)
System.out.println("Target");
else
System.out.println("Source");
System.out.println("Version : " + dataStore.getVersion());
System.out.println("Host : " + dataStore.getHostName());
System.out.println("Port : " + dataStore.getPort());
AccessServerAgentAccessParameters[] dataStoreParms = accessServer
.getUserAgentProperties(accessServer.getUserProfile()
.getUserID(), dataStore.getName());
System.out.println("Database user : "
+ dataStoreParms[0].getDbLogin());
System.out.println("Database password: "
+ dataStoreParms[0].getDbPassword());
System.out.println();
}
}
You are already connected to the Access Server. The code sample in Example 9-16 on page 272 shows how to list all data stores to which the user has access. Use this approach if you want to work with the data stores, subscription definitions, and table mappings. If the intention is to create additional data stores and you want to ensure that you are not using an existing name, you could use the getAgentList() method, which lists all data stores regardless of whether the user has access to them.
If a data store has a dual role (it can serve both as a source and as a target), the getAgents() method returns two entries of the data store, once for the source role and once for the target role. The listDataStores() method avoids displaying multiple entries of the same data store by first adding the entries to a sorted map and then listing the map entries.
For each data store, the following items are shown: the name registered in the Access Server, the host name and port number on which the engine is running, and the database user and password used to connect to the target database or application. Normally, you would not show users and passwords in clear text, but they are shown here because they are needed to create a subscription later.
The output of the listing of accessible data stores looks similar to Example 9-17.
Example 9-17 List of accessible data stores
Data store name : CDC_DB2
Description : InfoSphere CDC DB2 6.5
Type : Dual
Version : V6R5M0T0BCDC_HOHDLPJU_164
Host : linux-iscdc
Port : 10901
Database user : db2inst1
Database password: passw0rd
 
eData store name : CDC_Oracle_Redo
Description : InfoSphere CDC Oracle Redo 6.5
Type : Dual
Version : V6R5M0T0BCDC_HOHDLPJU_165_38
Host : linux-iscdc
Port : 11001
Database user : iscdcora
Database password: passw0rd
Some of the methods defined for the data stores, such as retrieving the agent database extended ID (engine type) and version information, can only be called after the connection to the data store has been established. If you attempt to call these methods before connecting to the data store, the methods throw
an ApiException.
In environments where many InfoSphere CDC engines are installed on various servers, you might want to automate the creation of the Access Server components, such as new data stores and users, and make the data stores available by assigning them to the entitled users. For a fully automated configuration of InfoSphere CDC in large environments, this step is the first step in the definition process, as shown in Example 9-18.
Example 9-18 Creating Access Server components
/**
* Create a new Access Server user, datastore and assign the user to the
* newly created data store
*
* @param accessServer
* - The Access Server to which you are connected
* @throws ApiException
*/
public void createDataStoreAndUser(DataSource accessServer)
throws ApiException {
// Create the data store
String dataStoreHost = "linux-iscdc";
int dataStorePort = 11001;
String dataStoreDefaultDB = "";
String dataStoreDefaultDBUser = "iscdcora";
String dataStoreDefaultDBPassword = "Passw0rd";
byte dataStoreMultiUser = (byte) 0;
// First ping the CDC engine to check existence and retrieve
// type/version, then create the data store
AccessServerAgent dataStore = accessServer.pingAgent(dataStoreHost,
dataStorePort);
Result createDatastoreResult = accessServer.createNewAgent(
"New_Datastore", "New Datastore Description",
dataStore.getAgentType(), TransformationServer.COMM_TCPIP,
dataStoreHost, dataStorePort, dataStore.getDatabaseType(),
dataStore.getDmVersion(), dataStore.getSourceOrTarget(),
dataStoreDefaultDB, dataStoreDefaultDBUser,
Encryptor.encryptAndEncode(dataStoreDefaultDBPassword),
dataStoreMultiUser);
// Now create the new user
String userRole = "TS System Administrator";
String userPassword = "password";
byte isAccountDisabled = (byte) 0;
byte isAccountLocked = (byte) 0;
byte isPasswordChangeRequired = (byte) 0;
byte isPasswordNeverExpires = (byte) 1;
byte enableUserDataStoreAdmin = (byte) 1;
byte isForceSavePassword = (byte) 1;
Result createUserResult = accessServer.createNewUser("New_User",
"New User Full Name", userRole, "New User Description",
Encryptor.encryptAndEncode(userPassword), isAccountDisabled,
isAccountLocked, isPasswordChangeRequired,
isPasswordNeverExpires, enableUserDataStoreAdmin,
isForceSavePassword);
// Assign the data store to the user
if (createDatastoreResult.isSucceed() && createUserResult.isSucceed()) {
byte showConnectionDialog = (byte) 0;
byte showParamsValues = (byte) 1;
byte writeProtectParams = (byte) 1;
byte allowConnectionParamsSaving = (byte) 1;
accessServer.addAgentAccessInUser("New_User", "New_Datastore",
dataStoreDefaultDB, dataStoreDefaultDBUser,
dataStoreDefaultDBPassword, showConnectionDialog,
showParamsValues, writeProtectParams,
allowConnectionParamsSaving);
}
}
Before creating a data store for the Access Server, first run ping against the InfoSphere CDC engine in question to validate its existence and retrieve attributes such as the engine type, database type, and engine version. The pingAgent method establishes a TCP/IP connection between the Access Server and the InfoSphere CDC engine using the host and port you pass to it and retrieves those engine attributes.
Although you could create a data store and specify all the attributes, you might end up with a data store you cannot connect to. The Access Manager in the Management Console interface uses the same technique to validate a data store before allowing creation. When creating a data store, you can also store the default database connection parameters (database, database user, and password). If you choose to store the default connection parameters, assigning a new user to the data store takes these parameters as the database connection settings for that user. The password in the data store default database connection must be encrypted when passed.
For the new Access Server user, you must also pass the user's password in an encrypted manner. The password is validated to meet the password rules configured for the Access Server. If you want to create the user even if the password does not pass the validation, you can force saving the password when calling the method.
For the user's role, the following string values are allowed (not case-sensitive):
"TS System Administrator"
"TS Administrator"
"TS Operator"
"TS Monitor"
After the data store and user have been created, you can assign the data store to the user to make it available when the user is connected. Again, the database, database user, and password can be specified for the user connected to the data store. The password must be passed in an unencrypted manner (however, the information is stored as encrypted). The getUserAgentProperties command also returns the decrypted password.
9.5.4 Connecting to the data stores
As with the Management Console, you must connect to at least one data store before you can do anything with the replication definitions (subscriptions). Typically, you connect to a source and a target data store.
A data store can have a source, target, or dual (can serve both as source and target) role. All source configurations are accessible through classes in the com.datamirror.ea.publisher package, the main one being Publisher. The target configuration is accessible through classes in the com.datamirror.ea.subscriber package, the main one being Subscriber. Both classes implement the ReplicationRole interface, from which a number of common methods are inherited.
Publisher is a replication role responsible for configuring the source side of the replication definitions and table mappings. It publishes source table definitions and database transactions that the source business applications generate to the target side. A Publisher refers to a Catalog, which maintains a list of source database tables that are available for subscriptions. Additionally, a Publisher object holds the source side of subscriptions, which define what and how data is being replicated.
Subscriber is a replication role and maintains a list of subscription targets (called Publications) and mappings from source tables to target (destination) tables or application objects. It subscribes to the information that the Publisher distributes (table definitions and database transactions) and uses it to perform activities on the target.
All data stores can be connected using the connect() method in the ReplicationRole class. Before establishing a connection to a data store, you need to determine the role of the data store. This role can either be Publisher or Subscriber. Before trying to establish a connection to a data store, run ping against the InfoSphere CDC engine to see if it is listening on the port and host configured for the data store. Use the pingAgent() method if you need to check InfoSphere CDC instance activity from an external monitoring solution.
In some environments, it might take some time before the connection to the data stores is established. This delay could be because of poor network connections, slow servers, or a combination of these and other factors. To avoid lengthy waits when trying to ping or connect to the data stores, set the communications timeout value. The maximum time the Access Server waits to establish a connection to the data store can then be controlled.
The connectPublisherDataStore() method connects to a data store and establishes a Publisher object that provides access to the source engine metadata. When connecting to the data store, the attempt times out if the connection takes more than 10 seconds (10,000 milliseconds). The sample code for the connection is shown in Example 9-19.
Example 9-19 Connect to a publisher (source) data store
/**
* Connect to a publisher data store defined for an Access Server
*
* @param accessServer
* - The Access Server to which you're connected
* @param dataStoreName
* - The name of the publisher data store
* @return The Publisher, null if the data store is not found or could
* not be connected
* @throws ApiException
*/
public Publisher connectPublisherDataStore(DataSource accessServer,
String dataStoreName) throws ApiException {
int dataStoreTimeout = 10000;
Publisher dataStore = null;
dataStore = accessServer.getPublisher(dataStoreName);
if (dataStore != null) {
try {
// First ping the data store to see if it can be reached
accessServer.pingAgent(dataStore.getHostName(),
dataStore.getPort());
} catch (ApiException e) {
System.out.println("No response from publisher datastore "
+ dataStoreName + " on " + dataStore.getHostName()
+ ":" + dataStore.getPort());
return null;
}
// Now try to connect
dataStore.setTimeOut(dataStoreTimeout);
dataStore.connect();
System.out.println("Connected to publisher datastore "
+ dataStoreName);
}
return dataStore;
}
In the connectSubscriberDataStore() method, connect to a data store and establish a Subscriber object that allows you to access the target metadata. The sample code for this connection is shown in Example 9-20.
Example 9-20 Connect to a subscriber (target) data store
/**
* Connect to a subscriber data store defined for an Access Server
*
* @param accessServer
* - The Access Server to which you're connected
* @param dataStoreName
* - The name of the subscriber data store
* @return The Subscriber, null if the data store is not found or could
* not be connected
* @throws ApiException
*/
public Subscriber connectSubscriberDataStore(DataSource accessServer,
String dataStoreName) throws ApiException {
int dataStoreTimeout = 10000;
Subscriber dataStore = null;
dataStore = accessServer.getSubscriber(dataStoreName);
if (dataStore != null) {
try {
// First ping the data store to see if it can be reached
accessServer.pingAgent(dataStore.getHostName(),
dataStore.getPort());
} catch (ApiException e) {
System.out.println("No response from subscriber datastore "
+ dataStoreName + " on " + dataStore.getHostName()
+ ":" + dataStore.getPort());
return null;
}
// Now try to connect
dataStore.setTimeOut(dataStoreTimeout);
dataStore.connect();
System.out.println("Connected to subscriber datastore "
+ dataStoreName);
}
return dataStore;
}
After a data store is connected, you can retrieve additional properties, such as engine type, and work with system parameters, subscriptions, and so on. The listDataStoreDetail() method determines, among other details, the InfoSphere CDC engine type, which is an attribute that can only be retrieved after the connection to the engine has been established. Also, the version returned by the connected data store (getAgentVersion()) is always accurate, whereas the version that is returned by the getVersion() method reflects the last version that was obtained when the Access Manager ping command was sent. The sample code for this method is shown in Example 9-21.
Example 9-21 List data store details
/**
* List the details of a data store defined for an Access Server once it is connected
*
* @param dataStore
* - The connected data store (Publisher or Subscriber)
* @throws ApiException
*/
public void listDataStoreDetail(ReplicationRole dataStore)
throws ApiException {
System.out.println("Datastore name : " + dataStore.getName());
System.out.println("Description : " + dataStore.getDescription());
System.out.println("Engine type : "
+ getDataStoreTypeString(dataStore.getExtendedDatabaseId()));
System.out.println("Version : " + dataStore.getAgentVersion());
System.out.println();
}
An additional method is defined to list the InfoSphere CDC engine type as a string. This method could be useful if you want to build customized configuration and monitoring solutions across many different types of InfoSphere CDC engines. Suppose that you are writing custom code to import subscriptions from an XML file. To avoid compatibility problems, you want to validate that both the source and target data stores you are importing to are of the same type as the data stores used when the XML file was generated. The sample code for this process is shown in Example 9-22.
Example 9-22 Retrieve engine type as a string
/**
* Find and return the engine type of a data store
*
* @param dataStoreType
* - Type of the data store
* @return the type name of the data store
*/
private String getDataStoreTypeString(int dataStoreType) {
switch (dataStoreType) {
case DataTypeUtilities.PRODUCT_DATASTAGE:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_DATASTAGE2;
case DataTypeUtilities.PRODUCT_INFORMIX:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_INFORMIX;
case DataTypeUtilities.PRODUCT_MSSQL:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_MSSQL2;
case DataTypeUtilities.PRODUCT_ORACLE_REDO:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_ORACLE_REDO2;
case DataTypeUtilities.PRODUCT_ORACLE_TRIGGER:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_ORACLE_TRIGGER2;
case DataTypeUtilities.PRODUCT_SOLIDDB:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_SOLIDDB;
case DataTypeUtilities.PRODUCT_SYBASE:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_SYBASE2;
case DataTypeUtilities.PRODUCT_TERADATA:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_TERADATA;
case DataTypeUtilities.PRODUCT_TSES:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_TSES;
case DataTypeUtilities.PRODUCT_UDB:
return DataTypeUtilities.EXTENDED_PRODUCT_NAME_UDB2;
case DataTypeUtilities.DB_DB2_400:
return DataTypeUtilities.DB_DB2_400_NAME;
case DataTypeUtilities.DB_DB2_MVS:
return DataTypeUtilities.DB_DB2_MVS_NAME;
case DataTypeUtilities.DB_CLASSIC:
return DataTypeUtilities.DB_CLASSIC_NAME;
default:
return "Datastore type " + dataStoreType + " not found.";
}
}
If you connect to two of the data stores and then run the listDataStoreDetail() method, you can find additional information. The sample code to accomplish this task is shown in Example 9-23.
Example 9-23 Connecting to source and target data stores
Publisher db2DataStore = connectPublisherDataStore(accessServer,
"CDC_DB2");
if (db2DataStore != null) {
listDataStoreDetail(db2DataStore);
db2DataStore.disconnect();
}
Subscriber dataStageDataStore = connectSubscriberDataStore(
accessServer, "CDC_DataStage");
if (dataStageDataStore != null) {
listDataStoreDetail(dataStageDataStore);
dataStageDataStore.disconnect();
}
The output from Example 9-23 on page 281 is shown in Example 9-24.
Example 9-24 Output of data store details
Connected to publisher data store CDC_DB2
Data store name : CDC_DB2
Description : InfoSphere CDC DB2 6.5
Engine type : IBM DB2
Version : V6R5M0T0BCDC_HOHDLPJU_164
Connected to subscriber data store CDC_DataStage
Data store name : CDC_DataStage
Description : InfoSphere CDC DataStage 6.5
Engine type : IBM InfoSphere DataStage
Version : V6R5M0T0BCDC_HOHDLPJU_165_31
Now that you have seen how to connect to the Access Server and data stores, you can continue with managing subscriptions and table mappings, and start using the various classes related to Publisher and Subscriber.
There is one more technique that you can use: Locating the target data store for a subscription. A subscription is always created on a Publisher data store, and you do not find the name of the target data store in the subscription's properties. Instead, you find the host name (or IP address) and the port number to which connection must be established. Data store names can be flexibly assigned and are only known to the Access Server, not to the InfoSphere CDC
replication engines.
Example 9-25 shows the getTargetDataStoreForSubscription() method, which obtains the target data store for the passed subscription.
Example 9-25 Sample getTargetDataStoreForSubscription() method
/**
* Obtains the target data store for a subscription. Uses the subscriber host
* name (or IP address) and port number to locate the target data store of
* the subscription.
*
* @param accessServer
* - The Access Server to which you're connected
* @param subscription
* - Subscription for which the target data store must be found
* @return The Subscriber, null if the data store was not found
* @throws ApiException
*/
public Subscriber getTargetDataStoreForSubscription(
DataSource accessServer, Subscription subscription)
throws ApiException {
// Get the Publication for the Subscription
Context subCtx = subscription.getProperty();
String targetHost = subCtx.getString(Subscription.SubscriberLocation);
int targetPort = subCtx.getInt(Subscription.SubscriberPort);
ReplicationRole[] dataStores = accessServer.getAgents();
for (ReplicationRole dataStore : dataStores) {
if (dataStore.getHostName().equalsIgnoreCase(targetHost)
&& dataStore.getPort() == targetPort) {
System.out.println("Target datastore for subscription "
+ subscription.getName() + " is " + dataStore.getName()
+ ".");
return accessServer.getSubscriber(dataStore.getName());
}
}
return null;
}
9.5.5 Configuring InfoSphere CDC replication
The configuration of InfoSphere CDC is kept in a set of internal metadata tables that are associated with an InfoSphere CDC instance. The operation of subscriptions within the InfoSphere CDC instance entirely depends on the metadata. In most common implementations, you do not need to be aware of the metadata and where it is located because the Management Console GUI provides a subscription-centric view of the configuration and conceals the complexity of the underlying metadata database. When working with the InfoSphere CDC Linux, UNIX, and Windows engine, the configuration metadata tables are kept in a proprietary database. Only the InfoSphere CDC for DB2 on System i and InfoSphere CDC for DB2 on System z engines keep the metadata in database tables.
 
Important: When designing your automatic configuration process, follow the steps InfoSphere CDC uses to create the individual items in the metadata. When creating subscriptions, you must work from the source towards the target or you will not be able to link the source and the target tables and start the subscriptions. More importantly, when removing subscriptions from your configuration, the steps must be done (almost) in the reverse order or you end up with orphaned metadata in the target InfoSphere CDC instance. You will not be able to remove this metadata unless you delete and recreate the InfoSphere CDC instance.
The following sections describe the various steps that are needed to create a subscription and map tables. Try to align the steps with the actions that the InfoSphere CDC Management Console performs.
9.5.6 Creating a subscription
Creating a subscription is always done against the source InfoSphere CDC data store (Publisher). To create a subscription using the API, you must provide a number of obvious context variables, such as subscription name and subscription description. Other context variables require additional explanation. The example code for creating a subscription only provides basic attributes, such as the source data store, target data store, subscription name, and subscription description. The remaining parameters are derived from the data store definition or kept at their defaults. The sample code for creating a subscription is shown in Example 9-26.
Example 9-26 Creating a subscription
/**
* Create a subscription for a specific source data store and target data store
*
* @param accessServer
* - The Access Server to which you are connected
* @param sourceDataStore
* - The source data store of the subscription
* @param targetDataStore
* - The target data store of the subscription
* @param subscriptionName
* - Name of the subscription
* @param subscriptionDescription
* - Description of the subscription
* @return subscription
* @throws ApiException
*/
public Subscription createSubscription(DataSource accessServer,
Publisher sourceDataStore, Subscriber targetDataStore,
String subscriptionName, String subscriptionDescription)
throws ApiException {
Subscription subscription = null;
// Obtain the database login information of the target data store
// using the currently logged in Access Server user
AccessServerAgentAccessParameters[] targetAgentParms;
targetAgentParms = accessServer.getUserAgentProperties(accessServer
.getUserProfile().getUserID(), targetDataStore.getName());
String databaseUser = targetAgentParms[0].getDbLogin();
String databasePassword = targetAgentParms[0].getDbPassword();
// Prepare context for creating subscription
Context subCtx = new DefaultContext();
subCtx.setString(Subscription.SubscriptionDescription,
subscriptionDescription);
subCtx.setString(
Subscription.PublisherID,
((subscriptionName.length() > 8) ? subscriptionName.substring(
0, 8).toUpperCase() : subscriptionName.toUpperCase()));
subCtx.setString(
Subscription.PublisherIDPending,
((subscriptionName.length() > 8) ? subscriptionName.substring(
0, 8).toUpperCase() : subscriptionName.toUpperCase()));
subCtx.setString(Subscription.PublisherDescription,
subscriptionDescription);
subCtx.setString(Subscription.PublisherDescriptionPending,
subscriptionDescription);
subCtx.setString(Subscription.CommunicationProtocol,
TransformationServer
.getProtocolByID(TransformationServer.COMM_TCPIP));
subCtx.setString(Subscription.SubscriberOS,
targetDataStore.getOSPlatform());
subCtx.setString(Subscription.SubscriberDBPlatform,
targetDataStore.getDBPlatform());
subCtx.setString(Subscription.SubscriberVersion, targetDataStore
.getAgentVersion().substring(0, 4));
subCtx.setString(Subscription.SubscriberLocation,
targetDataStore.getHostName());
subCtx.setInt(Subscription.SubscriberPort, targetDataStore.getPort());
subCtx.setString(Subscription.DatabaseUser, databaseUser);
subCtx.setString(Subscription.DatabasePassword, databasePassword);
// Source IP address or host name
subCtx.setString(Subscription.PublisherLocation, "");
subCtx.setInt(Subscription.PublisherPort, 0);
// Source firewall port
// Define whether the source and target engines support MBCS
// automapping (CDC 6.5+)
if (sourceDataStore.isMBCSAutomappingSupported()
&& targetDataStore.isMBCSAutomappingSupported())
subCtx.setByte(Subscription.SubscriptionMBCSState, (byte) 2);
else
subCtx.setByte(Subscription.SubscriptionMBCSState, (byte) 1);
// Define whether the target engine can accept transferable work
// from the source engine
if (sourceDataStore.isSubscriptionTransferableWorkSupported())
subCtx.setByte(Subscription.SubscriptionTransferableWork, (byte) 1);
else
subCtx.setByte(Subscription.SubscriptionTransferableWork, (byte) 0);
// Now create subscription in source data store
subscription = sourceDataStore
.addSubscription(subscriptionName, subCtx);
System.out.println("Subscription " + subscriptionName + " created.");
return subscription;
}
The publisher ID is the identification of the subscription on the target data store (Subscriber) and has a maximum length of eight characters. In most cases, using the leftmost eight characters of the subscription name, in uppercase, establishes the publisher ID. However, if you have multiple subscriptions with similar names, such as NEW_SUBSCRIPTION_1 and NEW_SUBSCRIPTION_2, this approach leads to duplicate publisher IDs (NEW_SUBS) and the Subscriber does not create the Publication when you describe the second subscription.
Ensure that you create unique names for the publisher ID for each target data store. You can use the getPublicationNames() method against the target data store to get a list of existing publisher IDs.
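One way to guarantee uniqueness is sketched below, assuming you have already collected the existing publisher IDs from the target (for example, with getPublicationNames()). The numeric-suffix scheme is an illustration only and is not part of the InfoSphere CDC API:

```java
import java.util.Set;

public class PublisherIdHelper {

    /**
     * Derive a publisher ID (uppercase, maximum eight characters) from a
     * subscription name. When the truncated ID collides with an existing
     * one, the trailing characters are replaced by a numeric suffix.
     */
    public static String derivePublisherId(String subscriptionName,
            Set<String> existingIds) {
        String base = subscriptionName.toUpperCase();
        if (base.length() > 8) {
            base = base.substring(0, 8);
        }
        String candidate = base;
        int suffix = 1;
        while (existingIds.contains(candidate)) {
            String digits = Integer.toString(suffix++);
            // Keep as many leading characters as fit next to the suffix
            int keep = Math.min(base.length(), 8 - digits.length());
            candidate = base.substring(0, keep) + digits;
        }
        return candidate;
    }
}
```

For example, if NEW_SUBSCRIPTION_1 already occupies NEW_SUBS on the target, NEW_SUBSCRIPTION_2 is assigned NEW_SUB1 instead of colliding.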
The PublisherIDPending argument must be populated with the publisher ID you want to assign to the subscription (identification of a subscription in the target metadata). After the subscription source metadata has been sent to the target and the uniqueness of the publisher ID has been confirmed, PublisherIDPending is blanked out and PublisherID is populated with the specified value. The same situation applies to PublisherDescriptionPending.
To have the subscription successfully find the target InfoSphere CDC engine, you must specify the host / IP address and port number to establish the connection. Additionally, the source engine needs to understand the format of the information that must be sent across to the target. For that reason, the operating system platform and database platform parameters must be specified.
During subscription creation, you can determine whether the source and target engines support multibyte character set (MBCS) automapping. MBCS automapping is useful when specifying the encoding conversions for the columns, as described in 9.5.13, “Encoding conversions (before and after Version 6.5)” on page 315.
After the subscription has connected to the target engine, it connects to the target database using the database user and password specified in the subscription parameters. For security reasons, the database credentials must be specified in the subscription to confirm the source system is allowed to send data to the target database.
9.5.7 Procedure for mapping tables
Figure 9-14 shows the successive steps that must be performed for table mappings in InfoSphere CDC.
Figure 9-14 Table mapping process
The process includes the functions of add catalog, select, describe, assign, and set method, as described in the following list:
1. Add Source Tables to Catalog.
Before mapping any source table in a subscription, the table must exist in the InfoSphere CDC catalog. Among other things, this separate store is needed to determine if the real table, as defined in the source database, matches with what InfoSphere CDC expects to find when parsing entries in the database log. When you add a table to the catalog, InfoSphere CDC retrieves the table structure from the database and stores information about the table location (path), internal database object ID, column names, data types, and length in its Publisher metadata. A table that is located in the catalog does not necessarily have to be used in any subscription, but can be used by one or more subscriptions. Only tables that are used as a source for replication must be added to the catalog; tables that are only used as a target do not have to be present in the catalog.
2. Map Source Tables to Subscription (Select).
This process takes the tables from the catalog and selects them for replication in the subscription. When selecting the tables for replication, you can also configure the row and column filtering and code page conversions. This step is the final step on the source side to configure tables to
be replicated.
3. Describe Subscription.
To inform the target InfoSphere CDC engine that tables will be published, the subscription definition and the definitions for selected tables must be sent to the target engine. In InfoSphere CDC, this activity is called describing the subscription. The subscription definition (Subscription) is sent to the target and creates a publication (Publication) object. Also, all definitions of tables that have been selected for replication in the subscription are sent to the target. The SubscribedTable objects on the source are turned into PublishedTable objects on the target. When describing a subscription, the source InfoSphere CDC engine makes a connection to the target InfoSphere CDC engine through the same channel that is normally used for replicating the data changes. If the path to the target server (host or port) or connection to the database (database name, user and password) cannot be established, the describe process fails, and the target metadata is not populated.
4. Assign Source Tables to Destination.
After the source tables are known on the target InfoSphere CDC engine, they can be linked to their destination, for example, the target database tables. Through this linkage, the target InfoSphere CDC engine knows how to direct incoming database operations to the designated destination and which column mapping to apply.
5. Set Replication Method.
The final step in the mapping process is to set the replication method (Mirror or Refresh). Replication methods are kept with the source table, as they tell the source engine whether to refresh the table contents (Refresh) or to replicate the changes from the database log (Mirror). Even though the replication method could be set in an earlier step, you should set it as the final step. Depending on the engine, logging (that is, the DATA CAPTURE CHANGES flag for DB2 for Linux, UNIX, and Windows, table full supplemental logging for Oracle, publication for SQL Server, and journaling for DB2/400) for the source table in question must be activated if a table's replication method is Mirror. Most database management systems require a short exclusive lock on the table to activate or deactivate the logging. By setting the replication method as the final step, the table mapping has already been accomplished. Should the method not be able to acquire an exclusive lock on the table in question, only this step must be repeated instead of the entire mapping process.
9.5.8 Table mapping example
This example creates a simple table mapping from Oracle to DB2. The example first addresses the individual steps and, at the end of this section, ties
them together.
The sample code for the method shown in Example 9-27 adds a table to the Publisher catalog if it does not exist yet.
Example 9-27 Add a table to the catalog
/**
* Add a table to the source data store catalog
*
* @param sourceDataStore
* @param tablePath
* @param tableName
* @return publishableTable
* @throws ApiException
*/
public PublishableTable addTableCatalog(Publisher sourceDataStore,
String tablePath, String tableName) throws ApiException {
PublishableTable publishableTable = null;
// Build DBTable object to represent source table
DBTable newTable = sourceDataStore.getTable(
sourceDataStore.getDBPath(tablePath), tableName);
Catalog sourceCatalog = sourceDataStore.getCatalog();
// Only add table to the catalog if not already existing
if (!sourceCatalog.isTablePublishable(newTable)) {
sourceCatalog.addTable(newTable);
System.out.println("Table " + tablePath + "." + tableName
+ " added to catalog of datastore "
+ sourceDataStore.getName() + ".");
} else {
System.out.println("Table " + tablePath + "." + tableName
+ " already existed in catalog of datastore "
+ sourceDataStore.getName() + ".");
}
publishableTable = sourceCatalog.getPublishableTable(tablePath,
tableName);
return publishableTable;
}
A table can be replicated by multiple subscriptions, so the table could have already been added to the catalog. To determine if the table has already been added, you must create a DBTable object that is composed of a DBPath object that represents the schema and a string representing the table name. In some databases, such as DB2 on System z, the table is not qualified by a schema name, but by a hierarchy of qualifiers.
For historical reasons within the InfoSphere CDC describe processing, you need to specify the pending publisher ID and pending publisher description when creating a subscription. When describing the subscription for the first time, InfoSphere CDC validates the publisher ID on the target side for uniqueness and, if accepted, the pending publisher ID and pending publisher description are blanked out. This action is not apparent to the user and is probably of no concern for definitions using the Java APIs, but is mentioned for completeness in case you choose to analyze the subscription attributes.
After the subscription exists, you can select the tables to be replicated in the subscription. However, only tables that have been previously added to the catalog can be added to the subscription. The sample code to perform this action is shown in Example 9-28.
Example 9-28 Select source table for replication
/**
* Select a source table to the subscription and set its method-status
* to Refresh-Idle
*
* @param sourceDataStore
* - The source data store of the subscription
* @param subscription
* - Subscription
* @param publishableTable
* - Table from the catalog to be selected
* @return SubscribedTable
* @throws ApiException
*/
public SubscribedTable selectTable(Publisher sourceDataStore,
Subscription subscription, PublishableTable publishableTable)
throws ApiException {
// Select the table to the subscription
subscription.addTable(publishableTable);
SubscribedTable subscribedTable = subscription.getSubscribedTable(
publishableTable.getUpperPath(), publishableTable.getName());
// Set method to Refresh and status to Parked (API constant STATUS_IDLE)
try {
subscribedTable.setReplicationMethod(
SubscribedTable.METHOD_REFRESH, null, (byte) 0);
subscribedTable.setReplicationStatus(SubscribedTable.STATUS_IDLE);
} catch (ApiException e) {
e.printStackTrace();
}
System.out.println("Table " + publishableTable.getUpperPath().getName()
+ "." + publishableTable.getName()
+ " selected to subscription " + subscription.getName() + ".");
return subscribedTable;
}
Although you might be tempted to set the eventual replication method and status immediately after adding the table to the subscription, perform that change in a separate step. The separate step avoids waiting for table locks if supplemental logging must be activated on the database table.
The source side of the subscription is now finished and you can describe the Publisher side subscription metadata to the target (Subscriber) side. When the describe starts, the source InfoSphere CDC engine establishes a connection to the target InfoSphere CDC engine using the host name and port that were specified when the subscription was created. If the target engine responds, it attempts to connect to the target database or application using the database user and password specified when the subscription was created. The sample code for describing the subscription is shown in Example 9-29.
Example 9-29 Describing the subscription
/**
* Describe the subscription to the target data store
*
* @param subscription
* @param targetDataStore
* @throws ApiException
* @throws InterruptedException
*/
public void describeSubscription(Subscription subscription,
Subscriber targetDataStore) throws ApiException,
InterruptedException {
System.out.println("Describing subscription " + subscription.getName()
+ ".");
subscription.describe();
waitForDescribe(subscription, targetDataStore);
}
After the describe() method has been issued for the subscription, it could take a few seconds or longer before the metadata has arrived on the Subscriber side. Your custom code needs to wait long enough to allow the description to complete and yet not wait too long if the description has failed. Unfortunately, there is no built-in method available to verify whether the description has finished and succeeded. The procedure that is most successful is a combination of a small wait time after the description has been submitted and then checking the existence of the target metadata. The sample code for this action is shown in Example 9-30.
Example 9-30 Waiting for the description to complete
/**
* Waiting for the subscription to be described
*
* @param subscription
* - Subscription to be described
* @param targetDataStore
* - The target data store of the subscription
* @throws UnsupportedFeatureException
* @throws ApiException
* @throws InterruptedException
*/
private void waitForDescribe(Subscription subscription,
Subscriber targetDataStore) throws ApiException,
InterruptedException {
int minWaitDescribeStart = 2000; // Minimum time the describe will take
int maxWaitDescribeStop = 1800000; // Maximum wait time for describe to be finished
int maxWaitPublication = 30000; // Maximum time to wait for publication created/deleted
int maxWaitTableDescribed = 10000; // Maximum time to wait for tables describe (per table)
int waitInterval = 300; // Wait interval
// First give the describe some time to start
Thread.sleep(minWaitDescribeStart);
// Wait until the subscription is no longer active
int waitedTimeDescribe = 0;
byte[] status;
do {
System.out
.println("Waiting until describe activity of subscription "
+ subscription.getName() + " finished.");
Thread.sleep(waitInterval);
status = subscription.getLiveActivityStatus();
waitedTimeDescribe += waitInterval;
if (status[1] != Subscription.LIVE_STATUS_IDLE
&& waitedTimeDescribe > maxWaitDescribeStop) {
throw new ApiException(
"Timeout while waiting for describe for subscription "
+ subscription.getName() + " to finish.");
}
} while (status[1] != Subscription.LIVE_STATUS_IDLE);
// Retrieve all selected tables for the subscription
ArrayList<SubscribedTable> subscribedTables = new ArrayList<SubscribedTable>();
for (DBPath dbPath : subscription.getSubscribedTableDBPaths()) {
for (SubscribedTable subscribedTable : subscription
.getSubscribedTables(dbPath)) {
subscribedTables.add(subscribedTable);
}
}
subscribedTables.trimToSize();
if (subscribedTables.size() != 0) {
// Wait for publication to be created
int waitedTimePublication = 0;
Publication publication = null;
do {
System.out
.println("Waiting for creation of the publication for subscription "
+ subscription.getName() + ".");
Thread.sleep(waitInterval);
waitedTimePublication += waitInterval;
publication = getSubscriptionPublication(subscription,
targetDataStore);
if (publication == null
&& waitedTimePublication > maxWaitPublication) {
throw new ApiException(
"Timeout while waiting for publication to be created for "
+ subscription.getName() + ".");
}
} while (publication == null);
// Wait for all source tables to be described
int waitedTimeTablesDescribed = 0;
int numberSubscribedTables = subscribedTables.size();
int numberTablesDescribed = 0;
while (numberTablesDescribed < numberSubscribedTables) {
publication.refresh();
numberTablesDescribed = 0;
for (SubscribedTable subscribedTable : subscribedTables) {
PublishedTable publishedTable = getPublishedTableForSubscribedTable(
subscription, subscribedTable, targetDataStore);
if (publishedTable != null)
numberTablesDescribed += 1;
}
System.out.println("Waiting for describe of "
+ numberSubscribedTables
+ " tables. Remaining number of tables: "
+ (numberSubscribedTables - numberTablesDescribed)
+ ".");
Thread.sleep(waitInterval);
waitedTimeTablesDescribed += waitInterval;
if (subscribedTables.size() > 0
&& waitedTimeTablesDescribed > (numberSubscribedTables * maxWaitTableDescribed))
throw new ApiException(
"Timeout while waiting for all tables to be described.");
}
} else {
// Wait until Publication no longer exists (maximum 30 seconds)
int waitedTime = 0;
Publication publication;
do {
System.out
.println("Waiting for deletion of publication for subscription "
+ subscription.getName() + ".");
targetDataStore.refresh();
publication = getSubscriptionPublication(subscription,
targetDataStore);
Thread.sleep(waitInterval);
if (publication == null && waitedTime > maxWaitPublication)
throw new ApiException(
"Timeout while waiting for publication to be removed for "
+ subscription.getName() + ".");
} while (publication != null);
// No more tables in the subscription
}
}
It is best to define a method to retrieve the Publication object for a subscription, as the link between source and target metadata must be made when doing automated mappings. The sample code for this action is shown in Example 9-31.
Example 9-31 Retrieving the Publication object for a subscription
/**
* Get the Publication object for a subscription through the Publisher ID.
* As long as the subscription has not yet been described successfully, the
* Pending Publisher ID must be used to identify the Publication on the
* target data store.
*
* @param subscription
* - Subscription for which you want to obtain the Publication
* @param targetDataStore
* - The target data store of the subscription
* @return The Publication that was found
* @throws ApiException
*/
public Publication getSubscriptionPublication(Subscription subscription,
Subscriber targetDataStore) throws ApiException {
// Get the Publication for the Subscription
Context subCtx = subscription.getProperty();
String publisherID = (subCtx.getString(Subscription.PublisherID)
.isEmpty() ? subCtx.getString(Subscription.PublisherIDPending)
: subCtx.getString(Subscription.PublisherID));
Publication publication = targetDataStore.getPublication(publisherID);
return publication;
}
The first part of the sample code implements an initial wait of two seconds. This wait time gives the describe activity on the subscription time to start. An alternative to the fixed wait is to poll for subscription describe activity. However, on some fast servers, the describe can finish quickly and you run the chance of missing the describe activity altogether.
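The initial-wait-then-poll pattern used by waitForDescribe() can be reduced to a generic, self-contained sketch (the BooleanSupplier condition and all timings below are illustrative assumptions, not InfoSphere CDC API calls):

```java
import java.util.function.BooleanSupplier;

public class PollUntil {
    // Sleeps an initial grace period, then polls the condition at a fixed
    // interval until it holds or the timeout elapses. Returns false on
    // timeout; a caller would raise its ApiException equivalent there.
    static boolean waitFor(BooleanSupplier done, long initialWaitMs,
            long intervalMs, long timeoutMs) throws InterruptedException {
        Thread.sleep(initialWaitMs);
        long waited = 0;
        while (!done.getAsBoolean()) {
            if (waited > timeoutMs) {
                return false;
            }
            Thread.sleep(intervalMs);
            waited += intervalMs;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition becomes true roughly 100 ms after the start
        boolean ok = waitFor(
                () -> System.currentTimeMillis() - start > 100, 20, 10, 1000);
        System.out.println("Condition met: " + ok);
    }
}
```

The same structure underlies each of the three wait loops in waitForDescribe(): only the condition (subscription idle, publication present, all tables described) and the timeout differ.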
The second part of the sample code builds the list of SubscribedTables (selected tables) for the subscription. This action is primarily done to determine whether you expect the Publication to be created and exist after the description has completed or whether it should be removed. Removal of the publication is done by the description process when all table mappings have been removed. More details about this action can be found in 9.5.9, “Procedure for removing mapped tables” on page 303.
In the next phase, you wait for the subscription to become inactive again (the loop continues while status[1] != Subscription.LIVE_STATUS_IDLE). Typically, the describe takes only a few seconds, but in environments with slow or unstable networks, or when there are many tables to describe, it can take longer, which is why the timeout is set to 1800 seconds.
After the description is complete, the Publication has been created in the target metadata. If it has not yet been created, any action on the Publication could result in a null pointer exception.
The code iterates through the list of subscribed (selected) tables for the subscription and checks if the PublishedTable object has already been created on the target side. The PublishedTable object identification (path and table name) is not always an exact match with the SubscribedTable object identification. Sometimes aliasing is applied to the source tables to reduce the length of schema and table names, and any lowercase source table names are translated to uppercase (for example, when the source is IBM Informix®). Use a special method, getPublishedTableForSubscribedTable(), to retrieve the PublishedTable object for a SubscribedTable. The code for getPublishedTableForSubscribedTable can be found in Example 9-32.
Example 9-32 Retrieving the PublishedTable for a SubscribedTable
   /**
* Retrieves the PublishedTable (target) object for a SubscribedTable
* (source) object. First, the alias of the SubscribedTable name and path is
* retrieved through its PublishableTable object that is part of the
* Catalog. Subsequently, the method tries to retrieve the PublishedTable
* object through the alias. If not found, an attempt is made to retrieve
* through the full table identification.
*
* @param subscription
* The subscription that holds the SubscribedTable object.
* @param subscribedTable
* SubscribedTable object for which the PublishedTable object
* must be retrieved.
* @param targetDataStore
* The target data store which holds the target metadata for the
* passed subscription.
* @return The PublishedTable object that was found, else null.
* @throws ApiException
*/
private PublishedTable getPublishedTableForSubscribedTable(
Subscription subscription, SubscribedTable subscribedTable,
Subscriber targetDataStore) throws ApiException {
Publisher sourceDataStore = subscription.getPublisher();
Catalog catalog = sourceDataStore.getCatalog();
String sourceTablePath = subscribedTable.getUpperPath().getFullName();
String sourceTableName = subscribedTable.getName();
PublishableTable publishableTable = catalog.getPublishableTable(
sourceTablePath, sourceTableName);
String sourceTablePathAlias = publishableTable.getPathAlias();
String sourceTableNameAlias = publishableTable.getTableAlias();
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
// First check if the PublishedTable can be retrieved using the alias
PublishedTable publishedTable = publication.getPublishedTable(
targetDataStore.createDBPath(sourceTablePathAlias),
sourceTableNameAlias);
if (publishedTable == null) {
try {
publishedTable = publication.getPublishedTable(
targetDataStore.createDBPath(sourceTablePath),
sourceTableName);
} catch (Exception ignore) {
}
}
return publishedTable;
}
The maximum wait time depends on the number of tables to be described (10 seconds per table).
The final piece of the waitForDescribe() method handles the situation where no tables are selected for the subscription. In that case, the describe process removes the Publication from the target metadata, and that removal is the condition the loop waits for.
Now that the PublishedTable objects have been created on the target side, you can link them to the target tables (assign activity), as shown in Example 9-33.
Example 9-33 Linking published table objects to target tables (assign)
/**
* @param subscription
* - Subscription for which the table must be assigned
* @param targetDataStore
* - Target data store of the subscription
* @param subscribedTable
* - Subscribed (selected) table
* @param targetTablePath
* - Schema (path) of the target table
* @param targetTableName
* - Name of the target table
* @throws ApiException
*/
public void assignTable(Subscription subscription,
Subscriber targetDataStore, SubscribedTable subscribedTable,
String targetTablePath, String targetTableName) throws ApiException {
// Get the Publication for the Subscription
Context subCtx = subscription.getProperty();
String publisherID = (subCtx.getString(Subscription.PublisherID)
.isEmpty() ? subCtx.getString(Subscription.PublisherIDPending)
: subCtx.getString(Subscription.PublisherID));
Publication publication = targetDataStore.getPublication(publisherID);
// Get the PublishedTable associated with the SubscribedTable object
DBPath publishedTableDBPath = targetDataStore
.createDBPath(subscribedTable.getUpperPath().getFullName()
.toUpperCase());
String publishedTableName = subscribedTable.getName().toUpperCase();
PublishedTable publishedTable = publication.getPublishedTable(
publishedTableDBPath, publishedTableName);
// Map the table to the target table using standard mapping type
String targetDatabase = "";
String destinedMember = null;
String indexLibrary = null;
String indexName = null;
publishedTable.assign(targetTableName, targetTablePath, targetDatabase,
destinedMember, indexLibrary, indexName,
PublishedTable.STANDARD);
System.out.println("Table " + subscribedTable.getUpperPath().getName()
+ "." + subscribedTable.getName() + " assigned to "
+ targetTablePath + "." + targetTableName + ".");
TableAssignment tableAssignment = publishedTable.getTableAssignment();
// Loop through all target columns and map values depending on name
String[] destinedColumnNames = tableAssignment.getDestinedColumnNames();
for (String targetColumnName : destinedColumnNames) {
ColumnAssignment columnAssignment = tableAssignment
.getColumnAssignment(targetColumnName);
if (targetColumnName.equals("AUD_TIMESTAMP")) {
columnAssignment.mapTo(ColumnAssignment.MAP_JOURNAL_CONTROL,
"&TIMSTAMP");
System.out
.println("Journal control column &TIMSTAMP assigned to column "
+ targetColumnName + ".");
} else if ((targetColumnName.equals("APPLY_TIMESTAMP"))) {
columnAssignment.mapTo(ColumnAssignment.MAP_CURRENT_DATE);
System.out
.println("Default value CURRENT DATE assigned to column "
+ targetColumnName + ".");
}
}
}
The example has been kept straightforward to accommodate most of the mapping scenarios you want to accomplish. PublishedTable.assign() has a number of overloaded methods suited for different mapping types. For Standard, Adaptive Apply, and LiveAudit mapping types, the simplest method to use is the one used in the assignTable() example. When leaving the index name and library blank, the InfoSphere CDC engine automatically chooses the most appropriate unique key index when starting the apply process.
Several parameters (targetDatabase, destinedMember, indexLibrary, and indexName) are passed as empty or null. The destinedMember parameter applies only to DB2 on System i, and the indexLibrary and indexName parameters apply only if you want to choose the index that InfoSphere CDC should use to determine the unique keys of the target table. The targetDatabase parameter applies only when the target engine is InfoSphere CDC for DB2 on z/OS; this engine is the only one that requires the target database and uses a composite table path (database.schema). The last argument (PublishedTable.STANDARD) indicates the apply method to be used for this mapped table.
When the table is assigned, InfoSphere CDC automatically maps any source and target columns that have the same names and compatible data types. This behavior matches what the Management Console does. In the example, after the table has been successfully assigned, you also iterate through the columns and map a journal control column and a constant value to the target column.
If the target InfoSphere CDC engine is associated with a database (as in this example), use the target columns as a basis to configure the column mappings. For a few engines, such as InfoSphere CDC for DataStage and InfoSphere CDC Event Server, the PublishedTable object is the basis for all columns being mapped to the target engine. Target columns are not available in these engines.
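The name-based automatic column mapping described above can be pictured with a plain Java sketch (the column names, types, and the exact-type-match compatibility check are simplified assumptions; the real engine-side matching is more involved):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AutoColumnMap {
    // Pairs each target column with a source column of the same name,
    // mimicking the name-based auto-mapping done at assign time. Columns
    // without a matching source column are left unmapped, to be set
    // explicitly afterward (as done for AUD_TIMESTAMP in Example 9-33).
    static Map<String, String> autoMap(Map<String, String> sourceCols,
            Map<String, String> targetCols) {
        Map<String, String> mappings = new LinkedHashMap<>();
        for (Map.Entry<String, String> target : targetCols.entrySet()) {
            String name = target.getKey();
            // Map only when the names match and the declared types are
            // compatible (here simplified to identical type strings).
            if (sourceCols.containsKey(name)
                    && sourceCols.get(name).equals(target.getValue())) {
                mappings.put(name, name);
            }
        }
        return mappings;
    }

    public static void main(String[] args) {
        Map<String, String> source = Map.of("CUST_ID", "INTEGER",
                "NAME", "VARCHAR");
        Map<String, String> target = new LinkedHashMap<>();
        target.put("CUST_ID", "INTEGER");
        target.put("NAME", "VARCHAR");
        target.put("AUD_TIMESTAMP", "TIMESTAMP"); // no source match
        System.out.println(autoMap(source, target));
    }
}
```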
Now that the source table has been assigned to a target, the table mapping is complete. Recall that during the table selection (selectTable) processing, the replication method was set to Refresh and the status to Parked to avoid potential failures when activating supplemental logging for the source tables.
Similar to what the Management Console does, set the replication method as the final step. If the method is set to mirror and supplemental logging cannot be started (for example, because of locks on the table), at least you have completed the table mapping. You can then change the replication methods through the Management Console or use the API without having to worry about incomplete table mappings. The sample code to accomplish this task is shown in Example 9-34.
Example 9-34 Setting the replication method
   /**
* @param subscribedTable
* - Source table selected for the subscription
* @throws ApiException
*/
public void setReplicationMethod(
SubscribedTable subscribedTable) throws ApiException {
DBTable journalTable = null;
subscribedTable.setReplicationMethod(
SubscribedTable.METHOD_MIRROR, journalTable,
SubscribedTable.MEMBER_SELECTION_SINGLE);
subscribedTable
.setReplicationStatus(SubscribedTable.STATUS_REFRESH);
System.out.println("Replication method and status set"
+ " to Mirror-Refresh for table "
+ subscribedTable.getUpperPath().getName() + "."
+ subscribedTable.getName() + ".");
}
If the replication method is set to mirror (METHOD_MIRROR), InfoSphere CDC determines if supplemental logging (that is, journaling and data capture changes) has already been enabled for the source table in question. If not, InfoSphere CDC activates logging automatically depending on the source engine you are running. The journalTable parameter is only applicable to InfoSphere CDC for Oracle Trigger-based, and the member selection final parameter is applicable to DB2 for System i source tables that have multiple members.
Now that you have all the individual methods to map tables, place them in sequence to create a subscription and map two tables. The sample code for that task is shown in Example 9-35.
Example 9-35 Create subscription and map two tables
Publisher oracleDataStore = connectPublisherDataStore(accessServer,
"CDC_Oracle_Redo");
Subscriber db2DataStore = connectSubscriberDataStore(accessServer,
"CDC_DB2");
if (oracleDataStore != null && db2DataStore != null) {
Subscription subscription = null;
// Create the subscription
subscription = createSubscription(accessServer, oracleDataStore,
db2DataStore, "REDSCRIPT", "Redscript example");
// Add the tables to the catalog
PublishableTable[] publishableTables = new PublishableTable[2];
publishableTables[0] = addTableCatalog(oracleDataStore, "CDCDEMO",
"DEMO_CUSTOMER");
publishableTables[1] = addTableCatalog(oracleDataStore, "CDCDEMO",
"PRODUCT");
System.out.println("Schema alias: "
+ publishableTables[0].getPathAlias());
System.out.println("Table alias: "
+ publishableTables[0].getTableAlias());
// Select tables to subscription
SubscribedTable[] subscribedTables = new SubscribedTable[2];
subscribedTables[0] = selectTable(oracleDataStore, subscription,
publishableTables[0]);
subscribedTables[1] = selectTable(oracleDataStore, subscription,
publishableTables[1]);
// Describe the subscription
describeSubscription(subscription, db2DataStore);
// Assign the tables
assignTable(subscription, db2DataStore, subscribedTables[0],
"CDCDEMO", "DEMO_CUSTOMER_TARGET");
assignTable(subscription, db2DataStore, subscribedTables[1],
"CDCDEMO", "PRODUCT_TARGET");
// Set the replication method for the tables
setReplicationMethod(subscribedTables[0]);
setReplicationMethod(subscribedTables[1]);
}
// Disconnect from the data stores
if (oracleDataStore != null)
oracleDataStore.disconnect();
if (db2DataStore != null)
db2DataStore.disconnect();
After the snippet has finished running, you can open the subscription in the Management Console and show the table mappings. If you select the source data store and open the replication tables, you see the tables. Figure 9-15 shows the table mapping after running the code. The CDCDEMO.DEMO_CUSTOMER and CDCDEMO.PRODUCT source tables have been added to the catalog (Replication Tables) and are also mapped in subscription REDSCRIPT. The columns AUD_TIMESTAMP and APPLY_TIMESTAMP have been mapped to journal control column &TIMSTAMP and initial value CURRENT DATE.
Figure 9-15 Management Console -Table Mapping
9.5.9 Procedure for removing mapped tables
You must follow the procedure for removing mapped tables (unassign, deselect, describe, and remove from the catalog) exactly to avoid orphaned entries in the source or target engine's metadata. The Java API cannot remove orphaned metadata; only the Management Console can do so. Figure 9-16 shows the successive steps that must be performed to unmap tables in a subscription and remove the table from the catalog.
Figure 9-16 InfoSphere CDC - table mapping delete process
The steps in Figure 9-16 are briefly described in the following list:
1. Deassign Target Tables.
This step removes the link between the source and target table and removes any information about mapped columns, operations, and user exits that have been configured for the table mapping. At this stage, it does not remove the source table entry in the target engine's metadata, and the bookmark of the subscription and marked table capture points are kept.
2. Remove Source Table from Subscription (deselect).
When removing the source table from the subscription, the subscription no longer replicates the changes for this table. Also, any source side configuration of the previously mapped table, such as column filtering, row filtering, and code page conversions, is removed from the source metadata. When deselecting tables at the source, the marked table capture point is lost. When selecting the same table to the subscription again, it assumes a new capture point.
3. Describe
Describing the subscription causes the InfoSphere CDC source engine to communicate with the InfoSphere CDC target engine. The target InfoSphere CDC engine is informed about the tables that are still replicated according to the source engine. Any table that is no longer selected for replication in the subscription (source side) and has been unassigned on the target side is removed from the target metadata. Additionally, if there are no more tables replicated, it removes the publication entry of the subscription.
It is important to stress that any table that is still assigned on the target InfoSphere CDC engine is not removed from the target metadata and is orphaned. Additionally, you could delete the subscription from the source metadata and even leave the publication orphaned on the target side. After a publication has been orphaned, you can no longer remove it from the target engine's metadata using the API. You either must remove it through the Management Console or recreate the entire InfoSphere CDC instance.
4. Remove Tables from Catalog (optional).
Database tables that are no longer mapped by any subscription can be removed from the source engine's catalog. InfoSphere CDC does not allow you to remove catalog entries for tables that are still referenced in a subscription, and throws an exception if you try to remove them.
9.5.10 Table mapping removal example
This example removes the previously created subscription and its table mappings. The tables are also removed from the source data store catalog. The individual steps are addressed first and then tied together in consecutive calls.
The first step in removing tables is to unlink the target tables from the source tables (the deassign process). Remember that the deassignment is required so that the describe process can eventually remove the published table. The sample code for this task is shown in Example 9-36.
Example 9-36 Deassign a table
/**
* @param sourceDataStore
* - Source data store of the subscription
* @param targetDataStore
* - Target data store of the subscription
* @param subscription
* - Subscription for which the table must be deassigned
* @param sourceTablePath
* - Schema (path) of the source table
* @param sourceTableName
* - Name of the source table
* @throws ApiException
*/
public void deassignTable(Publisher sourceDataStore,
Subscriber targetDataStore, Subscription subscription,
String sourceTablePath, String sourceTableName) throws ApiException {
// Get the Publication for the Subscription
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
// Get the PublishedTable object
PublishedTable publishedTable = publication.getPublishedTable(
sourceDataStore.getDBPath(sourceTablePath), sourceTableName);
publishedTable.deassign();
System.out.println("Table " + sourceTablePath + "." + sourceTableName
+ " deassigned from publication "
+ publication.getPublisherID() + ".");
}
We chose to only provide limited information for deassigning the tables, that is, the source data store, target data store, subscription, source table path, and source table name. Using these keys, the PublishedTable object can be retrieved from the target data store (through the Publication).
It might seem counter-intuitive to provide the source table path and source table name, but the table assignment is kept under the PublishedTable object, which is effectively the equivalent of the SubscribedTable (selected table) on the source side. The table's schema (table path) can be established by the source data store getDBPath() method.
Now deselect the source table from the subscription. The sample code for this action is shown in Example 9-37.
Example 9-37 Deselect the source table from the subscription
/**
* Deselect a source table from the subscription
*
* @param sourceDataStore
* - The source data store of the subscription
* @param subscription
* - Subscription
* @param tablePath
* @param tableName
* @throws ApiException
*/
public void deselectTable(Publisher sourceDataStore,
Subscription subscription, String tablePath, String tableName)
throws ApiException {
SubscribedTable subscribedTable = subscription.getSubscribedTable(
sourceDataStore.getDBPath(tablePath), tableName);
subscription.removeTable(subscribedTable);
System.out.println("Table " + tablePath + "." + tableName
+ " deselected from subscription " + subscription.getName()
+ ".");
}
After the table has been deselected from the subscription, the describe process must be run to remove the PublishedTable object from the target Publication; this is done with the describeSubscription() method shown earlier in Example 9-29. After the describe completes, a table that is no longer mapped can be removed from the source data store catalog. The sample code for this action is shown in Example 9-38.
Example 9-38 Remove a table from source data store catalog
/**
* Remove a table from the source data store catalog
*
* @param sourceDataStore
* @param tablePath
* @param tableName
* @throws ApiException
*/
public void deleteTableCatalog(Publisher sourceDataStore, String tablePath,
String tableName) throws ApiException {
Catalog sourceCatalog = sourceDataStore.getCatalog();
PublishableTable publishableTable = sourceCatalog.getPublishableTable(
tablePath, tableName);
if (publishableTable != null) {
sourceCatalog.removeTable(publishableTable, true);
System.out.println("Table " + tablePath + "." + tableName
+ " removed from catalog of datastore "
+ sourceDataStore.getName() + ".");
} else {
System.out.println("Table " + tablePath + "." + tableName
+ " does not exist in catalog of datastore "
+ sourceDataStore.getName() + ".");
}
}
When running the describe, any PublishedTable object that is no longer assigned and has no equivalent selected table (SubscribedTable) in the subscription is removed from the target metadata. The describe process is the same as described in 9.5.8, “Table mapping example” on page 289, but if there are no more selected tables for the subscription, the waitForDescribe() method waits for the removal of the Publication object from the target metadata.
If the source table is no longer used in any subscription, it can also be removed from the source data store catalog.
Removal of the table from the catalog is optional. However, if the structure of the source table changes over time, for example, when columns are added or removed, the information stored in the InfoSphere CDC catalog no longer applies. Therefore, always remove tables from the catalog after they are no longer needed.
The method shown in Example 9-39 removes the subscription from the source data store.
Example 9-39 Removal of the subscription from the source data store
/**
* Delete a subscription from the source data store. The method first checks
* that the publication has already been removed from the target data store
* before allowing to delete the subscription.
*
* @param sourceDataStore
* - Source data store from which the subscription must be removed
* @param subscription
* - The subscription you want to delete
* @param targetDataStore
* - The target data store of the subscription
* @throws ApiException
*/
public void deleteSubscription(Publisher sourceDataStore,
Subscription subscription, Subscriber targetDataStore)
throws ApiException {
// Verify that the publication no longer exists
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
if (publication != null) {
throw new ApiException("Publication with publisher ID "
+ publication.getPublisherID()
+ " still exists, cannot delete subscription.");
} else {
sourceDataStore.removeSubscription(subscription);
}
}
In the method, check that the Publication in the target data store has been removed, which prevents orphaned metadata.
Now that you have the individual methods to remove table mappings and delete a subscription, you can run them in sequence and remove the table mappings and subscriptions that had been created. The sample code for this action is shown in Example 9-40.
Example 9-40 Running the methods in sequence
Publisher oracleDataStore = connectPublisherDataStore(accessServer,
"CDC_Oracle_Redo");
Subscriber db2DataStore = connectSubscriberDataStore(accessServer,
"CDC_DB2");
if (oracleDataStore != null && db2DataStore != null) {
/*
* Delete the subscription and catalog tables
*/
Subscription subscription = oracleDataStore
.getSubscription("REDSCRIPT");
// Deassign the tables from the subscription
deassignTable(oracleDataStore, db2DataStore, subscription,
"CDCDEMO", "DEMO_CUSTOMER");
   deassignTable(oracleDataStore, db2DataStore, subscription,
"CDCDEMO", "PRODUCT");
// Deselect the tables from the subscription
deselectTable(oracleDataStore, subscription, "CDCDEMO",
"DEMO_CUSTOMER");
deselectTable(oracleDataStore, subscription, "CDCDEMO", "PRODUCT");
// Describe the subscription
describeSubscription(subscription, db2DataStore);
// Delete the tables from the catalog
deleteTableCatalog(oracleDataStore, "CDCDEMO", "DEMO_CUSTOMER");
deleteTableCatalog(oracleDataStore, "CDCDEMO", "PRODUCT");
// Delete the subscription
deleteSubscription(oracleDataStore, subscription, db2DataStore);
}
// Disconnect from the data stores
if (oracleDataStore != null)
oracleDataStore.disconnect();
if (db2DataStore != null)
db2DataStore.disconnect();
 
The output of running the sequence in Example 9-40 is as follows:
Connected to publisher data store CDC_Oracle_Redo
Connected to subscriber data store CDC_DB2
Table CDCDEMO.DEMO_CUSTOMER deassigned from publication REDSCRIPT.
Table CDCDEMO.PRODUCT deassigned from publication REDSCRIPT.
Table CDCDEMO.DEMO_CUSTOMER deselected from subscription REDSCRIPT.
Table CDCDEMO.PRODUCT deselected from subscription REDSCRIPT.
Describing subscription REDSCRIPT.
Waiting until describe activity of subscription REDSCRIPT finished.
Waiting for deletion of publication REDSCRIPT for subscription REDSCRIPT.
Table CDCDEMO.DEMO_CUSTOMER removed from catalog of data store CDC_Oracle_Redo.
Table CDCDEMO.PRODUCT removed from catalog of data store CDC_Oracle_Redo.
Disconnected from Access Server.
9.5.11 Row and column filtering
Many clients use InfoSphere CDC to exclude certain rows and columns from the replication process, which is called row filtering and column filtering. By filtering rows or columns, the volume of changes sent to the target can be dramatically reduced. Filtering columns that you do not need on the target side has an additional positive side effect: If an update only affects non-replicated columns, the entire change is not sent to the target.
A good example is the CRM application that keeps a record of users accessing customer opportunity information. Every table has columns that indicate when a record was last retrieved and the user who retrieved it, and these columns are updated on every read operation (the reading of a record automatically becomes an update). If the information about when a user retrieves a record is not important for target processing, you can clear the last-retrieved time stamp and user columns (column filtering) and reduce the number of operations that are sent to the target.
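The side effect described above can be modeled in a few lines: an update is worth sending only when at least one changed column is replicated. The following is a self-contained illustration of that rule in plain Java (the class and column names are hypothetical; this is not InfoSphere CDC API code):

```java
import java.util.Collections;
import java.util.Set;

public class UpdateFilter {
    // Send an update only if at least one changed column is replicated;
    // updates that touch only filtered-out columns are suppressed.
    public static boolean shouldSend(Set<String> changedColumns,
            Set<String> replicatedColumns) {
        return !Collections.disjoint(changedColumns, replicatedColumns);
    }

    public static void main(String[] args) {
        Set<String> replicated = Set.of("CUSTOMER_NAME", "STATUS");
        // An update that touches only the audit column is not sent
        System.out.println(shouldSend(Set.of("LAST_RETRIEVED_TS"), replicated));
        // An update that also changes STATUS is sent
        System.out.println(shouldSend(Set.of("STATUS", "LAST_RETRIEVED_TS"), replicated));
    }
}
```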
Row and column filtering is configured at the source side, in the SubscribedTable object. Configure the row and column selection (especially the column selection) when the table is selected to the subscription. If columns are cleared later in the process, this action affects the PublishedTable object on the target, which requires that the subscription be described again. If you forget to redescribe the subscription after having cleared columns from replication, the subscription is not run until it has been described again.
Example 9-41 shows the setFilter() method that accepts a SubscribedTable object as an argument. If the table has a STATUS column, this column is used to set the row filtering for the table in question. If the table has a TIMESTAMP_UPDATED column, it is cleared from the replicated columns.
Example 9-41 Setting the row and column filtering
/**
* Sets the row filter to STATUS='A' if the table has a STATUS column.
* Clears the TIMESTAMP_UPDATED column from replication
*
* @param subscribedTable
* - The table that is selected to the subscription
* @throws ApiException
*/
public void setFilter(SubscribedTable subscribedTable) throws ApiException {
// Set the row filter if the replicated table has a STATUS column
if (subscribedTable.getColumn("STATUS") != null) {
subscribedTable.setRowSelection("STATUS='A'",
SubscribedTable.ROW_SELECTION_SELECT);
System.out.println("Row selection STATUS='A' set for table "
+ subscribedTable.getFullName() + ".");
}
// Clear the TIMESTAMP_UPDATED column from replication if it exists
DBColumn timestampUpdatedColumn = subscribedTable
.getColumn("TIMESTAMP_UPDATED");
if (timestampUpdatedColumn != null) {
subscribedTable.setColumnSelected(timestampUpdatedColumn.getName(),
false, false);
System.out.println("Column " + timestampUpdatedColumn.getName()
+ " deselected from replication for table "
+ subscribedTable.getFullName() + ".");
}
}
The setRowSelection() method sets the row filter. The sample code for this action is shown in Example 9-42.
Example 9-42 Implementing row and column filtering
...
// Select tables to subscription
SubscribedTable[] subscribedTables = new SubscribedTable[2];
subscribedTables[0] = selectTable(oracleDataStore, subscription,
publishableTables[0]);
subscribedTables[1] = selectTable(oracleDataStore, subscription,
publishableTables[1]);
// Set the row and column filtering
setFilter(subscribedTables[0]);
setFilter(subscribedTables[1]);
// Describe the subscription
describeSubscription(subscription, db2DataStore);
...
Similar to the Management Console, you can specify whether the row selection condition causes rows to be selected for or omitted from replication (they are each other's counterparts). For the setColumnSelected() method, you must specify the selected and the critical attributes. When setting the selected attribute to false, the column is cleared from replication. When you select a column as critical, InfoSphere CDC only replicates update operations when any critical column has changed value. A critical column cannot be cleared from replication, so there are three possible combinations of selected / critical.
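The selected/critical rule can be captured in one line of logic: of the four combinations, only selected=false with critical=true is invalid, which leaves the three valid combinations mentioned above. A minimal plain-Java sketch of the rule (an illustration, not InfoSphere CDC API code):

```java
public class ColumnFlags {
    // A critical column cannot be cleared from replication, so the only
    // invalid combination is selected=false together with critical=true.
    public static boolean isValid(boolean selected, boolean critical) {
        return selected || !critical;
    }

    public static void main(String[] args) {
        // Prints the three valid combinations
        for (boolean sel : new boolean[] { true, false })
            for (boolean crit : new boolean[] { true, false })
                if (isValid(sel, crit))
                    System.out.println("selected=" + sel + ", critical=" + crit);
    }
}
```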
Example 9-42 includes the setFilter() method shown in Example 9-41 on page 309 in the table mapping flow.
After recreating the subscription and mapping the tables, Figure 9-17 shows that row filtering was applied to the CDCDEMO.PRODUCT table because it has a STATUS column.
Figure 9-17 Row filtering
For the CDCDEMO.DEMO_CUSTOMER table, column filtering has been applied by running the setFilter() method (Figure 9-18).
Figure 9-18 InfoSphere CDC - API column filter
Figure 9-18 shows that the TIMESTAMP_UPDATED column has been cleared from replication. At the same time, the automated column mapping was affected because the TIMESTAMP_UPDATED column was no longer sent to the target during the description process. Therefore, the assign() method could not map the source column to the target column of the same name (Figure 9-19).
Figure 9-19 Column filter
9.5.12 Derived columns
InfoSphere CDC allows you to move the processing of a derived expression from the target to the source. This action might be useful in situations where the information needed to calculate a column cannot be found on the target (for example, a %GETCOL retrieval of a column from a table that exists only on the source). This action also might be useful if you need to call a user function (%USERFUNC) that is not available on the target engine (for example, when replicating from Oracle to DB2 on z/OS and you need to start a Java method to perform a calculation).
In Example 9-43, the addDerivedWeekNo() method adds the CURRENT_WEEKNO derived column that calls a Java user function to a subscribed table (selected table). You can find the source for the UEWeekOfYear user exit program in “Java user exit programs” on page 371.
Example 9-43 Add a derived column
/**
* Adds a derived column to the mapped table which will calculate the week
* number of the current date
*
* @param subscribedTable
* - The table that is selected to the subscription
* @throws ApiException
*/
public void addDerivedWeekNo(SubscribedTable subscribedTable)
throws ApiException {
DefaultContext derivedColumnContext = new DefaultContext();
derivedColumnContext.setString(DerivedColumn.Name, "CURRENT_WEEKNO");
derivedColumnContext.setString(DerivedColumn.BasedOnColumn, "");
derivedColumnContext.setInt(DerivedColumn.ColumnLength, 2);
derivedColumnContext.setInt(DerivedColumn.ColumnPrecision, 2);
derivedColumnContext.setInt(DerivedColumn.ColumnScale, 0);
derivedColumnContext.setString(DerivedColumn.DataTypeName, "NUMBER");
derivedColumnContext.setString(DerivedColumn.Description,
"Week number of current date");
derivedColumnContext.setString(DerivedColumn.EvaluationFrequency,
DerivedColumn.EF_AFTER); // *AFT evaluation frequency
derivedColumnContext.setByte(DerivedColumn.Nullable, (byte) 0); // False
derivedColumnContext.setString(DerivedColumn.Expression,
"%USERFUNC(\"JAVA\",\"UEWeekOfYear\")");
subscribedTable.addDerivedColumn(derivedColumnContext);
System.out.println("Derived column "
+ derivedColumnContext.getString(DerivedColumn.Name)
+ " added to table " + subscribedTable.getFullName() + ".");
}
As with other classes in the InfoSphere CDC API, DerivedColumn works with a context object to specify the attributes of the derived column. All attributes are mandatory; leaving out any one of them causes the addDerivedColumn() call to fail.
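The derived expression above delegates the calculation to a Java user function. As an illustration only (the actual UEWeekOfYear source is in "Java user exit programs" on page 371), the week-number calculation itself can be done with standard java.time classes, assuming ISO-8601 week numbering:

```java
import java.time.LocalDate;
import java.time.temporal.WeekFields;

public class WeekOfYearSketch {
    // ISO-8601 week numbering: weeks start on Monday and
    // week 1 is the week that contains the first Thursday of the year.
    public static int weekOfYear(LocalDate date) {
        return date.get(WeekFields.ISO.weekOfWeekBasedYear());
    }

    public static void main(String[] args) {
        System.out.println("Current week number: " + weekOfYear(LocalDate.now()));
    }
}
```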
In Figure 9-20, the CURRENT_WEEKNO column has been added to the list of columns at the source, as a derived column. This example does not map the derived column to a target column; the column mapping for derived columns is the same as regular database columns.
Figure 9-20 Derived column
9.5.13 Encoding conversions (before and after Version 6.5)
InfoSphere CDC replicates character data between a wide variety of encodings and automatically converts the data from the column encoding detected on the source to the column encoding detected on the target. For example, you can replicate multibyte character data such as Japanese, Chinese, or Korean, or single-byte character data, such as French, Turkish, Arabic, or Russian.
By default, InfoSphere CDC assumes that the data stored in a character capable column is using the encoding associated with that column type. For example, if your database is set to use Shift-JIS (Japanese), then data stored in the CHAR and VARCHAR columns is assumed to be in Shift-JIS by default. However, InfoSphere CDC is only concerned with the encoding of the data, not the encoding of the column storage type. This flexibility allows the product to deal with situations where the actual contents of the column do not match the encoding specified for the column in the database, commonly seen in many applications. Overriding the detected column encoding allows you to specify the actual encoding of the data as known by you.
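The mismatch described above is easy to demonstrate with standard Java: bytes written in one encoding but decoded per the declared column encoding produce garbled text, while decoding with the overridden (actual) encoding recovers it. This sketch is an illustration only and uses no InfoSphere CDC API:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class EncodingMismatchDemo {
    // Decode raw column bytes using a given IANA charset name.
    public static String decode(byte[] data, String charsetName) {
        return new String(data, Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        // The application wrote UTF-8 bytes for "caf\u00e9" ...
        byte[] stored = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        // ... but the column is declared as windows-1252, so decoding per
        // the declared encoding produces mojibake ("caf\u00c3\u00a9"):
        System.out.println(decode(stored, "windows-1252"));
        // Overriding to the actual encoding recovers the original text:
        System.out.println(decode(stored, "UTF-8"));
    }
}
```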
In all cases, if the database column encoding defined for source and target matches the contents of the columns, InfoSphere CDC picks up this configuration automatically and there is no need to override it in the InfoSphere CDC configuration.
All InfoSphere CDC engines before InfoSphere CDC V6.5 perform encoding conversions on the source side. Starting from Version 6.5, you can transfer the work of encoding conversions to the target side to reduce processor resource utilization on the source server.
The difference in execution of the encoding conversion is reflected in the metadata and the Java API. Different methods are available to set encoding conversion depending on the version of the InfoSphere CDC engine.
If either the source or target InfoSphere CDC engine is at Version 6.3 or earlier, encoding settings are configured on the source, for the SubscribedTable columns. Both the source and target encoding are configured in the source metadata using the setColumnUnicodeHandling() and setEncoding() methods, which are part of the SubscribedTable class, and conversion takes place at the source. If both the source and target engines are at Version 6.5 or later, the source encoding is configured at the source and the target encoding is configured at the target. Source and target encodings must then be set using the setMBCSColumnEncoding() method of the SubscribedTable class (source) and of the ColumnAssignment class (target).
Define a method that retrieves the capability of the subscription to do multi-byte character set (MBCS) automapping and use this capability to determine the encoding conversion type that must be configured. Whether the subscription can automap is defined when the subscription is created; at that time, the source and target data store properties are analyzed to set the SubscriptionMBCSState property. The sample code for this action is shown in Example 9-44.
Example 9-44 Determine if a subscription has MBCS automapping enabled
/**
* Determines if the source and target datastores both support MBCS
* automatic mapping
*
* @param subscription
*    - The subscription for which MBCS automatic mapping must be
* determined
* @return
*/
private boolean isSubscriptionMbcsAutoMappingEnabled(
Subscription subscription) {
boolean enabled = false;
try {
Context properties = subscription.getProperty();
byte state = properties.getByte(Subscription.SubscriptionMBCSState);
// 0 - unknown, 1 - 6.3 level (disabled), 2 - 6.5 level (enabled)
if (state == 2)
enabled = true;
} catch (Exception e) {
// ignore the error, consider it disabled anyway
}
return enabled;
}
When using InfoSphere CDC V6.5 for both the source and target engines, MBCS automapping is supported and you can configure encoding overrides on source and target columns:
MBCS Encoding type: You can specify whether encoding conversion should be done. For some data types, such as numbers or dates, encoding conversion is not applicable, so be aware of this limitation when specifying the encoding types. When encoding is possible, the default encoding that was picked up from the database during the table mapping can be overridden by selecting either COLUMN_MBCS_ENCODING_USE_AS_IS (no conversion, pass as binary) or COLUMN_MBCS_ENCODING_USE_SPECIFIED. In the latter case, the data encoding must be specified for the column in question.
Data encoding: The Internet Assigned Numbers Authority (IANA) encoding name is the identification used by the InfoSphere CDC V6.5 or later engines. For example, the IANA name for Western European character sets is windows-1252. If you specify an IANA encoding name that matches the value that was picked up by InfoSphere CDC from the source database, the data encoding is not stored as an overridden encoding conversion.
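Because InfoSphere CDC V6.5 engines identify encodings by IANA name, it can be convenient to sanity-check a name before configuring it. Standard Java charset lookups also accept IANA names and aliases, so a small helper can validate a name; this sketch assumes the JVM and the CDC engine recognize the same aliases, which is not guaranteed:

```java
import java.nio.charset.Charset;

public class EncodingNameCheck {
    // Returns the canonical name the JVM resolves an IANA name or alias to,
    // or null when the name is unknown to this JVM.
    public static String canonicalName(String ianaName) {
        return Charset.isSupported(ianaName)
                ? Charset.forName(ianaName).name()
                : null;
    }

    public static void main(String[] args) {
        System.out.println(canonicalName("windows-1252"));
        System.out.println(canonicalName("GBK"));
    }
}
```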
The setSourceColumnEncoding65() method sets the encoding of a source column to an IANA name, as shown in Example 9-45.
Example 9-45 Set column encoding for InfoSphere CDC V6.5
/**
* Set the encoding for a source column in a SubscribedTable object, CDC
* 6.5+
*
* @param subscription
*    - Subscription
* @param subscribedTable
*    - The table that is selected to the subscription
* @param columnName
* - The column of the selected table for which the source encoding must be set
* @param encodingName
* - IANA encoding name of the encoding
* @throws ApiException
*/
private void setSourceColumnEncoding65(Subscription subscription,
SubscribedTable subscribedTable, String columnName,
String encodingName) throws ApiException {
subscribedTable.setMBCSColumnEncoding(columnName,
(byte) MBCSColumnEncoding.COLUMN_MBCS_ENCODING_USE_SPECIFIED,
encodingName);
System.out.println("Encoding set to " + encodingName + " for column "
+ columnName + " in table " + subscribedTable.getFullName()
+ ".");
}
The attributes of InfoSphere CDC V6.3 and earlier encoding conversions are:
Unicode handling: Whether Unicode handling is done for the column in question. InfoSphere CDC tries to convert the source columns as though they contain Unicode data (UCS-2, UTF-8, UTF-16, or UTF-32).
Encoding method: Specifies whether encoding conversion must be done, and if so, whether the database default, overridden conversion, or Unicode conversion must be done.
CCSID for column: The Coded Character Set ID for the column. When specified for the source column, InfoSphere CDC expects the data in the source column to be encoded with this CCSID. When specified for the target column, InfoSphere CDC converts the data to this CCSID.
Encoding name: The ISO or IANA encoding name. For example, the IANA name for Western European character sets is windows-1252. This encoding name is used by the InfoSphere CDC V6.5 and later engines.
Encoding length: Specifies whether the characters must be processed (source) or generated (target) as single bytes (SBCS), double bytes (DBCS), or multiple bytes (MBCS).
The CCSID, encoding name, and encoding length depend on each other. You must specify all three values to ensure that the InfoSphere CDC source engine runs the correct encoding based on the target engine. For a map of IANA/CCSID and encoding length, go to the Management Console preferences (click Edit) and click the Encoding button. Figure 9-21 shows the IANA names together with the CCSID and encoding length for your reference.
Figure 9-21 IANA information
The setSourceColumnEncoding63() method, in Example 9-46, shows how to set the encoding for a column if either the source or target InfoSphere CDC engine has a version earlier than Version 6.5. This situation means that the encoding must be specified entirely at the source, that is, both the source and target column encodings.
Example 9-46 Set column encoding for InfoSphere CDC V6.3
/**
* Set the encoding for a source column in a SubscribedTable object, CDC 6.3
* and earlier releases
*
* @param subscription
*       - Subscription
* @param subscribedTable
*       - The table that is selected to the subscription
* @param columnName
*       - The column of the selected table for which the source encoding must be set
* @param ccsid
*       - Coded Character Set ID of the encoding
* @param encodingName
*       - IANA encoding name of the encoding
* @param encodingLength
*       - Single, double or multiple byte
* @throws ApiException
*/
private void setSourceColumnEncoding63(Subscription subscription,
SubscribedTable subscribedTable, String columnName, String ccsid,
String encodingName, int encodingLength) throws ApiException {
int sourceEncodingMethod = subscribedTable
.getSourceEncodingMethod(columnName);
String sourceEncodingCcsid = subscribedTable
.getSourceEncodingCcsid(columnName);
String sourceEncodingName = subscribedTable
.getSourceEncodingName(columnName);
int sourceEncodingLength = subscribedTable
.getSourceEncodingLength(columnName);
int targetEncodingMethod = subscribedTable
.getTargetEncodingMethod(columnName);
String targetEncodingCcsid = subscribedTable
.getTargetEncodingCcsid(columnName);
String targetEncodingName = subscribedTable
.getTargetEncodingName(columnName);
int targetEncodingLength = subscribedTable
.getTargetEncodingLength(columnName);
subscribedTable.setEncoding(columnName,
SubscribedTable.COLUMN_ENCODING_USE_SPECIFIED, ccsid,
encodingName, encodingLength, targetEncodingMethod,
targetEncodingCcsid, targetEncodingName, targetEncodingLength);
System.out.println("Encoding set to CCSID " + ccsid + " and IANA name "
+ encodingName + " for column " + columnName + " in table "
+ subscribedTable.getFullName() + ".");
}
The method shown in Example 9-47 retrieves the column encoding that has been set during the mapping process and then overrides it with the CCSID, IANA encoding name, and encoding length that were passed as a parameter. This method is, as an example, how to set the source column encoding to GBK (a Chinese national standard extension) for both InfoSphere CDC V6.5 and InfoSphere CDC V6.3.
Example 9-47 Implementation of setting the source column encoding
// Set encoding for CUSTOMER_NAME column
if (isSubscriptionMbcsAutoMappingEnabled(subscription))
setSourceColumnEncoding65(subscription, subscribedTables[0],
"CUSTOMER_NAME", "GBK");
else
setSourceColumnEncoding63(subscription, subscribedTables[0],
"CUSTOMER_NAME", "1384", "GBK",
SubscribedTable.COLUMN_ENCODING_DOUBLE_BYTE);
Running the example code results in the configuration shown in Figure 9-22. The source encoding for CUSTOMER_NAME has been overridden to GBK while the target encoding has been left unchanged.
Figure 9-22 Management Console table mapping encoding
9.5.14 Operations and user exits
InfoSphere CDC allows you to suppress the standard operations and include custom code in the processing of table-level and row-level operations. For example, you might not want any table clear operations or row delete operations to be applied onto the target side. Typically, this situation is the case when you target a data warehouse or operational data store. Another example is that you might want to use a user exit to process user-defined actions instead of a standard operation, such as applying a soft delete instead of the row delete that InfoSphere CDC would normally run.
In addition to the row-level user exits, the InfoSphere CDC Linux, UNIX, and Windows engine now supports subscription-level user exits, which let you define a set of actions that InfoSphere CDC can run before or after a commit event occurs on a specified subscription. Subscription-level user exits can work alone or in tandem with row-level user exits, for example, to complete and send an XML message that has been built using the row-level user exits.
The Management Console has split the standard operations and row-level user exits across two tabs. In the API, these items are controlled through the same class, UserExit.
As an example, you can configure a table mapping for soft delete. Basically, this means that you map a table with an adaptive apply mapping type, and then disable the delete operation and specify a pre-delete user exit that calls the UESoftDelete Java class. The UESoftDelete.class file must exist in the target InfoSphere CDC engine's lib directory so that it is available (this setting is validated by InfoSphere CDC when you configure the user exit), as shown in Example 9-48.
Example 9-48 Table mapping for soft delete
/**
* Assigns the source table to a target table, configuring it for soft
* delete.
*
* @param subscription
* - Subscription for which the table must be assigned
* @param targetDataStore
* - Target datastore of the subscription
* @param subscribedTable
* - Subscribed (selected) table
* @param targetTablePath
* - Schema (path) of the target table
* @param targetTableName
* - Name of the target table
* @throws ApiException
*/
public void assignSoftDelete(Subscription subscription,
Subscriber targetDataStore, SubscribedTable subscribedTable,
String targetTablePath, String targetTableName) throws ApiException {
// Get the publication for the subscription
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
// Get the PublishedTable associated with the SubscribedTable object
DBPath publishedTableDBPath = targetDataStore
.createDBPath(subscribedTable.getUpperPath().getFullName()
.toUpperCase());
String publishedTableName = subscribedTable.getName().toUpperCase();
PublishedTable publishedTable = publication.getPublishedTable(
publishedTableDBPath, publishedTableName);
// Map the table to the target table using specified mapping type
String targetDatabase = "";
String destinedMember = null;
String indexLibrary = null;
String indexName = null;
publishedTable.assign(targetTableName, targetTablePath, targetDatabase,
destinedMember, indexLibrary, indexName,
PublishedTable.ADAPTIVE_APPLY);
System.out.println("Table " + subscribedTable.getFullName()
+ " assigned to " + targetTablePath + "." + targetTableName
+ " using soft delete mapping type.");
TableAssignment tableAssignment = publishedTable.getTableAssignment();
// Loop through all target columns and map values depending on name
String[] destinedColumnNames = tableAssignment.getDestinedColumnNames();
for (String targetColumnName : destinedColumnNames) {
ColumnAssignment columnAssignment = tableAssignment
.getColumnAssignment(targetColumnName);
if (targetColumnName.equals("AUD_TIMESTAMP")) {
columnAssignment.mapTo(ColumnAssignment.MAP_JOURNAL_CONTROL,
"&TIMSTAMP");
System.out
.println("Journal control column &TIMSTAMP assigned to column "
+ targetColumnName + ".");
} else if ((targetColumnName.equals("APPLY_TIMESTAMP"))) {
columnAssignment.mapTo(ColumnAssignment.MAP_CURRENT_DATE);
System.out
.println("Default value CURRENT DATE assigned to column "
+ targetColumnName + ".");
} else if ((targetColumnName.equals("LAST_OPERATION"))) {
columnAssignment.mapTo(ColumnAssignment.MAP_JOURNAL_CONTROL,
"&ENTTYP");
System.out
.println("Journal control column &ENTTYP assigned to column "
+ targetColumnName + ".");
}
}
// Set table level operations and user exits
UserExit tableUserExit = tableAssignment
.createNewUserExit(UserExit.JAVA_CLASS);
tableUserExit.setStdOperation(UserExit.STANDARD_INSERT,
UserExit.INSERT_UPDATE);
tableUserExit.setStdOperation(UserExit.STANDARD_UPDATE,
UserExit.INSERT_UPDATE);
tableUserExit.setStdOperation(UserExit.STANDARD_DELETE,
UserExit.DISABLE);
System.out
.println("Standard delete operation has been disabled for operations on table "
+ targetTablePath + "." + targetTableName);
tableUserExit.setFunctionType(UserExit.JAVA_CLASS);
tableUserExit.setJavaClass("UESoftDelete");
tableUserExit.addOperation(UserExit.BEFORE_DELETE, "Y");
System.out
.println("Before delete operation has been set to UESoftDelete Java class for table "
+ targetTablePath + "." + targetTableName);
// Apply the user exit configuration to the table assignment
tableAssignment.setUserExit(tableUserExit);
System.out.println("Table-level user exit for table "
+ targetTablePath + "." + targetTableName
+ " has been set to UESoftDelete");
}
The initial part of the table assignment is the same as before, with just a small difference in the mapping type (PublishedTable.ADAPTIVE_APPLY instead of PublishedTable.STANDARD). The method then iterates through the target columns and maps the &ENTTYP journal control column (the operation type) to the LAST_OPERATION column. If a row has been soft-deleted, this column contains "DL".
During the second part of the method, a new UserExit object is created and the standard operations are assigned. The standard delete (STANDARD_DELETE) operation is disabled. Then the user exit type is set to use Java class UESoftDelete and it is marked to be run before the delete operation occurs. InfoSphere CDC still runs the before and after user exits, even if the standard operation is disabled.
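The net effect of this configuration can be modeled with a small sketch: a delete does not remove the row, it flags it. Only the "DL" value comes from the text above; the rest of this plain-Java model (class name, the insert code) is hypothetical and is not the UESoftDelete exit itself:

```java
import java.util.HashMap;
import java.util.Map;

public class SoftDeleteModel {
    // Maps a row key to its LAST_OPERATION value; "DL" marks a soft delete.
    private final Map<String, String> rows = new HashMap<>();

    public void insert(String key) {
        rows.put(key, "IN"); // hypothetical operation code for an insert
    }

    // Instead of removing the row, flag it as deleted.
    public void softDelete(String key) {
        if (rows.containsKey(key)) {
            rows.put(key, "DL");
        }
    }

    public String lastOperation(String key) {
        return rows.get(key);
    }
}
```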
The UESoftDelete class is described in “Java user exit for row-level user exits” on page 383.
For subscription-level user exits, there is not much you must configure. After you have created the subscription, and the Publication object exists in the target metadata, you can get the subscription-level user exit configuration by running the getSubscriptionUserExits() method from the Publication object. The function type that is currently supported is UserExit.JAVA_CLASS, and you can set the Java class and parameters by using the methods in the
SubscriptionUserExits class.
The subscription-level user exit must implement the SubscriptionUserExitIF interface. The UESoftDelete class implements both UserExitIF and SubscriptionUserExitIF, so one class can be used both at the table-level and subscription-level.
9.5.15 Common procedures (updating table definitions)
After you change the structure of a mapped source or target table in your database, you must update the definition of the table in the InfoSphere CDC metadata so that subscriptions are aware of the new structure and can adjust the log reader or apply process.
If you change the definition of a source table (add a column, or change column length or data type) in your database, then you must update the definition of the table in the source data store catalog. InfoSphere CDC requires you to update the source table so that the new structure is available for configuration when editing your table mapping details. For example, if you have added a column on the source table, then you might want to map this new column to a target column. In the Management Console, you can update the source table definition by choosing the table and updating the source definition. When you need to update the source table definition through the API, the Catalog class provides the reAddTable() method. Both actions retrieve the table structure from the database and update the PublishableTable object in the source data
store's catalog.
When the definition of a target table is altered (add a column, or set column constraints or different primary key constraint) in your database, you need to update the definition of the target table’s metadata. This action ensures that the column mappings can be updated according to the new target table structure. For example, if you have added a column on the target table, then you might want to map a source column to the new target column or populate it with a constant value. The Management Console provides a table menu option to update the target table definition. If you need to update the target table definition through the API, run the reassign() method from the PublishedTable object that links the source table to the destination (target) table.
Typically, you perform the updates of the InfoSphere CDC source and target tables only if the structure has changed.
Example 9-49 shows how to check whether the CDCDEMO.SALESREP catalog table has changed; if it has, the table definition must be updated in the catalog. Before running the snippet, check the structure of the table registered in the source data store catalog. The table structure before applying the DDL changes is shown in Figure 9-23 on page 329.
Example 9-49 Checking if the table structure has changed
/**
* Check if a table in the source datastore catalog has changed. Returns
* true if there is a difference between the database table and the
* catalog table in any of the following attributes:
* - Number of columns
* - Data type of a column
* - Length of a column
* - Precision of a column
* - Scale of a column
*
* @param sourceDataStore
* - The source datastore that has the catalog in which the table
* must be checked
* @param tablePath
* - Schema of the table to be checked
* @param tableName
* - Name of the table to be checked
* @return – true if the table has changed, false if there is no difference
* between database table and catalog table
* @throws ApiException
*/
public boolean isCatalogTableChanged(Publisher sourceDataStore,
        String tablePath, String tableName) throws ApiException {
    boolean isChanged = false;
    DBTable databaseTable = sourceDataStore.getTable(
            sourceDataStore.getDBPath(tablePath), tableName);
    Catalog sourceCatalog = sourceDataStore.getCatalog();
    PublishableTable publishableTable = sourceCatalog.getPublishableTable(
            tablePath, tableName);
    int numberDatabaseColumns = databaseTable.getColumnNames().length;
    int numberCatalogColumns = publishableTable.getColumnNames().length;
    if (numberDatabaseColumns != numberCatalogColumns) {
        System.out.println("Number of columns in database does not correspond "
                + "to number of columns in catalog for table "
                + tablePath + "." + tableName + ".");
        isChanged = true;
    } else {
        for (String databaseColumnName : databaseTable.getColumnNames()) {
            DBColumn databaseColumn = databaseTable.getColumn(databaseColumnName);
            try {
                DBColumn catalogColumn = publishableTable
                        .getColumn(databaseColumnName);
                if (!catalogColumn.getDataType().equalsIgnoreCase(
                        databaseColumn.getDataType())) {
                    System.out.println("Database column " + databaseColumnName
                            + " of table " + tablePath + "." + tableName
                            + " is of a different data type ("
                            + databaseColumn.getDataType()
                            + ") than catalog column ("
                            + catalogColumn.getDataType() + ").");
                    isChanged = true;
                }
                if (catalogColumn.getLength() != databaseColumn.getLength()) {
                    System.out.println("Database column " + databaseColumnName
                            + " of table " + tablePath + "." + tableName
                            + " has a different length ("
                            + databaseColumn.getLength()
                            + ") than catalog column ("
                            + catalogColumn.getLength() + ").");
                    isChanged = true;
                }
                if (catalogColumn.getPrecision() != databaseColumn.getPrecision()) {
                    System.out.println("Database column " + databaseColumnName
                            + " of table " + tablePath + "." + tableName
                            + " has a different precision ("
                            + databaseColumn.getPrecision()
                            + ") than catalog column ("
                            + catalogColumn.getPrecision() + ").");
                    isChanged = true;
                }
                if (catalogColumn.getScale() != databaseColumn.getScale()) {
                    System.out.println("Database column " + databaseColumnName
                            + " of table " + tablePath + "." + tableName
                            + " has a different scale ("
                            + databaseColumn.getScale()
                            + ") than catalog column ("
                            + catalogColumn.getScale() + ").");
                    isChanged = true;
                }
            } catch (ApiException e) {
                System.out.println("Database column " + databaseColumnName
                        + " not found in catalog table " + tablePath + "."
                        + tableName + ".");
                isChanged = true;
            }
        }
    }
    return isChanged;
}
Figure 9-23 Table structure before applying DDL
The table is replicated by UPDTABLE and its status is active (Figure 9-24).
Figure 9-24 Replicated table
Now the table is altered. A new column (NEWCOL) is added to the table and the length of an existing column (NAME1ST) is changed from 30 to 50.
The isCatalogTableChanged() method, shown in Example 9-49 on page 326, retrieves the table structure from the database using the getTable() method of the Publisher class. It then checks whether the number of columns in the database table matches the number of columns in the catalog table, and iterates through the database table columns to compare their attributes with those registered in the PublishableTable object. The primary reason for comparing the number of columns is to detect when a column has been dropped from the database table.
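The comparison logic itself is independent of the CDC API and can be prototyped with plain Java collections. The following sketch uses a hypothetical Col descriptor standing in for DBColumn (everything here is illustrative, not part of the CDC API) and applies the same checks as isCatalogTableChanged(): column count first, then data type, length, precision, and scale per column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CatalogCompareSketch {
    // Hypothetical, simplified column descriptor standing in for DBColumn
    public static final class Col {
        final String dataType;
        final int length, precision, scale;
        public Col(String dataType, int length, int precision, int scale) {
            this.dataType = dataType;
            this.length = length;
            this.precision = precision;
            this.scale = scale;
        }
    }

    // Returns true when the "database" definition differs from the "catalog"
    // definition: column count first, then per-column data type, length,
    // precision, and scale (mirroring the checks in isCatalogTableChanged()).
    public static boolean isChanged(Map<String, Col> database, Map<String, Col> catalog) {
        if (database.size() != catalog.size()) return true;
        for (Map.Entry<String, Col> e : database.entrySet()) {
            Col db = e.getValue();
            Col cat = catalog.get(e.getKey());
            if (cat == null) return true; // column missing in catalog
            if (!db.dataType.equalsIgnoreCase(cat.dataType)
                    || db.length != cat.length
                    || db.precision != cat.precision
                    || db.scale != cat.scale) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Col> catalog = new LinkedHashMap<>();
        catalog.put("NAME1ST", new Col("VARCHAR2", 30, 0, 0));
        Map<String, Col> database = new LinkedHashMap<>();
        database.put("NAME1ST", new Col("VARCHAR2", 50, 0, 0)); // length 30 -> 50
        database.put("NEWCOL", new Col("VARCHAR2", 10, 0, 0));  // added column
        System.out.println("Changed: " + isChanged(database, catalog)); // prints "Changed: true"
    }
}
```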
The exact code as presented might not work for all InfoSphere CDC engine types. InfoSphere CDC for DB2 on System i, for example, always reports precision 0 for database table columns that are retrieved with the getTable() method, while InfoSphere CDC stores the correct precision in its configuration.
If a difference is found between database and catalog table, the table is updated in the catalog using the reAddTable() method (Example 9-50) that is part of the Catalog class.
Example 9-50 Method to re-add a catalog table
/**
 * Update the source table definition in the catalog
 *
 * @param sourceDataStore
 *            - The source datastore that has the catalog in which the table
 *            must be updated
 * @param tablePath
 *            - Schema of the table to be updated
 * @param tableName
 *            - Name of the table to be updated
 * @throws ApiException
 */
public void reAddCatalogTable(Publisher sourceDataStore, String tablePath,
        String tableName) throws ApiException {
    Catalog sourceCatalog = sourceDataStore.getCatalog();
    PublishableTable publishableTable = sourceCatalog.getPublishableTable(
            tablePath, tableName);
    sourceCatalog.reAddTable(publishableTable);
    System.out.println("Updated catalog table definition " + tablePath
            + "." + tableName + ".");
}
When these two methods are brought together, the code in Example 9-51 first checks whether the structure of the CDCDEMO.SALESREP table has changed; if so, it updates the table definition in the catalog.
Example 9-51 Check to see whether to update the table definition in the catalog
if (isCatalogTableChanged(oracleDataStore, "CDCDEMO", "SALESREP")) {
    reAddCatalogTable(oracleDataStore, "CDCDEMO", "SALESREP");
}
The output of the code when running it after the table's structure has changed is shown in Example 9-52.
Example 9-52 Output from Example 9-51 on page 331
Number of columns in database does not correspond to number of columns in catalog for table CDCDEMO.SALESREP.
Updating source table definition CDCDEMO.SALESREP affects subscription UPDTABLE.
Updated catalog table definition CDCDEMO.SALESREP.
Look at the Management Console (Figure 9-25) and see that the table's structure has been updated.
Figure 9-25 Table structure is updated
The status of the replicated table has been set to parked (Figure 9-26).
Figure 9-26 Replicated table status set to parked
The work to make the subscription runnable is not yet complete. The source table structure has changed, but the target side is not yet aware that the data it receives will differ from before. The PublishedTable must be updated with the new source table structure by describing the subscription.
To discover which subscriptions are affected by the update of the source table definition, that is, getting a list of subscriptions that replicate the table, you must analyze the subscriptions individually (Example 9-53). There is no single method that retrieves the subscriptions that depend on a table.
Example 9-53 Getting the list of subscriptions that replicate a certain source table
/**
 * Get the list of subscriptions that replicate a certain source table.
 *
 * @param sourceDataStore
 *            - The source datastore that has the table in its catalog
 * @param tablePath
 *            - Schema of the table
 * @param tableName
 *            - Name of the table
 * @throws ApiException
 */
public ArrayList<Subscription> getSourceTableSubscriptions(
        Publisher sourceDataStore, String tablePath, String tableName)
        throws ApiException {
    Catalog sourceCatalog = sourceDataStore.getCatalog();
    PublishableTable publishableTable = sourceCatalog.getPublishableTable(
            tablePath, tableName);
    ArrayList<Subscription> sourceTableSubscriptions = new ArrayList<Subscription>();
    for (String subscriptionName : sourceDataStore.getSubscriptionNames()) {
        Subscription subscription = sourceDataStore
                .getSubscription(subscriptionName);
        try {
            SubscribedTable subscribedTable = subscription.getSubscribedTable(
                    publishableTable.getUpperPath(), publishableTable.getName());
            if (subscribedTable != null) {
                System.out.println("Subscription " + subscription.getName()
                        + " replicates table "
                        + subscribedTable.getFullName() + ".");
                sourceTableSubscriptions.add(subscription);
            }
        } catch (ApiException e) {
            // Do nothing, subscription does not map the table
        }
    }
    return sourceTableSubscriptions;
}
To complete the process, that is, to make the subscription runnable again and replicate changes for the table that was updated in the catalog, the snippet must be extended (Example 9-54).
Example 9-54 Making the subscription runnable and replicating table changes
if (isCatalogTableChanged(oracleDataStore, "CDCDEMO", "SALESREP")) {
    // Get the subscriptions which replicate this table
    ArrayList<Subscription> sourceTableSubscriptions = getSourceTableSubscriptions(
            oracleDataStore, "CDCDEMO", "SALESREP");
    // Update the source table definition in the catalog
    reAddCatalogTable(oracleDataStore, "CDCDEMO", "SALESREP");
    // Change replication status in all subscriptions to Active and describe
    for (Subscription sourceTableSubscription : sourceTableSubscriptions) {
        // Change table replication status to Active (only if Mirror method)
        SubscribedTable changedSubscribedTable = sourceTableSubscription
                .getSubscribedTable(oracleDataStore.getDBPath("CDCDEMO"),
                        "SALESREP");
        if (changedSubscribedTable.getReplicationMethod() == SubscribedTable.METHOD_MIRROR) {
            System.out.println("Replication status of table "
                    + changedSubscribedTable.getFullName()
                    + " is changed to Active.");
            changedSubscribedTable
                    .setReplicationStatus(SubscribedTable.STATUS_ACTIVE);
        }
        // Now describe the subscription
        describeSubscription(sourceTableSubscription, db2DataStore);
    }
}
When you have to update the table definitions for target tables, the procedure is different. Retrieving the database table structure is done in the same way as for the source, using the getTable() method. However, you must now trace back to the published table starting from the destined table. Again, the InfoSphere CDC Java API has no simple method that provides direct access to the source table starting from the destination table (Example 9-55).
Example 9-55 Updating the target table definition for a subscription
/**
 * Update the target table definition for a subscription
 *
 * @param subscription
 *            - The subscription that targets the table to be reassigned
 * @param targetDataStore
 *            - The datastore that is targeted by the subscription
 * @param targetTablePath
 *            - Schema of the target table to be updated
 * @param targetTableName
 *            - Name of the target table to be updated
 * @throws ApiException
 */
public void reAssignTable(Subscription subscription,
        Subscriber targetDataStore, String targetTablePath,
        String targetTableName) throws ApiException {
    // Get publication for subscription
    Publication publication = getSubscriptionPublication(subscription,
            targetDataStore);
    // Process all database paths (schemas) for the publication
    for (DBPath publicationDBPath : publication.getDBPaths()) {
        // Process all tables for the database path
        for (PublishedTable publishedTable : publication
                .getPublishedTables(publicationDBPath)) {
            TableAssignment tableAssignment = publishedTable
                    .getTableAssignment();
            DBTable destinedTable = tableAssignment.getDestinedTable();
            DBPath destinedTableUniqueIndex = tableAssignment
                    .getDestinedTableUniqueIndex();
            // If the destined table is the table to reassign, do so
            if (destinedTable.getUpperPath().getFullName()
                    .equalsIgnoreCase(targetTablePath)
                    && destinedTable.getName().equalsIgnoreCase(
                            targetTableName)) {
                publishedTable.reassign(destinedTableUniqueIndex);
                System.out.println("Updated target table definition "
                        + targetTablePath + "." + targetTableName + ".");
            }
        }
    }
}
Ensure that the subscription is not active when reassigning tables; otherwise, the underlying metadata changes while the running subscription might not be aware of the change. Subscription statuses can only be obtained from the source data store, so you must trace even further back until you have found the subscription that is connected to the Publication holding the PublishedTable assigned to the target table whose structure has changed.
In most cases, you are aware of the table changes that are being deployed and which subscriptions they affect. Therefore, you could include the reassignment of the target table in the deployment of the subscription. When using the reAssignTable() method, assume that you are deploying a target table change for a known subscription and that the subscription is inactive.
Add a column to the CDCDEMO.SALESREP_TARGET table and run the code snippet shown in Example 9-56 to update the definition of the target table.
Example 9-56 Snippet to update the target table definition
// Get the subscription object
Subscription subscription = oracleDataStore.getSubscription("UPDTABLE");
reAssignTable(subscription, db2DataStore, "CDCDEMO", "SALESREP_TARGET");
When refreshing the view in the Management Console, the new column appears with a default mapping (Figure 9-27). If there is a source column with the same name and equivalent data type, the source column is automatically mapped to the new target column.
Figure 9-27 New column with default mapping
9.5.16 Deploying subscription changes and considerations
Using the API, you can deploy subscription changes to production environments without manual intervention through the Management Console. You want to make the deployment as smooth as possible and avoid having to do a refresh of the replicated tables or run the risk of losing changes.
This section assumes that you do not want to perform a refresh of the replicated tables when making subscription changes. This situation is the case for most clients, and in environments where very large tables are replicated, refreshing those tables may not be an option.
Before we continue describing how table structure changes are handled through the InfoSphere CDC API, you need to understand the bookmarks of subscriptions and replicated tables. More information about this subject can be found in 7.6.1, “Understanding InfoSphere CDC bookmarks” on page 191.
As long as you do not delete a subscription (or the Publication in the target metadata), the subscription bookmark is kept in the target bookmark table. This means that when the subscription is restarted after changes are made to the production subscription, the bookmark is retrieved from the bookmark table and the database logs are read from that bookmark forward. The risk of inadvertently losing the subscription bookmark is low because you must deassign all target tables (PublishedTable), remove the selected tables (SubscribedTable), and perform a describe to remove the Publication object. Only in that situation does InfoSphere CDC drop the bookmark on the target side.
A greater risk is inadvertently marking the table capture point after changing the mapped tables. If you set the status of a selected table (SubscribedTable) to Active, you are effectively marking the table capture point for that table. Any transaction for that table since the last applied bookmark is skipped when the subscription is restarted.
Starting with Version 6.5, the InfoSphere CDC engines offer the dmsetbookmark CLI command to set the bookmark and mark the table capture points of all tables according to that bookmark. This option is useful when subscriptions must be recreated and you want to restart from the last known position without skipping any transactions for any of the tables.
Unfortunately, the Java API does not have an interface for getting or setting the engine bookmarks, and you must start engine commands from your custom Java class to perform this function. Starting engine commands from the deployment Java programs might pose a challenge because the API runs against the Access Server, which might be installed on a different server than the source and target engines. You then must run the CLI commands remotely on the InfoSphere CDC servers.
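One way to bridge the gap is to build the remote command line in Java and hand it to ProcessBuilder. The sketch below only constructs the command; the installation path and the -I/-s/-b flags are illustrative assumptions, so verify the exact dmsetbookmark syntax against the CLI reference of your engine version.

```java
import java.util.Arrays;
import java.util.List;

public class RemoteBookmarkCommand {
    // Builds (but does not run) an ssh command line for a remote dmsetbookmark
    // call. The cdcHome path and the -I/-s/-b flags are hypothetical; check
    // your engine's CLI reference for the exact syntax.
    public static List<String> build(String host, String cdcHome, String instance,
            String subscription, String bookmark) {
        return Arrays.asList("ssh", host,
                cdcHome + "/bin/dmsetbookmark",
                "-I", instance, "-s", subscription, "-b", bookmark);
    }

    public static void main(String[] args) {
        List<String> cmd = build("cdcsource", "/opt/cdc", "cdcdemo",
                "UPDTABLE", "00000ABC");
        System.out.println(String.join(" ", cmd));
        // To actually run it:
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```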
If you want to keep the deployment processing within the scope of the Java API and not lose any subscription or table bookmarks, the selected tables (SubscribedTable) must be kept in the subscription, and the replication method and status for these tables must not be changed. This means that if you need to change the mapping attributes of the source tables, such as row filtering, column filtering, or derived columns, you must modify the existing SubscribedTable object instead of recreating it. For example, if the table mapping change involves removing one derived column while keeping the others, you could remove all the derived columns from the SubscribedTable object and then add them all back (except the removed one) based on the definition of the subscription to be deployed.
For table assignment processing, you do not have to take the same precautions. It is safe to remove table assignments (deassign) and assign them again with new attributes. Deassigning tables does not touch the subscription bookmark or the marked table capture point for the tables. For example, if you want to change the mapping of tables that have been mapped with a standard mapping type to adaptive apply, you can start by deassigning the mapped tables and then assigning them again using the adaptive apply mapping type.
There are situations in which the table capture point cannot be kept. A common one is when the format of the source table has changed and the table must be re-added to the source data store catalog. When re-adding a table to the catalog to replace the table's recorded structure, InfoSphere CDC automatically changes the table's replication status in all subscriptions to idle (parked). To resume replication for the table in question, you must change the replication status to active. This is the same action as marking the capture point for the table (the operations between the last stop of the subscription and the marked table capture point are skipped).
9.5.17 Starting, stopping, and monitoring subscriptions
Subscriptions are always started and stopped from the source side. Also, if you want to monitor for subscription activity, this action can only be done on the source server. Some companies have strict policies regarding access to source (production) servers and may not allow command-line access to these servers. In those scenarios, the API might be a good alternative for starting, stopping, and monitoring subscriptions.
If you do have access to the source server, you can start and stop subscriptions using the engine command interface. The Linux, UNIX, and Windows engine also allows you to check the activity of subscriptions using InfoSphere CDC commands. If you want to provide more sophisticated functionality, the API provides more control over specific features within the InfoSphere CDC engine.
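For environments where command-line access to the engine is available, the same operations can be scripted directly. The commands below are illustrative only; command names and flags vary by engine version and platform, and the instance and subscription names are placeholders, so verify them against the CLI reference of your installation.

```
dmstartmirror -I cdcdemo -s UPDTABLE            # start continuous mirroring
dmendreplication -I cdcdemo -c -s UPDTABLE      # end replication (controlled)
dmgetsubscriptionstatus -I cdcdemo -s UPDTABLE  # check subscription activity
```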
Continuous mirroring replicates changes to the target on a continuous basis. Use this type of mirroring when business requirements dictate that replication run continuously and there is no clearly defined point at which replication should end.
Scheduled end (Net Change) mirroring replicates changes (to the target) up to a user-specified point in the source database log and then ends replication. Use this type of mirroring when business requirements dictate that you only replicate your data periodically and you have a clearly defined endpoint for the state of your target database when replication ends. Scheduled end mirroring allows you to end replication at the following points in your source database log:
Current time or “now”
User-specified date and time
User-specified log position
The startSubscription() method shown in Example 9-57 starts a subscription in Continuous mirroring or Net Change with a “Now” ending specification (replication stops when the current log position has been reached).
Example 9-57 Start subscriptions (Continuous mirroring or Net Change)
/**
 * Start a subscription in continuous mirror or net change mode
 *
 * @param subscription
 *            - Subscription you want to start
 * @param continuousMirror
 *            - Start the subscription in continuous mirror mode (true) or
 *            net change mode (false)
 * @throws ApiException
 */
public void startSubscription(Subscription subscription,
        boolean continuousMirror) throws ApiException {
    // Check if the subscription is already active before attempting to start it
    byte[] liveActivityStatus = subscription.getLiveActivityStatus();
    if (liveActivityStatus[1] == Subscription.LIVE_STATUS_IDLE) {
        subscription.startMirror(continuousMirror);
        if (continuousMirror)
            System.out.println("Started subscription "
                    + subscription.getName() + " in Continuous Mirror mode.");
        else
            System.out.println("Started subscription "
                    + subscription.getName() + " in Net change mode.");
    } else {
        System.out.println("Subscription " + subscription.getName()
                + " is already active.");
    }
}
The startSubscription() method uses the Subscription.startMirror() method to start replication using the two basic techniques that are available in all versions of InfoSphere CDC. Starting with Version 6.5, InfoSphere CDC supports starting mirroring with a user-specified end date and time or log position at which replication should stop. If you want to use this functionality, use the startReplicationSpecifiable() method, which is part of the Subscription class.
In Example 9-58, a subscription is started to replicate yesterday's changes. After the log position reaches the specified end time (23:59:59.999), it stops normally.
Example 9-58 Specifiable start of replication
/**
 * Start a subscription and replicate the changes that were made yesterday
 * (relative to today).
 *
 * @param subscription
 *            - Subscription you want to start
 * @throws ApiException
 */
public void startReplicationYesterdaysChanges(Subscription subscription)
        throws ApiException {
    // Calculate the ending time (yesterday, 23:59:59.999)
    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
    Calendar subscriptionEndTime = Calendar.getInstance();
    subscriptionEndTime.add(Calendar.DATE, -1);
    subscriptionEndTime.set(Calendar.HOUR_OF_DAY, 23);
    subscriptionEndTime.set(Calendar.MINUTE, 59);
    subscriptionEndTime.set(Calendar.SECOND, 59);
    subscriptionEndTime.set(Calendar.MILLISECOND, 999);
    // Check if the subscription is already active before attempting to start it
    byte[] liveActivityStatus = subscription.getLiveActivityStatus();
    if (liveActivityStatus[1] == Subscription.LIVE_STATUS_IDLE) {
        byte flag = 5; // Scheduled end
        byte tag = 1; // Ignored for mirror
        byte endValueType = 2; // Source local time (log time)
        String endValue = sdf.format(subscriptionEndTime.getTime());
        subscription.startReplicationSpecifiable(flag, tag, endValueType,
                endValue);
        System.out.println("Started subscription " + subscription.getName()
                + " with an ending log time of " + endValue);
    } else {
        System.out.println("Subscription " + subscription.getName()
                + " is already active.");
    }
}
To end a subscription, you can use the endSubscription() method. Similar to the starting of the replication, ending replication can now also be made “specifiable” by using the endReplicationSpecifiable() method (Example 9-59).
Example 9-59 Stopping replication
/**
 * Stop a mirroring subscription, either controlled or immediately
 *
 * @param subscription
 *            - Subscription you want to stop
 * @param immediateStop
 *            - Stop the subscription immediately (true) or in a controlled
 *            manner (false)
 * @throws ApiException
 */
public void stopSubscription(Subscription subscription,
        boolean immediateStop) throws ApiException {
    // Check if the subscription is active before attempting to stop it
    byte[] liveActivityStatus = subscription.getLiveActivityStatus();
    if (liveActivityStatus[1] == Subscription.LIVE_STATUS_ACTIVE) {
        subscription.stopMirror(immediateStop);
        if (immediateStop)
            System.out.println("Stopping subscription "
                    + subscription.getName() + " immediately.");
        else
            System.out.println("Stopping subscription "
                    + subscription.getName()
                    + " in a controlled manner. "
                    + "Subscription will stop when current log position "
                    + "has been reached.");
    } else {
        System.out.println("Subscription " + subscription.getName()
                + " is not active.");
    }
}
Obtaining the state of a subscription is a bit more elaborate, as the state depends on both the activity (Refresh, Continuous Mirror, or Net Change) and the status (Starting, Active, or Inactive). The current version of the Java API has relatively limited capabilities for retrieving the actual state of a subscription; the Management Console can retrieve a greater variety of states from the InfoSphere CDC engines, such as Refresh before mirror and Ending. The getSubscriptionStateAsString() method (Example 9-60) attempts to reproduce the Management Console states as accurately as possible, returning a string that reflects both the activity (what the subscription is doing) and the status (whether it is active).
Example 9-60 Returns the activity of a subscription as a string
/**
 * Returns the activity of a subscription as a string for display, mimicking
 * the activity states shown in the Management Console as closely as possible.
 *
 * @param subscription
 *            - The subscription for which the state must be returned
 * @return State of the subscription
 * @throws ApiException
 * @throws UnsupportedFeatureException
 */
private String getSubscriptionStateAsString(Subscription subscription)
        throws UnsupportedFeatureException, ApiException {
    byte[] liveActivityStatus = subscription.getLiveActivityStatus();
    byte liveActivity = liveActivityStatus[0];
    byte liveStatus = liveActivityStatus[1];

    String stateString = "Unknown";
    if (liveStatus == Subscription.LIVE_STATUS_IDLE)
        stateString = "Inactive";
    else if (liveStatus == Subscription.LIVE_STATUS_START)
        stateString = "Starting";
    else if (liveStatus == Subscription.LIVE_STATUS_ACTIVE) {
        if (liveActivity == Subscription.LIVE_ACTIVITY_MIRROR)
            stateString = "Mirror Continuous";
        else if (liveActivity == Subscription.LIVE_ACTIVITY_NET_CHANGE)
            stateString = "Mirror Scheduled End";
        else if (liveActivity == Subscription.LIVE_ACTIVITY_DESCRIBE)
            stateString = "Describe";
        else if (liveActivity == Subscription.LIVE_ACTIVITY_REFRESH)
            stateString = "Refresh";
    } else if (liveStatus == Subscription.LIVE_STATUS_DS_STARTING_JOB)
        stateString = "Starting DataStage Job";
    else if (liveStatus == Subscription.LIVE_STATUS_DS_WAITING_FOR_JOB_TO_START)
        stateString = "Waiting for DataStage Job to be Started";
    else if (liveStatus == Subscription.LIVE_STATUS_DS_CONNECTING_WITH_TARGET)
        stateString = "Waiting for DataStage Job to Connect";
    else if (liveStatus == Subscription.LIVE_STATUS_DS_JOB_ENDING)
        stateString = "Ending DataStage Job";
    return stateString;
}
The code shown in Example 9-61 demonstrates the asynchronous nature of starting and stopping subscriptions. For a duration of 15 seconds, the subscription is monitored, and every half second its state is written to the standard output. After one second, the subscription is started in Continuous mirror mode, and after 10 seconds, the subscription is stopped immediately. An immediate stop does not mean that the activity is terminated abruptly. The subscription still stops in an orderly fashion, but unlike a controlled stop, it does not continue processing log entries until the log time of the stop request is reached.
Example 9-61 Asynchronous starting and stopping a subscription
// Show the subscription state for 15 seconds, interval at 500 ms
// Start the subscription after 1 second, stop after 10 seconds
int maxTime = 15000;
int statusInterval = 500;
int subscriptionStart = 1000;
int subscriptionStop = 10000;
int timeConsumed = 0;
while (timeConsumed < maxTime) {
    System.out.println("Current state of subscription "
            + subscription.getName() + ": "
            + getSubscriptionStateAsString(subscription));
    Thread.sleep(statusInterval);
    timeConsumed += statusInterval;
    if (timeConsumed == subscriptionStart) {
        // Start the subscription in continuous mirror mode
        try {
            startSubscription(subscription, true);
        } catch (ApiException e) {
            e.printStackTrace();
        }
    } else if (timeConsumed == subscriptionStop) {
        // Stop the subscription immediately
        try {
            stopSubscription(subscription, true);
        } catch (ApiException e) {
            e.printStackTrace();
        }
    }
}
Running the code produces the output shown in Example 9-62.
Example 9-62 Output from Example 9-61 on page 344
Current state of subscription REDSCRIPT: Inactive
Current state of subscription REDSCRIPT: Inactive
Started subscription REDSCRIPT in Continuous Mirror mode.
Current state of subscription REDSCRIPT: Starting
Current state of subscription REDSCRIPT: Starting
Current state of subscription REDSCRIPT: Refresh
Current state of subscription REDSCRIPT: Refresh
Current state of subscription REDSCRIPT: Refresh
Current state of subscription REDSCRIPT: Refresh
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Stopping subscription REDSCRIPT immediately.
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Mirror Continuous
Current state of subscription REDSCRIPT: Inactive
Current state of subscription REDSCRIPT: Inactive
Current state of subscription REDSCRIPT: Inactive
Current state of subscription REDSCRIPT: Inactive
Current state of subscription REDSCRIPT: Inactive
In Example 9-62 on page 345, you can see that the subscription is inactive during the first second (two messages). The subscription then starts its processing and refreshes the tables that have been marked for refresh, which takes approximately 2.5 seconds in total. The subscription then runs in Mirror Continuous mode until the loop reaches the 10-second limit and stops the subscription immediately; even so, it takes about 2.5 seconds before the subscription becomes inactive.
The example could have been implemented using threads, but for simplicity, we chose a loop.
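As an alternative to the polling loop, the same schedule can be expressed with a ScheduledExecutorService. This is a generic sketch of that approach: the lambdas merely record events and stand in for the startSubscription(), stopSubscription(), and state-printing calls, all of which are omitted here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledMonitorSketch {
    // Polls at a fixed interval and schedules one-shot start/stop actions,
    // mirroring the loop in Example 9-61 without blocking on sleep/compare.
    // The Runnables are placeholders for the real CDC API calls.
    public static List<String> run(long pollMs, long startMs, long stopMs,
            long totalMs) {
        List<String> events = new CopyOnWriteArrayList<>();
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
        exec.scheduleAtFixedRate(() -> events.add("poll"), 0, pollMs,
                TimeUnit.MILLISECONDS);
        exec.schedule(() -> events.add("start"), startMs, TimeUnit.MILLISECONDS);
        exec.schedule(() -> events.add("stop"), stopMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(totalMs); // let the schedule play out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        exec.shutdownNow();
        return events;
    }

    public static void main(String[] args) {
        // Scaled down from the 15 s / 1 s / 10 s timings in Example 9-61
        List<String> events = run(50, 100, 400, 600);
        System.out.println(events);
    }
}
```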
9.5.18 Monitoring latency
Management Console provides a convenient method for viewing performance and statistics data generated by the InfoSphere CDC engines. But sometimes you need to automate the retrieval of this information, for example, to build an offline history of performance or perform automated application monitoring through third-party solutions. The InfoSphere CDC Java API provides classes to access this information.
Figure 9-28 shows an overview of the process to access live performance and statistics data on the data store.
Figure 9-28 Monitoring latency
Here is a brief description of the processes shown in Figure 9-28:
1. Connect to the Access Server (create a DataSource object).
2. Create the MonitorAgentListener, a monitor that listens to all incoming messages from data stores and directs them to a MonitorAgentMessageHandler object.
3. Send the performance statistics request to the data store.
4. The data store returns (asynchronously) the response to the MonitorAgentListener object (through the Access Server).
5. MonitorAgentListener accepts the message and starts methods of the MonitorAgentMessageHandler interface.
The dashed lines between the MonitorAgentListener and the Access Server represent communication between them after the listener has been established.
After the connection to the Access Server (DataSource) is established, set up a listener that accepts incoming messages from the data stores. The sample code to perform this task is shown in Example 9-63.
Example 9-63 Sample code to set up a listener
/**
 * Configures and starts the monitor agent listener for incoming statistics
 * messages
 *
 * @param accessServer
 *            - The Access Server connection (DataSource)
 * @param accessServerHost
 *            - Host name of the Access Server
 * @param accessServerPort
 *            - Port of the Access Server
 * @return The monitor handler that receives the statistics messages
 * @throws Exception
 */
private CDCMonitorHandler setMonitorAgentListener(DataSource accessServer,
        String accessServerHost, int accessServerPort) throws Exception {
    CDCMonitorHandler monitorHandler = new CDCMonitorHandler();
    MonitorAgentListener monitorAgentListener = new MonitorAgentListener(
            accessServerHost, accessServerPort, 0, 0, monitorHandler);
    monitorAgentListener.setDaemon(true);

    if (monitorAgentListener.init()) {
        monitorAgentListener.start();
        if (monitorAgentListener.isAlive()) {
            accessServer.setMonitorListeningPort(
                    monitorAgentListener.getHostName(),
                    monitorAgentListener.getPort());
            System.out.println("MonitorAgentListener listens on host "
                    + monitorAgentListener.getHostName() + " and port "
                    + monitorAgentListener.getPort() + ".");
        }
    }
    return monitorHandler;
}
In Example 9-63 on page 348, a MonitorAgentListener object is created with the host name, port, and CDCMonitorHandler object, and is initialized and started as a daemon (background) thread. This starts a listener on a new port that is assigned by the TCP/IP stack. The code then checks that the listener is alive and tells the Access Server that any monitoring information must be passed to the monitor agent listener.
After the listener has been started, you must request the data from the data stores. Information about throughput and statistics is requested from the subscribers (target data stores) in the context of a specific subscription. The requestPerformanceData() method requests statistics from the data stores. Depending on the version of the InfoSphere CDC engines, you must determine how to obtain the statistics. Starting with InfoSphere CDC V6.5, enhanced monitoring is supported and many more statistics can be retrieved from those data stores. To determine whether the InfoSphere CDC engine represented by the Subscriber object supports enhanced statistics, request the FID_MONITORING_LEVEL feature from the engine. The sample code to perform this action is shown in Example 9-64.
Example 9-64 Requesting performance statistics
/**
* Issue statistics requests to target datastore for a subscription
*
* @param subscription
* - The subscription for which the statistics are requested
* @param targetDataStore
* - Target datastore of the subscription
* @param monitorHandler
* - The object that will handle the statistics responses
*
* @return No return value
*/
private void requestPerformanceData(Subscription subscription,
Subscriber targetDataStore, CDCMonitorHandler monitorHandler)
throws ApiException, InterruptedException {
// Check if the subscription is active
byte[] subscriptionStatus = subscription.getLiveActivityStatus();
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
// If subscription is active, request statistics
if (subscriptionStatus[1] == Subscription.LIVE_STATUS_ACTIVE) {
// Issue 10 requests
for (int i = 1; i <= 10; i++) {
System.out.println("Request " + i);
// Check if datastore supports enhanced statistics
if (targetDataStore.getFeature(TsFeatureSet.GID_GENERIC,
TsFeatureSet.FID_MONITORING_LEVEL).equals("1")) {
// Issue InfoSphere CDC 6.5 request
monitorHandler.requestTargetOperationalStatistics(
targetDataStore, publication.getPublisherID());
} else {
// Issue InfoSphere CDC 6.3 requests
targetDataStore.sendGetPerformanceData(publication
.getPublisherID());
targetDataStore.sendGetStatisticsData(
publication.getPublisherID(), "");
}
Thread.sleep(1000);
}
} else {
System.err.println("Subscription " + subscription.getName()
+ " is not active, cannot retrieve statistics.");
}
}
After checking that the subscription is active, the code issues 10 performance requests to the target data store. Depending on whether the data store supports enhanced statistics (which were introduced in InfoSphere CDC V6.5), different statistics requests are sent to the target data store. The method for obtaining operational performance statistics changed substantially with the introduction of InfoSphere CDC V6.5. If the InfoSphere CDC engine is Version 6.5 or later, you can use the requestTargetOperationalStatistics() method of the CDCMonitorHandler class. For Version 6.3 and earlier, enhanced statistics are not supported and you use the sendGetPerformanceData() and sendGetStatisticsData() methods from the Subscriber class. Requests are issued with a one-second pause between them.
Responses from the data stores arrive asynchronously and invoke methods of the MonitorAgentMessageHandler interface, which is implemented by the CDCMonitorHandler class. When statistics messages arrive, InfoSphere CDC V6.5 data stores invoke different methods of this interface than previous versions do. InfoSphere CDC V6.5 data stores can also be handled in the same way as InfoSphere CDC V6.3 data stores, but then only a limited set of data is available. All the methods are implemented in the CDCMonitorHandler class, which was supplied as an argument to the MonitorAgentListener object.
To handle latency data that arrives from InfoSphere CDC V6.3 data stores, the handleLatencyUpdate() method in the MonitorAgentMessageHandler interface should be implemented. The sample code to perform this action is shown in Example 9-65.
Example 9-65 Handling latency data for InfoSphere CDC V6.3
/**
* Handle latency message that arrives from InfoSphere CDC 6.3 datastore
*
* @param inDataStream
* - stream of data that arrives from datastore
* @return No return value
*/
public void handleLatencyUpdate(DataInputStream inDataStream) throws IOException, DataNotFoundException {
 
// Populate query object with the data arrived
AbstractQuery query = new Query_5002(null);
Result result = query.receiveResult(inDataStream);
 
// Get latency records from the stream
Context[] latencyRecords = result.getContexts(Query.LatencyRecord);
 
// Iterate over the latency records and print each value
for (int i = 0; i < latencyRecords.length; i++) {
long latencyValue = latencyRecords[i].getLong(Query.LatencyValue) / 1000;
System.out.println("Latency: " + latencyValue + " seconds");
}
}
Because responses arrive asynchronously, your main thread could be busy while the statistics messages arrive, and a single message can carry multiple latency records. The code shown in Example 9-65 therefore processes an array of latency records.
For every sendGetPerformanceData() invocation, a message from the data store arrives and triggers an invocation of this method. The subscription latency, which arrives in milliseconds, is converted to seconds and printed.
To handle statistics data (number of inserts, updates, and deletes), the handleStatisticsUpdate() method, shown in Example 9-66, should
be implemented.
Example 9-66 Handling operational statistics for InfoSphere CDC 6.3
/**
* Handle operation statistics message that arrives from InfoSphere CDC 6.3 datastore
*
* @param inDataStream
* - stream of data that arrives from datastore
* @return No return value
*/
public void handleStatisticsUpdate(DataInputStream inDataStream) throws IOException, DataNotFoundException {
 
// Populate query object with the data arrived
AbstractQuery query = new Query_5003(null);
Result result = query.receiveResult(inDataStream);
 
if (result.hasData(Query.StatisticsRecord)) {
Context[] statisticsRecords = result.getContexts(Query.StatisticsRecord);
 
// Go over the messages received and, based on the message type, print the data
for (int i = 0; i < statisticsRecords.length; i++) {
String name = statisticsRecords[i].getString(Query.FieldName);
String value = statisticsRecords[i].getString(Query.FieldValue);
 
if (name.compareToIgnoreCase("APPLY_UPDATE_COUNT") == 0) {
System.out.println("Updates: " + value);
} else if (name.compareToIgnoreCase("APPLY_DELETE_COUNT") == 0) {
System.out.println("Deletes: " + value);
} else if (name.compareToIgnoreCase("APPLY_INSERT_COUNT") == 0) {
System.out.println("Inserts: " + value);
}
}
}
}
This method runs for every sendGetStatisticsData() invocation in the program. It is important to remember that data stores begin to collect statistics the first time they are requested. So, on the first run of the program, all statistics start from 0 (even if thousands of operations were performed before), and every subsequent execution shows the values accumulated since that first request.
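Because the counters are cumulative from the first request, per-interval throughput must be derived by differencing successive samples. The following sketch shows one way to do that; the class and method names are our own illustration, not part of the InfoSphere CDC API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks cumulative apply counters and derives
// per-interval deltas between successive statistics samples.
class StatisticsDelta {
    private final Map<String, Long> previous = new HashMap<>();

    /** Returns the number of operations since the previous sample of this counter. */
    public long delta(String counterName, long cumulativeValue) {
        Long last = previous.put(counterName, cumulativeValue);
        // First sample: report the full cumulative value
        return (last == null) ? cumulativeValue : cumulativeValue - last;
    }

    public static void main(String[] args) {
        StatisticsDelta tracker = new StatisticsDelta();
        // Simulated cumulative APPLY_INSERT_COUNT values from three requests
        System.out.println(tracker.delta("APPLY_INSERT_COUNT", 1262)); // 1262
        System.out.println(tracker.delta("APPLY_INSERT_COUNT", 1309)); // 47
        System.out.println(tracker.delta("APPLY_INSERT_COUNT", 1358)); // 49
    }
}
```

Feeding each counter value received in handleStatisticsUpdate() through such a helper yields insert, update, and delete rates per polling interval instead of running totals.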
InfoSphere CDC V6.5 data stores have enhanced monitoring capabilities and can provide a significant amount of statistical information that was not available in earlier versions of InfoSphere CDC; accessing that data therefore requires different methods. Because there is much more information that can be requested, not all data arrives by default, and specific categories should be defined in the request. The request to the target data store is built in the requestTargetOperationalStatistics() method (Example 9-67).
Example 9-67 Requesting statistical information for InfoSphere CDC V6.5
/**
* Request operational statistics from InfoSphere CDC 6.5 datastore
*
* @param subscriber
* - subscriber that will send messages
* @param subscriptionName
* - subscription name
* @return No return value
*/
public void requestTargetOperationalStatistics(Subscriber subscriber, String subscriptionName)
throws ApiException {
 
targetStatisticData = new PerformanceStatisticData();
 
StatisticDefinition definition;
 
// Get the supported statistics from the replication subscriber.
StatisticCategory[] categories = subscriber.getStatisticDefinitions();
for (StatisticCategory category : categories) {
if (category.getId() == StatisticConstants.CATEGORY_SUBSCRIPTION_TARGET) {
definition = category
.getDefinition(StatisticConstants.TARGET_APPLY_INSERT_OPERATIONS);
if (definition != null) {
targetStatisticData.addDefinition(definition);
}
 
definition = category
.getDefinition(StatisticConstants.TARGET_APPLY_UPDATE_OPERATIONS);
if (definition != null) {
targetStatisticData.addDefinition(definition);
}
 
definition = category
.getDefinition(StatisticConstants.TARGET_APPLY_DELETE_OPERATIONS);
if (definition != null) {
targetStatisticData.addDefinition(definition);
}
 
definition = category.getDefinition(StatisticConstants.TARGET_LATENCY);
if (definition != null) {
targetStatisticData.addDefinition(definition);
}
}
}
 
// Perform the request
requestStatistics(subscriber, targetStatisticData, false, subscriptionName);
}
The targetStatisticData object of class PerformanceStatisticData is defined as a data member of the CDCMonitorHandler class. From the subscriber, obtain all the statistic definitions you are interested in and add them to the PerformanceStatisticData object, which is sent as part of the request to the data store. The requestStatistics() method performs the request; it is shared code that can send requests to both source and target data stores, because only the building of the statistics definitions differs (Example 9-68).
Example 9-68 Request statistics from both source and target
/**
* Requests the statistics from an agent as identified by the statistic data
* object. The object must only contain supported statistics and must not
* mix source and target statistics in the same request.
*
* @param replicationRole
* - the publisher or subscriber agent.
* @param statisticData
* - the statistic data object that identifies which statistics
* to request.
* @param isSource
* - true if the request is made to the source agent, false if
* made to the target agent.
* @param subscriptionName
* - the name of the subscription if the request is for the
* source, or the published subscription ID if the request is for
* the target.
* @throws ApiException
* - if any exception occurs.
*/
private void requestStatistics(ReplicationRole replicationRole, StatisticData statisticData,
boolean isSource, String subscriptionName) throws ApiException {
Query query = replicationRole.getQuery(Query.MSG_GET_STATISTIC_VALUES);
 
ArrayList<StatisticDefinition> statisticDefinitions = statisticData.getDefinitions();
 
query.setInt(Query.Correlator, statisticData.getCorrelatorId());
 
// Initialize agentType according to the type of the datastore
byte agentType = 0;
if (!isSource)
agentType = 1;
 
// Build the request object
query.setByte(Query.AgentType, agentType);
query.setByte(Query.ContextObjectType,
StatisticContextObjectType.SUBSCRIPTION.getMessageValue());
query.setString(Query.SubscriptionName, subscriptionName);
query.setString(Query.TableOwner, ""); //$NON-NLS-1$
query.setString(Query.TableName, ""); //$NON-NLS-1$
 
// Add all statistics definitions
for (StatisticDefinition definition : statisticDefinitions) {
query.addInt(Query.StatIds, definition.getId());
}
 
query.setLong(Query.BeginTime, 0);
query.setLong(Query.EndTime, 0);
query.setInt(Query.SampleCount, 1);
 
// Send the request to the datastore
query.request();
}
In this method, the Query object is built from all the statistics definitions that were added and the type of the data store, and is sent to the data store. Unlike an InfoSphere CDC V6.3 data store, all response messages arrive at the same method (handleStatisticValues()), regardless of whether latency or operation counts were requested (Example 9-69).
Example 9-69 Handling statistics for InfoSphere CDC V6.5
/**
* Handles the returned statistics from an agent as identified by the
* statistic data object. The object must only contain supported statistics
* and must not mix source and target statistics in the same request.
*
* @param dataStoreHost
* - host name of the datastore
* @param dataStorePort
* - port of the datastore
* @param messageCorrelator
* - correlation id of the response (to match request)
* @param maMessageBodyLength
* - length of the message body
* @param maMessageBody
* - message body
* @return No return value
* @throws ApiException
* - if any exception occurs.
*/
public void handleStatisticValues(String dataStoreHost, int dataStorePort,
int messageCorrelator, int maMessageBodyLength, byte[] maMessageBody)
throws IOException, DataNotFoundException {
 
PerformanceStatisticData statisticData = null;
 
// Obtain statistics received
if (targetStatisticData != null
&& targetStatisticData.getCorrelatorId() == messageCorrelator) {
statisticData = targetStatisticData;
}
 
// If it's not null
if (statisticData != null) {
statisticData.loadData(maMessageBody);
 
long value;
 
// Extract all the data that was sent and print it
value = statisticData
.getCurrentValueById(StatisticConstants.TARGET_APPLY_UPDATE_OPERATIONS);
System.out.println("Updates: " + value);
 
value = statisticData
.getCurrentValueById(StatisticConstants.TARGET_APPLY_DELETE_OPERATIONS);
System.out.println("Deletes: " + value);
 
value = statisticData
.getCurrentValueById(StatisticConstants.TARGET_APPLY_INSERT_OPERATIONS);
System.out.println("Inserts: " + value);
 
value = statisticData
.getCurrentValueById(StatisticConstants.TARGET_LATENCY);
System.out.println("Latency: " + value + " seconds");
}
}
When this method is called upon message arrival, the data is extracted from the message and printed. In this example, only latency and operation counts are requested and received, but in principle everything that appears in the Management Console could be extracted.
The code shown in Example 9-70 starts the requestPerformanceData() method, which sends 10 requests to the target InfoSphere CDC data store.
Example 9-70 Request performance data
// Set up the monitor handler
CDCMonitorHandler monitorHandler = setMonitorAgentListener(
accessServer, asHost, asPort);
// Send the requests to get subscription statistics
requestPerformanceData(subscription, db2DataStore, monitorHandler);
When the code is run, the output could look something like the output shown in Example 9-71. We deliberately included some delays in the apply process to be able to show some latency.
Example 9-71 Sample output from performance data request
MonitorAgentListener listens on host 127.0.1.1 and port 50738.
Request 1
Updates: 200
Deletes: 92
Inserts: 1262
Latency: 17 seconds
Request 2
Updates: 200
Deletes: 92
Inserts: 1309
Latency: 22 seconds
Request 3
Updates: 200
Deletes: 92
Inserts: 1309
Latency: 22 seconds
Request 4
Updates: 200
Deletes: 92
Inserts: 1309
Latency: 22 seconds
Request 5
Updates: 200
Deletes: 92
Inserts: 1309
Latency: 22 seconds
Request 6
Updates: 200
Deletes: 92
Inserts: 1309
Latency: 22 seconds
Request 7
Updates: 200
Deletes: 92
Inserts: 1358
Latency: 27 seconds
Request 8
Updates: 200
Deletes: 92
Inserts: 1358
Latency: 27 seconds
Request 9
Updates: 200
Deletes: 92
Inserts: 1358
Latency: 27 seconds
Request 10
Updates: 200
Deletes: 92
Inserts: 1358
Latency: 27 seconds
9.5.19 Monitoring event logs using the API
Event logs are kept in every InfoSphere CDC engine's metadata repository or in operating system objects (System i and System z). In InfoSphere CDC, every event is represented by an object of type TsEvent. A TsEvent object contains all the information about the event, such as the date and time, ID, type (for example, Information or Error), the originating module of the event, and the message text itself. For an event object named event, sample content could be as shown in Example 9-72.
Example 9-72 Event sample content
event.getTime() : 035247
event.getDate() : 20110330
event.getID() : 1463
event.getSubscription(): REDSCRIPT
event.getType() : 4 (means Information)
event.getOriginator() : com.datamirror.ts.source.subscription.SubscriberProxy
event.getInformation() : +++ Subscription REDSCRIPT is starting in Continuous
Mirroring mode.
The getType() method returns a byte that indicates the severity of the message. The method that translates this code into a meaningful description is shown in Example 9-73.
Example 9-73 Method to translate the message event type
/**
* Return description of the event type code
*
* @param type
* - byte, returned by event.getType() method
* @return Description of the type
*/
private String translateEventType(byte type) {
switch (type) {
case TsEvent.TYPE_ALL:
return "All";
case TsEvent.TYPE_DIAGNOSTIC:
return "Diagnostic";
case TsEvent.TYPE_ERROR:
return "Error";
case TsEvent.TYPE_ESCAPE:
return "Escape";
case TsEvent.TYPE_INFORMATION:
return "Information";
case TsEvent.TYPE_NOTICE:
return "Notice";
case TsEvent.TYPE_WARNING:
return "Warning";
}
return "Unknown";
}
The event log is retrieved as an Enumeration of TsEvent objects by the getEvents() method, which is invoked on Subscription or Publication objects (subscription-level events), or on Publisher or Subscriber objects (data store-level events). We created a single method that shows the events (Example 9-74).
Example 9-74 Method to show log events
/**
* Show events that were collected in the <TsEvent> enumeration
*
*/
private void showEvents(Enumeration<TsEvent> events) {
int numberEvents = 0;
while (events != null && events.hasMoreElements()) {
TsEvent event = events.nextElement();
System.out.println(event.getTime() + "|" + event.getDate() + "|"
+ event.getID() + "|" + translateEventType(event.getType())
+ "|" + event.getOriginator() + "|"
+ event.getInformation());
numberEvents++;
}
System.out.println(numberEvents + " events displayed.");
}
Subscription event log
As previously mentioned, information about a subscription is contained in both the source and target engines, so events that relate to a specific subscription are generated on both, depending on where the event took place. To retrieve all subscription events, you need both the Subscription and the Publication object. The sample code to perform this task is shown in Example 9-75.
Example 9-75 Show subscription events
/**
* Show subscription events stored on the source and target
*
* @param subscription
* - The subscription for which to show source and target events
* @param targetDataStore
* - Target engine of the subscription
* @return No return value
*/
private void showSubscriptionEventLog(Subscription subscription,
Subscriber targetDataStore) throws ApiException {
// Object to hold event log
Enumeration<TsEvent> events = null;
// Get events of the subscription from the source
events = subscription.getEvents(true);
System.out.println("SOURCE EVENTS FOR SUBSCRIPTION "
+ subscription.getName() + ":");
showEvents(events);
// Get publication for the subscription
Publication publication = getSubscriptionPublication(subscription,
targetDataStore);
if (publication != null) {
// Get events of the subscription from the target
events = publication.getEvents(true, Publication.REPLICATION_LOG);
System.out.println("TARGET EVENTS FOR SUBSCRIPTION "
+ subscription.getName() + ":");
showEvents(events);
}
}
For the passed Subscription object, getEvents() is invoked and the events populate the variable events of type Enumeration<TsEvent>. The method has one Boolean parameter, recentFirst. When set to true, the events are sorted from most recent to oldest; when set to false, the oldest events appear first and the most recent last.
Then, the Publication object for the subscription is retrieved from the target data store and the events are retrieved from that object as well. In both cases, the showEvents() method is called to show the events.
Data store event log
The event log of every data store can be retrieved by invoking the getEvents() method on the ReplicationRole object or any of its subclasses (Publisher or Subscriber). Assuming you have the ReplicationRole object, Example 9-76 shows the sample code to retrieve the event log:
Example 9-76 Show data store events
/**
* Show datastore events
*
* @param dataStore
* - The source or target datastore to show the events for
* @return No return value
*/
private void showDatastoreEventLog(ReplicationRole dataStore)
throws ApiException {
 
// Object to hold event log
Enumeration<TsEvent> events = null;
// Get events of the datastore
events = dataStore.getEvents(true);
System.out.println("EVENTS FOR DATASTORE " + dataStore.getDescription() + ":");
showEvents(events);
}
After the connection to the data store is established, events are retrieved and printed similar to the way it was done from the subscription. The output of the method is similar to the output of subscription events, but with the events that are related to the data store and not a specific subscription.
Single scrape event log
Retrieving the events of a single scrape is different. The sample method that retrieves and prints events is shown in Example 9-77.
Example 9-77 Retrieving single scrape events
/**
* Show single scrape events
*
* @param sourceDataStore
* - Publisher that holds the single scrape
* @return No return value
*/
private void showSingleScrapeEventLog(Publisher sourceDataStore)
throws ApiException {
// Object to hold event log
Enumeration<TsEvent> events = null;
// Get events of the datastore
events = new EventLog(sourceDataStore, (byte) 2, null).getEvents(true);
System.out.println("SINGLE SCRAPE EVENTS FOR DATASTORE "
+ sourceDataStore.getDescription() + ":");
showEvents(events);
}
The difference from the previous sample is the way the EventLog object is instantiated, by passing the source data store and the type of events as arguments; the single scrape event log is of type (byte) 2. The getEvents() method is then invoked on this object. As the method's parameters show, a Publisher object is expected. Because a single scrape only exists in source data stores, avoid passing target-only data stores (such as InfoSphere CDC for DataStage) to the method; otherwise, you encounter errors when trying to retrieve an event log that does not exist. The output is similar to that of the other methods, with different events.
Showing the event logs
We now combine the three methods in the code shown in Example 9-78.
Example 9-78 Event log display methods
showDatastoreEventLog(oracleDataStore);
showSingleScrapeEventLog(oracleDataStore);
showSubscriptionEventLog(subscription, db2DataStore);
showDatastoreEventLog(db2DataStore);
When you run this sample, the output shown in Example 9-79 is produced (not all events are shown).
Example 9-79 Output from showing event logs
EVENTS FOR DATASTORE InfoSphere CDC Oracle Redo 6.5:
0 events displayed.
 
SINGLE SCRAPE EVENTS FOR DATASTORE InfoSphere CDC Oracle Redo 6.5:
143324|20110415|2917|Information|com.datamirror.ts.util.oracle.OracleRedoNativeApi|IBM InfoSphere Change Data Capture daemon has reported an informational message. Redo log scraping started at position '1855797.79260.332.16786944' timestamp 'Fri Apr 15 14:32:40 2011'.
143324|20110415|2917|Information|com.datamirror.ts.util.oracle.OracleRedoNativeApi|IBM InfoSphere Change Data Capture daemon has reported an informational message. Started on-line redo log file '/oradata/cdcdemo/redo03.log'. Redo log processing has been initiated on the on-line file '/oradata/cdcdemo/redo03.log'. The current sequence is 37. The low scn is 1811661. The low timestamp is Thu Apr 14 22:19:35 2011. The next scn is -. The next timestamp is -.
143324|20110415|2917|Information|com.datamirror.ts.util.oracle.OracleRedoNativeApi|IBM InfoSphere Change Data Capture daemon has reported an informational message. New scrape point specified by redo log position '1855797.0.0.0'. The previously recorded redo log position is '0.0.0.0'. The corresponding previous redo timestamp is 'Thu Jan 1 01:00:00 1970'. A user command has specified the new starting redo log position '1855797.0.0.0'.
3 events displayed.
 
SOURCE EVENTS FOR SUBSCRIPTION REDSCRIPT:
143324|20110415|2922|Information|com.datamirror.ts.scrapers.singlescrape.SingleScrape Thread|Subscription REDSCRIPT has started using the single scrape staging store.
143320|20110415|44|Information|com.datamirror.ts.source.replication.MirrorModerator|Mirroring has been initiated for table CDCDEMO.PRODUCT.
143320|20110415|44|Information|com.datamirror.ts.source.replication.MirrorModerator|Mirroring has been initiated for table CDCDEMO.DEMO_CUSTOMER.
143320|20110415|1437|Information|com.datamirror.ts.source.replication.MirrorModerator|Table CDCDEMO.PRODUCT refresh to REDSCRIPT is complete. 231 rows were sent.
143320|20110415|225|Information|com.datamirror.ts.source.replication.MirrorModerator|Table CDCDEMO.PRODUCT refresh to REDSCRIPT has been confirmed by the target system. 231 rows were received, 231 rows were successfully applied, 0 rows failed.
143318|20110415|9703|Information|com.datamirror.ts.source.tablereader.TableReader|Method used for Refresh on source: JDBC
143318|20110415|223|Information|com.datamirror.ts.source.replication.MirrorModerator|Table CDCDEMO.PRODUCT will be refreshed to REDSCRIPT .
26 events displayed.
 
TARGET EVENTS FOR SUBSCRIPTION REDSCRIPT:
143329|20110415|6673|Information|com.datamirror.ts.target.publication.TargetMirrorApplyJob|IBM InfoSphere Change Data Capture will commit on source transaction boundaries.
143320|20110415|227|Information|com.datamirror.ts.target.publication.TargetRefreshApplyJob|Refresh was completed for table CDCDEMO.PRODUCT_TARGET. 231 rows were received, 231 rows were successfully applied, 0 rows failed.
143319|20110415|226|Information|com.datamirror.ts.target.publication.TargetRefreshHandler|Refresh was started for table CDCDEMO.PRODUCT_TARGET.
143319|20110415|322|Information|com.datamirror.ts.target.apply.udb.fastload.UDBFastloadApply|The table CDCDEMO.PRODUCT_TARGET was cleared.
143318|20110415|6693|Information|com.datamirror.ts.target.publication.TargetRefreshApplyJob|DB2 Fastload apply mode is used for refresh.
143318|20110415|227|Information|com.datamirror.ts.target.publication.TargetRefreshApplyJob|Refresh was completed for table CDCDEMO.DEMO_CUSTOMER_TARGET. 228,117 rows were received, 228,117 rows were successfully applied, 0 rows failed.
143236|20110415|226|Information|com.datamirror.ts.target.publication.TargetRefreshHandler|Refresh was started for table CDCDEMO.DEMO_CUSTOMER_TARGET.
143235|20110415|322|Information|com.datamirror.ts.target.apply.udb.fastload.UDBFastloadApply|The table CDCDEMO.DEMO_CUSTOMER_TARGET was cleared.
143234|20110415|6693|Information|com.datamirror.ts.target.publication.TargetRefreshApplyJob|DB2 Fastload apply mode is used for refresh.
54 events displayed.
 
EVENTS FOR DATASTORE InfoSphere CDC DB2 6.5:
0 events displayed.
9.6 Monitoring and integration with external monitoring solutions
The previous sections described configuring InfoSphere CDC so that it is optimal for your environment. The sections demonstrated the usage of several special implementation topologies, such as large distributions and consolidations, and optimizing InfoSphere CDC for high volumes.
For many clients, InfoSphere CDC is a business critical application or a component of another business process upon which users depend. After you have set up your environment and deployed the configuration in your production environment, ensure that the replication functions as expected, that transactions are delivered to the intended destinations, and that the latency meets
your requirements.
The Management Console offers functionality to configure, operate, and monitor your replication environment and can show an integral overview of your entire replication landscape. However, the Management Console has an InfoSphere CDC focus and, being an interactive graphical user interface, does not lend itself to integration with systems or business monitoring solutions.
For example, you could have chosen InfoSphere CDC to transport orders from your website to your order processing application, and at the same time provide feedback about current stock levels to your web customers. You expect that orders entered on the web are instantly made available to the back-end application to be processed for order picking and shipment, and that there is a maximum latency for the delivery of the order data (for example, less than 5 minutes). At the same time, you want to provide your customers with stock information that is up to date in real time so that they know that certain goods are readily available for shipment.
This section describes the various InfoSphere CDC components that can be monitored and provides ideas about how this monitoring can be integrated with your systems and business monitoring solution.
9.6.1 Components to monitor
The key InfoSphere CDC items you want to monitor are the replication latency of any subscriptions going from the web server to the order processing application (placed orders) and the subscriptions in the reverse direction (stock information).
There are several methods for monitoring the latency of the replication, which are described in Chapter 8, “Performance analysis and design considerations” on page 211. If the lag time between the generation of transactions on the source and their arrival on the target is too long, there could be several reasons for this lag, such as the subscription being slow due to congestion on the intermediate network, or even the target InfoSphere CDC instance being down, rendering the subscriptions inactive as a result.
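To act on the latency requirement in the earlier ordering example, a monitoring script could raise an alert only after latency stays above a threshold for several consecutive samples, which avoids false alarms on short spikes. The following is a hypothetical sketch; the class name and thresholds are our own, not part of the InfoSphere CDC API.

```java
// Hypothetical latency watchdog: alerts only after N consecutive samples
// exceed the configured threshold, filtering out transient spikes.
class LatencyWatchdog {
    private final long thresholdSeconds;
    private final int requiredBreaches;
    private int consecutiveBreaches = 0;

    LatencyWatchdog(long thresholdSeconds, int requiredBreaches) {
        this.thresholdSeconds = thresholdSeconds;
        this.requiredBreaches = requiredBreaches;
    }

    /** Feed one latency sample; returns true when an alert should be raised. */
    public boolean sample(long latencySeconds) {
        consecutiveBreaches =
                (latencySeconds > thresholdSeconds) ? consecutiveBreaches + 1 : 0;
        return consecutiveBreaches >= requiredBreaches;
    }
}
```

For the 5-minute requirement above, `new LatencyWatchdog(300, 3)` would raise an alert only after three successive samples exceed 300 seconds; each latency value obtained through the API (as in Example 9-69) is passed to sample().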
9.6.2 InfoSphere CDC instance activity
Subscription activity is directly dependent on the InfoSphere CDC source and target instances running well. Should either instance not be active, subscriptions cannot be kept active and stop. On the server itself, the activity of the InfoSphere CDC instance can be monitored by interrogating the operating system processes. If you want to consolidate the monitoring of InfoSphere CDC and other processes to a single location, use the Java API to ping or connect to the data store to determine its activity. If the data stores cannot be reached, you can then notify the systems monitor of this condition.
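A minimal reachability probe can even be written without the Java API, by attempting a plain TCP connection to the port on which the InfoSphere CDC instance listens. This sketch assumes that a refused or timed-out connection means the instance is down; a complete check would connect through the Java API as described in the sections referenced below.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability probe for a datastore's listener port.
// Assumption: a refused or timed-out connection is treated as "instance down".
class InstanceProbe {
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

A successful TCP connect only proves that the listener port is open; it does not verify that the engine can accept API sessions, so treat this probe as a first-level check that triggers a deeper API-based verification.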
Examples for checking the InfoSphere CDC instance activity from the server have been provided in 9.4.5, “Checking an InfoSphere CDC engine and subscriptions activity” on page 246. If you want to implement the instance activity checking using the Java API, see 9.5.4, “Connecting to the data stores” on page 277.
9.6.3 Subscription activity
Starting with InfoSphere CDC V6.5 (and earlier releases of InfoSphere CDC for System z or DB2), subscriptions can be marked persistent, meaning that if replication shuts down under certain circumstances, such as interruptions in the network communications, InfoSphere CDC attempts to automatically restart continuous mirroring at regular intervals. This persistence keeps the replication environment active at almost all times, so continuously checking the replication activity might no longer be a strong requirement. There might still be reasons why subscriptions stop, such as apply failures due to data inconsistency or even an operator error.
Subscription activity must be monitored on the source server of the subscription. The source InfoSphere CDC engine provides the only valid entry point for checking this activity. Even though it is possible on some operating systems to check the activity of InfoSphere CDC target processes, do not use this information as the only source for determining activity. You can assume that if the subscription is reported active by the InfoSphere CDC source engine that the end-to-end process is running fine. If there is a condition that causes the target replication processes to terminate, the InfoSphere CDC heartbeat feature ensures that the source process also stops, and vice versa.
In some environments, customers have a mix of subscriptions that should be active at all times, and other subscriptions that are only started occasionally (for example, for refreshing certain tables on a daily basis). When you design your activity monitoring, implement a naming convention that allows the monitoring process to distinguish between these classes of subscriptions. If you consider using the Java API to monitor subscription activity, you could mark the subscription activity service levels in the description of the subscription and analyze them through the API.
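Such a naming convention could be evaluated as follows. The prefixes used here (P_ for permanently active, A_ for ad hoc) are purely illustrative assumptions; substitute your own convention.

```java
// Hypothetical classifier for a subscription naming convention:
// "P_" = must be permanently active, "A_" = started ad hoc (e.g. daily refresh).
final class SubscriptionPolicy {
    enum ServiceLevel { ALWAYS_ACTIVE, AD_HOC, UNKNOWN }

    static ServiceLevel classify(String subscriptionName) {
        if (subscriptionName.startsWith("P_")) {
            return ServiceLevel.ALWAYS_ACTIVE;
        }
        if (subscriptionName.startsWith("A_")) {
            return ServiceLevel.AD_HOC;
        }
        return ServiceLevel.UNKNOWN;
    }
}
```

A monitoring loop would then alert on inactive subscriptions only when classify() returns ALWAYS_ACTIVE, leaving ad hoc subscriptions out of the alerting path.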
Examples for checking subscription activity from the server have been provided in 9.4.5, “Checking an InfoSphere CDC engine and subscriptions activity” on page 246. If you want to implement the subscription activity checking using the Java API, see 9.5.17, “Starting, stopping, and monitoring subscriptions” on page 339.
9.6.4 Events
InfoSphere CDC distinguishes the areas where events can occur: events are logged in the data store event logs (source and target), the subscription event logs (source and target), or the single scrape event log (at the source side; available for the InfoSphere CDC V6.5 Linux, UNIX, and Windows engine only).
If, for example, the replication stops because of a duplicate key in the target table, this error is logged in the target subscription event log. As a result of stopping the subscription in an abnormal fashion, a number of errors are also logged in the source subscription event log.
InfoSphere CDC offers a number of possibilities to monitor the events that are logged by the InfoSphere CDC engines:
Show the individual event logs from the Management Console Monitoring perspective.
Use the Management Console to export the event logs in a text or comma-separated values file (events in all areas can be brought together).
Collect the events on the source and target servers running InfoSphere CDC, using either the InfoSphere CDC engine or operating system commands.
Configure notifications for the InfoSphere CDC engines or subscriptions to take conditional action on events that occur, forwarding these events to email (available for the Linux, UNIX, and Windows engine), message queues (System i), SYSLOG (System z), or a CHCPRINT spooled member (System z), or starting a notification user exit program (all engines).
Create a custom program employing the Java API to gather the event logs from source and target engines / subscriptions.
If you want to integrate the InfoSphere CDC events with a systems monitoring solution (such as IBM Tivoli Monitoring), collecting the event logs from the Management Console is not suitable. When gathering events on the servers running InfoSphere CDC, the source and target subscription events must be collected on the source and target servers respectively. To determine the cause of a replication failure, you typically need to explore both source and target events to make a correct assessment of what exactly went wrong.
Event logs can be collected on source and target InfoSphere CDC servers using the techniques described in “Monitoring the event logs” on page 250. Your external monitoring solution should then be configured to collect the event log output or monitor the objects on these servers (use the output of dmshowevents for the Linux, UNIX, and Windows engine, a message queue object for System i, and SYSLOG for System z), and take action accordingly.
If your external monitoring solution does not support all the InfoSphere CDC platforms or if you have specific requirements for feeding the monitoring solution, the Java API and writing notification user exits provide the most flexible options. Using the API, you can retrieve the event logs of all InfoSphere CDC engines that have been registered using the techniques described in 9.5.19, “Monitoring event logs using the API” on page 359. Alternatively, if you want the InfoSphere CDC engine to send notifications to your external monitoring solution, you could use the notifications user exits described in 9.7.7, “Notifications” on page 425.
9.6.5 Latency
If subscriptions have a backlog of transactions to process, the subscriptions are latent. InfoSphere CDC provides functionality in the Management Console to view the latency of a subscription and monitor it over time. Many customer environments have critical business processes that depend on the timeliness of the data that is transported and applied through InfoSphere CDC, so latency can be an important item to monitor.
For example, to avoid overloading the production system, you might decide that reporting is primarily done from a reporting server. Business users make their decisions based on the information that they obtain through the reports, which creates a dependency on the accuracy and timeliness of the data replicated to this reporting server.
Management Console allows you to set latency notification thresholds (in number of minutes) for subscriptions, specifying one threshold for warnings and one for errors. When the latency of the subscription exceeds a configured threshold, the replication process starts sending events to the target subscription event log (the target side is where latency is measured).
Monitoring the latency then becomes a matter of monitoring the event log, which is described in “Monitoring the event logs” on page 250.
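As an illustration of the threshold logic described above (this is not InfoSphere CDC code; the method name and values are assumptions), classifying a measured latency against the warning and error thresholds amounts to:

```java
// Standalone sketch: classify a subscription's measured latency against
// warning and error thresholds configured in minutes, as in Management Console.
public class LatencyCheck {

    static String classify(long latencySeconds, long warnMinutes, long errorMinutes) {
        if (latencySeconds >= errorMinutes * 60) {
            return "ERROR";
        }
        if (latencySeconds >= warnMinutes * 60) {
            return "WARNING";
        }
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(classify(30, 2, 10));  // OK
        System.out.println(classify(180, 2, 10)); // WARNING
        System.out.println(classify(900, 2, 10)); // ERROR
    }
}
```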
9.7 User exits
InfoSphere CDC lets you define subroutines that are invoked when a predefined event occurs. You can use these user exits to customize the replication and related functionality to fit your business requirements.
Among other uses, user exits can be invoked before or after database operations are applied to the target, or to calculate a value to be assigned to a column on the target side.
This section describes the most commonly used user exit points and provides guidelines about how to implement them in the InfoSphere CDC environment. Additionally, a number of examples are provided that help you design your own user exit programs.
9.7.1 Common uses for user exits
The most common use case for InfoSphere CDC user exit programs is the execution of additional actions on the apply side of the replication process. Your subscriptions replicate changes from a source to a target database and, based on the type of operation, the user exit program is invoked to perform custom routines.
For example, in an application integration scenario, you could exchange the data from your source application with your target application using database tables. Then, as application transactions are applied to the target database, you might want to notify the receiving application that there is a transaction to be processed by using a message on a queue. The target application monitors the queue for new incoming messages, and when a message arrives, picks up the transaction details from the tables that have been populated by the subscription application process.
As an alternative, the process could build an XML document based on the incoming operations and then post this document on an Enterprise Service Bus when the transaction is committed.
Another frequent use of custom code is for row filtering. Assume that you want to filter rows based on a lookup in a master table that only has a few rows. InfoSphere CDC can use a %GETCOL function in the row filtering to retrieve the values of this master table for every operation that was read from the log. If the filtering is then defined for a table that has high volumes, this situation could potentially lead to an additional impact on the source. One of the solutions to reduce the workload on the source is to write a user exit program that reads the entries from the master table into a cache when it is called, and then use the cached entries from that moment onwards.
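The caching approach can be sketched as follows. This is a simplified standalone illustration: the InfoSphere CDC user exit plumbing is omitted, and the JDBC read of the master table is simulated with hard-coded values:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the caching pattern: the master table is read once,
// and every subsequent lookup is served from memory instead of the database.
public class MasterTableCache {

    private static Map<Long, String> cache; // populated on first lookup

    static String lookup(long key) {
        if (cache == null) {
            cache = loadMasterTable(); // one read instead of one per log entry
        }
        return cache.get(key);
    }

    // Hypothetical stand-in for a JDBC SELECT against the master table
    private static Map<Long, String> loadMasterTable() {
        Map<Long, String> rows = new HashMap<Long, String>();
        rows.put(1L, "REPLICATE");
        rows.put(2L, "SKIP");
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(lookup(1L)); // first call loads the cache
        System.out.println(lookup(2L)); // served from the cache
    }
}
```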
Row filtering and user exits are also sometimes used for scaling the replication if the target database can handle only a certain throughput of operations in a single database session. By using a row filter on the key columns, you can share the work for this single table across multiple subscriptions, which initiates multiple database sessions on the target side and increases the throughput. When used in a derived expression, a user exit provides additional flexibility in designing the row filter. For example, if the replicated table has a numeric key column, a user exit can calculate the modulo value of this numeric key with a certain divisor and allow you to use the remainder in your row selection.
Coding and using the modulo function is described in 9.7.3, “Derived expression user exits” on page 373.
9.7.2 User exit programs
Keep your code for the user exit as efficient as possible. When you start a user-written program at a defined exit point, a call is issued each time a clear, insert, update, or delete operation is applied to a target table. Therefore, when data replication activity is high, overall throughput and performance are affected by the actions that are implemented in the user-written programs.
Java user exit programs
When developing Java user exit programs to interface with InfoSphere CDC, be aware that a specific environment must be configured. Although most Java Development Kits at Version 1.6 or later can be used to compile classes that use the JAR files that come with the InfoSphere CDC engines, use the IBM JDK for your development environment so that it is synchronized with the required runtime environment. All user exit programs that are called from an InfoSphere CDC V6.5 engine run using the IBM Java Runtime Environment V1.6.
The class path for compiling your self-developed classes must include the ts.jar file, which can be found in the lib directory of the InfoSphere CDC engine product. Consider copying the ts.jar file to your development environment if your development environment does not have direct access to the directories that contain the InfoSphere CDC product.
 
ts.jar file: Keep in mind that the ts.jar file integrates with the version of InfoSphere CDC that is running in your environment. Should a new version of InfoSphere CDC be installed, it might be necessary to replace the copy of the ts.jar file with the latest version.
To let the InfoSphere CDC engine discover the user exit classes, the classes must be placed in the class path of the engine. A good practice is to store the custom classes in the cdc_home/lib directory. If you change the user exit programs, you must restart the InfoSphere CDC instance to ensure that the JVM picks up the new version of the class.
System i user exit programs
System i user exit programs for InfoSphere CDC can be written in any original program model (OPM) or integrated language environment (ILE) compliant language, such as RPG, COBOL, C, or CL.
In order for the InfoSphere CDC subscription jobs to find your user exit programs, the programs must be in the library list of the jobs running the InfoSphere CDC subscription. This library could be the product library or any library that is in the library list of the DMCJOBD job description. To avoid overwriting your custom programs when installing new releases of InfoSphere CDC, place the programs in a separate library.
When you design your user exit programs, remember that loading a program into memory for execution and opening tables (files) are expensive operations. If you intend to use a row-level user exit program to apply changes to a different table, ensure that the program does not open and close files on entry and exit.
Here are some considerations when coding and compiling your user exit programs:
In RPG, do not set the Last Record (LR) indicator when returning from your program, as this setting causes any open files to be closed and the program removed from memory.
Compile ILE user exit programs with DFTACTGRP(*NO) ACTGRP(*CALLER) so that your user exit program uses the same activation group as InfoSphere CDC and does not have to reinitialize itself on every call.
More information about user exits for InfoSphere CDC on System i can be found at the following address:
System z user exit programs
Programs can be written in any high-level language (including C, COBOL, and Assembler) that supports reentrant coding techniques and standard OS linkage, and uses the z/OS IBM Language Environment®. The user exit programs must be compiled and link-edited to use AMODE 31 (31-bit addressing mode).
The link-edited load modules that represent the user exit must be made accessible to InfoSphere CDC using a CHCUXLIB DD statement in the execution JCL. This load library must not be APF authorized. User exit programs must be successfully link-edited before proceeding to configure them in the Management Console.
For more information about z/OS user exit programs, go to the following address:
Stored procedure user exits
Besides running user exits using a programming language that is callable from the InfoSphere CDC replication engine, you can also configure stored procedures to be called based on predefined events. Stored procedures are compiled programs that are physically stored in a database and, when called, are run by the database engine. This action could provide a handle for delivering additional scalability when you must run complex operations or calculations.
A typical use of a stored procedure user exit is when you want to replicate the contents of a single table to two target tables. In a standard InfoSphere CDC configuration, this action requires you to create two subscriptions, each mapping the same table but having a different destination table. As subscriptions run independently of each other, the transaction consistency between the target tables can no longer be guaranteed. If you call a stored procedure to operate on the second table (or both tables), information in the target tables is still transaction consistent.
When used in a table-level or row-level exit point, the stored procedures are run in line with the operations that InfoSphere CDC applies into the target tables. Both share a database connection, which ensures that when a stored procedure user exit is called from an after-operation exit point, the changes to the table that were made by InfoSphere CDC are visible to the stored procedure user exit program.
You can retrieve the columns that were replicated from the source table, journal control columns, and system values by specifying parameters in your stored procedure user exit program. InfoSphere CDC analyzes these parameters and passes the values when it calls the stored procedure.
9.7.3 Derived expression user exits
This section describes derived expression user exits.
Java user exit for derived expression
The UEModuloFilter65 class is an implementation of the InfoSphere CDC derived expression interface and can be called using the %USERFUNC("JAVA") function from within InfoSphere CDC, either on the source or the target. In Example 9-80, the user function is suitable for performing row filtering at the source.
Example 9-80 Modulo function derived expression user exit
import java.math.BigDecimal;
 
import com.datamirror.ts.derivedexpressionmanager.DEUserExitIF;
import com.datamirror.ts.derivedexpressionmanager.UserExitInvalidArgumentException;
import com.datamirror.ts.derivedexpressionmanager.UserExitInvokeException;
import com.datamirror.ts.util.Trace;
 
/*
Overview:
The user exit performs a modulo operation against a specified column and
determines whether the result equals the specified remainder value. The parameters passed
into the user exit are:
a) Dividend -> Numeric value to perform the modulo function against
b) Divisor -> Numeric value by which the Dividend will be divided
c) Remainder -> Remainder value to be tested
 
The comparison logic is as follows:
 
<Dividend> % <Divisor> == <Remainder>
 
Instructions:
 
1) Copy the UEModuloFilter65.class file to the <cdc install>/lib directory
2) Go to the mapping details for the table
3) Under Filtering, specify the following
 
%USERFUNC("JAVA","UEModuloFilter65",<Dividend column>, <Divisor value>,
<Remainder>)
 
Return value is boolean, true or false.
*/
 
public class UEModuloFilter65 implements DEUserExitIF {
 
public Object invoke(Object[] aobjList)
throws UserExitInvalidArgumentException, UserExitInvokeException {
try {
long dividendColumn = ((BigDecimal) aobjList[0]).longValue();
long divisorValue = ((BigDecimal) aobjList[1]).longValue();
long remainderValue = ((BigDecimal) aobjList[2]).longValue();
return new Boolean(
(dividendColumn % divisorValue) == remainderValue);
} catch (ClassCastException e) {
// Piggyback on the CDC logging facility
Trace.traceAlways(e);
throw new UserExitInvalidArgumentException(
"Invalid number parameter passed "
+ "to the user function, arguments passed: [0]="
+ aobjList[0] + ", [1]=" + aobjList[1] + ", [2]="
+ aobjList[2] + ", Message: " + e.getMessage());
}
}
}
When specified in the row filter for a replicated table, InfoSphere CDC calls the invoke() method of the class for every log entry of that table. Should invalid numerics be passed as parameters, the exception is caught and the InfoSphere CDC tracing facility is invoked. The exception that is then thrown is logged under the <cdc_home>/instance/<instance>/log directory in the current trace_dmts* file.
You can also use the logging facility if you are in the process of developing your user exit and you want to debug it.
For example, suppose that you have a high volume table that has an integer primary key column (KEYCOLUMN). You want to divide the workload of replicating this table across three subscriptions to have multiple database sessions apply the transactions on this table in parallel. You create three subscriptions, with each subscription mapping the same table. However, in the row filter condition, you specify different values, as shown in the following list:
RF_SUB1: %USERFUNC("JAVA","UEModuloFilter65",KEYCOLUMN,3,0)
RF_SUB2: %USERFUNC("JAVA","UEModuloFilter65",KEYCOLUMN,3,1)
RF_SUB3: %USERFUNC("JAVA","UEModuloFilter65",KEYCOLUMN,3,2)
When processing the operations for this table, each subscription calculates KEYCOLUMN%3 and determines whether the remainder is the same as the remainder value specified as the last parameter. The first subscription replicates all rows where KEYCOLUMN is evenly divisible by 3 (a remainder of 0). The second subscription replicates the rows where there is a remainder of 1, and the third subscription the rows where there is a remainder of 2. If the least significant digits of the KEYCOLUMN values are distributed evenly, the three subscriptions complement each other and each takes one-third of the workload.
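The partitioning behavior can be demonstrated in plain Java outside InfoSphere CDC. The filter method below mirrors the comparison performed by UEModuloFilter65 and shows that the three row filters split the key space without gaps or overlaps:

```java
// Standalone sketch (not CDC code): verify that the three modulo filters
// from RF_SUB1..RF_SUB3 partition the key space without gaps or overlaps.
public class ModuloPartitionDemo {

    // Mirrors the comparison performed by the UEModuloFilter65 user exit
    static boolean filter(long dividend, long divisor, long remainder) {
        return (dividend % divisor) == remainder;
    }

    public static void main(String[] args) {
        for (long key = 1; key <= 100; key++) {
            int matches = 0;
            for (long r = 0; r < 3; r++) {
                if (filter(key, 3, r)) {
                    matches++;
                }
            }
            // Every key satisfies exactly one subscription's row filter
            if (matches != 1) {
                throw new AssertionError("key " + key);
            }
        }
        System.out.println("each key is replicated by exactly one subscription");
    }
}
```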
Figure 9-29 shows the definition of a row filter based on the outcome of the modulo function.
Figure 9-29 InfoSphere CDC user exit DE-ModuloFilter
After the subscription has finished refreshing the table, the target table appears as shown in Figure 9-30. All the CUSTOMER_IDs shown in the output are divisible by 3 (CUSTOMER_ID%3==0).
Figure 9-30 InfoSphere CDC user exit DE-ModuloFilter output
Another example is a derived expression user exit to calculate the week of the year. For completeness, we provide the source for that class in Example 9-81.
Example 9-81 Week of the year calculation derived expression user exit
import com.datamirror.ts.derivedexpressionmanager.DEUserExitIF;
import com.datamirror.ts.derivedexpressionmanager.UserExitInvalidArgumentException;
import com.datamirror.ts.derivedexpressionmanager.UserExitInvokeException;
import com.datamirror.ts.util.Trace;
 
import java.text.SimpleDateFormat;
 
/*
Overview:
The user exit returns the week of the year for a date. Week number is returned
as an integer. The parameters passed into the user exit are:
a) Date -> Date value for which the week number must be calculated
 
Instructions:
 
1) Copy the UEWeekOfYear.class to the <cdc install>/lib directory
2) Go to the mapping details for the table
3) For the column that must contain the week number, specify the following:
 
%USERFUNC("JAVA", "UEWeekOfYear", <date_column>)
*/
 
public class UEWeekOfYear implements DEUserExitIF {
 
public Object invoke(Object[] aobjList)
throws UserExitInvalidArgumentException, UserExitInvokeException {
try {
SimpleDateFormat outWeek = new SimpleDateFormat("ww");
String strWeek = outWeek.format(aobjList[0]);
return new Integer(Integer.parseInt(strWeek));
} catch (IllegalArgumentException e) {
// format() throws this exception when the value is not a date
Trace.traceAlways(e);
throw new UserExitInvalidArgumentException(
"Invalid date parameter passed to " + "the user exit: "
+ aobjList[0]);
}
}
}
You could use this function as a derived expression in the column mapping so that it is run on the target side. In this case, the invoke() method expects to be passed a date or time stamp value and returns the number of the week as defined by the SimpleDateFormat class. The example class does not cater to null values in the passed date value. If you write a similar function for a column that can contain a null value, you must plan for this situation in your code.
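To illustrate the null handling mentioned above, the following standalone sketch (not part of the shipped example) wraps the same SimpleDateFormat logic in a null check. The helper name weekOfYear and the choice of the US locale are assumptions for illustration:

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class WeekOfYearNullSafe {

    // Hypothetical null-safe variant: returns null when the input is null,
    // otherwise the week number produced by the "ww" format pattern
    static Integer weekOfYear(Date date) {
        if (date == null) {
            return null; // propagate NULL to the target column
        }
        SimpleDateFormat outWeek = new SimpleDateFormat("ww", Locale.US);
        return Integer.valueOf(outWeek.format(date));
    }

    public static void main(String[] args) {
        Calendar cal = Calendar.getInstance(Locale.US);
        cal.set(2011, Calendar.JANUARY, 1);
        System.out.println(weekOfYear(cal.getTime())); // 1 in the US locale
        System.out.println(weekOfYear(null));          // null
    }
}
```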
When you configure a table mapping using this function, it appears as shown in Figure 9-31.
Figure 9-31 InfoSphere CDC user exit for week of the year
When you look at the target table, the function has generated the values shown in Figure 9-32.
Figure 9-32 InfoSphere CDC user exit week of the year output
System i user exit for derived expressions
When defining a user exit program for System i, create an ILE program and have it called in the same activation group as InfoSphere CDC. The sample program, shown in Example 9-82, uses two input parameters. The first one is packed (5,0) and the second is packed (7,0). It is important that the two parameters that are passed are indeed packed values of the specified length and precision. If you try to pass a zoned number or any other type, the exit program might fail with a decimal data error.
Example 9-82 Sample user exit for derived expressions
* ============================================================ *
* Program name: TDRVFLD *
* *
* Synopsis : This InfoSphere CDC for System i user exit program splits *
* out the input parameter(s) and returns the sum *
* of the passed values. *
* *
* Parameters : The %USER function can have 1 output parameter *
* and a variable number of input parameters. *
* For each input parameter, a data structure *
* must be created and referred to in the *
* entry parameter list. *
* *
* Create remarks: CRTBNDRPG DFTACTGRP(*NO) ACTGRP(*CALLER) *
* CHGPGM USEADPAUT(*NO) *
* *
* ------------------------------------------------------------ *
* Changes made to this source *
* *
* Date Who Description *
* -------- --- ----------------------------------------------- *
* 20051018 FK Initial delivery
* ============================================================ *
HDATFMT(*ISO)
* ------------------------------------------------------------ *
* File definitions *
* ------------------------------------------------------------ *
* ------------------------------------------------------------ *
* Arrays and tables *
* ------------------------------------------------------------ *
* ------------------------------------------------------------ *
* Data structures and field definitions *
* ------------------------------------------------------------ *
* Output parameter
DpOut DS 256
D OutType 4b 0 * Data type
D OutLen 4b 0 * Length
D OutDigits 4b 0 * Digits
D OutDec 4b 0 * Decimal positions
D OutNull 4b 0 * Null indicator
D OutDatFmt 4 * Date format
D OutValue 9p 0 * Parameter value
* Input parameter 1 - Value 1
DpIn01 DS 256
D I01Type 4b 0 * Data type
D I01Len 4b 0 * Length
D I01Digits 4b 0 * Digits
D I01Dec 4b 0 * Decimal positions
D I01Null 4b 0 * Null indicator
D I01DatFmt 4 * Date format
D I01Value 5p 0 * Parameter value
* Input parameter 2 - Value 2
DpIn02 DS 256
D I02Type 4b 0 * Data type
D I02Len 4b 0 * Length
D I02Digits 4b 0 * Digits
D I02Dec 4b 0 * Decimal positions
D I02Null 4b 0 * Null indicator
D I02DatFmt 4 * Date format
D I02Value 7p 0 * Parameter value
* ------------------------------------------------------------ *
* Constants *
* ------------------------------------------------------------ *
* Data type of value to be returned
D#TypChar C CONST(1) * Character
D#TypDate C CONST(2) * Date
D#TypFloat C CONST(3) * Floating point
D#TypInt C CONST(4) * Integer
D#TypPacked C CONST(5) * Packed
D#TypTime C CONST(6) * Time
D#TypZoned C CONST(7) * Zoned decimal
* ------------------------------------------------------------ *
* Key lists *
* ------------------------------------------------------------ *
* ------------------------------------------------------------ *
* Parameter lists *
* ------------------------------------------------------------ *
C *Entry PList
C Parm pOut * Output parameter
C Parm pIn01 * Input parameter 01
C Parm pIn02 * Input parameter 02
* .... ...... * Input parameter 03
* ------------------------------------------------------------ *
* Main line *
* ------------------------------------------------------------ *
* Do user actions ...
* Prepare output parameter
C Eval OutType = #TypPacked * Packed
C Eval OutLen = 5 * Packed length
C Eval OutDigits = 9 * Digits
C Eval OutDec = 0 * Decimal places
C Eval OutNull = 0 * Null indicator
C Eval OutDatFmt = *Blanks * Date format
C Eval OutValue = I01Value + I02Value * Calculate value
C Return
* ------------------------------------------------------------ *
* *INZSR - Initialisation subroutine *
* ------------------------------------------------------------ *
C *INZSR BegSR
C EndSR
Derived expression user exits always return a value to InfoSphere CDC that is then used for row filtering or to populate a target column. The type of the return value passed back to InfoSphere CDC is determined by setting the OutType data structure field to the correct data type constant listed in the Constants section.
9.7.4 Table and row-level user exits
 
Important: User exit programs and stored procedures must not use COMMIT or ROLLBACK SQL statements; let InfoSphere CDC manage the units of work. If you commit or roll back transactions in the InfoSphere CDC database session, you interfere with its bookmark processing, which could result in duplicated or missed transactions when restarting after a failure.
Java user exit for row-level user exits
The UESoftDelete user exit class implements the UserExitIF interface and must be specified on the User Exits tab of the table mapping. Also, the before-delete exit point check box must be selected so that the user exit is invoked (selecting any of the check boxes also causes the user exit to be invoked).
This section clarifies a couple of points that help you design your own user exits. During the initialization (using the init() method), the user exit evaluates the parameters that were passed, if any. An optimization has been built into the user exit to force it to update only a few columns in the row. This action optimizes the SQL statement and execution on the target.
Also, the user exit explicitly unsubscribes from all possible events and then subscribes to BEFORE_DELETE_EVENT. By specifying this event in the code, you prevent the situation occurring where the user exit is called on an exit point that it cannot handle. The SoftDelete user exit must only be used in the event of a delete operation, hence the registration for this event. The sample code for a soft delete user exit is shown in Example 9-83.
Example 9-83 Sample soft delete user exit
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.PreparedStatement;
import java.util.ArrayList;
 
// InfoSphere CDC specific imports
import com.datamirror.ts.target.publication.userexit.DataTypeConversionException;
import com.datamirror.ts.target.publication.userexit.ReplicationEventIF;
import com.datamirror.ts.target.publication.userexit.DataRecordIF;
import com.datamirror.ts.target.publication.userexit.ReplicationEventPublisherIF;
import com.datamirror.ts.target.publication.userexit.ReplicationEventTypes;
import com.datamirror.ts.target.publication.userexit.UserExitException;
import com.datamirror.ts.target.publication.userexit.UserExitIF;
 
/*
Overview:
This soft delete user exit will mark a row as inactive instead of performing a physical
delete. If the target row does not exist, a row will be inserted into the target table.
 
Assumptions:
- Delete operations have been disabled for this table.
- The &ENTTYP journal control field has been used to determine the value of the
"deleted" column
- Table has been configured for Adaptive Apply or Conflict Detection/Resolution
(Source wins)
- Table does not possess LOB/LONG columns.
 
Instructions
1. Copy the class files to the <cdc install directory>/lib directory
2. In Mapping Details->Operation, set the "On Delete" setting to "Do Not Delete".
3. In Mapping Details->User Exits,
Set "User Exit Type" to "Java Class"
Set "Class Name" to "UESoftDelete" (omit quotes)
In the "Events and Actions" section, tick the "Delete Before" checkbox
 
NOTE:
By default, when the user exit updates the target row, it will update all
target columns. For an optimized update operation you can specify the columns
you want updated in the "Parameter" field. Example: Set "Parameter" to
"ENTRY_TYPE,AUD_TIME,SRC_AUD_TIME" (omit quotes)
*/
public class UESoftDelete implements UserExitIF {
 
protected static final int[] eventsToSubscribe =
{ ReplicationEventTypes.BEFORE_DELETE_EVENT };
 
protected Boolean firstTime = true;
protected UETrace trace;
protected Connection dbConnection; // Connection to the target database
 
private String qualifiedTableName;
private String updateStatementString;
private String[] colNameList = null;
 
private ArrayList<Integer> keyColIndexes;
private ArrayList<Integer> updateColIndexes;
 
private PreparedStatement updateStatement;
private PreparedStatement insertStatement;
 
/**
 * Executes table level initialization
 *
 * @param publisher
 * - Handle to engine environment information
 */
public void init(ReplicationEventPublisherIF publisher)
throws UserExitException {
// Tracing is always switched on, can be disabled or parameterized
trace = new UETrace();
trace.init(true);
 
// Process parameter (list of columns that will be updated on soft
// delete)
if (publisher.getParameter() != null) {
colNameList = publisher.getParameter().split(",");
}
if (colNameList == null || colNameList[0].equalsIgnoreCase("")) {
colNameList = null;
trace.write(this.getClass().getName()
+ ": Parameter list for UESoftDelete is empty, all columns "
+ "will be updated.");
} else {
trace.write(this.getClass().getName()
+ ": Columns that will be updated: "
+ publisher.getParameter());
}
 
// For the user exit program to subscribe only to the events which it
// can handle (before-delete).
// Even if the configuration in MC specifies that the user exit will be
// called on other
// exit points, the subscribeEvent overrides this.
publisher.unsubscribeEvent(ReplicationEventTypes.ALL_EVENTS);
for (int i = 0; i < eventsToSubscribe.length; i++) {
publisher.subscribeEvent(eventsToSubscribe[i]);
}
}
 
/**
 * Method: processReplicationEvent -> Table event processing Purpose:
 * handles the change events at the table level
 *
 * @param event
 * - Handle to the change record
 */
public boolean processReplicationEvent(ReplicationEventIF event)
throws UserExitException {
DataRecordIF image = event.getBeforeData();
if (firstTime) {
/*
* This is the first time the user exit is being invoked for this
* table mapping. Initialize all the SQL statements that will be
* executed
*/
qualifiedTableName = event.getTablePath() + "."
+ event.getTableName();
if (dbConnection == null) {
// Get the shared JDBC database connection with the target
// database (CDC 6.5)
dbConnection = event.getSharedConnection();
}
getUpdateColumnIndexes(image);
getKeyList(image);
prepareUpdateStatement(image);
firstTime = false;
}
 
/*
* When the event is BEFORE-DELETE, update the row
*/
switch (event.getEventType()) {
case ReplicationEventTypes.BEFORE_DELETE_EVENT:
performUpdate(image);
default:
}
 
return true;
}
 
/**
 * Retrieves the indexes for the key columns of the target tables. This
 * information is used to build the WHERE clause of UPDATE statements.
 *
 * @param image
 * - The row image
 */
private boolean getKeyList(DataRecordIF image) throws UserExitException {
keyColIndexes = new ArrayList<Integer>();
 
for (int i = 1; i <= image.getColumnCount(); i++) {
if (image.isKey(i)) {
keyColIndexes.add(i);
}
}
return true;
}
 
/**
 * Retrieves the indexes for columns that need to be updated
 *
 * @param image
 * - The row image
 */
private boolean getUpdateColumnIndexes(DataRecordIF image)
throws UserExitException {
updateColIndexes = new ArrayList<Integer>();
 
// Implements the functionality that you can specify the columns to be
// updated
if (colNameList != null && colNameList.length != 0) {
// User has specified specific columns they want updated
for (int j = 0; j < colNameList.length; j++) {
for (int i = 1; i <= image.getColumnCount(); i++) {
if (image.getColumnName(i).equalsIgnoreCase(colNameList[j])) {
updateColIndexes.add(i);
break;
}
}
}
} else {
// Add all columns for update if no parameters are specified
for (int i = 1; i <= image.getColumnCount(); i++) {
updateColIndexes.add(i);
}
}
return true;
}
 
/**
 * Prepares the UPDATE statement that will be used to mark a row as
 * "deleted". This method is the optimized one where none of the key fields
 * are NULL. All key values can be bound to the key fields in the WHERE
 * clause and the user exit does not have to rebuild the UPDATE statement
 * every time.
 *
 * @param image
 * - The row image
 */
private void prepareUpdateStatement(DataRecordIF image)
throws UserExitException {
updateStatementString = "UPDATE " + qualifiedTableName + " SET ";
// build the SET clause
for (int i = 0; i < updateColIndexes.size(); i++) {
if (i > 0) {
updateStatementString += ", ";
}
updateStatementString += "\""
+ image.getColumnName(updateColIndexes.get(i)) + "\" = ? ";
}
// Build the WHERE clause
updateStatementString += " WHERE ";
for (int i = 0; i < keyColIndexes.size(); i++) {
if (i > 0) {
updateStatementString += " AND ";
}
updateStatementString += "\""
+ image.getColumnName(keyColIndexes.get(i)) + "\""
+ " = ? ";
}
try {
updateStatement = dbConnection
.prepareStatement(updateStatementString);
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
 
trace.write(updateStatementString);
return;
}
 
/**
 * Prepares the UPDATE statement if columns in the WHERE clause are NULL. In
 * that case SQL requires that you specify "<column> IS NULL" instead of
 * "<column>=?". As the user exit cannot know in advance which key values
 * are null, the UPDATE statement is dynamically built for every update that
 * has to occur.
 *
 * @param image
 * - The row image
 */
private void dynamicUpdate(DataRecordIF image) throws UserExitException {
String dynamicUpdateStatementString = "UPDATE " + qualifiedTableName
+ " SET ";
PreparedStatement dynamicStatement;
// Build the SET clause
for (int i = 0; i < updateColIndexes.size(); i++) {
if (i > 0) {
dynamicUpdateStatementString += ", ";
}
dynamicUpdateStatementString += "\""
+ image.getColumnName(updateColIndexes.get(i)) + "\" = ? ";
}
// Build the WHERE clause
dynamicUpdateStatementString += " WHERE ";
ArrayList<Integer> keyIndexes = new ArrayList<Integer>();
int keyCount = 0;
for (int i = 1; i <= image.getColumnCount(); i++) {
if (image.isKey(i)) {
if (keyCount > 0) {
dynamicUpdateStatementString += " AND ";
}
// Specify the column name <column>=? or <column> IS NULL
if (!image.isNull(i)) {
dynamicUpdateStatementString += "\""
+ image.getColumnName(i) + "\"" + " = ? ";
keyIndexes.add(i);
} else {
dynamicUpdateStatementString += "\""
+ image.getColumnName(i) + "\"" + " IS NULL ";
}
keyCount++;
}
}
// Prepare the dynamic update statement
try {
dynamicStatement = dbConnection
.prepareStatement(dynamicUpdateStatementString);
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
 
int parameterCount = 1;
for (int i = 0; i < updateColIndexes.size(); i++) {
try {
dynamicStatement.setObject(parameterCount, image
.getObject(updateColIndexes.get(i)), dynamicStatement
.getParameterMetaData()
.getParameterType(parameterCount));
parameterCount++;
} catch (Exception e) {
throw new UserExitException(e.getMessage());
}
}
 
for (int i = 0; i < keyIndexes.size(); i++) {
try {
dynamicStatement.setObject(parameterCount,
image.getObject(keyIndexes.get(i)));
parameterCount++;
} catch (Exception e) {
throw new UserExitException(e.getMessage());
}
}
// Update the row
try {
dynamicStatement.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
// Close the statement
try {
dynamicStatement.close();
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
return;
}
 
/**
 * Performs the actual soft delete of the row (update with the passed
 * values).
 *
 * @param image
 * - The row image
 * @throws UserExitException
 */
private void performUpdate(DataRecordIF image) throws UserExitException {
trace.write(this.getClass().getName() + ": Soft deleting row");
int parameterCount = 1;
 
for (int i = 0; i < updateColIndexes.size(); i++) {
try {
updateStatement.setObject(parameterCount, image
.getObject(updateColIndexes.get(i)), updateStatement
.getParameterMetaData()
.getParameterType(parameterCount));
parameterCount++;
} catch (Exception e) {
throw new UserExitException(e.getMessage());
}
}
// Bind the key values for the static update statement, or fall back
// to the dynamic update statement if a key value is NULL
for (int i = 0; i < keyColIndexes.size(); i++) {
try {
if (image.getObject(keyColIndexes.get(i)) != null) {
updateStatement.setObject(parameterCount,
image.getObject(keyColIndexes.get(i)));
parameterCount++;
} else {
dynamicUpdate(image);
return;
}
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
} catch (DataTypeConversionException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
}
// Execute the prepared (static) statement
try {
trace.write("Executing Update");
if (updateStatement.executeUpdate() == 0) {
return;
}
} catch (SQLException e) {
e.printStackTrace();
throw new UserExitException(e.getMessage());
}
return;
}
 
/**
 * Performs any required cleanup processing; invoked on subscription
 * shutdown and also for table-level user exits.
 */
public void finish() {
// perform any table specific cleanup
if (updateStatement != null) {
try {
updateStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return;
}
}
The processReplicationEvent() method is invoked for every event to which the user exit is subscribed. During first-time processing, the method obtains a shared connection to the target database from the InfoSphere CDC engine so that it can use the same session InfoSphere CDC already uses to update the target tables. Additionally, it obtains the list of columns that must be populated in the SET clause of the UPDATE statement and the list of key columns for the
WHERE clause.
Eventually, when the before-delete event is detected and the processReplicationEvent() method is called, the method updates the existing row instead of deleting it. There are two versions of the update method: a static one, which uses a pre-built PreparedStatement object, and a dynamic one, which builds the UPDATE statement on every execution. Dynamic building of the prepared statement is less efficient than using a pre-built statement. However, if there are NULL values in the key columns, SQL requires that the WHERE clause uses IS NULL for those columns. As it is impossible to know in advance which key columns, if any, are null, the UPDATE statement must be built for every execution. If there are no nullable key columns, the pre-built PreparedStatement is always used.
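The reason the dynamic path is needed can be sketched in isolation. The following minimal Java snippet is not part of the user exit; the column names and values are hypothetical. It builds the WHERE fragment the dynamic method produces, emitting `= ?` for non-null key values and `IS NULL` for null ones:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WhereClauseSketch {
    // Builds the WHERE fragment the dynamic update path produces:
    // "<column> = ?" for non-null key values, "<column> IS NULL"
    // otherwise. In the real user exit the names and null flags come
    // from the row image (DataRecordIF).
    static String buildWhere(Map<String, Object> keyValues) {
        StringBuilder where = new StringBuilder(" WHERE ");
        boolean first = true;
        for (Map.Entry<String, Object> key : keyValues.entrySet()) {
            if (!first) {
                where.append(" AND ");
            }
            if (key.getValue() != null) {
                where.append("\"").append(key.getKey()).append("\" = ?");
            } else {
                // "<column> = NULL" never matches in SQL; IS NULL is required
                where.append("\"").append(key.getKey()).append("\" IS NULL");
            }
            first = false;
        }
        return where.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> keys = new LinkedHashMap<>();
        keys.put("CUSTNO", 987560);
        keys.put("BRANCH", null);
        // prints (with a leading space): WHERE "CUSTNO" = ? AND "BRANCH" IS NULL
        System.out.println(buildWhere(keys));
    }
}
```

Because the set of null key columns can differ for every row, this fragment, and therefore the PreparedStatement, must be rebuilt per delete, which is exactly why the static path is preferred whenever no key column is nullable.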
In various methods in the UESoftDelete class, trace records are written to the InfoSphere CDC trace files, which are kept in the <cdc_home>/instance/<instance>/log directory. This class uses the com.datamirror.ts.util.Trace class to combine the InfoSphere CDC tracing. The sample code for this tracing utility is shown in Example 9-84.
Example 9-84 Sample tracing utility for user exits
import com.datamirror.ts.util.Trace;
 
/**
* Tracing facility for user exit
*/
public class UETrace {
boolean enabled = false;
 
/**
 * Initializes the tracing facility
 */
public void init(boolean enabled) {
this.enabled = enabled;
}
 
/**
 * Writes a trace message
 *
 * @param message - Message to write to the trace
 */
public void write(String message) {
if (enabled) {
// Piggyback on the InfoSphere CDC logging facility
Trace.traceAlways(message);
}
return;
}
 
/**
 * Cleanup for trace facility -> not used in this implementation
 */
public void close() {
}
}
 
After refreshing the table and removing some of the rows on the source, the target table would look something like Figure 9-33.
Figure 9-33 Sample soft delete output
Stored procedure user exit for row-level operations
Stored procedure row-level user exits, shown in Example 9-85, are invoked just before or after InfoSphere CDC applies the operation to the target table. If you specify a stored procedure user exit and disable the operation that it is attached to, the user exit is still invoked and can therefore run in place of the operation being applied by InfoSphere CDC.
Example 9-85 Sample table for row-level user exit
CREATE TABLE CDCDEMO.RISKY_CUSTOMER (
CUSTNO DECIMAL(6,0), CRLIMIT DECIMAL(7,0), UPD_TIMESTAMP TIMESTAMP)
IN USERSPACE1
Example 9-83 on page 383 adds a row to the CDCDEMO.RISKY_CUSTOMER table, but only if the credit limit of the customer exceeds 10,000. The inserted row logs the customer number, credit limit, and the time stamp of the update at the source.
The stored procedure is defined as shown in Example 9-86.
Example 9-86 Sample stored procedure
CREATE OR REPLACE PROCEDURE CDCDEMO.LOG_RISKY_CUSTOMER (
OUT result INT, OUT returnMsg CHAR,
IN a$CUSTNO DECIMAL(6,0), IN a$CRLIMIT DECIMAL(7,0), IN j$TIMSTAMP VARCHAR(26))
BEGIN
if a$CRLIMIT>=10000 then
insert into CDCDEMO.RISKY_CUSTOMER values(a$CUSTNO,
a$CRLIMIT, TIMESTAMP(j$TIMSTAMP));
end if;
set result=0;
set returnMsg='Row inserted';
END@
Defining the stored procedure to be called means specifying the schema where it is located and the name of the stored procedure at the exit points where you want it invoked (Figure 9-34).
Figure 9-34 User exit stored procedure configuration
When running the subscription and changing a number of source rows that have or attain a credit limit of 10,000 or higher, the stored procedure populates the RISKY_CUSTOMER table (Figure 9-35).
Figure 9-35 Sample row stored procedure output
System i RPG user exit for row-level operations
Row-level and table-level user exits for InfoSphere CDC on System i are handled using a single interface in which the apply process passes a fixed set of arguments to the user exit program, such as the before and after data images, the publisher ID, information about the operation type (entry type), and journal control columns. The data images are sent to the program as fixed-length strings in the format of the target table and the user exit program is responsible for unraveling the individual columns. The easiest way to perform this task depends on the language in which you have developed your user exit. RPG allows you to create an external data structure that maps the flat input to individual fields.
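For illustration, the unraveling that an RPG external data structure performs automatically can be sketched in Java as fixed-offset substring extraction. The CUSTNO/NAME1 layout below is hypothetical and stands in for the target table's record format:

```java
public class FixedImageSketch {
    // The i5/OS apply process hands the user exit the row image as a
    // single fixed-length string in the format of the target table.
    // This helper extracts one column by its byte offset and length,
    // trimming the blank padding of fixed-length character fields.
    public static String column(String image, int offset, int length) {
        return image.substring(offset, offset + length).trim();
    }

    public static void main(String[] args) {
        //             CUSTNO (6 chars)  NAME1 (20 chars, blank padded)
        String image = "987560" + "CALIFORNIA SPA      ";
        System.out.println(column(image, 0, 6));   // customer number
        System.out.println(column(image, 6, 20));  // customer name
    }
}
```

In RPG, declaring the parameter with `ExtName(CUSTOMER)` (as the sample below does with the BfrImg and AftImg data structures) makes the compiler perform this offset mapping for you.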
The example user exit must be customized per source and target table for which you want to soft delete rows. However, you can change the program so that a single program accommodates all possible source and target tables. The before image for the delete is mapped onto a source table structure using the BfrImg data structure, and the journal control columns are also populated. By interrogating the journal control column for the file name, you can determine which mapping must take place.
The TGTCUST target table is opened when the program is called for the first time, and because the Last Record indicator is not set on, the file remains open and the program stays active. If you want to use one program to handle soft deletes on multiple target tables, open and close the files dynamically as soft delete operations must be applied, to limit the amount of resources taken by the InfoSphere CDC apply process.
The sample code for performing soft deletes in a table is shown in Example 9-87.
Example 9-87 Sample code for soft deletes in a table
 * ============================================================================================= *
* Program name: TROWSFTDLT *
* *
* Synopsis : This user exit program is an example of how to handle soft deletes in a table. *
* *
* We assume that the subscription replicates the CUSTOMER table to the TGTCUST table on
* the target system. Inserts and Updates are defined as normal operation. However, for the*
* Delete operation "No Action" is specified and this program is specified as the Before
* Delete user exit program. *
* *
 * In effect, the program checks the row operation it was invoked for. If it was for *
* a delete, the target table's fields TXTYP and TXDTS are updated with journal control *
* columns &ENTTYP and &TIMSTAMP *
* *
* Create parms: CRTBNDRPG DFTACTGRP(*NO) ACTGRP(*CALLER) *
* After create: CHGPGM USEADPAUT(*NO) *
* *
* ------------------------------------------------------------ *
* *
* ------------------------------------------------------------ *
* Changes made to this source *
* *
* Date Who Description *
* -------- --- ----------------------------------------------- *
* 20061122 FK Initial delivery
* ============================================================ *
HDATFMT(*ISO) DFTACTGRP(*NO) ACTGRP(*CALLER)
* ------------------------------------------------------------ *
* File definitions *
* ------------------------------------------------------------ *
* Customer file on the target database, equipped with
* Transaction type and Transaction timestamp
FTGTCUST UF E K DISK
* ------------------------------------------------------------ *
* Arrays and tables *
* ------------------------------------------------------------ *
* ------------------------------------------------------------ *
* Data structures and field definitions *
* ------------------------------------------------------------ *
* Binary field definitions
DEntTypB DS 4
D EntTyp 1 4B 0
DBfrNbrNullB DS 4
D BfrNbrNull 1 4B 0
DAftNbrNullB DS 4
D AftNbrNull 1 4B 0
DTSBufLenB DS 4
D TSBufLen 1 4B 0
DTSNbrNullB DS 4
D TSNbrNull 1 4B 0
* Before Image for record (mapped to external structure for original table, CUSTOMER)
DBfrImg E DS ExtName(CUSTOMER) Prefix(Bfr)
* After Image for record (mapped to external structure for
* original table, CUSTOMER)
DAftImg E DS ExtName(CUSTOMER) Prefix(Aft)
* Before Image for journal control fields
DBfrJrnCtl DS
D JBEntLen 5 * Entry length
D JBSeqNbr 10 * J/E Sequence #
D JBJrnNam 10 * Journal name
D JBJrnLib 10 * Journal library
D JBRcvNam 10 * Receiver name
D JBRcvLib 10 * Receiver library
D JBJrnCde 1 * Journal code
D JBEntTyp 2 * Entry type
D JBSysEnt 2 * System J/E
D JBTimStp 26 * Time stamp
D JBTimStpDt 10 OVERLAY(JBTimStp:1) * Time stamp date
D JBTimStpHr 2 OVERLAY(JBTimStp:12) * Time stamp hour
D JBJobNam 10 * Job name
D JBJobUsr 10 * Job user
D JBJobNbr 6 * Job number
D JBPgmNam 10 * Program name
D JBFilNam 10 * File name
D JBFilLib 10 * File library
D JBFilMbr 10 * File member
D JBRRN 10 * Relative record
D JBFlg 1 * Flag 1 or 0
D JBCmtID 10 * Commit cycle
D JBUsrPrf 10 * User profile
D JBSysNam 8 * System name
* After Image for journal control fields
DAftJrnCtl DS
D JAEntLen 5 * Entry length
D JASeqNbr 10 * J/E Sequence #
D JAJrnNam 10 * Journal name
D JAJrnLib 10 * Journal library
D JARcvNam 10 * Receiver name
D JARcvLib 10 * Receiver library
D JAJrnCde 1 * Journal code
D JAEntTyp 2 * Entry type
D JASysEnt 2 * System J/E
D JATimStp 26 * Time stamp
D JATimStpDt 10 OVERLAY(JATimStp:1) * Time stamp date
D JATimStpHr 2 OVERLAY(JATimStp:12) * Time stamp hour
D JAJobNam 10 * Job name
D JAJobUsr 10 * Job user
D JAJobNbr 6 * Job number
D JAPgmNam 10 * Program name
D JAFilNam 10 * File name
D JAFilLib 10 * File library
D JAFilMbr 10 * File member
D JARRN 10 * Relative record
D JAFlg 1 * Flag 1 or 0
D JACmtID 10 * Commit cycle
D JAUsrPrf 10 * User profile
D JASysNam 8 * System name
* ------------------------------------------------------------ *
* Constants *
* ------------------------------------------------------------ *
D#BfrClr C CONST(1)
D#AftClr C CONST(2)
D#BfrIns C CONST(3)
D#AftIns C CONST(4)
D#BfrUpd C CONST(5)
D#AftUpd C CONST(6)
D#BfrDlt C CONST(7)
D#AftDlt C CONST(8)
D#BfrRsh C CONST(9)
D#AftRsh C CONST(10)
D#None C CONST('*N')
* ------------------------------------------------------------ *
* Key lists *
* ------------------------------------------------------------ *
* Key list target customer table
C KeyCus KList
C KFld BfrCUSTNO * Customer number
* ------------------------------------------------------------ *
* Parameter lists *
* ------------------------------------------------------------ *
C *ENTRY Plist
C Parm RtnCde 10 * Return code
C Parm PgmName 10 * Program name
C Parm EntTypB * Entry type
C Parm BfrImg * Before Image
C Parm AftImg * After Image
C Parm BfrNbrNullB * # of Null in Bfr
C Parm AftNbrNullB * # of Null in Aft
C Parm BfrNull 1 * Null in Bfr
C Parm AftNull 1 * Null in Aft
C Parm BfrJrnCtl * Bfr Journal Ctl
C Parm AftJrnCtl * Aft Journal Ctl
C Parm TSBufLenB * TS Buffer length
C Parm TSBuf 158 * TS Buffer
C Parm TSNbrNullB * # of Null TS Buf
C Parm TSNull 5 * Null in TS Buf
C Parm TSSrcID 8 * TS Source system
* ------------------------------------------------------------ *
* Main line *
* ------------------------------------------------------------ *
* Determine type of transaction
C Select
* Clear
C EntTyp WhenEQ #BfrClr * Before clear
C EntTyp OrEQ #AftClr * After clear
* Insert
C EntTyp WhenEQ #BfrIns * Before insert
C EntTyp OrEQ #AftIns * After insert
* Update
C EntTyp WhenEQ #BfrUpd * Before update
C EntTyp OrEQ #AftUpd * After update
* Delete
C EntTyp WhenEQ #BfrDlt * Before delete
C EntTyp OrEQ #AftDlt * After delete
* Find the record in the target table and update Transaction type/Timestamp
C KeyCus Chain TGTCUSTR 95
C *In95 IfEQ *Off
C Eval TXTYP=JBEntTyp * &ENTTYP
C Eval TXDTS=JBTimStp * &TIMSTAMP
C Update TGTCUSTR
C EndIf
C EndSL
C Return
* ------------------------------------------------------------ *
* *INZSR - Initialisation subroutine *
* ------------------------------------------------------------ *
C *INZSR BegSR
C EndSR
Configuration of this user exit program in the Management Console is shown in Figure 9-36. The table has been mapped for Adaptive Apply, which ensures that if a customer that has been soft deleted on the target is reinserted, the row on the target is overwritten. In the column mapping, the &ENTTYP journal control column is mapped to the TXTYP column. A soft deleted row on the target has DL in this column. Finally, the standard delete operation is disabled (Do Not Delete) and the Before Delete user exit is configured to call the TROWSFTDLT user exit program. As stated before, the user exit program must be found in the library list of the DMCJOBD job description or in the InfoSphere CDC installation library.
Figure 9-36 User exit configuration
9.7.5 Subscription-level (unit of work)
In some cases, your implementation might require that committed transactions be delivered to a non-database target, such as a message queue or web service. Row-level user exits can enhance or replace InfoSphere CDC apply processing and run custom code based on insert, update, delete, or even table-level events. Subscription-level user exits take this capability a step further by letting you trigger a process based on the unit of work that was read from the source database.
For example, your customers fill their shopping carts on your website. When the first item is added to the shopping cart, an order header row is created in the underlying database. As the customer adds items to the shopping cart, rows are inserted into the order detail table and, after the checkout process is completed, your business application commits the transaction to the database, forming a unit of work.
Assume that you want to place the completed transaction as a single message on a message queue (or enterprise service bus). InfoSphere CDC row-level user exits allow you to pick up and process the individual database operations, but you do not know when the last item has been placed in the shopping cart to complete the transaction.
The InfoSphere CDC subscription-level user exit point provides a method that is invoked when the transaction is committed by the subscription. You can implement this method to start an action based on a transaction that was prepared in the row-level user exit points. Referring to the previous example, your row-level user exit points start building the message to be sent (in XML or another format). When the commit is processed by InfoSphere CDC, the subscription-level user exit takes the message that was built, appends any closing tags in the case of XML, and places it onto a message queue.
In Example 9-88, the sample code builds an XML document (flat file) for each transaction sent from the source side.
Example 9-88 Sample code to build an XML document for each transaction
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
 
import com.datamirror.ts.target.publication.userexit.*;
 
public class CDCTransactionFileWriter implements UserExitIF,
SubscriptionUserExitIF {
 
static final String FILE_TIMESTAMP_FORMAT = "yyyyMMdd_HHmmssSSS";
private boolean calledAtSubscriptionLevel = false;
protected SubscriptionContext subscriptionContext; // shared between all
// instances
private UETrace trace;
private String publisherID;
 
/**
* Subscription-level initialization.
*
* This method is called once when the subscription is started and
* initializes the subscription context. Also, it ensures that the
* processSubscriptionEvent method is invoked before every commit.
*
* @param publisher
* - Handle to the event publisher; this parameter can only be
* used during this method to subscribe to certain events.
*/
public void init(SubscriptionEventPublisherIF publisher)
throws UserExitException {
// Initialize the subscription context
subscriptionContext = (SubscriptionContext) publisher
.getUserExitSubscriptionContext();
if (subscriptionContext == null) {
// this is only called once during initialization
// an object that calls getUserExitSubscriptionContext will be
// passed this context we are creating
subscriptionContext = createContext(publisher.getSourceSystemID());
publisher.setUserExitSubscriptionContext(subscriptionContext);
}
calledAtSubscriptionLevel = true;
trace = subscriptionContext.trace;
publisherID = subscriptionContext.publisherID;
trace.write("Subscription-level init() start");
publisher.unsubscribeEvent(SubscriptionEventTypes.ALL_EVENTS);
publisher.subscribeEvent(SubscriptionEventTypes.BEFORE_COMMIT_EVENT);
trace.write("Subscription-level init() end");
}
 
/**
* Executed when a subscription-level event is detected (commit). This
* method writes the ending tag to the XML message and closes the current
* output file.
*
* @param subscriptionEvent
* - handle to subscription event
*/
public boolean processSubscriptionEvent(
SubscriptionEventIF subscriptionEvent) throws UserExitException {
trace.write("processSubscriptionEvent() started");
trace.write("Subscription event: "
+ getSubscriptionEventTypeAsString(subscriptionEvent
.getEventType()));
trace.write("Commit reason: "
+ getSubscriptionCommitReasonAsString(subscriptionEvent
.getCommitReason()));
closeOutputFile();
return true;
}
 
/**
* Table-level initialization.
*
* This method is called once for every mapped table at subscription
* startup. It first retrieves the subscription context and then registers
* the events it wants to listen to.
*
* @param eventPublisher
* - Handle to engine environment information
*/
public void init(ReplicationEventPublisherIF eventPublisher) {
// Retrieve the subscription-level context
subscriptionContext = (SubscriptionContext) eventPublisher
.getUserExitSubscriptionContext();
trace = subscriptionContext.trace;
publisherID = subscriptionContext.publisherID;
trace.write("Table-level init() start");
 
// Subscribe to After-Insert/Update/Delete events
eventPublisher.unsubscribeEvent(ReplicationEventTypes.ALL_EVENTS);
eventPublisher.subscribeEvent(ReplicationEventTypes.AFTER_INSERT_EVENT);
eventPublisher.subscribeEvent(ReplicationEventTypes.AFTER_UPDATE_EVENT);
eventPublisher.subscribeEvent(ReplicationEventTypes.AFTER_DELETE_EVENT);
 
trace.write("Table-level init() end");
}
 
/**
* Executed when table-level event is detected (insert/update/delete). This
* method writes an XML entry for the table-level operation to the currently
* open output file. If there is no open file, it creates a new output file
* on the fly.
*
* @param replicationEvent
* - Handle to replication event
* @return true - This flag indicates whether the default operation should
* be applied (true) or not (false). As the event is only called
* after insert/update/delete, the returned value is of no
* importance
*/
public boolean processReplicationEvent(ReplicationEventIF replicationEvent)
throws UserExitException {
trace.write("processReplicationEvent() start");
String tableName = replicationEvent.getTableName();
String entryType = replicationEvent.getJournalHeader().getEntryType();
DataRecordIF beforeImage = replicationEvent.getBeforeData();
DataRecordIF afterImage = replicationEvent.getData();
trace.write("Table: " + tableName);
trace.write("Operation type: " + entryType);
// If there is no open file, create one on the fly to write the XML
// records to
if (subscriptionContext.printStream == null) {
createNewOutputFile();
}
// Write the table-level information
subscriptionContext.printStream.println(" <table" + tableName + ">");
subscriptionContext.printStream.println(" <tableOperation>"
+ entryType + "</tableOperation>");
// Write column-level information for the before-image (update + delete)
if (beforeImage != null) {
for (int i = 1; i <= beforeImage.getColumnCount(); i++) {
try {
subscriptionContext.printStream.println(" <before"
+ beforeImage.getColumnName(i)
+ ">"
+ formatXmlContent(beforeImage.getObject(i)
.toString()) + "</before"
+ beforeImage.getColumnName(i) + ">");
} catch (DataTypeConversionException e) {
trace.write(e.getMessage());
}
}
}
// Write column-level information for the after-image (insert+update)
if (afterImage != null) {
for (int i = 1; i <= afterImage.getColumnCount(); i++) {
try {
subscriptionContext.printStream.println(" <after"
+ afterImage.getColumnName(i)
+ ">"
+ formatXmlContent(afterImage.getObject(i)
.toString()) + "</after"
+ afterImage.getColumnName(i) + ">");
} catch (DataTypeConversionException e) {
trace.write(e.getMessage());
}
}
}
// Write the ending tag for the table-level information
subscriptionContext.printStream.println(" </table"
+ replicationEvent.getTableName() + ">");
return true;
}
 
/**
* This method is called for both subscription-level and table-level
* clean-up. It will close the current output file.
*/
public void finish() {
if (calledAtSubscriptionLevel) {
trace.write("finish() start");
closeOutputFile();
trace.write("finish() end");
}
return;
}
 
/**
* Formats the content of an XML element and substitutes special characters
* with mark-up replacements.
*
* @param content
* - Input element content.
* @return Contents with special characters marked up.
*/
private String formatXmlContent(String content) {
String markedUpString = content;
// Replace the ampersand first so that the entities inserted below
// are not escaped a second time
markedUpString = markedUpString.replaceAll("&", "&amp;");
markedUpString = markedUpString.replaceAll("\"", "&quot;");
markedUpString = markedUpString.replaceAll("'", "&apos;");
markedUpString = markedUpString.replaceAll("<", "&lt;");
markedUpString = markedUpString.replaceAll(">", "&gt;");
return markedUpString;
}
 
/**
* Translates the subscription event to a readable text string (mainly for
* debugging).
*
* @param eventType
* - Type of the subscription event
* @return Event type description
*/
private String getSubscriptionEventTypeAsString(int eventType) {
if (eventType == SubscriptionEventTypes.BEFORE_COMMIT_EVENT)
return "BEFORE_COMMIT_EVENT";
else if (eventType == SubscriptionEventTypes.AFTER_COMMIT_EVENT)
return "AFTER_COMMIT_EVENT";
else if (eventType == SubscriptionEventTypes.AFTER_EVENT_SHIFT)
return "AFTER_EVENT_SHIFT";
else
return "UNKNOWN_SUBSCRIPTION_EVENT_TYPE";
}
 
/**
* Translates the commit reason to a readable text string (mainly for
* debugging).
*
* @param commitReason
* - Reason for committing the transaction
* @return Commit reason description
*/
private String getSubscriptionCommitReasonAsString(int commitReason) {
if (commitReason == CommitReasonTypes.SOURCE_COMMIT)
return "SOURCE_COMMIT";
else if (commitReason ==
CommitReasonTypes.OPERATION_WITHOUT_COMMITMENT_CONTROL)
return "OPERATION_WITHOUT_COMMITMENT_CONTROL";
else if (commitReason == CommitReasonTypes.REFRESH)
return "REFRESH";
else if (commitReason == CommitReasonTypes.REPORT_POSITION)
return "REPORT_POSITION";
else if (commitReason == CommitReasonTypes.INTERIM_COMMIT)
return "INTERIM_COMMIT";
else if (commitReason == CommitReasonTypes.SHUTDOWN)
return "SHUTDOWN";
else
return "UNKNOWN_COMMIT_REASON";
}
 
/**
* Creates an output file to hold the XML representation of a database
* transaction. The method creates a file with the name
* <PublisherID>_yyyyMMdd_HHmmssSSS.xml and prepares the XML heading. Other
* methods subsequently write the XML records to the output file.
*
* @throws UserExitException
*/
private void createNewOutputFile() throws UserExitException {
Calendar cal = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat(FILE_TIMESTAMP_FORMAT);
String timestampSuffix = sdf.format(cal.getTime());
 
// Compose name of output file
subscriptionContext.outputFileName = publisherID + "_"
+ timestampSuffix + ".xml";
trace.write("Name of work file: " + subscriptionContext.outputFileName);
 
try {
subscriptionContext.printStream = new PrintStream(
subscriptionContext.outputFileName);
} catch (FileNotFoundException e) {
trace.write(e.getMessage());
throw new UserExitException("Error creating output file "
+ subscriptionContext.outputFileName);
}
trace.write("Output file " + subscriptionContext.outputFileName
+ " created");
subscriptionContext.printStream
.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
subscriptionContext.printStream.println("<transaction>");
}
 
/**
* Writes the ending tag for the XML transaction and closes the current
* output file.
*/
private void closeOutputFile() {
if (subscriptionContext.printStream != null) {
subscriptionContext.printStream.println("</transaction>");
trace.write("Closing output file: "
+ subscriptionContext.outputFileName);
subscriptionContext.printStream.close();
}
subscriptionContext.printStream = null;
}
 
/**
* This subclass is used to maintain the overall subscription context as
* this user exit is instantiated at the subscription (target) level and for
* all tables. As the same class is used at the subscription and table level
* we have chosen to create a subclass to create a subclass for the
* subscription context.
*/
protected class SubscriptionContext {
protected UETrace trace; // trace object
protected String publisherID; // Publisher ID for subscription
protected String outputFileName; // Current output file name
protected PrintStream printStream; // Current output print stream
}
 
/**
* Initializes the context for the subscription. This method is only called
* once at the subscription level. At the table level, the
* SubscriptionContext object is retrieved only.
*
* @param publisherID
* - The publisher ID of the subscription
*/
protected SubscriptionContext createContext(String publisherID) {
SubscriptionContext context = new SubscriptionContext();
context.publisherID = new String(publisherID);
context.trace = new UETrace();
context.trace.init(true);
context.trace.write("Context created for publisher ID " + publisherID);
context.outputFileName = null;
context.printStream = null;
return context;
}
}
In the InfoSphere CDC configuration, the Java user exit class is specified both at the subscription level and the table level. The user exit implements both the subscription-level and table-level interfaces, SubscriptionUserExitIF and UserExitIF. For the functionality intended by the user exit, it is imperative to register it at both levels. If you do not configure the subscription-level user exit, the table-level entry points do not have all the information available to properly write the XML records.
When the subscription is started, the target side instantiates a CDCTransactionFileWriter object and immediately invokes the init(SubscriptionEventPublisherIF) method (subscription-level), which has the primary task of creating a subscription context object to be shared with the table-level objects. Then, for each mapped table that has CDCTransactionFileWriter specified as the user exit, a table-level object is instantiated and the init(ReplicationEventPublisherIF) method is invoked. This method registers the after-insert, after-update, and after-delete exit points for the table in question. Every insert, update, and delete operation on that table causes the processReplicationEvent() method to be invoked. This method first checks whether there already is an open file for this subscription, which is done by checking the subscriptionContext.printStream object. If there is no open file, a new file is created and opened dynamically. Then, the insert, update, or delete operation for the table is written as an XML structure. When a commit is received from the source side, the processSubscriptionEvent() method is called, which writes the closing transaction tag and then closes the file.
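As a standalone illustration of the naming scheme, the following sketch reproduces the file-name pattern used by createNewOutputFile(); the publisher ID shown is a hypothetical value, as at run time it comes from the subscription context:

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class OutputFileNameSketch {
    static final String FILE_TIMESTAMP_FORMAT = "yyyyMMdd_HHmmssSSS";

    // Composes <publisherID>_yyyyMMdd_HHmmssSSS.xml, the name under
    // which each committed unit of work is written to disk.
    static String outputFileName(String publisherID, Date now) {
        SimpleDateFormat sdf = new SimpleDateFormat(FILE_TIMESTAMP_FORMAT);
        return publisherID + "_" + sdf.format(now) + ".xml";
    }

    public static void main(String[] args) {
        // e.g. REDBOOK_20110601_142530123.xml
        System.out.println(outputFileName("REDBOOK", new Date()));
    }
}
```

Because the suffix has millisecond resolution, every transaction lands in its own uniquely named XML file, which makes the output directory easy to consume in commit order.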
The configuration of the CDCTransactionFileWriter user exit is shown in Figure 9-37.
Figure 9-37 InfoSphere CDC user exit subscription level
When doing two updates in a single unit of work, the resulting XML appears as shown in Example 9-89.
Example 9-89 Two updates in a single unit of work
<?xml version="1.0" encoding="UTF-8"?>
<transaction>
<tableCUSTOMER>
<tableOperation>UP</tableOperation>
<beforeCUSTNO>987560</beforeCUSTNO>
<beforeBRANCH>11</beforeBRANCH>
<beforeNAME1>CALIFORNIA SPA &amp; FITNESS</beforeNAME1>
<beforeNAME2>abc12345444</beforeNAME2>
<beforeADDRESS1>100 SANDYHOOK SQ.</beforeADDRESS1>
<beforeADDRESS2> </beforeADDRESS2>
<beforeCITY>UPLAND</beforeCITY>
<beforeSTATE>CA</beforeSTATE>
<beforeSTATUS>A</beforeSTATUS>
<beforeCRLIMIT>7506</beforeCRLIMIT>
<beforeBALANCE>6500</beforeBALANCE>
<beforeREPNO>251</beforeREPNO>
<afterCUSTNO>987560</afterCUSTNO>
<afterBRANCH>11</afterBRANCH>
<afterNAME1>CALIFORNIA SPA &amp; FITNESS</afterNAME1>
<afterNAME2>abc12345444</afterNAME2>
<afterADDRESS1>100 SANDYHOOK SQ.</afterADDRESS1>
<afterADDRESS2> </afterADDRESS2>
<afterCITY>UPLAND</afterCITY>
<afterSTATE>CA</afterSTATE>
<afterSTATUS>A</afterSTATUS>
<afterCRLIMIT>7507</afterCRLIMIT>
<afterBALANCE>6500</afterBALANCE>
<afterREPNO>251</afterREPNO>
</tableCUSTOMER>
<tablePRODUCT>
<tableOperation>UP</tableOperation>
<beforePRODUCTID>100</beforePRODUCTID>
<beforeDESCRIPTN>White paper 8.5 by 11</beforeDESCRIPTN>
<beforeLOCATION>Aisle 5</beforeLOCATION>
<beforeSTATUS>O</beforeSTATUS>
<beforeUNITPRICE>7.00</beforeUNITPRICE>
<beforeUNITCOST>2200.00</beforeUNITCOST>
<beforeQTYONHAND>17850</beforeQTYONHAND>
<beforeQTYALLOC>50</beforeQTYALLOC>
<beforeQTYMINORD>5000</beforeQTYMINORD>
<afterPRODUCTID>100</afterPRODUCTID>
<afterDESCRIPTN>White paper 8.5 by 11</afterDESCRIPTN>
<afterLOCATION>Aisle 5</afterLOCATION>
<afterSTATUS>O</afterSTATUS>
<afterUNITPRICE>8.00</afterUNITPRICE>
<afterUNITCOST>2200.00</afterUNITCOST>
<afterQTYONHAND>17850</afterQTYONHAND>
<afterQTYALLOC>50</afterQTYALLOC>
<afterQTYMINORD>5000</afterQTYMINORD>
</tablePRODUCT>
</transaction>
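Note that any XML-reserved character in the replicated data (such as the ampersand in the NAME1 column) must be escaped as an entity reference for the output file to be well-formed XML. The following self-contained sketch shows a minimal escaping helper that a user exit such as CDCTransactionFileWriter could call before writing column values; the class and method names are illustrative and are not part of the InfoSphere CDC API.

```java
// Minimal sketch of an XML text-escaping helper for column values.
// Class and method names are illustrative, not part of the product API.
public class XmlEscapeSketch {

    // Replace the five XML-reserved characters with their entity references
    public static String escapeXml(String in) {
        StringBuilder sb = new StringBuilder(in.length());
        for (int i = 0; i < in.length(); i++) {
            char c = in.charAt(i);
            switch (c) {
            case '&':  sb.append("&amp;");  break;
            case '<':  sb.append("&lt;");   break;
            case '>':  sb.append("&gt;");   break;
            case '"':  sb.append("&quot;"); break;
            case '\'': sb.append("&apos;"); break;
            default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(escapeXml("CALIFORNIA SPA & FITNESS"));
        // prints: CALIFORNIA SPA &amp; FITNESS
    }
}
```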
9.7.6 Java user exit for flat file custom formatter
When using InfoSphere CDC to deliver flat files for consumption by external applications, the target engine is often InfoSphere CDC for DataStage. This engine can generate flat files and has additional functionality to automatically close and make the flat files available based on time or number of rows.
Flat files that are generated by InfoSphere CDC for DataStage have the following characteristics:
- Journal control information written as the first few columns on every line
- Characters written in UTF-8 encoding
- Columns separated by a comma
- Columns delimited by a double quotation mark
This fixed output format might not be suitable for the targeted applications. Example 9-90 tailors the standard InfoSphere CDC for DataStage flat file output to use a different column delimiter ("ª", the feminine ordinal indicator, instead of the double quotation mark) and a different column separator ("|", the vertical bar, instead of the comma). Also, the flat file is generated in ISO-8859-1 (Western European) encoding instead of the default Unicode UTF-8 encoding.
Example 9-90 Sample code for tailored flat file output
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
 
import com.datamirror.ts.target.publication.UserExitJournalHeader;
import com.datamirror.ts.target.publication.userexit.DataRecordIF;
import com.datamirror.ts.target.publication.userexit.DataTypeConversionException;
import com.datamirror.ts.target.publication.userexit.ReplicationEventIF;
import com.datamirror.ts.target.publication.userexit.datastage.DataStageDataFormatIF;
 
/**
*
* Format the data suitable for the target application's sequential file reader.
*
*/
public class CDCDataStageFormat implements DataStageDataFormatIF {
 
// Specified encoding for output (formatted string)
private static final String OUTPUT_STRING_ENCODING = "ISO-8859-1";
 
// Separator character (between columns) and delimiter (surrounding columns)
private static String SEPARATOR = "|";
private static String DELIMITER = "ª";
 
// Types of record images that can be received by the formatter user exit
public final char SUB_RLA_AUDIT = 'A';
public final char SUB_RLA_AUDIT_BEFORE = 'B';
public final char SUB_RLA_INS_UPD = 'I';
public final char SUB_RLA_NON_UPD = 'U';
public final char SUB_RLA_DEL_NONE = 'D';
 
// Formatting for date, time and timestamp columns
private SimpleDateFormat outDateFormat = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
private SimpleDateFormat outDateOnlyFormat = new SimpleDateFormat(
"yyyy-MM-dd");
private SimpleDateFormat outTimeOnlyFormat = new SimpleDateFormat(
"HH:mm:ss");
 
public static final int BYTE_BUFFER_AUTO_INCREMENT_SIZE = 10000;
public static final int BYTE_BUFFER_AUTO_INCREMENT_BREATHING_SPACE = 1000;
public static final int BYTE_BUFFER_SPACE_FOR_FIELD_SEPARATORS = 100;
public static final int BYTE_BUFFER_SPACE_FOR_JOURNAL_CONTROL = 1000;
ByteBuffer outBuffer = ByteBuffer.allocate(BYTE_BUFFER_AUTO_INCREMENT_SIZE);
 
private int destinationType;
// Is this flat file or direct connect?
private int clobTruncationPoint;
// Truncation for CLOB columns as specified
// in subscription
private int blobTruncationPoint;
// Truncation for BLOB columns as specified
// in subscription
 
private static String ZERO_STRING = "0";
// Used for boolean FALSE external
// representation
private static String ONE_STRING = "1";
// Used for boolean TRUE external
// representation
 
// These are the byte arrays to be appended to the buffer. The content is
// already encoded
// in the specified encoding
private static byte[] SEPARATOR_AS_BYTE_ARRAY =
getAsEncodedByteArray(SEPARATOR);
private static byte[] DELIMITER_AS_BYTE_ARRAY =
getAsEncodedByteArray(DELIMITER);
private static byte[] SEP_DEL_SEP_AS_BYTE_ARRAY =
getAsEncodedByteArray(SEPARATOR + DELIMITER);
private static byte[] DEL_SEP_DEL_AS_BYTE_ARRAY =
getAsEncodedByteArray(DELIMITER+SEPARATOR+DELIMITER);
private static byte[] DEL_SEP_AS_BYTE_ARRAY =
getAsEncodedByteArray(DELIMITER + SEPARATOR);
private static byte[] ZERO_AS_BYTE_ARRAY =
getAsEncodedByteArray(ZERO_STRING);
private static byte[] ONE_AS_BYTE_ARRAY = getAsEncodedByteArray(ONE_STRING);
 
private int numberDataColumns = 0; // Number of data columns in table
ByteBuffer nullImage = null;
boolean firstTimeData = true;
boolean firstTimeJournal = true;
UETrace trace;
 
public CDCDataStageFormat() {
trace = new UETrace();
trace.init(true);
}
 
/**
 * This method is called when the subscription is started to indicate at
 * which position to truncate CLOB and BLOB columns.
 */
public void setLobTruncationPoint(int maxClobLengthInChars,
int maxBlobLengthInBytes) {
clobTruncationPoint = maxClobLengthInChars;
blobTruncationPoint = maxBlobLengthInBytes;
}
 
/**
 * Create a string containing the data images for the row; this method is
 * invoked by the InfoSphere CDC for DataStage engine when a new row has to
 * be formatted.
 *
 * @param image
 * - The record image that must be processed
 * @return A buffer of bytes with the formatted data image
 */
public ByteBuffer formatDataImage(DataRecordIF image)
throws DataTypeConversionException {
boolean needToCloseQuote = false;
outBuffer.position(0);
 
if (image != null) {
// When called for the first time, determine the number of data
// columns
if (firstTimeData) {
numberDataColumns = getNumberDataColumns(image);
firstTimeData = false;
}
for (int i = 1; i <= numberDataColumns; i++) {
Object colObj = image.getObject(i);
 
// For NULL values, we just leave the field empty
if (colObj != null) {
// For performance, we have this logic to only do one insert
// of separators and delimiters between columns;
// this reduces the amount of processing the user exit has
// to do
if (needToCloseQuote) {
outBuffer.put(DEL_SEP_DEL_AS_BYTE_ARRAY);
} else {
outBuffer.put(SEP_DEL_SEP_AS_BYTE_ARRAY);
}
needToCloseQuote = true;
 
// Time, timestamp and dates must be processed in this order
// because time and timestamp are also date objects
if (colObj instanceof Time) {
outBuffer = addEncodedStringToByteBuffer(outBuffer,
outTimeOnlyFormat.format((Time) colObj));
} else if (colObj instanceof Timestamp) {
outBuffer = addEncodedStringToByteBuffer(outBuffer,
outDateFormat.format((Timestamp) colObj));
} else if (colObj instanceof Date) {
outBuffer = addEncodedStringToByteBuffer(outBuffer,
outDateOnlyFormat.format((Date) colObj));
} else if (colObj instanceof byte[]) { // BLOB
byte[] val = (byte[]) colObj;
if (val.length > blobTruncationPoint) {
byte[] truncVal = new byte[blobTruncationPoint];
ByteBuffer truncBuffer = ByteBuffer.wrap(truncVal);
truncBuffer.put(val, 0, blobTruncationPoint);
val = truncVal;
}
outBuffer = addBytesToByteBuffer(outBuffer, val);
} else if (colObj instanceof Boolean) { // Boolean
if (((Boolean) colObj).booleanValue()) {
outBuffer = addBytesToByteBuffer(outBuffer,
ONE_AS_BYTE_ARRAY);
} else {
outBuffer = addBytesToByteBuffer(outBuffer,
ZERO_AS_BYTE_ARRAY);
}
} else if (colObj instanceof String) { // CLOB and strings
String val = ((String) colObj);
if (val.length() > clobTruncationPoint) {
val = val.substring(0, clobTruncationPoint);
}
outBuffer = addEncodedStringToByteBuffer(outBuffer, val);
} else if (colObj instanceof BigDecimal) { // All numerics
// Use toPlainString for Java 1.5
outBuffer = addEncodedStringToByteBuffer(outBuffer,
((BigDecimal) colObj).toString());
} else { // Any other type
outBuffer = addEncodedStringToByteBuffer(outBuffer,
colObj.toString());
}
} else {
if (needToCloseQuote) {
outBuffer.put(DEL_SEP_AS_BYTE_ARRAY);
needToCloseQuote = false;
} else {
outBuffer.put(SEPARATOR_AS_BYTE_ARRAY);
}
}
}
if (needToCloseQuote) {
outBuffer.put(DELIMITER_AS_BYTE_ARRAY);
}
}
return outBuffer;
}
 
/**
 * Create a string containing the null images for the row. This is the
 * before-image for an insert operation, or the after-image for a delete
 * operation.
 *
 * @param image
 * - The null record image that must be processed
 * @return A buffer of bytes with the formatted null image
 */
public ByteBuffer formatNullImage(DataRecordIF image)
throws DataTypeConversionException {
// There is a separate data formatter for each table, so a null image is
// the same for each row, so just need to create it once
if (nullImage == null) {
// when called for the first time, determine the number of data
// columns
if (firstTimeData) {
numberDataColumns = getNumberDataColumns(image);
firstTimeData = false;
}
String outString = "";
if (image != null) {
for (int i = 1; i <= numberDataColumns; i++) {
outString = outString + SEPARATOR;
}
}
nullImage = ByteBuffer.wrap(getAsEncodedByteArray(outString));
nullImage.position(nullImage.capacity());
}
return nullImage;
}
 
/**
 * Create a string holding the journal control columns which must be output
 * in the flat file
 *
 * @param event
 * - Indication of when the event occurred
 * @param operationType
 * - The type of the operation for which the image must be
 * generated
 * @return A buffer of bytes with the formatted journal control image
 */
/**
 * Return a ByteBuffer containing the journal control field values that are
 * of interest.
 *
 */
public ByteBuffer formatJournalControlFields(ReplicationEventIF event,
int operationType) throws DataTypeConversionException {
// when called for the first time, determine the number of data columns
if (firstTimeJournal) {
trace.write("Table that will be formatted: "
+ event.getJournalHeader().getLibrary() + "."
+ event.getJournalHeader().getObjectName());
firstTimeJournal = false;
}
// Determine the character to use to indicate the operation type
char opChar = ' ';
switch (operationType) {
case DataStageDataFormatIF.INSERT_RECORD:
opChar = SUB_RLA_INS_UPD;
break;
case DataStageDataFormatIF.DELETE_RECORD:
opChar = SUB_RLA_DEL_NONE;
break;
case DataStageDataFormatIF.FULL_UPDATE_RECORD:
opChar = SUB_RLA_NON_UPD;
break;
case DataStageDataFormatIF.BEFORE_UPDATE_RECORD:
opChar = SUB_RLA_AUDIT_BEFORE;
break;
case DataStageDataFormatIF.AFTER_UPDATE_RECORD:
opChar = SUB_RLA_AUDIT;
break;
 
}
 
UserExitJournalHeader header = (UserExitJournalHeader) event
.getJournalHeader();
String journalControlString = DELIMITER
+ header.getDSOutputTimestampStr() + DELIMITER + SEPARATOR
+ DELIMITER + header.getCommitID() + DELIMITER + SEPARATOR
+ DELIMITER + opChar + DELIMITER + SEPARATOR + DELIMITER
+ header.getUserName() + DELIMITER;
 
ByteBuffer retVal = ByteBuffer
.allocate(BYTE_BUFFER_SPACE_FOR_JOURNAL_CONTROL);
retVal = addEncodedStringToByteBuffer(retVal, journalControlString);
return retVal;
}
 
/**
 * Indicate whether this table is being delivered to DataStage using flat
 * files or by direct connect.
 *
 * @param destination
 * indicates the destination type
 */
public void setDestinationType(int destination) {
destinationType = destination;
}
 
/**
 * Method used when mapping tables with Direct Connect (InfoSphere CDC 6.5+).
 *
 * @param journalHeader
 * - Information about the journal control columns
 * @param rowDataImage
 * - Changed row data columns
 * @param changeRecord
 * - Changed row data columns in Map object
 * @param opType
 * - Operation type
 */
public void formatChangedRowFields(UserExitJournalHeader journalHeader,
DataRecordIF rowDataImage, Map<String, Object> changeRecord,
int opType) throws DataTypeConversionException {
 
}
 
/**
 * Determines the number of data columns in the image, excluding the journal
 * control columns as these are also passed when the data must be formatted
 *
 * @param image
 * - The data image to be evaluated
 * @return Number of table (data) columns in the table
 */
private int getNumberDataColumns(DataRecordIF image) {
int numberColumns = 0;
trace.write("Columns in published image:");
for (int i = 1; i <= image.getColumnCount(); i++) {
if (!image.getColumnName(i).startsWith("&"))
numberColumns++;
trace.write(image.getColumnName(i));
}
trace.write("Number of data columns: " + numberColumns);
return numberColumns;
}
 
/**
 * Returns the passed string as a byte array, encoded in the character set
 * that is specified in the settings
 *
 * @param inString
 * - Input string to encode and return as byte array
 * @return
 */
public static byte[] getAsEncodedByteArray(String inString) {
byte[] retval;
 
try {
retval = inString.getBytes(OUTPUT_STRING_ENCODING);
} catch (UnsupportedEncodingException e) {
// If the encoding is not supported, the default encoding is used
retval = inString.getBytes();
}
return retval;
}
 
/**
 * Append the passed string to the ByteBuffer object, encoded in the
 * character set that is specified in the settings. This method is used to
 * format non-binary objects which all must be encoded (including timestamp
 * and numerics).
  *
  * @param buf
 * - Byte buffer to which the string will be appended
 * @param inString
 * - String to be appended to the byte buffer
 * @return Changed byte buffer
 * @throws DataTypeConversionException
 */
public static ByteBuffer addEncodedStringToByteBuffer(ByteBuffer buf,
String inString) throws DataTypeConversionException {
ByteBuffer retVal;
byte[] asBytes;
 
asBytes = getAsEncodedByteArray(inString);
 
if (buf.capacity() < buf.position() + asBytes.length
+ BYTE_BUFFER_SPACE_FOR_FIELD_SEPARATORS) {
int increment = BYTE_BUFFER_AUTO_INCREMENT_SIZE;
if (increment < asBytes.length) {
increment = asBytes.length
+ BYTE_BUFFER_AUTO_INCREMENT_BREATHING_SPACE;
}
retVal = ByteBuffer.allocate(buf.capacity() + increment);
buf.flip();
retVal.put(buf);
} else {
retVal = buf;
}
 
retVal.put(asBytes);
 
return retVal;
}
 
/**
 * Append the passed byte to the ByteBuffer object, as-is. This method is
 * used to format binary objects which must not be encoded.
 *
 * @param buf
 * - Byte buffer to which the byte will be appended
 * @param inByte
 * - Byte to be appended to the byte buffer
 * @return Changed byte buffer
 */
public static ByteBuffer addByteToByteBuffer(ByteBuffer buf, byte inByte) {
ByteBuffer retVal;
 
if (buf.capacity() < buf.position() + 1
+ BYTE_BUFFER_SPACE_FOR_FIELD_SEPARATORS) {
retVal = ByteBuffer.allocate(buf.capacity()
+ BYTE_BUFFER_AUTO_INCREMENT_SIZE);
buf.flip();
retVal.put(buf);
} else {
retVal = buf;
}
retVal.put(inByte);
 
return retVal;
}
 
/**
 * Append the passed byte array to the ByteBuffer object, as-is. This method
 * is used to format binary objects which must not be encoded.
 *
 * @param buf
 * - Byte buffer to which the byte will be appended
 * @param asBytes
 * - Byte array to be appended to the byte buffer
 * @return Changed byte buffer
 */
public static ByteBuffer addBytesToByteBuffer(ByteBuffer buf,
byte[] asBytes) {
ByteBuffer retVal;
 
if (buf.capacity() < buf.position() + asBytes.length
+ BYTE_BUFFER_SPACE_FOR_FIELD_SEPARATORS) {
int increment = BYTE_BUFFER_AUTO_INCREMENT_SIZE;
if (increment < asBytes.length) {
increment = asBytes.length
+ BYTE_BUFFER_AUTO_INCREMENT_BREATHING_SPACE;
}
retVal = ByteBuffer.allocate(buf.capacity() + increment);
buf.flip();
retVal.put(buf);
} else {
retVal = buf;
}
retVal.put(asBytes);
return retVal;
}
}
The CDCDataStageFormat class provides an example of how the flat file output can be customized to meet your needs. Three main methods in the interface must be implemented to format the data: formatDataImage(), formatNullImage(), and formatJournalControlFields(). The other interface methods must be implemented too, but you can choose whether to provide any code for them. When data is formatted, the formatJournalControlFields() method is called first. This method provides the means to do initialization processing at the table level and formats the journal control columns. For formatting the data image, the formatDataImage() method is passed an argument that holds the data record object. When an update operation is processed, this method is invoked twice, once for the before image and once for the after image. The formatNullImage() method is called for insert and delete operations, where the before and after images are combined in a single record; it returns a number of column separators equivalent to the number of columns.
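To illustrate the row layout that these separator and delimiter settings produce, the following self-contained sketch mimics the branching logic of formatDataImage() for a row that contains a NULL column. This is a simplified illustration only; the class and method names are not part of the InfoSphere CDC API. The byte counts also show why the ISO-8859-1 encoding matters: the "ª" delimiter is one byte in ISO-8859-1 but two bytes in UTF-8.

```java
import java.nio.charset.StandardCharsets;

public class DelimiterSketch {
    // Same separator and delimiter as Example 9-90
    private static final String SEPARATOR = "|";
    private static final String DELIMITER = "\u00AA"; // "ª"

    // Mimics the separator/delimiter logic of formatDataImage(): each
    // non-null column is wrapped in delimiters; a NULL column is left
    // empty between separators.
    public static String formatRow(String[] cols) {
        StringBuilder sb = new StringBuilder();
        boolean open = false; // is a delimited column currently open?
        for (String col : cols) {
            if (col != null) {
                sb.append(open ? DELIMITER + SEPARATOR + DELIMITER
                               : SEPARATOR + DELIMITER);
                sb.append(col);
                open = true;
            } else {
                sb.append(open ? DELIMITER + SEPARATOR : SEPARATOR);
                open = false;
            }
        }
        if (open) {
            sb.append(DELIMITER);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String row = formatRow(new String[] { "UPLAND", null, "CA" });
        System.out.println(row); // |ªUPLANDª||ªCAª
        // "ª" is one byte in ISO-8859-1 but two bytes in UTF-8
        System.out.println(row.getBytes(StandardCharsets.ISO_8859_1).length); // 15
        System.out.println(row.getBytes(StandardCharsets.UTF_8).length);      // 18
    }
}
```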
If the column delimiter character can appear in the replicated data, which is common when binary objects are replicated, the example can be adapted to that scenario as well: change DELIMITER to a sequence of characters that is unlikely to occur in the data, for example, "?~#".
To activate the custom data formatter, the class must be specified in the Flat File properties of the table mappings. The class cannot accept any parameters.
An example configuration for DataStage is shown in Figure 9-38.
Figure 9-38 InfoSphere CDC user exit configuration for DataStage
When running the subscription and making a few changes, the flat file output appears as shown in Figure 9-39.
Figure 9-39 InfoSphere CDC user exit DataStage output
9.7.7 Notifications
As explained in 9.6.4, “Events” on page 367, some external monitoring solutions watch log files for newly appended lines to determine whether there are any new messages to be acted upon.
The example Java notification user exit writes events that have been selected for user exit handling to the <cdc_home>/log/cdc_notifications.log file. When a notification has been configured for a certain category in the InfoSphere CDC process and an event of that category is detected, InfoSphere CDC invokes the handle() method.
Example 9-91 shows sample code for a notification user exit.
Example 9-91 Notification user exit
import java.io.*;
import java.util.*;
import java.text.SimpleDateFormat;
import com.datamirror.ts.api.*;
 
public class NotificationToFile implements AlertHandlerIF {
 
// Line separator is dependent on the platform (Linux/Unix/Windows)
private final static String LINE_SEPARATOR = System
.getProperty("line.separator");
// Directory separator is dependent on the platform (/ on Unix or Linux,
// \ on Windows)
private final static String FILE_SEPARATOR = System
.getProperty("file.separator");
 
/**
 * Constructor, will be called when the object is instantiated. You could
 * include activity such as creating the log file.
 */
public NotificationToFile() {
}
 
/**
 * This method is invoked for every event that is defined to be handled by a
 * USER HANDLER at the datastore or subscription level.
 *
 * When the method is called, it opens the cdc_notifications.log file in the
 * <cdc_home>/log directory and writes the event in a format that is
 * equivalent to the output of dmshowevents. The log file is continuously
 * appended to and can be monitored by an external monitoring solution.
 *
 * @param zone
 * - Zone of the event (not used anymore with InfoSphere CDC 6.5)
 * @param category
 * - Category of the event (information, error, ...)
 * @param sourceOrTarget
 * - Did the event happen on the source or the target
 * @param subscriptionName
 * - Subscription that generated the event
 * @param eventID
 * - Numeric representation of the event
 * @param eventText
 * - Message issued by CDC engine
 * @param otherInfo
 * - Other properties information (not used)
 */
public void handle(int zone, int category, String sourceOrTarget,
String subscriptionName, int eventID, String eventText,
Properties otherInfo) throws Exception {
BufferedWriter notificationWriter = null;
try {
// Locate or create the file and open it for output
notificationWriter = new BufferedWriter(new FileWriter("log"
+ FILE_SEPARATOR + "cdc_notifications.log", true));
// Determine the current time and convert it to ISO representation
Calendar calendar = new GregorianCalendar();
calendar.setTime(new Date());
SimpleDateFormat dateFormat = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
 
// Format the message string and write to the log file
String message = dateFormat.format(calendar.getTime()) + "|"
+ sourceOrTarget + "|" + subscriptionName + "|" + eventID
+ "|" + getCategoryString(category) + "|"
+ getZoneString(zone) + "|" + eventText + LINE_SEPARATOR;
notificationWriter.write(message);
} finally {
// Always executed
if (notificationWriter != null) {
try {
notificationWriter.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return;
}
 
/**
 * Converts the event category to a readable string
 *
 * @param category
 * - Category of the event
 * @return Category string representation
 */
private String getCategoryString(int category) {
switch (category) {
case 1:
return "Fatal";
case 2:
return "Error";
case 3:
return "Information";
case 4:
return "Status";
case 5:
return "Operational";
default:
return "Unknown Category";
}
}
 
/**
 * Converts the event zone to a readable string
 *
 * @param zone
 * - Zone of the event
 * @return Zone string representation
 */
private String getZoneString(int zone) {
switch (zone) {
case 1:
return "Communication";
case 2:
return "Apply";
case 3:
return "Environment";
default:
return "";
}
}
}
To implement the notification handling, go to the Notifications section of your datastore and select the notifications that should be handled by the user exit. If you want to handle only certain events (for example, unrecoverable and error messages), select only those notifications; only the selected categories generate entries in the file. Figure 9-40 shows an example of how the notification user handler can be specified at the datastore level.
Figure 9-40 User exit notification configuration
When you start a subscription and look at the events being generated, you see the event log messages logged in the <cdc_home>/log/cdc_notifications.log file (Figure 9-41).
Figure 9-41 Output of event log messages
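An external monitoring solution that consumes this file can split each line on the vertical bar to recover the individual fields. The following self-contained sketch shows one way to do so; the class name and the sample line (timestamp, subscription name, and event ID) are illustrative only, and the field order matches the message built by the example handle() method.

```java
public class NotificationLineParser {
    // Field order matches the message written by the example user exit:
    // timestamp | source-or-target | subscription | eventID | category | zone | text
    public static String[] parse(String line) {
        // Limit of 7 keeps any '|' inside the message text intact
        return line.split("\\|", 7);
    }

    public static void main(String[] args) {
        // Illustrative line only; subscription name and event ID are made up
        String sample =
            "2012-05-01 10:30:00|T|SUBDEMO|1456|Error|Apply|Sample message";
        String[] fields = parse(sample);
        System.out.println(fields[4] + ": " + fields[6]);
        // prints: Error: Sample message
    }
}
```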