From fb41c0e2e3a9d0b1c0db0eb010c60d39e1a5b7a2 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 18 Apr 2014 21:57:16 +0300 Subject: [PATCH 01/47] asyncio: Initial prototype implementation. --- asyncio/asyncio.py | 100 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 asyncio/asyncio.py diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py new file mode 100644 index 000000000..9f346fe89 --- /dev/null +++ b/asyncio/asyncio.py @@ -0,0 +1,100 @@ +import time +import heapq + + +def coroutine(f): + return f + + +def get_event_loop(): + return EventLoop() + + +class EventLoop: + + def __init__(self): + self.q = [] + self.cnt = 0 + + def time(self): + return time.time() + + def call_soon(self, callback, *args): + self.call_at(0, callback, *args) + + def call_later(self, delay, callback, *args): + self.call_at(self.time() + delay, callback, *args) + + def call_at(self, time, callback, *args): +# self.q.append((callback, args)) + # self.cnt is workaround per heapq docs +# print("Scheduling", (time, self.cnt, callback, args)) + heapq.heappush(self.q, (time, self.cnt, callback, args)) +# print(self.q) + self.cnt += 1 + +# def run_forever(self): +# while self.q: +# c = self.q.pop(0) +# c[0](*c[1]) + + def run_forever(self): + while self.q: +# t, cnt, cb, args = self.q.pop(0) + t, cnt, cb, args = heapq.heappop(self.q) + tnow = self.time() + delay = t - tnow + if delay > 0: +# print("Sleeping for:", delay) + time.sleep(delay) + delay = 0 + try: + ret = next(cb) +# print("ret:", ret) + if isinstance(ret, Sleep): + delay = ret.args[0] + except StopIteration as e: + print(c, "finished") + continue + #self.q.append(c) + self.call_later(delay, cb, *args) + + def run_until_complete(self, coro): + val = None + while True: + try: + ret = coro.send(val) + except StopIteration as e: + print(e) + break + print("ret:", ret) + if isinstance(ret, SysCall): + ret.handle() + + def close(self): + pass + + +class SysCall: + + def __init__(self, call, *args): + self.call = call + self.args = args + +class Sleep(SysCall): + + def handle(self): + time.sleep(self.args[0]) + + +def sleep(secs): + yield Sleep("sleep", secs) + +def sleep2(secs): + t = time.time() +# print("Started sleep:", t, "targetting:", t + secs) + while time.time() < t + secs: + time.sleep(0.01) + yield None +# print("Finished sleeping", secs) +# time.sleep(secs) From a80cf93d4acea6957e23b0052a0c3a922a5a91ab Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 00:01:57 +0300 Subject: [PATCH 02/47] asyncio: Implement subclass implementing filedes watching interface. --- asyncio/asyncio.py | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 9f346fe89..d076f824b 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -6,10 +6,6 @@ def coroutine(f): return f -def get_event_loop(): - return EventLoop() - - class EventLoop: def __init__(self): @@ -38,6 +34,10 @@ def call_at(self, time, callback, *args): # c = self.q.pop(0) # c[0](*c[1]) + def wait(self, delay): +# print("Sleeping for:", delay) + time.sleep(delay) + def run_forever(self): while self.q: # t, cnt, cb, args = self.q.pop(0) @@ -45,8 +45,7 @@ def run_forever(self): tnow = self.time() delay = t - tnow if delay > 0: -# print("Sleeping for:", delay) - time.sleep(delay) + self.wait(delay) delay = 0 try: ret = next(cb) @@ -74,6 +73,26 @@ def run_until_complete(self, coro): def close(self): pass +import select + +class EpollEventLoop(EventLoop): + + def __init__(self): + EventLoop.__init__(self) + self.poller = select.epoll(1) + + def add_reader(self, fd, cb, *args): + self.poller.register(fd, select.EPOLLIN, (cb, args)) + + def add_writer(self, fd, cb, *args): + self.poller.register(fd, select.EPOLLOUT, (cb, args)) + + def wait(self, delay): + res = self.poller.poll(int(delay * 1000)) + print("poll: ", res) + for cb, ev in res: + cb[0](*cb[1]) + class SysCall: @@ -87,6 +106,9 @@ def handle(self): time.sleep(self.args[0]) +def get_event_loop(): + return EpollEventLoop() + def sleep(secs): yield Sleep("sleep", secs) From 7b5780bc665219cbd1d708df06f8ad4ad1e63aad Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 00:47:43 +0300 Subject: [PATCH 03/47] asyncio: Recover eventloop's ability to work with callbacks. Actually, coroutine support for call_soon() is a hack, in big asyncio coroutine should be wrapped in Task object. --- asyncio/asyncio.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index d076f824b..5713459e2 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -46,17 +46,20 @@ def run_forever(self): delay = t - tnow if delay > 0: self.wait(delay) - delay = 0 - try: - ret = next(cb) -# print("ret:", ret) - if isinstance(ret, Sleep): - delay = ret.args[0] - except StopIteration as e: - print(c, "finished") - continue - #self.q.append(c) - self.call_later(delay, cb, *args) + if callable(cb): + cb(*args) + else: + delay = 0 + try: + ret = next(cb) +# print("ret:", ret) + if isinstance(ret, Sleep): + delay = ret.args[0] + except StopIteration as e: + print(c, "finished") + continue + #self.q.append(c) + self.call_later(delay, cb, *args) def run_until_complete(self, coro): val = None From dbb8857e15903648a9e0b901a96719a9ca695e67 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 05:58:12 +0300 Subject: [PATCH 04/47] asyncio: Make run_forever() actually run forever. --- asyncio/asyncio.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 5713459e2..e5c174a5b 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -39,13 +39,17 @@ def wait(self, delay): time.sleep(delay) def run_forever(self): - while self.q: -# t, cnt, cb, args = self.q.pop(0) - t, cnt, cb, args = heapq.heappop(self.q) - tnow = self.time() - delay = t - tnow - if delay > 0: - self.wait(delay) + while True: + if self.q: + t, cnt, cb, args = heapq.heappop(self.q) + tnow = self.time() + delay = t - tnow + if delay > 0: + self.wait(delay) + else: + self.wait(-1) + # Assuming IO completion scheduled some tasks + continue if callable(cb): cb(*args) else: From b8f41198de0ccd72ae2726aa379d0690a11515ac Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 06:01:04 +0300 Subject: [PATCH 05/47] asyncio: EpollEventLoop.wait(): support infinite wait. --- asyncio/asyncio.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index e5c174a5b..238565cb0 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -95,7 +95,11 @@ def add_writer(self, fd, cb, *args): self.poller.register(fd, select.EPOLLOUT, (cb, args)) def wait(self, delay): - res = self.poller.poll(int(delay * 1000)) + print("epoll.wait", delay) + if delay == -1: + res = self.poller.poll(-1) + else: + res = self.poller.poll(int(delay * 1000)) print("poll: ", res) for cb, ev in res: cb[0](*cb[1]) From f9fd9ddd42da572720d71649a8f5dd62a617328e Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 06:05:50 +0300 Subject: [PATCH 06/47] asyncio: Support read/write syscalls, and route vals both ways between coros. --- asyncio/asyncio.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 238565cb0..e2e266e73 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -55,12 +55,22 @@ def run_forever(self): else: delay = 0 try: - ret = next(cb) -# print("ret:", ret) - if isinstance(ret, Sleep): - delay = ret.args[0] + if args == (): + args = (None,) + print("Send args:", args) + ret = cb.send(*args) + print("ret:", ret) + if isinstance(ret, SysCall): + if isinstance(ret, Sleep): + delay = ret.args[0] + elif isinstance(ret, IORead): + self.add_reader(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) + continue + elif isinstance(ret, IOWrite): + self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) + continue except StopIteration as e: - print(c, "finished") + print(cb, "finished") continue #self.q.append(c) self.call_later(delay, cb, *args) @@ -116,6 +126,16 @@ class Sleep(SysCall): def handle(self): time.sleep(self.args[0]) +class IORead(SysCall): + + def __init__(self, obj): + self.obj = obj + +class IOWrite(SysCall): + + def __init__(self, obj): + self.obj = obj + def get_event_loop(): return EpollEventLoop() From 0762929bd3c91118e45a7eb3f3830e27225183c0 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 06:08:49 +0300 Subject: [PATCH 07/47] asyncio: Start adding asyncio stream interface. --- asyncio/asyncio.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index e2e266e73..4a1832373 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -151,3 +151,47 @@ def sleep2(secs): yield None # print("Finished sleeping", secs) # time.sleep(secs) + + +import microsocket as _socket + +class StreamReader: + + def __init__(self, s): + self.s = s + + def readline(self): + print("readline") + s = yield IORead(self.s) + print("after IORead") + res = self.s.readline() + print("readline res:", res) + return res + + +class StreamWriter: + + def __init__(self, s): + self.s = s + + def write(self, buf): + print("Write!") + res = self.s.write(buf) + print("write res:", res) + s = yield IOWrite(self.s) + print("returning write res:", res) + + +def open_connection(host, port): + s = _socket.socket() + s.setblocking(False) + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + try: + s.connect(addr) + except OSError as e: + print(e.args[0]) + print("After connect") + s = yield IOWrite(s) + print("After iowait:", s) + return StreamReader(s), StreamWriter(s) From 40af791abf441061de80ead8629f7d76cde138dc Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 06:09:45 +0300 Subject: [PATCH 08/47] asyncio: Add dumb debug output. --- asyncio/asyncio.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 4a1832373..1f4687e0e 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -99,9 +99,11 @@ def __init__(self): self.poller = select.epoll(1) def add_reader(self, fd, cb, *args): + print("add_reader") self.poller.register(fd, select.EPOLLIN, (cb, args)) def add_writer(self, fd, cb, *args): + print("add_writer") self.poller.register(fd, select.EPOLLOUT, (cb, args)) def wait(self, delay): @@ -112,6 +114,7 @@ def wait(self, delay): res = self.poller.poll(int(delay * 1000)) print("poll: ", res) for cb, ev in res: + print("Calling %s%s" % (cb[0], cb[1])) cb[0](*cb[1]) From dddf237fb6b6d7131f658ac2e141e8cbf00463ae Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Tue, 22 Apr 2014 02:51:04 +0300 Subject: [PATCH 09/47] asyncio: Use logging and errno modules. --- asyncio/asyncio.py | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 1f4687e0e..b5a862f9f 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -1,7 +1,12 @@ +import __main__ import time import heapq +import errno +import logging +log = logging.getLogger("asyncio") + def coroutine(f): return f @@ -24,7 +29,7 @@ def call_later(self, delay, callback, *args): def call_at(self, time, callback, *args): # self.q.append((callback, args)) # self.cnt is workaround per heapq docs -# print("Scheduling", (time, self.cnt, callback, args)) + log.debug("Scheduling %s", (time, self.cnt, callback, args)) heapq.heappush(self.q, (time, self.cnt, callback, args)) # print(self.q) self.cnt += 1 @@ -42,6 +47,8 @@ def run_forever(self): while True: if self.q: t, cnt, cb, args = heapq.heappop(self.q) + log.debug("Next task to run: %s", (t, cnt, cb, args)) +# __main__.mem_info() tnow = self.time() delay = t - tnow if delay > 0: @@ -57,20 +64,22 @@ def run_forever(self): try: if args == (): args = (None,) - print("Send args:", args) + log.debug("Gen send args: %s", args) ret = cb.send(*args) - print("ret:", ret) + log.debug("Gen yield result: %s", ret) if isinstance(ret, SysCall): if isinstance(ret, Sleep): delay = ret.args[0] elif isinstance(ret, IORead): +# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) +# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) self.add_reader(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) continue elif isinstance(ret, IOWrite): self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) continue except StopIteration as e: - print(cb, "finished") + log.debug("Gen finished: %s", cb) continue #self.q.append(c) self.call_later(delay, cb, *args) @@ -99,22 +108,22 @@ def __init__(self): self.poller = select.epoll(1) def add_reader(self, fd, cb, *args): - print("add_reader") + log.debug("add_reader%s", (fd, cb, args)) self.poller.register(fd, select.EPOLLIN, (cb, args)) def add_writer(self, fd, cb, *args): - print("add_writer") + log.debug("add_writer%s", (fd, cb, args)) self.poller.register(fd, select.EPOLLOUT, (cb, args)) def wait(self, delay): - print("epoll.wait", delay) + log.debug("epoll.wait(%d)", delay) if delay == -1: res = self.poller.poll(-1) else: res = self.poller.poll(int(delay * 1000)) - print("poll: ", res) + log.debug("epoll result: %s", res) for cb, ev in res: - print("Calling %s%s" % (cb[0], cb[1])) + log.debug("Calling IO callback: %s%s", cb[0], cb[1]) cb[0](*cb[1]) @@ -164,11 +173,11 @@ def __init__(self, s): self.s = s def readline(self): - print("readline") + log.debug("StreamReader.readline()") s = yield IORead(self.s) - print("after IORead") + log.debug("StreamReader.readline(): after IORead: %s", s) res = self.s.readline() - print("readline res:", res) + log.debug("StreamReader.readline(): res: %s", res) return res @@ -178,14 +187,14 @@ def __init__(self, s): self.s = s def write(self, buf): - print("Write!") res = self.s.write(buf) - print("write res:", res) + log.debug("StreamWriter.write(): %d", res) s = yield IOWrite(self.s) - print("returning write res:", res) + log.debug("StreamWriter.write(): returning") def open_connection(host, port): + log.debug("open_connection(%s, %s)", host, port) s = _socket.socket() s.setblocking(False) ai = _socket.getaddrinfo(host, port) @@ -193,8 +202,9 @@ def open_connection(host, port): try: s.connect(addr) except OSError as e: - print(e.args[0]) - print("After connect") + if e.args[0] != errno.EINPROGRESS: + raise + log.debug("open_connection: After connect") s = yield IOWrite(s) - print("After iowait:", s) + log.debug("open_connection: After iowait: %s", s) return StreamReader(s), StreamWriter(s) From 395e2ecee6b03f085de26432d2537f5caf657ddc Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 24 Apr 2014 01:18:51 +0300 Subject: [PATCH 10/47] asyncio: Add remove_reader()/remove_writer(). --- asyncio/asyncio.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index b5a862f9f..827d9252a 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -111,10 +111,18 @@ def add_reader(self, fd, cb, *args): log.debug("add_reader%s", (fd, cb, args)) self.poller.register(fd, select.EPOLLIN, (cb, args)) + def remove_reader(self, fd): + log.debug("remove_reader(%s)", fd) + self.poller.unregister(fd) + def add_writer(self, fd, cb, *args): log.debug("add_writer%s", (fd, cb, args)) self.poller.register(fd, select.EPOLLOUT, (cb, args)) + def remove_writer(self, fd): + log.debug("remove_writer(%s)", fd) + self.poller.unregister(fd) + def wait(self, delay): log.debug("epoll.wait(%d)", delay) if delay == -1: From ebc6a1faf2c2ff70f7984611ea5215cd135f92df Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 24 Apr 2014 01:26:55 +0300 Subject: [PATCH 11/47] asyncio: Add asyncio.async() dummy factory function. Not Task bloat implemented (so far?), so just identity function for CPython compatibility. --- asyncio/asyncio.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 827d9252a..caf079495 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -160,6 +160,10 @@ def __init__(self, obj): def get_event_loop(): return EpollEventLoop() +def async(coro): + # We don't have Task bloat, so op is null + return coro + def sleep(secs): yield Sleep("sleep", secs) From e6ed3ffceb95b6415e0d3b131f03a1af23d57414 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 24 Apr 2014 01:35:10 +0300 Subject: [PATCH 12/47] asyncio: Add basic loop.call_soon() test. --- asyncio/test_call_soon.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 asyncio/test_call_soon.py diff --git a/asyncio/test_call_soon.py b/asyncio/test_call_soon.py new file mode 100644 index 000000000..8ab5cb879 --- /dev/null +++ b/asyncio/test_call_soon.py @@ -0,0 +1,13 @@ +import asyncio +import time + + +def cb(): + print("callback") + time.sleep(0.5) + loop.call_soon(cb) + + +loop = asyncio.get_event_loop() +loop.call_soon(cb) +loop.run_forever() From 2caed7ddef63eac106cd7a20eb16a8a561df4ceb Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 24 Apr 2014 02:13:21 +0300 Subject: [PATCH 13/47] asyncio: Handle end of stream condition properly. By removing any IO watches for associated file handle. The way it's implemented tries to preserve OS-like separation between event loop and tasks. So, stream to finish watching fd for IO also issues syscall, instead of calling methods on loop instance directly. Calling method on loop would be more efficient, but will require storing reference to loop in each stream. And those separation matters... --- asyncio/asyncio.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index caf079495..bdddfb3a8 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -7,6 +7,10 @@ log = logging.getLogger("asyncio") +IO_READ = 1 +IO_WRITE = 2 + + def coroutine(f): return f @@ -78,6 +82,12 @@ def run_forever(self): elif isinstance(ret, IOWrite): self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) continue + elif isinstance(ret, IODone): + if ret.op == IO_READ: + self.remove_reader(ret.obj.fileno()) + elif ret.op == IO_WRITE: + self.remove_writer(ret.obj.fileno()) + continue except StopIteration as e: log.debug("Gen finished: %s", cb) continue @@ -156,6 +166,12 @@ class IOWrite(SysCall): def __init__(self, obj): self.obj = obj +class IODone(SysCall): + + def __init__(self, op, obj): + self.op = op + self.obj = obj + def get_event_loop(): return EpollEventLoop() @@ -189,6 +205,8 @@ def readline(self): s = yield IORead(self.s) log.debug("StreamReader.readline(): after IORead: %s", s) res = self.s.readline() + if not res: + yield IODone(IO_READ, self.s) log.debug("StreamReader.readline(): res: %s", res) return res From 4c4a74368a0a6c83b6ce53f067bf8ad0652ce3cc Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 20 Apr 2014 06:27:43 +0300 Subject: [PATCH 14/47] asyncio: Add basic asyncio stream interface test. --- asyncio/test_http_client.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 asyncio/test_http_client.py diff --git a/asyncio/test_http_client.py b/asyncio/test_http_client.py new file mode 100644 index 000000000..4cd292a0e --- /dev/null +++ b/asyncio/test_http_client.py @@ -0,0 +1,25 @@ +import asyncio + +@asyncio.coroutine +def print_http_headers(url): + reader, writer = yield from asyncio.open_connection(url, 80) + print(reader, writer) + print("================") + query = "GET / HTTP/1.0\r\n\r\n" + yield from writer.write(query.encode('latin-1')) + while True: + line = yield from reader.readline() + if not line: + break + if line: + print(line.rstrip()) + +import logging +logging.basicConfig(level=logging.INFO) +url = "google.com" +loop = asyncio.get_event_loop() +#task = asyncio.async(print_http_headers(url)) +#loop.run_until_complete(task) +loop.call_soon(print_http_headers(url)) +loop.run_forever() +loop.close() From 5166ecbbd14dad3e5b8ea53bda31656f2df2c0e4 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 26 Apr 2014 07:59:18 +0300 Subject: [PATCH 15/47] asyncio_slow: Start new upstream API-compatible asyncio implementation. The trait of this implementation is that it doesn't use priority queue and time scheduling, and instead does its all operations using polling, starting with such basic one as sleep. On the other hand, this tries to implement all (well, much) of upstream asyncio API and warts. asyncio_slow: Rename from asyncio_micro. It may turn out that this won't be "micro" at all. The main trait of this implementation is that it stay 100% API compatible with upstream (in those APIs which are implemented of course). It will also keep inefficient implementation of event loop scheduling, to discourage its use. Here we go. --- asyncio_slow/asyncio_slow.py | 144 +++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 asyncio_slow/asyncio_slow.py diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py new file mode 100644 index 000000000..ef21708f3 --- /dev/null +++ b/asyncio_slow/asyncio_slow.py @@ -0,0 +1,144 @@ +import time +import logging + + +log = logging.getLogger("asyncio") + + +# Workaround for not being able to subclass builtin types +DoneException = AssertionError + +class InvalidStateError: + pass + +# Object not matching any other object +_sentinel = [] + + +class EventLoop: + + def __init__(self): + self.q = [] + + def call_soon(self, c, *args): + self.q.append(c) + + def run_forever(self): + while self.q: + c = self.q.pop(0) + c() + # I mean, forever + while True: + time.sleep(1) + + def run_until_complete(self, coro): + def _cb(val): + raise DoneException + + t = async(coro) + t.add_done_callback(_cb) + self.call_soon(t) + try: + self.run_forever() + except DoneException: + pass + + def close(self): + pass + + +_def_event_loop = EventLoop() + + +class Future: + + def __init__(self, loop=_def_event_loop): + self.loop = loop + self.res = _sentinel + self.cbs = [] + + def result(self): + if self.res is _sentinel: + raise InvalidStateError + return self.res + + def add_done_callback(self, fn): + if self.res is _sentinel: + self.cbs.append(fn) + else: + self.loop.call_soon(fn, self) + + def set_result(self, val): + self.res = val + for f in self.cbs: + f(self) + + +class Task(Future): + + def __init__(self, coro, loop=_def_event_loop): + super().__init__() + self.loop = loop + self.c = coro + # upstream asyncio forces task to be scheduled on instantiation + self.loop.call_soon(self) + + def __call__(self): + try: + next(self.c) + self.loop.call_soon(self) + except StopIteration as e: + log.debug("Coro finished: %s", self.c) + self.set_result(None) + + +def get_event_loop(): + return _def_event_loop + + +# Decorator +def coroutine(f): + return f + + +def async(coro): + if isinstance(coro, Future): + return coro + return Task(coro) + + +class Wait(Future): + + def __init__(self, n): + Future.__init__(self) + self.n = n + + def _done(self): + self.n -= 1 + log.debug("Wait: remaining tasks: %d", self.n) + if not self.n: + self.set_result(None) + + def __call__(self): + pass + + +def wait(coro_list, loop=_def_event_loop): + + w = Wait(len(coro_list)) + + for c in coro_list: + t = async(c) + t.add_done_callback(lambda val: w._done()) + loop.call_soon(t) + + return w + + +def sleep(secs): + t = time.time() + log.debug("Started sleep at: %s, targetting: %s", t, t + secs) + while time.time() < t + secs: + time.sleep(0.01) + yield + log.debug("Finished sleeping %ss", secs) From 1d7b189e7579a5c3e3f4e3f5b7ed7b7956866a27 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 1 May 2014 23:20:44 +0300 Subject: [PATCH 16/47] asyncio_slow: Fix call_soon(), add call_later(). --- asyncio_slow/asyncio_slow.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py index ef21708f3..bf82fcac1 100644 --- a/asyncio_slow/asyncio_slow.py +++ b/asyncio_slow/asyncio_slow.py @@ -21,12 +21,18 @@ def __init__(self): self.q = [] def call_soon(self, c, *args): - self.q.append(c) + self.q.append((c, args)) + + def call_later(self, delay, c, *args): + def _delayed(c, args, delay): + yield from sleep(delay) + self.call_soon(c, *args) + Task(_delayed(c, args, delay)) def run_forever(self): while self.q: c = self.q.pop(0) - c() + c[0](*c[1]) # I mean, forever while True: time.sleep(1) From b0c4fc7536a92d0e570713633cc038047f5a8fd3 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 1 May 2014 23:45:04 +0300 Subject: [PATCH 17/47] asyncio_slow: Add callback example from docs. --- asyncio_slow/test_hello_world_callback.py | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 asyncio_slow/test_hello_world_callback.py diff --git a/asyncio_slow/test_hello_world_callback.py b/asyncio_slow/test_hello_world_callback.py new file mode 100644 index 000000000..9836ffd7b --- /dev/null +++ b/asyncio_slow/test_hello_world_callback.py @@ -0,0 +1,11 @@ +# https://docs.python.org/3.4/library/asyncio-eventloop.html#example-hello-world-callback +#import asyncio +import asyncio_slow as asyncio + +def print_and_repeat(loop): + print('Hello World') + loop.call_later(2, print_and_repeat, loop) + +loop = asyncio.get_event_loop() +loop.call_soon(print_and_repeat, loop) +loop.run_forever() From 93178819b31d5aa4c02765a37d041d20990ea5d5 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 1 May 2014 23:48:41 +0300 Subject: [PATCH 18/47] asyncio_slow: Add coroutine example from docs. --- asyncio_slow/test_hello_world.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 asyncio_slow/test_hello_world.py diff --git a/asyncio_slow/test_hello_world.py b/asyncio_slow/test_hello_world.py new file mode 100644 index 000000000..fab558134 --- /dev/null +++ b/asyncio_slow/test_hello_world.py @@ -0,0 +1,12 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-hello-world-coroutine +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def greet_every_two_seconds(): + while True: + print('Hello World') + yield from asyncio.sleep(2) + +loop = asyncio.get_event_loop() +loop.run_until_complete(greet_every_two_seconds()) From 6ded654afe09c43e97d438bbbc4763bcddd77aa7 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 1 May 2014 23:50:40 +0300 Subject: [PATCH 19/47] asyncio_slow: Add example for scheduling coro using Task. --- asyncio_slow/test_hello_world_bare.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 asyncio_slow/test_hello_world_bare.py diff --git a/asyncio_slow/test_hello_world_bare.py b/asyncio_slow/test_hello_world_bare.py new file mode 100644 index 000000000..1f8d9702f --- /dev/null +++ b/asyncio_slow/test_hello_world_bare.py @@ -0,0 +1,12 @@ +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def greet_every_two_seconds(): + while True: + print('Hello World') + yield from asyncio.sleep(2) + +loop = asyncio.get_event_loop() +asyncio.Task(greet_every_two_seconds()) +loop.run_forever() From 6bff4dbe72b084aedcb1c28dd87b12710f261f45 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 1 May 2014 23:55:22 +0300 Subject: [PATCH 20/47] asyncio_slow: Add example on chaining coros using "yield from" from docs. --- asyncio_slow/test_chain.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 asyncio_slow/test_chain.py diff --git a/asyncio_slow/test_chain.py b/asyncio_slow/test_chain.py new file mode 100644 index 000000000..8d6b9a615 --- /dev/null +++ b/asyncio_slow/test_chain.py @@ -0,0 +1,18 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-chain-coroutines +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def compute(x, y): + print("Compute %s + %s ..." % (x, y)) + yield from asyncio.sleep(1.0) + return x + y + +@asyncio.coroutine +def print_sum(x, y): + result = yield from compute(x, y) + print("%s + %s = %s" % (x, y, result)) + +loop = asyncio.get_event_loop() +loop.run_until_complete(print_sum(1, 2)) +loop.close() From 3a639ce666e229fa7aea583d1ca7ec53c7de37ae Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 00:22:36 +0300 Subject: [PATCH 21/47] asyncio_slow: run_until_complete() should not schedule anyhing. Everything should be scheduled either already, or async() does this. --- asyncio_slow/asyncio_slow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py index bf82fcac1..e0a894336 100644 --- a/asyncio_slow/asyncio_slow.py +++ b/asyncio_slow/asyncio_slow.py @@ -43,7 +43,6 @@ def _cb(val): t = async(coro) t.add_done_callback(_cb) - self.call_soon(t) try: self.run_forever() except DoneException: From 8711a45a13fb3cfa0637e60719a0d9f81ba25997 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 02:45:06 +0300 Subject: [PATCH 22/47] asyncio_slow: Implement loop.stop(). --- asyncio_slow/asyncio_slow.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py index e0a894336..e82afcf1d 100644 --- a/asyncio_slow/asyncio_slow.py +++ b/asyncio_slow/asyncio_slow.py @@ -6,9 +6,10 @@ # Workaround for not being able to subclass builtin types -DoneException = AssertionError +class LoopStop(Exception): + pass -class InvalidStateError: +class InvalidStateError(Exception): pass # Object not matching any other object @@ -32,21 +33,23 @@ def _delayed(c, args, delay): def run_forever(self): while self.q: c = self.q.pop(0) - c[0](*c[1]) + try: + c[0](*c[1]) + except LoopStop: + return # I mean, forever while True: time.sleep(1) - def run_until_complete(self, coro): - def _cb(val): - raise DoneException + def stop(self): + def _cb(): + raise LoopStop + self.call_soon(_cb) + def run_until_complete(self, coro): t = async(coro) - t.add_done_callback(_cb) - try: - self.run_forever() - except DoneException: - pass + t.add_done_callback(lambda a: self.stop()) + self.run_forever() def close(self): pass From 96cbb50e2744a740e50a8fc33edc63f8557d7b63 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 02:47:54 +0300 Subject: [PATCH 23/47] asyncio_slow: Add Future examples from docs. --- asyncio_slow/test_future.py | 15 +++++++++++++++ asyncio_slow/test_future2.py | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 asyncio_slow/test_future.py create mode 100644 asyncio_slow/test_future2.py diff --git a/asyncio_slow/test_future.py b/asyncio_slow/test_future.py new file mode 100644 index 000000000..53026c8d0 --- /dev/null +++ b/asyncio_slow/test_future.py @@ -0,0 +1,15 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-chain-coroutines +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def slow_operation(future): + yield from asyncio.sleep(1) + future.set_result('Future is done!') + +loop = asyncio.get_event_loop() +future = asyncio.Future() +asyncio.Task(slow_operation(future)) +loop.run_until_complete(future) +print(future.result()) +loop.close() diff --git a/asyncio_slow/test_future2.py b/asyncio_slow/test_future2.py new file mode 100644 index 000000000..8ba03ef85 --- /dev/null +++ b/asyncio_slow/test_future2.py @@ -0,0 +1,21 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-future-with-run-forever +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def slow_operation(future): + yield from asyncio.sleep(1) + future.set_result('Future is done!') + +def got_result(future): + print(future.result()) + loop.stop() + +loop = asyncio.get_event_loop() +future = asyncio.Future() +asyncio.Task(slow_operation(future)) +future.add_done_callback(got_result) +try: + loop.run_forever() +finally: + loop.close() \ No newline at end of file From be628acf7d73a6450bcca168e943dfc9f9b6f2f7 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 02:52:59 +0300 Subject: [PATCH 24/47] asyncio_slow: Fix wait: again, should not schedule anything itself. --- asyncio_slow/asyncio_slow.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/asyncio_slow/asyncio_slow.py b/asyncio_slow/asyncio_slow.py index e82afcf1d..55f1347f1 100644 --- a/asyncio_slow/asyncio_slow.py +++ b/asyncio_slow/asyncio_slow.py @@ -115,7 +115,7 @@ def async(coro): return Task(coro) -class Wait(Future): +class _Wait(Future): def __init__(self, n): Future.__init__(self) @@ -133,12 +133,11 @@ def __call__(self): def wait(coro_list, loop=_def_event_loop): - w = Wait(len(coro_list)) + w = _Wait(len(coro_list)) for c in coro_list: t = async(c) t.add_done_callback(lambda val: w._done()) - loop.call_soon(t) return w From dd80d188661ed8da87935c30051973a9370e1b99 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 02:54:11 +0300 Subject: [PATCH 25/47] asyncio_slow: Add example of wait() from docs. --- asyncio_slow/test_parallel.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 asyncio_slow/test_parallel.py diff --git a/asyncio_slow/test_parallel.py b/asyncio_slow/test_parallel.py new file mode 100644 index 000000000..48a187b87 --- /dev/null +++ b/asyncio_slow/test_parallel.py @@ -0,0 +1,21 @@ +#https://docs.python.org/3.4/library/asyncio-task.html#example-parallel-execution-of-tasks +#import asyncio +import asyncio_slow as asyncio + +@asyncio.coroutine +def factorial(name, number): + f = 1 + for i in range(2, number+1): + print("Task %s: Compute factorial(%s)..." % (name, i)) + yield from asyncio.sleep(1) + f *= i + print("Task %s: factorial(%s) = %s" % (name, number, f)) + +tasks = [ + asyncio.Task(factorial("A", 2)), + asyncio.Task(factorial("B", 3)), + asyncio.Task(factorial("C", 4))] + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.wait(tasks)) +loop.close() From dab2a2233ef2e6e7bdae15df1e71a3cdb05f586c Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 20:17:03 +0300 Subject: [PATCH 26/47] asyncio: Remove polling sleep implementation, it belongs in async_slow. --- asyncio/asyncio.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index bdddfb3a8..9e08a1bd8 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -183,15 +183,6 @@ def async(coro): def sleep(secs): yield Sleep("sleep", secs) -def sleep2(secs): - t = time.time() -# print("Started sleep:", t, "targetting:", t + secs) - while time.time() < t + secs: - time.sleep(0.01) - yield None -# print("Finished sleeping", secs) -# time.sleep(secs) - import microsocket as _socket From 6347de15372aeb08143d99bdd1b20a64c3cc057f Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Fri, 2 May 2014 21:10:21 +0300 Subject: [PATCH 27/47] asyncio: Clean up code a bit. --- asyncio/asyncio.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/asyncio/asyncio.py b/asyncio/asyncio.py index 9e08a1bd8..3652289db 100644 --- a/asyncio/asyncio.py +++ b/asyncio/asyncio.py @@ -11,10 +11,6 @@ IO_WRITE = 2 -def coroutine(f): - return f - - class EventLoop: def __init__(self): @@ -31,20 +27,16 @@ def call_later(self, delay, callback, *args): self.call_at(self.time() + delay, callback, *args) def call_at(self, time, callback, *args): -# self.q.append((callback, args)) - # self.cnt is workaround per heapq docs + # Including self.cnt is a workaround per heapq docs log.debug("Scheduling %s", (time, self.cnt, callback, args)) heapq.heappush(self.q, (time, self.cnt, callback, args)) # print(self.q) self.cnt += 1 -# def run_forever(self): -# while self.q: -# c = self.q.pop(0) -# c[0](*c[1]) - def wait(self, delay): -# print("Sleeping for:", delay) + # Default wait implementation, to be overriden in subclasses + # with IO scheduling + log.debug("Sleeping for: %s", delay) time.sleep(delay) def run_forever(self): @@ -91,7 +83,6 @@ def run_forever(self): except StopIteration as e: log.debug("Gen finished: %s", cb) continue - #self.q.append(c) self.call_later(delay, cb, *args) def run_until_complete(self, coro): @@ -109,6 +100,7 @@ def run_until_complete(self, coro): def close(self): pass + import select class EpollEventLoop(EventLoop): @@ -176,6 +168,9 @@ def __init__(self, op, obj): def get_event_loop(): return EpollEventLoop() +def coroutine(f): + return f + def async(coro): # We don't have Task bloat, so op is null return coro From a728368a0840805ff9fe79fee8de481068db812e Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:04:30 +0300 Subject: [PATCH 28/47] asyncio_micro: Rename from asyncio. As this is not compliant with asyncio API, can't be called asyncio, and "micro" is just good moniker for what it's intended to be. --- asyncio/asyncio.py => asyncio_micro/asyncio_micro.py | 0 {asyncio => asyncio_micro}/test_call_soon.py | 2 +- {asyncio => asyncio_micro}/test_http_client.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename asyncio/asyncio.py => asyncio_micro/asyncio_micro.py (100%) rename {asyncio => asyncio_micro}/test_call_soon.py (83%) rename {asyncio => asyncio_micro}/test_http_client.py (95%) diff --git a/asyncio/asyncio.py b/asyncio_micro/asyncio_micro.py similarity index 100% rename from asyncio/asyncio.py rename to asyncio_micro/asyncio_micro.py diff --git a/asyncio/test_call_soon.py b/asyncio_micro/test_call_soon.py similarity index 83% rename from asyncio/test_call_soon.py rename to asyncio_micro/test_call_soon.py index 8ab5cb879..42a169cfa 100644 --- a/asyncio/test_call_soon.py +++ b/asyncio_micro/test_call_soon.py @@ -1,4 +1,4 @@ -import asyncio +import asyncio_micro as asyncio import time diff --git a/asyncio/test_http_client.py b/asyncio_micro/test_http_client.py similarity index 95% rename from asyncio/test_http_client.py rename to asyncio_micro/test_http_client.py index 4cd292a0e..eb7a34c0e 100644 --- a/asyncio/test_http_client.py +++ b/asyncio_micro/test_http_client.py @@ -1,4 +1,4 @@ -import asyncio +import asyncio_micro as asyncio @asyncio.coroutine def print_http_headers(url): From 6f27ec1cd080454c6063850c82a9782190f8e69d Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:10:29 +0300 Subject: [PATCH 29/47] asyncio_micro: Work around stupid Python closures. Which don't close variables, just variable references. --- asyncio_micro/asyncio_micro.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 3652289db..a6d97248e 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -69,10 +69,10 @@ def run_forever(self): elif isinstance(ret, IORead): # self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) # self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) - self.add_reader(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) + self.add_reader(ret.obj.fileno(), lambda cb, f: self.call_soon(cb, f), cb, ret.obj) continue elif isinstance(ret, IOWrite): - self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) + self.add_writer(ret.obj.fileno(), lambda cb, f: self.call_soon(cb, f), cb, ret.obj) continue elif isinstance(ret, IODone): if ret.op == IO_READ: From 4e3964d3fff61936346e624f3840d2788beb9e62 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:13:35 +0300 Subject: [PATCH 30/47] asyncio_micro: StreamReader, StreamWriter: add more methods. --- asyncio_micro/asyncio_micro.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index a6d97248e..c687aa37d 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -186,6 +186,13 @@ class StreamReader: def __init__(self, s): self.s = s + def read(self, n): + s = yield IORead(self.s) + res = self.s.read(n) + if not res: + yield IODone(IO_READ, self.s) + return res + def readline(self): log.debug("StreamReader.readline()") s = yield IORead(self.s) @@ -208,6 +215,10 @@ def write(self, buf): s = yield IOWrite(self.s) log.debug("StreamWriter.write(): returning") + def close(self): + yield IODone(IO_WRITE, self.s) + self.s.close() + def open_connection(host, port): log.debug("open_connection(%s, %s)", host, port) From 4218e7d504de5b6bcc986d414bcec0121a035c6b Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:15:38 +0300 Subject: [PATCH 31/47] asyncio_micro: Implement start_server(). --- asyncio_micro/asyncio_micro.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index c687aa37d..62568a46b 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -235,3 +235,23 @@ def open_connection(host, port): s = yield IOWrite(s) log.debug("open_connection: After iowait: %s", s) return StreamReader(s), StreamWriter(s) + + +def start_server(client_coro, host, port): + log.debug("start_server(%s, %s)", host, port) + s = _socket.socket() + s.setblocking(False) + + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) + s.bind(addr) + s.listen(10) + while True: + log.debug("start_server: Before accept") + yield IORead(s) + log.debug("start_server: After iowait") + s2, client_addr = s.accept() + s2.setblocking(False) + log.debug("start_server: After accept: %s", s2) + yield client_coro(StreamReader(s2), StreamWriter(s2)) From 48671ce22edb4f34957ba1a3d27300d2ee1e2a73 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:17:10 +0300 Subject: [PATCH 32/47] asyncio_micro: IODone syscall should return to coroutine. --- asyncio_micro/asyncio_micro.py | 1 - 1 file changed, 1 deletion(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 62568a46b..5ae10ea47 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -79,7 +79,6 @@ def run_forever(self): self.remove_reader(ret.obj.fileno()) elif ret.op == IO_WRITE: self.remove_writer(ret.obj.fileno()) - continue except StopIteration as e: log.debug("Gen finished: %s", cb) continue From 875de441348eabd40e19194cf194a0ba7b712878 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 3 May 2014 23:20:22 +0300 Subject: [PATCH 33/47] asyncio_micro: Add support for starting a coroutine concurrently. Just yield it as a value. Also, improve logging/error reporting. --- asyncio_micro/asyncio_micro.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 5ae10ea47..de182b225 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -10,6 +10,7 @@ IO_READ = 1 IO_WRITE = 2 +type_gen = type((lambda: (yield))()) class EventLoop: @@ -60,9 +61,9 @@ def run_forever(self): try: if args == (): args = (None,) - log.debug("Gen send args: %s", args) + log.debug("Gen %s send args: %s", cb, args) ret = cb.send(*args) - log.debug("Gen yield result: %s", ret) + log.debug("Gen %s yield result: %s", cb, ret) if isinstance(ret, SysCall): if isinstance(ret, Sleep): delay = ret.args[0] @@ -79,6 +80,11 @@ def run_forever(self): self.remove_reader(ret.obj.fileno()) elif ret.op == IO_WRITE: self.remove_writer(ret.obj.fileno()) + elif isinstance(ret, type_gen): + self.call_soon(ret) + else: + print(ret, type(ret)) + assert False except StopIteration as e: log.debug("Gen finished: %s", cb) continue From 27546eaf532bdf6f51d2651897891139367237f4 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Wed, 7 May 2014 01:57:38 +0300 Subject: [PATCH 34/47] asyncio_micro: Support readall semantics and handle non-blocking read() well. Non-blocking read()/write() may return None if there's no data, and that's not EOF. --- asyncio_micro/asyncio_micro.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index de182b225..e8dcf6295 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -191,9 +191,13 @@ class StreamReader: def __init__(self, s): self.s = s - def read(self, n): + def read(self, n=-1): s = yield IORead(self.s) - res = self.s.read(n) + while True: + res = self.s.read(n) + if res is not None: + break + log.warn("Empty read") if not res: yield IODone(IO_READ, self.s) return res @@ -202,7 +206,11 @@ def readline(self): log.debug("StreamReader.readline()") s = yield IORead(self.s) log.debug("StreamReader.readline(): after IORead: %s", s) - res = self.s.readline() + while True: + res = self.s.readline() + if res is not None: + break + log.warn("Empty read") if not res: yield IODone(IO_READ, self.s) log.debug("StreamReader.readline(): res: %s", res) From 1d9f7856f68cb1b14d01d33bb63c1c7f2038f896 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Wed, 7 May 2014 02:12:29 +0300 Subject: [PATCH 35/47] asyncio_micro: Implement proper write() handling. TODO: Test! --- asyncio_micro/asyncio_micro.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index e8dcf6295..f7ddb9214 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -223,10 +223,19 @@ def __init__(self, s): self.s = s def write(self, buf): - res = self.s.write(buf) - log.debug("StreamWriter.write(): %d", res) - s = yield IOWrite(self.s) - log.debug("StreamWriter.write(): returning") + sz = len(buf) + while True: + res = self.s.write(buf) + log.debug("StreamWriter.write(): %d", res) + # If we spooled everything, (just) return + if res == sz: + return + if res is None: + res = 0 + buf = buf[res:] + sz -= res + s = yield IOWrite(self.s) + log.debug("StreamWriter.write(): can write more") def close(self): yield IODone(IO_WRITE, self.s) From fe23bdfa18f5c1e58845ee21b04a261fee53b6d8 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Wed, 7 May 2014 02:14:28 +0300 Subject: [PATCH 36/47] asyncio_micro: Add basic HTTP server example. --- asyncio_micro/test_http_server.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 asyncio_micro/test_http_server.py diff --git a/asyncio_micro/test_http_server.py b/asyncio_micro/test_http_server.py new file mode 100644 index 000000000..5b8271d6f --- /dev/null +++ b/asyncio_micro/test_http_server.py @@ -0,0 +1,21 @@ +import asyncio_micro as asyncio + +@asyncio.coroutine +def serve(reader, writer): + print(reader, writer) + print("================") + print((yield from reader.read())) + yield from writer.write("HTTP/1.0 200 OK\r\n\r\nHello.\r\n") + print("After response write") + yield from writer.close() + print("Finished processing request") + + +import logging +#logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) +loop = asyncio.get_event_loop() +mem_info() +loop.call_soon(asyncio.start_server(serve, "127.0.0.1", 8081)) +loop.run_forever() +loop.close() From ae4fa7f93a0ea527b8d3a8eee052d929ff511dd4 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 1 Jun 2014 00:41:45 +0300 Subject: [PATCH 37/47] asyncio_micro: Rename StreamWriter.write() to awrite(). This method has different semantics than original asyncio, so rename to avoid confusion. Original asyncio's is not a coroutine, while ours is. --- asyncio_micro/asyncio_micro.py | 14 ++++++++++---- asyncio_micro/test_http_client.py | 2 +- asyncio_micro/test_http_server.py | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index f7ddb9214..a46b3d251 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -222,20 +222,26 @@ class StreamWriter: def __init__(self, s): self.s = s - def write(self, buf): + def awrite(self, buf): + # This method is called awrite (async write) to not proliferate + # incompatibility with original asyncio. Unlike original asyncio + # whose .write() method is both not a coroutine and guaranteed + # to return immediately (which means it has to buffer all the + # data), this method is a coroutine. sz = len(buf) while True: res = self.s.write(buf) - log.debug("StreamWriter.write(): %d", res) - # If we spooled everything, (just) return + # If we spooled everything, return immediately if res == sz: + log.debug("StreamWriter.awrite(): completed spooling %d bytes", res) return if res is None: res = 0 + log.debug("StreamWriter.awrite(): spooled partial %d bytes", res) buf = buf[res:] sz -= res s = yield IOWrite(self.s) - log.debug("StreamWriter.write(): can write more") + log.debug("StreamWriter.awrite(): can write more") def close(self): yield IODone(IO_WRITE, self.s) diff --git a/asyncio_micro/test_http_client.py b/asyncio_micro/test_http_client.py index eb7a34c0e..e83cf8598 100644 --- a/asyncio_micro/test_http_client.py +++ b/asyncio_micro/test_http_client.py @@ -6,7 +6,7 @@ def print_http_headers(url): print(reader, writer) print("================") query = "GET / HTTP/1.0\r\n\r\n" - yield from writer.write(query.encode('latin-1')) + yield from writer.awrite(query.encode('latin-1')) while True: line = yield from reader.readline() if not line: diff --git a/asyncio_micro/test_http_server.py b/asyncio_micro/test_http_server.py index 5b8271d6f..c2210767d 100644 --- a/asyncio_micro/test_http_server.py +++ b/asyncio_micro/test_http_server.py @@ -5,7 +5,7 @@ def serve(reader, writer): print(reader, writer) print("================") print((yield from reader.read())) - yield from writer.write("HTTP/1.0 200 OK\r\n\r\nHello.\r\n") + yield from writer.awrite("HTTP/1.0 200 OK\r\n\r\nHello.\r\n") print("After response write") yield from writer.close() print("Finished processing request") From 26d76576e46ec28f6b7cc148b108780cf274ac25 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Mon, 2 Jun 2014 00:35:18 +0300 Subject: [PATCH 38/47] asyncio_micro: Support just plain "yield" for cooperative control yield. --- asyncio_micro/asyncio_micro.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index a46b3d251..46fc6058e 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -82,6 +82,9 @@ def run_forever(self): self.remove_writer(ret.obj.fileno()) elif isinstance(ret, type_gen): self.call_soon(ret) + elif ret is None: + # Just reschedule + pass else: print(ret, type(ret)) assert False From 6d8cfcde46c78ea04c33e8ebf0555ddcf7ea5aaa Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Mon, 2 Jun 2014 00:43:03 +0300 Subject: [PATCH 39/47] asyncio_micro: Clean up logging. --- asyncio_micro/asyncio_micro.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 46fc6058e..733e679f8 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -44,7 +44,7 @@ def run_forever(self): while True: if self.q: t, cnt, cb, args = heapq.heappop(self.q) - log.debug("Next task to run: %s", (t, cnt, cb, args)) + log.debug("Next coroutine to run: %s", (t, cnt, cb, args)) # __main__.mem_info() tnow = self.time() delay = t - tnow @@ -61,9 +61,9 @@ def run_forever(self): try: if args == (): args = (None,) - log.debug("Gen %s send args: %s", cb, args) + log.debug("Coroutine %s send args: %s", cb, args) ret = cb.send(*args) - log.debug("Gen %s yield result: %s", cb, ret) + log.debug("Coroutine %s yield result: %s", cb, ret) if isinstance(ret, SysCall): if isinstance(ret, Sleep): delay = ret.args[0] @@ -86,10 +86,9 @@ def run_forever(self): # Just reschedule pass else: - print(ret, type(ret)) - assert False + assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret)) except StopIteration as e: - log.debug("Gen finished: %s", cb) + log.debug("Coroutine finished: %s", cb) continue self.call_later(delay, cb, *args) From 123b8cc1eeda80930418cd302e29a79deea1b9e4 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Mon, 2 Jun 2014 01:01:28 +0300 Subject: [PATCH 40/47] asyncio_micro: Move handle() method to SysCall base class. --- asyncio_micro/asyncio_micro.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 733e679f8..c07e40a84 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -150,10 +150,11 @@ def __init__(self, call, *args): self.call = call self.args = args -class Sleep(SysCall): - def handle(self): - time.sleep(self.args[0]) + raise NotImplementedError + +class Sleep(SysCall): + pass class IORead(SysCall): From eecd6c755c2293b7cf2402cd40ac464fbe2654d8 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Mon, 2 Jun 2014 01:23:16 +0300 Subject: [PATCH 41/47] asyncio_micro: Optimize size of SysCall objects. We have to have type header for any object, so use type to "store" information about syscall type (my initial idea was to have single syscall class and dispatch on its attribute, that would save memory on having bunch of classes, but would increase size of each syscall object). --- asyncio_micro/asyncio_micro.py | 46 ++++++++++++++-------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index c07e40a84..48a27dd07 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -7,9 +7,6 @@ log = logging.getLogger("asyncio") -IO_READ = 1 -IO_WRITE = 2 - type_gen = type((lambda: (yield))()) class EventLoop: @@ -65,21 +62,21 @@ def run_forever(self): ret = cb.send(*args) log.debug("Coroutine %s yield result: %s", cb, ret) if isinstance(ret, SysCall): + arg = ret.args[0] if isinstance(ret, Sleep): - delay = ret.args[0] + delay = arg elif isinstance(ret, IORead): # self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) # self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) - self.add_reader(ret.obj.fileno(), lambda cb, f: self.call_soon(cb, f), cb, ret.obj) + self.add_reader(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) continue elif isinstance(ret, IOWrite): - self.add_writer(ret.obj.fileno(), lambda cb, f: self.call_soon(cb, f), cb, ret.obj) + self.add_writer(arg.fileno(), lambda cb, f: self.call_soon(cb, f), cb, arg) continue - elif isinstance(ret, IODone): - if ret.op == IO_READ: - self.remove_reader(ret.obj.fileno()) - elif ret.op == IO_WRITE: - self.remove_writer(ret.obj.fileno()) + elif isinstance(ret, IOReadDone): + self.remove_reader(arg.fileno()) + elif isinstance(ret, IOWriteDone): + self.remove_writer(arg.fileno()) elif isinstance(ret, type_gen): self.call_soon(ret) elif ret is None: @@ -146,8 +143,7 @@ def wait(self, delay): class SysCall: - def __init__(self, call, *args): - self.call = call + def __init__(self, *args): self.args = args def handle(self): @@ -157,20 +153,16 @@ class Sleep(SysCall): pass class IORead(SysCall): - - def __init__(self, obj): - self.obj = obj + pass class IOWrite(SysCall): + pass - def __init__(self, obj): - self.obj = obj - -class IODone(SysCall): +class IOReadDone(SysCall): + pass - def __init__(self, op, obj): - self.op = op - self.obj = obj +class IOWriteDone(SysCall): + pass def get_event_loop(): @@ -184,7 +176,7 @@ def async(coro): return coro def sleep(secs): - yield Sleep("sleep", secs) + yield Sleep(secs) import microsocket as _socket @@ -202,7 +194,7 @@ def read(self, n=-1): break log.warn("Empty read") if not res: - yield IODone(IO_READ, self.s) + yield IOReadDone(self.s) return res def readline(self): @@ -215,7 +207,7 @@ def readline(self): break log.warn("Empty read") if not res: - yield IODone(IO_READ, self.s) + yield IOReadDone(self.s) log.debug("StreamReader.readline(): res: %s", res) return res @@ -247,7 +239,7 @@ def awrite(self, buf): log.debug("StreamWriter.awrite(): can write more") def close(self): - yield IODone(IO_WRITE, self.s) + yield IOWriteDone(self.s) self.s.close() From 889c54658790e9c915f1e2276c94932ab36e6f53 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Mon, 21 Jul 2014 00:17:12 +0300 Subject: [PATCH 42/47] asyncio_micro: Add metadata. --- asyncio_micro/metadata.txt | 6 ++++++ asyncio_micro/setup.py | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 asyncio_micro/metadata.txt create mode 100644 asyncio_micro/setup.py diff --git a/asyncio_micro/metadata.txt b/asyncio_micro/metadata.txt new file mode 100644 index 000000000..bc3d83e4f --- /dev/null +++ b/asyncio_micro/metadata.txt @@ -0,0 +1,6 @@ +srctype = micropython-lib +type = module +version = 0.5 +author = Paul Sokolovsky +long_desc = Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess. +depends = heapq, errno, select, logging diff --git a/asyncio_micro/setup.py b/asyncio_micro/setup.py new file mode 100644 index 000000000..f0e017f28 --- /dev/null +++ b/asyncio_micro/setup.py @@ -0,0 +1,19 @@ +import sys +# Remove current dir from sys.path, otherwise setuptools will peek up our +# module instead of system. +sys.path.pop(0) +from setuptools import setup + + +setup(name='micropython-asyncio_micro', + version='0.5', + description='asyncio_micro module for MicroPython', + long_description='Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess.', + url='https://github.com/micropython/micropython/issues/405', + author='Paul Sokolovsky', + author_email='micro-python@googlegroups.com', + maintainer='MicroPython Developers', + maintainer_email='micro-python@googlegroups.com', + license='MIT', + py_modules=['asyncio_micro'], + install_requires=['micropython-heapq', 'micropython-errno', 'micropython-select', 'micropython-logging']) From e6ca3a7fa8ba06b5503fadf6341ec32b5e7501cd Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Thu, 24 Jul 2014 18:47:16 +0300 Subject: [PATCH 43/47] asyncio_micro: awrite(): More logging and checks. --- asyncio_micro/asyncio_micro.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/asyncio_micro/asyncio_micro.py b/asyncio_micro/asyncio_micro.py index 48a27dd07..cd7a00ec8 100644 --- a/asyncio_micro/asyncio_micro.py +++ b/asyncio_micro/asyncio_micro.py @@ -224,6 +224,7 @@ def awrite(self, buf): # to return immediately (which means it has to buffer all the # data), this method is a coroutine. sz = len(buf) + log.debug("StreamWriter.awrite(): spooling %d bytes", sz) while True: res = self.s.write(buf) # If we spooled everything, return immediately @@ -233,6 +234,7 @@ def awrite(self, buf): if res is None: res = 0 log.debug("StreamWriter.awrite(): spooled partial %d bytes", res) + assert res < sz buf = buf[res:] sz -= res s = yield IOWrite(self.s) From a950623f55fa1d3dd8ea471a050be8f78b1bf1a4 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Wed, 27 Aug 2014 03:13:38 +0300 Subject: [PATCH 44/47] uasyncio: Rename asyncio_micro to uasyncio. --- {asyncio_micro => uasyncio}/metadata.txt | 2 +- {asyncio_micro => uasyncio}/setup.py | 8 ++++---- {asyncio_micro => uasyncio}/test_call_soon.py | 2 +- {asyncio_micro => uasyncio}/test_http_client.py | 2 +- {asyncio_micro => uasyncio}/test_http_server.py | 2 +- asyncio_micro/asyncio_micro.py => uasyncio/uasyncio.py | 0 6 files changed, 8 insertions(+), 8 deletions(-) rename {asyncio_micro => uasyncio}/metadata.txt (94%) rename {asyncio_micro => uasyncio}/setup.py (81%) rename {asyncio_micro => uasyncio}/test_call_soon.py (83%) rename {asyncio_micro => uasyncio}/test_http_client.py (95%) rename {asyncio_micro => uasyncio}/test_http_server.py (94%) rename asyncio_micro/asyncio_micro.py => uasyncio/uasyncio.py (100%) diff --git a/asyncio_micro/metadata.txt b/uasyncio/metadata.txt similarity index 94% rename from asyncio_micro/metadata.txt rename to uasyncio/metadata.txt index bc3d83e4f..bc22759b7 100644 --- a/asyncio_micro/metadata.txt +++ b/uasyncio/metadata.txt @@ -1,6 +1,6 @@ srctype = micropython-lib type = module -version = 0.5 +version = 0.6 author = Paul Sokolovsky long_desc = Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess. depends = heapq, errno, select, logging diff --git a/asyncio_micro/setup.py b/uasyncio/setup.py similarity index 81% rename from asyncio_micro/setup.py rename to uasyncio/setup.py index f0e017f28..2bf389e23 100644 --- a/asyncio_micro/setup.py +++ b/uasyncio/setup.py @@ -5,9 +5,9 @@ from setuptools import setup -setup(name='micropython-asyncio_micro', - version='0.5', - description='asyncio_micro module for MicroPython', +setup(name='micropython-uasyncio', + version='0.6', + description='uasyncio module for MicroPython', long_description='Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess.', url='https://github.com/micropython/micropython/issues/405', author='Paul Sokolovsky', @@ -15,5 +15,5 @@ maintainer='MicroPython Developers', maintainer_email='micro-python@googlegroups.com', license='MIT', - py_modules=['asyncio_micro'], + py_modules=['uasyncio'], install_requires=['micropython-heapq', 'micropython-errno', 'micropython-select', 'micropython-logging']) diff --git a/asyncio_micro/test_call_soon.py b/uasyncio/test_call_soon.py similarity index 83% rename from asyncio_micro/test_call_soon.py rename to uasyncio/test_call_soon.py index 42a169cfa..99ccfefbc 100644 --- a/asyncio_micro/test_call_soon.py +++ b/uasyncio/test_call_soon.py @@ -1,4 +1,4 @@ -import asyncio_micro as asyncio +import uasyncio as asyncio import time diff --git a/asyncio_micro/test_http_client.py b/uasyncio/test_http_client.py similarity index 95% rename from asyncio_micro/test_http_client.py rename to uasyncio/test_http_client.py index e83cf8598..0b4ff83b7 100644 --- a/asyncio_micro/test_http_client.py +++ b/uasyncio/test_http_client.py @@ -1,4 +1,4 @@ -import asyncio_micro as asyncio +import uasyncio as asyncio @asyncio.coroutine def print_http_headers(url): diff --git a/asyncio_micro/test_http_server.py b/uasyncio/test_http_server.py similarity index 94% rename from asyncio_micro/test_http_server.py rename to uasyncio/test_http_server.py index c2210767d..51654d1d2 100644 --- a/asyncio_micro/test_http_server.py +++ b/uasyncio/test_http_server.py @@ -1,4 +1,4 @@ -import asyncio_micro as asyncio +import uasyncio as asyncio @asyncio.coroutine def serve(reader, writer): diff --git a/asyncio_micro/asyncio_micro.py b/uasyncio/uasyncio.py similarity index 100% rename from asyncio_micro/asyncio_micro.py rename to uasyncio/uasyncio.py From a64c7cbd577a5bf100881814d86f9dba18a70c82 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 11 Oct 2014 05:20:55 +0300 Subject: [PATCH 45/47] uasyncio: Update for rename microsocket -> usocket. --- uasyncio/metadata.txt | 2 +- uasyncio/setup.py | 2 +- uasyncio/uasyncio.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/uasyncio/metadata.txt b/uasyncio/metadata.txt index bc22759b7..d0901ec75 100644 --- a/uasyncio/metadata.txt +++ b/uasyncio/metadata.txt @@ -1,6 +1,6 @@ srctype = micropython-lib type = module -version = 0.6 +version = 0.6.1 author = Paul Sokolovsky long_desc = Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess. depends = heapq, errno, select, logging diff --git a/uasyncio/setup.py b/uasyncio/setup.py index 2bf389e23..0e56c7549 100644 --- a/uasyncio/setup.py +++ b/uasyncio/setup.py @@ -6,7 +6,7 @@ setup(name='micropython-uasyncio', - version='0.6', + version='0.6.1', description='uasyncio module for MicroPython', long_description='Lightweight asyncio-like library built around native Python coroutines, not around un-Python devices like callback mess.', url='https://github.com/micropython/micropython/issues/405', diff --git a/uasyncio/uasyncio.py b/uasyncio/uasyncio.py index cd7a00ec8..e3befa8cb 100644 --- a/uasyncio/uasyncio.py +++ b/uasyncio/uasyncio.py @@ -179,7 +179,7 @@ def sleep(secs): yield Sleep(secs) -import microsocket as _socket +import usocket as _socket class StreamReader: From 67b713535b2e06812805d6e2e302f8ecfb4f212f Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 18 Oct 2014 04:57:27 +0300 Subject: [PATCH 46/47] uasyncio: Implement run_until_complete(). --- uasyncio/uasyncio.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/uasyncio/uasyncio.py b/uasyncio/uasyncio.py index e3befa8cb..858fe4175 100644 --- a/uasyncio/uasyncio.py +++ b/uasyncio/uasyncio.py @@ -77,6 +77,8 @@ def run_forever(self): self.remove_reader(arg.fileno()) elif isinstance(ret, IOWriteDone): self.remove_writer(arg.fileno()) + elif isinstance(ret, StopLoop): + return arg elif isinstance(ret, type_gen): self.call_soon(ret) elif ret is None: @@ -90,16 +92,11 @@ def run_forever(self): self.call_later(delay, cb, *args) def run_until_complete(self, coro): - val = None - while True: - try: - ret = coro.send(val) - except StopIteration as e: - print(e) - break - print("ret:", ret) - if isinstance(ret, SysCall): - ret.handle() + def _run_and_stop(): + yield from coro + yield StopLoop(0) + self.call_soon(_run_and_stop()) + self.run_forever() def close(self): pass @@ -152,6 +149,9 @@ def handle(self): class Sleep(SysCall): pass +class StopLoop(SysCall): + pass + class IORead(SysCall): pass From 2a5d6b811857c8c987b406b1c62390972ce1ae5e Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sat, 18 Oct 2014 04:57:51 +0300 Subject: [PATCH 47/47] uasyncio: Add __repr__() for StreamReader/StreamWriter. --- uasyncio/uasyncio.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/uasyncio/uasyncio.py b/uasyncio/uasyncio.py index 858fe4175..f34123e65 100644 --- a/uasyncio/uasyncio.py +++ b/uasyncio/uasyncio.py @@ -211,6 +211,9 @@ def readline(self): log.debug("StreamReader.readline(): res: %s", res) return res + def __repr__(self): + return "" % self.s + class StreamWriter: @@ -244,6 +247,9 @@ def close(self): yield IOWriteDone(self.s) self.s.close() + def __repr__(self): + return "" % self.s + def open_connection(host, port): log.debug("open_connection(%s, %s)", host, port)