forked from ZLMediaKit/ZLMediaKit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSrtSession.cpp
144 lines (124 loc) · 4.95 KB
/
SrtSession.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#include "SrtSession.hpp"
#include "Packet.hpp"
#include "SrtTransportImp.hpp"
#include "Common/config.h"
namespace SRT {
using namespace mediakit;
// Builds a session on top of the given socket and records the address of the
// peer that socket is connected to.
//
// sock: socket forwarded to the Session base class; its raw descriptor is
//       queried for the peer address.
SrtSession::SrtSession(const Socket::Ptr &sock)
    : Session(sock) {
    // Zero the stored address first: the result of getpeername() is not
    // checked, so on failure the member stays all-zero instead of holding
    // indeterminate bytes.
    socklen_t peer_len = sizeof(_peer_addr);
    memset(&_peer_addr, 0, peer_len);

    struct sockaddr *peer = reinterpret_cast<struct sockaddr *>(&_peer_addr);
    getpeername(sock->rawFD(), peer, &peer_len);
}
// Defaulted destructor: no explicit teardown is done here; members are
// released by their own destructors.
SrtSession::~SrtSession() = default;
// Picks the event poller that should process an incoming packet.
//
// The packet is classified as a data packet, a handshake packet, or (anything
// else) a control packet. The owning transport is then looked up in
// SrtTransportManager: by socket id for data and control packets, and by SYN
// cookie for a conclusion handshake.
//
// buffer: the raw packet.
// Returns the poller of the matching transport, or nullptr when no transport
// is registered, when the packet is an induction handshake, or when the
// handshake type is neither induction nor conclusion.
EventPoller::Ptr SrtSession::queryPoller(const Buffer::Ptr &buffer) {
    uint8_t *payload = (uint8_t *)buffer->data();
    const size_t length = buffer->size();

    if (DataPacket::isDataPacket(payload, length)) {
        const uint32_t sock_id = DataPacket::getSocketID(payload, length);
        auto owner = SrtTransportManager::Instance().getItem(std::to_string(sock_id));
        if (!owner) {
            return nullptr;
        }
        return owner->getPoller();
    }

    if (!HandshakePacket::isHandshakePacket(payload, length)) {
        // Neither data nor handshake: handled as a control packet.
        const uint32_t sock_id = ControlPacket::getSocketID(payload, length);
        auto owner = SrtTransportManager::Instance().getItem(std::to_string(sock_id));
        if (!owner) {
            return nullptr;
        }
        return owner->getPoller();
    }

    const auto hs_type = HandshakePacket::getHandshakeType(payload, length);
    if (hs_type == HandshakePacket::HS_TYPE_INDUCTION) {
        // First handshake phase: no poller is selected here.
        return nullptr;
    }
    if (hs_type == HandshakePacket::HS_TYPE_CONCLUSION) {
        // Second handshake phase: the transport is keyed by its SYN cookie.
        const uint32_t cookie = HandshakePacket::getSynCookie(payload, length);
        auto owner = SrtTransportManager::Instance().getHandshakeItem(std::to_string(cookie));
        if (!owner) {
            return nullptr;
        }
        return owner->getPoller();
    }

    // Any other handshake type is unexpected.
    WarnL << " not reach there";
    return nullptr;
}
// Server-attachment hook: requests a 1 MiB receive buffer on this session's
// socket.
//
// server: the server passed in by the caller; not used here.
void SrtSession::attachServer(const toolkit::Server &server) {
    const int recv_buf_bytes = 1024 * 1024; // 1 MiB
    const auto fd = getSock()->rawFD();
    SockUtil::setRecvBuf(fd, recv_buf_bytes);
}
// Handles one packet received on this session's socket.
//
// On the first packet only, the session tries to bind itself to a transport:
//   - data packet:          look up an existing transport by socket id;
//   - induction handshake:  create a new SrtTransportImp on this poller;
//   - conclusion handshake: look up the transport by SYN cookie;
//   - anything else:        treated as a control packet, looked up by socket id.
// The three packet kinds are handled as mutually exclusive, matching
// queryPoller(). A data packet no longer falls through into the
// control-packet branch, which used to repeat the lookup and could log a
// second warning.
//
// Every packet resets the inactivity ticker. If a transport is bound, the
// packet is forwarded to it with the stored peer address; otherwise it is
// dropped silently.
//
// buffer: the raw packet.
void SrtSession::onRecv(const Buffer::Ptr &buffer) {
    uint8_t *data = (uint8_t *)buffer->data();
    size_t size = buffer->size();

    if (_find_transport) {
        // Only one attempt to locate the transport is ever made.
        _find_transport = false;

        if (DataPacket::isDataPacket(data, size)) {
            uint32_t socket_id = DataPacket::getSocketID(data, size);
            auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
            if (trans) {
                _transport = std::move(trans);
            } else {
                WarnL << " data packet not find transport ";
            }
        } else if (HandshakePacket::isHandshakePacket(data, size)) {
            auto type = HandshakePacket::getHandshakeType(data, size);
            if (type == HandshakePacket::HS_TYPE_INDUCTION) {
                // First handshake phase: start a brand-new transport.
                _transport = std::make_shared<SrtTransportImp>(getPoller());
            } else if (type == HandshakePacket::HS_TYPE_CONCLUSION) {
                // Second handshake phase: the transport is keyed by its SYN cookie.
                uint32_t sync_cookie = HandshakePacket::getSynCookie(data, size);
                auto trans = SrtTransportManager::Instance().getHandshakeItem(std::to_string(sync_cookie));
                if (trans) {
                    _transport = std::move(trans);
                } else {
                    WarnL << " hanshake packet not find transport ";
                }
            } else {
                // Any other handshake type is unexpected.
                WarnL << " not reach there";
            }
        } else {
            // Neither data nor handshake: handled as a control packet.
            uint32_t socket_id = ControlPacket::getSocketID(data, size);
            auto trans = SrtTransportManager::Instance().getItem(std::to_string(socket_id));
            if (trans) {
                _transport = std::move(trans);
            } else {
                WarnL << " not find transport";
            }
        }

        if (_transport) {
            // Point the transport at this session.
            _transport->setSession(static_pointer_cast<Session>(shared_from_this()));
        }
        InfoP(this);
    }

    // Any received packet counts as activity for the timeout check in onManager().
    _ticker.resetTime();

    if (_transport) {
        _transport->inputSockData(data, size, &_peer_addr);
    }
    // Without a transport the packet is ignored.
}
// Called when the underlying socket reports an error.
//
// A timeout of the UDP link does not necessarily mean the SRT connection has
// timed out, because the UDP link may have migrated. When that happens a new
// SrtSession object takes over, and this SrtSession object is destroyed
// automatically after it times out.
//
// err: the socket error; it is only logged here.
void SrtSession::onError(const SockException &err) {
    WarnP(this) << err;

    if (_transport) {
        // Detach the transport from this session so the two do not keep each
        // other alive through mutual references.
        auto keep_alive = std::move(_transport);

        // Drop the reference from an async task on the poller rather than right
        // here, so the transport is not destroyed while it may still be in use.
        // The task body is intentionally empty (a call to
        // transport->onShutdown(err) is disabled in the original code).
        getPoller()->async([keep_alive] {}, false);
    }
}
// Periodic housekeeping: shuts the session down once the time since the
// ticker was last reset exceeds the configured timeout.
void SrtSession::onManager() {
    GET_CONFIG(float, timeoutSec, kTimeOutSec);

    // elapsedTime() is compared against the configured seconds scaled by 1000.
    const bool timed_out = _ticker.elapsedTime() > timeoutSec * 1000;
    if (timed_out) {
        shutdown(SockException(Err_timeout, "srt connection timeout"));
    }
}
} // namespace SRT