[Repost] Consuming OpenCV through Hadoop Storm DRPC Server from .NET

This is a brief introduction to intelligent video analysis on Hadoop or Storm, reposted here for reference.
Source: https://oyermolenko.blog/2017/03/13/consuming-opencv-through-hadoop-storm-drpc-server-from-net/

In the previous article I gave a basic overview of the Hadoop Storm framework and showed how to use it for simple operations such as counting words and persisting information in real time. But the Storm service can cover a much wider range of tasks. Its scalable and dynamic nature makes it possible to wrap even complicated algorithms and distribute their handling among different machines of the Hadoop cluster. Computer vision is a good candidate for this kind of functionality. The term covers a large range of tasks related to processing graphical data, such as object detection, motion tracking, and face recognition. As you can imagine, these procedures can be quite expensive, and wrapping them in a scalable parallel processing model could significantly increase the capacity of a solution. A good example of such an application is Amazon Rekognition, one of the official Amazon services. In this article I want to show you how to build similar products using the Hadoop Storm framework and open-source computer vision libraries.

Face detection overview

If you take a short dive into the world of computer vision you will probably find that face processing is one of the most common tasks in this field. People like to see how their smartphones take photos and do funny things with the faces found in them. But how do such applications really work, and what happens behind the scenes every time we press the button on our device? Of course it is impossible to cover all aspects of these complicated operations in a single article, but let’s try to figure out the high-level stages of the process.

In a typical image processing program, when we click a Capture button the application signals the camera to take a new photo. The image is then persisted to the local storage of the device. After that the application loads the photo into memory and begins scanning it. During this stage it tries to localize the regions which contain the objects of interest (faces, in our case). Internally the program usually relies on a set of prototype images which contain example objects, and the scanner compares candidate regions against that collection to find matches. This process is called detection. Once objects have been identified, the program usually performs further steps on them, such as marking, replacement, filtering, or recognition. The overall workflow is sequential and can be divided into a series of tasks which need to be solved to get the end result:

  1. Capturing data from device input sources
  2. Creating training sets with different samples of the objects of interest, which are used to extract the common features of the generic object
  3. Processing the captured data through a framework which can detect the objects of interest based on the training set
  4. Updating the matched objects

This process looks quite challenging at first glance. The good news is that all these tasks can be accomplished with open-source components that are free to use.

Solution design

In my example I want to show you how to process video through computer-vision-based workflows. We will download a video sample from the internet and process it through the DRPC Server of the Hadoop Storm service, where we will host the topology. The topology will parse the video into a set of image frames, detect and mark faces (with skin recognition) in every frame, and return a combined video file created from the modified images. This schema represents the workflow of the application:

In order to create this program I will use the following open-source components:

  • FFmpeg – a complete, cross-platform solution to record, convert and stream audio and video
  • OpenCV – an open-source computer vision library written natively in C++ which contains different algorithms for working with graphical data
  • Bytedeco JavaCV – a bridge solution which wraps OpenCV and FFmpeg and provides a Java-based API for consuming them

Besides, on the Hadoop side I will use the Storm DRPC Server to host the solution, with the logic built from the components mentioned above:

  • Storm DRPC – a Hadoop service which provides a Thrift API for consuming hosted topologies as remote functions

OpenCV detection

Before moving on I would like to give you some more details on how we will perform the actual detection. As you’ve probably noticed, OpenCV is the core component of the solution. It is a powerful library which encapsulates more than 2500 different computer vision algorithms and provides an API for consuming them. For our task we will use the part related to detecting objects in images: face detection with the Haar Cascades algorithm proposed by Paul Viola and improved by Rainer Lienhart. Originally, problems related to localizing objects in images were solved with very expensive algorithms which relied on comparing RGB pixels. The Haar Cascades approach introduced a new way of detecting objects. The idea is that we first create a set of positive images which contain the object of interest and a set of negative images without it. The algorithm then studies these two collections and identifies a number of common features of the object. For every new image the algorithm creates a scanner which loops through the content and tries to detect these features in different regions of the picture. This video explains how the algorithm works:
https://youtu.be/hPCTwxF0qf4
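
To make the idea of a "feature" more concrete, here is a minimal sketch in plain Java. It is background from the Viola-Jones method rather than code from this project: Haar-like features are differences between sums of pixels in adjacent rectangles, and an integral image (summed-area table) lets each rectangle sum be computed with four lookups. The class name HaarFeatureSketch, its methods, and the tiny 4x4 test image are hypothetical and only serve the illustration.

```java
// Sketch only: an integral image plus one two-rectangle Haar-like feature.
// All names here are hypothetical; OpenCV does this internally in native code.
final class HaarFeatureSketch {

    /** Builds a summed-area table with one extra row and column of zeros. */
    static long[][] integralImage(int[][] gray) {
        int h = gray.length, w = gray[0].length;
        long[][] ii = new long[h + 1][w + 1];
        for (int y = 0; y < h; y++) {
            for (int x = 0; x < w; x++) {
                ii[y + 1][x + 1] = gray[y][x] + ii[y][x + 1] + ii[y + 1][x] - ii[y][x];
            }
        }
        return ii;
    }

    /** Sum of the pixels in the rectangle [x, x+w) x [y, y+h), using four lookups. */
    static long rectSum(long[][] ii, int x, int y, int w, int h) {
        return ii[y + h][x + w] - ii[y][x + w] - ii[y + h][x] + ii[y][x];
    }

    /** Two-rectangle edge feature: sum of the top half minus sum of the bottom half. */
    static long edgeFeature(long[][] ii, int x, int y, int w, int h) {
        int half = h / 2;
        long top = rectSum(ii, x, y, w, half);
        long bottom = rectSum(ii, x, y + half, w, half);
        return top - bottom;
    }

    public static void main(String[] args) {
        // Bright band above a dark band, e.g. a forehead above the eye region.
        int[][] gray = {
            {200, 200, 200, 200},
            {200, 200, 200, 200},
            { 50,  50,  50,  50},
            { 50,  50,  50,  50},
        };
        long[][] ii = integralImage(gray);
        System.out.println(edgeFeature(ii, 0, 0, 4, 4)); // prints 1200
    }
}
```

A large absolute value means the window contains a strong horizontal edge. A trained cascade combines many such features, evaluated at different positions and scales of the picture.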

As I’ve mentioned, the native OpenCV framework is implemented in C++, and it could be quite challenging to adapt this library to common Hadoop services like Storm. To simplify this task we will use the Bytedeco JavaCV packages, which wrap all the required functionality in Java.

Now that we have a basic understanding of the detection technique, we can start implementing the solution. I divided it into three parts: detection implementation, message processing implementation, and wrapping the workflow into a Storm Topology. To simplify things I will embed all the parts within a single Java project.

Implementing Detection:

As I’ve mentioned before, in order to perform detection we need a training set for the object of interest. The good thing is that instead of creating it ourselves we can download a ready-to-use instance from the official OpenCV GitHub repository. It is an ordinary XML file containing the trained frontal face classifier data. We will point our detection logic to this file later. This part of the program will be implemented in the following classes:

  • Face Scanner – the primary container for the detectors, which scan the input images and mark the faces. This class acts as the interface to the OpenCV methods for consumers. To make it more generic, the class operates on a collection of detectors, each of which can localize a particular object in the incoming image. Currently we have a single frontal face detector, but in the same fashion you can extend it with eye or mouth detectors:
public class FaceScanner {
 
  List<FaceDetector> detectors = new ArrayList<FaceDetector>();
 
  public List<IplImage> Detect(IplImage image) throws Exception {
    if (detectors.size() == 0) {
      throw new Exception("No detectors found to perform recognition");
    }
 
    List<CvRect> rects = new ArrayList<CvRect>();
    List<IplImage> faces = new ArrayList<IplImage>();
 
    for (FaceDetector detector : detectors) {
      CvSeq catches = detector.detect(image);
      for (int i = 0; i < catches.total(); i++) {
        CvRect rectNext = new CvRect(cvGetSeqElem(catches, i));
 
        IplImage ROIFrame = image.clone();
        cvSetImageROI(ROIFrame, rectNext);
        faces.add(ROIFrame);
      }
      detector.mark(CvScalar.RED);
    }
 
    return faces;
  }
 
  public void AddDetector(FaceDetector detector) {
    detectors.add(detector);
  }
}

  • Face Detector – performs frontal face detection on images based on the training set contained in the frontal face classifier downloaded from the GitHub repository. Our detector supports extension with filters which improve the detection quality with some extra logic:
public class FaceDetector implements FeatureDetector {
 
    CvHaarClassifierCascade cascadeInstance;
 
    FrameFilter<IplImage>filter;
 
    CvSeq detectedSeq;
 
    IplImage image;
 
    public FaceDetector(String historyLocation) {
      if (cascadeInstance == null) {
        cascadeInstance = new CvHaarClassifierCascade(cvLoad(historyLocation));
      }
    }
 
    @Override
    public CvSeq detect(IplImage src) {
      image = src;
      CvMemStorage storage = CvMemStorage.create();
      CvSeq signs = cvHaarDetectObjects(src, cascadeInstance, storage, 2, 1, 1);
      if (signs.total() > 0) {
        cvClearMemStorage(storage);
        if (filter != null) {
          signs = filter.execute(src, signs);
        }
      }
 
      detectedSeq = signs;
      return signs;
    }
 
    @Override
    public void mark(CvScalar color) {
      if (detectedSeq != null && image != null) {
        for (int i = 0; i < detectedSeq.total(); i++) {
          CvRect rect = new CvRect(cvGetSeqElem(detectedSeq, i));
          cvRectangle(image, cvPoint(rect.x(), rect.y()),
          cvPoint(rect.width() + rect.x(), rect.height() + rect.y()), color, 2, CV_AA, 0);
        }
      }
    }
 
    public void WithSkinDetection(boolean markSkin) {
      filter = new SkinFilter(filter, markSkin);
    }
}
  • Skin Filter – improves the quality of the detector by applying skin verification logic. The filter analyzes the regions located by the detectors and checks whether the colors inside them fall within the human skin color range:
public class SkinFilter extends SequenceFrameFilterBase<IplImage>{
 
    public static int SkinThreshhold = 35;
 
    protected boolean markArea = false;
 
    public SkinFilter(FrameFilter<IplImage> frameFilter, boolean markArea) {
      super(frameFilter);
      this.markArea = markArea;
    }
 
    @Override
    public CvSeq execute(IplImage image, CvSeq catches) {
      if (frameFilter!=null) catches = frameFilter.execute(image, catches);
      locateSkin(image, catches);
      return catches;
    }
 
    private void locateSkin(IplImage image, CvSeq catches) {
      for (int i = 0; i < catches.total(); i++) {
        CvRect rect = new CvRect(cvGetSeqElem(catches, i));
        IplImage ROIFrame = image;
        cvSetImageROI(ROIFrame, rect);
        int persentage = getSkinPercentageInFrame(new Mat(ROIFrame));
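        if (persentage < SkinThreshhold)
        {
          // Removing shifts the later elements down by one,
          // so step back to avoid skipping the next candidate.
          cvSeqRemove(catches, i);
          i--;
        }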
        cvSetImageROI(image, new CvRect(0, 0, image.width(), image.height()));
      }
    }
 
    private int getSkinPercentageInFrame(Mat original) {
      IplImage imageWithPhotoFilter = new IplImage(original.clone());
      cvtColor(original, new Mat(imageWithPhotoFilter), COLOR_BGR2YCrCb);
 
      CvScalar min = cvScalar(0, 133, 77, 0);
      CvScalar max = cvScalar(255, 173, 127, 0);
 
      IplImage imgSkin = cvCreateImage(cvGetSize(imageWithPhotoFilter), 8, 1);
      cvInRangeS(imageWithPhotoFilter, min, max, imgSkin);
 
      final MatVector skinContours = new MatVector();
      findContours(new Mat(imgSkin), skinContours, RETR_EXTERNAL, CHAIN_APPROX_SIMPLE);
 
      if (markArea) drawContours(original, skinContours, -1, AbstractScalar.GREEN);
      double totalSize = 0;
      for (int i = 0; i < skinContours.size(); i++) {
        totalSize = totalSize + contourArea(skinContours.get(i));
      }
 
      int persentage = (int) ((totalSize / (original.size().width() * original.size().height())) * 100);
 
      return persentage;
   }
}
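
The filtering idea can also be shown without OpenCV. The following is a simplified sketch, not the filter above: it counts skin-colored pixels directly instead of measuring contour areas, and it treats the range bounds as inclusive. It reuses the Cr (133 to 173) and Cb (77 to 127) limits and the 35 percent threshold from the listing, and converts RGB to YCrCb with the standard formula. The class SkinRatioSketch and the sample pixel values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: pixel-counting approximation of the skin check, in plain Java.
final class SkinRatioSketch {

    static final int SKIN_THRESHOLD = 35; // same percentage as SkinThreshhold in the listing

    /** True if the pixel's Cr is within 133..173 and its Cb within 77..127. */
    static boolean isSkin(int r, int g, int b) {
        double y = 0.299 * r + 0.587 * g + 0.114 * b;
        double cr = (r - y) * 0.713 + 128;
        double cb = (b - y) * 0.564 + 128;
        return cr >= 133 && cr <= 173 && cb >= 77 && cb <= 127;
    }

    /** Percentage (0..100) of pixels in a region that pass isSkin; pixels are 0xRRGGBB. */
    static int skinPercentage(int[][] rgbPixels) {
        int total = 0, skin = 0;
        for (int[] row : rgbPixels) {
            for (int p : row) {
                total++;
                if (isSkin((p >> 16) & 0xFF, (p >> 8) & 0xFF, p & 0xFF)) skin++;
            }
        }
        return total == 0 ? 0 : (skin * 100) / total;
    }

    /** Drops regions below the threshold, walking backwards so that no index is skipped. */
    static void dropNonSkin(List<int[][]> regions) {
        for (int i = regions.size() - 1; i >= 0; i--) {
            if (skinPercentage(regions.get(i)) < SKIN_THRESHOLD) regions.remove(i);
        }
    }

    public static void main(String[] args) {
        int[][] halfSkin = {{0xE0AC69, 0xE0AC69}, {0x0000FF, 0x0000FF}};
        int[][] allBlue = {{0x0000FF, 0x0000FF}};
        List<int[][]> regions = new ArrayList<>(Arrays.asList(allBlue, allBlue, halfSkin));
        dropNonSkin(regions);
        System.out.println(skinPercentage(halfSkin)); // prints 50
        System.out.println(regions.size());           // prints 1
    }
}
```

Walking the list from the end matters when two rejected candidates sit next to each other: a forward loop that removes element i and then increments i would never look at the element that slid into position i.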

Message processing implementation:

Our detector module is ready to process images, but it accepts them only in the special OpenCV IplImage format. We need to convert our video files into a set of suitable image objects. For this purpose we will use the FFmpeg frame grabber. This class allows us to split the incoming video file into a set of frames and convert them into IplImage objects suitable for OpenCV processing. On the other end we will use the FFmpeg frame recorder to rejoin the images produced by our detector into a single video file:

  • Grab:
private static List<IplImage> Grab(byte[] data) throws IOException {
 
  List<IplImage> images = new ArrayList<IplImage>();
  InputStream bis = new ByteArrayInputStream(data);
 
  FileOutputStream fos = new FileOutputStream("/tmp/tmp.avi");
  fos.write(data);
  fos.close();
 
  logger.info("File saved to temp dir. Trying to read frames");
 
  FFmpegFrameGrabber grabber = new FFmpegFrameGrabber("/tmp/tmp.avi");
 
  try {
    grabber.start();
 
    int frame_count = grabber.getLengthInFrames();
 
    logger.info("Found " + frame_count + " frames");
 
    for (int i = 0; i < frame_count; i++) {
      grabber.setFrameNumber(i);
      Frame frame = grabber.grabFrame();
 
      if (frame == null)
        break;
      if (frame.image == null)
        continue;
 
      OpenCVFrameConverter.ToIplImage converter = new OpenCVFrameConverter.ToIplImage();
      IplImage grabbedImage = converter.convert(frame);
      IplImage grayImage = grabbedImage.clone();
 
      images.add(grayImage);
    }
    grabber.stop();
 
    logger.info("Grabbing finished. Processes images - " + images.size());
 
  } catch (java.lang.Exception e) {
      e.printStackTrace();
  }
  return images;
}
  • SaveToVideo:
private static byte[] SaveToVideo(List<IplImage> images)
throws org.bytedeco.javacv.FrameRecorder.Exception, IOException {
  File newFile = new File("/tmp/sample.mp4");
 
  FFmpegFrameRecorder fr = new FFmpegFrameRecorder(newFile, 640, 480);
  fr.setVideoCodec(avcodec.AV_CODEC_ID_H264);
  fr.setFormat("mp4");
 
  fr.start();
 
  OpenCVFrameConverter.ToIplImage converter = new OpenCVFrameConverter.ToIplImage();
 
  for (int i = 0; i < images.size(); i++) {
    IplImage img = images.get(i);
    Frame frame = converter.convert(img);
    fr.record(frame);
  }
 
  fr.stop();
  byte[] data = Files.readAllBytes(Paths.get("/tmp/sample.mp4"));
  return data;
}
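
One thing to keep in mind: both methods use fixed file names under /tmp, so two requests handled at the same time on one machine would overwrite each other’s files. A possible way around this, sketched below with only the Java standard library, is to give every request its own temporary file. The class TempVideoFileSketch, its method names, and the "drpc-video-" prefix are hypothetical and not part of the original project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch only: per-request temp files instead of a shared /tmp/tmp.avi.
final class TempVideoFileSketch {

    /** Writes the payload to a uniquely named temp file and returns its path. */
    static Path writeToTempFile(byte[] data, String suffix) {
        try {
            Path file = Files.createTempFile("drpc-video-", suffix);
            Files.write(file, data);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the file back into memory and removes it from disk. */
    static byte[] readAndDelete(Path file) {
        try {
            byte[] data = Files.readAllBytes(file);
            Files.deleteIfExists(file);
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3, 4};
        Path first = writeToTempFile(payload, ".avi");
        Path second = writeToTempFile(payload, ".avi");
        System.out.println(first.equals(second));         // prints false
        System.out.println(readAndDelete(first).length);  // prints 4
        readAndDelete(second);
    }
}
```

The returned path can be converted with toString() and handed to a grabber or recorder in place of the hard-coded file name.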

At the top level we will create a method which initializes the Face Scanner instance and runs face detection over the images produced by the grabber:

  • DetectFaces:
private void DetectFaces(List<IplImage> images) {
  int size = images.size();
 
  FaceScanner searcher = new FaceScanner();
  FaceDetector frontalDetector = new FaceDetector(XML_FILE_FACE);
  frontalDetector.WithSkinDetection(false);
  searcher.AddDetector(frontalDetector);
 
  logger.info("Face detector created");
 
  for (int i = 0; i < size; i++) {
 
    IplImage img = images.get(i);
 
    try {
      List<IplImage> faces = searcher.Detect(img);
    } catch (Exception e) {
      e.printStackTrace();
    }
 
    IplImage imageWithPhotoFilter = new IplImage(img.clone());
    Mat imgmt = new Mat(img);
 
    images.set(i, imageWithPhotoFilter);
  }
}

Wrapping workflow into Storm Topology:

At this stage most of our detection logic is ready and we can start wrapping it into a single Storm Topology workflow. In the previous article I showed how to use the Kafka message broker to interact with the service from external applications. In this article I want to follow another approach called Distributed Remote Procedure Call (DRPC). This technique allows topologies to be consumed like ordinary remote procedures through a special cross-platform Thrift API, which makes our solution usable from most existing platforms. But in order to create such an application our Topology should meet two requirements:

  • Bolts within the topology should re-emit tuples with the request id as the first field
  • We should use a Linear DRPC Topology built with the LinearDRPCTopologyBuilder class
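
The first requirement is easier to see in isolation. The sketch below is a plain Java simulation with no Storm dependency: a tuple is modelled as a List, the bolt’s work as a Function, and the class DrpcContractSketch is hypothetical. It only illustrates the contract that the request id arrives as the first field and has to leave as the first field, so the result can be matched to the waiting caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch only: simulates the [id, args] -> [id, result] contract of a DRPC bolt.
final class DrpcContractSketch {

    /** Takes an input "tuple" [id, args] and returns the output "tuple" [id, result]. */
    static List<Object> handle(List<Object> tuple, Function<String, String> fn) {
        Object requestId = tuple.get(0);      // passed through untouched
        String args = (String) tuple.get(1);  // the argument sent by the client
        return Arrays.asList(requestId, fn.apply(args));
    }

    public static void main(String[] args) {
        List<Object> in = Arrays.asList(42L, "hello");
        List<Object> out = handle(in, s -> s.toUpperCase());
        System.out.println(out); // prints [42, HELLO]
    }
}
```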

You may have noticed that LinearDRPCTopologyBuilder is considered deprecated and some recent articles advise working with Trident Topologies instead. But I think that in our case Trident would introduce unnecessary overhead, and it is easier to follow the original approach. Now we will create our VideoDetectionBolt and override its base methods following the first requirement:

public class VideoDetectionBolt extends BaseBasicBolt {
 
  private static Logger logger = Logger.getLogger(VideoDetectionBolt.class.getName());
 
  public static final String XML_FILE_FACE = "/tmp/haarcascade_frontalface_alt.xml";
 
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
      byte[] data = Base64.getDecoder().decode(tuple.getString(1));
      logger.info("Got new message " + data.length);
      List<IplImage> images = Grab(data);
      logger.info("Located " + images.size() + " images");
      DetectFaces(images);
      byte[] updatedVideo = SaveToVideo(images);
      String dataStr = Base64.getEncoder().encodeToString(updatedVideo);
      logger.info("Video processed. Emitting back to sender - " + dataStr.length());
      collector.emit(new Values(tuple.getValue(0), dataStr));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
 
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "video"));
  }
 
  //Detection methods
  //........
}
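
The bolt receives and returns the video as a Base64 string because the DRPC call carries its argument and its result as text. Here is a small self-contained sketch of that round trip and of the size overhead it adds. The class Base64PayloadSketch and the fake 1000-byte payload are hypothetical; only the java.util.Base64 calls are the same ones used in the bolt.

```java
import java.util.Arrays;
import java.util.Base64;

// Sketch only: Base64 round trip for a binary payload and its size overhead.
final class Base64PayloadSketch {

    static String encode(byte[] video) {
        return Base64.getEncoder().encodeToString(video);
    }

    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    /** Length of padded Base64 text for n input bytes: 4 * ceil(n / 3). */
    static long encodedLength(long n) {
        return 4 * ((n + 2) / 3);
    }

    public static void main(String[] args) {
        byte[] fakeVideo = new byte[1000];
        for (int i = 0; i < fakeVideo.length; i++) {
            fakeVideo[i] = (byte) i; // deterministic stand-in for real video bytes
        }
        String text = encode(fakeVideo);
        System.out.println(text.length());                          // prints 1336
        System.out.println(Arrays.equals(fakeVideo, decode(text))); // prints true
    }
}
```

The text form is about a third larger than the binary data, which is worth remembering when sending longer videos through the DRPC server.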

Don’t forget to put the training set into some location accessible by the worker:

wget -P /tmp https://github.com/opencv/opencv/raw/master/data/haarcascades/haarcascade_frontalface_alt.xml

Finally, we need to tie everything together using LinearDRPCTopologyBuilder and submit our workflow using the StormSubmitter class.

public class VideoProcessingTopology {
 
    public static void main(String[] args) throws Exception {
      Config cfg = new Config();
      cfg.put("drpc.servers", Arrays.asList(new String[]{"localhost"}));
 
      LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("video");
      builder.addBolt(new VideoDetectionBolt(), 1);
 
      cfg.setNumWorkers(1);
      StormSubmitter.submitTopology(args[0], cfg, builder.createRemoteTopology());
   }
}

Now let’s submit the topology and verify that it is up and running:

storm jar target/RPVideoProcessingTopology-0.0.1-SNAPSHOT.jar demo.topologies.VideoProcessingTopology VideoTopology

storm list

Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
VideoTopology        ACTIVE     7          1            100083

Consuming DRPC Topology from .NET

I’ve mentioned earlier that the DRPC server provides a Thrift API. This means we can build custom client applications from the original service contracts in any language which supports this protocol. In our program we will use the Storm.Net.Adapter NuGet package, which already contains Thrift client proxy classes that can be used to send messages to the Storm DRPC server. The code here is very simple:

class Program
{
  static void Main(string[] args)
  {
    var bytes = File.ReadAllBytes(@"c:\out.mp4");
    string base64Str = Convert.ToBase64String(bytes);
 
    DRPCClient client = new DRPCClient("localhost", 3772);
    string result = client.execute("video", base64Str);
 
    // The topology returns an MP4 file, so save it with a matching extension.
    File.WriteAllBytes("result.mp4", Convert.FromBase64String(result));
  }
}

The solution is ready and now it is time to test it end to end. I will use an mp4 sample downloaded from this link.

After processing the data through the topology I obtained the following output video file.

As you can see, the original video was updated with markings of the faces detected in every frame. Despite a certain percentage of false detections, in general our program managed to localize faces very well. Now we can start extending it with extra pieces of logic like replacement or recognition. We can also try to improve the quality of the algorithm with extra features like eye and mouth detection. Besides, we can experiment with parallelizing parts of the workflow across different Storm workers and executors to increase the speed of our algorithms.

The sources are available on GitHub.
