织梦CMS - 轻松建站从此开始!

罗索

基于JXTA的P2P应用开发

罗索客 发布于 2009-08-12 11:27 点击:次 
本文的例子是一个基于JXTA的P2P应用,Peer可以利用它聊天。它允许Peer把自己注册到网络上,发送消息给其他Peer,或者从其他Peer接收消息。 编写JXTA应用要求JXTA内核以jxta.jar JAR文件的形成存在。(jxta.jar文件可以从http://download.jxta.org/下载。)jxta.jar文件
TAG:

P2P即Peer-to-Peer,称为对等连接或对等网络,是一种点对点计算模式。JXTA是项目创始人、Sun首席科学家Bill Joy二十多年酝酿的结晶,“JXTA技术是网络编程和计算的平台,用以解决现代分布计算尤其是P2P计算中出现的问题。”JXTA协议是一组为P2P网络计算而设计的协议,共六种。这六种协议分别是:Peer Discovery Protocol,Peer Resolver Protocol,Peer Information Protocol,Peer Membership Protocol,Pipe Binding Protocol,以及Peer Endpoint Protocol。利用这些协议,我们可以让消息跨越多个网络,发送到网络上的任意其他Peer。这些协议是所有Java P2P应用的基础,请参见图一的JXTA应用体系(该图来自jxta.org)。

P2P,JXTA


图一:JXTA应用体系

本文的例子是一个基于JXTA的P2P应用,Peer可以利用它聊天。它允许Peer把自己注册到网络上,发送消息给其他Peer,或者从其他Peer接收消息。 编写JXTA应用要求JXTA内核以jxta.jar JAR文件的形成存在。(jxta.jar文件可以从http://download.jxta.org/下载。)jxta.jar文件必须在CLASSPATH中。
构造JXTA应用并不是一件很复杂的事情。只需实现net.jxta.platform.Application接口,并提供该接口定义的三个方法:init(),startApp()和stopApp()。下面显示的SimpleJXTA类就是一个最简单的JXTA应用(注意,程序必须导入net.jxta.document.Advertisement类和net.jxta.peergroup.PeerGroup接口,init()方法需要它们)。

// 最简单的JXTA应用
import net.jxta.platform.Application;
import net.jxta.document.Advertisement;
import net.jxta.peergroup.PeerGroup;
public class SimpleJXTA implements Application {
   public void init(PeerGroup group,
      Advertisement adv) {
   }
   public int startApp(String[] args) {
      return 0;
   }
   public void stopApp() {
   }
}

作为一个Java应用,上面的程序是否遗漏了static void main()方法呢?其实,与普通Java应用相比,JXTA应用的运行方式有所不同。JXTA应用要求启动JXTA平台。JXTA平台启动之后,它将启动所有在jxtaConfig配置文件中指定的应用。JXTA平台启动一个应用时,首先调用应用的init()方法,接着调用startApp()方法。

JXTA聊天程序

本文的聊天应用改编自JXTA Shell软件包的Talk程序,设计这个应用的主要目的是为了学习JXTA编程。它很简单,与基于Java Socket的许多聊天应用相比,它缺少许多高级特性,且不具备强健的错误控制机制。然而,这个程序用到了许多重要的JXTA类和接口,初学者可以由此开始深入了解JXTA。

程序有一个很简单的GUI,如图二所示。

基于JXTA的P2P应用开发,图二:程序界面

图二:程序界面

所有来自聊天用户的消息,格式为“[用户名字] 消息”。聊天程序本身报告的信息都以“->”开头。

聊天程序的业务规则为:

● 用户在登录文本框中输入自己的名字,并点击“登录”按钮登录。

● 用户名字必须唯一,任何两个用户不能有相同的名字。如果某个用户输入的名字正在被其他用户使用,则应用将报告该信息,正在试图登录的用户必须改变自己的名字。

● 成功登录后,用户在消息输入框中输入消息。所有消息将以广播的形式发送给当前已经登录的所有用户,包括消息发送者本身。

登录

登录的目的不是进行身份验证,任何拥有聊天程序的人都可以加入聊天。登录的目的是为了让其他用户知道当前用户的存在,使得聊天程序能够发送消息,或者从其他聊天用户接收消息。

消息通过管道从一个Peer传递到另一个Peer。管道由net.jxta.pipe包里面的Pipe接口描述。管道是在两个JXTA应用或服务之间传递消息的核心机制,它为两个Peer之间的通信提供了简单的、单向的、异步的通道。换句话说,管道把多个Peer终端连接到了一起。接收端称为输入管道,发送端称为输出管道。要让输出管道能够把消息发送到接收管道,两个管道首先必须互相认知对方。例如,聊天应用的一个实例能够把消息发送给另一个实例之前,两个实例必须互相认知对方。应用的一个实例必须把自己“介绍给”其他实例。这是如何完成的呢?

在JXTA术语中,让其他Peer知道某个Peer已经在线叫做广告(advertising)。从技术上看,advertising意味着把一个广告发送给网络上的所有用户。这里的广告可以是任意网络资源的标识符,可以用来描述各种核心对象,例如Peer、PeerGroup、Service、Pipe、Content或Endpoint。JXTA广告是与平台无关的,一般以XML文档的形式提供。

在聊天程序中,由于管道是信息交换的核心对象,PipeAdvertisement代表着各个聊天用户。PipeAdvertisement描述了一个管道通信通道,管道服务利用它创建关联的输入和输出管道终端。

当一个Peer发送它的广告给另一个Peer,它可以期待另一个Peer的回应,即发回它的广告。按照这种方式,双方都将拥有对方的广告。广告由应用缓冲,以便以后使用,当应用需要广播一个消息时会用到这些缓冲的广告。JXTA自动在cm子目录的特定目录下,以文件的形式(文件名字唯一)缓冲广告。对于本文的聊天应用,JXTA创建了一个名为e7d1115b的目录。在这个目录下,有子目录Adv、Groups、PeerInfo、Peers、private、public和tmp,如图三所示。所有与聊天应用联系的Peer的所有PipeAdvertisement都保存在Adv目录下,包括聊天应用自己的PipeAdvertisement。

基于JXTA的P2P应用开发,图三:cm目录结构

图三:cm目录结构

当一个用户登录,聊天应用首先检查所有本地缓冲的PipeAdvertisement。如果其中一个缓冲的PipeAdvertisement的用户名字与聊天用户输入的名字相同,则聊天应用将向用户提示名字已经被使用。这种提示可能是错误的,因为以前使用该用户名字的远程用户当时不一定登录。另外,由于聊天应用缓冲了它自己的PipeAdvertisement,用户不能在下一次会话中使用同一个名字。因此,如果必要,在启动应用之前可以清除Adv目录。

当聊天应用搜索完所有本地缓冲的PipeAdvertisement,且不能找到匹配的用户名字时,它将把搜索扩展到远程用户。如果用户名字已经被某个远程用户使用,聊天应用将向试图登录的用户提示该名字不可用。
发送和接收消息

在消息输入框输入消息内容,按Enter键广播消息。这时,聊天应用提取出所有缓冲的PipeAdvertisement,利用循环遍历每一个PipeAdvertisement,配置通信通道,构造消息,再通过输出管道把消息发送给远程Peer。在JXTA中,消息由net.jxta.endpoint包的Message接口描述,输出管道由net.jxta.pipe包的OutputPipe接口描述。

为了回应传入的消息,聊天程序(P2PWithJXTA类)应该一直监听着消息。P2PWithJXTA类实现了Runnable接口,为监听任务构造了一个线程。run()方法利用一个没有结束条件的while循环,实现对传入消息的监听。

为了接收消息,JXTA应用必须有一个输入管道。输入管道由net.jxta.pipe包的InputPipe接口描述。接收到消息时,InputPipe的waitForMessage()方法返回一个Message对象。Message对象定义了一个XML信封,允许传输任意类型的数据。消息可以包含任意数量的命名区域,这些区域可以包含任意形式的数据。在XML文档体内利用Base64编码技术,Message对象甚至可以封装二进制数据。获得Message对象之后,接下来就可以提取Message对象的内容,并把它显示到聊天应用的屏幕上。

编写代码

整个聊天应用由一个P2PWithJXTA类实现。P2PWithJXTA首先声明一系列作用范围为类的域:

private PeerGroup group;
private Discovery discovery;
private PipeAdvertisement userAdv;
private Advertisement adv;
private Pipe pipe;

PeerGroup接口描述了Peer组。利用Peer组的成员服务,Peer可以加入和参与到组。Peer组通常由一系列认可一组共同服务的Peer构成。每一个Peer组对应一个唯一的Peer组ID以及一个Peer组广告,Peer组广告定义了各种组服务(membership,discovery,resolver,等等)。 JXTA带有两个Peer组子类:

latform:该子类描述了默认的World组。每一个Peer启动时自动成为该组的一部分。这个组提供了寻找其他组所需要的最低限度的核心服务。World组的ID是不变的。

StdPeergroup:当前,该子类用来实现所有其他类型的Peer组。这类组中的第一个称为the Net Peer Group。平台启动之后,它可以(可选地)在本地网络上搜索The Net Peer Group,如果找到,则创建其实例。否则,实例化默认配置的The Net Peer Group。在启动Peer的网络域内,网络管理员可以设置一个非默认配置的The Net Peer Group。

每一个网络资源都由一个广告描述,Peer、组、管道、服务都有广告。当一个网络资源发布了广告,远程Peer需要某种方式找到该广告。JXTA通过提供DiscoveryService使得这种发现成为可能。DiscoveryService提供了一种发现广告的异步机制。发现操作的范围可以通过指定一个名字/属性对和一个极限控制,这个极限是请求的Peer指定的广告数量的上限——应答的Peer不能超过这个限制。每一个PeerGroup有一个DiscoveryService的实例,发现操作的范围局限于组内。

P2PWithJXTA必须实现net.jxta.platform.Application接口,包括Application接口的三个方法:init()、startApp()和stopApp()。在本例中,startApp()和stopApp()实际上不做任何事情,这是允许的。init()方法将在JXTA平台启动之后被调用。

JXTA平台传递给init()方法的参数是一个PeerGroup对象group和一个Advertisement对象adv,其中group是本应用从中启动的PeerGroup,adv是应用的广告。 init()方法执行两方面的操作。首先,它初始化变量,

this.adv = adv;
this.group = group;
pipe = group.getPipe();
discovery = group.getDiscovery();

接着,init()方法调用prepareFrame()方法设置应用的界面布局。

聊天程序利用一个名为screen的JTextArea显示所有的消息,用一个名为username的JTextField让用户输入名字。输入名字后,用户点击名为login的JButton就可以登录。最后,用户可以在名为messageBox的JTextField中输入需要广播的消息。

有两个ActionListener对象分别被加入到login JButton和messageBox JTextField(参见下面的代码片断)。第一个对象的作用是,当用户点击login JButton时,调用login()方法,并传入username JTextField中输入的文字。第二个ActionListener对象的作用是,当用户在messageBox JTextField中输入文字并按下Enter键,它将调用sendAll()方法,并清除messageBox。

login.addActionListener(
   new java.awt.event.ActionListener() {
   public void actionPerformed(ActionEvent e) {
      login(username.getText());
   }
});
messageBox.addActionListener(
   new java.awt.event.ActionListener() {
   public void actionPerformed(ActionEvent e) {
      sendAll();
      messageBox.setText("");
   }
});

toDisplay()方法接受一个String类型的参数,并把这个String插入到screen JTextArea。

findUserAd()方法的参数是一个广告名字,如果找到与该名字匹配的广告,findUserAd()返回广告;否则,返回null。findUserAd()方法首先在本地搜索广告,如果没有找到,则进行远程搜索。

首先,findUserAd()方法调用Discovery接口的getLocalAdvertisements()方法。getLocalAdvertisements()方法返回一个Enumeration对象,其中包含以前缓冲的名字为“JxtaChatUserName:name”的广告。如果getLocalAdvertisements()的返回值不是null,findUserAd()方法遍历Enumeration enum,返回第一个符合条件的广告。 如下所示:

Enumeration enum = null;
// 搜索本地广告
try {
   enum = discovery.getLocalAdvertisements(Discovery.ADV,
      "name", CHAT_NAME_FLAG + ":" + name);
   if (enum != null) {
      PipeAdvertisement adv = null;
      while (enum.hasMoreElements()) {
         try {
            adv = (PipeAdvertisement) enum.nextElement();
            if (adv!=null && adv.getName()!=null &&
               adv.getName().equals ( CHAT_NAME_FLAG + ":" + name))
               return adv;
         }
         catch(Exception e) {
            continue;
         }
      }
   }
}

如果未能找到匹配的广告,findUserAd()方法调用Discovery接口的getRemoteAdvertisements()方法,执行远程查找:

discovery.getRemoteAdvertisements(null, Discovery.ADV,
   "name", CHAT_NAME_FLAG + ":" + name, 2);

 依赖于网络的响应能力,getRemoteAdvertisements()方法可能需要一段时间才能返回:

// 等待并获取远程应答
try {
   if (i > MAX_RETRY) break;
   Thread.sleep(WAIT_TIME);
   i++;
} catch (Exception e) { }

如果找到匹配的广告,则广告被缓冲到本地。因此,这时必须再次调用getLocalAdvertisements()方法,才能返回符合搜索条件的广告。

login()方法接受一个名为name的String参数,用一个名为userLoggedIn的boolean值检查用户是否已经登录。在应用生存期间,聊天应用只允许用户登录一次。login()方法首先检查名字是否已经被另外一个用户使用。如是,login()方法提示用户名字已经被使用,然后返回:

PipeAdvertisement adv = findUserAd(name);
if (adv != null) {
  toDisplay(name + ":该名字已经被使用");
  return;
}

如果名字没有被使用,login()方法利用AdvertisementFactory的newAdvertisment()方法创建一个新的广告,把它定型(cast)成为PipeAdvertisement,然后设置新广告的名字和标识符:

adv = (PipeAdvertisement)  AdvertisementFactory.newAdvertisement(
   PipeAdvertisement.getAdvertisementType() );
}
......
adv.setPipeID( new PipeID(group.getID() ) );
adv.setName(CHAT_NAME_FLAG + ":" + name);

创建好PipeAdvertisement(adv)之后,login()方法执行本地和远程的广告发布操作,声明该用户的存在:

discovery.publish(adv, Discovery.ADV);
discovery.remotePublish(adv, Discovery.ADV);

至此为止,聊天应用已经可以发送消息。为了接收消息,必须启动执行监听任务的线程。

sendAll()方法提取所有本地缓冲的PipeAdvertisement,以用户输入的文字为基础构造出一个消息,然后把消息发送给每一个PipeAdvertisement已经缓冲的远程用户。首先,sendAll()方法获取发送者的用户名字。接着,sendAll()方法调用getAllPipeAd()方法,获取一个包含所有本地缓冲的PipeAdvertisement的Vector。然后,sendAll()方法利用循环依次访问各个PipeAdvertisement,利用Pipe接口的createOutputPipe()方法构造一个OutputPipe对象。调用createOutputPipe()方法时传入找到的PipeAdvertisement。如下所示:

// 广播广告
int length = pipes.size();
for (int i = 0; i<="" {="" e)="" (exception="" catch="" }="" 1000);="" pipe.nonblocking,="" pipeout="null;" 可能需要等待更长时间="" 1000这个值仅用于测试,="" try="" outputpipe="" pipes.get(i);="" adv="(PipeAdvertisement)" pipeadvertisement="" i++)="" />

