Skip to content

Commit

Permalink
unify udpraw linux and mp branch
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu- committed Jul 15, 2020
1 parent 79bb28f commit 5a51248
Show file tree
Hide file tree
Showing 8 changed files with 1,310 additions and 7 deletions.
269 changes: 268 additions & 1 deletion client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
#include "encrypt.h"
#include "fd_manager.h"

#ifdef UDP2RAW_MP
u32_t detect_interval=1500;
u64_t laste_detect_time=0;

int use_udp_for_detection=0;
int use_tcp_for_detection=1;


extern pcap_t *pcap_handle;

extern int pcap_captured_full_len;
#endif

int client_on_timer(conn_info_t &conn_info) //for client. called when a timer is ready in epoll
{
Expand All @@ -20,6 +32,75 @@ int client_on_timer(conn_info_t &conn_info) //for client. called when a timer is

mylog(log_trace,"<client_on_timer,send_info.ts_ack= %u>\n",send_info.ts_ack);

#ifdef UDP2RAW_MP
//mylog(log_debug,"pcap cnt :%d\n",pcap_cnt);
if(send_with_pcap&&!pcap_header_captured)
{

if(get_current_time()-laste_detect_time>detect_interval)
{
laste_detect_time=get_current_time();
}
else
{
return 0;
}
/*
struct sockaddr_in remote_addr_in={0};
socklen_t slen = sizeof(sockaddr_in);
int port=get_true_random_number()%65534+1;
remote_addr_in.sin_family = AF_INET;
remote_addr_in.sin_port = htons(port);
remote_addr_in.sin_addr.s_addr = remote_ip_uint32;*/
int port=get_true_random_number()%65534+1;
address_t tmp_addr=remote_addr;
tmp_addr.set_port(port);

if(use_udp_for_detection)
{
int new_udp_fd=socket(tmp_addr.get_type(), SOCK_DGRAM, IPPROTO_UDP);
if(new_udp_fd<0)
{
mylog(log_warn,"create new_udp_fd error\n");
return -1;
}
setnonblocking(new_udp_fd);
u64_t tmp=get_true_random_number();

int ret=sendto(new_udp_fd,(char*)(&tmp),sizeof(tmp),0,(struct sockaddr *)&tmp_addr.inner,tmp_addr.get_len());
if(ret==-1)
{
mylog(log_warn,"sendto() failed\n");
}
sock_close(new_udp_fd);
}

if(use_tcp_for_detection)
{
static int last_tcp_fd=-1;

int new_tcp_fd=socket(tmp_addr.get_type(), SOCK_STREAM, IPPROTO_TCP);
if(new_tcp_fd<0)
{
mylog(log_warn,"create new_tcp_fd error\n");
return -1;
}
setnonblocking(new_tcp_fd);
connect(new_tcp_fd,(struct sockaddr *)&tmp_addr.inner,tmp_addr.get_len());
if(last_tcp_fd!=-1)
sock_close(last_tcp_fd);
last_tcp_fd=new_tcp_fd;
//close(new_tcp_fd);
}



mylog(log_info,"waiting for a use-able packet to be captured\n");

return 0;
}
#endif
if(raw_info.disabled)
{
conn_info.state.client_current_state=client_idle;
Expand Down Expand Up @@ -387,7 +468,10 @@ int client_on_raw_recv(conn_info_t &conn_info) //called when raw fd received a p
raw_info_t &raw_info=conn_info.raw_info;

mylog(log_trace,"<client_on_raw_recv,send_info.ts_ack= %u>\n",send_info.ts_ack);

#ifdef UDP2RAW_LINUX
if(pre_recv_raw_packet()<0) return -1;
#endif

if(conn_info.state.client_current_state==client_idle )
{
Expand Down Expand Up @@ -583,10 +667,71 @@ void udp_accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
}
void raw_recv_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
//assert(0==1);
if(is_udp2raw_mp)assert(0==1);
conn_info_t & conn_info= *((conn_info_t*)watcher->data);
client_on_raw_recv(conn_info);
}
#ifdef UDP2RAW_MP
void async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents)
{
conn_info_t & conn_info= *((conn_info_t*)watcher->data);

if(send_with_pcap&&!pcap_header_captured)
{
int empty=0;char *p;int len;
pthread_mutex_lock(&queue_mutex);
empty=my_queue.empty();
if(!empty)
{
my_queue.peek_front(p,len);
my_queue.pop_front();
}
pthread_mutex_unlock(&queue_mutex);
if(empty) return;

pcap_header_captured=1;
assert(pcap_link_header_len!=-1);
memcpy(pcap_header_buf,p,max_data_len);

log_bare(log_info,"link level header captured:\n");
unsigned char *tmp=(unsigned char*)pcap_header_buf;
pcap_captured_full_len=len;
for(int i=0;i<pcap_link_header_len;i++)
log_bare(log_info,"<%x>",(u32_t)tmp[i]);

log_bare(log_info,"\n");
return ;
}

//mylog(log_info,"async_cb called\n");
while(1)
{
int empty=0;char *p;int len;
pthread_mutex_lock(&queue_mutex);
empty=my_queue.empty();
if(!empty)
{
my_queue.peek_front(p,len);
my_queue.pop_front();
}
pthread_mutex_unlock(&queue_mutex);

if(empty) break;
if(g_fix_gro==0&&len>max_data_len)
{
mylog(log_warn,"huge packet %d > %d, dropped\n",len,max_data_len);
break;
}

int new_len=len-pcap_link_header_len;
memcpy(g_packet_buf,p+pcap_link_header_len,new_len);
g_packet_buf_len=new_len;
assert(g_packet_buf_cnt==0);
g_packet_buf_cnt++;
client_on_raw_recv(conn_info);
}
}
#endif
void clear_timer_cb(struct ev_loop *loop, struct ev_timer *watcher, int revents)
{
conn_info_t & conn_info= *((conn_info_t*)watcher->data);
Expand Down Expand Up @@ -632,6 +777,7 @@ int client_event_loop()
packet_info_t &send_info=conn_info.raw_info.send_info;
packet_info_t &recv_info=conn_info.raw_info.recv_info;

#ifdef UDP2RAW_LINUX
if(lower_level)
{
if(lower_level_manual)
Expand Down Expand Up @@ -707,6 +853,116 @@ int client_event_loop()
}

}
#endif

