Skip to content

Commit

Permalink
Reactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mu1023 committed Dec 28, 2020
1 parent cf8c70d commit 942426a
Show file tree
Hide file tree
Showing 16 changed files with 481 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Young/ClassDiagram.cd
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<?xml version="1.0" encoding="utf-8"?>
<ClassDiagram />
95 changes: 95 additions & 0 deletions Young/PollDemultiplexer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#include "PollDemultiplexer.h"
#include<iostream>
namespace Young
{
Events PollDemultiplexer::ReadEvent()
{
#ifdef WINDOWS_YOUNG
return POLLIN;
#else
return POLLIN | POLLPRI;
#endif // WINDOWS_YOUNG

}
Events PollDemultiplexer::WriteEvent()
{
return POLLOUT;
}
Events PollDemultiplexer::ErrorEvent()
{
return POLLERR;
}
void PollDemultiplexer::Run(UInt32 timeout)
{
UInt32 nums = poll(&*m_Pollfds.begin(), m_Pollfds.size(), timeout);
std::vector<pollfd> activityFds;
//把活跃的fd取出来。防止执行handler事件时删除句柄,而此时又在遍历m_Pollfds
for (UInt32 i = 0; i < m_Pollfds.size() && nums > 0; i++)
{
if (m_Pollfds[i].revents)
{
--nums;
activityFds.push_back(m_Pollfds[i]);
}

}
for (UInt32 i = 0; i < activityFds.size(); i++)
{
SocketFd sfd = activityFds[i].fd;
if (activityFds[i].revents & ReadEvent())
{
m_Handlers[sfd]->HandleRead();
}
if (activityFds[i].revents & WriteEvent())
{
m_Handlers[sfd]->HandleWrite();
}
if (activityFds[i].revents & ErrorEvent())
{
m_Handlers[sfd]->HandleError();
}
}
}
void PollDemultiplexer::UpdateHandler(EventHandlerPtr EventHandler, Events events)
{
if (EventHandler == NULL)
{
return;
}
SocketFd sfd = EventHandler->GetFd();
if (m_IndexBySocketFd.count(sfd) == 0)
{
//没事件
if (events == NoneEvent)
{
return;
}
pollfd pfd;
pfd.events = events;
pfd.fd = EventHandler->GetFd();

m_Pollfds.push_back(pfd);

m_IndexBySocketFd[sfd] = m_Pollfds.size() - 1;

m_Handlers[sfd] = EventHandler;
}
else
{
UInt32 idx = m_IndexBySocketFd[EventHandler->GetFd()];

if (events == NoneEvent)
{

std::iter_swap(m_Pollfds.begin() + idx, m_Pollfds.end() - 1);
m_Pollfds.pop_back();
m_IndexBySocketFd.erase(sfd);
m_Handlers.erase(sfd);
}
else
{
m_Pollfds[idx].events = events;
}
}
}
}
16 changes: 14 additions & 2 deletions Young/Young.vcxproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
Expand Down Expand Up @@ -126,14 +126,23 @@
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\include\Acceptor.cpp" />
<ClCompile Include="..\include\Connector.cpp" />
<ClCompile Include="main.cpp" />
<ClCompile Include="PollDemultiplexer.cpp" />
<ClCompile Include="SharedMemory.cpp" />
<ClCompile Include="DataPointLogger.cpp" />
<ClCompile Include="Formatter.cpp" />
<ClCompile Include="Logging.cpp" />
<ClCompile Include="YoungTime.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\include\Acceptor.h" />
<ClInclude Include="..\include\Connector.h" />
<ClInclude Include="..\include\EventDemultiplexer.h" />
<ClInclude Include="..\include\EventHandler.h" />
<ClInclude Include="..\include\PollDemultiplexer.h" />
<ClInclude Include="..\include\Reactor.h" />
<ClInclude Include="..\include\SharedMemory.h" />
<ClInclude Include="..\include\SharedMemoryMap.h" />
<ClInclude Include="..\include\Noncopyable.h" />
Expand All @@ -147,10 +156,13 @@
<ClInclude Include="..\include\FixedBuffer.h" />
<ClInclude Include="..\include\Formatter.h" />
<ClInclude Include="..\include\Logging.h" />
<ClInclude Include="..\include\SocketAPI.h" />
<ClInclude Include="..\include\NetWorkAPI.h" />
<ClInclude Include="..\include\YoungTime.h" />
<ClInclude Include="..\include\UdpAppender.h" />
</ItemGroup>
<ItemGroup>
<None Include="ClassDiagram.cd" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
Expand Down
39 changes: 36 additions & 3 deletions Young/Young.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
<Filter Include="源文件\Logger">
<UniqueIdentifier>{95db4ce2-cc6f-4386-8061-3f8a8e9f3014}</UniqueIdentifier>
</Filter>
<Filter Include="头文件\net">
<UniqueIdentifier>{baeb7e6c-4869-4a77-8012-be4c1cd9a747}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="YoungTime.cpp">
Expand All @@ -36,6 +39,15 @@
<ClCompile Include="Logging.cpp">
<Filter>源文件\Logger</Filter>
</ClCompile>
<ClCompile Include="PollDemultiplexer.cpp">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="..\include\Acceptor.cpp">
<Filter>源文件</Filter>
</ClCompile>
<ClCompile Include="..\include\Connector.cpp">
<Filter>源文件</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\include\YoungDefine.h">
Expand Down Expand Up @@ -80,11 +92,32 @@
<ClInclude Include="..\include\Singleton.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="..\include\SocketAPI.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="..\include\UdpAppender.h">
<Filter>头文件</Filter>
</ClInclude>
<ClInclude Include="..\include\Acceptor.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\Reactor.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\Connector.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\EventDemultiplexer.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\PollDemultiplexer.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\EventHandler.h">
<Filter>头文件\net</Filter>
</ClInclude>
<ClInclude Include="..\include\NetWorkAPI.h">
<Filter>头文件\net</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="ClassDiagram.cd" />
</ItemGroup>
</Project>
44 changes: 43 additions & 1 deletion Young/main.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,55 @@
#include<Logging.h>
#include<ConsoleAppender.h>
#include<FileAppender.h>
#include<Reactor.h>
#include<Acceptor.h>
#include<PollDemultiplexer.h>
using namespace std;
using namespace Young;
int main() {

SOCKET_STARTUP
auto reactor = std::make_shared<Reactor>(std::make_shared<PollDemultiplexer>());

SocketFd fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
u_long iMode = 1;
ioctlsocket(fd, FIONBIO, &iMode);
sockaddr_in addr;
memset(&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(12345);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
if (::bind(fd, (sockaddr *)&addr, sizeof addr) < 0)
{
std::cout << WSAGetLastError() << endl;
}
if (0 > listen(fd, 5))
{
std::cout << WSAGetLastError() << endl;
}
EventHandlerPtr b =std::make_shared<Acceptor>(fd, reactor);

reactor->RegisterHandler(b, reactor->ReadEvent());

thread th([&] {
reactor->Run();
});

Sleep(1000);
SocketFd cfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

if(0 > connect(cfd, (sockaddr *)&addr, sizeof addr))
{
std::cout << WSAGetLastError() << endl;
}
Sleep(1000);
send(cfd, "aaaa", 4, 0);
th.join();
std::shared_ptr<FileAppender<std::mutex>> ptr1 = std::make_shared< FileAppender<std::mutex>>(".\\log\\test.log", Timestamp::Now());
std::shared_ptr<ConsoleAppender<std::mutex>> ptr2 = std::make_shared< ConsoleAppender<std::mutex>>();
std::shared_ptr<UdpAppender<std::mutex>> ptr3 = std::make_shared<UdpAppender<std::mutex>>("101.101.010.101", 123);
Logger::Instance()->InsertAppenderPtr(ptr1);
Logger::Instance()->InsertAppenderPtr(ptr2);
Logger::Instance()->InsertAppenderPtr(ptr3);
LOGGER_DEBUG("afwrwa{0}", "xxx");
LOGGER_DEBUG("afwrwa{0}", "xxx");
LOGGER_DEBUG("afwrwa{0}", "xxx");
Expand All @@ -20,4 +60,6 @@ int main() {
LOGGER_DEBUG("afwrwa{0}", "xxx");
LOGGER_DEBUG("afwrwa{0}", "xxx");
LOGGER_DEBUG("afwrwa{0}", "xxx");

SOCKET_CLEANUP
}
33 changes: 33 additions & 0 deletions include/Acceptor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "Acceptor.h"
namespace Young
{

Acceptor::Acceptor(SocketFd fd, std::shared_ptr<Reactor>& reactor) :m_Fd(fd), m_Reactor(reactor)
{
}

void Acceptor::HandleRead()
{
sockaddr_in saddr;
socklen_t len = sizeof saddr;
SocketFd connfd = accept(m_Fd, (sockaddr*)&saddr, &len);
printf("%d\n", WSAGetLastError());
std::shared_ptr<Connector> conn = std::make_shared<Connector>(connfd, m_Reactor);
m_Reactor->RegisterHandler(conn,m_Reactor->ReadEvent());
}

void Acceptor::HandleWrite()
{
}

void Acceptor::HandleError()
{
m_Reactor->RemoveHandler(shared_from_this());
}

SocketFd Acceptor::GetFd()
{
return m_Fd;
}

}
23 changes: 23 additions & 0 deletions include/Acceptor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef _YOUNG_ACCEPTOR_H_
#define _YOUNG_ACCEPTOR_H_
#include<EventHandler.h>
#include<memory>
#include<Reactor.h>
#include<Connector.h>
namespace Young
{

class Acceptor :public EventHandler, public std::enable_shared_from_this<Acceptor>
{
public:
Acceptor(SocketFd fd, std::shared_ptr<Reactor>& reactor);
void HandleRead()override;
void HandleWrite()override;
void HandleError()override;
SocketFd GetFd()override;
private:
SocketFd m_Fd;
std::shared_ptr<Reactor> m_Reactor;
};
}
#endif // ! _YOUNG_ACCEPTOR_H_
25 changes: 25 additions & 0 deletions include/Connector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "Connector.h"
#include<iostream>
namespace Young
{
Connector::Connector(SocketFd fd, std::shared_ptr<Reactor>& reactor) :m_Fd(fd), m_Reactor(reactor)
{
m_Status = NCS_NORMAL;
}
void Connector::HandleRead()
{
recv(m_Fd, m_Buffer, 1000,0);
std::cout << m_Buffer << std::endl;
}
void Connector::HandleWrite()
{
}
void Connector::HandleError()
{
}
SocketFd Connector::GetFd()
{
return m_Fd;
}
}

39 changes: 39 additions & 0 deletions include/Connector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef _YOUNG_CONNECTOR_H_
#define _YOUNG_CONNECTOR_H_
#include<EventHandler.h>
#include<memory>
#include<Reactor.h>
namespace Young
{

enum NetConnectionType
{
NCT_ACTIVE, //主动连接
NCT_PASSIVE, //被动连接
};

/**
*@brief 连接状态
*/
enum NetConnectionStatus
{
NCS_CLOSED, //初始状态,还没连接
NCS_VERIFY, //验证阶段
NCS_NORMAL //正常通信状态
};
class Connector :public EventHandler, public std::enable_shared_from_this<Connector>
{
public:
Connector(SocketFd fd, std::shared_ptr<Reactor>& reactor);
void HandleRead()override;
void HandleWrite()override;
void HandleError()override;
SocketFd GetFd()override;
private:
SocketFd m_Fd;
std::shared_ptr<Reactor> m_Reactor;
char m_Buffer[10086];
NetConnectionStatus m_Status;
};
}
#endif // !_YOUNG_CONNECTOR_H_
Loading

0 comments on commit 942426a

Please sign in to comment.