例子:消息驱动Bank
为了阐述我们的观点,我们将开发和安装一个完整的样板应用程序:一个消息驱动的银行系统. 通过(幸亏有Spring)改进的基于POJOs的编程模型和保留相同的事务,我们可以不需要EJB或者一个应用服务器来实现这个系统。在下一个部分,我们将从消息驱动架构产生到另一个架构.就像基于WEB的架构一样.图1展示我们的样本应用程序的架构.
Figure 1. Architecture of the message-driven bank
在我们的例子中,我们将处理来自Java消息服务队列的银行定单.一张定单的处理包括通过JDBC来更新当前帐户的数据库.为了避免信息的丢失和重复,我们将使用JTA和JTA/XA事务来配合更新:处理信息和更新数据库将发生在一个原子事务里.资源部分可得到JTA/XA的更多信息.
编写应用程序代码 该应用程序将由两个JAVA类组成: Bank(一个DAO)和MessageDrivenBank.如图2.
Figure 2. Classes for the message-driven bank
Bank是一个数据访问对象,这个对象封装数据库访问。MessageDrivenBank是一个消息驱动façade并且是DAO的委托.与典型的J2EE方法不同,这个应用程序不包括EJB类.
第一步:编写Bank DAO 如下, Bank源代码是很直接和简单的JDBC操作.
package jdbc;
import javax.sql.*;
import java.sql.*;
public class Bank
{
private DataSource dataSource;
public Bank() {}
public void setDataSource ( DataSource dataSource )
{
this.dataSource = dataSource;
}
private DataSource getDataSource()
{
return this.dataSource;
}
private Connection getConnection()
throws SQLException
{
Connection ret = null;
if ( getDataSource() != null ) {
ret = getDataSource().
getConnection();
}
return ret;
}
private void closeConnection ( Connection c )
throws SQLException
{
if ( c != null ) c.close();
}
public void checkTables()
throws SQLException
{
Connection conn = null;
try {
conn = getConnection();
Statement s = conn.createStatement();
try {
s.executeQuery (
"select * from Accounts" );
}
catch ( SQLException ex ) {
//table not there => create it
s.executeUpdate (
"create table Accounts ( " +
"account VARCHAR ( 20 ), " +
"owner VARCHAR(300), " +
"balance DECIMAL (19,0) )" );
for ( int i = 0; i < 100 ; i++ ){
s.executeUpdate (
"insert into Accounts values ( " +
"'account"+i +"' , 'owner"+i +"', 10000 )"
);
}
}
s.close();
}
finally {
closeConnection ( conn );
}
//That concludes setup
}
//
//Business methods are below
//
public long getBalance ( int account )
throws SQLException
{
long res = -1;
Connection conn = null;
try {
conn = getConnection();
Statement s = conn.createStatement();
String query =
"select balance from Accounts where account='"+
"account" + account +"'";
ResultSet rs = s.executeQuery ( query );
if ( rs == null || !rs.next() )
throw new SQLException (
"Account not found: " + account );
res = rs.getLong ( 1 );
s.close();
}
finally {
closeConnection ( conn );
}
return res;
}
public void withdraw ( int account , int amount )
throws Exception
{
Connection conn = null;
try {
conn = getConnection();
Statement s = conn.createStatement();
String sql =
"update Accounts set balance = balance - "+
amount + " where account ='account"+
account+"'";
s.executeUpdate ( sql );
s.close();
}
finally {
closeConnection ( conn );
}
}
}
注意:代码并没有依赖EJB或任何专门的应用程序服务器.实际上,这是一个纯JAVA代码,这个JAVA代码是能在任何J2SE环境下运行的.
你同时应注意:我们使用了来自JDBC的DataSource接口.这意味着我们的类是独立于目前JDBC供应商提供的类. 你可能会疑惑,这怎么能与特定的数据管理系统(DBMS)提供商的JDBC实现紧密结合呢? 这里就是Spring框架帮你实现的. 这个技术被称为依赖注入:在我们的应用程序的启动期间,通过调用setDataSource方法,Spring为我们提供了相应的datasource对象.在后面几部分我们会更多地提到Spring.如果我们在以前使用应用程序服务器,我们将不得不借助于JAVA命名绑定接口(JNDI)查询.
除了直接使用JDBC,我们也可以使用Hibernate或者一个JDO工具来实现我们的持久层.这同样不需要任何的EJB代码.
第二步:配置BankDAO 我们会将便用Spring框架来配置我们的应用程序.Spring不是必需的,但是使用Spring的好处是我们将可以简单的添加服务,如:我们JAVA对象的事务和安全.这类似于应用服务器为EJB提供的东西,只是在我们的例子中Spring将变得更容易.
Spring也允许我们把我们的类从目前的JDBC驱动实现中分离出来:Spring能够配置Driver(基于我们的XML配置数据)并把它提供给BankDAO对象(依赖注入原理).这样可以保持我们的JAVA代码的清淅和集中.这步的Spring配置文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.NonXADataSourceBean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:SpringNonXADB
</value>
</property>
<property name="driverClassName">
<value>org.hsqldb.jdbcDriver</value>
</property>
<property name="poolSize">
<value>1</value>
</property>
<property name="connectionTimeout">
<value>60</value>
</property>
</bean>
<bean id="bank" class="jdbc.Bank">
<property name="dataSource">
<ref bean="datasource"/>
</property>
</bean>
</beans>
这个XML文件包括两个对象的配置:访问数据库的DataSource和使用这个DataSource的Bank对象.下面是由Spring维护的一些基本任务.
· 创建应用程序(例: Bank和DataSource)需要的对象(“beans”).在XML文件中给出了这些对象的类名,并且在我们的例子中,这些对象需要有一个公共的无参数constructor (Spring也允许参数,但是配置语法上有所不同).这些对象都被命名(XML中的id属性),所以我们后面能够引用这些对象. id也允许我们的应用程序找回它需要的已配置对象.
· 这些对象的初始化是通过在XML文件中的properties的值实现. 在XML文件中这些properties名 应与对应的类中的setXXX方法相对应.
· 将对象连接在一起 :一个property可能是另一个对象(例如:在我们例子中的数据源)的引用,引用可以通过id创建.
注意:在我们下一步中, 我们将选择配置一个JTA-enabled的数据源(由Atomikos Transactions提供,可用于企业和J2SE的JTA产品,我们将应用于我们的应用程序). 简单起见,我们将使用HypersonicSQLDB,这个DBMS不需要专门的安装步骤—它是在.jar文件里,就像JTA和Spring.
但是,考虑到渐增的可靠性需求,强列推荐你使用XA-capable的DBMS和JDBC驱动.没有XA的支持, 在crash或重启之后你的应用程序将不能恢复原有数据. 资源部分有链接到关于事务和XA的信息和一些例子.
作为一个练习,你可以试试从HypersonicSQLDB转换到FirstSQL,一个易安装XA-compliant的DBMS.换句话说,任何其他为企业准备的和XA-capable的DBMS也会做得很好.
第三步:测试BankDAO 让我们来测试我们的代码,(使用极限编程的程序员会首先写测试,但因开始不是很清淅,所以我们直到现在才开始写测试.)下面是一个简单的单元测试.这个测试可在你的的应用程序里运行:它通过Spring获得一个BANK对象来进行测试(这在setUp方法中实现).注意:这个测试使用清楚的事务划分:每一个测试开始之前开始一个事务,每个测试结束时强制进行事务回滚.这是通过手工的方式来减少测试对数据库数据的影响.
package jdbc;
import com.atomikos.icatch.jta.UserTransactionImp;
import junit.framework.TestCase;
import java.io.FileInputStream;
import java.io.InputStream;
import org.springframework.beans.factory.xml.XmlBeanFactory;
public class BankTest extends TestCase
{
private UserTransactionImp utx;
private Bank bank;
public BankTest ( String name )
{
super ( name );
utx = new UserTransactionImp();
}
protected void setUp()
throws Exception
{
//start a new transaction
//so we can rollback the
//effects of each test
//in teardown!
utx.begin();
//open bean XML file
InputStream is =
new FileInputStream("config.xml");
//the factory is Spring's entry point
//for retrieving the configured
//objects from the XML file
XmlBeanFactory factory =
new XmlBeanFactory(is);
bank = ( Bank ) factory.getBean ( "bank" );
bank.checkTables();
}
protected void tearDown()
throws Exception
{
//rollback all DBMS effects
//of testing
utx.rollback();
}
public void testBank()
throws Exception
{
int accNo = 10;
long initialBalance = bank.getBalance ( accNo );
bank.withdraw ( accNo , 100 );
long newBalance = bank.getBalance ( accNo );
if ( ( initialBalance - newBalance ) != 100 )
fail ( "Wrong balance after withdraw: " +
newBalance );
}
}
我们将需要JTA事务来确保JMS和JDBC都是原子操作.一般来说,当经常都是两个或多个连接的时候,你应考虑一下JTA/XA。例如,在我们例子中的JMS和JDBC. Spring本身不提供JTA事务;它需要一个JTA实现或者委派一个应用服务器来处理这个事务.在这里,我们使用了一个JTA实现,这个实现可以在任何J2SE平台上工作.
最终架构如下面图3.白色方框代表我们的应用程序代码.
Figure 3. Architecture for the test
如你所看到的,当我们执行我们的测试,将会发生下面的情况:
1. BankTest开始一个新事务.
2. 然后,这个test在Spring运行期间获得一个BANK对象.这步触发Sping的创建和初始化过程.
3. 这个test调用BANK的方法.
4. BANK调用datasource对象,通过它自己的setDataSource 方法从Spring 获取这个对像.
5. 这个数据源是JTA-enabled,并且与JTA实现交互来注册当前事务.
6. JDBC statements和帐户数据库交互.
7. 当方法返回时, test调用事务回滚.
8. JTA记得住datasource对象,会命令它进行回滚.
第四步:添加声明式事务管理 Spring允许添加声明式事务管理来管理java对象.假设我们想确认bank总是和一个有效的事务上下文一起被调用.我们通过在实际对象的上部配置一个proxy对象. Proxy和实际对象有相同接口,所以客户通过完全相同的方式使用它. 配置Proxy wrap每个BankDAO方法到事务中.结果配置文件如下. 不要被XML的庞大吓倒—大多数内容能通过复制和粘贴到你自己的工程中再使用.
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<!--
Use a JTA-aware DataSource
to access the DB transactionally
-->
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.NonXADataSourceBean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:SpringNonXADB</value>
</property>
<property name="driverClassName">
<value>org.hsqldb.jdbcDriver</value>
</property>
<property name="poolSize">
<value>1</value>
</property>
<property name="connectionTimeout">
<value>60</value>
</property>
</bean>
<!--
Construct a TransactionManager,
needed to configure Spring
-->
<bean id="jtaTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager"/>
<!--
Also configure a UserTransaction,
needed to configure Spring
-->
<bean id="jtaUserTransaction"
class="com.atomikos.icatch.jta.UserTransactionImp"/>
<!--
Configure the Spring framework to use
JTA transactions from the JTA provider
-->
<bean id="springTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="jtaTransactionManager"/>
</property>
<property name="userTransaction">
<ref bean="jtaUserTransaction"/>
</property>
</bean>
<!-- Configure the bank to use our datasource -->
<bean id="bankTarget" class="jdbc.Bank">
<property name="dataSource">
<ref bean="datasource"/>
</property>
</bean>
<!--
Configure Spring to insert
JTA transaction logic for all methods
-->
<bean id="bank"
class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager">
<ref bean="springTransactionManager"/>
</property>
<property name="target">
<ref bean="bankTarget"/>
</property>
<property name="transactionAttributes">
<props>
<prop key="*">
PROPAGATION_REQUIRED, -Exception
</prop>
</props>
</property>
</bean>
</beans>
这个XML文件告诉Spring去配置下面的对象:
1. 需要通过JDBC连接的datasource.
2. 添加jtaTransactionManager和jtaUserTransaction用于为JTA事务的Spring配置作准备.
3. springTransactionManager用于告诉Spring需要使用JTA.
4. BankDAO被重命名为bankTarget (因如下解释的原因).
5. bank对象被添加用于包装事务和bankTarget的所有方法.我们通过配置bank对象来使用
springTransactionManager,这意味着所有事务将是JTA事务. 每个事务都被设置为PROPAGATION_REQUIRED,这将在任何异常下出现强制回滚.
对这些对象包含的内容,你都可以很容易的复制和粘贴jtaTransactionManager, jtaUserTransaction, springTransactionManager到其他工程.其他的是应用程序相关的对象:datasource, bankTarget, bank. Bank对象很有趣:事实上对于bankTarget它是一个proxy;他们拥有相同的接口. Trick如下:当我们的应用程序请求Spring去配置和返回bank对象,Spring实际上将返回proxy(看起来和我们的应用程序完全相同),随后这个proxy将为我们开始/结束事务.这样,应用程序和Bank类本身都不需要知道JTA!图4阐述了在这步我们所得到的.
Figure 4. Architecture with declarative JTA transactions in Spring
现在的工作如下:
1. 应用程序调用bank对象.这将触发Spring的初始化处理和返回proxy对象. 对应用程序而言,这个proxy行为和我们的Bank是一样的.
2. 当bank的一个方法被调用, 这个调用将会通过proxy进行.
3. proxy使用springTransactionManager创建一个新事务.
4. springTransactionManager被配置为使用JTA,因些它委派到JTA.
5. 调用被forward到Bank的实际对象,bankTarget.
6. bankTarget使用从Spring中得到的datasource.
7. datasource对事务进行注册.
8. 通过规则的JDBC访问数据库.
9. 在返回时, proxy终止事务:如果在先前的序列中没有发生异常,那么将会提交终止指令.否则,它将会被回滚.
10. transaction 管理器与数据库配合进行提交和回滚.
在这步中的测试怎样进行?我们可重用BankTest 和它清晰的事务划分:因为PROPAGATION_REQUIRED, proxy将和在BankTest中创建的事务上下文一起执行.
第五步:编写PROPAGATION_REQUIRED 在这步,我们将添加JMS处理逻辑.为了做到这样,我们主要需要实现JMS MessageListener接口.我们也会添加公共的setBank方法使Spring的依赖注入起作用.源代码如下:
package jms;
import jdbc.Bank;
import javax.jms.Message;
import javax.jms.MapMessage;
import javax.jms.MessageListener;
public class MessageDrivenBank
implements MessageListener
{
private Bank bank;
public void setBank ( Bank bank )
{
this.bank = bank;
}
//this method can be private
//since it is only needed within
//this class
private Bank getBank()
{
return this.bank;
}
public void onMessage ( Message msg )
{
try {
MapMessage m = ( MapMessage ) msg;
int account = m.getIntProperty ( "account" );
int amount = m.getIntProperty ( "amount" );
bank.withdraw ( account , amount );
System.out.println ( "Withdraw of " +
amount + " from account " + account );
}
catch ( Exception e ) {
e.printStackTrace();
//force rollback
throw new RuntimeException (
e.getMessage() );
}
}
}
第六步:配置MessageDrivenBank 这里我们配置MessageDrivenBank去监听事务的QueueReceiverSessionPool.这样给我们可以实现和EJB(没有丢失信息和冗余信息)类似的消息机制,但在这里我们是用简单的POJO对象实现.当向pool中插入一个MessageListener,这个会话池将确保用JTA/XA事务接收到消息.结合JTA/XA-capable 的JDBC数据源,我们可以实现可靠的消息机制. Spring的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
NOTE: no explicit transaction manager bean
is necessary
because the QueueReceiverSessionPool will
start transactions by itself.
-->
<beans>
<bean id="datasource"
class="com.atomikos.jdbc.nonxa.NonXADataSourceBean">
<property name="user">
<value>sa</value>
</property>
<property name="url">
<value>jdbc:hsqldb:SpringNonXADB</value>
</property>
<property name="driverClassName">
<value>org.hsqldb.jdbcDriver</value>
</property>
<property name="poolSize">
<value>1</value>
</property>
<property name="connectionTimeout">
<value>60</value>
</property>
</bean>
<bean id="xaFactory"
class="org.activemq.ActiveMQXAConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
<bean id="queue"
class="org.activemq.message.ActiveMQQueue">
<property name="physicalName">
<value>BANK_QUEUE</value>
</property>
</bean>
<bean id="bank" class="jdbc.Bank">
<property name="dataSource">
<ref bean="datasource"/>
</property>
</bean>
<bean id="messageDrivenBank"
class="jms.MessageDrivenBank">
<property name="bank">
<ref bean="bank"/>
</property>
</bean>
<bean id="queueConnectionFactoryBean"
class="com.atomikos.jms.QueueConnectionFactoryBean">
<property name="resourceName">
<value>QUEUE_BROKER</value>
</property>
<property name="xaQueueConnectionFactory">
<ref bean="xaFactory"/>
</property>
</bean>
<bean id="queueReceiverSessionPool"
class="com.atomikos.jms.QueueReceiverSessionPool"
init-method="start">
<property name="queueConnectionFactoryBean">
<ref bean="queueConnectionFactoryBean"/>
</property>
<property name="transactionTimeout">
<value>120</value>
</property>
<!--
default license allows only limited
concurrency so keep pool small
-->
<property name="poolSize">
<value>1</value>
</property>
<property name="queue">
<ref bean="queue"/>
</property>
<property name="messageListener">
<ref bean="messageDrivenBank"/>
</property>
</bean>
</beans>
因为这篇文章需要一个便于安装的JMS服务,所以这里我们使用ActiveMQ.如果你正在使用另一个JMS实现,那么你将仍然能使用这部分提出的技术.接下来除了datasource和bank对象,我们将增加下面的对象定义:
· xaFactory: 为建立JMS连接的connection工厂.
· queue: queue代表我们将使用的JMS队列, 这个队列被配置成ActiveMQ要求的形式.
· queueConnectionFactoryBean:一个JTA-aware的JMS连接器.
· A queueReceiverSessionPool for JTA-enabled message consumption:注意:我们同时指定了用来调用的初始化方法(例:start);这是Spring的另一个特性. Start方法在session pool类里定义,它是在Spring配置文件中进行配置的.
· messageDrivenBank:负责处理消息.
你可以问问自己事务管理是在哪里进行的.事实上, 在先前部分被添加的对象已消失.为什么呢?因为我们现在使用QueueReceiverSessionPool来接收来自JMS的消息,并且这个类也为每次接收启动一个JTA事务.我们也可以保留JTA配置,另外添加JMS配置, 但是这样可能会使XML文件更长. 现在session pool类将担当事务管理角色.它和proxy方法的工作相似; 只是这个类需要JMS MessageListener 为之添加事务. 通过这样配置,在每个消息收接之前程序将启动一个新事务 无论何时, 当我们的消息实例正常返回时, 这个事务将提交. 如果出现RuntimeException, 那么这个事务将回滚. 结构如下面图5(可以清淅地看到一些JMS对象).
Figure 5. Architecture for message-driven applications in Spring
现在该架构工作如下:
1. 应用程序调用bank对象和初始化数据库表.
2. 应用程序queueReceiverSessionPool, 因此触发一个start方法的调用去监听到达的消息.
3. queueReceiverSessionPool在队列中侦察一个新消息.
4. queueReceiverSessionPool开始一个新事务,并且注册这个事务.
5. queueReceiverSessionPool调用已注册的MessageListener (messageDrivenBank).
6. 这将触发对bank 对象的调用.
7. bank 对象通过datasource访问数据库.
8. datasource注册事务.
9. 通过JDBC访问数据库.
10. 当处理完成时, queueReceiverSessionPool会终止这个事务。然后进行commint(除非发生RuntimeException).
11. transaction manager开始消息队列的两阶段提交.
12. transaction manager开始数据库的两阶段提交.
第七步:编写应用程序 因为我们没有使用容器,我们仅仅提供一个Java应用程序就可以启动整个银行系统.我们的Java应用程序是非常简单: 它有能力找回配置的对象(Spring通过XML文件将他们放到一起). 这个应用程序能在任何兼容的JDK(Java Development Kit)上运行,并且不需要应用服务器.
package jms;
import java.io.FileInputStream;
import java.io.InputStream;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import com.atomikos.jms.QueueReceiverSessionPool;
import jdbc.Bank;
public class StartBank
{
public static void main ( String[] args )
throws Exception
{
//open bean XML file
InputStream is =
new FileInputStream(args[0]);
//the factory is Spring's entry point
//for retrieving the configured
//objects from the XML file
XmlBeanFactory factory =
new XmlBeanFactory(is);
//retrieve the bank to initialize
//alternatively, this could be done
//in the XML configuration too
Bank bank =
( Bank ) factory.getBean ( "bank" );
//initialize the bank if needed
bank.checkTables();
//retrieve the pool;
//this will also start the pool
//as specified in the beans XML file
//by the init-method attribute!
QueueReceiverSessionPool pool =
( QueueReceiverSessionPool )
factory.getBean (
"queueReceiverSessionPool" );
//Alternatively, start pool here
//(if not done in XML)
//pool.start();
System.out.println (
"Bank is listening for messages..." );
}
}
这就是J2EE!是不是认为J2EE也很容易呢?
对通用性的考虑 这部分里我们看看更多的概念,这些概念在许多J2EE应用程序中是很重要的.我们同样将看到对这些概念来说,一个应用服务器并不是必须的.
集群和可扩展性 健壮的企业应用程序需要集群来分流负担. 在消息驱动应用程序的例子中,这很容易:我们自动地从JMS应用程序继承得来处理能力.如果我们需要更强大的处理能力,那么我们只需增加更多连接相同JMS服务器的进程.一个对服务性能有效的衡量标准是在队列中停留的消息的数量. 在其他情况下,如基于web的 架构(如下)我们能很容易地使用web环境下的集群能力.
方法级别的安全 一个典型的观点是认为EJB能增加方法级的安全性.虽然并没有在这篇文章中提到,但是在Sping中配置方法级别的安全是可能的.这种配置类似于我们增加方法级的事务划分的方式.
对非消息驱动的应用程序的通用性 在不改变源代码的情况下(除了主应用程序类),我们使用的平台能很容易地被整合到任何J2EE web 应用服务器. 换句话说 , 通过JMS进行后台处理;这使得web服务器在面对后台处理的延迟问题上更可靠和更独立.在任何情况下, 为了实现容器管理的事务或容器管理的安全性,我们都不再需要依靠EJB容器来实现.
关于容器管理持久化? 存在并被很多开发者检验过的技术例如:JDO或者Hibernate 都不一定需要一个应用服务器. 另外,这些工具已经占据了持久化市场.
结论 今天,不需要应用服务器的J2EE已经成为可能,也很容易. 有人或许会说,没有应用程序服务器,有一些应用程序仍不能实现:例如,如果你需要一般的JCA(Java Connectivity API) 功能性, 那么我们上面提供的平台是不够的. 但是,这可能会发生改变,因为不用使用一个应用程序服务器进行开发,测式和部署的好处实在是太大. 人们越来越相信: 将来得J2EE是一个模块化的”选择你所需要”架构. 这与我们之前的完全基于应用服务器的方法相反. 在这样的情况下,J2EE开发者将从应用服务器和EJB中解放出来.