您当前的位置: 首页 >  Java

柳鲲鹏

暂无认证

  • 0浏览

    0关注

    4642博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

java版本的RtpStream

柳鲲鹏 发布时间:2019-04-29 19:04:59 ,浏览量:0

  今天在家里的机器上打开Eclipse,发现即使把类删除,编译也不报错,不知道是哪里的问题。以后有机会,再把这些代码编译通过后放出来。这些代码当初也是从网上搜索到的,只是略微整理而已。

package com.newayte.rtp.clinet;

import android.os.HandlerThread;
import android.os.Handler;

import com.newayte.toolkit.Log;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * Parses the data received from an RTP socket and recombines it into a video or audio stream.
 * 1. Receive the data from the RTP socket.
 * 2. Put the data into the buffer queue.
 * 3. A worker thread takes the data from the buffer queue and unpacks it.
 */
public abstract class RtpStream {

    /** Log tag: the canonical name of this class. */
    private final static String TAG = RtpStream.class.getCanonicalName();
    
    // Track identifiers available to subclasses.
    // NOTE(review): where these are consumed is not visible in this part of the file.
    protected final static int TRACK_VIDEO = 0x01;
    protected final static int TRACK_AUDIO = 0x02;
    
    // NOTE(review): presumably RTP payload-type values (0x08 = 8, 0x60 = 96) used to tell
    // audio packets from video packets -- confirm against the sender's configuration.
    public    final static int DATA_AUDIO = 0x08;
    public    final static int DATA_VIDEO = 0x60;
    

    /** Handler bound to the worker thread's looper; used to post the unpacking loop. */
    private Handler mHandler;
    /** Worker thread on which the unpacking loop runs. */
    private HandlerThread thread;
    /**
     * Stop flag polled by the unpacking loop. It is written by {@link #stop()} on the
     * caller's thread and read on the worker thread, so it is volatile to guarantee
     * that the worker observes the update.
     */
    private volatile boolean isStoped;
    /**
     * Initialised to -1 by the constructor.
     * NOTE(review): presumably the sequence number of the previously unpacked packet -- confirm.
     */
    private int oldSeqNum;

    /**
     * Holder for the fields of one unpacked packet, handed to
     * {@link #recombinePacket(StreamPacks)}.
     */
    protected class StreamPacks {
        // NOTE(review): presumably the RTP marker bit -- confirm.
        public boolean mark;
        // Payload type: the low 7 bits of the second header byte.
        public int pt;
        // NOTE(review): presumably the RTP timestamp -- confirm.
        public long timestamp;
        // Sequence number read from the header starting at byte 2.
        public int sequenceNumber;
        // NOTE(review): presumably the RTP synchronisation source identifier -- confirm.
        public long Ssrc;
        // NOTE(review): presumably the packet payload without the RTP header -- confirm.
        public byte[] data;
    }

    /** One queued chunk of received bytes waiting to be unpacked. */
    private static class bufferUnit {
        // Private copy of the received bytes.
        public byte[] data;
        // Number of valid bytes in data.
        public int len;
    }

    /**
     * Queue of received chunks waiting to be unpacked. It is static, so it is shared by
     * every RtpStream instance and by the static {@link #receiveData(byte[], int)} entry point.
     * The element type is declared explicitly so that take() yields a bufferUnit rather
     * than Object.
     */
    private static LinkedBlockingDeque<bufferUnit> bufferQueue = new LinkedBlockingDeque<bufferUnit>();

    /**
     * Creates the stream, starts the worker thread and posts the unpacking loop onto it.
     */
    public RtpStream() {
        // Initialise the state before the worker is started: the buffer queue is static
        // and may already contain data, so the unpacking loop can begin work as soon as
        // it is posted and must not observe default field values.
        isStoped = false;
        oldSeqNum = -1;

        thread = new HandlerThread("RTPStreamThread");
        thread.start();
        mHandler = new Handler(thread.getLooper());
        unpackThread();
    }

