Skip to content

Commit

Permalink
Add/use data packets (lynckia#750)
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun authored Feb 13, 2017
1 parent 042f578 commit fbe72ba
Show file tree
Hide file tree
Showing 26 changed files with 137 additions and 146 deletions.
1 change: 0 additions & 1 deletion erizo/src/erizo/DtlsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

using erizo::Resender;
using erizo::DtlsTransport;
using erizo::packetPtr;
using dtls::DtlsSocketContext;

DEFINE_LOGGER(DtlsTransport, "DtlsTransport");
Expand Down
27 changes: 9 additions & 18 deletions erizo/src/erizo/MediaDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class Monitor {
class FeedbackSink {
public:
virtual ~FeedbackSink() {}
int deliverFeedback(char* buf, int len) {
return this->deliverFeedback_(buf, len);
int deliverFeedback(std::shared_ptr<dataPacket> data_packet) {
return this->deliverFeedback_(data_packet);
}
private:
virtual int deliverFeedback_(char* buf, int len) = 0;
virtual int deliverFeedback_(std::shared_ptr<dataPacket> data_packet) = 0;
};


Expand All @@ -76,11 +76,11 @@ class MediaSink: public virtual Monitor {
FeedbackSource* sinkfbSource_;

public:
int deliverAudioData(char* buf, int len) {
return this->deliverAudioData_(buf, len);
int deliverAudioData(std::shared_ptr<dataPacket> data_packet) {
return this->deliverAudioData_(data_packet);
}
int deliverVideoData(char* buf, int len) {
return this->deliverVideoData_(buf, len);
int deliverVideoData(std::shared_ptr<dataPacket> data_packet) {
return this->deliverVideoData_(data_packet);
}
unsigned int getVideoSinkSSRC() {
boost::mutex::scoped_lock lock(myMonitor_);
Expand Down Expand Up @@ -108,8 +108,8 @@ class MediaSink: public virtual Monitor {
virtual void close() = 0;

private:
virtual int deliverAudioData_(char* buf, int len) = 0;
virtual int deliverVideoData_(char* buf, int len) = 0;
virtual int deliverAudioData_(std::shared_ptr<dataPacket> data_packet) = 0;
virtual int deliverVideoData_(std::shared_ptr<dataPacket> data_packet) = 0;
};

/**
Expand Down Expand Up @@ -163,15 +163,6 @@ class MediaSource: public virtual Monitor {
virtual void close() = 0;
};

/**
* A NiceReceiver is any class that can receive data from a nice connection.
*/
class NiceReceiver {
public:
virtual int receiveNiceData(char* buf, int len, NiceConnection* nice) = 0;
virtual ~NiceReceiver() {}
};

} // namespace erizo

#endif // ERIZO_SRC_ERIZO_MEDIADEFINITIONS_H_
21 changes: 10 additions & 11 deletions erizo/src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ namespace erizo {
this->closeAll();
}

int OneToManyProcessor::deliverAudioData_(char* buf, int len) {
// ELOG_DEBUG("OneToManyProcessor deliverAudio");
if (len <= 0)
int OneToManyProcessor::deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) {
if (audio_packet->length <= 0)
return 0;

boost::unique_lock<boost::mutex> lock(myMonitor_);
Expand All @@ -33,21 +32,21 @@ namespace erizo {

std::map<std::string, std::shared_ptr<MediaSink>>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); ++it) {
(*it).second->deliverAudioData(buf, len);
(*it).second->deliverAudioData(audio_packet);
}

return 0;
}

int OneToManyProcessor::deliverVideoData_(char* buf, int len) {
if (len <= 0)
int OneToManyProcessor::deliverVideoData_(std::shared_ptr<dataPacket> video_packet) {
if (video_packet->length <= 0)
return 0;
RtcpHeader* head = reinterpret_cast<RtcpHeader*>(buf);
RtcpHeader* head = reinterpret_cast<RtcpHeader*>(video_packet->data);
if (head->isFeedback()) {
ELOG_WARN("Receiving Feedback in wrong path: %d", head->packettype);
if (feedbackSink_ != NULL) {
head->ssrc = htonl(publisher->getVideoSourceSSRC());
feedbackSink_->deliverFeedback(buf, len);
feedbackSink_->deliverFeedback(video_packet);
}
return 0;
}
Expand All @@ -57,7 +56,7 @@ namespace erizo {
std::map<std::string, std::shared_ptr<MediaSink>>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); ++it) {
if ((*it).second != NULL) {
(*it).second->deliverVideoData(buf, len);
(*it).second->deliverVideoData(video_packet);
}
}
return 0;
Expand All @@ -69,9 +68,9 @@ namespace erizo {
feedbackSink_ = publisher->getFeedbackSink();
}

int OneToManyProcessor::deliverFeedback_(char* buf, int len) {
int OneToManyProcessor::deliverFeedback_(std::shared_ptr<dataPacket> fb_packet) {
if (feedbackSink_ != NULL) {
feedbackSink_->deliverFeedback(buf, len);
feedbackSink_->deliverFeedback(fb_packet);
}
return 0;
}
Expand Down
6 changes: 3 additions & 3 deletions erizo/src/erizo/OneToManyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class OneToManyProcessor : public MediaSink, public FeedbackSink {
typedef std::shared_ptr<MediaSink> sink_ptr;
FeedbackSink* feedbackSink_;

int deliverAudioData_(char* buf, int len) override;
int deliverVideoData_(char* buf, int len) override;
int deliverFeedback_(char* buf, int len) override;
int deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) override;
int deliverVideoData_(std::shared_ptr<dataPacket> video_packet) override;
int deliverFeedback_(std::shared_ptr<dataPacket> fb_packet) override;
void closeAll();
};

Expand Down
36 changes: 19 additions & 17 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,42 +343,44 @@ void WebRtcConnection::onCandidate(const CandidateInfo& cand, Transport *transpo
}
}

int WebRtcConnection::deliverAudioData_(char* buf, int len) {
int WebRtcConnection::deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) {
if (bundle_) {
if (videoTransport_.get() != NULL) {
if (audioEnabled_ == true) {
sendPacketAsync(std::make_shared<dataPacket>(0, buf, len, AUDIO_PACKET));
sendPacketAsync(std::make_shared<dataPacket>(*audio_packet));
}
}
} else if (audioTransport_.get() != NULL) {
if (audioEnabled_ == true) {
sendPacketAsync(std::make_shared<dataPacket>(0, buf, len, AUDIO_PACKET));
sendPacketAsync(std::make_shared<dataPacket>(*audio_packet));
}
}
return len;
return audio_packet->length;
}

int WebRtcConnection::deliverVideoData_(char* buf, int len) {
int WebRtcConnection::deliverVideoData_(std::shared_ptr<dataPacket> video_packet) {
if (videoTransport_.get() != NULL) {
if (videoEnabled_ == true) {
sendPacketAsync(std::make_shared<dataPacket>(0, buf, len, VIDEO_PACKET));
sendPacketAsync(std::make_shared<dataPacket>(*video_packet));
}
}
return len;
return video_packet->length;
}

int WebRtcConnection::deliverFeedback_(char* buf, int len) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(buf);
int WebRtcConnection::deliverFeedback_(std::shared_ptr<dataPacket> fb_packet) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(fb_packet->data);
uint32_t recvSSRC = chead->getSourceSSRC();
if (recvSSRC == this->getVideoSourceSSRC()) {
sendPacketAsync(std::make_shared<dataPacket>(0, buf, len, VIDEO_PACKET));
fb_packet->type = VIDEO_PACKET;
sendPacketAsync(fb_packet);
} else if (recvSSRC == this->getAudioSourceSSRC()) {
sendPacketAsync(std::make_shared<dataPacket>(0, buf, len, AUDIO_PACKET));
fb_packet->type = AUDIO_PACKET;
sendPacketAsync(fb_packet);
} else {
ELOG_DEBUG("%s unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
}
return len;
return fb_packet->length;
}

void WebRtcConnection::onTransportData(std::shared_ptr<dataPacket> packet, Transport *transport) {
Expand Down Expand Up @@ -427,7 +429,7 @@ void WebRtcConnection::read(std::shared_ptr<dataPacket> packet) {
// DELIVER FEEDBACK (RR, FEEDBACK PACKETS)
if (chead->isFeedback()) {
if (fbSink_ != NULL && shouldSendFeedback_) {
fbSink_->deliverFeedback(buf, len);
fbSink_->deliverFeedback(packet);
}
} else {
// RTP or RTCP Sender Report
Expand All @@ -436,10 +438,10 @@ void WebRtcConnection::read(std::shared_ptr<dataPacket> packet) {
// Deliver data
if (recvSSRC == this->getVideoSourceSSRC()) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
videoSink_->deliverVideoData(buf, len);
videoSink_->deliverVideoData(packet);
} else if (recvSSRC == this->getAudioSourceSSRC()) {
parseIncomingPayloadType(buf, len, AUDIO_PACKET);
audioSink_->deliverAudioData(buf, len);
audioSink_->deliverAudioData(packet);
} else {
ELOG_DEBUG("%s unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
Expand All @@ -452,7 +454,7 @@ void WebRtcConnection::read(std::shared_ptr<dataPacket> packet) {
ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recvSSRC);
this->setAudioSourceSSRC(recvSSRC);
}
audioSink_->deliverAudioData(buf, len);
audioSink_->deliverAudioData(packet);
} else if (packet->type == VIDEO_PACKET && videoSink_ != NULL) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
// Firefox does not send SSRC in SDP
Expand All @@ -461,7 +463,7 @@ void WebRtcConnection::read(std::shared_ptr<dataPacket> packet) {
this->setVideoSourceSSRC(recvSSRC);
}
// change ssrc for RTP packets, don't touch here if RTCP
videoSink_->deliverVideoData(buf, len);
videoSink_->deliverVideoData(packet);
}
} // if not bundle
} // if not Feedback
Expand Down
10 changes: 3 additions & 7 deletions erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ class WebRtcConnection: public MediaSink, public MediaSource, public FeedbackSin
*/
std::string getLocalSdp();

int deliverAudioData(char* buf, int len);
int deliverVideoData(char* buf, int len);
int deliverFeedback(char* buf, int len);

/**
* Sends a PLI Packet
* @return the size of the data sent
Expand Down Expand Up @@ -175,9 +171,9 @@ class WebRtcConnection: public MediaSink, public MediaSource, public FeedbackSin

private:
void sendPacket(std::shared_ptr<dataPacket> packet);
int deliverAudioData_(char* buf, int len) override;
int deliverVideoData_(char* buf, int len) override;
int deliverFeedback_(char* buf, int len) override;
int deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) override;
int deliverVideoData_(std::shared_ptr<dataPacket> video_packet) override;
int deliverFeedback_(std::shared_ptr<dataPacket> fb_packet) override;
void initializePipeline();

// Utils
Expand Down
13 changes: 7 additions & 6 deletions erizo/src/erizo/media/ExternalInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

#include "./WebRtcConnection.h"

using std::memcpy;

namespace erizo {
DEFINE_LOGGER(ExternalInput, "media.ExternalInput");
ExternalInput::ExternalInput(const std::string& inputUrl):url_(inputUrl) {
Expand Down Expand Up @@ -165,10 +163,11 @@ int ExternalInput::sendPLI() {
}


void ExternalInput::receiveRtpData(unsigned char*rtpdata, int len) {
void ExternalInput::receiveRtpData(unsigned char* rtpdata, int len) {
if (videoSink_ != NULL) {
memcpy(sendVideoBuffer_, rtpdata, len);
videoSink_->deliverVideoData(sendVideoBuffer_, len);
std::shared_ptr<dataPacket> packet = std::make_shared<dataPacket>(0, reinterpret_cast<char*>(rtpdata),
len, VIDEO_PACKET);
videoSink_->deliverVideoData(packet);
}
}

Expand Down Expand Up @@ -214,7 +213,9 @@ void ExternalInput::receiveLoop() {
lastAudioPts_ = avpacket_.pts;
length = op_->packageAudio(avpacket_.data, avpacket_.size, decodedBuffer_.get(), avpacket_.pts);
if (length > 0) {
audioSink_->deliverAudioData(reinterpret_cast<char*>(decodedBuffer_.get()), length);
std::shared_ptr<dataPacket> packet = std::make_shared<dataPacket>(0,
reinterpret_cast<char*>(decodedBuffer_.get()), length, AUDIO_PACKET);
audioSink_->deliverAudioData(packet);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion erizo/src/erizo/media/ExternalInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class ExternalInput : public MediaSource, public RTPDataReceiver {
boost::scoped_ptr<OutputProcessor> op_;
VideoDecoder inCodec_;
boost::scoped_array<unsigned char> decodedBuffer_;
char sendVideoBuffer_[2000];

std::string url_;
bool running_;
Expand Down
15 changes: 9 additions & 6 deletions erizo/src/erizo/media/ExternalOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,17 +326,19 @@ void ExternalOutput::writeVideoData(char* buf, int len) {
}
}

int ExternalOutput::deliverAudioData_(char* buf, int len) {
this->queueData(buf, len, AUDIO_PACKET);
int ExternalOutput::deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) {
std::shared_ptr<dataPacket> copied_packet = std::make_shared<dataPacket>(*audio_packet);
this->queueData(copied_packet->data, copied_packet->length, AUDIO_PACKET);
return 0;
}

int ExternalOutput::deliverVideoData_(char* buf, int len) {
int ExternalOutput::deliverVideoData_(std::shared_ptr<dataPacket> video_packet) {
std::shared_ptr<dataPacket> copied_packet = std::make_shared<dataPacket>(*video_packet);
if (videoSourceSsrc_ == 0) {
RtpHeader* h = reinterpret_cast<RtpHeader*>(buf);
RtpHeader* h = reinterpret_cast<RtpHeader*>(copied_packet->data);
videoSourceSsrc_ = h->getSSRC();
}
this->queueData(buf, len, VIDEO_PACKET);
this->queueData(copied_packet->data, copied_packet->length, VIDEO_PACKET);
return 0;
}

Expand Down Expand Up @@ -480,7 +482,8 @@ int ExternalOutput::sendFirPacket() {
thePLI.setLength(2);
char *buf = reinterpret_cast<char*>(&thePLI);
int len = (thePLI.getLength() + 1) * 4;
fbSink_->deliverFeedback(reinterpret_cast<char*>(buf), len);
std::shared_ptr<dataPacket> pli_packet = std::make_shared<dataPacket>(0, buf, len, VIDEO_PACKET);
fbSink_->deliverFeedback(pli_packet);
return len;
}
return -1;
Expand Down
5 changes: 2 additions & 3 deletions erizo/src/erizo/media/ExternalOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
int unpackagedSize_;
uint32_t videoSourceSsrc_;
unsigned char* unpackagedBufferpart_;
unsigned char deliverMediaBuffer_[3000];
unsigned char unpackagedBuffer_[UNPACKAGE_BUFFER_SIZE];

// Timestamping strategy: we use the RTP timestamps so we don't have to restamp and we're not
Expand Down Expand Up @@ -103,8 +102,8 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
int sendFirPacket();
void queueData(char* buffer, int length, packetType type);
void sendLoop();
int deliverAudioData_(char* buf, int len) override;
int deliverVideoData_(char* buf, int len) override;
int deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) override;
int deliverVideoData_(std::shared_ptr<dataPacket> video_packet) override;
void writeAudioData(char* buf, int len);
void writeVideoData(char* buf, int len);
bool bufferCheck(RTPPayloadVP8* payload);
Expand Down
11 changes: 7 additions & 4 deletions erizo/src/erizo/media/MediaProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ int InputProcessor::init(const MediaInfo& info, RawDataReceiver* receiver) {
return 0;
}

int InputProcessor::deliverAudioData_(char* buf, int len) {
int InputProcessor::deliverAudioData_(std::shared_ptr<dataPacket> audio_packet) {
if (audioDecoder && audioUnpackager) {
std::shared_ptr<dataPacket> copied_packet = std::make_shared<dataPacket>(*audio_packet);
ELOG_DEBUG("Decoding audio");
int unp = unpackageAudio((unsigned char*) buf, len,
int unp = unpackageAudio((unsigned char*) copied_packet->data, copied_packet->length,
unpackagedAudioBuffer_);
int a = decodeAudio(unpackagedAudioBuffer_, unp, decodedAudioBuffer_);
ELOG_DEBUG("DECODED AUDIO a %d", a);
Expand All @@ -86,9 +87,11 @@ int InputProcessor::deliverAudioData_(char* buf, int len) {
}
return 0;
}
int InputProcessor::deliverVideoData_(char* buf, int len) {
int InputProcessor::deliverVideoData_(std::shared_ptr<dataPacket> video_packet) {
if (videoUnpackager && videoDecoder) {
int ret = unpackageVideo(reinterpret_cast<unsigned char*>(buf), len, unpackagedBufferPtr_, &gotUnpackagedFrame_);
std::shared_ptr<dataPacket> copied_packet = std::make_shared<dataPacket>(*video_packet);
int ret = unpackageVideo(reinterpret_cast<unsigned char*>(copied_packet->data), copied_packet->length,
unpackagedBufferPtr_, &gotUnpackagedFrame_);
if (ret < 0)
return 0;
upackagedSize_ += ret;
Expand Down
Loading

0 comments on commit fbe72ba

Please sign in to comment.