如果成功地创建了OutputPipe对象,接下来开始构造Message对象:

if (pipeOut != null) {
   Message msg = null;
   String userInput = null;
   InputStream inputStream = null;
   try {
      // 构造一个消息
      msg = pipe.createMessage();

Message对象的push()方法用来以InputStream的形式插入一组数据。因此,先把用户消息(messageBox.getText())转换成一个Byte数组,再转换成ByteArrayInputStream,然后,把InputStream以参数的形式提供给push()方法。另外,我们还要在消息中加入发送者的名字。为此,先从sender字符串构造出另一个InputStream,再把它作为参数传递给push()方法:

inputStream = new ByteArrayInputStream(
     messageBox.getText().getBytes());
msg.push(SENDER_TEXT, inputStream);
inputStream = new ByteArrayInputStream(sender.getBytes());
msg.push(SENDER_ID, inputStream);

准备好Message对象之后,把它作为参数传递给OutputPipe类的send()方法,把消息发送给接收的Peer:

pipeOut.send(msg);

最后,关闭OutputPipe:

pipeOut.close();

run()方法执行与sendAll()方法相反的操作。run()方法创建一个InputPipe,监听传入的消息,提取出Message对象的用户名字和内容,并调用toDisplay()方法显示出消息。

首先,run()方法利用当前Pipe对象(pipe)的createInputPipe()方法创建一个InputPipe。如果createInputPipe()方法返回null,则显示“不能打开InputPipe”错误信息;反之,如果成功地创建了InputPipe,run()方法开始一个无结束条件的while循环,监听传入的消息,如下所示:

// 监听传入的消息
while (true) {
   try {
      msg = pipeIn.waitForMessage();
      if (msg==null && Thread.interrupted()) {
         // 要求线程停止运行
         toDisplay("结束监听");
         pipeIn.close();
         return;
      }
   }
   catch (Exception e) {
      // 不能接收消息
      // 执行清除操作
      pipeIn.close();
      toDisplay("---> 结束监听");
      return;
   }

一旦接收到消息,run()方法获取消息的发送者名字和内容。Message接口的pop()方法执行与push()方法相反的操作:提取出发送者的名字和消息内容。 下面是传入消息的处理过程:

InputStream inputStream = null;
Enumeration enum = msg.getNames();
// 空消息
if ((enum==null) || (!enum.hasMoreElements())) { continue; }
   String from = "Anonymous";
   byte[] buffer = null;
   // 获取有关发送者的信息
   try {
      inputStream = msg.pop(SENDER_ID);
      if (inputStream != null) {
         buffer = new byte [inputStream.available()];
         inputStream.read(buffer);
         from = new String(buffer);
      }
   }
   catch (Exception e) {
      // 没有用户信息
   }
   try {
      inputStream = msg.pop(SENDER_TEXT);
      if (inputStream != null) {
         buffer = new byte [inputStream.available()];
         inputStream.read(buffer);
         toDisplay("[" + from + "] " + (new String(buffer)));
      }
      else {
         toDisplay("---> 来自以下用户的空消息:" + from);
         continue;
      }
   }
   catch (Exception e) {
      toDisplay("---> 无法读取来自以下用户的消息:" + from);
   }
}

运行JXTA应用

运行任何JXTA应用都必须先启动JXTA平台。JXTA应用的入口点是net.jxta.impl.peergroup.Boot类的static void main()方法。为了让Boot类能够在JXTA平台启动后调用聊天应用,必须配置jxtaConfig文件。

在jxtaConfig文件中,InitialNetPeerGroupAppCode的值决定了将要被调用的类,默认值是JXTA Shell应用的Shell类,现在应该用聊天应用的类名称替换这个值。jxtaConfig文件中的#字符用来注释一个行。为便于切换到原始的配置文件,最好保留原来的InitialNetPeerGroupAppCode值。下面是配置本文聊天应用的jxtaConfig文件片段:

#InitialNetPeerGroupAppCode=net.jxta.impl.shell.bin.Shell.Shell
InitialNetPeerGroupAppCode=testjxtaapp.P2PWithJXTA

最后要做的事情就是调用Boot类:

Java -cp jxta.jar net.jxta.impl.peergroup.Boot

(佚名)
本站文章除注明转载外,均为本站原创或编译欢迎任何形式的转载,但请务必注明出处,尊重他人劳动,同学习共成长。转载请注明:文章转载自:罗索实验室 [http://www.rosoo.net/a/200908/7392.html]
本文出处:网络博客 作者:佚名
顶一下
(2)
100%
踩一下
(0)
0%
------分隔线----------------------------
发表评论
请自觉遵守互联网相关的政策法规,严禁发布色情、暴力、反动的言论。
评价:
表情:
用户名: 验证码:点击我更换图片
栏目列表
将本文分享到微信
织梦二维码生成器
推荐内容