#ifdef UDP2RAW_MP

address_t tmp_addr;
if(get_src_adress2(tmp_addr,remote_addr)!=0)
{
mylog(log_error,"get_src_adress() failed\n");
myexit(-1);
}
if(strcmp(dev,"")==0)
{
mylog(log_info,"--dev have not been set, trying to detect automatically, avaliable deives:\n");

mylog(log_info,"avaliable deives(device name: ip address ; description):\n");

char errbuf[PCAP_ERRBUF_SIZE];

int found=0;

pcap_if_t *interfaces,*d;
if(pcap_findalldevs(&interfaces,errbuf)==-1)
{
mylog(log_fatal,"error in pcap_findalldevs(),%s\n",errbuf);
myexit(-1);
}

for(pcap_if_t *d=interfaces; d!=NULL; d=d->next) {
log_bare(log_warn,"%s:", d->name);
int cnt=0;
for(pcap_addr_t *a=d->addresses; a!=NULL; a=a->next) {
if(a->addr==NULL)
{
log_bare(log_debug," [a->addr==NULL]");
continue;
}
if(a->addr->sa_family == AF_INET||a->addr->sa_family == AF_INET6)
{
cnt++;

if(a->addr->sa_family ==AF_INET)
{
char s[max_addr_len];
inet_ntop(AF_INET, &((struct sockaddr_in*)a->addr)->sin_addr, s,max_addr_len);
log_bare(log_warn," [%s]", s);

if(a->addr->sa_family==raw_ip_version)
{
if(((struct sockaddr_in*)a->addr)->sin_addr.s_addr ==tmp_addr.inner.ipv4.sin_addr.s_addr)
{
found++;
strcpy(dev,d->name);
}
}
}
else
{
assert(a->addr->sa_family ==AF_INET6);

char s[max_addr_len];
inet_ntop(AF_INET6, &((struct sockaddr_in6*)a->addr)->sin6_addr, s,max_addr_len);
log_bare(log_warn," [%s]", s);

if(a->addr->sa_family==raw_ip_version)
{
if( memcmp( &((struct sockaddr_in6*)a->addr)->sin6_addr,&tmp_addr.inner.ipv6.sin6_addr,sizeof(struct in6_addr))==0 )
{
found++;
strcpy(dev,d->name);
}
}
}
}
else
{
log_bare(log_debug," [unknow:%d]",int(a->addr->sa_family));
}
}
if(cnt==0) log_bare(log_warn," [no ip found]");
if(d->description==0)
{
log_bare(log_warn,"; (no description avaliable)");
}
else
{
log_bare(log_warn,"; %s", d->description);
}
log_bare(log_warn,"\n");
}

if(found==0)
{
mylog(log_fatal,"no matched device found for ip: [%s]\n",tmp_addr.get_ip());
myexit(-1);
}
else if(found==1)
{
mylog(log_info,"using device:[%s], ip: [%s]\n",dev,tmp_addr.get_ip());
}
else
{
mylog(log_fatal,"more than one devices found for ip: [%s] , you need to use --dev manually\n",tmp_addr.get_ip());
myexit(-1);
}
}
else
{
mylog(log_info,"--dev has been manually set, using device:[%s]\n",dev);
}
#endif

send_info.src_port=0;
memset(&send_info.new_src_ip,0,sizeof(send_info.new_src_ip));
Expand Down Expand Up @@ -766,11 +1022,22 @@ int client_event_loop()
// myexit(-1);
//}

#ifdef UDP2RAW_LINUX
struct ev_io raw_recv_watcher;

raw_recv_watcher.data=&conn_info;
ev_io_init(&raw_recv_watcher, raw_recv_cb, raw_recv_fd, EV_READ);
ev_io_start(loop, &raw_recv_watcher);
#endif

#ifdef UDP2RAW_MP
g_default_loop=loop;
async_watcher.data=&conn_info;
ev_async_init(&async_watcher,async_cb);
ev_async_start(loop,&async_watcher);

init_raw_socket();//must be put after dev detection
#endif

//set_timer(epollfd,timer_fd);
struct ev_timer clear_timer;
Expand Down
Loading

0 comments on commit 5a51248

Please sign in to comment.