前言:
做实验流体力学的同学应该都接触过PIV实验,这应该是最经典的流动参数测量方法,简单来说,它的原理是在流场中投放示踪粒子来显示流动,并用相机记录一定时间间隔的两帧粒子图像,通过互相关或者光流算法计算出流场的速度分布,是一种瞬时、非接触式的全场测量手段,因此得到了广泛应用。
目前的商用PIV软件已经在测量以及数据处理方面有很强大的功能。虽然功能强大,但实际中很多时候用不上那么多功能,通常只需要简单的计算出速度场,并且软件运行期间中间会产生很多中间量,使数据量明显增大,此外,用于实验采集的计算机性能通常一般,如果用其他高性能计算机处理数据就需要安装相应软件,还需要加密狗,比较麻烦。
如果能够将PIV软件拍摄的原始图片直接读取出来并且转换成常用的格式,用开源PIV后处理程序PIVLab处理,进行速度场的求解,那么做完实验只需要将Project拷贝出来,岂不是很香?
因此,本文针对丹迪PIV软件dynamic studio记录的原始数据,使用Python程序将.image文件转换成.jpeg格式,方便进行后续的处理。并且支持多核并行处理,速度嘎嘎快!
操作方法:
首先将dynamic studio记录的原始数据(即某个Project)全部拷贝出来,打开一级一级的文件夹,找到相应试验工况下的data文件夹。
下面就是软件记录的原始图像文件,即.image文件,这种文件一般的软件是无法读取的,但是可以按二进制打开。它包含一些相机和软件参数,以及第一帧和第二帧图像数据,图像是12位深的(这个取决于相机型号),也就是说每个像素点处的亮度信息从黑到白分为 份。
![](https://img-blog.csdnimg.cn/img_convert/c205e3174ff9494d1cd8a667ec80d982.png)
我将其中11个.image文件拷贝出来,用于操作演示。
代码放在文章结尾,有两种版本,一个基础版(Imagefile4_base.py),针对单个图像处理,需要处理多个图像的话需要自己加上循环语句,网上教程很多。另一个是并行处理版本(Imagefile4_par.py),速度快,用命令提示符cmd运行或者Anaconda Prompt运行。
![](https://img-blog.csdnimg.cn/img_convert/fc61016d2c24d4fee4f6a98f81e9cb8d.png)
1.基础版程序:
用Spyder或者Python编辑器打开文章结尾给的.py文件,或者复制代码到一个新的py文件,在主函数部分输入图像的像素尺寸,要处理的image文件名,以及输入和输出路径,点击运行即可,程序会在输入路径下创建Image文件夹,并把生成的jpeg图像存在其中,一个.image文件会生成两帧jpeg,后缀分别为_1和_2。
![](https://img-blog.csdnimg.cn/img_convert/ef1a42d48ffe6038c77c557a803d4f40.png)
![](https://img-blog.csdnimg.cn/img_convert/81336999f1f7c6345d2bb137ff6a2975.png)
需要注意的是,原始图像是12位深,经过处理会转换为8位位深,亮度超过255(即 )的部分均会按照255处理,因此会丢失部分图像信息,但是对于分析来说影响不大。
2.并行版程序:
同样处理这11个文件,打开命令提示符cmd或者Anaconda Prompt,切换路径至程序(.py)文件所在路径,运行批处理命令,命令包括如下参数,即输入输出路径、图片名称(默认为“image#”)、起始和终止序号、以及图像的像素尺寸,help中有参数说明。
![](https://img-blog.csdnimg.cn/img_convert/134d63bfc41d923759859b12dda559c5.png)
这里的并行处理方法不用修改.py文件任何内容,只需要在命令提示符窗口输入相应参数,运行方法如下图:
![](https://img-blog.csdnimg.cn/img_convert/740d9cf87ad24952e0daffc5eb32aada.png)
打开命令提示符或Anaconda Prompt后,首先切换盘符,假如程序文件(.py)在E盘下,则输入“E:”就会切换到E盘,然后用“cd”命令结合”Tab“键一步一步切换到程序文件(.py)所在的文件夹下,然后输入批处理代码,回车运行,运行时可以监控CPU状态。
python Imagefile4_par.py C:\Users\HWW\Desktop\data C:\Users\HWW\Desktop\data\Images -Imagename image# -partheader 0 -parttailer 10 -pixle_x 2048 -pixle_y 1548
这里需要注意的是,在命令提示符窗口内要切换到程序文件所在的路径下,这样系统才会知道要运行哪个程序,而不是切换到数据所在路径。
![](https://img-blog.csdnimg.cn/img_convert/a66d5c6a8d1926e6c078a33fe5890195.png)
![](https://img-blog.csdnimg.cn/img_convert/3d41f550c6b6c24449ba7cf302d374f9.png)
除了直接在cmd窗口中输入命令,也可以将多个工况的处理命令写进.bat文件,然后运行该文件,就可以一次处理多个工况的数据!
![](https://img-blog.csdnimg.cn/img_convert/5c61f64c814fb462b22dd8ce0a69d490.png)
![](https://img-blog.csdnimg.cn/img_convert/75c397d8eb0a4075f6247415fa414c97.png)
目前存在的问题:
1.目前程序无法自动判断图像尺寸,二进制文件中关于像素部分的参数没有正确读取出来,因此需要手动输入,如果像素尺寸不正确,输出结果就有问题。
2.原始图像为12位深,而输出图像变为8位深,丢失了部分细节信息,尤其在亮度过高的部分。
最后,程序在这里
在我上传的资源中可以找到,名称为“PIV图片读取”,里面还有例示文件。也可以从下面复制:
1.基础版程序:
import os
import numpy as np
from PIL import Image
def Image2Jpeg(pathin,pathout,imagename):
#判断输出路径是否存在
isExists=os.path.exists(pathout)
if not isExists:
os.mkdir(pathout)
#以二进制格式读取文件
f = open(file=pathin,mode='rb')
data = f.read()
#空过文件前a行
a=3181
#创建两个数组分别存储两帧图像数据
Im_1=np.zeros((pixle_y,pixle_x))
Im_2=np.zeros((pixle_y,pixle_x))
for j in range(pixle_y):
for i in range(pixle_x):
index1 = a+i*pixle_y*2+j*2
Im_1[pixle_y-j-1][i]=int(str(bin(data[index1+1]))+str(bin(data[index1])[2:].zfill(8)),2)
for j in range(pixle_y):
for i in range(pixle_x):
index2 = a+pixle_x*pixle_y*2+i*pixle_y*2+j*2
Im_2[pixle_y-j-1][i]=int(str(bin(data[index2+1]))+str(bin(data[index2])[2:].zfill(8)),2)
#将矩阵按照图片格式写出
image = Image.fromarray(Im_1)
if image.mode == "F":
image = image.convert('RGB')
image.save(os.path.join(pathout,imagename+'_1.jpeg'))
image = Image.fromarray(Im_2)
if image.mode == "F":
image = image.convert('RGB')
image.save(os.path.join(pathout,imagename+'_2.jpeg'))
f.close()
return 0
if __name__=="__main__":
#图像的像素大小
pixle_x = 2048
pixle_y = 1548
#image文件的文件名,确定输入和出路径
imagename = 'image#1'
pathin = os.path.join(r'C:\Users\HWW\Desktop\data', imagename +'.image')
pathout = r'C:\Users\HWW\Desktop\data\Image'
Image2Jpeg(pathin,pathout,imagename)
并行版程序:
import os
import numpy as np
import time
from multiprocessing import Pool
import argparse
from PIL import Image
parser = argparse.ArgumentParser(description="Process .image files to jpeg pairs.")
parser.add_argument("inputfolder", help="Input folder name.")
parser.add_argument("outputfolder", help="Output folder name.")
parser.add_argument("-Imagename", help="Imagename.", default="image#")
parser.add_argument("-partheader", help="Where to start transfering .image to .jpeg", type=int, default=0)
parser.add_argument("-parttailer", help="Where to stop transfering .image to .jpeg", type=int, default=260)
parser.add_argument("-pixle_x", help="pixle_x", type=int, default=2048)
parser.add_argument("-pixle_y", help="pixle_y", type=int, default=1548)
args = parser.parse_args()
Imagename = args.Imagename
output = args.outputfolder
inpath = args.inputfolder
startI = args.partheader
endI = args.parttailer
pixle_x=args.pixle_x
pixle_y=args.pixle_y
def Image2Jpeg(pathin,pathout):
#以二进制格式读取文件
f = open(file=pathin,mode='rb')
data = f.read()
#空过文件前a行
a=3181
#创建两个数组分别存储两帧图像数据
Im_1=np.zeros((pixle_y,pixle_x))
Im_2=np.zeros((pixle_y,pixle_x))
for j in range(pixle_y):
for i in range(pixle_x):
index1 = a+i*pixle_y*2+j*2
Im_1[pixle_y-j-1][i]=int(str(bin(data[index1+1]))+str(bin(data[index1])[2:].zfill(8)),2)
for j in range(pixle_y):
for i in range(pixle_x):
index2 = a+pixle_x*pixle_y*2+i*pixle_y*2+j*2
Im_2[pixle_y-j-1][i]=int(str(bin(data[index2+1]))+str(bin(data[index2])[2:].zfill(8)),2)
#将矩阵按照图片格式写出
image = Image.fromarray(Im_1)
if image.mode == "F":
image = image.convert('RGB')
image.save(pathout+'_1.jpeg')
image = Image.fromarray(Im_2)
if image.mode == "F":
image = image.convert('RGB')
image.save(pathout+'_2.jpeg')
f.close()
return 0
def transfer(i):
isExists=os.path.exists(output)
if not isExists:
os.mkdir(output)
suffix=Imagename
pathin=os.path.join(inpath,suffix+str(i)+'.image')
pathout=os.path.join(output,suffix+str("%06d" % i))
Image2Jpeg(pathin,pathout)
def batchTransfer():
result = [0 for t in range(startI,endI)]
for i in range(startI,endI):
result[i]=i
cores = os.cpu_count()
pool = Pool(cores)
try:
pool.map(transfer, result[startI:endI])
finally:
pool.close()
if __name__=="__main__":
start = time.time()
batchTransfer()
print('Elapsed Time: ', time.time() - start)
如果觉得这篇文章还不错,欢迎点赞收藏转发!