1. package com.allinpay.mina; 
  2.  
  3. import java.net.InetSocketAddress; 
  4.  
  5. import org.apache.mina.filter.codec.ProtocolCodecFilter; 
  6. import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; 
  7. import org.apache.mina.filter.executor.ExecutorFilter; 
  8. import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; 
  9. import org.apache.mina.filter.logging.LoggingFilter; 
  10. import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 
  11.  
  12.  
  13. public class MyServer { 
  14.     private NioSocketAcceptor acceptor; 
  15.  
  16.     public MyServer() { 
  17.         try { 
  18.             acceptor = new NioSocketAcceptor(); 
  19.             acceptor.getFilterChain().addLast("threadPool"
  20.                     new ExecutorFilter(new OrderedThreadPoolExecutor()));// 设置线程池,以支持多线程 
  21.             acceptor.getFilterChain().addLast("logger"new LoggingFilter()); 
  22.             // acceptor.getFilterChain().addLast( 
  23.             // "codec", 
  24.             // new ProtocolCodecFilter(new TextLineCodecFactory(Charset 
  25.             // .forName("UTF-8"))));// 指定编码过滤器 
  26.             DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory(); 
  27.             // 自定义编码器 
  28.             pcf.addMessageEncoder(String.classnew MyMessageEncoder()); 
  29.             // 自定义××× 
  30.             pcf.addMessageDecoder(new MyMessageDecoder()); 
  31.             ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf); 
  32.             acceptor.getFilterChain().addLast("codec", codec);// 指定编码过滤器 
  33.             acceptor.setReuseAddress(true); 
  34.             acceptor.setHandler(new ServerIoHandler());// 指定业务逻辑处理器 
  35.             acceptor.setDefaultLocalAddress(new InetSocketAddress(10000));// 设置端口号 
  36.             acceptor.bind();// 启动监听 
  37.         } catch (Exception e) { 
  38.             e.printStackTrace(); 
  39.         } 
  40.     } 
  41.  
  42.     public static void main(String[] args) { 
  43.         new MyServer(); 
  44.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import org.apache.mina.core.service.IoHandler; 
  4. import org.apache.mina.core.session.IdleStatus; 
  5. import org.apache.mina.core.session.IoSession; 
  6.  
  7. public class ServerIoHandler implements IoHandler { 
  8.     public void exceptionCaught(IoSession session, Throwable pArg1) 
  9.             throws Exception { 
  10.     } 
  11.  
  12.     public void messageReceived(IoSession session, Object obj) throws Exception { 
  13.         // 收到的信息 
  14.         System.out.println(obj.toString()); 
  15.     } 
  16.  
  17.     public void messageSent(IoSession session, Object pArg1) throws Exception { 
  18.     } 
  19.  
  20.     public void sessionClosed(IoSession session) throws Exception { 
  21.     } 
  22.  
  23.     public void sessionCreated(IoSession session) throws Exception { 
  24.     } 
  25.  
  26.     public void sessionIdle(IoSession session, IdleStatus pArg1) 
  27.             throws Exception { 
  28.     } 
  29.  
  30.     public void sessionOpened(IoSession session) throws Exception { 
  31.         session.write("[Server: Client,I'm server.][Server: Client,I'm server.]"); 
  32.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import java.net.InetSocketAddress; 
  4. import java.util.Date; 
  5. import java.util.Timer; 
  6. import java.util.TimerTask; 
  7.  
  8. import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 
  9. import org.apache.mina.core.future.ConnectFuture; 
  10. import org.apache.mina.filter.codec.ProtocolCodecFilter; 
  11. import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; 
  12. import org.apache.mina.filter.logging.LoggingFilter; 
  13. import org.apache.mina.transport.socket.nio.NioSocketConnector; 
  14.  
  15.  
  16. public class MyClient { 
  17.     private NioSocketConnector connector; 
  18.  
  19.     public MyClient() { 
  20.         connector = new NioSocketConnector(); 
  21.         connector.setHandler(new ClientIoHandler()); 
  22.         // 配置过滤器 
  23.         DefaultIoFilterChainBuilder chain = connector.getFilterChain(); 
  24.         // 增加日志过滤器 
  25.         chain.addLast("logger"new LoggingFilter()); 
  26.         // 增加字符编码过滤器以及设置编码器和××× 
  27.         // chain.addLast("codec", new ProtocolCodecFilter(new 
  28.         // TextLineCodecFactory(Charset.forName("UTF-8")))); 
  29.         // acceptor.getFilterChain().addLast( 
  30.         // "codec", 
  31.         // new ProtocolCodecFilter(new TextLineCodecFactory(Charset 
  32.         // .forName("UTF-8"))));// 指定编码过滤器 
  33.         DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory(); 
  34.         // 自定义编码器 
  35.         pcf.addMessageEncoder(String.classnew MyMessageEncoder()); 
  36.         // 自定义××× 
  37.         pcf.addMessageDecoder(new MyMessageDecoder()); 
  38.         ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf); 
  39.         chain.addLast("codec", codec);// 指定编码过滤器 
  40.         // 设置默认连接的地址和端口 
  41.         connector.setDefaultRemoteAddress(new InetSocketAddress("localhost"
  42.                 10000)); 
  43.         new Timer().schedule(new TimerTask() { 
  44.             @Override 
  45.             public void run() { 
  46.                 if (null != connector && !connector.isActive()) { 
  47.                     try { 
  48.                         // 尝试连接默认的地址和端口 
  49.                         ConnectFuture connFuture = connector.connect(); 
  50.                         connFuture.awaitUninterruptibly(); 
  51.                     } catch (Exception e) { 
  52.                         // TODO: handle exception 
  53.                         e.printStackTrace(); 
  54.                     } 
  55.                 } 
  56.             } 
  57.         }, new Date(), 30 * 1000); 
  58.     } 
  59.  
  60.     public static void main(String[] args) { 
  61.         new MyClient(); 
  62.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import org.apache.mina.core.service.IoHandler; 
  4. import org.apache.mina.core.session.IdleStatus; 
  5. import org.apache.mina.core.session.IoSession; 
  6.  
  7. public class ClientIoHandler implements IoHandler { 
  8.     public void exceptionCaught(IoSession session, Throwable throwable) 
  9.             throws Exception { 
  10.     } 
  11.  
  12.     public void messageReceived(IoSession session, Object obj) throws Exception { 
  13.         // 收到的内容 
  14.         System.out.println(obj.toString()); 
  15.     } 
  16.  
  17.     public void messageSent(IoSession session, Object obj) throws Exception { 
  18.     } 
  19.  
  20.     public void sessionClosed(IoSession session) throws Exception { 
  21.     } 
  22.  
  23.     public void sessionCreated(IoSession session) throws Exception { 
  24.     } 
  25.  
  26.     public void sessionIdle(IoSession session, IdleStatus status) 
  27.             throws Exception { 
  28.     } 
  29.  
  30.     public void sessionOpened(IoSession session) throws Exception { 
  31.         session.write("[Client: Server,I'm client.][Client: Server,I'm client.]"); 
  32.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import org.apache.mina.core.buffer.IoBuffer; 
  4. import org.apache.mina.core.session.IoSession; 
  5. import org.apache.mina.filter.codec.ProtocolDecoderOutput; 
  6. import org.apache.mina.filter.codec.demux.MessageDecoder; 
  7. import org.apache.mina.filter.codec.demux.MessageDecoderResult; 
  8.  
  9. public class MyMessageDecoder implements MessageDecoder { 
  10. //  // 消息的开始 
  11. //  private int flag = 0; 
  12. //  // 消息的长度 
  13. //  private int length = 0; 
  14. //  // 消息的结尾 
  15. //  private int flaglast = 0; 
  16. //  // 不是第一条消息 
  17. //  private boolean notfirstmessage = false; 
  18. // 
  19. //  public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
     
  20. //      int rem = in.remaining(); 
  21. //      int fornumber; 
  22. //      byte aa; 
  23. //      if (notfirstmessage) {
     
  24. //          flag++; 
  25. //          fornumber = rem + flag; 
  26. //      } else {
     
  27. //          flag = 0; 
  28. //          fornumber = rem + flag; 
  29. //      } 
  30. //      try {
     
  31. //          for (int i = flag; i < fornumber; i++) {
     
  32. //              aa = in.get(i); 
  33. //              if (']' == aa) {
     
  34. //                  flaglast = flag; 
  35. //                  flag = i; 
  36. //                  length = flag - flaglast; 
  37. //                  notfirstmessage = true; 
  38. //                  return MessageDecoderResult.OK; 
  39. //              } 
  40. //          } 
  41. //      } catch (Exception e) {
     
  42. //          e.printStackTrace(); 
  43. //      } 
  44. //      notfirstmessage = false; 
  45. //      return MessageDecoderResult.NEED_DATA; 
  46. //  } 
  47. // 
  48. //  public MessageDecoderResult decode(IoSession session, IoBuffer in, 
  49. //          ProtocolDecoderOutput out) throws Exception {
     
  50. //      try {
     
  51. //          if (length == 0 || length == 1) {
     
  52. //              in.get(); 
  53. //              out.write(""); 
  54. //              return MessageDecoderResult.OK; 
  55. //          } 
  56. //          length++; 
  57. //          byte[] result = new byte[length]; 
  58. //          for (int i = 0; i < length; i++) {
     
  59. //              result[i] = in.get(); 
  60. //          } 
  61. //          if (0 == in.remaining()) {
     
  62. //              notfirstmessage = false; 
  63. //          } 
  64. //          String cont = new String(result, "us-ascii"); 
  65. //          out.write(cont.trim()); 
  66. //          return MessageDecoderResult.OK; 
  67. //      } catch (Exception e) {
     
  68. //          e.printStackTrace(); 
  69. //      } 
  70. //      return MessageDecoderResult.OK; 
  71. //  } 
  72.  
  73.     public void finishDecode(IoSession session, ProtocolDecoderOutput out) 
  74.             throws Exception { 
  75.     } 
  76.  
  77.     public MessageDecoderResult decodable(IoSession session, IoBuffer in) { 
  78.         String string = MinaUtil.ioBufferToString(in); 
  79.         if(null==string || "".equals(string)){ 
  80.             return MessageDecoderResult.NEED_DATA; 
  81.         } 
  82.         return MessageDecoderResult.OK; 
  83.     } 
  84.  
  85.     public MessageDecoderResult decode(IoSession session, IoBuffer in, 
  86.             ProtocolDecoderOutput out) throws Exception { 
  87.         String string = MinaUtil.ioBufferToString(in); 
  88.         out.write(string); 
  89.         return MessageDecoderResult.OK; 
  90.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import org.apache.mina.core.buffer.IoBuffer; 
  4. import org.apache.mina.core.session.IoSession; 
  5. import org.apache.mina.filter.codec.ProtocolEncoderOutput; 
  6. import org.apache.mina.filter.codec.demux.MessageEncoder; 
  7.  
  8. public class MyMessageEncoder implements MessageEncoder<String> { 
  9.     public void encode(IoSession session, String msg, ProtocolEncoderOutput out) 
  10.             throws Exception { 
  11.         IoBuffer buf = IoBuffer.allocate(msg.getBytes().length); 
  12.         buf.put(msg.getBytes()); 
  13.         buf.flip(); 
  14.         out.write(buf); 
  15.     } 
 
  1. package com.allinpay.mina; 
  2.  
  3. import org.apache.mina.core.buffer.IoBuffer; 
  4. /** 
  5.  * MINA中IoBuffer、byte[]、String之间转换 
  6.  * @author huangyh1 
  7.  * 
  8.  */ 
  9. public class MinaUtil { 
  10.     /**   
  11.     * 将byte[]转换成string     
  12.     * @param butBuffer   
  13.     */   
  14.     public static String byteToString(byte [] b)    
  15.     {    
  16.            StringBuffer stringBuffer = new StringBuffer();    
  17.            for (int i = 0; i < b.length; i++)    
  18.            {    
  19.                stringBuffer.append((char) b [i]);    
  20.            }    
  21.            return stringBuffer.toString();    
  22.     }    
  23.        
  24.     /**   
  25.     * 将bytebuffer转换成string     
  26.     * @param str   
  27.     */   
  28.     public static IoBuffer stringToIoBuffer(String str)    
  29.     {    
  30.        
  31.            byte bt[] = str.getBytes();    
  32.        
  33.            IoBuffer ioBuffer = IoBuffer.allocate(bt.length);    
  34.            ioBuffer.put(bt, 0, bt.length);    
  35.            ioBuffer.flip();    
  36.            return ioBuffer;    
  37.     }    
  38.     /**   
  39.     * 将IoBuffer转换成string     
  40.     * @param str   
  41.     */   
  42.     public static IoBuffer byteToIoBuffer(byte [] bt,int length)    
  43.     {    
  44.        
  45.            IoBuffer ioBuffer = IoBuffer.allocate(length);    
  46.            ioBuffer.put(bt, 0, length);    
  47.            ioBuffer.flip();    
  48.            return ioBuffer;    
  49.     }    
  50.     /**   
  51.     * 将IoBuffer转换成byte     
  52.     * @param str   
  53.     */   
  54.     public static byte [] ioBufferToByte(Object message)    
  55.     {    
  56.           if (!(message instanceof IoBuffer))    
  57.           {    
  58.               return null;    
  59.           }    
  60.           IoBuffer ioBuffer = (IoBuffer)message;    
  61.           byte[] b = new byte[ioBuffer.limit()];    
  62.           ioBuffer.get(b);    
  63.           return b;    
  64.     }    
  65.     /**   
  66.     * 将IoBuffer转换成string     
  67.     * @param butBuffer   
  68.     */   
  69.     public static String ioBufferToString(Object message)    
  70.     {    
  71.           if (!(message instanceof IoBuffer))    
  72.           {    
  73.             return "";    
  74.           }    
  75.           IoBuffer ioBuffer = (IoBuffer) message;    
  76.           byte[] b = new byte [ioBuffer.limit()];    
  77.           ioBuffer.get(b);    
  78.           StringBuffer stringBuffer = new StringBuffer();    
  79.        
  80.           for (int i = 0; i < b.length; i++)    
  81.           {    
  82.        
  83.            stringBuffer.append((char) b [i]);    
  84.           }    
  85.            return stringBuffer.toString();    
  86.     }