Tuesday, November 27, 2018

Types of NoSQL

There are 4 basic types of NoSQL databases:


Key-Value Store


– It has a Big Hash Table of keys & values
e.g. MemcacheDB, Redis, Amazon Dynamo

The schema-less format of a key value database like Redis is just about what you need for your storage needs. The key can be synthetic or auto-generated while the value can be String, JSON, BLOB (basic large object) etc.
CAP: Key value stores are great around the Availability and Partition aspects but definitely lack in Consistency.


Document-based Store


- It stores documents made up of tagged elements
e.g.  MongoDB, CouchDB

The data which is a collection of key value pairs is compressed as a document store quite similar to a key-value store, but the only difference is that the values stored (referred to as “documents”) provide some structure and encoding of the managed data. XML, JSON (Java Script Object Notation), BSON (which is a binary encoding of JSON objects) are some common standard encodings.

One key difference between a key-value store and a document store is that the latter embeds attribute metadata associated with stored content, which essentially provides a way to query the data based on the contents.


Column-based Store


- Each storage block contains data from only one column
e.g. HBase, Cassandra

In column-oriented NoSQL database, data is stored in cells grouped in columns of data rather than as rows of data. Columns are logically grouped into column families. Column families can contain a virtually unlimited number of columns that can be created at runtime or the definition of the schema. Read and write is done using columns rather than rows.

In comparison, most relational DBMS store data in rows, the benefit of storing data in columns, is fast search/ access and data aggregation. Relational databases store a single row as a continuous disk entry. Different rows are stored in different places on disk while Columnar databases store all the cells corresponding to a column as a continuous disk entry thus makes the search/access faster.

For example: To query the titles from a bunch of a million articles will be a painstaking task while using relational databases as it will go over each location to get item titles. On the other hand, with just one disk access, title of all the items can be obtained.

Data Model


ColumnFamily: ColumnFamily is a single structure that can group Columns and SuperColumns with ease.
Key: the permanent name of the record. Keys have different numbers of columns, so the database can scale in an irregular way.
Keyspace:  This defines the outermost level of an organization, typically the name of the application. Kind of like schema in RDBM.
Column:  It has an ordered list of elements aka tuple with a name and a value defined.


Graph-based


- A network database that uses edges and nodes to represent and store data.
e.g. Neo4J

These databases that use edges and nodes to represent and store data.
These nodes are organized by some relationships with one another, which is represented by edges between the nodes.
Both the nodes and the relationships have some defined properties.

Wednesday, November 14, 2018

Introduction to Mesos Architecture



Mesos consists of
1) a master daemon that manages agent daemons running on each cluster node

Allocation policy module
Enables fine-grained sharing of resources (CPU, RAM, …) across frameworks by making them resource offers.
Each resource offer contains a list of <agent ID, resource1: amount1, resource2: amount2, ...>

2) Mesos frameworks that run tasks on these agents.

Mesos framework consists of:
a scheduler that registers with the master to be offered resources
an executor process that is launched on agent nodes to run the framework’s tasks




1. Mesos slave reports available resources to Mesos master.
2. Based on allocation policy module Mesos master decides which framework to allocate these resources to. For example, It allocated to Framework 1.
3. Framework1 is free to accept/deny offered resources. For example, it accepts the offer
4. Master sends the tasks to the slave and Framework1 executor takes over. Mesos master may allocate any unused resource to other frameworks.

Two-Level Scheduling


Allocation Module decide resources for each framework
Framework Scheduler decide resources for each task

How can the constraints of a framework be satisfied without Mesos knowing about these constraints? For example, how can a framework achieve data locality without Mesos knowing which nodes store the data required by the framework? Mesos answers these questions by simply giving frameworks the ability to reject offers.

Scheduling Algorithm


Dominant Resource Fairness Algorithm (DRF)
Min-Max Fairness Algorithm --- To achieve fairness, the min of requirement gets higher priority
DRF is a Min-Max Fairness Algorithm for heterogeneous resources
   - CPU
   - Memory
   - IO

Similar Systems


YARN, Kubernetes, Docker Swarm

Sunday, November 4, 2018

RabbitMQ vs. Apache Kafka

Understanding When to use RabbitMQ or Apache Kafka

RabbitMQ vs Kafka

Introduction to HBase

What is HBase 

HBase is a column-oriented database management system (NoSQL) that runs on top of HDFS.

High consistency
Fast scan operation
相比较下,不是那么available... (e.g. Major Compaction)
适合海量数据存储(如果数据⽐较⼩,使⽤HBase, 性能会⼤⼤降低。)
不适合OLTA这种需要快速响应的逻辑事物。

Architecture Design Strategy 


HBase Structure 

Master-Slave模式,有3种⻆⾊。
  • Hmaster: A cluster has one active Hmaster
