技術(shù)頻道

娓娓工業(yè)
您現(xiàn)在的位置: 中國傳動網(wǎng) > 技術(shù)頻道 > 技術(shù)百科 > ROS機器人操作系統(tǒng)的實現(xiàn)原理解析

ROS機器人操作系統(tǒng)的實現(xiàn)原理解析

時間:2023-01-06 14:33:54來源:CSDN博主

導(dǎo)語:?本文介紹ROS機器人操作系統(tǒng)(Robot OperaTIng System)的實現(xiàn)原理,從最底層分析ROS代碼是如何實現(xiàn)的。

  1、序列化

  把通信的內(nèi)容(也就是消息message)序列化是通信的基礎(chǔ),所以我們先研究序列化。

  盡管筆者從事機器人學(xué)習和研發(fā)很長時間了,但是在研究ROS的過程中,“序列化”這個詞還是這輩子第一次聽到。

  所以可想而知很多人在看到“把一個消息序列化”這樣的描述時是如何一臉懵逼。

  但其實序列化是一個比較常見的概念,你雖然不知道它但一定接觸過它。

  下面我們先介紹“序列化”的一些常識,然后解釋ROS里的序列化是怎么做的?

  1.1 什么是序列化?

  “序列化”(SerializaTIon )的意思是將一個對象轉(zhuǎn)化為字節(jié)流。

  這里說的對象可以理解為“面向?qū)ο蟆崩锏哪莻€對象,具體的就是存儲在內(nèi)存中的對象數(shù)據(jù)。

  與之相反的過程是“反序列化”(DeserializaTIon )。

  雖然掛著機器人的羊頭,但是后面的介紹全部是計算機知識,跟機器人一丁點關(guān)系都沒有,序列化就是一個純粹的計算機概念。

  序列化的英文Serialize就有把一個東西變成一串連續(xù)的東西之意。

  形象的描述,數(shù)據(jù)對象是一團面,序列化就是將面團拉成一根面條,反序列化就將面條捏回面團。

  另一個形象的類比是我們在對話或者打電話時,一個人的思想轉(zhuǎn)換成一維的語音,然后在另一個人的頭腦里重新變成結(jié)構(gòu)化的思想,這也是一種序列化。

716d3cf4-8a26-11ed-bfe3-dac502259ad0.jpg

  面對序列化,很多人心中可能會有很多疑問。

  首先,為什么要序列化?或者更具體的說,既然對象的信息本來就是以字節(jié)的形式儲存在內(nèi)存中,那為什么要多此一舉把一些字節(jié)數(shù)據(jù)轉(zhuǎn)換成另一種形式的、一維的、連續(xù)的字節(jié)數(shù)據(jù)呢?

  如果我們的程序在內(nèi)存中存儲了一個數(shù)字,比如25.那要怎么傳遞25這個數(shù)字給別的程序節(jié)點或者把這個數(shù)字永久存儲起來呢?

  很簡單,直接傳遞25這個數(shù)字(的字節(jié)表示,即0X19.當然最終會變成二進制表示11001以高低電平傳輸存儲)或者直接把這個數(shù)字(的字節(jié)表示)寫進硬盤里即可。

  所以,對于本來就是連續(xù)的、一維的、一連串的數(shù)據(jù)(例如字符串),序列化并不需要做太多東西,其本質(zhì)是就是由內(nèi)存向其它地方拷貝數(shù)據(jù)而已。

  所以,如果你在一個序列化庫里看到memcpy函數(shù)不用覺得奇怪,因為你知道序列化最底層不過就是在操作內(nèi)存數(shù)據(jù)而已(還有些庫使用了流的ostream.rdbuf()->sputn函數(shù))。

  可是實際程序操作的對象很少是這么簡單的形式,大多數(shù)時候我們面對的是包含不同數(shù)據(jù)類型(int、double、string)的復(fù)雜數(shù)據(jù)結(jié)構(gòu)(比如vector、list),它們很可能在內(nèi)存中是不連續(xù)存儲的而是分散在各處。比如ROS的很多消息都包含向量。

  數(shù)據(jù)中還有各種指針和引用。而且,如果數(shù)據(jù)要在運行于不同架構(gòu)的計算機之上的、由不同編程語言所編寫的節(jié)點程序之間傳遞,那問題就更復(fù)雜了,它們的字節(jié)順序endianness規(guī)定有可能不一樣,基本數(shù)據(jù)類型(比如int)的長度也不一樣(有的int是4個字節(jié)、有的是8個字節(jié))。

  這些都不是通過簡單地、原封不動地復(fù)制粘貼原始數(shù)據(jù)就能解決的。這時候就需要序列化和反序列化了。

  所以在程序之間需要通信時(ROS恰好就是這種情況),或者希望保存程序的中間運算結(jié)果時,序列化就登場了。

  另外,在某種程度上,序列化還起到統(tǒng)一標準的作用。

