Friday, October 19, 2018

Hadoop vs. Spark


Hadoop Ecosystem

Apache Hadoop Ecosystem

OOZIE: Scheduling
ZooKeeper: Management & Coordination
HBase: NoSQL Database
HIVE: Analytical SQL on Hadoop

Hadoop platform
  • HDFS
  • YARN
  • MapReduce (file-based) <=> Spark

Hadoop vs. Spark

(If Spark needs to read from HDFS over the network, remote bandwidth becomes the bottleneck, sometimes making it slower than Hadoop => MapReduce and Spark can be hybridized, e.g. Hadoop handles the shuffle while Spark runs the algorithm.)
For streaming and machine learning (lots of iteration), Spark > Hadoop (see the caching sketch below)
For batch processing, it depends
A Spark cluster is more expensive than a Hadoop cluster
Spark applications are hard to scale and need a Spark expert to tune for speed
At PB scale, MapReduce is stable and good enough
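
A minimal PySpark sketch of the iteration point above, assuming a hypothetical HDFS path: the input is cached in executor memory once and re-scanned on every pass, instead of being re-read from disk by a fresh MapReduce job each iteration.

from pyspark import SparkContext

sc = SparkContext(appName="iterative-sketch")

# hypothetical dataset with one number per line
points = sc.textFile("hdfs:///data/points.txt") \
           .map(lambda line: float(line)) \
           .cache()                                # keep partitions in memory across iterations

w = 0.0
for i in range(10):                                # each pass reuses the cached RDD, no HDFS re-read
    grad = points.map(lambda x: x - w).sum()
    w += 0.01 * grad
print(w)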

Machine Learning & Algorithm

Linear regression
Logistic regression
Decision Tree
Neural network
Word2Vec

Weighted random sampling
Combination
Page Rank
N-Gram

Friday, October 12, 2018

A record of a chaotic day before the product release


Early in the morning... ballet training at home... can't let the crap-level nonsense at the office get in the way of exercise.

The moment I got to the office, I was pulled aside by the old Jewish guy, who said he had a question he wanted to confirm, and then rambled on and on about his understanding of some piece of code. After listening for ages I still couldn't tell what his question actually was... maybe he just wanted to ramble...

Right after that came the last meeting before the release, to decide what patching could still be done on this final day. During the meeting we discovered that the Russian lady was supposed to have submitted an important fix but somehow hadn't (she has long since settled into semi-retirement mode, so why would she be submitting code...?). The project manager blew up and said the one thing a PM should never say: "I think this problem is easy, why haven't you just fixed it..." All the devs went silent... so yours truly had to step up: "Once I finish reviewing the intern's code I'll fix this bug together with her." The meeting barely ended in peace.

Next up: code review with the intern, finishing this morning's unfinished discussion with the Jewish guy, and soothing the Russian lady ("How could the PM @#$@$%#... by the way, ZZ, give me a clue, where should I look for the bug..."). Three threads running at once, in my head and on Slack.

In the afternoon, a candidate arrived for the final onsite interview. Because other people's schedules kept changing, my interview slot kept getting shuffled too, and ended up stretched from 30 minutes to a full hour before the boss finally showed up, late, to take over. After all the awkward small talk I finally got back to my computer. Luckily the intern's final refactor passed, the Russian lady actually found the bug, and whether or not the Jewish guy ever got his answer from me, he was busy making his changes anyway... An hour later, somehow everything was done... submitted the build and checked the final release notes...

Done. Brain fried.

Why does our product still have customers? They must be even more brain-fried.

Wednesday, October 10, 2018

Hadoop vs. SQL Comparison


Hadoop: Schema on Read
SQL: Schema on Write

Hadoop: Compressed files across multiple nodes in a cluster
SQL: relational databases and tables

In the event of a node failure:
Hadoop: provide an immediate answer to the user, eventual consistency
SQL: hold up the entire response to the user, complete consistency -- two-phase commit

HIVE:
mimics SQL syntax to run queries on Hadoop
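
A rough schema-on-read illustration, sketched here with PySpark SQL rather than Hive itself (the path and column names are invented): the raw file carries no schema, and a table structure is only imposed at the moment the data is read and queried.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("schema-on-read").getOrCreate()

# raw CSV sitting in HDFS with no declared schema; structure is applied at read time
visits = spark.read.csv("hdfs:///logs/visits.csv",
                        schema="user STRING, ts LONG, url STRING")

visits.createOrReplaceTempView("visits")
spark.sql("SELECT url, COUNT(*) AS hits FROM visits GROUP BY url").show()

Hive takes the same approach with external tables defined over files already sitting in HDFS: the query engine, not the loader, enforces the schema.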

Tuesday, October 9, 2018

Memcached vs. Redis?


Memcached vs. Redis?

Redis is more powerful, more popular, and better supported than Memcached.
For anything new, use Redis.

Memcached: When you restart Memcached your data is gone.
Redis: You can turn off persistence and it will happily lose your data on restart too. If you want your cache to survive restarts it lets you do that as well. In fact, that's the default.

Memcached: limited to strings
Redis: many data types
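
A small redis-py sketch of the data-type difference (key names are arbitrary; assumes a recent redis-py): where Memcached only stores opaque strings, Redis has native commands for hashes, lists, sets, and sorted sets.

import redis

r = redis.Redis(host="localhost", port=6379)

r.set("greeting", "hello")                               # plain string, the Memcached-style case
r.hset("user:1", mapping={"name": "zz", "role": "dev"})  # hash
r.lpush("recent_posts", "spark", "redis")                # list
r.sadd("tags", "hadoop", "spark")                        # set
r.zadd("leaderboard", {"alice": 42, "bob": 17})          # sorted set with scores

print(r.hgetall("user:1"))
print(r.zrange("leaderboard", 0, -1, withscores=True))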

Sunday, October 7, 2018

My Regret...


People always say that “It's better to regret what you have done than what you haven't.”
Oh well, I regret that I did, and am still doing, something so wrong 4 years ago, and every second nowadays I am burning with my regrets.

My soul is weary with sorrow; strengthen me according to your word.

Thursday, October 4, 2018

A Newbie Learning Apache Spark



Advanced Apache Spark Training - Sameer Farooqui (Databricks)

Scheduling, Monitoring, Distributing

Spark Ecosystem

  • Spark Core (Memory + Disk)
  • DataFrames API: Scala, Java, Python, R
  • Higher level libraries:
    • Spark SQL
    • Streaming
    • MLlib, GraphX -> ML and graph libraries
    • BlinkDB (sample aggregation based on confidence level, instead of the whole dataset)
    • Tachyon -> memory-based distributed storage system, data sharing across cluster frameworks
  • Resource Manager:
    • standalone scheduler
    • YARN
    • local node
    • Mesos (expanded comparison)
  • Outer Layer:
    • HDFS
    • MongoDB
    • RDBMS
    • Cassandra
Interactive Shell (Scala or Python) -- Driver

RDD

(Resilient Distributed Dataset)

1,000 - 10,000 partitions normally
each partition requires a thread of computation

An RDD can be created in 2 ways (a short sketch follows the list):
1) Parallelize a collection (data in the memory of JVM, only for prototyping)
2) Read data from an external source (S3, HDFS...) --- connection open/close only once
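
A quick sketch of both creation paths (the path and sizes are placeholders): parallelize() only suits small in-memory collections, while textFile() pulls from an external source such as HDFS or S3.

from pyspark import SparkContext

sc = SparkContext(appName="rdd-creation")   # in the pyspark shell, sc already exists

# 1) Parallelize a collection living in the driver JVM -- prototyping only
small_rdd = sc.parallelize(range(1000), numSlices=8)

# 2) Read from an external source -- hypothetical path
big_rdd = sc.textFile("hdfs:///data/events/*.log", minPartitions=1000)

print(small_rdd.count(), big_rdd.getNumPartitions())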

DAG (Directed acyclic graph)
Building DAG is just metadata.
Only when an action is called is the DAG materialized.

According to the DAG, strategically cache intermediate RDDs (also lazy: only when a downstream action triggers the RDD does the cache get filled)

External data/ collection -> BaseRDD (input RDD) -> Transformation, PipeLined Stage -> launch action such as count(), collect() to kick off a parallel computation, which is then optimized and executed by Spark

Transformations: filter()...
Actions: collect(), saveToCassandra(), count()...
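
A sketch of the lazy evaluation described above, assuming the sc from the pyspark shell and an invented log path: filter() only adds metadata to the DAG, and nothing reads the data until count(), an action, fires.

lines  = sc.textFile("hdfs:///data/app.log")      # builds a HadoopRDD; no I/O yet
errors = lines.filter(lambda l: "ERROR" in l)     # transformation: just a new node in the DAG
errors.cache()                                    # also lazy; filled on the first action

n = errors.count()                                # action: the DAG runs and the cache is populated
sample = errors.take(5)                           # served from the cached partitions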

Type of RDDs: HadoopRDD, FilteredRDD, JoinedRDD, ShuffledRDD, CassandraRDD (optimized for Cassandra, Cassandra connector)...

Spark Runtime Architecture Integration with Resource Managers

Local, Standalone Scheduler -> Static Partitioning (fixed number of executors during Spark job)
YARN, Mesos -> Dynamic Partitioning (number of executors can grow/shrink live during the job)

MapReduce vs. Spark

MapReduce:
[Master]: JobTracker ("brain"), NameNode (receives heartbeats from DataNodes)
[Nodes]:
TaskTracker: several JVMs on each machine, each slot either Map or Reduce
DataNode: reports to the NameNode; this is the HDFS layer
Spark:
One JVM for each machine. Several slots where tasks can run.

Parallelism in MapReduce: separate JVM processes, one per task slot
Parallelism in Spark: multiple task threads running inside one JVM

MapReduce Slot: hard-coded, either Map or Reduce, takes time to build/ tear down a JVM
Spark Slot: generic, can convert, more efficient to reuse a slot

Local Mode

JVM: Executor + Driver

RDDs
Cores (slots): number of simultaneous tasks
-- in general, number of task threads = number of cores * 2 or 3
Internal Threads

Do not need a worker JVM in local mode.
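
A one-JVM sketch of local mode (the app name is arbitrary): local[4] means driver and executor share a single JVM with 4 task slots, so no Worker JVM is involved.

from pyspark import SparkContext

# one JVM holding both driver and executor, with 4 task threads
sc = SparkContext(master="local[4]", appName="local-mode-demo")
print(sc.parallelize(range(100), 8).sum())   # 8 partitions queue onto the 4 slots
sc.stop()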

Spark Standalone

SPARK_LOCAL_DIRS
Local Dirs:
1) when an RDD is persisted with memory and disk, and one of its partitions has to be spilled to local disk -- to speed things up
2) Intermediate shuffle data

Resource Management Layer:
Spark Master JVM
Worker JVM

For each Spark application:
Driver -> talks with the Master JVM; the Master JVM acts as a scheduler and notifies each Worker JVM -> the Worker JVM spins up an Executor
Executor contains RDDs, tasks... (Worker cores: how many cores that Worker JVM can give out to the underlying executors)
If an Executor crashes, the Worker JVM will restart it; if a Worker JVM crashes, the Master JVM will restart it.

High Availability on Spark with Zookeeper

One Worker can start several executors for different Drivers (Applications)

Applications run in FIFO order
Application CORE # and allocated memory should roughly match
(Cores per application, Memory per executor)

Spark UI:
Jobs -- applications
Stages -- jobs decomposed into one or more stages, a stage decomposed into one or more tasks
Storage -- RDD caches, memory persistence (cached RDDs normally take 2-3x the data size in HDFS)
Environment -- config precedence: defaults in the Scala source code < Spark environment config file < spark-submit flags < settings made in the application's SparkContext code
Executors -- thread dump, can be used for profiling

YARN

(Yet Another Resource Negotiator)
  • Resource Manager (Scheduler + Applications Manager) -- High Availability via ZooKeeper
  • Node Manager (heartbeats to the Resource Manager, reporting bandwidth/core occupancy)
Client -> Resource Manager -> Start an App Master in a Node Manager -> Resource Manager gives keys and tokens to the App Master to find Containers -> the App Master reports back to the Client
Client Driver also has a Scheduler

Dynamic Allocation
min, max, time-outs
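
These knobs map onto real Spark configuration keys; a sketch of setting the min/max/time-out when building the conf (the numbers are arbitrary):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("dynamic-allocation-demo")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.minExecutors", "2")            # floor
        .set("spark.dynamicAllocation.maxExecutors", "50")           # ceiling
        .set("spark.dynamicAllocation.executorIdleTimeout", "60s")   # release idle executors
        .set("spark.shuffle.service.enabled", "true"))               # external shuffle service, needed on YARN

sc = SparkContext(conf=conf)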

Spark      | Central Master    | Who starts Executors? | Tasks run in
Local      | [none]            | manually              | executor
Standalone | Standalone Master | Worker JVM            | executor
YARN       | YARN App Master   | Node Manager          | executor
Mesos      | Mesos Master      | Mesos Slave           | executor


Memory and Persistence

Recommended to use < 75% memory for Spark
Executor heap size >= 8GB
Memory usage is affected by storage level and serialization format

1. Persistence in memory
    The entire swapped-out partition evaporates
    (LRU cache); if an executor dies, the DAG will recreate the partition if it is used again
       - Deserialized mode
       - Serialized mode -- saves heap space; choose a fast serialization library (Kryo)
2. Persistence in memory + disk
    The entire swapped-out partition goes to SPARK_LOCAL_DIRS
3. Persistence in disk

Tachyon: off-heap storage

If RDD fits in memory, choose MEMORY_ONLY
If not, use MEMORY_ONLY_SER with fast serialization library
Don't spill to disk unless the functions that computed the datasets are very expensive or they filter a large amount of data (recomputing from the DAG may be as fast as reading from disk)
Use replicated storage levels sparingly and only if you want fast fault recovery (maybe to serve requests from a web app)
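
A short sketch of picking a storage level in PySpark (the dataset is a placeholder; note PySpark data is pickled anyway, so the plain constants already behave like the serialized variants do in Scala):

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="persistence-demo")
rdd = sc.textFile("hdfs:///data/events/*.log").map(lambda l: l.split("\t"))

rdd.persist(StorageLevel.MEMORY_ONLY)         # cache() default: evicted partitions are recomputed from the DAG
# rdd.persist(StorageLevel.MEMORY_AND_DISK)   # evicted partitions spill to SPARK_LOCAL_DIRS instead
# rdd.persist(StorageLevel.MEMORY_ONLY_2)     # replicated on two nodes, for fast fault recovery

print(rdd.count())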

Intermediate data is automatically persisted during shuffle operations

PySpark: always serializes data with the Pickle library

Default Memory Allocation in Executor JVM:
Cached RDDs: 60%
Shuffle memory: 20%
User programs (remainder): 20%
If an Executor crashes (out of memory), figure out which of these parts is responsible.
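
The 60/20/20 split above corresponds to the legacy (pre-1.6) memory manager; a hedged sketch of the knobs that controlled it, useful when an out-of-memory executor points at the cache or shuffle region:

from pyspark import SparkConf

# legacy memory manager settings (Spark <= 1.5); Spark 1.6+ replaces them with spark.memory.fraction
conf = (SparkConf()
        .set("spark.storage.memoryFraction", "0.6")    # share reserved for cached RDDs
        .set("spark.shuffle.memoryFraction", "0.2"))   # share for shuffle buffers; the rest goes to user code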

Cost of GC is proportional to the number of Java objects (not the size of the objects)
GC: Parallel, CMS, G1

Jobs -> Stages -> Tasks

When .collect() is called, a job is created. A job can be split into several stages, each stage into several tasks.
Lineage

Narrow Dependencies
Each partition of the parent RDD is used by at most one partition of the child RDD
e.g. map, filter, union, join with inputs co-partitioned (hashed the same way)
Wide Dependencies
Multiple child partitions may depend on a single parent partition
e.g. groupByKey, join with inputs not co-partitioned
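
A tiny sketch of the two dependency types, assuming the sc from the pyspark shell: mapValues() keeps each partition independent (narrow), while groupByKey() forces a shuffle because child partitions read from many parents (wide).

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 4)

doubled = pairs.mapValues(lambda v: v * 2)   # narrow: each parent partition feeds exactly one child
grouped = pairs.groupByKey()                 # wide: shuffle, any child may read from any parent

print(doubled.toDebugString())               # one stage
print(grouped.toDebugString())               # shows the extra stage introduced by the shuffle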



Broadcast Variables and Accumulators
PySpark
Shuffle
Spark Streaming

Kubernetes in 5 mins


Kubernetes in 5 mins

This couldn't be explained more clearly or simply...
"Desired State Management"

- K8s cluster services (scheduling services)
- Worker: hosts containers

Feed application YAML file (configuration) into K8s cluster service API

YAML:
Pods, Running Containers, Replica, Port, etc.
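
For flavor, a minimal, hypothetical Deployment manifest of the kind that gets fed to the cluster API (name, image, and counts are placeholders). It only pins down the desired state: which container image, how many replicas, which port.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo-app                 # hypothetical application name
spec:
  replicas: 3                    # desired number of pod copies
  selector:
    matchLabels:
      app: demo-app
  template:
    metadata:
      labels:
        app: demo-app
    spec:
      containers:
        - name: demo-app
          image: demo/app:1.0    # placeholder image
          ports:
            - containerPort: 8080

kubectl apply -f deployment.yaml hands this to the API server, and the scheduler keeps converging the actual state toward it.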

Why do we use Redis for distributed systems? Implementing a distributed lock with Redis


Why do we use Redis for distributed systems?

Redis is single-threaded.
Memory-based.

Types: String, Hash, List, Set, Sorted Set

Expiration:
Redis uses periodic deletion (random scan and delete of expired keys) + lazy deletion.
There is a line of configuration in redis.conf:
# maxmemory-policy allkeys-lru

Data with strong consistency requirements should not go in the cache.
Update the database first, then delete the cache.
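
A toy sketch of that ordering with redis-py and a hypothetical db handle (and rows assumed to already be serialized strings): write the database first, then delete the cached copy, so the next reader repopulates the cache from fresh data.

import redis

r = redis.Redis(decode_responses=True)

def update_user(db, user_id, fields):
    db.update("users", user_id, fields)      # 1) hypothetical DB helper: update the source of truth
    r.delete("user:%s" % user_id)            # 2) then drop the cache entry

def get_user(db, user_id):
    cached = r.get("user:%s" % user_id)
    if cached is not None:
        return cached
    row = db.select("users", user_id)        # hypothetical DB helper
    r.set("user:%s" % user_id, row, ex=300)  # re-cache with a TTL as a safety net
    return row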

What if multiple subsystems Set the same key at the same time?
Method1: multi, discard, exec
Method2: SETNX, GETSET
*(Redis itself is single-threaded and needs no locking internally, but Redis is one way to implement a distributed lock)

Redis transactions and locks in detail

multi -- sql: start transaction
...
discard -- sql: rollback transaction
exec -- sql: commit transaction
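
In redis-py the MULTI/EXEC pair is wrapped by a pipeline with transaction=True; a small sketch (key names are invented), where execute() plays the role of exec and reset() abandons the queued commands much like discard:

import redis

r = redis.Redis()

pipe = r.pipeline(transaction=True)   # commands below are queued inside MULTI
pipe.set("stock:42", 100)
pipe.incrby("stock:42", -1)
print(pipe.execute())                 # EXEC: the queued commands run atomically

# pipe.reset() would throw away the queued commands instead (the DISCARD path)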

Implementing a distributed shared lock with Redis

SETNX key value
If and only if key does not exist, set key to value and return 1.
If the given key already exists, SETNX does nothing and returns 0.

GETSET key value
Set key to value and return the old value of key.
If key exists but is not a string, an error is returned.
If key does not exist, nil is returned.

GET key
Return the string value associated with key.
If key does not exist, the special value nil is returned.

DEL key
Delete one or more given keys.
Non-existent keys are ignored.
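
A hedged Python sketch of the classic SETNX + GETSET lock built from the four commands above (key name and timeout are arbitrary): the value stored is an expiry timestamp, so a crashed holder's lock can eventually be taken over.

import time
import redis

r = redis.Redis(decode_responses=True)
LOCK_KEY = "lock:release-build"        # arbitrary lock name
LOCK_TTL = 10                          # seconds before a dead holder's lock can be stolen

def acquire_lock():
    if r.setnx(LOCK_KEY, time.time() + LOCK_TTL):               # key absent -> lock acquired
        return True
    current = r.get(LOCK_KEY)
    if current is not None and float(current) < time.time():    # previous holder timed out
        old = r.getset(LOCK_KEY, time.time() + LOCK_TTL)        # atomically take over
        return old is None or float(old) < time.time()          # only one stealer sees the stale value
    return False

def release_lock():
    r.delete(LOCK_KEY)                 # naive release; near expiry this can delete someone else's lock

while not acquire_lock():
    time.sleep(0.05)                   # back off between retries (see point 5 below)
try:
    pass                               # critical section goes here
finally:
    release_lock()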

Issues with distributed locks

1: A timeout mechanism is essential: if the client holding the lock crashes, the lock must expire, otherwise no other client can ever acquire it and you get a deadlock.
2: With a distributed lock, timestamps across multiple clients cannot be strictly consistent, so under certain conditions locks can overlap. The mechanism needs some tolerance and must accept such low-probability events.
3: Only lock the critical steps. A good habit is to prepare the related resources first (e.g. open the database connection), then acquire the lock, do the work, and release it, keeping the lock-holding time as short as possible.
4: Whether to CHECK the lock while holding it: if you strictly depend on the lock state, it is best to re-check the lock at key steps. But in our tests, under high concurrency each lock check costs a few milliseconds while the whole lock-holding logic takes less than 10 ms, so we chose not to check the lock.
5: The art of sleeping: to reduce pressure on Redis, always sleep between retries when trying to acquire the lock. How long to sleep is a question in itself and should be calculated from your Redis QPS plus the lock-holding time.
6: As for why we don't use Redis's multi, expire, watch and similar mechanisms, see the references for the reasons.

Tuesday, October 2, 2018

Saving a few useful study materials


Saving a few useful study materials:

(Allegedly a 2.5-hour introduction)


NumPy
Pandas provides high level data manipulation tools built on top of NumPy. 
NumPy by itself is a fairly low-level tool, quite similar to using MATLAB.
Pandas on the other hand provides rich time series functionality, data alignment, NA-friendly statistics, groupby, merge and join methods, and lots of other conveniences. It has become very popular in recent years in financial applications.
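
A tiny sketch of those conveniences on made-up data: a datetime index, NA-friendly statistics, groupby, and merge, all of which raw NumPy would leave to you.

import numpy as np
import pandas as pd

prices = pd.DataFrame(
    {"ticker": ["A", "A", "B", "B"],
     "close": [10.0, np.nan, 20.0, 21.0]},
    index=pd.to_datetime(["2018-10-01", "2018-10-02", "2018-10-01", "2018-10-02"]))

print(prices["close"].mean())                     # NaN is skipped, not propagated
print(prices.groupby("ticker")["close"].last())   # groupby

sectors = pd.DataFrame({"ticker": ["A", "B"], "sector": ["tech", "energy"]})
print(prices.reset_index().merge(sectors, on="ticker"))   # merge/join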

