Python multiprocessing safely writing to a file(Python多处理安全地写入文件)
问题描述
我正在尝试解决一个涉及大量子问题的大数值问题,并且我正在使用 Python 的多处理模块(特别是 Pool.map)将不同的独立子问题拆分到不同的核心上.每个子问题都涉及计算大量子子问题,如果尚未由任何进程计算它们,我试图通过将它们存储到文件中来有效地记忆这些结果,否则跳过计算并从文件中读取结果.
I am trying to solve a big numerical problem which involves lots of subproblems, and I'm using Python's multiprocessing module (specifically Pool.map) to split up different independent subproblems onto different cores. Each subproblem involves computing lots of sub-subproblems, and I'm trying to effectively memoize these results by storing them to a file if they have not been computed by any process yet, otherwise skip the computation and just read the results from the file.
我遇到了文件的并发问题:不同的进程有时会检查是否已经计算了子子问题(通过查找将存储结果的文件),看到它没有,运行计算,然后尝试将结果同时写入同一个文件.如何避免写这样的冲突?
I'm having concurrency issues with the files: different processes sometimes check to see if a sub-subproblem has been computed yet (by looking for the file where the results would be stored), see that it hasn't, run the computation, then try to write the results to the same file at the same time. How do I avoid writing collisions like this?
推荐答案
@GP89 提到了一个很好的解决方案.使用队列将写入任务发送到对文件具有唯一写入权限的专用进程.所有其他工作人员都具有只读访问权限.这将消除碰撞.这是一个使用 apply_async 的示例,但它也适用于 map:
@GP89 mentioned a good solution. Use a queue to send the writing tasks to a dedicated process that has sole write access to the file. All the other workers have read only access. This will eliminate collisions. Here is an example that uses apply_async, but it will work with map too:
import multiprocessing as mp
import time
fn = 'c:/temp/temp.txt'
def worker(arg, q):
'''stupidly simulates long running process'''
start = time.clock()
s = 'this is a test'
txt = s
for i in range(200000):
txt += s
done = time.clock() - start
with open(fn, 'rb') as f:
size = len(f.read())
res = 'Process' + str(arg), str(size), done
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
with open(fn, 'w') as f:
while 1:
m = q.get()
if m == 'kill':
f.write('killed')
break
f.write(str(m) + '
')
f.flush()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for i in range(80):
job = pool.apply_async(worker, (i, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
这篇关于Python多处理安全地写入文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Python多处理安全地写入文件
基础教程推荐
- 与常规 dict 相比,Python manager.dict() 非常慢 2022-01-01
- pyserial - 可以从线程 a 写入串行端口,是否阻塞从线程 b 读取? 2022-01-01
- 尝试制作WhatsApp机器人 2022-01-01
- 将 x 轴刻度更改为自定义字符串 2022-01-01
- Discord.py 缺少必需的参数 2022-01-01
- 由Python将MP3转换为MIDI(类型错误:无法加载插件:mtg-Melodia:Melodia) 2022-01-01
- 用 Python 编写 Fortran 无格式文件 2022-01-01
- 使用生成器和迭代器时 Python 多循环失败 2022-01-01
- numpy float:比算术运算中内置的慢 10 倍? 2022-01-01
- 在 Celery 工作人员中捕获 Heroku SIGTERM 以优雅地关 2022-01-01