71866f12-8a26-11ed-bfe3-dac502259ad0.jpg

  我們把被序列化的東西叫object(對象),它可以是任意的數(shù)據(jù)結(jié)構(gòu)或者對象:結(jié)構(gòu)體、數(shù)組、類的實例等等。

  把序列化后得到的東西叫archive,它既可以是人類可讀的文本形式,也可以是二進制形式。

  前者比如JSON和XML,這兩個是網(wǎng)絡(luò)應(yīng)用里最常用的序列化格式,通過記事本就能打開閱讀;

  后者就是原始的二進制文件,比如后綴名是bin的文件,人類是沒辦法直接閱讀一堆的0101或者0XC9D23E72的。

  序列化算是一個比較常用的功能,所以大多數(shù)編程語言(比如C++、Python、Java等)都會附帶用于序列化的庫,不需要你再去造輪子。

  以C++為例,雖然標準STL庫沒有提供序列化功能,但是第三方庫Boost提供了[ 2 ]谷歌的protobuf也是一個序列化庫,還有Fast-CDR,以及不太知名的Cereal,Java自帶序列化函數(shù),python可以使用第三方的pickle模塊實現(xiàn)。

  總之,序列化沒有什么神秘的,用戶可以看看這些開源的序列化庫代碼,或者自己寫個小程序試試簡單數(shù)據(jù)的序列化,例如這個例子,或者這個,有助于更好地理解ROS中的實現(xiàn)。

  1.2 ROS中的序列化實現(xiàn)

  理解了序列化,再回到ROS。我們發(fā)現(xiàn),ROS沒有采用第三方的序列化工具,而是選擇自己實現(xiàn),代碼在roscpp_core項目下的roscpp_serializaTIon中,見下圖。這個功能涉及的代碼量不是很多。

  為什么ROS不使用現(xiàn)成的序列化工具或者庫呢?可能ROS誕生的時候(2007年),有些序列化庫可能還不存在(protobuf誕生于2008年),更有可能是ROS的創(chuàng)造者認為當時沒有合適的工具。

  1.2.1 serialization.h

  核心的函數(shù)都在serialization.h里,簡而言之,里面使用了C語言標準庫的memcpy函數(shù)把消息拷貝到流中。

  下面來看一下具體的實現(xiàn)。

  序列化功能的特點是要處理很多種數(shù)據(jù)類型,針對每種具體的類型都要實現(xiàn)相應(yīng)的序列化函數(shù)。

  為了盡量減少代碼量,ROS使用了模板的概念,所以代碼里有一堆的template。

  從后往前梳理,先看Stream這個結(jié)構(gòu)體吧。在C++里結(jié)構(gòu)體和類基本沒什么區(qū)別,結(jié)構(gòu)體里也可以定義函數(shù)。

  Stream翻譯為流,流是一個計算機中的抽象概念,前面我們提到過字節(jié)流,它是什么意思呢?

  在需要傳輸數(shù)據(jù)的時候,我們可以把數(shù)據(jù)想象成傳送帶上連續(xù)排列的一個個被傳送的物體,它們就是一個流。

  更形象的,可以想象磁帶或者圖靈機里連續(xù)的紙帶。在文件讀寫、使用串口、網(wǎng)絡(luò)Socket通信等領(lǐng)域,流經(jīng)常被使用。例如我們常用的輸入輸出流:

  cout<<"helllo"; 由于使用很多,流的概念也在演變。想了解更多可以看這里。

  struct Stream

  {

  // Returns a pointer to the current position of the stream

  inline uint8_t* getData() { return data_; }

  // Advances the stream, checking bounds, and returns a pointer to the position before it was advanced.

  // hrows StreamOverrunException if len would take this stream past the end of its buffer

  ROS_FORCE_INLINE uint8_t* advance(uint32_t len)

  {

  uint8_t* old_data = data_;

  data_ += len;

  if (data_ > end_)

  {

  // Throwing directly here causes a significant speed hit due to the extra code generated for the throw statement

  throwStreamOverrun();

  }

  return old_data;

  }

  // Returns the amount of space left in the stream

  inline uint32_t getLength() { return static_cast(end_ - data_); }

  protected:

  Stream(uint8_t* _data, uint32_t _count) : data_(_data), end_(_data + _count) {}

  private:

  uint8_t* data_;

  uint8_t* end_;

  };

  注釋表明Stream是個基類,輸入輸出流IStream和OStream都繼承自它。

  Stream的成員變量data_是個指針,指向序列化的字節(jié)流開始的位置,它的類型是uint8_t。

  在Ubuntu系統(tǒng)中,uint8_t的定義是typedef unsigned char uint8_t;

  所以uint8_t就是一個字節(jié),可以用size_of()函數(shù)檢驗。data_指向的空間就是保存字節(jié)流的。

  輸出流類OStream用來序列化一個對象,它引用了serialize函數(shù),如下。

  struct OStream : public Stream

  {

  static const StreamType stream_type = stream_types::Output;

  OStream(uint8_t* data, uint32_t count) : Stream(data, count) {}

  /* Serialize an item to this output stream*/

  template

  ROS_FORCE_INLINE void next(const T& t)

  {

  serialize(*this, t);

  }

  template

  ROS_FORCE_INLINE OStream& operator<<(const T& t)

  {

  serialize(*this, t);

  return *this;

  }

  };

  輸入流類IStream用來反序列化一個字節(jié)流,它引用了deserialize函數(shù),如下。

  struct ROSCPP_SERIALIZATION_DECL IStream : public Stream

  {

  static const StreamType stream_type = stream_types::Input;

  IStream(uint8_t* data, uint32_t count) : Stream(data, count) {}

  /* Deserialize an item from this input stream */

  template

  ROS_FORCE_INLINE void next(T& t)

  {

  deserialize(*this, t);

  }

  template

  ROS_FORCE_INLINE IStream& operator>>(T& t)

  {

  deserialize(*this, t);

  return *this;

  }

  };

  自然,serialize函數(shù)和deserialize函數(shù)就是改變數(shù)據(jù)形式的地方,它們的定義在比較靠前的地方。它們都接收兩個模板,都是內(nèi)聯(lián)函數(shù),然后里面沒什么東西,只是又調(diào)用了Serializer類的成員函數(shù)write和read。所以,serialize和deserialize函數(shù)就是個二道販子。

  // Serialize an object.  Stream here should normally be a ros::OStream

  template

  inline void serialize(Stream& stream, const T& t)

  {

  Serializer::write(stream, t);

  }

  // Deserialize an object.  Stream here should normally be a ros::IStream

  template

  inline void deserialize(Stream& stream, T& t)

  {

  Serializer::read(stream, t);

  }

  所以,我們來分析Serializer類,如下。我們發(fā)現(xiàn),write和read函數(shù)又調(diào)用了類型里的serialize函數(shù)和deserialize函數(shù)。

  頭別暈,這里的serialize和deserialize函數(shù)跟上面的同名函數(shù)不是一回事。

  注釋中說:“Specializing the Serializer class is the only thing you need to do to get the ROS serialization system to work with a type”(要想讓ROS的序列化功能適用于其它的某個類型,你唯一需要做的就是特化這個Serializer類)。

  這就涉及到的另一個知識點——模板特化(template specialization)。

  template struct Serializer

  {

  // Write an object to the stream.  Normally the stream passed in here will be a ros::OStream

  template

  inline static void write(Stream& stream, typename boost::call_traits::param_type t)

  {

  t.serialize(stream.getData(), 0);

  }

  // Read an object from the stream.  Normally the stream passed in here will be a ros::IStream

  template

  inline static void read(Stream& stream, typename boost::call_traits::reference t)

  {

  t.deserialize(stream.getData());

  }

  // Determine the serialized length of an object.

  inline static uint32_t serializedLength(typename boost::call_traits::param_type t)

  {

  return t.serializationLength();

  }

  };

  接著又定義了一個帶參數(shù)的宏函數(shù)ROS_CREATE_SIMPLE_SERIALIZER(Type),然后把這個宏作用到了ROS中的10種基本數(shù)據(jù)類型,分別是:uint8_t, int8_t, uint16_t, int16_t, uint32_t, int32_t, uint64_t, int64_t, float, double。

  說明這10種數(shù)據(jù)類型的處理方式都是類似的。看到這里大家應(yīng)該明白了,write和read函數(shù)都使用了memcpy函數(shù)進行數(shù)據(jù)的移動。

  注意宏定義中的template<>語句,這正是模板特化的標志,關(guān)鍵詞template后面跟一對尖括號。

  關(guān)于模板特化可以看這里。

  #define ROS_CREATE_SIMPLE_SERIALIZER(Type)

  template<> struct Serializer

  {

  template inline static void write(Stream& stream, const Type v)

  {

  memcpy(stream.advance(sizeof(v)), &v, sizeof(v) );

  }

  template inline static void read(Stream& stream, Type& v)

  {

  memcpy(&v, stream.advance(sizeof(v)), sizeof(v) );

  }

  inline static uint32_t serializedLength(const Type&)

  {

  return sizeof(Type);

  }

  };

  ROS_CREATE_SIMPLE_SERIALIZER(uint8_t)

  ROS_CREATE_SIMPLE_SERIALIZER(int8_t)

  ROS_CREATE_SIMPLE_SERIALIZER(uint16_t)

  ROS_CREATE_SIMPLE_SERIALIZER(int16_t)

  ROS_CREATE_SIMPLE_SERIALIZER(uint32_t)

  ROS_CREATE_SIMPLE_SERIALIZER(int32_t)

  ROS_CREATE_SIMPLE_SERIALIZER(uint64_t)

  ROS_CREATE_SIMPLE_SERIALIZER(int64_t)

  ROS_CREATE_SIMPLE_SERIALIZER(float)

  ROS_CREATE_SIMPLE_SERIALIZER(double)

  對于其它類型的數(shù)據(jù),例如bool、std::string、std::vector、ros::Time、ros::Duration、boost::array等等,它們各自的處理方式有細微的不同,所以不再用上面的宏函數(shù),而是用模板特化的方式每種單獨定義,這也是為什么serialization.h這個文件這么冗長。

  對于int、double這種單個元素的數(shù)據(jù),直接用上面特化的Serializer類中的memcpy函數(shù)實現(xiàn)序列化。

  對于vector、array這種多個元素的數(shù)據(jù)類型怎么辦呢?方法是分成幾種情況,對于固定長度簡單類型的(fixed-size simple types),還是用各自特化的Serializer類中的memcpy函數(shù)實現(xiàn),沒啥太大區(qū)別。

  對于固定但是類型不簡單的(fixed-size non-simple types)或者既不固定也不簡單的(non-fixed-size, non-simple types)或者固定但是不簡單的(fixed-size, non-simple types),用for循環(huán)遍歷,一個元素一個元素的單獨處理。

  那怎么判斷一個數(shù)據(jù)是不是固定是不是簡單呢?這是在roscpp_traits文件夾中的message_traits.h完成的。

  其中采用了萃取Type Traits,這是相對高級一點的編程技巧了,筆者也不太懂。

  對序列化的介紹暫時就到這里了,有一些細節(jié)還沒講,等筆者看懂了再補。

  2、消息訂閱發(fā)布

  2.1 ROS的本質(zhì)

  如果問ROS的本質(zhì)是什么,或者用一句話概括ROS的核心功能。那么,筆者認為ROS就是個通信庫,讓不同的程序節(jié)點能夠相互對話。

  很多文章和書籍在介紹ROS是什么的時候,經(jīng)常使用“ROS是一個通信框架”這種描述。

  但是筆者認為這種描述并不是太合適?!翱蚣堋笔莻€對初學(xué)者非常不友好的抽象詞匯,用一個更抽象難懂的概念去解釋一個本來就不清楚的概念,對初學(xué)者起不到任何幫助。

  而且筆者嚴重懷疑絕大多數(shù)作者能對機器人的本質(zhì)或者軟件框架能有什么太深的理解,他們的見解不會比你我深刻多少。

  既然提到本質(zhì),那我們就深入到最基本的問題。

  在接觸無窮的細節(jié)之前,我們不妨先做一個哲學(xué)層面的思考。

  那就是,為什么ROS要解決通信問題?

  機器人涉及的東西千千萬萬,機械、電子、軟件、人工智能無所不包,為什么底層的設(shè)計是一套用來通信的程序而不是別的東西。

  到目前為止,我還沒有看到有人討論過這個問題。這要回到機器人或者智能的本質(zhì)。

  當我們在談?wù)摍C器人的時候,最首要的問題不是硬件設(shè)計,而是對信息的處理。一個機器人需要哪些信息,信息從何而來,如何傳遞,又被誰使用,這些才是最重要的問題。

  人類飛不鳥,游不過魚,跑不過馬,力不如牛,為什么卻自稱萬物之靈呢。

  因為人有大腦,而且人類大腦處理的信息更多更復(fù)雜。

  拋開物質(zhì),從信息的角度看,人與動物、與機器人存在很多相似的地方。

  機器人由許多功能模塊組成,它們之間需要協(xié)作才能形成一個有用的整體,機器人與機器人之間也需要協(xié)作才能形成一個有用的系統(tǒng),要協(xié)作就離不開通信。

  需要什么樣的信息以及信息從何而來不是ROS首先關(guān)心的,因為這取決于機器人的應(yīng)用場景。

  因此,ROS首先要解決的是通信的問題,即如何建立通信、用什么方式通信、通信的格式是什么等等一系列具體問題。

  帶著這些問題,我們看看ROS是如何設(shè)計的。

  2.2 客戶端庫

  實現(xiàn)通信的代碼在ros_comm包中,如下。

  其中clients文件夾一共有127個文件,看來是最大的包了。

  現(xiàn)在我們來到了ROS最核心的地帶。