Hmaster 跑在NameNode上⾯,监测所有Region Server的状态,处理所有 metadata 的更改 (Create/delete/update table),分配region, 监测region server压⼒的状态,如果某个 region server压⼒⽐较⼤,会将其分割,分配到其他的region server上,从⽽实现 loading balancing。
Hmaster是⼀个轻量级的master, 因为不处理数据本身,只处理⼀些元数据。Hmaster在 ⼀个cluster中,只有⼀个active Hmaster, 但会有多个back up Hmaster, 这就有了 redundancy.
  • Region Server: Handles all I/O requests,⼀个cluster有多个Region Server. 
运⾏在Hadoop的Data Node上,存储实际的数据,得到很⾼的data locality, 处理所有的 I/O请求,⼀个Region Server可以处理1000个左右的region. 
Region
Table的⽔平分割的产物,default 1GB in size。⼀个region包含了⽔平分割的start key和end key, ⼀个 region最⼤可以有1GB的size, Hmaster会将⼀个Region分配给⼀个Region Server, ⼀个Region Server上也会有多个regions, 任何的读写数据都会直接由Region Server 来处理。
  • Zookeeper: 协调整个cluster. 
协调整个cluster, 选举Hmaster, 节点的注册。
Zookeeper存储了location of META(B-Tree), META holds每个 Region的位置信息。

Read/Write

Client⼀次读或者写的时候: ⾸先访问Zookeeper, 得到META的位置. Client请求META Table所在的Region Server, 得到读写那个Region真正的Region Server位置, 这个信息还会放到Client的cache中。 Client会去那个真正的Region上,去读或者写。以后的操作,Client不会去找 Zookeeper了,会使⽤⾃⼰的cache.

Region Server Components

WAL: Write ahead log, 位于HDFS,为了加快写操作,client只要把数据的操作写⼊WAL log后, 就算是写⼊成功。 可以用于 WAL recovery来恢复丢失的 Memstore
BlockCache: 读操作的Cache, 存储最新的读数据,现在⼀般是使⽤LRU算法。
Memstore: 写操作的Cache, 存储最新写⼊的数据。
HFile: Memstore满了后,将数据⼀次性写 ⼊HFile.(sequecial write)


Read :
⾸先在BlockCache中寻找。
之后再MemStore中寻找。
最后去HFile中寻找。

优化:减少HFile的数量
Minor Compaction, 将多个 HFile merge成稍⼤的HFile。
或者Major Compaction, 将多个HFile合并成⼀个⾮常⼤的 HFile, 需要⼤量的I/O, Region Server处于不可⽤的状态。

Data Model


An HBase column represents an attribute of an object

Row: ⼀⾏数据。
Column Family
Column

HBase allows for many attributes to be grouped together into what is known as column families, such that the elements of a column family are all stored together.
With HBase, you must predefine the table schema and specify the column families.

Versioning 

可以存多份不同版本的数据,适合回溯⼀些数据,HBase的读操作是针对最新版本的 value, 核⼼数据要存在最新的version中。

Saturday, November 3, 2018

Big Data Infrastructure

Frameworks and Tools


Computation

Hadoop, Spark, Samza, Flink, Hive, Pig, Drill, etc

Transportation

Kafka, Flume, Sqoop, Scribe, RabbitMQ, ZeroMQ, IronMQ, etc

Storage

HBase, Cassandra, CouchDB, MongoDB, etc

Coordination

Zookeeper, Consul, Etcd, Eureka, etc

Scheduling

Mesos, Yarn, Oozie, etc

Common Data Infrastructure


Data Ingestion Layer


  • High throughput
  • Simple processing logic, merely a pass through
  • Cannot serve as a storage layer

Data Storage Layer

(Operational Store [Indexed] + File System [Un-Indexed])

  • High availability
  • Fault tolerance
  • Handles high data volume
  • Able to handle various type of data
OLTP vs OLAP
"Online transaction processing" vs "Online analytical processing"

Data Processing Layer

Introduction to Kafka

What is Kafka 

Kafka是⼀个open source的分布式的messaging system.

特点:

  • 快,可以⽀持⼏百兆每秒的数据,以及上千个client发送数据。 
  • 快速扩展。
  • 数据都会在磁盘中持久化。

Design Strategy


Defensive Design

Producer 发消息的⼀⽅,使⽤ push model. (built-in retry logic, exactly one delivery)
Consumer 主动从kafka上收消息,使⽤ pull model. Consumer keeps state (Offset). Consumers need to send heartbeats to the Group Coordinator to be considered alive

Push model: 主动去发消息。⽐较⾼的throughput, 处理⽐较复杂的server logic.
Pull model: 主动去收消息。⽐较简单的server logic, ⽀持replay消息。

