`
joe_zhjiang
  • 浏览: 155704 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Mina NIO Socket

    博客分类:
  • mina
阅读更多
Mina NIO Socket个人总结,其中包括重连机制,自定义解码器,需要加入的jar包log4j.jar,mina-core-2.0.1.jar,slf4j-api-1.4.2.jar,slf4j-log4j12-1.4.2.jar,也希望给接触者一些帮助。解码器感觉有点麻烦,各位指教。我的解码器"]"为一条消息的结束标记。后面附源码,如有更好方法请留言。
Server

package com.joe.server;

import java.net.InetSocketAddress;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.joe.codec.decoder.MyMessageDecoder;
import com.joe.codec.encoder.MyMessageEncoder;
import com.joe.handler.ServerIoHandler;

/**
 * @author joe
 */
public class MyServer {
    private NioSocketAcceptor   acceptor;
    /**
     * Constructor
     */
    public MyServer() {
        try {
            acceptor = new NioSocketAcceptor();
            acceptor.getFilterChain().addLast("threadPool",
                    new ExecutorFilter(new OrderedThreadPoolExecutor()));// 设置线程池,以支持多线程
            acceptor.getFilterChain().addLast("logger", new LoggingFilter());
            /**
             * 默认编码器,解码器,遇到\n默认消息结束
             * 当然可以加参数指定解码字符,但解码字符会被截掉
             * 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
             * 则会认为"]"为一条消息结束,遇到"]"则截取
             * 比如服务器给你发送的消息是aaaa]aaaa]
             * 会收到两条消息:
             * 1、aaaa
             * 2、aaaa
             * 后面的"]"则去掉了
             */
//            acceptor.getFilterChain().addLast(
//                    "codec",
//                    new ProtocolCodecFilter(new TextLineCodecFactory(Charset
//                            .forName("UTF-8"))));// 指定编码过滤器
            DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
            //自定义编码器
            pcf.addMessageEncoder(String.class, new MyMessageEncoder());
            //自定义解码器
            pcf.addMessageDecoder(new MyMessageDecoder());
            ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
            acceptor.getFilterChain().addLast("codec",codec);// 指定编码过滤器
            
            acceptor.setReuseAddress(true);
            acceptor.setHandler(new ServerIoHandler());// 指定业务逻辑处理器
            acceptor.setDefaultLocalAddress(new InetSocketAddress(8888));// 设置端口号
            acceptor.bind();// 启动监听
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * @param args
     */
    public static void main(String[] args) {
        new MyServer();
    }
}



Server IoHandler

package com.joe.handler;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ServerIoHandler implements IoHandler {

    public void exceptionCaught(IoSession session, Throwable pArg1)
            throws Exception {
        
    }

    public void messageReceived(IoSession session, Object obj) throws Exception {
        //收到的信息
        System.out.println(obj.toString());
    }

    public void messageSent(IoSession session, Object pArg1) throws Exception {
        
    }

    public void sessionClosed(IoSession session) throws Exception {
        
    }

    public void sessionCreated(IoSession session) throws Exception {
        
    }

    public void sessionIdle(IoSession session, IdleStatus pArg1) throws Exception {
        
    }

    public void sessionOpened(IoSession session) throws Exception {
        session.write("[Server: Client,I'm server.][Server: Client,I'm server.]");
    }

}



client

package com.joe.client;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.joe.codec.decoder.MyMessageDecoder;
import com.joe.codec.encoder.MyMessageEncoder;
import com.joe.handler.ClientIoHandler;

/**
 * @author joe
 */
public class MyClient {
    private NioSocketConnector  connector;
    /**
     * Constructor
     */
    public MyClient() {
        connector = new NioSocketConnector();
        /**
         * 设置信息交换的IoHandler,负责接收和发送信息的处理
         */
        connector.setHandler(new ClientIoHandler());
        //配置过滤器
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();
        //增加日志过滤器
        chain.addLast("logger", new LoggingFilter());
        //增加字符编码过滤器以及设置编码器和解码器
        //chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
        /**
         * 默认编码器,解码器,遇到\n默认消息结束
         * 当然可以加参数指定解码字符,但解码字符会被截掉
         * 例如:new TextLineCodecFactory(Charset.forName("UTF-8"),"]","]");
         * 则会认为"]"为一条消息结束,遇到"]"则截取
         * 比如服务器给你发送的消息是aaaa]aaaa]
         * 会收到两条消息:
         * 1、aaaa
         * 2、aaaa
         * 后面的"]"则去掉了
         */
//        acceptor.getFilterChain().addLast(
//                "codec",
//                new ProtocolCodecFilter(new TextLineCodecFactory(Charset
//                        .forName("UTF-8"))));// 指定编码过滤器
        DemuxingProtocolCodecFactory pcf = new DemuxingProtocolCodecFactory();
        //自定义编码器
        pcf.addMessageEncoder(String.class, new MyMessageEncoder());
        //自定义解码器
        pcf.addMessageDecoder(new MyMessageDecoder());
        ProtocolCodecFilter codec = new ProtocolCodecFilter(pcf);
        chain.addLast("codec",codec);// 指定编码过滤器
        //设置默认连接的地址和端口
        connector.setDefaultRemoteAddress(new InetSocketAddress("localhost", 8888));
        /**
         * 重连机制
         * 如果没有连接,则过30 * 1000毫秒客户端会尝试重新连接服务器
         * 如果连接,则下面的代码不会执行
         */
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (null != connector && !connector.isActive()) {
                    try {
                        //尝试连接默认的地址和端口
                        ConnectFuture connFuture = connector.connect();
                        connFuture.awaitUninterruptibly();
                    } catch (Exception e) {
                        // TODO: handle exception
                        e.printStackTrace();
                    }
                }
            }
        }, new Date(), 30 * 1000);
    }
    /**
     * @param args
     */
    public static void main(String[] args) {
        new MyClient();
    }

}



Client IoHandler

package com.joe.handler;

import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class ClientIoHandler implements IoHandler {

    public void exceptionCaught(IoSession session, Throwable throwable)
            throws Exception {
        
    }

    public void messageReceived(IoSession session, Object obj) throws Exception {
        //收到的内容
        System.out.println(obj.toString());
    }

    public void messageSent(IoSession session, Object obj) throws Exception {
        
    }

    public void sessionClosed(IoSession session) throws Exception {
        
    }

    public void sessionCreated(IoSession session) throws Exception {
        
    }

    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        
    }

    public void sessionOpened(IoSession session) throws Exception {
        session.write("[Client: Server,I'm client.][Client: Server,I'm client.]");
    }

}



自定义编码器(MessageEncoder)

package com.joe.codec.encoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;

/**
 * @author joe
 * @param <T>
 */
public class MyMessageEncoder implements MessageEncoder<String> {

    /**
     * 编码器未做任何处理
     * @param session 
     * @param msg 
     * @param out 
     * @throws Exception 
     */
    public void encode(IoSession session, String msg,
            ProtocolEncoderOutput out) throws Exception {
        IoBuffer buf = IoBuffer.allocate(msg.getBytes().length);
        buf.put(msg.getBytes());
        buf.flip();
        out.write(buf);
    }

}



自定义解码器(MessageDecoder)

package com.joe.codec.decoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;

/**
 * @author joe
 */
public class MyMessageDecoder implements MessageDecoder {
    //消息的开始
    private int flag = 0;
    //消息的长度
    private int length = 0;
    //消息的结尾
    private int flaglast = 0;
    //不是第一条消息
    private boolean notfirstmessage = false;
    
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
        int rem = in.remaining();
        int fornumber;
        byte aa;
        if (notfirstmessage) {
            flag++;
            fornumber = rem + flag;
        } else {
            flag = 0;
            fornumber = rem + flag;
        }
        try {
            for (int i = flag; i < fornumber; i++) {
                aa = in.get(i);
                if (']' == aa) {
                    flaglast = flag;
                    flag = i;
                    length = flag - flaglast;
                    notfirstmessage = true;
                    return MessageDecoderResult.OK;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        notfirstmessage = false;
        return MessageDecoderResult.NEED_DATA;
    }

    public MessageDecoderResult decode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        try {
            if (length == 0 || length == 1) {
                in.get();
                out.write("");
                return MessageDecoderResult.OK;
            }
            length++;
            byte[] result = new byte[length];
            for (int i = 0; i < length; i++) {
                result[i] = in.get();
            }
            if (0 == in.remaining()) {
                notfirstmessage = false;
            }
            String cont = new String(result, "us-ascii");
            out.write(cont.trim());
            return MessageDecoderResult.OK;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return MessageDecoderResult.OK;
    }

    public void finishDecode(IoSession session, ProtocolDecoderOutput out)
            throws Exception {
    }

}

分享到:
评论
4 楼 webking168 2013-06-15  
你好,我刚接触mina,谢谢你的分享。
有个问题请教一下:下面这两句分别放到代码里的哪个位置?
joe_zhjiang 写道
补充:心跳信息
       //session空闲60秒给服务器发空闲的信息,即心跳信息
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);

具体发送心跳信息的地方在IoHanlder的sessionIdle方法中
       if(pStatus.equals(IdleStatus.BOTH_IDLE)){
	    pSession.write("心跳消息");
       }


3 楼 joe_zhjiang 2011-09-14  
bluky999 写道
写MINA主要的确实是解码器,呵

你的消息分割符']' 不是很理想,如果消息体中出现这个字符呢?不是就BUG了嘛

常用的消息报文设计有以下三种:
1 定长消息体: 比如一条消息就定义1024长,就算消息内容不够,也要多传空字节;
2 定界消息体: 类似你的做法,用一个特殊符号来标记起始或者结束,这种情况一般用不可见的字符,比如0x6a啊之类的;
3 定长消息头: 消息报文由消息头+消息体两部分组成,头部固定长度,里面包含了消息体的长度 。

不过也有冗余或混合设计的,比如 定长消息头+定界消息体 ,也很常见,只要方便处理即可。

谢谢,我只不过是根据我项目内的需求来写的,当然会有缺陷。
2 楼 bluky999 2011-09-09  
写MINA主要的确实是解码器,呵

你的消息分割符']' 不是很理想,如果消息体中出现这个字符呢?不是就BUG了嘛

常用的消息报文设计有以下三种:
1 定长消息体: 比如一条消息就定义1024长,就算消息内容不够,也要多传空字节;
2 定界消息体: 类似你的做法,用一个特殊符号来标记起始或者结束,这种情况一般用不可见的字符,比如0x6a啊之类的;
3 定长消息头: 消息报文由消息头+消息体两部分组成,头部固定长度,里面包含了消息体的长度 。

不过也有冗余或混合设计的,比如 定长消息头+定界消息体 ,也很常见,只要方便处理即可。
1 楼 joe_zhjiang 2010-11-08  
补充:心跳信息
       //session空闲60秒给服务器发空闲的信息,即心跳信息
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);

具体发送心跳信息的地方在IoHanlder的sessionIdle方法中
       if(pStatus.equals(IdleStatus.BOTH_IDLE)){
	    pSession.write("心跳消息");
       }

相关推荐

Global site tag (gtag.js) - Google Analytics