71a08a28-8a26-11ed-bfe3-dac502259ad0.jpg

  客戶端這個名詞出現(xiàn)的有些突然,一個機器人操作系統(tǒng)里為什么需要客戶端。

  原因是,節(jié)點與主節(jié)點master之間的關(guān)系是client/server,這時每個節(jié)點都是一個客戶端(client),而master自然就是服務(wù)器端(server)。

  那客戶端庫(client libraries)是干什么的?就是為實現(xiàn)節(jié)點之間通信的。

  雖然整個文件夾中包含的文件眾多,但是我們?nèi)绻凑找欢ǖ拿}絡(luò)來分析就不會眼花繚亂。

  節(jié)點之間最主要的通信方式就是基于消息的。為了實現(xiàn)這個目的,需要三個步驟,如下。

  弄明白這三個步驟就明白ROS的工作方式了。這三個步驟看起來是比較合乎邏輯的,并不奇怪。

  消息的發(fā)布者和訂閱者(即消息的接收方)建立連接;

  發(fā)布者向話題發(fā)布消息,訂閱者在話題上接收消息,將消息保存在回調(diào)函數(shù)隊列中;

  調(diào)用回調(diào)函數(shù)隊列中的回調(diào)函數(shù)處理消息。

  2.2.1 一個節(jié)點的誕生

  在建立連接之前,首先要有節(jié)點。

  節(jié)點就是一個獨立的程序,它運行起來后就是一個普通的進程,與計算機中其它的進程并沒有太大區(qū)別。

  一個問題是:ROS中為什么把一個獨立的程序稱為“節(jié)點”

  這是因為ROS沿用了計算機網(wǎng)絡(luò)中“節(jié)點”的概念。

  在一個網(wǎng)絡(luò)中,例如互聯(lián)網(wǎng),每一個上網(wǎng)的計算機就是一個節(jié)點。前面我們看到的客戶端、服務(wù)器這樣的稱呼,也是從計算機網(wǎng)絡(luò)中借用的。

  下面來看一下節(jié)點是如何誕生的。我們在第一次使用ROS時,一般都會照著官方教程編寫一個talker和一個listener節(jié)點,以熟悉ROS的使用方法。

  我們以talker為例,它的部分代碼如下。

  #include "ros/ros.h"

  int main(int argc, char **argv)

  {

  /* You must call one of the versions of ros::init() before using any other part of the ROS system. */

  ros::init(argc, argv, "talker");

  ros::NodeHandle n;

  main函數(shù)里首先調(diào)用了init()函數(shù)初始化一個節(jié)點,該函數(shù)的定義在init.cpp文件中。

  當我們的程序運行到init()函數(shù)時,一個節(jié)點就呱呱墜地了。

  而且在出生的同時我們還順道給他起好了名字,也就是"talker"。

  名字是隨便起的,但是起名是必須的。

  我們進入init()函數(shù)里看看它做了什么,代碼如下,看上去還是挺復(fù)雜的。它初始化了一個叫g(shù)_global_queue的數(shù)據(jù),它的類型是CallbackQueuePtr。

  這是個相當重要的類,叫“回調(diào)隊列”,后面還會見到它。init()函數(shù)還調(diào)用了network、master、this_node、file_log、param這幾個命名空間里的init初始化函數(shù)各自實現(xiàn)一些變量的初始化,這些變量都以g開頭,例如g_host、g_uri,用來表明它們是全局變量。

  其中,network::init完成節(jié)點主機名、IP地址等的初始化,master::init獲取master的URI、主機號和端口號。

  this_node::init定義節(jié)點的命名空間和節(jié)點的名字,沒錯,把我們給節(jié)點起的名字就存儲在這里。file_log::init初始化日志文件的路徑。

  void init(const M_string& remappings, const std::string& name, uint32_t options)

  {

  if (!g_atexit_registered) {

  g_atexit_registered = true;

  atexit(atexitCallback);

  }

  if (!g_global_queue) {

  g_global_queue.reset(new CallbackQueue);

  }

  if (!g_initialized) {

  g_init_options = options;

  g_ok = true;

  ROSCONSOLE_AUTOINIT;

  // Disable SIGPIPE

  #ifndef WIN32

  signal(SIGPIPE, SIG_IGN);

  #else

  WSADATA wsaData;

  WSAStartup(MAKEWORD(2. 0), &wsaData);

  #endif

  check_ipv6_environment();

  network::init(remappings);

  master::init(remappings);

  // names:: namespace is initialized by this_node

  this_node::init(name, remappings, options);

  file_log::init(remappings);

  param::init(remappings);

  g_initialized = true;

  }

  }

  完成初始化以后,就進入下一步ros::NodeHandle n定義句柄。

  我們再進入node_handle.cpp文件,發(fā)現(xiàn)構(gòu)造函數(shù)NodeHandle::NodeHandle調(diào)用了自己的construct函數(shù)。然后,順藤摸瓜找到construct函數(shù),它里面又調(diào)用了ros::start()函數(shù)。

  沒錯,我們又繞回到了init.cpp文件。

  ros::start()函數(shù)主要實例化了幾個重要的類,如下。

  完成實例化后馬上又調(diào)用了各自的start()函數(shù),啟動相應(yīng)的動作。

  這些都做完了以后就可以發(fā)布或訂閱消息了。

  一個節(jié)點的故事暫時就到這了。

  TopicManager::instance()->start();

  ServiceManager::instance()->start();

  ConnectionManager::instance()->start();

  PollManager::instance()->start();

  XMLRPCManager::instance()->start();

  2.2.1 XMLRPC是什么?

  關(guān)于ROS節(jié)點建立連接的技術(shù)細節(jié),官方文檔說的非常簡單,在這里ROS Technical Overview。沒有基礎(chǔ)的同學(xué)看這個介紹必然還是不懂。

  在ROS中,節(jié)點與節(jié)點之間的通信依靠節(jié)點管理器(master)牽線搭橋。

  master像一個中介,它介紹節(jié)點們互相認識。一旦節(jié)點們認識了以后,master就完成自己的任務(wù)了,它就不再摻和了。

  這也是為什么你啟動節(jié)點后再殺死m(xù)aster,節(jié)點之間的通信依然保持正常的原因。

  使用過電驢和迅雷而且研究過BitTorrent的同學(xué)對master的工作方式應(yīng)該很熟悉,master就相當于Tracker服務(wù)器,它存儲著其它節(jié)點的信息。

  我們每次下載之前都會查詢Tracker服務(wù)器,找到有電影資源的節(jié)點,然后就可以與它們建立連接并開始下載電影了。

  那么master是怎么給節(jié)點牽線搭橋的呢?ROS使用了一種叫XMLRPC的方式實現(xiàn)這個功能。

  XMLRPC中的RPC的意思是遠程過程調(diào)用(Remote Procedure Call)。

  簡單來說,遠程過程調(diào)用的意思就是一個計算機中的程序(在我們這就是節(jié)點啦)可以調(diào)用另一個計算機中的函數(shù),只要這兩個計算機在一個網(wǎng)絡(luò)中。

  這是一種聽上去很高大上的功能,它能讓節(jié)點去訪問網(wǎng)絡(luò)中另一臺計算機上的程序資源。

  XMLRPC中的XML我們在1.1節(jié)講消息序列化時提到了,它就是一種數(shù)據(jù)表示方式而已。

  所以合起來,XMLRPC的意思就是把由XML表示的數(shù)據(jù)發(fā)送給其它計算機上的程序運行。

  運行后返回的結(jié)果仍然以XML格式返回回來,然后我們通過解析它(還原回純粹的數(shù)據(jù))就能干別的事了。

  想了解更多XMLRPC的細節(jié)可以看這個XML-RPC:概述。

  舉個例子,一個XMLRPC請求是下面這個樣子的。因為XMLRPC是基于HTTP協(xié)議的,所以下面的就是個標準的HTTP報文。

  POST / HTTP/1.1

  User-Agent: XMLRPC++ 0.7

  Host: localhost:11311

  Content-Type: text/xml

  Content-length: 78

  circleArea

  2.41

  如果你沒學(xué)過HTTP協(xié)議,看上面的語句可能會感到陌生?!秷D解HTTP》這本小書可以讓你快速入門。

  HTTP報文比較簡單,它分兩部分,前半部分是頭部,后半部分是主體。

  頭部和主體之間用空行分開,這都是HTTP協(xié)議規(guī)定的標準。

  上面主體部分的格式就是XML,見的多了你就熟悉了。

  所以,XMLRPC傳遞的消息其實就是主體部分是XML格式的HTTP報文而已,沒什么神秘的。

71dcb1f6-8a26-11ed-bfe3-dac502259ad0.png

  對應(yīng)客戶端一個XMLRPC請求,服務(wù)器端會執(zhí)行它并返回一個響應(yīng),它也是一個HTTP報文,如下。

  它的結(jié)構(gòu)和請求一樣,不再解釋了。所以,XMLRPC跟我們上網(wǎng)瀏覽網(wǎng)頁的過程其實差不多。

  HTTP/1.1 200 OK

  Date: Sat, 06 Oct 2001 23:20:04 GMT

  Server: Apache.1.3.12 (Unix)

  Connection: close

  Content-Type: text/xml

  Content-Length: 124

  18.24668429131

  2.2.2 ROS中XMLRPC的實現(xiàn)

  上面的例子解釋了XMLRPC是什么?下面我們看看ROS是如何實現(xiàn)XMLRPC的。

  ROS使用的XMLRPC介紹在這里:

  http://wiki.ros.org/xmlrpcpp。這次ROS的創(chuàng)作者沒有從零開始造輪子,而是在一個已有的XMLRPC庫的基礎(chǔ)上改造的。

  XMLRPC的C++代碼在下載后的ros_comm-noetic-develutilitiesxmlrpcpp路徑下。

  還好,整個工程不算太大。XMLRPC分成客戶端和服務(wù)器端兩大部分。

  咱們先看客戶端,主要代碼在XmlRpcClient.cpp文件里。

  擒賊先擒王,XmlRpcClient.cpp文件中最核心的函數(shù)就是execute,用于執(zhí)行遠程調(diào)用,代碼如下。

  // Execute the named procedure on the remote server.

  // Params should be an array of the arguments for the method.

  // Returns true if the request was sent and a result received (although the result might be a fault).

  bool XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result)

  {

  XmlRpcUtil::log(1. "XmlRpcClient: method %s (_connectionState %s).", method, connectionStateStr(_connectionState));

  // This is not a thread-safe operation, if you want to do multithreading, use separate

  // clients for each thread. If you want to protect yourself from multiple threads

  // accessing the same client, replace this code with a real mutex.

  if (_executing)

  return false;

  _executing = true;

  ClearFlagOnExit cf(_executing);

  _sendAttempts = 0;

  _isFault = false;

  if ( ! setupConnection())

  return false;

  if ( ! generateRequest(method, params))

  return false;

  result.clear();

  double msTime = -1.0;   // Process until exit is called

  _disp.work(msTime);

  if (_connectionState != IDLE || ! parseResponse(result)) {

  _header = "";

  return false;

  }

  // close() if server does not supports HTTP1.1

  // otherwise, reusing the socket to write leads to a SIGPIPE because

  // the remote server could shut down the corresponding socket.

  if (_header.find("HTTP/1.1 200 OK", 0. 15) != 0) {

  close();

  }

  XmlRpcUtil::log(1. "XmlRpcClient: method %s completed.", method);

  _header = "";

  _response = "";

  return true;

  }

  它首先調(diào)用setupConnection()函數(shù)與服務(wù)器端建立連接。

  連接成功后,調(diào)用generateRequest()函數(shù)生成發(fā)送請求報文。

  XMLRPC請求報文的頭部又交給generateHeader()函數(shù)做了,代碼如下。

  // Prepend http headers

  std::string XmlRpcClient::generateHeader(size_t length) const

  {

  std::string header =

  "POST " + _uri + " HTTP/1.1

  "

  "User-Agent: ";

  header += XMLRPC_VERSION;

  header += "

  Host: ";

  header += _host;

  char buff[40];

  std::snprintf(buff,40.":%d

  ", _port);

  header += buff;

  header += "Content-Type: text/xml

  Content-length: ";

  std::snprintf(buff,40."%zu

  ", length);

  return header + buff;

  }

  主體部分則先將遠程調(diào)用的方法和參數(shù)變成XML格式,generateRequest()函數(shù)再將頭部和主體組合成完整的報文,如下:

  std::string header = generateHeader(body.length());

  _request = header + body;

  把報文發(fā)給服務(wù)器后,就開始靜靜地等待。

  一旦接收到服務(wù)器返回的報文后,就調(diào)用parseResponse函數(shù)解析報文數(shù)據(jù),也就是把XML格式變成純凈的數(shù)據(jù)格式。

  我們發(fā)現(xiàn),XMLRPC使用了socket功能實現(xiàn)客戶端和服務(wù)器通信。

  我們搜索socket這個單詞,發(fā)現(xiàn)它原始的意思是插座,如下。這非常形象,建立連接實現(xiàn)通信就像把插頭插入插座。

72442e94-8a26-11ed-bfe3-dac502259ad0.jpg

  雖說XMLRPC也是ROS的一部分,但它畢竟只是一個基礎(chǔ)功能,我們會用即可,暫時不去探究其實現(xiàn)細節(jié),

  所以對它的分析到此為止。下面我們來看節(jié)點是如何調(diào)用XMLRPC的。

  2.2.3 節(jié)點間通過XMLRPC建立連接

  在一個節(jié)點剛啟動的時候,它并不知道其它節(jié)點的存在,更不知道它們在交談什么,當然也就談不上通信。

  所以,它要先與master對話查詢其它節(jié)點的狀態(tài),然后再與其它節(jié)點通信。

  而節(jié)點與master對話使用的就是XMLRPC。

  從這一點來看,master叫節(jié)點管理器確實名副其實,它是一個大管家,給剛出生的節(jié)點提供服務(wù)。

  下面我們以兩個節(jié)點:talker和listener為例,介紹其通過XMLRPC建立通信連接的過程,如下圖所示。

726eb556-8a26-11ed-bfe3-dac502259ad0.jpg

  talker注冊

  假設(shè)我們先啟動talker。啟動后,它通過1234端口使用XMLRPC向master注冊自己的信息,包含所發(fā)布消息的話題名。master會將talker的注冊信息加入注冊列表中;

  2.listener注冊

  listener啟動后,同樣通過XMLRPC向master注冊自己的信息,包含需要訂閱的話題名;

  3.master進行匹配

  master根據(jù)listener的訂閱信息從注冊列表中查找,如果沒有找到匹配的發(fā)布者,則等待發(fā)布者的加入,如果找到匹配的發(fā)布者信息,則通過XMLRPC向listener發(fā)送talker的地址信息。

  4.listener發(fā)送連接請求

  listener接收到master發(fā)回的talker地址信息,嘗試通過XMLRPC向talker發(fā)送連接請求,傳輸訂閱的話題名、消息類型以及通信協(xié)議(TCP或者UDP);

  5.talker確認連接請求

  talker接收到listener發(fā)送的連接請求后,繼續(xù)通過XMLRPC向listener確認連接信息,其中包含自身的TCP地址信息;

  6.listener嘗試與talker建立連接

  listener接收到確認信息后,使用TCP嘗試與talker建立網(wǎng)絡(luò)連接。

  7.talker向listener發(fā)布消息

  成功建立連接后,talker開始向listener發(fā)送話題消息數(shù)據(jù),master不再參與。

  從上面的分析中可以發(fā)現(xiàn),前五個步驟使用的通信協(xié)議都是XMLRPC,最后發(fā)布數(shù)據(jù)的過程才使用到TCP。

  master只在節(jié)點建立連接的過程中起作用,但是并不參與節(jié)點之間最終的數(shù)據(jù)傳輸。

  節(jié)點在請求建立連接時會通過master.cpp文件中的execute()函數(shù)調(diào)用XMLRPC庫中的函數(shù)。

  我們舉個例子,加入talker節(jié)點要發(fā)布消息,它會調(diào)用topic_manager.cpp中的TopicManager::advertise()函數(shù),在函數(shù)中會調(diào)用execute()函數(shù),該部分代碼如下。

  XmlRpcValue args, result, payload;

  args[0] = this_node::getName();

  args[1] = ops.topic;

  args[2] = ops.datatype;

  args[3] = xmlrpc_manager_->getServerURI();

  master::execute("registerPublisher", args, result, payload, true);

  其中,registerPublisher就是一個遠程過程調(diào)用的方法(或者叫函數(shù))。節(jié)點通過這個遠程過程調(diào)用向master注冊,表示自己要發(fā)布發(fā)消息了。

  你可能會問,registerPublisher方法在哪里被執(zhí)行了呢?我們來到ros_comm-noetic-devel ools osmastersrc osmaster路徑下,打開master_api.py文件,然后搜索registerPublisher這個方法,就會找到對應(yīng)的代碼,如下。

  匆匆掃一眼就知道,它在通知所有訂閱這個消息的節(jié)點,讓它們做好接收消息的準備。

  你可能注意到了,這個被調(diào)用的XMLRPC是用python語言實現(xiàn)的。

  也就是說,XMLRPC通信時只要報文的格式是一致的,不管C++還是python語言,都可以實現(xiàn)遠程調(diào)用的功能。

  def registerPublisher(self, caller_id, topic, topic_type, caller_api):

  try:

  self.ps_lock.acquire()

  self.reg_manager.register_publisher(topic, caller_id, caller_api)

  # don't let '*' type squash valid typing

  if topic_type != rosgraph.names.ANYTYPE or not topic in self.topics_types:

  self.topics_types[topic] = topic_type

  pub_uris = self.publishers.get_apis(topic)

  sub_uris = self.subscribers.get_apis(topic)

  self._notify_topic_subscribers(topic, pub_uris, sub_uris)

  mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)

  sub_uris = self.subscribers.get_apis(topic)

  finally:

  self.ps_lock.release()

  return 1. "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

  2.3 master是什么?

  當我們在命令行中輸入roscore想啟動ROS的節(jié)點管理器時,背后到底發(fā)生了什么?我們先用Ubuntu的which命令找找roscore這個命令在什么地方,發(fā)現(xiàn)它位于/opt/ros/melodic/bin/roscore路徑下,如下圖。再用file命令查看它的屬性,發(fā)現(xiàn)它是一個Python腳本。

7297ecb4-8a26-11ed-bfe3-dac502259ad0.jpg

  2.3.1 roscore腳本

  我們回到自己下載的源碼:ros_comm文件夾中,找到roscore文件,它在 os_comm-melodic-devel ools oslaunchscripts路徑下。

  雖然它是個Python腳本,但是卻沒有.py后綴名。

  用記事本打開它,迎面第一句話是#!/usr/bin/env python,說明這還是一個python 2版本的腳本。

  我們發(fā)現(xiàn)這個roscore只是個空殼,真正重要的只有最后一行指令,如下

  import roslaunch

  roslaunch.main(['roscore', '--core'] + sys.argv[1:])

  2.3.2 roslaunch模塊

  2.3.2.1 XMLRPC服務(wù)器如何啟動?

  roscore調(diào)用了roslaunch.main,我們繼續(xù)追蹤,進到ros_comm-noetic-devel ools oslaunchsrc oslaunch文件夾中,發(fā)現(xiàn)有個__init__.py文件,說明這個文件夾是一個python包,打開__init__.py文件找到def main(argv=sys.argv),這就是roscore調(diào)用的函數(shù)roslaunch.main的實現(xiàn),如下(這里只保留主要的代碼,不太重要的刪掉了)。

  def main(argv=sys.argv):

  options = None

  logger = None

  try:

  from . import rlutil

  parser = _get_optparse()

  (options, args) = parser.parse_args(argv[1:])

  args = rlutil.resolve_launch_arguments(args)

  write_pid_file(options.pid_fn, options.core, options.port)

  uuid = rlutil.get_or_generate_uuid(options.run_id, options.wait_for_master)

  configure_logging(uuid)

  # #3088: don't check disk usage on remote machines

  if not options.child_name and not options.skip_log_check:

  rlutil.check_log_disk_usage()

  logger = logging.getLogger('roslaunch')

  logger.info("roslaunch starting with args %s"%str(argv))

  logger.info("roslaunch env is %s"%os.environ)

  if options.child_name:

  # 這里沒執(zhí)行到,就不列出來了

  else:

  logger.info('starting in server mode')

  # #1491 change terminal name

  if not options.disable_title:

  rlutil.change_terminal_name(args, options.core)

  # Read roslaunch string from stdin when - is passed as launch filename.

  roslaunch_strs = []

  # This is a roslaunch parent, spin up parent server and launch processes.

  # args are the roslaunch files to load

  from . import parent as roslaunch_parent

  # force a port binding spec if we are running a core

  if options.core:

  options.port = options.port or DEFAULT_MASTER_PORT

  p = roslaunch_parent.ROSLaunchParent(uuid, args, roslaunch_strs=roslaunch_strs, is_core=options.core, port=options.port, local_only=options.local_only, verbose=options.verbose,  force_screen=options.force_screen, force_log=options.force_log, num_workers=options.num_workers, timeout=options.timeout, master_logger_level=options.master_logger_level, show_summary=not options.no_summary, force_required=options.force_required, sigint_timeout=options.sigint_timeout, sigterm_timeout=options.sigterm_timeout)

  p.start()

  p.spin()

  roslaunch.main開啟了日志,日志記錄的信息可以幫我們了解main函數(shù)執(zhí)行的順序。

  我們?nèi)buntu的.ros/log/路徑下,打開roslaunch-ubuntu-52246.log日志文件,內(nèi)容如下。

