Skip to content

Commit

Permalink
Merge pull request JuliaLang#5905 from JuliaLang/kf/ioiddict
Browse files Browse the repository at this point in the history
Alternative fix for JuliaLang#3567
  • Loading branch information
Keno committed Feb 27, 2014
2 parents 7d98c4c + e1f7b8e commit c4d44c4
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 95 deletions.
21 changes: 5 additions & 16 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1027,25 +1027,14 @@ function create_worker(privhost, port, pubhost, stream, config, manage)
w.manage = manage

if isa(stream, AsyncStream)
stream.line_buffered = true
let wrker = w
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if nread>0
try
line = readbytes(stream.buffer, nread)
if length(line) < nread
println(STDERR,"\tTruncated reply from worker $(wrker.id)")
return false
end
print("\tFrom worker $(wrker.id):\t",bytestring(line))
catch err
println(STDERR,"\tError parsing reply from worker $(wrker.id):\t",err)
return false
end
@async begin
while !eof(stream)
line = readline(stream)
print("\tFrom worker $(wrker.id):\t",bytestring(line))
end
true
end)
end
end
end

Expand Down
12 changes: 6 additions & 6 deletions base/poll.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type FileMonitor
end
this = new(handle,cb,false,Condition())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
FileMonitor(file) = FileMonitor(false,file)
Expand Down Expand Up @@ -80,7 +80,7 @@ type PollingFileWatcher <: UVPollingWatcher
end
this = new(handle, file, false, Condition(), cb)
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
PollingFileWatcher(file) = PollingFileWatcher(false,file)
Expand Down Expand Up @@ -116,7 +116,7 @@ function FDWatcher(fd::RawFD)
end
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end
@windows_only function FDWatcher(fd::WindowsRawSocket)
Expand All @@ -129,7 +129,7 @@ end
end
this = FDWatcher(handle,fd,false,Condition(),false,FDEvent())
associate_julia_struct(handle,this)
finalizer(this,close)
finalizer(this,uvfinalize)
this
end

Expand Down Expand Up @@ -220,15 +220,15 @@ function wait(pfw::PollingFileWatcher; interval=2.0)
if !pfw.open
start_watching(pfw_wait_cb,pfw,interval)
end
prev,curr = wait(pfw.notify)
prev,curr = stream_wait(pfw,pfw.notify)
if isempty(pfw.notify.waitq)
stop_watching(pfw)
end
(prev,curr)
end

function wait(m::FileMonitor)
err, filename, events = wait(m.notify)
err, filename, events = stream_wait(m,m.notify)
filename, events
end

Expand Down
12 changes: 10 additions & 2 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ type Process
if !isa(err,AsyncStream) || err === DevNull
err=DevNull
end
new(cmd,handle,in,out,err,typemin(Int32),typemin(Int32),false,Condition(),false,Condition())
this = new(cmd,handle,in,out,err,typemin(Int32),typemin(Int32),false,Condition(),false,Condition())
finalizer(this, uvfinalize)
this
end
end

Expand Down Expand Up @@ -213,6 +215,12 @@ function _jl_spawn(cmd::Ptr{Uint8}, argv::Ptr{Ptr{Uint8}}, loop::Ptr{Void}, pp::
return proc
end

function uvfinalize(proc::Process)
proc.handle != C_NULL && ccall(:jl_close_uv,Void,(Ptr{Void},),proc.handle)
disassociate_julia_struct(proc)
proc.handle = 0
end

function _uv_hook_return_spawn(proc::Process, exit_status::Int64, termsignal::Int32)
proc.exitcode = exit_status
proc.termsignal = termsignal
Expand Down Expand Up @@ -595,7 +603,7 @@ macro cmd(str)
:(cmd_gen($(shell_parse(str)[1])))
end

wait(x::Process) = if !process_exited(x); wait(x.exitnotify); end
wait(x::Process) = if !process_exited(x); stream_wait(x,x.exitnotify); end
wait(x::ProcessChain) = for p in x.processes; wait(p); end

show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")")
24 changes: 21 additions & 3 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ type TcpSocket <: Socket
end
function TcpSocket()
this = TcpSocket(c_malloc(_sizeof_uv_tcp))
associate_julia_struct(this.handle, this)
associate_julia_struct(this.handle,this)
finalizer(this,uvfinalize)
err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
if err != 0
Expand Down Expand Up @@ -289,6 +290,7 @@ end
function TcpServer()
this = TcpServer(c_malloc(_sizeof_uv_tcp))
associate_julia_struct(this.handle, this)
finalizer(this,uvfinalize)
err = ccall(:uv_tcp_init,Cint,(Ptr{Void},Ptr{Void}),
eventloop(),this.handle)
if err != 0
Expand All @@ -300,6 +302,22 @@ function TcpServer()
this
end

# Internal version of close that doesn't error when called on an unitialized socket, as well as disassociating the socket immidiately
# This is fine because if we're calling this from a finalizer, nobody can be possibly waiting for the close to go through
function uvfinalize(uv)
close(uv)
disassociate_julia_struct(uv)
uv.handle = 0
end

function uvfinalize(uv::Union(TTY,Pipe,TcpServer,TcpSocket))
if (uv.status != StatusUninit && uv.status != StatusInit)
close(uv)
end
disassociate_julia_struct(uv)
uv.handle = 0
end

isreadable(io::TcpSocket) = true
iswritable(io::TcpSocket) = true

Expand Down Expand Up @@ -435,7 +453,7 @@ function recv(sock::UdpSocket)
error("Invalid socket state")
end
_recv_start(sock)
wait(sock.recvnotify)::Vector{Uint8}
stream_wait(sock,sock.recvnotify)::Vector{Uint8}
end

function _uv_hook_recv(sock::UdpSocket, nread::Ptr{Void}, buf_addr::Ptr{Void}, buf_size::Int32, addr::Ptr{Void}, flags::Int32)
Expand Down Expand Up @@ -463,7 +481,7 @@ function send(sock::UdpSocket,ipaddr,port,msg)
error("Invalid socket state")
end
uv_error("send",_send(sock,ipaddr,uint16(port),msg))
wait(sock.sendnotify)
stream_wait(sock,sock.sendnotify)
nothing
end

Expand Down
Loading

0 comments on commit c4d44c4

Please sign in to comment.