Kafka把消息按照topic进⾏分类,物理上,topic由partition组成,⼀个partition可以简单 的看成array, ⼀个partition只能属于⼀个topic。
Topics in Kafka are always multi-subscriber (single or multiple consumers)

Offset 类似于array的index, 接受者通过offset去定位某个partition上⾯的消息。

Kafka所有的消息是存在log file中
每个 message 有自己的 Offset

Log File Retention

Kafka有可配置的⾃⼰的retention机制:
Time-based, 例如,过了7天的message就被删除。
Size-based, 例如,⼤于1GB的就会被删除。

在读写时,sequential access

Kafka Message Format:每条 message 有一定的 overhead,传送 small size data 不划算

How to manage schema:
AVRO file
AVRO schema (metadata) + AVRO content
Parquet file: Column-based schema (applied to Spark)

Serialization
Encoding

Data Replication

单位是 partition, partition有leader和follower两种⻆⾊。
All reads/ writes must go to partition leader

ISR (in-sync Replica)

Kafka 一般需要和 ZooKeeper 配合使用
ZooKeeper 有 service discovery,可以 detect 到 broker 是否挂掉,如果leader挂掉了,ZooKeeper 会选出 replication 最及时的 follower 作为leader

Latency vs. Bandwidth

The slow internet can be caused by low bandwidth or high latency.

Bandwidth (Speed): 

Bandwidth determines how fast data can be transferred over time.
Bandwidth is measured in Mbps or megabits per second.
download speed /upload speed

Latency: 

Latency is the delay. It’s the time it takes for your request for data to get to a server (like a website), and then for the data to get back to you.
Latency is measured in milliseconds, abbreviated “ms”.

e.g. 

Satellite Internet Connection (High Speed, High Latency): 

You would click a link on a web page and, after a noticeable delay, the web page would start downloading and show up almost all at once.

Theoretical Connection (Low Speed, Low Latency): 

You would click a link on a web page and the web page would start loading immediately. However, it would take a while to load completely and you would see images load one-by-one.

Cable Internet Connection (High Speed, Low Latency): 

You would click a link on a web page and the web page would appear almost immediately, downloading all at once.

Thursday, November 1, 2018

Introduction to Zookeeper

Master-Slave Model Common Issues 


  • 如何选举master? 
  • master crash了怎么办? 
  • worker crash了怎么办? 
  • master和worker中的通信不流畅怎么办?

Master-Slave Model Common Tasks 

  • Master Election 选举⾸领的过程,当整个系统初始化的时候,或者现有master crash的时候。
  • Crash Detection 及时快速发现分布式系统中的master或者worker是否还在。
  • Group Membership 分布式系统中,有时会有不同种类的任务,有不同的worker在做对应的任务,需要Group Membership机制,来给worker分类。
  • Metadata Management 系统运⾏中的关键性数据。

Architecture

ZooKeeper 保证了消息传递中的consistency, order, durability. 本质上是⽤来帮助其他分布式系统实现协同。 提供了⾮常简洁的⽅式去解决concurrency.
Zookeeper并不是⼀个直接的tool来解决synchronization的问题。 只是⼀个强⼀致性的共享的存储空间。 其他系统需要依据Zookeeper这个shared storage来实现

Coordinate Strategy

Coordinating with Messages PassingShared Storage
Coordinate with Shared Storage --> ZooKeeper

Concepts

Data Tree: Zookeeper是⼀个树状的存储结构,类似于计算机⽂件系统的结构。通过这个结构,能够 解决上述的各种问题
znode
Ephemeral znode: 临时的znode, 如果创建者crash了,对应的ephemeral znode就 会被删除。

Internal

Client - Server Interaction

对Zookeeper来讲,client指的是使⽤Zookeeper的distributed system,例如 Kafka, server就是zookeeper本身。 Zookeeper的Client - Server的connection使⽤的是TCP, Client只可以连接到⽐它更 新,或者等同状态的zookeeper server上。

Leader Election within Zookeeper 

Zookeeper是⼀个shared storage, 给别⼈做⾸领选举。
它⾃⼰如何做⾸领选举呢?它使 ⽤AKA Ensemble这种机制,来选举。

State Replication within Zookeeper 

Zookeeper是使⽤Zab的协议来实现的state replication.
当写操作过来,由leader来处理写操作,并将write request转化成⼀个transaction, 之后 leader会向所有的follower发送⼀个proposal, 所有的follower收到proposal后,必须返回 给leader⼀个ACK, 当leader得到majority的票数后,就会发送⼀个commit message, 这 时消息就正式写⼊zookeeper.

Kafka使⽤ISR的机制,写操作不⽤跟所有的follower征求意⻅,⽐较灵活,throughput很 ⾼,但是不是⾮常consistent. Kafka更快,Zookeeper更加的fault tolerant.

Usage

Crash Detection

