Scala中的IO操作及ArrayBuffer執行緒安全問題

通過Scala對文件進行讀寫操作在實際業務中應用也比較多,這裡介紹幾種常用的方式,直接上程式碼:

1. 從文件中讀取內容

object Main {
  
  def loadData(): Array[String] = {
    var bs: BufferedSource = null
    var in: InputStream = null
    try {
      in = Main.getClass.getClassLoader.getResourceAsStream("data.txt")
      if (in == null) {
        in = new FileInputStream(new File("data.txt"))
      }
      bs = new BufferedSource(in)
      bs.getLines().toArray
    } finally {
      bs.close()
    }
  }
  
  //直接通過scala.io.Source進行讀取
  def testSource(): Unit = {
    Source.fromFile("data.txt").foreach(println)
  }

}

 

2. 向文件中寫內容

def write(): Unit ={
     //調用的就是java中的io類
    val writer = new PrintWriter(new File("write.txt" ))
    writer.write("scala write")
    writer.close()
}

 

除了上述讀寫方式,也可以從”螢幕”上讀取用戶輸入的指令來處理程式:

import scala.io. StdIn
def printIn(): Unit = {
    print("please enter number :")
    val line = StdIn.readLine()
    println(s"number is : $line")
}

 

相信使用Scala進行應用開發時,ArrayBuffer是經常使用的數組。對ArrayBuffer進行新增元素時,通常使用方法:+=。但是該方法並非執行緒安全,如果在多執行緒環境使用該方法,由於並發問題,很容報索引越界異常。

下述模擬多執行緒向定義的ArrayBuffer中並發插入100個元素:

def arrBuffer(): Unit = {
   //默認初始容量為16
   val arrayBuffer = new ArrayBuffer[Int]()

   val executors = Executors.newFixedThreadPool(100)

   for (i <- 1 to 100) {
     executors.execute(new Runnable {
       override def run(): Unit = {
         arrayBuffer += i
       }
     })
   }

   executors.shutdown()
 }

 

執行上述程式,報出類似如下的索引越界問題:

java.lang.ArrayIndexOutOfBoundsException: 32
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
    at Main$$anonfun$main$1$$anon$1.run(Main.scala:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

 

來看一下ArrayBuffer的+=實現源碼:

//初始容量
protected def initialSize: Int = 16
//array默認長度為16
protected var array: Array[AnyRef] = new Array[AnyRef](math.max(initialSize, 1))
//元素個數,默認0
protected var size0: Int = 0
  
def +=(elem: A): this.type = {
    ensureSize(size0 + 1)
    array(size0) = elem.asInstanceOf[AnyRef]
    size0 += 1
    this
}

 

val arrayBuffer = new ArrayBuffer[Int]():初始容量為16,並發情況下當array長度為16,但是size0已經大於16,並且array沒有及時擴容時,就會報索引越界。

所以,在並發環境下,要注意調用該方法時的執行緒安全問題,比如利用synchronized做鎖處理。

這裡只是以ArrayBuffer為例,對於Scala中其他的集合使用時也要注意,防止類似問題的出現影響程式的正常運行。


 

關注微信公眾號:大數據學習與分享,獲取更對技術乾貨