`
cxw65066165
  • 浏览: 10886 次
社区版块
存档分类
最新评论

websocket - java 后台处理实例详解

阅读更多
websocket 的前台这里就不多说了 主要说一下后台的业务逻辑 。
以下是本人的websocket 协议解析,框架是基于mina + spring 做的。
需要的mina jar包有

mina-core-2.0.4.jar
mina-integration-beans-2.0.4.jar
mina-integration-jmx-2.0.4.jar
mina-integration-ognl-2.0.4.jar
mina-integration-spring-1.1.7.jar.zip
大家还没搞出来的可以参照一下,若不对的还希望各位大神指出或者线下交流 QQ:593040793



import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;

import qq.web.model.Data;
import qq.web.service.MessageData;
import qq.web.service.UserService;

/**
*
* @author 程欣伟
*
*/
public class WebSocketIoHandler extends IoHandlerAdapter {


@Autowired private MessageData messageData;

@Autowired private UserService userService;

    public static final String INDEX_KEY = WebSocketIoHandler.class.getName() + ".INDEX";
   
    //key=sessionId  value = session   sid 和 session对应
    private   Map<Long, IoSession>  ioSessionMap = new HashMap<Long, IoSession>();
    //key = userId value = sessionId   用户和 sid 对应
    private Map<Integer,Long> userSessionMap =   new HashMap<Integer, Long>();
   
    /**
     * 将IoBuffer转换成string  
     * @author 程欣伟
     * @param message
     * @return
     */
    public String ioBufferToString(Object message) { 
    if (!(message instanceof IoBuffer)){ 
    return ""; 
    } 
    IoBuffer ioBuffer = (IoBuffer) message; 
    return new String(ioBuffer.array());
    } 
   
    /**
     * 当有请求消息时触发
     * @author 程欣伟
     * @return
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
//    System.out.println(ioBufferToString(message));
    //吧传入的消息转换成流
    IoBuffer buffer = (IoBuffer)message;
    //转换成字节数组
    byte[] b = new byte[buffer.limit()];
    buffer.get(b);
   
    //获取sessionId
    Long sid = session.getId();

    //如果没有此sessionId则代表第一次连接
    if (!ioSessionMap.containsKey(sid)) {
    //把此session放入map
    ioSessionMap.put(sid, session);
   
    byte[] bufferAry = buffer.array();
        String m = new String(bufferAry);
       
        //获取握手协议字符串
String sss = getSecWebSocketAccept(m);

buffer.clear();
buffer.put(sss.getBytes("utf-8"));

buffer.flip();
session.write(buffer);

buffer.free();
    } else {
    //存在session
    //解析传输的数据内容
    String str = decode(b);
//    System.out.println(roleStr);
   
   
    //--------------- 业务开始 ---------------- //
    ObjectMapper objectMapper = new ObjectMapper();
   
    Data data = null;
    try{
    data = objectMapper.readValue(str, Data.class);
    }catch(Exception e){
    return ;
    }
   
    try{
    if(data.getDataType().equals("message")){
    messageData.processMessage(data.getData());
    }else if(data.getDataType().equals("login")){
    userService.processUser(data.getData(),sid);
    }
    }catch(Exception e){
    //如果处理消息报错 则告知浏览器
    e.printStackTrace();
    data.setData("false");
    sendMessageToHtml(objectMapper.writeValueAsString(data), sid);
    }
   
    }
    }
   
   
    /**
     * 发送消息给浏览器
     * @author 程欣伟
     * @param msg
     * @param sid
     * @return
     */
    public  boolean sendMessageToHtml(String msg,Long sid) {
    boolean sendFlag = true;
    try{
    //获取字节数组
    byte[] bb = encode( msg);
   
    //创建IO流
    IoBuffer ioBuffer = IoBuffer.allocate(bb.length);
    //把字节数组写入流中
    ioBuffer.put(bb);
    //api 解释为翻转 但是目前不知道什么意思
    ioBuffer.flip();
   
    //同步块
    synchronized (ioSessionMap) {
    //获取所有的session
        IoSession ioSession = ioSessionMap.get(sid);
if (ioSession!=null&&ioSession.isConnected()) {
//复制一个新的buffer
IoBuffer  writeResult = ioBuffer.duplicate();
ioSession.write(writeResult);
}else{
sendFlag = false;
    }
        }
    ioBuffer.free();
    }catch(Exception e){
    e.printStackTrace();
    sendFlag=false;
    }
    return sendFlag;
    }
   

    @Override  
    public void sessionOpened(IoSession session) throws Exception {
        session.setAttribute(INDEX_KEY, 0);
    }

//    @Override  
//    public void sessionIdle( IoSession session, IdleStatus status ) throws Exception {  
//        System.out.println( "IDLE " + session.getIdleCount( status ));  
//    }
   

    /**
     * 当ws连接断开时触发
     */
@Override
public void sessionClosed(IoSession session) throws Exception {
//如果不连接的话 则删除
ioSessionMap.remove(session);;
}
   




/**
* 根据用户ID 获取 session连接
* @param userId
* @return
*/
public IoSession getSessionByUserId(int userId){

Long sid = userSessionMap.get(userId);
if(sid==null){
return null;
}
IoSession session = ioSessionMap.get(sid);
if(session==null){
userSessionMap.remove(userId);
return null;
}
return session;
}

   


/**
*
* @author 程欣伟
* 获取握手协议 字符串
* 首先要获取到请求头中的Sec-WebSocket-Key的值,再把这一段GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11
* 加到获取到的Sec-WebSocket-Key的值的后面,然后拿这个字符串做SHA-1 hash计算,然后再把得到的结果通过base64加密
* @param key
* @return
*/
private String getSecWebSocketAccept(String key) {
String secKey = getSecWebSocketKey(key);

String guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
secKey += guid;
try {
MessageDigest md = MessageDigest.getInstance("SHA-1");
md.update(secKey.getBytes("iso-8859-1"), 0, secKey.length());
byte[] sha1Hash = md.digest();
secKey = base64Encode(sha1Hash);
} catch (Exception e) {
e.printStackTrace();
}

String rtn = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "
+ secKey + "\r\n\r\n";
return rtn;
}

/**
*
* @author 程欣伟
* 获取到请求头中的Sec-WebSocket-Key的
* @param req
* @return
*/
private String getSecWebSocketKey(String req) {
Pattern p = Pattern.compile("^(Sec-WebSocket-Key:).+",
Pattern.CASE_INSENSITIVE | Pattern.MULTILINE);
Matcher m = p.matcher(req);
if (m.find()) {
String foundstring = m.group();
return foundstring.split(":")[1].trim();
} else {
return null;
}

}

/**
* base64
* @param input
* @return
*/
private String base64Encode(byte[] input) {
return new String(org.apache.mina.util.Base64.encodeBase64(input));
}



/**
* @author 程欣伟
* 把传入的消息解码
* @param receivedDataBuffer
* @return
* @throws UnsupportedEncodingException
*/
private String decode(byte[] receivedDataBuffer)
throws UnsupportedEncodingException {
String result = null;

//数据开始的位数  前面2个byte 固定必须存在
int dataStartIndex=2;

//查看第一帧的值  代表是否结束
int isend = receivedDataBuffer[0]>>7&0x1;
System.out.println("是否结束:【"+(isend==1?"yes":"no")+"】");

//获取是否需要掩码
boolean mask = ((receivedDataBuffer[1]>>7&0x1)==1)?true:false;

System.out.println("掩码:【"+(mask?"yes":"no")+"】");

//Payload length: 传输数据的长度,以字节的形式表示:7位、7+16位、或者7+64位。
//如果这个值以字节表示是0-125这个范围,那这个值就表示传输数据的长度;
//如果这个值是126,则随后的两个字节表示的是一个16进制无符号数,用来表示传输数据的长度;
//如果这个值是127,则随后的是8个字节表示的一个64位无符合数,这个数用来表示传输数据的长度
int dataLength = receivedDataBuffer[1] & 0x7F;

System.out.println("描述消息长度:【"+dataLength+"】");

//查看 消息描述 是否大于 126 如果大于

if(dataLength<126){
//126以内取本身
}else if(dataLength==126){
dataStartIndex = dataStartIndex +2;
}else if(dataLength==127){
dataStartIndex = dataStartIndex +8;
}

//掩码数组
byte[]  frameMaskingAry = new byte[4];
if(mask){
for(int i=0;i<frameMaskingAry.length;i++){
frameMaskingAry[i] = receivedDataBuffer[dataStartIndex+i];
}
dataStartIndex += 4;
}

// 计算非空位置
int lastStation = receivedDataBuffer.length - 1;

// 利用掩码对org-data进行异或
int frame_masking_key = 0;


//保存数据的 数组
byte[] dataByte = new byte[lastStation-dataStartIndex+1];

if(mask){
for (int i = dataStartIndex; i <= lastStation; i++) {
//吧数据进行异或运算
receivedDataBuffer[i] = (byte) (receivedDataBuffer[i] ^ frameMaskingAry[frame_masking_key%4]);
//吧进行异或运算之后的 数据放入数组
dataByte[i-dataStartIndex]=receivedDataBuffer[i];
frame_masking_key++;
}
}



result = new String(dataByte, "UTF-8");
System.out.println(result);
return result;

}

/**
* @author 程欣伟
* 对传入数据进行无掩码转换
* @param msg
* @return
* @throws UnsupportedEncodingException
*/
private byte[] encode(String msg) throws UnsupportedEncodingException {
// 掩码开始位置
int masking_key_startIndex = 2;

byte[] msgByte = msg.getBytes("UTF-8");

// 计算掩码开始位置
if (msgByte.length <= 125) {
masking_key_startIndex = 2;
} else if (msgByte.length > 65536) {
masking_key_startIndex = 10;
} else if (msgByte.length > 125) {
masking_key_startIndex = 4;
}

// 创建返回数据
byte[] result = new byte[msgByte.length + masking_key_startIndex];

// 开始计算ws-frame
// frame-fin + frame-rsv1 + frame-rsv2 + frame-rsv3 + frame-opcode
result[0] = (byte) 0x81; // 129

// frame-masked+frame-payload-length
// 从第9个字节开始是 1111101=125,掩码是第3-第6个数据
// 从第9个字节开始是 1111110>=126,掩码是第5-第8个数据
if (msgByte.length <= 125) {
result[1] = (byte) (msgByte.length);
} else if (msgByte.length > 65536) {
result[1] = 0x7F; // 127
} else if (msgByte.length > 125) {
result[1] = 0x7E; // 126
result[2] = (byte) (msgByte.length >>;
result[3] = (byte) (msgByte.length % 256);
}

// 将数据编码放到最后
for (int i = 0; i < msgByte.length; i++) {
result[i + masking_key_startIndex] = msgByte[i];
}

decode(result);

String str = new String(result ,"utf-8");
System.out.println(str);
return result;
}

public Map<Long, IoSession> getIoSessionMap() {
return ioSessionMap;
}

public void setIoSessionMap(Map<Long, IoSession> ioSessionMap) {
this.ioSessionMap = ioSessionMap;
}

public Map<Integer, Long> getUserSessionMap() {
return userSessionMap;
}

public void setUserSessionMap(Map<Integer, Long> userSessionMap) {
this.userSessionMap = userSessionMap;
}

public static void main(String[] args) throws UnsupportedEncodingException {

byte b = 8;
  System.out.println(""
    + (byte) ((b >> 7) & 0x1) + (byte) ((b >> 6) & 0x1)
    + (byte) ((b >> 5) & 0x1) + (byte) ((b >> 4) & 0x1)
    + (byte) ((b >> 3) & 0x1) + (byte) ((b >> 2) & 0x1)
    + (byte) ((b >> 1) & 0x1) + (byte) ((b >> 0) & 0x1)
   );
 
 
byte[] a = {(byte)104,(byte)49};
System.out.println(new String(a,"utf-8"));
}



}
分享到:
评论
2 楼 lch520100 2014-07-01  
(byte) (msgByte.length >>8)
1 楼 liye71023326 2013-04-01  
您好,能给个demo吗?

相关推荐

Global site tag (gtag.js) - Google Analytics