    /**
     * Copies the first {@code len} bytes of {@code data} and queues the copy for unpacking.
     * The caller may reuse {@code data} as soon as this method returns.
     * Invalid input (null array, non-positive length, or a length larger than the array)
     * is logged and dropped instead of throwing on the receive path.
     *
     * @param data buffer holding the received bytes
     * @param len  number of valid bytes at the start of {@code data}
     */
    public static void receiveData(byte[] data, int len) {
        if (data == null || len <= 0 || len > data.length) {
            Log.d(TAG, "receiveData: invalid input dropped, len=" + len);
            return;
        }

        bufferUnit tmpBuffer = new bufferUnit();
        tmpBuffer.data = new byte[len];
        System.arraycopy(data, 0, tmpBuffer.data, 0, len);
        tmpBuffer.len = len;

        try {
            bufferQueue.put(tmpBuffer);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so the caller can see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Posts the unpacking loop onto the worker thread. The loop blocks on the buffer
     * queue, copies each queued chunk and passes it to unpackData until the stop flag is
     * set or the thread is interrupted; the queue is cleared when the loop exits.
     */
    private void unpackThread() {
        mHandler.post(new Runnable() {
            @Override
            public void run() {
                while (!isStoped) {
                    try {
                        // Cast explicitly so the assignment is valid even when the queue
                        // is declared as a raw type (take() then returns Object).
                        bufferUnit tmpBuffer = (bufferUnit) bufferQueue.take();
                        byte[] buffer = new byte[tmpBuffer.len];
                        System.arraycopy(tmpBuffer.data, 0, buffer, 0, tmpBuffer.len);
                        unpackData(buffer);
                    } catch (InterruptedException e) {
                        // Interrupted while waiting for data: keep the interrupt status
                        // and leave the loop.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                bufferQueue.clear();
            }
        });
    }

    /**
     * Stops the unpacking loop, quits the worker thread's looper and discards any
     * data still queued. Waits at most 100 ms for the worker thread to finish.
     */
    public void stop(){
        isStoped = true;
        // The worker is normally parked in bufferQueue.take() and would never re-check
        // the stop flag on its own; interrupting it makes take() throw so the loop exits.
        thread.interrupt();
        thread.quit();
        try {
            thread.join(100);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status instead of printing a stack trace.
            Thread.currentThread().interrupt();
        }
        bufferQueue.clear();
    }


    /**
     * Hook implemented by subclasses to handle one unpacked packet.
     * NOTE(review): presumably invoked from the unpacking path once a packet's header
     * has been parsed -- the call site is not visible in this part of the file; confirm.
     *
     * @param sp the unpacked packet
     */
    protected abstract void recombinePacket(StreamPacks sp);

    private void unpackData(final byte[] buffer) {
        if (buffer == null || buffer.length == 0)
        {
        	return;
        }
        
        if (((buffer[0]&0xFF)>>6) != 2)
        {
    		Log.d(TAG, "ERROR!!!");
            return;
        }

        int size = buffer.length;
        
        if ((buffer[0] & 0x20) > 0)
        {
        	int paddingLength = (buffer[size-1] & 0xFF);
        	if (paddingLength + 12 > size)
        	{
        		Log.d(TAG, "ERROR!!!");
        		return;
        	}
        }

        int numCSRCs = (buffer[0] & 0x0F);
        int payloadOffset = 12 + 4 * numCSRCs;
        if (size < payloadOffset)
        {
        	Log.d(TAG, "ERROR!!!");
    		return;
        }

        if ((buffer[0] & 0x10) > 0) {
            // Header eXtension present.

            if (size < payloadOffset + 4) {
                // Not enough data to fit the basic header, all CSRC entries
                // and the first 4 bytes of the extension header.
            	Log.d(TAG, "ERROR!!!");
        		return;
            }

            int extensionLength = 4 * (buffer[payloadOffset+2] > 7 == 1;
        tmpStreampack.pt             =   buffer[1] & 0x7F;
        tmpStreampack.sequenceNumber = ((buffer[2] & 0xFF)             
关注
打赏
1665724893
查看更多评论
0.0597s