Version information
Python 3.7
tornado==4.3.0
Problem description: downloading the same file several times produces a different hash each time.
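A minimal sketch of how the symptom can be checked, assuming each download has already been saved to a local file. The helper names `file_sha256` and `all_identical` are hypothetical and not part of the original post.

```python
import hashlib


def file_sha256(path, chunk_size=65536):
    """Return the SHA-256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel stops as soon as read() returns b"" (EOF)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def all_identical(paths):
    """True if every file in `paths` has the same SHA-256 digest."""
    return len({file_sha256(p) for p in paths}) == 1
```

Calling `all_identical` on the saved copies of several downloads returns False whenever at least one copy differs from the others.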
Sample download handler:
import tornado.web
from tornado.concurrent import futures
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler


class XXX_Handler(RequestHandler):
    # Thread pool used by @run_on_executor
    executor = futures.ThreadPoolExecutor()

    @run_on_executor
    @tornado.web.asynchronous
    def get(self):
        # The whole body, including write() and flush(), runs on a worker thread
        data = open("/data/xxx.csv", encoding="utf-8")
        chunk = data.read(65536)
        while chunk:
            self.write(chunk)
            self.flush()
            chunk = data.read(65536)
        data.close()
        self.set_status(200)
        return self.finish()
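I suspected the thread pool was involved, and sure enough, removing @run_on_executor made the problem go away.
Out of curiosity I looked for the exact cause. The rest of this post records the relevant parts of the Tornado source code.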
1. Starting from RequestHandler's flush method, I checked the data passed to self.stream.write. Its hash was identical on every download, so I kept looking further down the stack.
else:
    if callback is not None:
        self._write_callback = stack_context.wrap(callback)
    else:
        future = self._write_future = Future()
    data = b"\r\n".join(lines) + b"\r\n\r\n"
    if chunk:
        data += self._format_chunk(chunk)
    self._pending_write = self.stream.write(data)
    self._pending_write.add_done_callback(self._on_write_complete)
return future
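2. Next I looked at where self.stream comes from. This leads to the TCP server class, which listens for incoming socket connections, records the socket's file descriptor, assigns a handler to it, and registers a read event (all events are handled on the main thread). It then accepts the connection and instantiates an IOStream to receive requests and send responses.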
    stream = IOStream(connection, io_loop=self.io_loop,
                      max_buffer_size=self.max_buffer_size,
                      read_chunk_size=self.read_chunk_size)
    future = self.handle_stream(stream, address)
    if future is not None:
        self.io_loop.add_future(future, lambda f: f.result())
except Exception:
    app_log.error("Error in connection callback", exc_info=True)
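3. Knowing that the data is sent through IOStream, I looked at its write method. Its main job is to split the data passed down from the upper layer into chunks of a fixed size.
(1) The chunks stored in self._write_buffer are then sent via self._handle_write: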
if not self._connecting:
    self._handle_write()
    if self._write_buffer:
        self._add_io_state(self.io_loop.WRITE)
    self._maybe_add_error_listener()
return future
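(2) Now look at self._handle_write. In this setup self._handle_write runs on a ThreadPoolExecutor worker thread. When a send fails with "Resource temporarily unavailable", control returns to the function above, which registers a write event with self._add_io_state(self.io_loop.WRITE) so that the main thread retries the write.
If such failures are frequent, several threads end up working on self._write_buffer at the same time. Chunks can then be sent out of order or more than once, which is why the hash differs.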
except (socket.error, IOError, OSError) as e:
    if e.args[0] in _ERRNO_WOULDBLOCK:
        self._write_buffer_frozen = True
        break
    else:
        if not self._is_connreset(e):
            # Broken pipe errors are usually caused by connection
            # reset, and its better to not log EPIPE errors to
            # minimize log spam
            gen_log.warning("Write error on %s: %s",
                            self.fileno(), e)
        self.close(exc_info=True)
        return
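The effect described in (2) can be illustrated with a deliberately simplified model. This is not Tornado's code: `drain` and `run_writers` are hypothetical names, and the two "threads" are generators stepped round-robin so that the interleaving is reproducible. Each writer peeks at the head of a shared buffer, may be preempted, then sends the chunk and pops the head, which is the unsynchronized pattern that goes wrong when a worker thread and the main thread both flush the same buffer.

```python
import hashlib
from collections import deque


def drain(buffer, sent):
    """One writer draining the shared buffer.

    Written as a generator: each `yield` marks a point where a real thread
    could be preempted between reading the head chunk and removing it.
    """
    while buffer:
        chunk = buffer[0]       # peek at the head chunk
        yield                   # another writer may run here
        sent.append(chunk)      # "send" the chunk to the socket
        if buffer:
            buffer.popleft()    # drop the head, which may no longer be `chunk`


def run_writers(chunks, writers):
    """Step `writers` drain loops round-robin over one shared buffer."""
    buffer, sent = deque(chunks), []
    active = [drain(buffer, sent) for _ in range(writers)]
    while active:
        for writer in list(active):
            try:
                next(writer)
            except StopIteration:
                active.remove(writer)
    return b"".join(sent)


def digest(data):
    """SHA-256 hex digest of the bytes that reached the 'socket'."""
    return hashlib.sha256(data).hexdigest()
```

With a single writer, `run_writers([b"A", b"B", b"C", b"D"], writers=1)` reproduces the input byte for byte. With two writers the output stream differs from the input, so its digest no longer matches.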
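As a test I tried a brute-force approach: after a failed send, I removed the write event that lets the main thread retry and had the current thread keep retrying instead. This test produces the correct hash, but the better fix is simply to stop using @run_on_executor.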
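The idea behind that brute-force test can be sketched with the standard library alone. This is not the patch applied to Tornado: `send_all` and `roundtrip` are hypothetical names, and a local `socket.socketpair()` stands in for the client connection. When a non-blocking send raises `BlockingIOError` (the Python exception for "Resource temporarily unavailable"), the same thread waits until the socket is writable and retries, so only one thread ever touches the pending data.

```python
import select
import socket
import threading


def send_all(sock, data):
    """Send all of `data` on a non-blocking socket from the calling thread."""
    view = memoryview(data)
    while view:
        try:
            sent = sock.send(view)
            view = view[sent:]
        except BlockingIOError:
            # Socket buffer is full: wait until writable, then retry here
            # instead of handing the retry to another thread.
            select.select([], [sock], [])


def roundtrip(payload):
    """Push `payload` through a local socket pair and return what arrives."""
    left, right = socket.socketpair()
    left.setblocking(False)
    received = bytearray()

    def reader():
        # Drain the other end so the sender can make progress
        while len(received) < len(payload):
            received.extend(right.recv(65536))

    thread = threading.Thread(target=reader)
    thread.start()
    send_all(left, payload)
    thread.join()
    left.close()
    right.close()
    return bytes(received)
```

The trade-off is that the sending thread stays blocked until the whole payload has been written, which is why this is a diagnostic aid rather than a recommended design.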