72a4116a-8a26-11ed-bfe3-dac502259ad0.jpg

  通過閱讀日志我們發(fā)現(xiàn),main函數(shù)首先檢查日志文件夾磁盤占用情況,如果有剩余空間就繼續(xù)往下運行。

  然后把運行roscore的終端的標題給改了。

  再調(diào)用ROSLaunchParent類中的函數(shù),這大概就是main函數(shù)中最重要的地方了。

  ROSLaunchParent類的定義是在同一路徑下的parent.py文件中。為什么叫LaunchParent筆者也不清楚。

  先不管它,我們再看日志,發(fā)現(xiàn)運行到了下面這個函數(shù),它打算啟動XMLRPC服務(wù)器端。

  所以調(diào)用的順序是:roslaunch\__init__.py文件中的main()函數(shù)調(diào)用parent.pystart()函數(shù),start()函數(shù)調(diào)用自己類中的_start_infrastructure()函數(shù),_start_infrastructure()函數(shù)調(diào)用自己類中的_start_server()函數(shù),_start_server()函數(shù)再調(diào)用server.py中的start函數(shù)。

  def _start_server(self):

  self.logger.info("starting parent XML-RPC server")

  self.server = roslaunch.server.ROSLaunchParentNode(self.config, self.pm)

  self.server.start()

  我們再進到server.py文件中,找到ROSLaunchNode類,里面的start函數(shù)又調(diào)用了父類XmlRpcNode中的start函數(shù)。

  class ROSLaunchNode(xmlrpc.XmlRpcNode):

  """

  Base XML-RPC server for roslaunch parent/child processes

  """

  def start(self):

  logger.info("starting roslaunch XML-RPC server")

  super(ROSLaunchNode, self).start()

  我們來到ros_comm-noetic-devel ools osgraphsrc osgraph路徑,找到xmlrpc.py文件。找到class XmlRpcNode(object)類,再進入start(self)函數(shù),發(fā)現(xiàn)它調(diào)用了自己類的run函數(shù),run函數(shù)又調(diào)用了自己類中的_run函數(shù),_run函數(shù)又調(diào)用了自己類中的_run_init()函數(shù),在這里才調(diào)用了真正起作用的ThreadingXMLRPCServer類。

  因為master節(jié)點是用python實現(xiàn)的,所以,需要有python版的XMLRPC庫。

  幸運的是,python有現(xiàn)成的XMLRPC庫,叫SimpleXMLRPCServer。SimpleXMLRPCServer已經(jīng)內(nèi)置到python中了,無需安裝。

  所以,ThreadingXMLRPCServer類直接繼承了SimpleXMLRPCServer,如下。

  class ThreadingXMLRPCServer(socketserver.ThreadingMixIn, SimpleXMLRPCServer)

  2.3.2.2 master如何啟動?

  我們再來看看節(jié)點管理器master是如何被啟動的,再回到parent.pystart()函數(shù),如下。

  我們發(fā)現(xiàn)它啟動了XMLRPC服務(wù)器后,接下來就調(diào)用了_init_runner()函數(shù)。

  def start(self, auto_terminate=True):

  self.logger.info("starting roslaunch parent run")

  # load config, start XMLRPC servers and process monitor

  try:

  self._start_infrastructure()

  except:

  self._stop_infrastructure()

  # Initialize the actual runner.

  self._init_runner()

  # Start the launch

  self.runner.launch()

  init_runner()函數(shù)實例化了ROSLaunchRunner類,這個類的定義在launch.py里。

  def _init_runner(self):

  self.runner = roslaunch.launch.ROSLaunchRunner(self.run_id, self.config, server_uri=self.server.uri, ...)

  實例化完成后,parent.pystart()函數(shù)就調(diào)用了它的launch()函數(shù)。

  我們打開launch.py文件,找到launch()函數(shù),發(fā)現(xiàn)它又調(diào)用了自己類中的_setup()函數(shù),而_setup()函數(shù)又調(diào)用了_launch_master()函數(shù)。

  看名字就能猜出來,_launch_master()函數(shù)肯定是啟動節(jié)點管理器master的,它調(diào)用了create_master_process函數(shù),這個函數(shù)在nodeprocess.py里。

  所以我們打開nodeprocess.py,create_master_process函數(shù)使用了LocalProcess類。這個類繼承自Process類。nodeprocess.py文件引用了python中用于創(chuàng)建新的進程的subprocess模塊。

  create_master_process函數(shù)實例化LocalProcess類用的是’rosmaster’,即ros_comm-noetic-devel ools osmaster中的包。

  main.py文件中的rosmaster_main函數(shù)使用了master.py中的Master類。

  Master類中又用到了master_api.py中的ROSMasterHandler類,這個類包含所有的XMLRPC服務(wù)器接收的遠程調(diào)用,一共24個,如下。

  shutdown(self, caller_id, msg='')

  getUri(self, caller_id)

  getPid(self, caller_id)

  deleteParam(self, caller_id, key)

  setParam(self, caller_id, key, value)

  getParam(self, caller_id, key)

  searchParam(self, caller_id, key)

  subscribeParam(self, caller_id, caller_api, key)

  unsubscribeParam(self, caller_id, caller_api, key)

  hasParam(self, caller_id, key)

  getParamNames(self, caller_id)

  param_update_task(self, caller_id, caller_api, param_key, param_value)

  _notify_topic_subscribers(self, topic, pub_uris, sub_uris)

  registerService(self, caller_id, service, service_api, caller_api)

  lookupService(self, caller_id, service)

  unregisterService(self, caller_id, service, service_api)

  registerSubscriber(self, caller_id, topic, topic_type, caller_api)

  unregisterSubscriber(self, caller_id, topic, caller_api)

  registerPublisher(self, caller_id, topic, topic_type, caller_api)

  unregisterPublisher(self, caller_id, topic, caller_api)

  lookupNode(self, caller_id, node_name)

  getPublishedTopics(self, caller_id, subgraph)

  getTopicTypes(self, caller_id)

  getSystemState(self, caller_id)

  2.3.2.1 檢查log文件夾占用空間

  roslaunch這個python包還負責檢查保存log的文件夾有多大。在ros_comm-noetic-devel ools oslaunchsrc oslaunch\__init__.py文件中的main函數(shù)里,有以下語句。

  看名字就知道是干啥的了。

  rlutil.check_log_disk_usage()

  再打開同一路徑下的rlutil.py,發(fā)現(xiàn)它又調(diào)用了rosclean包中的get_disk_usage函數(shù)。

  我們發(fā)現(xiàn),這個函數(shù)里直接寫死了比較的上限:disk_usage > 1073741824.當然這樣不太好,應(yīng)該改為可配置的。

  數(shù)字1073741824的單位是字節(jié),剛好就是1GB(102 4 3 1024^31024 3byte)。

  我們要是想修改log文件夾報警的上限,直接改這個值即可。

  def check_log_disk_usage():

  """

  Check size of log directory. If high, print warning to user

  """

  try:

  d = rospkg.get_log_dir()

  roslaunch.core.printlog("Checking log directory for disk usage. This may take a while.

  Press Ctrl-C to interrupt")

  disk_usage = rosclean.get_disk_usage(d)

  # warn if over a gig

  if disk_usage > 1073741824:

  roslaunch.core.printerrlog("WARNING: disk usage in log directory [%s] is over 1GB.

  It's recommended that you use the 'rosclean' command."%d)

  else:

  roslaunch.core.printlog("Done checking log file disk usage. Usage is <1GB.")

  except:

  pass

  我們刨根問底,追查rosclean.get_disk_usage(d)是如何實現(xiàn)的。

  這個rosclean包不在ros_comm里面,需要單獨下載。

  打開后發(fā)現(xiàn)這個包還是跨平臺的,給出了Windows和Linux下的實現(xiàn)。

  如果是Windows系統(tǒng),用os.path.getsize函數(shù)獲取文件的大小,通過os.walk函數(shù)遍歷所有文件,加起來就是文件夾的大小。

  如果是Linux系統(tǒng),用Linux中的du -sb命令獲取文件夾的大小。哎,搞個機器人不僅要學(xué)習python,還得熟悉Linux,容易嗎?

  主節(jié)點會獲取用戶設(shè)置的ROS_MASTER_URI變量中列出的URI地址和端口號(默認為當前的本地IP和11311端口號)。

  3、時間

  不只是機器人,在任何一個系統(tǒng)里,時間都是一個繞不開的重要話題。下面我們就從萬物的起點——時間開始吧。

  ROS中定義時間的程序都在roscpp_core項目下的rostime中,見下圖。

  如果細分一下,時間其實有兩種,一種叫“時刻”,也就是某個時間點;

  一種叫“時段”或者時間間隔,也就是兩個時刻之間的部分。這兩者的代碼是分開實現(xiàn)的,時刻是time,時間間隔是duration。

  在Ubuntu中把rostime文件夾中的文件打印出來,發(fā)現(xiàn)確實有time和duration兩類文件,但是還多了個rate,如下圖所示。

72cdf5d4-8a26-11ed-bfe3-dac502259ad0.jpg

72e950a4-8a26-11ed-bfe3-dac502259ad0.png

  我們還注意到,里面有 CMakeLists.txt 和 package.xml 兩個文件,那說明rostime本身也是個ROS的package,可以單獨編譯。

  3.1 時刻time

  先看下文件間的依賴關(guān)系。跟時刻有關(guān)的文件是兩個time.h文件和一個time.cpp文件。

  time.h給出了類的聲明,而impl ime.h給出了類運算符重載的實現(xiàn),time.cpp給出了其它函數(shù)的實現(xiàn)。

  3.1.1 TimeBase基類

  首先看time.h文件,它定義了一個叫TimeBase的類。注釋中說,TimeBase是個基類,定義了兩個成員變量uint32_t sec, nsec,還重載了+ ++、? -?、< <<、> >>、= ==等運算符。

  成員變量uint32_t sec, nsec其實就是時間的秒和納秒兩部分,它們合起來構(gòu)成一個完整的時刻。

  至于為啥要分成兩部分而不是用一個來定義,可能是考慮到整數(shù)表示精度的問題。

  因為32位整數(shù)最大表示的數(shù)字是2147483647.如果我們要用納秒這個范圍估計是不夠的。

  你可能會問,機器人系統(tǒng)怎么會使用到納秒這么高精度的時間分辨率,畢竟控制器的定時器最高精度可能也只能到微秒?

  如果你做過自動駕駛項目,有使用過GPS和激光雷達傳感器的經(jīng)驗,就會發(fā)現(xiàn)GPS的時鐘精度就是納秒級的,它可以同步激光雷達每個激光點的時間戳。

  還有,為什么定義TimeBase這個基類,原因大家很容易就能猜到。

  因為在程序里,時間本質(zhì)上就是一個數(shù)字而已,數(shù)字系統(tǒng)的序關(guān)系(能比較大小)和運算(加減乘除)也同樣適用于時間這個東西。

  當然,這里只有加減沒有乘除,因為時間的乘除沒有意義。

  3.1.2 Time類

  緊接著TimeBase類的是Time類,它是TimeBase的子類。我們做機器人應(yīng)用程序開發(fā)時用不到TimeBase基類,但是Time類會經(jīng)常使用。

  3.1.3 now()函數(shù)

  Time類比TimeBase類多了now()函數(shù),它是我們的老熟人了。在開發(fā)應(yīng)用的時候,我們直接用下面的代碼就能得到當前的時間戳:

  ros::Time begin = ros::now(); //獲取當前時間

  now()函數(shù)的定義在rostimesrc ime.cpp里,因為它很常用很重要,筆者就把代碼粘貼在這里,如下。

  函數(shù)很簡單,可以看到,如果定義了使用仿真時間(g_use_sim_time為true),那就使用仿真時間,否則就使用墻上時間。

  Time Time::now()

  {

  if (!g_initialized)

  throw TimeNotInitializedException();

  if (g_use_sim_time)

  {

  boost::mutex::scoped_lock lock(g_sim_time_mutex);

  Time t = g_sim_time;

  return t;

  }

  Time t;

  ros_walltime(t.sec, t.nsec);

  return t;

  }

  在ROS里,時間分成兩類,一種叫仿真時間,一種叫墻上時間。

  顧名思義,墻上時間就是實際的客觀世界的時間,它一秒一秒地流逝,誰都不能改變它,讓它快一點慢一點都不可能,除非你有超能力。

  仿真時間則是可以由你控制的,讓它快它就快。之所以多了一個仿真時間,是因為有時我們在仿真機器人希望可以自己控制時間,例如為了提高驗證算法的效率,讓它按我們的期望速度推進。

  在使用墻上時間的情況下,now()函數(shù)調(diào)用了ros_walltime函數(shù),這個函數(shù)也在rostimesrc ime.cpp里。

  剝開層層洋蔥皮,最后發(fā)現(xiàn),這個ros_walltime函數(shù)才是真正調(diào)用操作系統(tǒng)時間函數(shù)的地方,而且它還是個跨平臺的實現(xiàn)(Windows和Linux)。

  如果操作系統(tǒng)是Linux,那它會使用clock_gettime函數(shù),在筆者使用的Ubuntu 18.04系統(tǒng)中這個函數(shù)在usrinclude路徑下。

  但是萬一缺少這個函數(shù),那么ROS會使用gettimeofday函數(shù),但是gettimeofday沒有clock_gettime精確,clock_gettime能提供納秒的精確度。

  如果操作系統(tǒng)是Windows,那它會使用標準庫STL的chrono庫獲取當前的時刻,要用這個庫只需要引用它的頭文件,所以在time.cpp中引用了#include。

  具體使用的函數(shù)就是

  std::now().time_since_epoch()。

  當然,時間得是秒和納秒的形式,所以用了count方法:

  uint64_t now_ns = std::duration_cast

  (std::now().time_since_epoch()).count();

  3.1.4 WallTime類

  后面又接著聲明了WallTime類和SteadyTime類。

  3.2 時間間隔duration

  時間間隔duration的定義與實現(xiàn)跟time時刻差不多,但是Duration類里的sleep()延時函數(shù)是在time.cpp里定義的,其中使用了Linux操作系統(tǒng)的nanosleep系統(tǒng)調(diào)用。

  這個系統(tǒng)調(diào)用雖然叫納秒,但實際能實現(xiàn)的精度也就是幾十毫秒,即便這樣也比C語言提供的sleep函數(shù)的精度好多了。如果是Windows系統(tǒng)則調(diào)用STL chrono函數(shù):

  std::sleep_for(std::nanoseconds(static_cast(sec * 1e9 + nsec)));

  3.3 頻率rate

  關(guān)于Rate類,聲明注釋中是這么解釋的“Class to help run loops at a desired frequency”,也就是幫助(程序)按照期望的頻率循環(huán)執(zhí)行。

  我們看看一個ROS節(jié)點是怎么使用Rate類的,如下圖所示的例子。

73392e44-8a26-11ed-bfe3-dac502259ad0.jpg

  首先用Rate的構(gòu)造函數(shù)實例化一個對象loop_rate。調(diào)用的構(gòu)造函數(shù)如下。

  可見,構(gòu)造函數(shù)使用輸入完成了對三個參數(shù)的初始化。

  然后在While循環(huán)里調(diào)用sleep()函數(shù)實現(xiàn)一段時間的延遲。

  既然用到延遲,所以就使用了前面的time類。

  Rate::Rate(double frequency)

  : start_(Time::now())

  , expected_cycle_time_(1.0 / frequency)

  , actual_cycle_time_(0.0)

  { }

  3.4 總結(jié)

  至此rostime就解釋清楚了??梢钥吹?,這部分實現(xiàn)主要是依靠STL標準庫函數(shù)(比如chrono)、BOOST庫(date_time)、Linux操作系統(tǒng)的系統(tǒng)調(diào)用以及標準的C函數(shù)。這就是ROS的底層了。

  說句題外話,在百度Apollo自動駕駛項目中也受到了rostime的影響,其cyber ime中的Time、Duration、Rate類的定義方式與rostime中的類似,但是沒有定義基類,實現(xiàn)更加簡單了。


標簽: 機器人

點贊

分享到:

上一篇:鋰離子電池正極材料有哪些類別?

下一篇:機器狗為何設(shè)計成四足?四足...

中國傳動網(wǎng)版權(quán)與免責聲明:凡本網(wǎng)注明[來源:中國傳動網(wǎng)]的所有文字、圖片、音視和視頻文件,版權(quán)均為中國傳動網(wǎng)(www.treenowplaneincome.com)獨家所有。如需轉(zhuǎn)載請與0755-82949061聯(lián)系。任何媒體、網(wǎng)站或個人轉(zhuǎn)載使用時須注明來源“中國傳動網(wǎng)”,違反者本網(wǎng)將追究其法律責任。

本網(wǎng)轉(zhuǎn)載并注明其他來源的稿件,均來自互聯(lián)網(wǎng)或業(yè)內(nèi)投稿人士,版權(quán)屬于原版權(quán)人。轉(zhuǎn)載請保留稿件來源及作者,禁止擅自篡改,違者自負版權(quán)法律責任。

網(wǎng)站簡介|會員服務(wù)|聯(lián)系方式|幫助信息|版權(quán)信息|網(wǎng)站地圖|友情鏈接|法律支持|意見反饋|sitemap

中國傳動網(wǎng)-工業(yè)自動化與智能制造的全媒體“互聯(lián)網(wǎng)+”創(chuàng)新服務(wù)平臺

網(wǎng)站客服服務(wù)咨詢采購咨詢媒體合作

Chuandong.com Copyright ?2005 - 2024 ,All Rights Reserved 版權(quán)所有 粵ICP備 14004826號 | 營業(yè)執(zhí)照證書 | 不良信息舉報中心 | 粵公網(wǎng)安備 44030402000946號