TAG:
package AudioCaptureTransmit; import java.io.*; import java.net.*; import javax.media.*; import javax.media.format.*; import java.util.*; /** * Title: * Description: * Copyright: Copyright (c) 2003 * Company: * @author not attributable * @version 1.0 */ public class AudioCapture { public static void main(String[] args) { try { CreateJoinMuiltcastThread(args[0]); //将输入的IP地址和端口号传入组播线程 createJoinMuiltcastThread.start(); //启动线程 AudioSenderThread audioSenderThread = new AudioSenderThread(args[0]); //将输入的IP地址和端口号传入发送线程中 audioSenderThread.start(); //启动线程开始传输 } catch (Exception e) {} } } //定义创建及加入组播组线程CreateJoinMuiltcastThread class CreateJoinMuiltcastThread extends Thread { MulticastSocket socket; //声明建立组播组使用的MulticastSocket类 InetAddress group; //声明建立组播组使用的组播组地址 int port; //声明加入和离开组播组用的组播组地址 String MuiltAddr; public CreateJoinMuiltcastThread(String MuiltAddr) throws SocketException { this.MuiltAddr = MuiltAddr; } public void run() { try { SessionAdd multicastGroup = new SessionAdd(MuiltAddr); port = multicastGroup.ports; //设置组播端口 group = InetAddress.getByName(multicastGroup.addr); //设置组播地址 socket = new MulticastSocket(port); //创建MulticastSocket类并将端口与之关联 socket.setTimeToLive(1); socket.joinGroup(group); //加入此组播组 } } catch (Exception e) {} } } //定义音频发送线程AudioSenderThread class AudioSenderThread extends Thread { MulticastSocket socket; //声明建立组播组使用的MulticastSocket类 InetAddress group; //声明建立组播组使用的组播组地址 int port; //声明加入和离开组播组用的组播组地址 String SendLAddr; public AudioSenderThread(String SendLAddr) throws SocketException { this.SendLAddr = SendLAddr; } public void run() { try { RTPTransmit rtpTransmit = null; String encoding = AudioFormat.GSM_RTP; //定义传输模式为GSM_RTP double rateSample = 8000.0; //定义采样频率为8000 int bitsPerSample = 8; //定义采样位数为8位 int channel = 1; //得到通道数,‘1’为单通道,‘2’为双通道即立体声 AudioFormat audioFormat = new AudioFormat(encoding, rateSample, bitsPerSample, channel); //根据用户的设置构造音频编码方式 Vector deviceList = CaptureDeviceManager.getDeviceList(new AudioFormat( AudioFormat.LINEAR)); //得到音频采集设备列表 CaptureDeviceInfo di = null; if (deviceList.size() > 0) { //如果设备列表不为空 di = (CaptureDeviceInfo) deviceList.firstElement(); //将得到的第一种设备作为本系统的音频采集设备 } else { System.err.println("找不到合适的音频采集设备!"); return; } MediaLocator mediaLoc = di.getLocator(); //由采集设备得到媒体定位器 rtpTransmit = new RTPTransmit(mediaLoc, SendLAddr, audioFormat); String result = rtpTransmit.start(); //开始传输同时也开始了采集 if (result != null) { System.err.println("错误:" + result); } else { System.err.println("开始传输....."); } } catch (Exception e) { System.out.println(e); } } } package AudioCaptureTransmit; import java.io.*; import java.net.InetAddress; import javax.media.*; import javax.media.protocol.*; import javax.media.protocol.DataSource; import javax.media.format.*; import javax.media.control.TrackControl; import javax.media.rtp.*; /** * Title: * Description: * Copyright: Copyright (c) 2003 * Company: * @author not attributable * @version 1.0 */ public class RTPTransmit { private MediaLocator locator; // 媒体定位,可以是一个本机文件,也可以是一个网络文件或采集设备得到的数据源 private String session; //取得会话字符串 private int SendLPort = 65534; //本地端口号 private AudioFormat audioFormat = null; private Processor processor = null; // 处理器 private RTPManager rtpMgrs[]; // RTP管理器 private DataSource dataOutput = null; // 输出的数据源 // 构造函数 public RTPTransmit(MediaLocator locator, String session, Format format) { this.locator = locator; this.session = session; audioFormat = (AudioFormat) format; } // 开始传输 // 如果一切正常,就返回 null,否则返回出错原因 public synchronized String start() { String result; result = createProcessor(); // 产生一个处理器 if (result != null) { return result; } result = createTransmitter(); // 产生RTP会话,将处理器输出的数据传给指定的IP地址的指定的端口号 if (result != null) { processor.close(); processor = null; return result; } processor.start(); // 让处理器开始传输 return null; } // 为指定的媒体定位器产生一个处理器 private String createProcessor() { if (locator == null) { return "媒体定位器为空"; } DataSource ds; DataSource clone; try { ds = javax.media.Manager.createDataSource(locator); // 通过媒体定位器产生一个数据源 } catch (Exception e) { return "不能创建数据源"; } try { processor = javax.media.Manager.createProcessor(ds); // 通过数据源来产生一个处理器 } catch (NoProcessorException npe) { return "不能创建处理器"; } catch (IOException ioe) { return "创建处理器异常"; } boolean result = waitForState(processor, Processor.Configured); // 等待处理器配置好 if (result == false) { return "不能配置处理器"; } TrackControl[] tracks = processor.getTrackControls(); // 为媒体流中的每一个磁道得到一个控制器 if (tracks == null || tracks.length < 1) { // 确保至少有一个可用的磁道 return "找不到处理器轨道"; } ContentDescriptor cd = new ContentDescriptor(ContentDescriptor.RAW_RTP); processor.setContentDescriptor(cd); // 设置输出的内容描述为RAW_RTP // 从而限定每个磁道支持的格式仅为合法的RTP格式,即它影响后面的 Track.getSupportedFormats() Format supported[]; Format chosen = null; boolean atLeastOneTrack = false; for (int i = 0; i < tracks.length; i++) { // 对每一个磁道,选择一种RTP支持的传输格式 Format format = tracks[i].getFormat(); if (tracks[i].isEnabled()) { supported = tracks[i].getSupportedFormats(); if (supported.length > 0) { for (int j = 0; j < supported.length; j++) { if (supported[j] instanceof AudioFormat) { // 如果某被支持的格式为音频,则检查其是否符合要求 AudioFormat temp = (AudioFormat) supported[j]; // 强制转换为音频格式 if (audioFormat.matches(temp)) { // 检查该格式是否与用户设定的音频格式相匹配 chosen = supported[j]; // 匹配则选中 break; } } } if (chosen == null) { return ("不支持选定的格式!"); } tracks[i].setFormat(chosen); System.err.println("轨道 " + i + " 传输设置:"); System.err.println(" " + chosen); atLeastOneTrack = true; } else { tracks[i].setEnabled(false); } } else { tracks[i].setEnabled(false); } } if (!atLeastOneTrack) { return "RTP格式无效不能设置任何轨道"; } result = waitForState(processor, Controller.Realized); // 等待处理器实现 if (result == false) { return "处理器不能实现"; } dataOutput = processor.getDataOutput(); // 从处理器得到输出的数据源 return null; } // 为处理器的每一个媒体磁道产生一个RTP会话 private String createTransmitter() { SessionAddress localAddr; SessionAddress destAddr; PushBufferDataSource pbds = (PushBufferDataSource) dataOutput; // 将数据源转化为“Push”(推)数据源 PushBufferStream pbss[] = pbds.getStreams(); // 得到“Push”数据流 rtpMgrs = new RTPManager[pbss.length]; // 为每个会话产生一个RTP会话管理器 for (int i = 0; i < pbss.length; i++) { try { rtpMgrs[i] = RTPManager.newInstance(); SessionAdd Sess = new SessionAdd(session); InetAddress ipAddr = InetAddress.getByName(Sess.addr); // 得到发送目的地的IP地址 if (ipAddr.isMulticastAddress()) { localAddr = new SessionAddress(ipAddr, Sess.ports); destAddr = new SessionAddress(ipAddr, Sess.ports); } //如果是组播,则本机地址和目的机地址相同 else { localAddr = new SessionAddress(InetAddress.getLocalHost(), SendLPort); // 得到本机的会话地址 SendLPort = SendLPort - 2; // 这里传输端使用和接收目的端不同的端口号(实际上也可以相同) destAddr = new SessionAddress(ipAddr, Sess.ports); // 得到目的机器的会话地址 } rtpMgrs[i].initialize(localAddr); // 将本机会话地址传给RTP管理器 rtpMgrs[i].addTarget(destAddr); // 加入目的会话地址 System.err.println("创建RTP会话: " + Sess.addr + " " + Sess.ports); SendStream sendStream = rtpMgrs[i].createSendStream(dataOutput, 0); // 产生数据源的RTP传输流 sendStream.start(); // 开始RTP传输 } catch (Exception e) { return e.getMessage(); } } return null; } // 停止传输 public void stop() { synchronized (this) { if (processor != null) { processor.stop(); processor.close(); processor = null; for (int i = 0; i < rtpMgrs.length; i++) { rtpMgrs[i].removeTargets("会话结束."); rtpMgrs[i].dispose(); } } } } // 以下两个变量为对处理器状态改变的处理服务 private Integer stateLock = new Integer(0); private boolean failed = false; // 得到状态锁 Integer getStateLock() { return stateLock; } // 设置失败标志 void setFailed() { failed = true; } // 等待处理器达到相应的状态 private synchronized boolean waitForState(Processor p, int state) { p.addControllerListener(new StateListener()); // 为处理器加上状态监听 failed = false; if (state == Processor.Configured) { // 配置处理器 p.configure(); } else if (state == Processor.Realized) { // 实现处理器 p.realize(); } // 一直等待,直到成功达到所需状态,或失败 while (p.getState() < state && !failed) { synchronized (getStateLock()) { try { getStateLock().wait(); } catch (InterruptedException ie) { return false; } } } if (failed) { return false; } else { return true; } } // 内部类:处理器的状态监听器 class StateListener implements ControllerListener { public void controllerUpdate(ControllerEvent ce) { // 如果在处理器配置或实现过程中出现错误,它将关闭 if (ce instanceof ControllerClosedEvent) { // 控制器关闭 setFailed(); // 对于所有的控制器事件,通知在waitForState方法中等待的线程 } if (ce instanceof ControllerEvent) { synchronized (getStateLock()) { getStateLock().notifyAll(); } } } } } package AudioCaptureTransmit; import java.net.*; /** * Title: * Description: * Copyright: Copyright (c) 2003 * Company: * @author not attributable * @version 1.0 */ public class SessionAdd { public String addr = null; public int ports; SessionAdd(String Session) throws IllegalArgumentException { //将异常抛出 int off; String portStr = null; if (Session != null && Session.length() > 0) { while (Session.length() > 1 && Session.charAt(0) == ''/'') { //如果字符串中第一个字符为‘/’,则将其去掉 Session = Session.substring(1); } off = Session.indexOf(''/''); //找到字符串中‘/’的位置 if (off == -1) { if (!Session.equals("")) { //如果字符串不为空 addr = Session; //则将字符串作为IP地址保存 } } else { addr = Session.substring(0, off); Session = Session.substring(off + 1); //得到第一个‘/’后面的子串 off = Session.indexOf(''/''); //找到子串中‘/’的位置 if (off == -1) { if (!Session.equals("")) { //如果子串不为空,则将其作为端口号保存 portStr = Session; } } } } if (addr == null) { //如果输入的IP地址为空,则抛出异常 throw new IllegalArgumentException(); } if (portStr != null) { try { Integer Integ = Integer.valueOf(portStr); if (Integ != null) { //如果得到的端口号不为空,则将端口号转化为整数 ports = Integ.intvalue(); } } catch (Throwable t) { throw new IllegalArgumentException(); } } else { throw new IllegalArgumentException(); } } } 在上面的这段程序中有三个类,其中transmit类是数据流的捕捉与传输,而sessionadd类是解释输入的IP地址和端口号的。在我昨天发的帖子上我曾经提到过,这段代码如果是针对点对点的语音传输的时候是没有问题的,但是如果输入的IP地址是组播地址的话,那么在一台机器上测试的时候,不需要创建组播组就可以正常的收发语音信息,但是,如果在网络中测试的时候,即使正常的创建了组播组也没有办法正常收发数据流,以上这部分代码是可以正常运行的。不知道哪位高手有兴趣能帮我研究一下,看看这段代码的问题到底在哪?是什么原因导致了,采用组播时单机测试能通过,而网络测试就不正常呢? (dingyh) |