每个节点使⽤⼀个ephemeral znode, 如果某个节点挂了,对应的ephemeral znode就会被删 除,同时对这个节点set watcher的所有节点就会被通知。

Group Membership

Data Tree采⽤的是树状结构,这就是⼀个天然的group management, 把同⼀group的znode 挂在同⼀个group znode下⾯。

Master Election

master 被发现挂了,找下一个非空 的znode就是我们的新master

Metadata Management

我们⽤persistent znode来存储metadata, 包含了master/slave的信息、Application specific data. 不能存储⼤量的数据,因为写的效率⾮常低。

Introduction to Cassandra

Cassandra has a P2P architecture.

High Availability, Eventual Consistency

Data Partitioning

Every node has the whole nodes map.
Shared-nothing architecture (Each node is independent and self-sufficient, and there is no single point of contention across the system) with data replication
Cassandra stores data by dividing data evenly around its cluster of nodes.

Consistent Hashing

How to distribute data efficiently?
1. Determining a node on which a specific piece of data should reside on.
2. Minimizing data movement when adding or removing nodes.

Keys and nodes map to the same ID space (normally a ring)
Node: Hash(IP)
Key: Hash(Key)

Consistent Hashing is very balancing, and a minimal number of keys need to be remapped to maintain load balance when nodes join or leave the network.

Eventual Consistency

Cassandra enables users to configure the number of replicas in a cluster
Reaching a consistent state often takes microseconds.

Gossip Protocol

Cassandra uses a gossip protocol to discover node state for all nodes in a cluster.
Exchange information with <= 3 nodes, not every node (to reduce network load).
The gossip protocol facilitates failure detection.

Bloom Filter

A bloom filter is an extremely fast way to test the existence of a data structure in a set.
A bloom filter can tell if an item might exist in a set or definitely does not exist in the set.
False positives are possible but false negatives are not.
Bloom filters are a good way of avoiding expensive I/O operation.


Cassandra Keyspace - Keyspace is similar to a schema in the RDBMS world.

Memtable - A memtable is a write-back cache residing in memory which has not been flushed to disk yet.
Write-Back Cache - A write-back cache is where the write operation is only directed to the cache and completion is immediately confirmed. This is different from Write-through cache where the write operation is directed at the cache but is only confirmed once the data is written to both the cache and the underlying storage structure.
SSTable - A Sorted String Table (SSTable) ordered immutable key-value map. It is basically an efficient way of storing large sorted data segments in a file.

Cassandra Write Path

Node level



Cassandra Read Path

Node level


SSTable read path


Introduction to HDFS

Strong consistency
Low availability -> common in a system that has a simple point failure

NameNode and DataNodes

HDFS has a master/slave architecture.

An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients.
In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on.

Namenode stores metadata
Datanode stores real data

HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes.

The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes.
The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.


The File System Namespace

The NameNode maintains the file system namespace.
The NameNode is the arbitrator and repository for all HDFS metadata. The system is designed in such a way that user data never flows through the NameNode.

Data Replication

Replication factor -- configurable
The NameNode makes all decisions regarding replication of blocks.
It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster.
Receipt of a Heartbeat implies that the DataNode is functioning properly.
A Blockreport contains a list of all blocks on a DataNode.

The Persistence of File System Metadata

The NameNode keeps an image of the entire file system namespace and file Blockmap in memory.
The NameNode uses a transaction log called the EditLog to persistently record every change that occurs to file system metadata. The NameNode uses a file in its local host OS file system to store the EditLog.
The entire file system namespace, including the mapping of blocks to files and file system properties, is stored in a file called the FsImage. The FsImage is stored as a file in the NameNode’s local file system too.

Communication Protocols

All HDFS communication protocols are layered on top of the TCP/IP protocol.
By design, the NameNode never initiates any RPCs. Instead, it only responds to RPC requests issued by DataNodes or clients.

Data Organization

A client request to create a file does not reach the NameNode immediately. In fact, initially, the HDFS client caches the file data into a temporary local file. Application writes are transparently redirected to this temporary local file. When the local file accumulates data worth over one HDFS block size, the client contacts the NameNode. The NameNode inserts the file name into the file system hierarchy and allocates a data block for it. The NameNode responds to the client request with the identity of the DataNode and the destination data block. Then the client flushes the block of data from the local temporary file to the specified DataNode. When a file is closed, the remaining un-flushed data in the temporary local file is transferred to the DataNode. The client then tells the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.

File Deletes and Undeletes

When a file is deleted by a user or an application, it is not immediately removed from HDFS. Instead, HDFS first renames it to a file in the /trash directory. The file can be restored quickly as long as it remains in /trash. A file remains in /trash for a configurable amount of time. After the expiry of its life in /trash, the NameNode deletes the file from the HDFS namespace.

Most Recent Posts